seqmat
SeqMat — fast, vectorized genomic sequences with first-class mutation tracking.
1"""SeqMat — fast, vectorized genomic sequences with first-class mutation tracking.""" 2 3__version__ = "1.5.1" 4__author__ = "Nicolas Lynn Vila" 5__email__ = "nicolasalynn@gmail.com" 6 7# Core classes 8from .seqmat import SeqMat 9from .gene import Gene 10from .transcript import Transcript 11 12# Config / paths 13from .config import ( 14 get_config_dir, 15 get_config_file, 16 get_data_base, 17 get_data_dir, 18 get_default_organism, 19 load_config, 20 save_config, 21) 22 23# Optional LMDB backend 24from .lmdb_store import build_lmdb 25 26# Position → gene lookup 27from .locator import build_location_index, gene_names_at_position 28 29# Data setup 30from .data_setup import set_fasta_path, setup_genomics_data 31 32# Discovery 33from .discovery import ( 34 available_genes, 35 count_genes, 36 data_summary, 37 get_all_genes, 38 get_gene_list, 39 get_organism_info, 40 list_available_organisms, 41 list_gene_biotypes, 42 list_supported_organisms, 43 print_data_summary, 44 search_genes, 45) 46 47# Errors 48from .errors import ( 49 DataUnavailableError, 50 GeneNotFoundError, 51 OrganismNotConfiguredError, 52 PrebuiltDataUnavailableError, 53 SeqMatError, 54) 55 56__all__ = [ 57 # Core 58 "SeqMat", "Gene", "Transcript", 59 # Config / paths 60 "get_default_organism", "get_data_dir", "get_config_dir", 61 "get_config_file", "get_data_base", "load_config", "save_config", 62 # Setup 63 "setup_genomics_data", "set_fasta_path", 64 # Discovery 65 "list_available_organisms", "list_supported_organisms", "get_organism_info", 66 "list_gene_biotypes", "count_genes", "get_gene_list", "get_all_genes", 67 "data_summary", "print_data_summary", "search_genes", "available_genes", 68 # Position lookup 69 "gene_names_at_position", "build_location_index", 70 # LMDB 71 "build_lmdb", 72 # Errors 73 "SeqMatError", "GeneNotFoundError", "OrganismNotConfiguredError", 74 "DataUnavailableError", "PrebuiltDataUnavailableError", 75]
46@dataclass(slots=True) 47class SeqMat: 48 """Sequence matrix with indices, mutations (SNP/ins/del), complement, slicing, remove_regions, from_fasta.""" 49 50 name: str = field(default="wild_type") 51 version: str = field(default="1.0") 52 source: str = field(default="Unknown") 53 notes: dict = field(default_factory=dict) 54 55 seq_array: np.ndarray = field(init=False, repr=False) 56 insertions: Dict[int, List[Tuple[int, str]]] = field(default_factory=lambda: defaultdict(list), init=False, repr=False) 57 mutations: List[Dict[str, Any]] = field(default_factory=list, init=False, repr=False) 58 mutated_positions: set[int] = field(default_factory=set, init=False, repr=False) 59 rev: bool = field(default=False, init=False, repr=False) 60 predicted_splicing: Optional["pd.DataFrame"] = field(default=None, init=False, repr=False) 61 _cow: bool = field(default=False, init=False, repr=False) 62 63 def __init__( 64 self, 65 nucleotides: Union[str, np.ndarray] = "", 66 indices: Optional[np.ndarray] = None, 67 conservation: Optional[np.ndarray] = None, 68 reference: Optional[Union[str, np.ndarray]] = None, 69 name: str = 'wild_type', 70 source: Optional[str] = None, 71 version: str = '1.0', 72 notes: Optional[dict] = None, 73 rev: bool = False, 74 seq: Optional[str] = None, 75 ) -> None: 76 if seq is not None and not nucleotides: 77 nucleotides = seq 78 self.name = name 79 self.version = version 80 self.source = source or "Unknown" 81 self.notes = notes or {} 82 self.rev = rev 83 self.predicted_splicing = None 84 self._cow = False 85 self.insertions = defaultdict(list) 86 self.mutations = [] 87 self.mutated_positions = set() 88 89 # Prepare sequence 90 if isinstance(nucleotides, str): 91 nts = np.frombuffer(nucleotides.encode('ascii'), dtype='S1') 92 else: 93 nts = np.asarray(nucleotides, dtype='S1') 94 L = len(nts) 95 96 # Indices default to 1-based 97 if indices is None: 98 indices = np.arange(1, L+1, dtype=np.int64) 99 else: 100 indices = np.asarray(indices, dtype=np.int64) 101 if len(indices) != L: 102 raise ValueError(f"Indices length {len(indices)} != sequence length {L}") 103 104 # Structured dtype 105 dtype = np.dtype([ 106 ('nt', 'S1'), ('index', np.int64), ('subidx', np.int16), 107 ('ref', 'S1'), ('cons', np.float32), ('mut_type', 'S10'), ('valid', bool) 108 ]) 109 arr = np.zeros(L, dtype=dtype) 110 arr['nt'] = nts 111 arr['index'] = indices 112 arr['subidx'] = 0 113 114 # Reference 115 if reference is None: 116 arr['ref'] = nts 117 else: 118 if isinstance(reference, str): 119 ref_arr = np.array(list(reference), dtype='S1') 120 else: 121 ref_arr = np.array(reference, dtype='S1') 122 if len(ref_arr) != L: 123 raise ValueError("Reference length mismatch") 124 arr['ref'] = ref_arr 125 126 # Conservation 127 if conservation is not None: 128 cons = np.asarray(conservation, dtype=np.float32) 129 if len(cons) != L: 130 raise ValueError("Conservation length mismatch") 131 arr['cons'] = cons 132 else: 133 arr['cons'] = 0.0 134 135 arr['mut_type'] = b'' 136 arr['valid'] = arr['nt'] != b'-' 137 self.seq_array = arr 138 139 @classmethod 140 def from_fasta( 141 cls, 142 genome: str, 143 chrom: str, 144 start: int, 145 end: int, 146 source_fasta: Optional[Path] = None, 147 **kwargs 148 ) -> SeqMat: 149 """ 150 Load a genomic interval from FASTA. 151 152 Args: 153 genome: Genome identifier (e.g. 'hg38') 154 chrom: Chromosome name 155 start: Start position (1-based) 156 end: End position (1-based, inclusive) 157 source_fasta: Path to the FASTA file. If None, uses 'fasta_full_genome' from config. 158 **kwargs: Additional arguments for SeqMat constructor 159 160 Returns: 161 SeqMat object containing the requested sequence 162 """ 163 if source_fasta is None: 164 config = get_organism_config(genome) 165 # Prefer single full-genome FASTA (no per-chromosome split needed) 166 source_fasta = config.get('fasta_full_genome') or config.get('fasta') 167 if source_fasta is None: 168 # Fall back to per-chromosome files for backward compatibility 169 chrom_source = config.get('CHROM_SOURCE') 170 if chrom_source: 171 chrom_path = Path(chrom_source) 172 chrom_file = chrom_path / f"{chrom}.fasta" 173 if chrom_file.exists(): 174 source_fasta = chrom_file 175 if source_fasta is None: 176 raise ValueError( 177 f"No FASTA configured for genome '{genome}'. Set 'fasta_full_genome' or 'fasta' in config, " 178 f"or provide per-chromosome files in CHROM_SOURCE. Run setup_genomics_data() or set_fasta_path()." 179 ) 180 181 seq = _fetch_fasta_region(str(source_fasta), chrom, start, end) 182 indices = np.arange(start, end+1, dtype=np.int64) 183 return cls(nucleotides=seq, indices=indices, name=f"{chrom}:{start}-{end}", source=genome, **kwargs) 184 185 @classmethod 186 def from_fasta_file(cls, fasta_path: Union[str, Path], chrom: str, start: int, end: int, **kwargs) -> SeqMat: 187 """Load a genomic interval directly from a FASTA file path""" 188 seq = _fetch_fasta_region(str(fasta_path), chrom, start, end) 189 indices = np.arange(start, end+1, dtype=np.int64) 190 return cls(nucleotides=seq, indices=indices, name=f"{chrom}:{start}-{end}", **kwargs) 191 192 def __len__(self) -> int: 193 """Return the number of valid bases in the sequence.""" 194 return int(self.seq_array['valid'].sum()) 195 196 def __repr__(self) -> str: 197 """Return a concise representation of the SeqMat object.""" 198 return f"<SeqMat {self.name}: {len(self)} bp, {len(self.mutated_positions)} muts>" 199 200 @property 201 def seq(self) -> str: 202 """Return the current sequence as a string (only valid bases).""" 203 return self.seq_array['nt'][self.seq_array['valid']].tobytes().decode() 204 205 @property 206 def reference_seq(self) -> str: 207 """Return the reference sequence as a string (only valid bases).""" 208 return self.seq_array['ref'][self.seq_array['valid']].tobytes().decode() 209 210 def alignment(self) -> Tuple[str, str]: 211 """Return gapped reference and mutated sequences aligned to each other. 212 213 Deletions appear as '-' in the mutated string, insertions appear as '-' 214 in the reference string, and SNPs show the differing bases at the same 215 position. 216 217 Returns: 218 (reference, mutated) tuple of equal-length strings. 219 """ 220 arr = self.seq_array 221 ref = arr['ref'].tobytes().decode() 222 mut = np.where(arr['valid'], arr['nt'], b'-').tobytes().decode() 223 return ref, mut 224 225 @property 226 def index(self) -> np.ndarray: 227 """Return the genomic indices of valid bases.""" 228 return self.seq_array['index'][self.seq_array['valid']] 229 230 @property 231 def mutation_vector(self) -> np.ndarray: 232 """Return a binary vector indicating mutated positions (1=mutated, 0=not mutated).""" 233 valid_mask = self.seq_array['valid'] 234 indices = self.seq_array['index'][valid_mask] 235 mutated_array = np.array(list(self.mutated_positions), dtype=np.int64) 236 return np.isin(indices, mutated_array).astype(np.int8) 237 238 def _refresh_mutation_state(self) -> None: 239 """Update mutation tracking based on current state.""" 240 self._ensure_writable() 241 # Clear 242 self.seq_array['mut_type'] = b'' 243 self.mutated_positions.clear() 244 # SNPs 245 snp = (self.seq_array['ref'] != self.seq_array['nt']) & self.seq_array['valid'] 246 self.seq_array['mut_type'][snp] = b'snp' 247 self.mutated_positions.update(self.seq_array['index'][snp].tolist()) 248 # insertions 249 for pos in self.insertions: 250 self.mutated_positions.add(pos) 251 252 def _ensure_writable(self) -> None: 253 """Copy seq_array if it is shared via copy-on-write.""" 254 if self._cow: 255 self.seq_array = self.seq_array.copy() 256 self._cow = False 257 258 def __deepcopy__(self, memo: dict) -> SeqMat: 259 """COW-aware deep copy — shares seq_array, copies on first write.""" 260 new = object.__new__(type(self)) 261 new.name = self.name 262 new.version = self.version 263 new.source = self.source 264 new.notes = copy.deepcopy(self.notes, memo) 265 new.rev = self.rev 266 new.predicted_splicing = copy.deepcopy(self.predicted_splicing, memo) 267 new.insertions = copy.deepcopy(self.insertions, memo) 268 new.mutations = copy.deepcopy(self.mutations, memo) 269 new.mutated_positions = set(self.mutated_positions) 270 # COW: share seq_array 271 new.seq_array = self.seq_array 272 new._cow = True 273 self._cow = True 274 return new 275 276 def clone(self, start: Optional[int] = None, end: Optional[int] = None) -> SeqMat: 277 """ 278 Create a copy of this SeqMat, optionally sliced to a specific range. 279 280 Args: 281 start: Start position (genomic coordinate) 282 end: End position (genomic coordinate) 283 284 Returns: 285 A new SeqMat object 286 """ 287 new = copy.copy(self) 288 new.notes = copy.deepcopy(self.notes) 289 new.insertions = copy.deepcopy(self.insertions) 290 new.mutations = copy.deepcopy(self.mutations) 291 new.mutated_positions = set(self.mutated_positions) 292 if start is not None or end is not None: 293 lo = start if start is not None else self.index.min() 294 hi = end if end is not None else self.index.max() 295 lo, hi = min(lo, hi), max(lo, hi) 296 mask = (self.seq_array['index'] >= lo) & (self.seq_array['index'] <= hi) 297 new.seq_array = self.seq_array[mask].copy() 298 new._cow = False 299 else: 300 # COW: share seq_array, copy on first write 301 new.seq_array = self.seq_array 302 new._cow = True 303 self._cow = True 304 return new 305 306 def __getitem__( 307 self, key: Union[int, slice, Tuple[int, int]] 308 ) -> Union[np.void, SeqMat]: 309 """ 310 Access sequence by genomic position or range. 311 312 Args: 313 key: Position (int), slice, or (start, end) tuple 314 315 Returns: 316 Single base record (if int) or new SeqMat (if slice/tuple) 317 """ 318 if isinstance(key, int): 319 mask = self.seq_array['index'] == key 320 if not mask.any(): 321 raise KeyError(f"{key} not in SeqMat") 322 return self.seq_array[mask][0] 323 if isinstance(key, slice) or (isinstance(key, tuple) and len(key) == 2): 324 if isinstance(key, slice): 325 lo, hi = key.start, key.stop 326 else: 327 lo, hi = key 328 return self.clone(lo, hi) 329 raise TypeError("Index must be int, slice or (start,end)") 330 331 def _classify_mutation(self, ref: str, alt: str) -> str: 332 """Classify a mutation type based on reference and alternate alleles.""" 333 if ref == '-': 334 return 'ins' 335 if alt == '-': 336 return 'del' 337 if len(ref) == len(alt) == 1: 338 return 'snp' 339 return 'complex' 340 341 @staticmethod 342 def _all_simple_snps(muts: List[Mutation]) -> bool: 343 """Fast guard: True iff every mutation is a single-base SNP (no '-').""" 344 # Tight C-level any/all over a generator — faster than an explicit loop. 345 DASH = '-' 346 return all( 347 len(ref) == 1 and len(alt) == 1 and ref != DASH and alt != DASH 348 for _, ref, alt in muts 349 ) 350 351 def _validate_mutation_batch( 352 self, 353 muts: List[Mutation], 354 *, 355 allow_multiple_insertions: bool = True 356 ) -> bool: 357 """ 358 Ensure no two mutations in batch have overlapping reference spans. 359 360 Args: 361 muts: List of (pos, ref, alt) tuples 362 allow_multiple_insertions: Whether to allow multiple insertions at same position 363 364 Returns: 365 True if valid, False if conflicts found 366 """ 367 # Fast path: when every mutation is a single-base SNP, two of them conflict 368 # iff they share a position. O(N) — no span construction, no sort. 369 if self._all_simple_snps(muts): 370 seen: dict = {} 371 for i, (pos, _, _) in enumerate(muts): 372 if pos in seen: 373 import logging as _logging 374 _logging.getLogger(__name__).warning( 375 "Found conflicting mutations:\n #%d: %s <--> #%d: %s", 376 seen[pos], muts[seen[pos]], i, muts[i], 377 ) 378 return False 379 seen[pos] = i 380 return True 381 382 # General path: sweep-line sort by start; an interval overlaps with an 383 # earlier one iff it overlaps with the max-end-so-far one. 384 spans = [] 385 for i, (pos, ref, alt) in enumerate(muts): 386 is_ins = (ref == '-') 387 if is_ins: 388 start, end = pos, pos 389 else: 390 start, end = pos, pos + len(ref) - 1 391 spans.append((start, end, i, is_ins)) 392 spans.sort(key=lambda s: (s[0], s[1])) 393 394 conflicts: List[Tuple[int, int]] = [] 395 max_end = -1 396 max_end_owner = -1 397 max_end_is_ins = False 398 for sb, eb, ib, is_ins_b in spans: 399 if sb <= max_end: 400 if not (is_ins_b and max_end_is_ins and allow_multiple_insertions): 401 conflicts.append((max_end_owner, ib)) 402 if eb > max_end: 403 max_end = eb 404 max_end_owner = ib 405 max_end_is_ins = is_ins_b 406 407 if conflicts: 408 import logging as _logging 409 lines = ["Found conflicting mutations:"] 410 for ia, ib in conflicts: 411 lines.append(f" #{ia}: {muts[ia]} <--> #{ib}: {muts[ib]}") 412 _logging.getLogger(__name__).warning("\n".join(lines)) 413 return False 414 415 return True 416 417 def apply_mutations( 418 self, 419 mutations: Union[Mutation, List[Mutation]], 420 *, 421 permissive_ref: bool = False 422 ) -> SeqMat: 423 """ 424 Apply SNPs/insertions/deletions to the sequence. 425 426 Mutations are always defined relative to the positive strand. If the sequence 427 is currently on the negative strand, it will be temporarily converted to the 428 positive strand for mutation application, then converted back. 429 430 Args: 431 mutations: Single mutation or list of (pos, ref, alt) tuples 432 permissive_ref: If True, skip reference validation 433 434 Returns: 435 Self (for chaining) 436 """ 437 if isinstance(mutations, tuple): 438 mutations = [mutations] 439 if not self._validate_mutation_batch(mutations): 440 return self 441 442 self._ensure_writable() 443 444 # Track original strand state 445 was_on_negative_strand = self.rev 446 447 # If on negative strand, convert to positive strand for mutation application 448 if was_on_negative_strand: 449 self.reverse_complement() # Convert to positive strand 450 451 # Snapshot per-batch derived state once instead of recomputing per mutation. 452 valid_index = self.seq_array['index'][self.seq_array['valid']] 453 index_min = int(valid_index[0]) if len(valid_index) else 0 454 index_max = int(valid_index[-1]) if len(valid_index) else -1 455 # Coordinates are sorted (ascending or descending). Bound-check covers both forms. 456 if index_min > index_max: 457 index_min, index_max = index_max, index_min 458 459 # Normalize, classify, and bucket. Pure SNPs (len(ref)==len(alt)==1, neither '-') 460 # share one vectorized fast path; everything else falls through per-mutation. 461 snp_batch: List[Tuple[int, str, str]] = [] 462 other: List[Tuple[int, str, str, str]] = [] 463 snp_records: List[Dict[str, Any]] = [] 464 other_records: List[Dict[str, Any]] = [] 465 try: 466 for pos, ref, alt in mutations: 467 while ref != '-' and alt != '-' and ref[0] == alt[0]: 468 pos += 1 469 ref = ref[1:] or '-' 470 alt = alt[1:] or '-' 471 if ref == '-' and alt == '-': 472 continue 473 if pos < index_min or pos > index_max: 474 continue 475 if ref != '-' and alt != '-' and len(ref) == 1 and len(alt) == 1: 476 snp_batch.append((pos, ref, alt)) 477 snp_records.append({'pos': pos, 'ref': ref, 'alt': alt, 'type': 'snp'}) 478 else: 479 typ = self._classify_mutation(ref, alt) 480 other.append((pos, ref, alt, typ)) 481 other_records.append({'pos': pos, 'ref': ref, 'alt': alt, 'type': typ}) 482 483 if snp_batch: 484 applied = self._substitute_batch(snp_batch, permissive_ref) 485 self.mutations.extend(rec for rec, ok in zip(snp_records, applied) if ok) 486 487 for (pos, ref, alt, typ), rec in zip(other, other_records): 488 if typ == 'snp': 489 self._substitute(pos, ref, alt, permissive_ref) 490 elif typ == 'ins': 491 self._insert(pos, alt) 492 elif typ == 'del': 493 self._delete(pos, ref) 494 elif typ == 'complex': 495 self._delete(pos, ref) 496 self._insert(pos, alt) 497 self.mutations.append(rec) 498 finally: 499 if was_on_negative_strand: 500 self.reverse_complement() 501 502 # Full refresh is O(seq length); skip when we only ran SNPs (mutated_positions 503 # was already updated in _substitute_batch and mut_type was set in place). 504 if other: 505 self._refresh_mutation_state() 506 return self 507 508 def _substitute_batch( 509 self, snps: List[Tuple[int, str, str]], permissive: bool 510 ) -> np.ndarray: 511 """Apply many single-base substitutions in one vectorized pass. 512 513 All entries must be (pos, ref, alt) with ``len(ref) == len(alt) == 1`` and 514 neither equal to the ``'-'`` sentinel. Returns a bool array of length 515 ``len(snps)`` indicating which mutations were actually applied (positions 516 that don't exist in the index — e.g. removed by splicing — are skipped). 517 """ 518 m = len(snps) 519 if m == 0: 520 return np.zeros(0, dtype=bool) 521 positions = np.fromiter((s[0] for s in snps), dtype=np.int64, count=m) 522 refs = np.array([s[1].encode() for s in snps], dtype='S1') 523 alts = np.array([s[2].encode() for s in snps], dtype='S1') 524 525 arr_idx = self.seq_array['index'] 526 n = len(arr_idx) 527 applied = np.zeros(m, dtype=bool) 528 if n == 0: 529 return applied 530 531 idxs = np.searchsorted(arr_idx, positions, side='left') 532 bounded = np.clip(idxs, 0, n - 1) 533 present = (idxs < n) & (arr_idx[bounded] == positions) 534 if not present.any(): 535 return applied 536 537 sel_idxs = idxs[present] 538 sel_refs = refs[present] 539 sel_alts = alts[present] 540 541 if not permissive: 542 actual = self.seq_array['ref'][sel_idxs] 543 mismatch = actual != sel_refs 544 if mismatch.any(): 545 bad_pos = int(positions[present][mismatch][0]) 546 raise ValueError( 547 f"Ref mismatch @{bad_pos}. Use apply_mutations(..., permissive_ref=True) to skip reference validation." 548 ) 549 550 self.seq_array['nt'][sel_idxs] = sel_alts 551 self.seq_array['mut_type'][sel_idxs] = b'snp' 552 # Update mutated_positions incrementally — avoids a full O(N) refresh later. 553 self.mutated_positions.update(arr_idx[sel_idxs].tolist()) 554 applied[present] = True 555 return applied 556 557 def _substitute(self, pos: int, ref: str, alt: str, permissive: bool) -> None: 558 """Apply a substitution mutation.""" 559 arr_idx = self.seq_array['index'] 560 i = int(np.searchsorted(arr_idx, pos, side='left')) 561 if i >= len(arr_idx) or arr_idx[i] != pos: 562 return 563 rbytes = np.array(list(ref), dtype='S1') 564 if not permissive and not np.array_equal(self.seq_array["ref"][i : i + len(rbytes)], rbytes): 565 raise ValueError( 566 f"Ref mismatch @{pos}. Use apply_mutations(..., permissive_ref=True) to skip reference validation." 567 ) 568 self.seq_array['nt'][i:i+len(rbytes)] = np.array(list(alt), dtype='S1') 569 self.seq_array['mut_type'][i] = b'snp' 570 571 def _insert(self, pos: int, seq: str) -> None: 572 """Apply an insertion mutation.""" 573 subid = len(self.insertions[pos]) + 1 574 self.insertions[pos].append((subid, seq)) 575 entries = [] 576 for nt in seq: 577 e = np.zeros(1, dtype=self.seq_array.dtype)[0] 578 e['nt'] = nt.encode() 579 e['index'] = pos 580 e['subidx'] = subid 581 e['ref'] = b'-' 582 e['cons'] = 0 583 e['mut_type'] = b'ins' 584 e['valid'] = True 585 entries.append(e) 586 i = np.searchsorted(self.seq_array['index'], pos, 'right') 587 # Explicit dtype for NumPy 2.x: list of np.void must be stacked with same dtype 588 entries_arr = np.array(entries, dtype=self.seq_array.dtype) 589 self.seq_array = np.concatenate([self.seq_array[:i], entries_arr, self.seq_array[i:]]) 590 591 def _delete(self, pos: int, ref: str) -> None: 592 """Apply a deletion mutation.""" 593 for i, base in enumerate(ref): 594 mask = (self.seq_array['index'] == pos + i) & (self.seq_array['subidx'] == 0) 595 self.seq_array['valid'][mask] = False 596 self.seq_array['mut_type'][mask] = b'del' 597 598 @staticmethod 599 def _complement_nt(nt_field: np.ndarray) -> np.ndarray: 600 """Complement nucleotide bytes via single-pass LUT (A<->T, C<->G).""" 601 return _COMPLEMENT_LUT[nt_field.view(np.uint8)].view('S1') 602 603 def _complement_in_place(self) -> None: 604 """Complement ``nt`` (and ``ref``) in place using ``bytes.translate``. 605 606 ~10x faster than numpy fancy indexing for the typical sequence sizes we see. 607 """ 608 nt = self.seq_array['nt'] 609 nt[:] = np.frombuffer(nt.tobytes().translate(_BYTES_COMPLEMENT), dtype='S1') 610 611 def complement(self, copy: bool = False) -> SeqMat: 612 """Complement the sequence (A<->T, C<->G) in place or return a copy.""" 613 target = self.clone() if copy else self 614 target._ensure_writable() 615 target._complement_in_place() 616 return target 617 618 def reverse_complement(self, copy: bool = False) -> SeqMat: 619 """Reverse-complement the sequence in place or return a copy. 620 621 Reverses each column of the structured array (column-by-column is faster 622 than reversing the full 27-byte-per-record array as one slab). 623 """ 624 target = self.clone() if copy else self 625 target._ensure_writable() 626 src = target.seq_array 627 # Complement nt via bytes.translate, then reverse all columns into a fresh array. 628 rc_nt = src['nt'].tobytes().translate(_BYTES_COMPLEMENT)[::-1] 629 new = np.empty_like(src) 630 new['nt'] = np.frombuffer(rc_nt, dtype='S1') 631 for col in src.dtype.names: 632 if col == 'nt': 633 continue 634 new[col] = src[col][::-1] 635 target.seq_array = new 636 target.rev = not target.rev 637 return target 638 639 def remove_regions(self, regions: List[Tuple[int, int]]) -> SeqMat: 640 """ 641 Excise given genomic intervals (inclusive on both ends). 642 643 Args: 644 regions: List of (start, end) tuples to remove 645 646 Returns: 647 New SeqMat with regions removed 648 """ 649 src = self.seq_array 650 indices = src['index'] 651 n = len(indices) 652 remove = np.zeros(n, dtype=bool) 653 654 ascending = n < 2 or indices[0] <= indices[-1] 655 sorted_idx = indices if ascending else indices[::-1] 656 657 for lo, hi in regions: 658 lo, hi = min(lo, hi), max(lo, hi) 659 i = np.searchsorted(sorted_idx, lo) 660 j = np.searchsorted(sorted_idx, hi, side='right') 661 if ascending: 662 remove[i:j] = True 663 else: 664 remove[n - j:n - i] = True 665 666 keep = ~remove 667 kept_n = int(keep.sum()) 668 669 # Build the new SeqMat directly — avoid the COW clone + per-column structured 670 # boolean-mask copy (which materialises 27 bytes per surviving row). Doing 671 # the keep-filter column by column is ~2-3x faster on large arrays. 672 new = copy.copy(self) 673 new.notes = copy.deepcopy(self.notes) 674 new.insertions = copy.deepcopy(self.insertions) 675 new.mutations = list(self.mutations) 676 new.mutated_positions = set(self.mutated_positions) 677 new._cow = False 678 new_array = np.empty(kept_n, dtype=src.dtype) 679 for col in src.dtype.names: 680 new_array[col] = src[col][keep] 681 new.seq_array = new_array 682 return new 683 684 def summary(self) -> str: 685 """Return a summary of the SeqMat object.""" 686 return (f"SeqMat '{self.name}': {len(self)}bp valid, mutations={len(self.mutations)}, " 687 f"inserts at {list(self.insertions.keys())}") 688 689 def to_dict(self) -> Dict[str, Any]: 690 """Convert SeqMat to a dictionary representation.""" 691 return { 692 'name': self.name, 693 'sequence': self.seq, 694 'reference': self.reference_seq, 695 'mutations': self.mutations, 696 'length': len(self), 697 'mutated_positions': list(self.mutated_positions) 698 } 699 700 def to_fasta(self, wrap: int = 80) -> str: 701 """ 702 Export sequence in FASTA format. 703 704 Args: 705 wrap: Line length for sequence wrapping 706 707 Returns: 708 FASTA-formatted string 709 """ 710 header = f">{self.name}" 711 if self.mutations: 712 header += f" mutations={len(self.mutations)}" 713 714 seq = self.seq 715 lines = [header] 716 for i in range(0, len(seq), wrap): 717 lines.append(seq[i:i+wrap]) 718 719 return '\n'.join(lines) 720 721 def reset_mutation_vector(self) -> None: 722 """ 723 Reset mutation tracking (mutated_positions, mut_type) while preserving 724 the current nucleotides (nt) and reference info. 725 This means the current sequence becomes the new baseline. 726 """ 727 self._ensure_writable() 728 # Clear existing mutation annotations 729 self.seq_array['ref'] = self.seq_array['nt'] # new reference = current nt 730 self.seq_array['mut_type'] = b'' # clear mutation type 731 self.insertions.clear() # clear insertion tracking 732 self.mutations.clear() # clear mutation history 733 self.mutated_positions.clear() # clear mutation set 734 735 736 # ---------- Fast I/O ---------- 737 def save_seqmat(self, path: Union[str, Path], *, compressed: bool = False) -> Path: 738 """ 739 Save the entire SeqMat to a single .npz (fast). 740 - The structured seq_array is stored verbatim (zero copy conversion). 741 - Python containers (insertions, mutations, mutated_positions, notes) are pickled. 742 - If predicted_splicing exists, it is saved to a sibling .parquet file. 743 744 Args: 745 path: target file path ('.npz' will be appended if missing) 746 compressed: if True, use np.savez_compressed (slower, smaller) 747 748 Returns: 749 The written .npz Path 750 """ 751 path = Path(path) 752 if path.suffix.lower() != ".npz": 753 path = path.with_suffix(".npz") 754 755 meta = { 756 "name": self.name, 757 "version": self.version, 758 "source": self.source, 759 "rev": self.rev, 760 "has_predicted_splicing": self.predicted_splicing is not None, 761 } 762 763 # For maximum speed, use np.savez (zip, no deflate). Set allow_pickle=True for load. 764 saver = np.savez_compressed if compressed else np.savez 765 766 # Write atomically 767 tmp = path.with_suffix(".npz.tmp") 768 saver( 769 tmp, 770 seq_array=self.seq_array, # structured array 771 meta=np.array(meta, dtype=object), # small dict (pickled) 772 notes=np.array(self.notes, dtype=object), # user metadata (pickled) 773 insertions=np.array(dict(self.insertions), dtype=object),# defaultdict(list) (pickled) 774 mutations=np.array(self.mutations, dtype=object), # list[dict] (pickled) 775 mutated_positions=np.array(list(self.mutated_positions), dtype=np.int64), 776 ) 777 tmp.replace(path) 778 779 # Save DataFrame separately if present (fast columnar) 780 if self.predicted_splicing is not None: 781 ppath = path.with_suffix(".parquet") 782 # Use pandas fast parquet writer if available 783 try: 784 self.predicted_splicing.to_parquet(ppath, index=False) 785 except Exception: 786 # Feather as fallback 787 self.predicted_splicing.reset_index(drop=True).to_feather(ppath.with_suffix(".feather")) 788 789 return path 790 791 @classmethod 792 def read_seqmat(cls, path: Union[str, Path]) -> "SeqMat": 793 """ 794 Load a SeqMat saved by save_seqmat() (fast). 795 Returns: 796 SeqMat instance with all fields restored. 797 """ 798 path = Path(path) 799 if path.suffix.lower() != ".npz": 800 path = path.with_suffix(".npz") 801 802 with np.load(path, allow_pickle=True) as z: 803 seq_array = z["seq_array"] 804 meta = z["meta"].item() if isinstance(z["meta"], np.ndarray) else z["meta"] 805 notes = z["notes"].item() if isinstance(z["notes"], np.ndarray) else z["notes"] 806 807 # Containers 808 ins_obj = z["insertions"].item() if isinstance(z["insertions"], np.ndarray) else z["insertions"] 809 mut_list = z["mutations"].tolist() if hasattr(z["mutations"], "tolist") else z["mutations"] 810 mutated_positions = set(map(int, z["mutated_positions"].tolist())) 811 812 # Build a blank instance and populate directly to avoid recomputing arrays 813 obj = object.__new__(cls) 814 # Metadata 815 obj.name = meta.get("name", "wild_type") 816 obj.version = meta.get("version", "1.0") 817 obj.source = meta.get("source", "Unknown") 818 obj.notes = notes if isinstance(notes, dict) else dict(notes) 819 obj.rev = bool(meta.get("rev", False)) 820 obj.predicted_splicing = None 821 822 # Core storage 823 obj.seq_array = seq_array 824 # Ensure types 825 from collections import defaultdict 826 dl = defaultdict(list) 827 dl.update(ins_obj if isinstance(ins_obj, dict) else dict(ins_obj)) 828 obj.insertions = dl 829 obj.mutations = list(mut_list) if isinstance(mut_list, (list, tuple)) else [] 830 obj.mutated_positions = mutated_positions 831 obj._cow = False 832 833 # Keep slots that are not persisted explicitly in a clean state 834 # (no additional refresh needed; seq_array already contains nt/ref/valid/mut_type) 835 return obj
Sequence matrix with indices, mutations (SNP/ins/del), complement, slicing, remove_regions, from_fasta.
63 def __init__( 64 self, 65 nucleotides: Union[str, np.ndarray] = "", 66 indices: Optional[np.ndarray] = None, 67 conservation: Optional[np.ndarray] = None, 68 reference: Optional[Union[str, np.ndarray]] = None, 69 name: str = 'wild_type', 70 source: Optional[str] = None, 71 version: str = '1.0', 72 notes: Optional[dict] = None, 73 rev: bool = False, 74 seq: Optional[str] = None, 75 ) -> None: 76 if seq is not None and not nucleotides: 77 nucleotides = seq 78 self.name = name 79 self.version = version 80 self.source = source or "Unknown" 81 self.notes = notes or {} 82 self.rev = rev 83 self.predicted_splicing = None 84 self._cow = False 85 self.insertions = defaultdict(list) 86 self.mutations = [] 87 self.mutated_positions = set() 88 89 # Prepare sequence 90 if isinstance(nucleotides, str): 91 nts = np.frombuffer(nucleotides.encode('ascii'), dtype='S1') 92 else: 93 nts = np.asarray(nucleotides, dtype='S1') 94 L = len(nts) 95 96 # Indices default to 1-based 97 if indices is None: 98 indices = np.arange(1, L+1, dtype=np.int64) 99 else: 100 indices = np.asarray(indices, dtype=np.int64) 101 if len(indices) != L: 102 raise ValueError(f"Indices length {len(indices)} != sequence length {L}") 103 104 # Structured dtype 105 dtype = np.dtype([ 106 ('nt', 'S1'), ('index', np.int64), ('subidx', np.int16), 107 ('ref', 'S1'), ('cons', np.float32), ('mut_type', 'S10'), ('valid', bool) 108 ]) 109 arr = np.zeros(L, dtype=dtype) 110 arr['nt'] = nts 111 arr['index'] = indices 112 arr['subidx'] = 0 113 114 # Reference 115 if reference is None: 116 arr['ref'] = nts 117 else: 118 if isinstance(reference, str): 119 ref_arr = np.array(list(reference), dtype='S1') 120 else: 121 ref_arr = np.array(reference, dtype='S1') 122 if len(ref_arr) != L: 123 raise ValueError("Reference length mismatch") 124 arr['ref'] = ref_arr 125 126 # Conservation 127 if conservation is not None: 128 cons = np.asarray(conservation, dtype=np.float32) 129 if len(cons) != L: 130 raise ValueError("Conservation length mismatch") 131 arr['cons'] = cons 132 else: 133 arr['cons'] = 0.0 134 135 arr['mut_type'] = b'' 136 arr['valid'] = arr['nt'] != b'-' 137 self.seq_array = arr
139 @classmethod 140 def from_fasta( 141 cls, 142 genome: str, 143 chrom: str, 144 start: int, 145 end: int, 146 source_fasta: Optional[Path] = None, 147 **kwargs 148 ) -> SeqMat: 149 """ 150 Load a genomic interval from FASTA. 151 152 Args: 153 genome: Genome identifier (e.g. 'hg38') 154 chrom: Chromosome name 155 start: Start position (1-based) 156 end: End position (1-based, inclusive) 157 source_fasta: Path to the FASTA file. If None, uses 'fasta_full_genome' from config. 158 **kwargs: Additional arguments for SeqMat constructor 159 160 Returns: 161 SeqMat object containing the requested sequence 162 """ 163 if source_fasta is None: 164 config = get_organism_config(genome) 165 # Prefer single full-genome FASTA (no per-chromosome split needed) 166 source_fasta = config.get('fasta_full_genome') or config.get('fasta') 167 if source_fasta is None: 168 # Fall back to per-chromosome files for backward compatibility 169 chrom_source = config.get('CHROM_SOURCE') 170 if chrom_source: 171 chrom_path = Path(chrom_source) 172 chrom_file = chrom_path / f"{chrom}.fasta" 173 if chrom_file.exists(): 174 source_fasta = chrom_file 175 if source_fasta is None: 176 raise ValueError( 177 f"No FASTA configured for genome '{genome}'. Set 'fasta_full_genome' or 'fasta' in config, " 178 f"or provide per-chromosome files in CHROM_SOURCE. Run setup_genomics_data() or set_fasta_path()." 179 ) 180 181 seq = _fetch_fasta_region(str(source_fasta), chrom, start, end) 182 indices = np.arange(start, end+1, dtype=np.int64) 183 return cls(nucleotides=seq, indices=indices, name=f"{chrom}:{start}-{end}", source=genome, **kwargs)
Load a genomic interval from FASTA.
Arguments:
- genome: Genome identifier (e.g. 'hg38')
- chrom: Chromosome name
- start: Start position (1-based)
- end: End position (1-based, inclusive)
- source_fasta: Path to the FASTA file. If None, uses 'fasta_full_genome' from config.
- **kwargs: Additional arguments for SeqMat constructor
Returns:
SeqMat object containing the requested sequence
185 @classmethod 186 def from_fasta_file(cls, fasta_path: Union[str, Path], chrom: str, start: int, end: int, **kwargs) -> SeqMat: 187 """Load a genomic interval directly from a FASTA file path""" 188 seq = _fetch_fasta_region(str(fasta_path), chrom, start, end) 189 indices = np.arange(start, end+1, dtype=np.int64) 190 return cls(nucleotides=seq, indices=indices, name=f"{chrom}:{start}-{end}", **kwargs)
Load a genomic interval directly from a FASTA file path
200 @property 201 def seq(self) -> str: 202 """Return the current sequence as a string (only valid bases).""" 203 return self.seq_array['nt'][self.seq_array['valid']].tobytes().decode()
Return the current sequence as a string (only valid bases).
205 @property 206 def reference_seq(self) -> str: 207 """Return the reference sequence as a string (only valid bases).""" 208 return self.seq_array['ref'][self.seq_array['valid']].tobytes().decode()
Return the reference sequence as a string (only valid bases).
210 def alignment(self) -> Tuple[str, str]: 211 """Return gapped reference and mutated sequences aligned to each other. 212 213 Deletions appear as '-' in the mutated string, insertions appear as '-' 214 in the reference string, and SNPs show the differing bases at the same 215 position. 216 217 Returns: 218 (reference, mutated) tuple of equal-length strings. 219 """ 220 arr = self.seq_array 221 ref = arr['ref'].tobytes().decode() 222 mut = np.where(arr['valid'], arr['nt'], b'-').tobytes().decode() 223 return ref, mut
Return gapped reference and mutated sequences aligned to each other.
Deletions appear as '-' in the mutated string, insertions appear as '-' in the reference string, and SNPs show the differing bases at the same position.
Returns:
(reference, mutated) tuple of equal-length strings.
225 @property 226 def index(self) -> np.ndarray: 227 """Return the genomic indices of valid bases.""" 228 return self.seq_array['index'][self.seq_array['valid']]
Return the genomic indices of valid bases.
230 @property 231 def mutation_vector(self) -> np.ndarray: 232 """Return a binary vector indicating mutated positions (1=mutated, 0=not mutated).""" 233 valid_mask = self.seq_array['valid'] 234 indices = self.seq_array['index'][valid_mask] 235 mutated_array = np.array(list(self.mutated_positions), dtype=np.int64) 236 return np.isin(indices, mutated_array).astype(np.int8)
Return a binary vector indicating mutated positions (1=mutated, 0=not mutated).
276 def clone(self, start: Optional[int] = None, end: Optional[int] = None) -> SeqMat: 277 """ 278 Create a copy of this SeqMat, optionally sliced to a specific range. 279 280 Args: 281 start: Start position (genomic coordinate) 282 end: End position (genomic coordinate) 283 284 Returns: 285 A new SeqMat object 286 """ 287 new = copy.copy(self) 288 new.notes = copy.deepcopy(self.notes) 289 new.insertions = copy.deepcopy(self.insertions) 290 new.mutations = copy.deepcopy(self.mutations) 291 new.mutated_positions = set(self.mutated_positions) 292 if start is not None or end is not None: 293 lo = start if start is not None else self.index.min() 294 hi = end if end is not None else self.index.max() 295 lo, hi = min(lo, hi), max(lo, hi) 296 mask = (self.seq_array['index'] >= lo) & (self.seq_array['index'] <= hi) 297 new.seq_array = self.seq_array[mask].copy() 298 new._cow = False 299 else: 300 # COW: share seq_array, copy on first write 301 new.seq_array = self.seq_array 302 new._cow = True 303 self._cow = True 304 return new
Create a copy of this SeqMat, optionally sliced to a specific range.
Arguments:
- start: Start position (genomic coordinate)
- end: End position (genomic coordinate)
Returns:
A new SeqMat object
417 def apply_mutations( 418 self, 419 mutations: Union[Mutation, List[Mutation]], 420 *, 421 permissive_ref: bool = False 422 ) -> SeqMat: 423 """ 424 Apply SNPs/insertions/deletions to the sequence. 425 426 Mutations are always defined relative to the positive strand. If the sequence 427 is currently on the negative strand, it will be temporarily converted to the 428 positive strand for mutation application, then converted back. 429 430 Args: 431 mutations: Single mutation or list of (pos, ref, alt) tuples 432 permissive_ref: If True, skip reference validation 433 434 Returns: 435 Self (for chaining) 436 """ 437 if isinstance(mutations, tuple): 438 mutations = [mutations] 439 if not self._validate_mutation_batch(mutations): 440 return self 441 442 self._ensure_writable() 443 444 # Track original strand state 445 was_on_negative_strand = self.rev 446 447 # If on negative strand, convert to positive strand for mutation application 448 if was_on_negative_strand: 449 self.reverse_complement() # Convert to positive strand 450 451 # Snapshot per-batch derived state once instead of recomputing per mutation. 452 valid_index = self.seq_array['index'][self.seq_array['valid']] 453 index_min = int(valid_index[0]) if len(valid_index) else 0 454 index_max = int(valid_index[-1]) if len(valid_index) else -1 455 # Coordinates are sorted (ascending or descending). Bound-check covers both forms. 456 if index_min > index_max: 457 index_min, index_max = index_max, index_min 458 459 # Normalize, classify, and bucket. Pure SNPs (len(ref)==len(alt)==1, neither '-') 460 # share one vectorized fast path; everything else falls through per-mutation. 461 snp_batch: List[Tuple[int, str, str]] = [] 462 other: List[Tuple[int, str, str, str]] = [] 463 snp_records: List[Dict[str, Any]] = [] 464 other_records: List[Dict[str, Any]] = [] 465 try: 466 for pos, ref, alt in mutations: 467 while ref != '-' and alt != '-' and ref[0] == alt[0]: 468 pos += 1 469 ref = ref[1:] or '-' 470 alt = alt[1:] or '-' 471 if ref == '-' and alt == '-': 472 continue 473 if pos < index_min or pos > index_max: 474 continue 475 if ref != '-' and alt != '-' and len(ref) == 1 and len(alt) == 1: 476 snp_batch.append((pos, ref, alt)) 477 snp_records.append({'pos': pos, 'ref': ref, 'alt': alt, 'type': 'snp'}) 478 else: 479 typ = self._classify_mutation(ref, alt) 480 other.append((pos, ref, alt, typ)) 481 other_records.append({'pos': pos, 'ref': ref, 'alt': alt, 'type': typ}) 482 483 if snp_batch: 484 applied = self._substitute_batch(snp_batch, permissive_ref) 485 self.mutations.extend(rec for rec, ok in zip(snp_records, applied) if ok) 486 487 for (pos, ref, alt, typ), rec in zip(other, other_records): 488 if typ == 'snp': 489 self._substitute(pos, ref, alt, permissive_ref) 490 elif typ == 'ins': 491 self._insert(pos, alt) 492 elif typ == 'del': 493 self._delete(pos, ref) 494 elif typ == 'complex': 495 self._delete(pos, ref) 496 self._insert(pos, alt) 497 self.mutations.append(rec) 498 finally: 499 if was_on_negative_strand: 500 self.reverse_complement() 501 502 # Full refresh is O(seq length); skip when we only ran SNPs (mutated_positions 503 # was already updated in _substitute_batch and mut_type was set in place). 504 if other: 505 self._refresh_mutation_state() 506 return self
Apply SNPs/insertions/deletions to the sequence.
Mutations are always defined relative to the positive strand. If the sequence is currently on the negative strand, it will be temporarily converted to the positive strand for mutation application, then converted back.
Arguments:
- mutations: Single mutation or list of (pos, ref, alt) tuples
- permissive_ref: If True, skip reference validation
Returns:
Self (for chaining)
611 def complement(self, copy: bool = False) -> SeqMat: 612 """Complement the sequence (A<->T, C<->G) in place or return a copy.""" 613 target = self.clone() if copy else self 614 target._ensure_writable() 615 target._complement_in_place() 616 return target
Complement the sequence (A<->T, C<->G) in place or return a copy.
618 def reverse_complement(self, copy: bool = False) -> SeqMat: 619 """Reverse-complement the sequence in place or return a copy. 620 621 Reverses each column of the structured array (column-by-column is faster 622 than reversing the full 27-byte-per-record array as one slab). 623 """ 624 target = self.clone() if copy else self 625 target._ensure_writable() 626 src = target.seq_array 627 # Complement nt via bytes.translate, then reverse all columns into a fresh array. 628 rc_nt = src['nt'].tobytes().translate(_BYTES_COMPLEMENT)[::-1] 629 new = np.empty_like(src) 630 new['nt'] = np.frombuffer(rc_nt, dtype='S1') 631 for col in src.dtype.names: 632 if col == 'nt': 633 continue 634 new[col] = src[col][::-1] 635 target.seq_array = new 636 target.rev = not target.rev 637 return target
Reverse-complement the sequence in place or return a copy.
Reverses each column of the structured array (column-by-column is faster than reversing the full 27-byte-per-record array as one slab).
639 def remove_regions(self, regions: List[Tuple[int, int]]) -> SeqMat: 640 """ 641 Excise given genomic intervals (inclusive on both ends). 642 643 Args: 644 regions: List of (start, end) tuples to remove 645 646 Returns: 647 New SeqMat with regions removed 648 """ 649 src = self.seq_array 650 indices = src['index'] 651 n = len(indices) 652 remove = np.zeros(n, dtype=bool) 653 654 ascending = n < 2 or indices[0] <= indices[-1] 655 sorted_idx = indices if ascending else indices[::-1] 656 657 for lo, hi in regions: 658 lo, hi = min(lo, hi), max(lo, hi) 659 i = np.searchsorted(sorted_idx, lo) 660 j = np.searchsorted(sorted_idx, hi, side='right') 661 if ascending: 662 remove[i:j] = True 663 else: 664 remove[n - j:n - i] = True 665 666 keep = ~remove 667 kept_n = int(keep.sum()) 668 669 # Build the new SeqMat directly — avoid the COW clone + per-column structured 670 # boolean-mask copy (which materialises 27 bytes per surviving row). Doing 671 # the keep-filter column by column is ~2-3x faster on large arrays. 672 new = copy.copy(self) 673 new.notes = copy.deepcopy(self.notes) 674 new.insertions = copy.deepcopy(self.insertions) 675 new.mutations = list(self.mutations) 676 new.mutated_positions = set(self.mutated_positions) 677 new._cow = False 678 new_array = np.empty(kept_n, dtype=src.dtype) 679 for col in src.dtype.names: 680 new_array[col] = src[col][keep] 681 new.seq_array = new_array 682 return new
Excise given genomic intervals (inclusive on both ends).
Arguments:
- regions: List of (start, end) tuples to remove
Returns:
New SeqMat with regions removed
684 def summary(self) -> str: 685 """Return a summary of the SeqMat object.""" 686 return (f"SeqMat '{self.name}': {len(self)}bp valid, mutations={len(self.mutations)}, " 687 f"inserts at {list(self.insertions.keys())}")
Return a summary of the SeqMat object.
689 def to_dict(self) -> Dict[str, Any]: 690 """Convert SeqMat to a dictionary representation.""" 691 return { 692 'name': self.name, 693 'sequence': self.seq, 694 'reference': self.reference_seq, 695 'mutations': self.mutations, 696 'length': len(self), 697 'mutated_positions': list(self.mutated_positions) 698 }
Convert SeqMat to a dictionary representation.
700 def to_fasta(self, wrap: int = 80) -> str: 701 """ 702 Export sequence in FASTA format. 703 704 Args: 705 wrap: Line length for sequence wrapping 706 707 Returns: 708 FASTA-formatted string 709 """ 710 header = f">{self.name}" 711 if self.mutations: 712 header += f" mutations={len(self.mutations)}" 713 714 seq = self.seq 715 lines = [header] 716 for i in range(0, len(seq), wrap): 717 lines.append(seq[i:i+wrap]) 718 719 return '\n'.join(lines)
Export sequence in FASTA format.
Arguments:
- wrap: Line length for sequence wrapping
Returns:
FASTA-formatted string
721 def reset_mutation_vector(self) -> None: 722 """ 723 Reset mutation tracking (mutated_positions, mut_type) while preserving 724 the current nucleotides (nt) and reference info. 725 This means the current sequence becomes the new baseline. 726 """ 727 self._ensure_writable() 728 # Clear existing mutation annotations 729 self.seq_array['ref'] = self.seq_array['nt'] # new reference = current nt 730 self.seq_array['mut_type'] = b'' # clear mutation type 731 self.insertions.clear() # clear insertion tracking 732 self.mutations.clear() # clear mutation history 733 self.mutated_positions.clear() # clear mutation set
Reset mutation tracking (mutated_positions, mut_type) while preserving the current nucleotides (nt) and reference info. This means the current sequence becomes the new baseline.
737 def save_seqmat(self, path: Union[str, Path], *, compressed: bool = False) -> Path: 738 """ 739 Save the entire SeqMat to a single .npz (fast). 740 - The structured seq_array is stored verbatim (zero copy conversion). 741 - Python containers (insertions, mutations, mutated_positions, notes) are pickled. 742 - If predicted_splicing exists, it is saved to a sibling .parquet file. 743 744 Args: 745 path: target file path ('.npz' will be appended if missing) 746 compressed: if True, use np.savez_compressed (slower, smaller) 747 748 Returns: 749 The written .npz Path 750 """ 751 path = Path(path) 752 if path.suffix.lower() != ".npz": 753 path = path.with_suffix(".npz") 754 755 meta = { 756 "name": self.name, 757 "version": self.version, 758 "source": self.source, 759 "rev": self.rev, 760 "has_predicted_splicing": self.predicted_splicing is not None, 761 } 762 763 # For maximum speed, use np.savez (zip, no deflate). Set allow_pickle=True for load. 764 saver = np.savez_compressed if compressed else np.savez 765 766 # Write atomically 767 tmp = path.with_suffix(".npz.tmp") 768 saver( 769 tmp, 770 seq_array=self.seq_array, # structured array 771 meta=np.array(meta, dtype=object), # small dict (pickled) 772 notes=np.array(self.notes, dtype=object), # user metadata (pickled) 773 insertions=np.array(dict(self.insertions), dtype=object),# defaultdict(list) (pickled) 774 mutations=np.array(self.mutations, dtype=object), # list[dict] (pickled) 775 mutated_positions=np.array(list(self.mutated_positions), dtype=np.int64), 776 ) 777 tmp.replace(path) 778 779 # Save DataFrame separately if present (fast columnar) 780 if self.predicted_splicing is not None: 781 ppath = path.with_suffix(".parquet") 782 # Use pandas fast parquet writer if available 783 try: 784 self.predicted_splicing.to_parquet(ppath, index=False) 785 except Exception: 786 # Feather as fallback 787 self.predicted_splicing.reset_index(drop=True).to_feather(ppath.with_suffix(".feather")) 788 789 return path
Save the entire SeqMat to a single .npz (fast).
- The structured seq_array is stored verbatim (zero copy conversion).
- Python containers (insertions, mutations, mutated_positions, notes) are pickled.
- If predicted_splicing exists, it is saved to a sibling .parquet file.
Arguments:
- path: target file path ('.npz' will be appended if missing)
- compressed: if True, use np.savez_compressed (slower, smaller)
Returns:
The written .npz Path
791 @classmethod 792 def read_seqmat(cls, path: Union[str, Path]) -> "SeqMat": 793 """ 794 Load a SeqMat saved by save_seqmat() (fast). 795 Returns: 796 SeqMat instance with all fields restored. 797 """ 798 path = Path(path) 799 if path.suffix.lower() != ".npz": 800 path = path.with_suffix(".npz") 801 802 with np.load(path, allow_pickle=True) as z: 803 seq_array = z["seq_array"] 804 meta = z["meta"].item() if isinstance(z["meta"], np.ndarray) else z["meta"] 805 notes = z["notes"].item() if isinstance(z["notes"], np.ndarray) else z["notes"] 806 807 # Containers 808 ins_obj = z["insertions"].item() if isinstance(z["insertions"], np.ndarray) else z["insertions"] 809 mut_list = z["mutations"].tolist() if hasattr(z["mutations"], "tolist") else z["mutations"] 810 mutated_positions = set(map(int, z["mutated_positions"].tolist())) 811 812 # Build a blank instance and populate directly to avoid recomputing arrays 813 obj = object.__new__(cls) 814 # Metadata 815 obj.name = meta.get("name", "wild_type") 816 obj.version = meta.get("version", "1.0") 817 obj.source = meta.get("source", "Unknown") 818 obj.notes = notes if isinstance(notes, dict) else dict(notes) 819 obj.rev = bool(meta.get("rev", False)) 820 obj.predicted_splicing = None 821 822 # Core storage 823 obj.seq_array = seq_array 824 # Ensure types 825 from collections import defaultdict 826 dl = defaultdict(list) 827 dl.update(ins_obj if isinstance(ins_obj, dict) else dict(ins_obj)) 828 obj.insertions = dl 829 obj.mutations = list(mut_list) if isinstance(mut_list, (list, tuple)) else [] 830 obj.mutated_positions = mutated_positions 831 obj._cow = False 832 833 # Keep slots that are not persisted explicitly in a clean state 834 # (no additional refresh needed; seq_array already contains nt/ref/valid/mut_type) 835 return obj
Load a SeqMat saved by save_seqmat() (fast).
Returns:
SeqMat instance with all fields restored.
17class Gene: 18 """ 19 A class representing a Gene, with associated transcripts and metadata. 20 21 Attributes: 22 organism (str): The organism build (e.g. 'hg38'). 23 transcripts (dict): A dictionary of transcript annotations keyed by transcript ID. 24 gene_name (str): The name of the gene. 25 gene_id (str): The unique identifier for the gene. 26 chrm (str): The chromosome on which the gene resides. 27 rev (bool): Whether the gene is on the reverse strand. 28 """ 29 30 def __init__(self, gene_name: str, gene_id: str, rev: bool, chrm: str, 31 transcripts: Optional[Dict[str, Any]] = None, organism: Optional[str] = None): 32 """ 33 Initialize a Gene instance. 34 35 Args: 36 gene_name: Name of the gene 37 gene_id: Unique identifier for the gene 38 rev: Whether gene is on reverse strand 39 chrm: Chromosome identifier 40 transcripts: Dictionary of transcript annotations 41 organism: Organism reference build (default from config) 42 """ 43 self.gene_name = gene_name 44 self.gene_id = gene_id 45 self.rev = rev 46 self.chrm = chrm 47 self.organism = organism if organism is not None else get_default_organism() 48 self.transcripts = transcripts if transcripts is not None else {} 49 50 def __repr__(self) -> str: 51 """Official string representation of the Gene object.""" 52 return f"Gene({self.gene_name})" 53 54 def __str__(self) -> str: 55 """User-friendly string representation of the Gene object.""" 56 return f"Gene: {self.gene_name}, ID: {self.gene_id}, Chr: {self.chrm}, Transcripts: {len(self.transcripts)}" 57 58 def __len__(self) -> int: 59 """Returns the number of transcripts associated with this gene.""" 60 return len(self.transcripts) 61 62 def __iter__(self) -> Iterator[Transcript]: 63 """Allow iteration over the gene's transcripts, yielding Transcript objects.""" 64 for tid, annotations in self.transcripts.items(): 65 yield Transcript(annotations, organism=self.organism) 66 67 def __getitem__(self, item: str) -> Optional[Transcript]: 68 """Get a transcript by ID. Returns ``None`` if the ID isn't an annotated transcript.""" 69 if item not in self.transcripts: 70 _log.warning("%s is not an annotated transcript of %s", item, self.gene_name) 71 return None 72 return Transcript(self.transcripts[item], organism=self.organism) 73 74 @classmethod 75 def _load_dict(cls, gene_name: str, organism: str) -> Optional[Dict[str, Any]]: 76 """Load the raw gene dict from LMDB → SQLite → per-gene pickle, in that order. 77 78 Returns ``None`` when the gene isn't found in any backend. 79 """ 80 try: 81 from .lmdb_store import load_gene_from_lmdb 82 data = load_gene_from_lmdb(gene_name, organism) 83 if data is not None: 84 return data 85 except ImportError: 86 pass 87 try: 88 from .sqlite_store import load_gene_from_sqlite 89 data = load_gene_from_sqlite(gene_name, organism) 90 if data is not None: 91 return data 92 except ImportError: 93 pass 94 try: 95 config = get_organism_config(organism) 96 except ValueError as exc: 97 raise OrganismNotConfiguredError( 98 f"Organism '{organism}' not configured. Run setup_genomics_data() first." 99 ) from exc 100 mrna_path = Path(config["MRNA_PATH"]) 101 if mrna_path.exists(): 102 for biotype_dir in mrna_path.iterdir(): 103 if biotype_dir.is_dir(): 104 for pkl in biotype_dir.glob(f"*_{gene_name}.pkl"): 105 return unload_pickle(pkl) 106 return None 107 108 @classmethod 109 def get(cls, gene_name: str, organism: Optional[str] = None) -> 'Gene': 110 """Load a gene, raising on failure. 111 112 Raises: 113 OrganismNotConfiguredError: when the organism has no data set up. 114 GeneNotFoundError: when the organism is configured but the gene is missing. 115 """ 116 if organism is None: 117 organism = get_default_organism() 118 data = cls._load_dict(gene_name, organism) 119 if data is None: 120 raise GeneNotFoundError( 121 f"Gene '{gene_name}' not found in organism '{organism}'." 122 ) 123 return cls( 124 gene_name=data.get('gene_name'), 125 gene_id=data.get('gene_id'), 126 rev=data.get('rev'), 127 chrm=data.get('chrm'), 128 transcripts=data.get('transcripts', {}), 129 organism=organism, 130 ) 131 132 @classmethod 133 def from_file(cls, gene_name: str, organism: Optional[str] = None) -> Optional['Gene']: 134 """Load a gene by name, returning ``None`` if not found. 135 136 Load order: LMDB (if installed) → SQLite (``genes.db``) → per-gene pickle files. 137 Prefer :meth:`get` if you'd rather have a typed exception than ``None``. 138 """ 139 if organism is None: 140 organism = get_default_organism() 141 try: 142 return cls.get(gene_name, organism=organism) 143 except OrganismNotConfiguredError as exc: 144 _log.warning("%s", exc) 145 return None 146 except GeneNotFoundError as exc: 147 _log.warning("%s", exc) 148 return None 149 150 @classmethod 151 def from_position( 152 cls, 153 chrm: str, 154 pos: PosArg, 155 organism: Optional[str] = None, 156 ) -> List['Gene']: 157 """Return all genes overlapping a point or range on a chromosome. 158 159 Args: 160 chrm: Chromosome (e.g. "12", "chr12", "X"). Leading 'chr' is stripped. 161 pos: Either an int position or a (start, end) tuple (inclusive). 162 organism: Organism build (uses default if None). 163 164 Returns: 165 List of Gene objects, possibly empty. Returned in ascending start order. 166 """ 167 if organism is None: 168 organism = get_default_organism() 169 names = gene_names_at_position(chrm, pos, organism=organism) 170 genes: List['Gene'] = [] 171 for name in names: 172 g = cls.from_file(name, organism=organism) 173 if g is not None: 174 genes.append(g) 175 return genes 176 177 def splice_sites(self) -> Tuple[Counter, Counter]: 178 """Return (Counter of acceptors, Counter of donors) across all transcripts.""" 179 acceptors: List[Any] = [] 180 donors: List[Any] = [] 181 for transcript in self.transcripts.values(): 182 acceptors.extend(transcript.get('acceptors', [])) 183 donors.extend(transcript.get('donors', [])) 184 185 return Counter(acceptors), Counter(donors) 186 187 def transcript(self, tid: Optional[str] = None) -> Optional[Transcript]: 188 """Return Transcript by ID, or primary transcript if tid is None.""" 189 if tid is None: 190 tid = self.primary_transcript 191 192 if tid is None or tid not in self.transcripts: 193 return None 194 195 return Transcript(self.transcripts[tid], organism=self.organism) 196 197 @property 198 def primary_transcript(self) -> Optional[str]: 199 """Primary transcript ID, or first protein-coding transcript, or None.""" 200 if hasattr(self, "_primary_transcript"): 201 return self._primary_transcript 202 primary = [k for k, v in self.transcripts.items() if v.get("primary_transcript")] 203 if primary: 204 self._primary_transcript = primary[0] 205 return self._primary_transcript 206 protein_coding = [k for k, v in self.transcripts.items() 207 if v.get("transcript_biotype") == "protein_coding"] 208 if protein_coding: 209 self._primary_transcript = protein_coding[0] 210 return self._primary_transcript 211 self._primary_transcript = None 212 return None
A class representing a Gene, with associated transcripts and metadata.
Attributes:
- organism (str): The organism build (e.g. 'hg38').
- transcripts (dict): A dictionary of transcript annotations keyed by transcript ID.
- gene_name (str): The name of the gene.
- gene_id (str): The unique identifier for the gene.
- chrm (str): The chromosome on which the gene resides.
- rev (bool): Whether the gene is on the reverse strand.
30 def __init__(self, gene_name: str, gene_id: str, rev: bool, chrm: str, 31 transcripts: Optional[Dict[str, Any]] = None, organism: Optional[str] = None): 32 """ 33 Initialize a Gene instance. 34 35 Args: 36 gene_name: Name of the gene 37 gene_id: Unique identifier for the gene 38 rev: Whether gene is on reverse strand 39 chrm: Chromosome identifier 40 transcripts: Dictionary of transcript annotations 41 organism: Organism reference build (default from config) 42 """ 43 self.gene_name = gene_name 44 self.gene_id = gene_id 45 self.rev = rev 46 self.chrm = chrm 47 self.organism = organism if organism is not None else get_default_organism() 48 self.transcripts = transcripts if transcripts is not None else {}
Initialize a Gene instance.
Arguments:
- gene_name: Name of the gene
- gene_id: Unique identifier for the gene
- rev: Whether gene is on reverse strand
- chrm: Chromosome identifier
- transcripts: Dictionary of transcript annotations
- organism: Organism reference build (default from config)
108 @classmethod 109 def get(cls, gene_name: str, organism: Optional[str] = None) -> 'Gene': 110 """Load a gene, raising on failure. 111 112 Raises: 113 OrganismNotConfiguredError: when the organism has no data set up. 114 GeneNotFoundError: when the organism is configured but the gene is missing. 115 """ 116 if organism is None: 117 organism = get_default_organism() 118 data = cls._load_dict(gene_name, organism) 119 if data is None: 120 raise GeneNotFoundError( 121 f"Gene '{gene_name}' not found in organism '{organism}'." 122 ) 123 return cls( 124 gene_name=data.get('gene_name'), 125 gene_id=data.get('gene_id'), 126 rev=data.get('rev'), 127 chrm=data.get('chrm'), 128 transcripts=data.get('transcripts', {}), 129 organism=organism, 130 )
Load a gene, raising on failure.
Raises:
- OrganismNotConfiguredError: when the organism has no data set up.
- GeneNotFoundError: when the organism is configured but the gene is missing.
132 @classmethod 133 def from_file(cls, gene_name: str, organism: Optional[str] = None) -> Optional['Gene']: 134 """Load a gene by name, returning ``None`` if not found. 135 136 Load order: LMDB (if installed) → SQLite (``genes.db``) → per-gene pickle files. 137 Prefer :meth:`get` if you'd rather have a typed exception than ``None``. 138 """ 139 if organism is None: 140 organism = get_default_organism() 141 try: 142 return cls.get(gene_name, organism=organism) 143 except OrganismNotConfiguredError as exc: 144 _log.warning("%s", exc) 145 return None 146 except GeneNotFoundError as exc: 147 _log.warning("%s", exc) 148 return None
Load a gene by name, returning None if not found.
Load order: LMDB (if installed) → SQLite (genes.db) → per-gene pickle files.
Prefer get() if you'd rather have a typed exception than None.
150 @classmethod 151 def from_position( 152 cls, 153 chrm: str, 154 pos: PosArg, 155 organism: Optional[str] = None, 156 ) -> List['Gene']: 157 """Return all genes overlapping a point or range on a chromosome. 158 159 Args: 160 chrm: Chromosome (e.g. "12", "chr12", "X"). Leading 'chr' is stripped. 161 pos: Either an int position or a (start, end) tuple (inclusive). 162 organism: Organism build (uses default if None). 163 164 Returns: 165 List of Gene objects, possibly empty. Returned in ascending start order. 166 """ 167 if organism is None: 168 organism = get_default_organism() 169 names = gene_names_at_position(chrm, pos, organism=organism) 170 genes: List['Gene'] = [] 171 for name in names: 172 g = cls.from_file(name, organism=organism) 173 if g is not None: 174 genes.append(g) 175 return genes
Return all genes overlapping a point or range on a chromosome.
Arguments:
- chrm: Chromosome (e.g. "12", "chr12", "X"). Leading 'chr' is stripped.
- pos: Either an int position or a (start, end) tuple (inclusive).
- organism: Organism build (uses default if None).
Returns:
List of Gene objects, possibly empty. Returned in ascending start order.
177 def splice_sites(self) -> Tuple[Counter, Counter]: 178 """Return (Counter of acceptors, Counter of donors) across all transcripts.""" 179 acceptors: List[Any] = [] 180 donors: List[Any] = [] 181 for transcript in self.transcripts.values(): 182 acceptors.extend(transcript.get('acceptors', [])) 183 donors.extend(transcript.get('donors', [])) 184 185 return Counter(acceptors), Counter(donors)
Return (Counter of acceptors, Counter of donors) across all transcripts.
187 def transcript(self, tid: Optional[str] = None) -> Optional[Transcript]: 188 """Return Transcript by ID, or primary transcript if tid is None.""" 189 if tid is None: 190 tid = self.primary_transcript 191 192 if tid is None or tid not in self.transcripts: 193 return None 194 195 return Transcript(self.transcripts[tid], organism=self.organism)
Return Transcript by ID, or primary transcript if tid is None.
197 @property 198 def primary_transcript(self) -> Optional[str]: 199 """Primary transcript ID, or first protein-coding transcript, or None.""" 200 if hasattr(self, "_primary_transcript"): 201 return self._primary_transcript 202 primary = [k for k, v in self.transcripts.items() if v.get("primary_transcript")] 203 if primary: 204 self._primary_transcript = primary[0] 205 return self._primary_transcript 206 protein_coding = [k for k, v in self.transcripts.items() 207 if v.get("transcript_biotype") == "protein_coding"] 208 if protein_coding: 209 self._primary_transcript = protein_coding[0] 210 return self._primary_transcript 211 self._primary_transcript = None 212 return None
Primary transcript ID, or first protein-coding transcript, or None.
42class Transcript: 43 """Transcript with genomic boundaries, donors/acceptors, pre_mrna, mature_mrna, and optional ORF/protein.""" 44 45 def __init__(self, d: Dict[str, Any], organism: Optional[str] = None): 46 """Build Transcript from dict of attributes. Requires transcript_start, transcript_end, rev, chrm.""" 47 self._cons_vector = None 48 array_fields = {"acceptors", "donors", "cons_vector", "rev"} 49 for k, v in d.items(): 50 if k == "cons_vector": 51 self._cons_vector = np.asarray(v, dtype=np.float32) if v is not None else None 52 continue 53 if k in array_fields and v is not None: 54 v = np.array(v) 55 setattr(self, k, v) 56 57 self.organism = organism if organism is not None else get_default_organism() 58 required_attrs = ["transcript_start", "transcript_end", "rev", "chrm"] 59 missing = [attr for attr in required_attrs if not hasattr(self, attr)] 60 if missing: 61 raise AssertionError(f"Transcript is missing required attributes: {missing}") 62 if not hasattr(self, "donors") or self.donors is None: 63 self.donors = np.array([]) 64 if not hasattr(self, "acceptors") or self.acceptors is None: 65 self.acceptors = np.array([]) 66 if not hasattr(self, "cons_available"): 67 self.cons_available = False 68 self.protein_coding = hasattr(self, "TIS") and hasattr(self, "TTS") 69 self.transcript_upper = max(self.transcript_start, self.transcript_end) 70 self.transcript_lower = min(self.transcript_start, self.transcript_end) 71 self._pre_mrna = None 72 if self.cons_available and hasattr(self, "cons_seq") and self._cons_vector is not None: 73 if self.cons_seq.endswith("*") and len(self.cons_seq) == len(self._cons_vector): 74 self._cons_vector = self._cons_vector[:-1] 75 self.cons_seq = self.cons_seq[:-1] 76 77 def __repr__(self) -> str: 78 """Official string representation.""" 79 return f"Transcript({getattr(self, 'transcript_id', 'unknown_id')})" 80 81 def __str__(self) -> str: 82 """User-friendly string representation of the transcript.""" 83 transcript_biotype = getattr(self, 'transcript_biotype', 'unknown').replace('_', ' ').title() 84 primary = getattr(self, 'primary_transcript', False) 85 return f"Transcript {getattr(self, 'transcript_id', 'unknown_id')}, " \ 86 f"Type: {transcript_biotype}, Primary: {primary}" 87 88 def __len__(self) -> int: 89 """Length of the transcript sequence.""" 90 return len(getattr(self, 'transcript_seq', '')) 91 92 def __eq__(self, other: object) -> bool: 93 """Check equality of two transcripts based on their transcript sequences.""" 94 if not isinstance(other, Transcript): 95 return NotImplemented 96 return self.transcript_seq == other.transcript_seq 97 98 def __contains__(self, subvalue: Any) -> bool: 99 """Check if a given subsequence is contained within the pre_mRNA.""" 100 if not hasattr(subvalue, 'seq_array'): 101 return False 102 return np.all(np.isin(subvalue.index, self.pre_mrna.index)) 103 104 def clone(self) -> Transcript: 105 """Returns a deep copy of this Transcript instance.""" 106 return copy.deepcopy(self) 107 108 @property 109 def pre_mrna(self) -> SeqMat: 110 """Pre-mRNA sequence. Lazy-loaded from FASTA on first access.""" 111 if self._pre_mrna is None: 112 self.generate_pre_mrna() 113 return self._pre_mrna 114 115 @pre_mrna.setter 116 def pre_mrna(self, value: Optional[SeqMat]) -> None: 117 self._pre_mrna = value 118 119 @property 120 def cons_vector(self) -> np.ndarray: 121 """Per-residue conservation scores; ones vector when conservation data not loaded.""" 122 if self._cons_vector is not None: 123 return self._cons_vector 124 n = len(getattr(self, "protein", "")) if getattr(self, "protein", "") else len(self.pre_mrna) 125 return np.ones(n, dtype=np.float32) 126 127 @property 128 def exons(self) -> List[Tuple[int, int]]: 129 """Return a list of exon boundary tuples (acceptor, donor).""" 130 exon_starts = np.concatenate(([self.transcript_start], self.acceptors)) 131 exon_ends = np.concatenate((self.donors, [self.transcript_end])) 132 return list(zip(exon_starts, exon_ends)) 133 134 @property 135 def exons_pos(self) -> List[Tuple[int, int]]: 136 """Return exons with positions adjusted for strand orientation.""" 137 exon_positions = self.exons 138 if self.rev: 139 # Reverse order and swap coordinates for reverse strand 140 exon_positions = [(end, start) for start, end in exon_positions[::-1]] 141 return exon_positions 142 143 @property 144 def introns(self) -> List[Tuple[int, int]]: 145 """Return a list of intron boundaries derived from donors and acceptors.""" 146 valid_donors = self.donors[self.donors != self.transcript_end] 147 valid_acceptors = self.acceptors[self.acceptors != self.transcript_start] 148 149 # Adjust intron boundaries to exclude exon splice sites 150 introns = [] 151 for donor, acceptor in zip(valid_donors, valid_acceptors): 152 if self.rev: 153 # For reverse strand: intron from (donor-1) to (acceptor+1) 154 intron_start = donor - 1 155 intron_end = acceptor + 1 156 else: 157 # For forward strand: intron from (donor+1) to (acceptor-1) 158 intron_start = donor + 1 159 intron_end = acceptor - 1 160 introns.append((intron_start, intron_end)) 161 162 return introns 163 164 @property 165 def introns_pos(self) -> List[Tuple[int, int]]: 166 """Return introns with positions adjusted for strand orientation.""" 167 intron_positions = self.introns 168 if self.rev: 169 intron_positions = [(end, start) for start, end in intron_positions[::-1]] 170 return intron_positions 171 172 def _fix_and_check_introns(self) -> 'Transcript': 173 """ 174 Ensure acceptors and donors are sorted and unique, and validate exon/intron structures. 175 176 Raises: 177 ValueError: If there are mismatches or ordering issues in exons/introns 178 179 Returns: 180 The current Transcript object (for chaining) 181 """ 182 # Ensure uniqueness and correct ordering based on strand 183 self.acceptors = np.unique(self.acceptors) 184 self.donors = np.unique(self.donors) 185 186 if self.rev: 187 self.acceptors = np.sort(self.acceptors)[::-1] 188 self.donors = np.sort(self.donors)[::-1] 189 else: 190 self.acceptors = np.sort(self.acceptors) 191 self.donors = np.sort(self.donors) 192 193 # Validation checks 194 if self._exon_intron_matchup_flag(): 195 raise ValueError("Unequal number of acceptors and donors.") 196 197 if self._exon_intron_order_flag(): 198 raise ValueError("Exon/intron order out of position.") 199 200 if self._transcript_boundary_flag(): 201 raise ValueError("Transcript boundaries must straddle acceptors and donors.") 202 203 return self 204 205 def _exon_intron_matchup_flag(self) -> bool: 206 """Check if acceptors and donors count match.""" 207 return len(self.acceptors) != len(self.donors) 208 209 def _exon_intron_order_flag(self) -> bool: 210 """Check for ordering issues in exon boundaries.""" 211 return any(start > end for start, end in self.exons_pos) 212 213 def _transcript_boundary_flag(self) -> bool: 214 """Check if boundaries are within the transcript start/end range.""" 215 if not len(self.acceptors) and not len(self.donors): 216 return False 217 min_boundary = np.min(np.concatenate((self.acceptors, self.donors))) 218 max_boundary = np.max(np.concatenate((self.acceptors, self.donors))) 219 return (self.transcript_lower > min_boundary) or (self.transcript_upper < max_boundary) 220 221 @property 222 def exonic_indices(self) -> np.ndarray: 223 """Return the indices covering exons in the transcript.""" 224 return np.concatenate([np.arange(a, b + 1) for a, b in self.exons_pos]) 225 226 def _resolve_fasta_path(self, fasta_path: Optional[Path] = None) -> Tuple[Path, List[str]]: 227 """Resolve FASTA file path and contig names for this transcript.""" 228 if fasta_path is None: 229 config = get_organism_config(self.organism) 230 fasta_path = config.get("fasta_full_genome") or config.get("fasta") 231 if fasta_path is None: 232 fasta_path = config.get("CHROM_SOURCE") and (Path(config["CHROM_SOURCE"]) / f"chr{self.chrm}.fasta") 233 fasta_path = Path(fasta_path) if fasta_path else None 234 if not fasta_path or not fasta_path.exists(): 235 raise ValueError( 236 "No FASTA path in config (fasta_full_genome or fasta). " 237 "Run setup_genomics_data() to install the full-genome FASTA (e.g. hg38.fa)." 238 ) 239 raw_chr = str(self.chrm).replace("chr", "") or "17" 240 with_chr = f"chr{raw_chr}" 241 contigs = [with_chr, raw_chr] if with_chr != raw_chr else [with_chr] 242 return Path(fasta_path), contigs 243 244 def _assemble_mature_from_fasta(self) -> SeqMat: 245 """Fetch exons directly from FASTA and assemble mature mRNA (skips pre-mRNA).""" 246 fasta_path, contigs = self._resolve_fasta_path() 247 248 # Sort exons by ascending genomic position 249 exon_intervals = sorted((min(a, b), max(a, b)) for a, b in self.exons) 250 251 last_err = None 252 for contig in contigs: 253 try: 254 seqs = [] 255 all_indices = [] 256 for lo, hi in exon_intervals: 257 seq_str = _fetch_fasta_region(str(fasta_path), contig, lo, hi) 258 seqs.append(seq_str) 259 all_indices.append(np.arange(lo, hi + 1, dtype=np.int64)) 260 261 mature = SeqMat(''.join(seqs), indices=np.concatenate(all_indices)) 262 if self.rev: 263 mature.reverse_complement() 264 return mature 265 except Exception as e: 266 last_err = e 267 if last_err is not None: 268 raise last_err 269 raise ValueError(f"Could not fetch exons from FASTA for {self.chrm}") 270 271 def pull_pre_mrna_from_fasta( 272 self, 273 fasta_path: Optional[Path] = None, 274 upstream: int = 0, 275 downstream: int = 0, 276 region_start: Optional[int] = None, 277 region_end: Optional[int] = None, 278 ) -> Dict[str, Any]: 279 """Pre-mRNA sequence from the single full-genome FASTA (e.g. hg38.fa). Optional upstream/downstream.""" 280 fasta_path, contigs_to_try = self._resolve_fasta_path(fasta_path) 281 if region_start is not None or region_end is not None: 282 start = region_start if region_start is not None else max(1, self.transcript_lower - upstream) 283 end = region_end if region_end is not None else (self.transcript_upper + downstream) 284 else: 285 start = max(1, self.transcript_lower - upstream) 286 end = self.transcript_upper + downstream 287 last_err = None 288 for contig in contigs_to_try: 289 try: 290 seq_mat = SeqMat.from_fasta_file(fasta_path, contig, start, end) 291 if len(seq_mat.seq) > 0: 292 return {"seq": seq_mat.seq, "indices": seq_mat.index} 293 except Exception as e: 294 last_err = e 295 if last_err is not None: 296 raise last_err 297 raise ValueError(f"Could not fetch {self.chrm}:{start}-{end} from FASTA (tried contigs {contigs_to_try})") 298 299 def generate_pre_mrna( 300 self, 301 upstream: int = 0, 302 downstream: int = 0, 303 region_start: Optional[int] = None, 304 region_end: Optional[int] = None, 305 ) -> "Transcript": 306 """Load pre-mRNA and set self.pre_mrna. Use upstream/downstream to add flanking (bp). 307 308 Optional region_start/region_end restrict the FASTA read to a genomic 309 sub-interval (e.g. ±7500 bp around a mutation site) instead of fetching 310 the entire transcript, which can be orders of magnitude faster for large 311 genes. 312 """ 313 try: 314 seq_data = self.pull_pre_mrna_from_fasta( 315 upstream=upstream, downstream=downstream, 316 region_start=region_start, region_end=region_end, 317 ) 318 pre_mrna = SeqMat(**seq_data) 319 except Exception: 320 if region_start is not None or region_end is not None: 321 lo = region_start if region_start is not None else max(1, self.transcript_lower - upstream) 322 hi = region_end if region_end is not None else (self.transcript_upper + downstream) 323 else: 324 lo = max(1, self.transcript_lower - upstream) 325 hi = self.transcript_upper + downstream 326 length = hi - lo + 1 327 indices = np.arange(lo, lo + length) 328 pre_mrna = SeqMat("N" * length, indices=indices) 329 if self.rev: 330 pre_mrna.reverse_complement() 331 self.pre_mrna = pre_mrna 332 return self 333 334 def generate_mature_mrna(self, inplace: bool = True) -> Union['Transcript', SeqMat]: 335 """ 336 Generate the mature mRNA by concatenating exon regions from pre_mRNA. 337 338 When pre-mRNA has not been loaded yet, exons are fetched directly from 339 the indexed FASTA — skipping the full-gene read entirely. 340 341 Args: 342 inplace: If True, set self.mature_mrna, else return a new SeqMat 343 344 Returns: 345 The Transcript object (if inplace=True) or a SeqMat (if inplace=False) 346 """ 347 self._fix_and_check_introns() 348 349 if self._pre_mrna is not None: 350 # Pre-mRNA loaded (possibly mutated): splice from it 351 result = self.pre_mrna.remove_regions(self.introns) 352 else: 353 # Fast path: fetch exons directly, skip full-gene read 354 try: 355 result = self._assemble_mature_from_fasta() 356 except Exception: 357 # Fall back to full pre-mRNA load + splice 358 result = self.pre_mrna.remove_regions(self.introns) 359 360 if inplace: 361 self.mature_mrna = result 362 return self 363 return result 364 365 @property 366 def orf(self) -> Union[SeqMat, 'Transcript']: 367 """ 368 Return the ORF (Open Reading Frame) SeqMat object, if TIS and TTS are available. 369 370 Returns: 371 The ORF SeqMat if TIS/TTS are set, else self 372 """ 373 if not self.protein_coding: 374 import logging as _logging 375 _logging.getLogger(__name__).warning( 376 "Cannot create protein without set TIS and TTS values." 377 ) 378 return self 379 380 # Extract ORF region from mature mRNA via clone (no string re-parse) 381 if hasattr(self, 'mature_mrna') and self.mature_mrna is not None: 382 return self.mature_mrna.clone(self.TIS, self.TTS) 383 384 return self 385 386 def generate_protein(self, inplace: bool = True) -> Union['Transcript', str]: 387 """ 388 Translate the ORF into a protein sequence. 389 390 Args: 391 inplace: If True, store protein in self. Otherwise, return it 392 393 Returns: 394 The Transcript object if inplace=True, else the protein sequence 395 """ 396 if not self.protein_coding: 397 return self if inplace else "" 398 399 # Translate the ORF to protein 400 orf_seq = self.orf 401 if isinstance(orf_seq, SeqMat): 402 protein = _translate(orf_seq.seq).strip('*') 403 else: 404 protein = "" 405 406 if inplace: 407 self.protein = protein 408 if self._cons_vector is not None and len(self._cons_vector) != len(protein): 409 self._cons_vector = np.ones(len(protein), dtype=np.float32) 410 return self 411 412 return protein
Transcript with genomic boundaries, donors/acceptors, pre_mrna, mature_mrna, and optional ORF/protein.
45 def __init__(self, d: Dict[str, Any], organism: Optional[str] = None): 46 """Build Transcript from dict of attributes. Requires transcript_start, transcript_end, rev, chrm.""" 47 self._cons_vector = None 48 array_fields = {"acceptors", "donors", "cons_vector", "rev"} 49 for k, v in d.items(): 50 if k == "cons_vector": 51 self._cons_vector = np.asarray(v, dtype=np.float32) if v is not None else None 52 continue 53 if k in array_fields and v is not None: 54 v = np.array(v) 55 setattr(self, k, v) 56 57 self.organism = organism if organism is not None else get_default_organism() 58 required_attrs = ["transcript_start", "transcript_end", "rev", "chrm"] 59 missing = [attr for attr in required_attrs if not hasattr(self, attr)] 60 if missing: 61 raise AssertionError(f"Transcript is missing required attributes: {missing}") 62 if not hasattr(self, "donors") or self.donors is None: 63 self.donors = np.array([]) 64 if not hasattr(self, "acceptors") or self.acceptors is None: 65 self.acceptors = np.array([]) 66 if not hasattr(self, "cons_available"): 67 self.cons_available = False 68 self.protein_coding = hasattr(self, "TIS") and hasattr(self, "TTS") 69 self.transcript_upper = max(self.transcript_start, self.transcript_end) 70 self.transcript_lower = min(self.transcript_start, self.transcript_end) 71 self._pre_mrna = None 72 if self.cons_available and hasattr(self, "cons_seq") and self._cons_vector is not None: 73 if self.cons_seq.endswith("*") and len(self.cons_seq) == len(self._cons_vector): 74 self._cons_vector = self._cons_vector[:-1] 75 self.cons_seq = self.cons_seq[:-1]
Build Transcript from dict of attributes. Requires transcript_start, transcript_end, rev, chrm.
104 def clone(self) -> Transcript: 105 """Returns a deep copy of this Transcript instance.""" 106 return copy.deepcopy(self)
Returns a deep copy of this Transcript instance.
108 @property 109 def pre_mrna(self) -> SeqMat: 110 """Pre-mRNA sequence. Lazy-loaded from FASTA on first access.""" 111 if self._pre_mrna is None: 112 self.generate_pre_mrna() 113 return self._pre_mrna
Pre-mRNA sequence. Lazy-loaded from FASTA on first access.
119 @property 120 def cons_vector(self) -> np.ndarray: 121 """Per-residue conservation scores; ones vector when conservation data not loaded.""" 122 if self._cons_vector is not None: 123 return self._cons_vector 124 n = len(getattr(self, "protein", "")) if getattr(self, "protein", "") else len(self.pre_mrna) 125 return np.ones(n, dtype=np.float32)
Per-residue conservation scores; ones vector when conservation data not loaded.
127 @property 128 def exons(self) -> List[Tuple[int, int]]: 129 """Return a list of exon boundary tuples (acceptor, donor).""" 130 exon_starts = np.concatenate(([self.transcript_start], self.acceptors)) 131 exon_ends = np.concatenate((self.donors, [self.transcript_end])) 132 return list(zip(exon_starts, exon_ends))
Return a list of exon boundary tuples (acceptor, donor).
134 @property 135 def exons_pos(self) -> List[Tuple[int, int]]: 136 """Return exons with positions adjusted for strand orientation.""" 137 exon_positions = self.exons 138 if self.rev: 139 # Reverse order and swap coordinates for reverse strand 140 exon_positions = [(end, start) for start, end in exon_positions[::-1]] 141 return exon_positions
Return exons with positions adjusted for strand orientation.
143 @property 144 def introns(self) -> List[Tuple[int, int]]: 145 """Return a list of intron boundaries derived from donors and acceptors.""" 146 valid_donors = self.donors[self.donors != self.transcript_end] 147 valid_acceptors = self.acceptors[self.acceptors != self.transcript_start] 148 149 # Adjust intron boundaries to exclude exon splice sites 150 introns = [] 151 for donor, acceptor in zip(valid_donors, valid_acceptors): 152 if self.rev: 153 # For reverse strand: intron from (donor-1) to (acceptor+1) 154 intron_start = donor - 1 155 intron_end = acceptor + 1 156 else: 157 # For forward strand: intron from (donor+1) to (acceptor-1) 158 intron_start = donor + 1 159 intron_end = acceptor - 1 160 introns.append((intron_start, intron_end)) 161 162 return introns
Return a list of intron boundaries derived from donors and acceptors.
164 @property 165 def introns_pos(self) -> List[Tuple[int, int]]: 166 """Return introns with positions adjusted for strand orientation.""" 167 intron_positions = self.introns 168 if self.rev: 169 intron_positions = [(end, start) for start, end in intron_positions[::-1]] 170 return intron_positions
Return introns with positions adjusted for strand orientation.
221 @property 222 def exonic_indices(self) -> np.ndarray: 223 """Return the indices covering exons in the transcript.""" 224 return np.concatenate([np.arange(a, b + 1) for a, b in self.exons_pos])
Return the indices covering exons in the transcript.
271 def pull_pre_mrna_from_fasta( 272 self, 273 fasta_path: Optional[Path] = None, 274 upstream: int = 0, 275 downstream: int = 0, 276 region_start: Optional[int] = None, 277 region_end: Optional[int] = None, 278 ) -> Dict[str, Any]: 279 """Pre-mRNA sequence from the single full-genome FASTA (e.g. hg38.fa). Optional upstream/downstream.""" 280 fasta_path, contigs_to_try = self._resolve_fasta_path(fasta_path) 281 if region_start is not None or region_end is not None: 282 start = region_start if region_start is not None else max(1, self.transcript_lower - upstream) 283 end = region_end if region_end is not None else (self.transcript_upper + downstream) 284 else: 285 start = max(1, self.transcript_lower - upstream) 286 end = self.transcript_upper + downstream 287 last_err = None 288 for contig in contigs_to_try: 289 try: 290 seq_mat = SeqMat.from_fasta_file(fasta_path, contig, start, end) 291 if len(seq_mat.seq) > 0: 292 return {"seq": seq_mat.seq, "indices": seq_mat.index} 293 except Exception as e: 294 last_err = e 295 if last_err is not None: 296 raise last_err 297 raise ValueError(f"Could not fetch {self.chrm}:{start}-{end} from FASTA (tried contigs {contigs_to_try})")
Pre-mRNA sequence from the single full-genome FASTA (e.g. hg38.fa). Optional upstream/downstream.
299 def generate_pre_mrna( 300 self, 301 upstream: int = 0, 302 downstream: int = 0, 303 region_start: Optional[int] = None, 304 region_end: Optional[int] = None, 305 ) -> "Transcript": 306 """Load pre-mRNA and set self.pre_mrna. Use upstream/downstream to add flanking (bp). 307 308 Optional region_start/region_end restrict the FASTA read to a genomic 309 sub-interval (e.g. ±7500 bp around a mutation site) instead of fetching 310 the entire transcript, which can be orders of magnitude faster for large 311 genes. 312 """ 313 try: 314 seq_data = self.pull_pre_mrna_from_fasta( 315 upstream=upstream, downstream=downstream, 316 region_start=region_start, region_end=region_end, 317 ) 318 pre_mrna = SeqMat(**seq_data) 319 except Exception: 320 if region_start is not None or region_end is not None: 321 lo = region_start if region_start is not None else max(1, self.transcript_lower - upstream) 322 hi = region_end if region_end is not None else (self.transcript_upper + downstream) 323 else: 324 lo = max(1, self.transcript_lower - upstream) 325 hi = self.transcript_upper + downstream 326 length = hi - lo + 1 327 indices = np.arange(lo, lo + length) 328 pre_mrna = SeqMat("N" * length, indices=indices) 329 if self.rev: 330 pre_mrna.reverse_complement() 331 self.pre_mrna = pre_mrna 332 return self
Load pre-mRNA and set self.pre_mrna. Use upstream/downstream to add flanking (bp).
Optional region_start/region_end restrict the FASTA read to a genomic sub-interval (e.g. ±7500 bp around a mutation site) instead of fetching the entire transcript, which can be orders of magnitude faster for large genes.
334 def generate_mature_mrna(self, inplace: bool = True) -> Union['Transcript', SeqMat]: 335 """ 336 Generate the mature mRNA by concatenating exon regions from pre_mRNA. 337 338 When pre-mRNA has not been loaded yet, exons are fetched directly from 339 the indexed FASTA — skipping the full-gene read entirely. 340 341 Args: 342 inplace: If True, set self.mature_mrna, else return a new SeqMat 343 344 Returns: 345 The Transcript object (if inplace=True) or a SeqMat (if inplace=False) 346 """ 347 self._fix_and_check_introns() 348 349 if self._pre_mrna is not None: 350 # Pre-mRNA loaded (possibly mutated): splice from it 351 result = self.pre_mrna.remove_regions(self.introns) 352 else: 353 # Fast path: fetch exons directly, skip full-gene read 354 try: 355 result = self._assemble_mature_from_fasta() 356 except Exception: 357 # Fall back to full pre-mRNA load + splice 358 result = self.pre_mrna.remove_regions(self.introns) 359 360 if inplace: 361 self.mature_mrna = result 362 return self 363 return result
Generate the mature mRNA by concatenating exon regions from pre_mRNA.
When pre-mRNA has not been loaded yet, exons are fetched directly from the indexed FASTA — skipping the full-gene read entirely.
Arguments:
- inplace: If True, set self.mature_mrna, else return a new SeqMat
Returns:
The Transcript object (if inplace=True) or a SeqMat (if inplace=False)
365 @property 366 def orf(self) -> Union[SeqMat, 'Transcript']: 367 """ 368 Return the ORF (Open Reading Frame) SeqMat object, if TIS and TTS are available. 369 370 Returns: 371 The ORF SeqMat if TIS/TTS are set, else self 372 """ 373 if not self.protein_coding: 374 import logging as _logging 375 _logging.getLogger(__name__).warning( 376 "Cannot create protein without set TIS and TTS values." 377 ) 378 return self 379 380 # Extract ORF region from mature mRNA via clone (no string re-parse) 381 if hasattr(self, 'mature_mrna') and self.mature_mrna is not None: 382 return self.mature_mrna.clone(self.TIS, self.TTS) 383 384 return self
Return the ORF (Open Reading Frame) SeqMat object, if TIS and TTS are available.
Returns:
The ORF SeqMat if TIS/TTS are set, else self
386 def generate_protein(self, inplace: bool = True) -> Union['Transcript', str]: 387 """ 388 Translate the ORF into a protein sequence. 389 390 Args: 391 inplace: If True, store protein in self. Otherwise, return it 392 393 Returns: 394 The Transcript object if inplace=True, else the protein sequence 395 """ 396 if not self.protein_coding: 397 return self if inplace else "" 398 399 # Translate the ORF to protein 400 orf_seq = self.orf 401 if isinstance(orf_seq, SeqMat): 402 protein = _translate(orf_seq.seq).strip('*') 403 else: 404 protein = "" 405 406 if inplace: 407 self.protein = protein 408 if self._cons_vector is not None and len(self._cons_vector) != len(protein): 409 self._cons_vector = np.ones(len(protein), dtype=np.float32) 410 return self 411 412 return protein
Translate the ORF into a protein sequence.
Arguments:
- inplace: If True, store protein in self. Otherwise, return it
Returns:
The Transcript object if inplace=True, else the protein sequence
115def get_default_organism() -> str: 116 """Default organism: from SEQMAT_DEFAULT_ORGANISM in config-less mode, else from config file.""" 117 if get_data_base() is not None: 118 return os.environ.get("SEQMAT_DEFAULT_ORGANISM", "").strip() or "hg38" 119 config = load_config() 120 return config.get('default_organism', DEFAULT_SETTINGS['default_organism'])
Default organism: from SEQMAT_DEFAULT_ORGANISM in config-less mode, else from config file.
206def get_data_dir() -> Path: 207 """Return user data directory for seqmat (platformdirs).""" 208 return DEFAULT_DATA_DIR
Return user data directory for seqmat (platformdirs).
211def get_config_dir() -> Path: 212 """Return the directory containing the active config file.""" 213 return DEFAULT_CONFIG_DIR
Return the directory containing the active config file.
216def get_config_file() -> Path: 217 """Return the active config file path (for display or override).""" 218 return CONFIG_FILE
Return the active config file path (for display or override).
106def get_data_base() -> Optional[Path]: 107 """If SEQMAT_DATA_DIR is set and exists, return that path (config-less mode). Else None.""" 108 env = os.environ.get("SEQMAT_DATA_DIR", "").strip() 109 if not env: 110 return None 111 p = Path(env).expanduser().resolve() 112 return p if p.exists() else None
If SEQMAT_DATA_DIR is set and exists, return that path (config-less mode). Else None.
90def load_config() -> Dict[str, Any]: 91 """Load config from CONFIG_FILE, merged with DEFAULT_SETTINGS.""" 92 if CONFIG_FILE.exists(): 93 with open(CONFIG_FILE, "r") as f: 94 config = json.load(f) 95 merged_config = DEFAULT_SETTINGS.copy() 96 merged_config.update(config) 97 return merged_config 98 return DEFAULT_SETTINGS.copy()
Load config from CONFIG_FILE, merged with DEFAULT_SETTINGS.
100def save_config(config: Dict[str, Any]) -> None: 101 """Save config to CONFIG_FILE (creates parent dir if needed).""" 102 CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True) 103 with open(CONFIG_FILE, "w") as f: 104 json.dump(config, f, indent=2)
Save config to CONFIG_FILE (creates parent dir if needed).
72def setup_genomics_data( 73 basepath: str, 74 organism: Optional[str] = None, 75 force: bool = False, 76 pickup: bool = False, 77 n_jobs: Optional[int] = None, 78 from_prebuilt: bool = True, 79) -> None: 80 """Set up genomics data for a specific organism. 81 82 Args: 83 basepath: Base directory for storing genomic data. 84 organism: Organism identifier (e.g. ``"hg38"`` or ``"mm39"``). Uses the configured 85 default if omitted. 86 force: Overwrite existing data. 87 pickup: Resume an interrupted setup, reusing any already-downloaded files. 88 n_jobs: Workers for GTF parsing (default: CPU count - 1). ``1`` is serial. 89 from_prebuilt: When ``True`` (default), download prebuilt ``genes.db`` + FASTA from S3. 90 When ``False``, download GTF/FASTA from Ensembl/UCSC and build ``genes.db`` locally. 91 """ 92 if organism is None: 93 organism = get_default_organism() 94 base_path = Path(basepath) / organism 95 96 config_paths = { 97 "CHROM_SOURCE": str(base_path), 98 "MRNA_PATH": str(base_path), 99 "MISSPLICING_PATH": str(base_path / "missplicing"), 100 "ONCOSPLICE_PATH": str(base_path / "oncosplice"), 101 "BASE": str(base_path), 102 "TEMP": str(base_path / "temp"), 103 } 104 105 config = load_config() 106 if organism in config and not force and not pickup: 107 _log.warning( 108 "Organism %s already configured. Use force=True to overwrite or pickup=True to resume.", 109 organism, 110 ) 111 return 112 113 if base_path.exists() and any(base_path.iterdir()) and not force and not pickup: 114 _log.warning( 115 "Directory %s not empty. Use force=True to overwrite or pickup=True to resume.", 116 base_path, 117 ) 118 return 119 120 _log.info("Setting up genomics data in %s", base_path) 121 base_path.mkdir(parents=True, exist_ok=True) 122 123 if from_prebuilt: 124 _log.info("Downloading prebuilt data for %s from S3...", organism) 125 try: 126 files = download_prebuilt_data(organism, base_path, skip_existing=pickup) 127 except PrebuiltDataUnavailableError as e: 128 _log.error("%s", e) 129 raise 130 config_paths["fasta_full_genome"] = str(files["fasta_file"]) 131 config_paths["genes_db"] = str(files["genes_db"]) 132 else: 133 _log.info("Downloading source data and building genes.db for %s...", organism) 134 files = download_genome_data(organism, base_path, skip_existing=pickup) 135 config_paths["fasta_full_genome"] = str(files["fasta_file"]) 136 cons_data = None 137 if files["cons_file"] is not None: 138 try: 139 cons_data = load_conservation(files["cons_file"]) 140 except Exception: 141 cons_data = None 142 annotation_jobs = n_jobs if n_jobs is not None else max(1, cpu_count() - 1) 143 retrieve_and_parse_ensembl_annotations( 144 base_path, 145 files["ensembl_file"], 146 cons_data, 147 gtex_file=files["gtex_file"], 148 n_jobs=annotation_jobs, 149 ) 150 config_paths["genes_db"] = str(base_path / "genes.db") 151 if not pickup: 152 for key, file_path in files.items(): 153 if key == "fasta_file": 154 continue 155 if file_path and file_path.exists(): 156 file_path.unlink() 157 158 for path_key in ("MISSPLICING_PATH", "ONCOSPLICE_PATH", "TEMP"): 159 Path(config_paths[path_key]).mkdir(parents=True, exist_ok=True) 160 161 if get_data_base() is None: 162 config[organism] = config_paths 163 save_config(config) 164 _log.info("Configuration saved to: %s", CONFIG_FILE) 165 else: 166 _log.info("SEQMAT_DATA_DIR is set; no config file written.") 167 _append_seqmat_env_to_shell(Path(basepath).resolve(), organism) 168 _log.info("Successfully set up genomics data for %s in %s", organism, basepath)
Set up genomics data for a specific organism.
Arguments:
- basepath: Base directory for storing genomic data.
- organism: Organism identifier (e.g.
"hg38"or"mm39"). Uses the configured default if omitted. - force: Overwrite existing data.
- pickup: Resume an interrupted setup, reusing any already-downloaded files.
- n_jobs: Workers for GTF parsing (default: CPU count - 1).
1is serial. - from_prebuilt: When
True(default), download prebuiltgenes.db+ FASTA from S3. WhenFalse, download GTF/FASTA from Ensembl/UCSC and buildgenes.dblocally.
171def set_fasta_path(fasta_path: str, organism: Optional[str] = None) -> None: 172 """Set the full genome FASTA path for an organism in the saved config. 173 174 Useful when you have a FASTA but haven't run the full setup, or need to point 175 SeqMat at a different reference build. 176 """ 177 if organism is None: 178 organism = get_default_organism() 179 fa_path = Path(fasta_path) 180 if not fa_path.exists(): 181 raise ValueError(f"FASTA file not found: {fa_path}") 182 183 config = load_config() 184 config.setdefault(organism, {}) 185 config[organism]["fasta_full_genome"] = str(fa_path) 186 save_config(config) 187 _log.info("Set fasta_full_genome for %s to: %s (config: %s)", organism, fa_path, CONFIG_FILE)
Set the full genome FASTA path for an organism in the saved config.
Useful when you have a FASTA but haven't run the full setup, or need to point SeqMat at a different reference build.
26def list_available_organisms() -> List[str]: 27 """Organism IDs that are currently configured/installed.""" 28 return get_available_organisms()
Organism IDs that are currently configured/installed.
31def list_supported_organisms() -> List[str]: 32 """Alias of :func:`list_available_organisms` (kept for back-compat).""" 33 return get_available_organisms()
Alias of list_available_organisms() (kept for back-compat).
197def get_organism_info(organism: str) -> Dict[str, Any]: 198 """Detailed information about an organism: paths + biotype-keyed gene counts.""" 199 try: 200 config = get_organism_config(organism) 201 except ValueError: 202 return {"error": f"Organism '{organism}' not configured"} 203 204 info: Dict[str, Any] = { 205 "organism": organism, 206 "configured": True, 207 "paths": {k: str(v) for k, v in config.items()}, 208 "data_available": {}, 209 } 210 211 counts = count_genes(organism) 212 if counts: 213 info["data_available"]["biotypes"] = sorted(counts.keys()) 214 info["data_available"]["gene_counts"] = counts 215 216 chrom_path = config.get("CHROM_SOURCE") 217 if chrom_path and Path(chrom_path).exists(): 218 chrom_files = list(Path(chrom_path).glob("*.fasta")) 219 if chrom_files: 220 info["data_available"]["chromosomes"] = [f.stem for f in chrom_files] 221 222 return info
Detailed information about an organism: paths + biotype-keyed gene counts.
52def list_gene_biotypes(organism: Optional[str] = None) -> List[str]: 53 """List distinct ``gene_biotype`` values present in ``genes.db``.""" 54 conn = _connect(organism) 55 if conn is None: 56 return [] 57 try: 58 rows = conn.execute("SELECT DISTINCT biotype FROM genes ORDER BY biotype").fetchall() 59 finally: 60 conn.close() 61 return [r[0] for r in rows if r[0]]
List distinct gene_biotype values present in genes.db.
64def count_genes(organism: Optional[str] = None, biotype: Optional[str] = None) -> Dict[str, int]: 65 """Count genes per biotype. 66 67 If ``biotype`` is given, returns ``{biotype: count}`` for that one biotype. 68 Otherwise returns a dict of all biotypes -> counts. 69 """ 70 conn = _connect(organism) 71 if conn is None: 72 return {} 73 try: 74 if biotype: 75 row = conn.execute( 76 "SELECT COUNT(*) FROM genes WHERE biotype = ?", (biotype,) 77 ).fetchone() 78 return {biotype: int(row[0])} if row else {biotype: 0} 79 rows = conn.execute( 80 "SELECT biotype, COUNT(*) FROM genes GROUP BY biotype ORDER BY biotype" 81 ).fetchall() 82 finally: 83 conn.close() 84 return {bt: int(n) for bt, n in rows if bt}
Count genes per biotype.
If biotype is given, returns {biotype: count} for that one biotype.
Otherwise returns a dict of all biotypes -> counts.
87def get_gene_list( 88 organism: Optional[str] = None, 89 biotype: Optional[str] = None, 90 limit: Optional[int] = None, 91) -> List[str]: 92 """List gene names (skipping empty Ensembl symbols), optionally filtered by biotype.""" 93 conn = _connect(organism) 94 if conn is None: 95 return [] 96 sql = "SELECT gene_name FROM genes WHERE gene_name != ''" 97 params: tuple = () 98 if biotype: 99 sql += " AND biotype = ?" 100 params = (biotype,) 101 sql += " ORDER BY gene_name" 102 if limit: 103 sql += " LIMIT ?" 104 params = (*params, int(limit)) 105 try: 106 rows = conn.execute(sql, params).fetchall() 107 finally: 108 conn.close() 109 return [r[0] for r in rows]
List gene names (skipping empty Ensembl symbols), optionally filtered by biotype.
112def get_all_genes( 113 organism: Optional[str] = None, 114 biotype: Optional[str] = None, 115) -> List[Dict[str, str]]: 116 """Return all genes for an organism as ``[{organism, biotype, gene_name, gene_id}, ...]``.""" 117 organism = organism or "<default>" 118 conn = _connect(organism if organism != "<default>" else None) 119 if conn is None: 120 return [] 121 sql = "SELECT gene_name, gene_id, biotype FROM genes" 122 params: tuple = () 123 if biotype: 124 sql += " WHERE biotype = ?" 125 params = (biotype,) 126 sql += " ORDER BY gene_name" 127 try: 128 rows = conn.execute(sql, params).fetchall() 129 finally: 130 conn.close() 131 return [ 132 {"organism": organism, "biotype": bt, "gene_name": gn, "gene_id": gid} 133 for gn, gid, bt in rows 134 ]
Return all genes for an organism as [{organism, biotype, gene_name, gene_id}, ...].
225def data_summary() -> Dict[str, Any]: 226 """A complete data overview across every configured organism.""" 227 configured = list_available_organisms() 228 summary: Dict[str, Any] = { 229 "supported_organisms": configured, 230 "configured_organisms": configured, 231 "organisms": {}, 232 } 233 for organism in configured: 234 try: 235 summary["organisms"][organism] = get_organism_info(organism) 236 except Exception as e: # pragma: no cover - defensive 237 summary["organisms"][organism] = {"error": f"Configuration error: {e}"} 238 239 total_genes = 0 240 total_biotypes: set = set() 241 for org_info in summary["organisms"].values(): 242 gc = org_info.get("data_available", {}).get("gene_counts", {}) 243 for biotype, count in gc.items(): 244 total_genes += count 245 total_biotypes.add(biotype) 246 summary["totals"] = { 247 "organisms": len(configured), 248 "biotypes": len(total_biotypes), 249 "genes": total_genes, 250 } 251 return summary
A complete data overview across every configured organism.
254def print_data_summary() -> None: 255 """Print a formatted summary of all installed genomics data.""" 256 from .config import DEFAULT_ORGANISM_DATA 257 258 summary = data_summary() 259 print("SeqMat Genomics Data Summary") 260 print("=" * 40) 261 totals = summary["totals"] 262 print(f"Total: {totals['organisms']} organism(s), {totals['biotypes']} biotype(s), {totals['genes']:,} gene(s)\n") 263 264 print("Configured organisms:") 265 configured = set(summary["configured_organisms"]) 266 for org in sorted(DEFAULT_ORGANISM_DATA.keys() | configured): 267 status = "configured" if org in configured else "not configured" 268 name = DEFAULT_ORGANISM_DATA.get(org, {}).get("name", org) 269 print(f" - {org}: {name} [{status}]") 270 print() 271 272 for organism, info in summary["organisms"].items(): 273 print(f"{organism.upper()} data:") 274 if "error" in info: 275 print(f" error: {info['error']}") 276 continue 277 data_avail = info.get("data_available", {}) 278 if "gene_counts" in data_avail: 279 print(" Gene types:") 280 for biotype, count in sorted(data_avail["gene_counts"].items()): 281 print(f" {biotype}: {count:,}") 282 if "chromosomes" in data_avail: 283 chroms = data_avail["chromosomes"] 284 preview = ", ".join(sorted(chroms)[:5]) + ("..." if len(chroms) > 5 else "") 285 print(f" Chromosomes ({len(chroms)}): {preview}") 286 print(" Paths:") 287 for path_name, path_value in info["paths"].items(): 288 ok = "✓" if Path(path_value).exists() else "✗" 289 print(f" {ok} {path_name}: {path_value}") 290 print()
Print a formatted summary of all installed genomics data.
137def search_genes( 138 organism: Optional[str] = None, 139 query: str = "", 140 biotype: Optional[str] = None, 141 limit: int = 10, 142) -> List[Dict[str, str]]: 143 """Search genes by ``gene_name`` or ``gene_id`` substring (case-insensitive).""" 144 if not query: 145 return [] 146 conn = _connect(organism) 147 if conn is None: 148 return [] 149 pattern = f"%{query}%" 150 sql = ( 151 "SELECT gene_name, gene_id, biotype FROM genes " 152 "WHERE (UPPER(gene_name) LIKE UPPER(?) OR UPPER(gene_id) LIKE UPPER(?))" 153 ) 154 params: tuple = (pattern, pattern) 155 if biotype: 156 sql += " AND biotype = ?" 157 params = (*params, biotype) 158 sql += " ORDER BY gene_name LIMIT ?" 159 params = (*params, int(limit)) 160 try: 161 rows = conn.execute(sql, params).fetchall() 162 finally: 163 conn.close() 164 org_label = organism or "<default>" 165 return [ 166 {"organism": org_label, "biotype": bt, "gene_name": gn, "gene_id": gid} 167 for gn, gid, bt in rows 168 ]
Search genes by gene_name or gene_id substring (case-insensitive).
171def available_genes(organism: str = "hg38", biotype: Optional[str] = "protein_coding") -> Iterator[str]: 172 """Yield distinct gene symbols installed for an organism. 173 174 Defaults to protein-coding genes (matches the legacy filesystem-backed behavior). 175 Pass ``biotype=None`` to stream every biotype. Empty Ensembl symbols are skipped. 176 """ 177 conn = _connect(organism) 178 if conn is None: 179 return 180 sql = "SELECT DISTINCT gene_name FROM genes WHERE gene_name != ''" 181 params: tuple = () 182 if biotype is not None: 183 sql += " AND biotype = ?" 184 params = (biotype,) 185 sql += " ORDER BY gene_name" 186 try: 187 for (name,) in conn.execute(sql, params): 188 yield name 189 finally: 190 conn.close()
Yield distinct gene symbols installed for an organism.
Defaults to protein-coding genes (matches the legacy filesystem-backed behavior).
Pass biotype=None to stream every biotype. Empty Ensembl symbols are skipped.
201def gene_names_at_position( 202 chrm: str, pos: PosArg, organism: Optional[str] = None 203) -> List[str]: 204 """Fast name-only lookup: gene names overlapping a point or range. No BLOB load.""" 205 return _get_index(organism).query_names(chrm, pos)
Fast name-only lookup: gene names overlapping a point or range. No BLOB load.
184def build_location_index(organism: Optional[str] = None, force: bool = False) -> Path: 185 """Build (or rebuild) the on-disk location index for an organism. 186 187 Returns the sidecar path. Use ``force=True`` to overwrite an existing index. 188 """ 189 if organism is None: 190 organism = get_default_organism() 191 npz_path = _locations_path(organism) 192 if npz_path.exists() and not force: 193 _INDEX_CACHE.pop(organism, None) 194 _get_index(organism) 195 return npz_path 196 _INDEX_CACHE.pop(organism, None) 197 _get_index(organism, rebuild=True) 198 return npz_path
Build (or rebuild) the on-disk location index for an organism.
Returns the sidecar path. Use force=True to overwrite an existing index.
105def build_lmdb( 106 annotations_dir: Optional[str] = None, 107 output_path: Optional[str] = None, 108 organism: Optional[str] = None, 109) -> str: 110 """Build LMDB from per-gene pickle files. Requires pip install seqmat[lmdb].""" 111 _require_lmdb() 112 if organism is None: 113 organism = get_default_organism() 114 if annotations_dir is None: 115 config = get_organism_config(organism) 116 annotations_dir = str(config["MRNA_PATH"]) 117 ann_path = Path(annotations_dir) 118 if not ann_path.exists(): 119 raise FileNotFoundError(f"Annotations directory not found: {ann_path}") 120 if output_path is None: 121 output_path = str(ann_path / "genes.lmdb") 122 out = Path(output_path) 123 if out.exists(): 124 shutil.rmtree(str(out)) 125 pkl_files = sorted(ann_path.glob("**/*.pkl")) 126 if not pkl_files: 127 raise FileNotFoundError(f"No .pkl files found under {ann_path}") 128 total_bytes = sum(f.stat().st_size for f in pkl_files) 129 map_size = int(total_bytes * 1.5) + 10 * 1024 * 1024 130 env = _lmdb.open(str(out), map_size=map_size, max_dbs=0) 131 genes_written = 0 132 skipped = 0 133 total_size = 0 134 with env.begin(write=True) as txn: 135 for pkl_file in pkl_files: 136 try: 137 raw_bytes = pkl_file.read_bytes() 138 gene_name = _gene_name_from_pkl_stem(pkl_file.stem) 139 txn.put(gene_name.encode("utf-8"), raw_bytes) 140 genes_written += 1 141 total_size += len(raw_bytes) 142 except Exception as exc: 143 _log.warning("Skipped %s: %s", pkl_file.name, exc) 144 skipped += 1 145 env.close() 146 _log.info( 147 "LMDB built at %s — %s genes, %.1f MB%s", 148 out, f"{genes_written:,}", total_size / (1024 * 1024), 149 f", {skipped} skipped" if skipped else "", 150 ) 151 return str(out)
Build LMDB from per-gene pickle files. Requires pip install seqmat[lmdb].
Base class for all SeqMat exceptions.
14class GeneNotFoundError(SeqMatError, LookupError): 15 """Raised when a gene cannot be located in the configured organism's database."""
Raised when a gene cannot be located in the configured organism's database.
18class OrganismNotConfiguredError(SeqMatError, ValueError): 19 """Raised when an organism has no data set up (run ``seqmat setup``)."""
Raised when an organism has no data set up (run seqmat setup).