seqmat

SeqMat — fast, vectorized genomic sequences with first-class mutation tracking.

 1"""SeqMat — fast, vectorized genomic sequences with first-class mutation tracking."""
 2
 3__version__ = "1.5.1"
 4__author__ = "Nicolas Lynn Vila"
 5__email__ = "nicolasalynn@gmail.com"
 6
 7# Core classes
 8from .seqmat import SeqMat
 9from .gene import Gene
10from .transcript import Transcript
11
12# Config / paths
13from .config import (
14    get_config_dir,
15    get_config_file,
16    get_data_base,
17    get_data_dir,
18    get_default_organism,
19    load_config,
20    save_config,
21)
22
23# Optional LMDB backend
24from .lmdb_store import build_lmdb
25
26# Position → gene lookup
27from .locator import build_location_index, gene_names_at_position
28
29# Data setup
30from .data_setup import set_fasta_path, setup_genomics_data
31
32# Discovery
33from .discovery import (
34    available_genes,
35    count_genes,
36    data_summary,
37    get_all_genes,
38    get_gene_list,
39    get_organism_info,
40    list_available_organisms,
41    list_gene_biotypes,
42    list_supported_organisms,
43    print_data_summary,
44    search_genes,
45)
46
47# Errors
48from .errors import (
49    DataUnavailableError,
50    GeneNotFoundError,
51    OrganismNotConfiguredError,
52    PrebuiltDataUnavailableError,
53    SeqMatError,
54)
55
56__all__ = [
57    # Core
58    "SeqMat", "Gene", "Transcript",
59    # Config / paths
60    "get_default_organism", "get_data_dir", "get_config_dir",
61    "get_config_file", "get_data_base", "load_config", "save_config",
62    # Setup
63    "setup_genomics_data", "set_fasta_path",
64    # Discovery
65    "list_available_organisms", "list_supported_organisms", "get_organism_info",
66    "list_gene_biotypes", "count_genes", "get_gene_list", "get_all_genes",
67    "data_summary", "print_data_summary", "search_genes", "available_genes",
68    # Position lookup
69    "gene_names_at_position", "build_location_index",
70    # LMDB
71    "build_lmdb",
72    # Errors
73    "SeqMatError", "GeneNotFoundError", "OrganismNotConfiguredError",
74    "DataUnavailableError", "PrebuiltDataUnavailableError",
75]
@dataclass(slots=True)
class SeqMat:
 46@dataclass(slots=True)
 47class SeqMat:
 48    """Sequence matrix with indices, mutations (SNP/ins/del), complement, slicing, remove_regions, from_fasta."""
 49
 50    name: str = field(default="wild_type")
 51    version: str = field(default="1.0")
 52    source: str = field(default="Unknown")
 53    notes: dict = field(default_factory=dict)
 54
 55    seq_array: np.ndarray = field(init=False, repr=False)
 56    insertions: Dict[int, List[Tuple[int, str]]] = field(default_factory=lambda: defaultdict(list), init=False, repr=False)
 57    mutations: List[Dict[str, Any]] = field(default_factory=list, init=False, repr=False)
 58    mutated_positions: set[int] = field(default_factory=set, init=False, repr=False)
 59    rev: bool = field(default=False, init=False, repr=False)
 60    predicted_splicing: Optional["pd.DataFrame"] = field(default=None, init=False, repr=False)
 61    _cow: bool = field(default=False, init=False, repr=False)
 62
 63    def __init__(
 64        self,
 65        nucleotides: Union[str, np.ndarray] = "",
 66        indices: Optional[np.ndarray] = None,
 67        conservation: Optional[np.ndarray] = None,
 68        reference: Optional[Union[str, np.ndarray]] = None,
 69        name: str = 'wild_type',
 70        source: Optional[str] = None,
 71        version: str = '1.0',
 72        notes: Optional[dict] = None,
 73        rev: bool = False,
 74        seq: Optional[str] = None,
 75    ) -> None:
 76        if seq is not None and not nucleotides:
 77            nucleotides = seq
 78        self.name = name
 79        self.version = version
 80        self.source = source or "Unknown"
 81        self.notes = notes or {}
 82        self.rev = rev
 83        self.predicted_splicing = None
 84        self._cow = False
 85        self.insertions = defaultdict(list)
 86        self.mutations = []
 87        self.mutated_positions = set()
 88
 89        # Prepare sequence
 90        if isinstance(nucleotides, str):
 91            nts = np.frombuffer(nucleotides.encode('ascii'), dtype='S1')
 92        else:
 93            nts = np.asarray(nucleotides, dtype='S1')
 94        L = len(nts)
 95
 96        # Indices default to 1-based
 97        if indices is None:
 98            indices = np.arange(1, L+1, dtype=np.int64)
 99        else:
100            indices = np.asarray(indices, dtype=np.int64)
101        if len(indices) != L:
102            raise ValueError(f"Indices length {len(indices)} != sequence length {L}")
103
104        # Structured dtype
105        dtype = np.dtype([
106            ('nt', 'S1'), ('index', np.int64), ('subidx', np.int16),
107            ('ref', 'S1'), ('cons', np.float32), ('mut_type', 'S10'), ('valid', bool)
108        ])
109        arr = np.zeros(L, dtype=dtype)
110        arr['nt'] = nts
111        arr['index'] = indices
112        arr['subidx'] = 0
113
114        # Reference
115        if reference is None:
116            arr['ref'] = nts
117        else:
118            if isinstance(reference, str):
119                ref_arr = np.array(list(reference), dtype='S1')
120            else:
121                ref_arr = np.array(reference, dtype='S1')
122            if len(ref_arr) != L:
123                raise ValueError("Reference length mismatch")
124            arr['ref'] = ref_arr
125
126        # Conservation
127        if conservation is not None:
128            cons = np.asarray(conservation, dtype=np.float32)
129            if len(cons) != L:
130                raise ValueError("Conservation length mismatch")
131            arr['cons'] = cons
132        else:
133            arr['cons'] = 0.0
134
135        arr['mut_type'] = b''
136        arr['valid'] = arr['nt'] != b'-'
137        self.seq_array = arr
138
139    @classmethod
140    def from_fasta(
141        cls,
142        genome: str,
143        chrom: str,
144        start: int,
145        end: int,
146        source_fasta: Optional[Path] = None,
147        **kwargs
148    ) -> SeqMat:
149        """
150        Load a genomic interval from FASTA.
151
152        Args:
153            genome: Genome identifier (e.g. 'hg38')
154            chrom: Chromosome name
155            start: Start position (1-based)
156            end: End position (1-based, inclusive)
157            source_fasta: Path to the FASTA file. If None, uses 'fasta_full_genome' from config.
158            **kwargs: Additional arguments for SeqMat constructor
159
160        Returns:
161            SeqMat object containing the requested sequence
162        """
163        if source_fasta is None:
164            config = get_organism_config(genome)
165            # Prefer single full-genome FASTA (no per-chromosome split needed)
166            source_fasta = config.get('fasta_full_genome') or config.get('fasta')
167            if source_fasta is None:
168                # Fall back to per-chromosome files for backward compatibility
169                chrom_source = config.get('CHROM_SOURCE')
170                if chrom_source:
171                    chrom_path = Path(chrom_source)
172                    chrom_file = chrom_path / f"{chrom}.fasta"
173                    if chrom_file.exists():
174                        source_fasta = chrom_file
175            if source_fasta is None:
176                raise ValueError(
177                    f"No FASTA configured for genome '{genome}'. Set 'fasta_full_genome' or 'fasta' in config, "
178                    f"or provide per-chromosome files in CHROM_SOURCE. Run setup_genomics_data() or set_fasta_path()."
179                )
180
181        seq = _fetch_fasta_region(str(source_fasta), chrom, start, end)
182        indices = np.arange(start, end+1, dtype=np.int64)
183        return cls(nucleotides=seq, indices=indices, name=f"{chrom}:{start}-{end}", source=genome, **kwargs)
184
185    @classmethod
186    def from_fasta_file(cls, fasta_path: Union[str, Path], chrom: str, start: int, end: int, **kwargs) -> SeqMat:
187        """Load a genomic interval directly from a FASTA file path"""
188        seq = _fetch_fasta_region(str(fasta_path), chrom, start, end)
189        indices = np.arange(start, end+1, dtype=np.int64)
190        return cls(nucleotides=seq, indices=indices, name=f"{chrom}:{start}-{end}", **kwargs)
191
192    def __len__(self) -> int:
193        """Return the number of valid bases in the sequence."""
194        return int(self.seq_array['valid'].sum())
195
196    def __repr__(self) -> str:
197        """Return a concise representation of the SeqMat object."""
198        return f"<SeqMat {self.name}: {len(self)} bp, {len(self.mutated_positions)} muts>"
199
200    @property
201    def seq(self) -> str:
202        """Return the current sequence as a string (only valid bases)."""
203        return self.seq_array['nt'][self.seq_array['valid']].tobytes().decode()
204
205    @property
206    def reference_seq(self) -> str:
207        """Return the reference sequence as a string (only valid bases)."""
208        return self.seq_array['ref'][self.seq_array['valid']].tobytes().decode()
209
210    def alignment(self) -> Tuple[str, str]:
211        """Return gapped reference and mutated sequences aligned to each other.
212
213        Deletions appear as '-' in the mutated string, insertions appear as '-'
214        in the reference string, and SNPs show the differing bases at the same
215        position.
216
217        Returns:
218            (reference, mutated) tuple of equal-length strings.
219        """
220        arr = self.seq_array
221        ref = arr['ref'].tobytes().decode()
222        mut = np.where(arr['valid'], arr['nt'], b'-').tobytes().decode()
223        return ref, mut
224
225    @property
226    def index(self) -> np.ndarray:
227        """Return the genomic indices of valid bases."""
228        return self.seq_array['index'][self.seq_array['valid']]
229
230    @property
231    def mutation_vector(self) -> np.ndarray:
232        """Return a binary vector indicating mutated positions (1=mutated, 0=not mutated)."""
233        valid_mask = self.seq_array['valid']
234        indices = self.seq_array['index'][valid_mask]
235        mutated_array = np.array(list(self.mutated_positions), dtype=np.int64)
236        return np.isin(indices, mutated_array).astype(np.int8)
237
238    def _refresh_mutation_state(self) -> None:
239        """Update mutation tracking based on current state."""
240        self._ensure_writable()
241        # Clear
242        self.seq_array['mut_type'] = b''
243        self.mutated_positions.clear()
244        # SNPs
245        snp = (self.seq_array['ref'] != self.seq_array['nt']) & self.seq_array['valid']
246        self.seq_array['mut_type'][snp] = b'snp'
247        self.mutated_positions.update(self.seq_array['index'][snp].tolist())
248        # insertions
249        for pos in self.insertions:
250            self.mutated_positions.add(pos)
251
252    def _ensure_writable(self) -> None:
253        """Copy seq_array if it is shared via copy-on-write."""
254        if self._cow:
255            self.seq_array = self.seq_array.copy()
256            self._cow = False
257
258    def __deepcopy__(self, memo: dict) -> SeqMat:
259        """COW-aware deep copy — shares seq_array, copies on first write."""
260        new = object.__new__(type(self))
261        new.name = self.name
262        new.version = self.version
263        new.source = self.source
264        new.notes = copy.deepcopy(self.notes, memo)
265        new.rev = self.rev
266        new.predicted_splicing = copy.deepcopy(self.predicted_splicing, memo)
267        new.insertions = copy.deepcopy(self.insertions, memo)
268        new.mutations = copy.deepcopy(self.mutations, memo)
269        new.mutated_positions = set(self.mutated_positions)
270        # COW: share seq_array
271        new.seq_array = self.seq_array
272        new._cow = True
273        self._cow = True
274        return new
275
276    def clone(self, start: Optional[int] = None, end: Optional[int] = None) -> SeqMat:
277        """
278        Create a copy of this SeqMat, optionally sliced to a specific range.
279
280        Args:
281            start: Start position (genomic coordinate)
282            end: End position (genomic coordinate)
283
284        Returns:
285            A new SeqMat object
286        """
287        new = copy.copy(self)
288        new.notes = copy.deepcopy(self.notes)
289        new.insertions = copy.deepcopy(self.insertions)
290        new.mutations = copy.deepcopy(self.mutations)
291        new.mutated_positions = set(self.mutated_positions)
292        if start is not None or end is not None:
293            lo = start if start is not None else self.index.min()
294            hi = end if end is not None else self.index.max()
295            lo, hi = min(lo, hi), max(lo, hi)
296            mask = (self.seq_array['index'] >= lo) & (self.seq_array['index'] <= hi)
297            new.seq_array = self.seq_array[mask].copy()
298            new._cow = False
299        else:
300            # COW: share seq_array, copy on first write
301            new.seq_array = self.seq_array
302            new._cow = True
303            self._cow = True
304        return new
305
306    def __getitem__(
307        self, key: Union[int, slice, Tuple[int, int]]
308    ) -> Union[np.void, SeqMat]:
309        """
310        Access sequence by genomic position or range.
311        
312        Args:
313            key: Position (int), slice, or (start, end) tuple
314            
315        Returns:
316            Single base record (if int) or new SeqMat (if slice/tuple)
317        """
318        if isinstance(key, int):
319            mask = self.seq_array['index'] == key
320            if not mask.any():
321                raise KeyError(f"{key} not in SeqMat")
322            return self.seq_array[mask][0]
323        if isinstance(key, slice) or (isinstance(key, tuple) and len(key) == 2):
324            if isinstance(key, slice):
325                lo, hi = key.start, key.stop
326            else:
327                lo, hi = key
328            return self.clone(lo, hi)
329        raise TypeError("Index must be int, slice or (start,end)")
330
331    def _classify_mutation(self, ref: str, alt: str) -> str:
332        """Classify a mutation type based on reference and alternate alleles."""
333        if ref == '-':
334            return 'ins'
335        if alt == '-':
336            return 'del'
337        if len(ref) == len(alt) == 1:
338            return 'snp'
339        return 'complex'
340
341    @staticmethod
342    def _all_simple_snps(muts: List[Mutation]) -> bool:
343        """Fast guard: True iff every mutation is a single-base SNP (no '-')."""
344        # Tight C-level any/all over a generator — faster than an explicit loop.
345        DASH = '-'
346        return all(
347            len(ref) == 1 and len(alt) == 1 and ref != DASH and alt != DASH
348            for _, ref, alt in muts
349        )
350
351    def _validate_mutation_batch(
352        self,
353        muts: List[Mutation],
354        *,
355        allow_multiple_insertions: bool = True
356    ) -> bool:
357        """
358        Ensure no two mutations in batch have overlapping reference spans.
359
360        Args:
361            muts: List of (pos, ref, alt) tuples
362            allow_multiple_insertions: Whether to allow multiple insertions at same position
363            
364        Returns:
365            True if valid, False if conflicts found
366        """
367        # Fast path: when every mutation is a single-base SNP, two of them conflict
368        # iff they share a position. O(N) — no span construction, no sort.
369        if self._all_simple_snps(muts):
370            seen: dict = {}
371            for i, (pos, _, _) in enumerate(muts):
372                if pos in seen:
373                    import logging as _logging
374                    _logging.getLogger(__name__).warning(
375                        "Found conflicting mutations:\n  #%d: %s  <-->  #%d: %s",
376                        seen[pos], muts[seen[pos]], i, muts[i],
377                    )
378                    return False
379                seen[pos] = i
380            return True
381
382        # General path: sweep-line sort by start; an interval overlaps with an
383        # earlier one iff it overlaps with the max-end-so-far one.
384        spans = []
385        for i, (pos, ref, alt) in enumerate(muts):
386            is_ins = (ref == '-')
387            if is_ins:
388                start, end = pos, pos
389            else:
390                start, end = pos, pos + len(ref) - 1
391            spans.append((start, end, i, is_ins))
392        spans.sort(key=lambda s: (s[0], s[1]))
393
394        conflicts: List[Tuple[int, int]] = []
395        max_end = -1
396        max_end_owner = -1
397        max_end_is_ins = False
398        for sb, eb, ib, is_ins_b in spans:
399            if sb <= max_end:
400                if not (is_ins_b and max_end_is_ins and allow_multiple_insertions):
401                    conflicts.append((max_end_owner, ib))
402            if eb > max_end:
403                max_end = eb
404                max_end_owner = ib
405                max_end_is_ins = is_ins_b
406
407        if conflicts:
408            import logging as _logging
409            lines = ["Found conflicting mutations:"]
410            for ia, ib in conflicts:
411                lines.append(f"  #{ia}: {muts[ia]}  <-->  #{ib}: {muts[ib]}")
412            _logging.getLogger(__name__).warning("\n".join(lines))
413            return False
414
415        return True
416
417    def apply_mutations(
418        self,
419        mutations: Union[Mutation, List[Mutation]],
420        *,
421        permissive_ref: bool = False
422    ) -> SeqMat:
423        """
424        Apply SNPs/insertions/deletions to the sequence.
425        
426        Mutations are always defined relative to the positive strand. If the sequence
427        is currently on the negative strand, it will be temporarily converted to the
428        positive strand for mutation application, then converted back.
429        
430        Args:
431            mutations: Single mutation or list of (pos, ref, alt) tuples
432            permissive_ref: If True, skip reference validation
433            
434        Returns:
435            Self (for chaining)
436        """
437        if isinstance(mutations, tuple):
438            mutations = [mutations]
439        if not self._validate_mutation_batch(mutations):
440            return self
441
442        self._ensure_writable()
443
444        # Track original strand state
445        was_on_negative_strand = self.rev
446        
447        # If on negative strand, convert to positive strand for mutation application
448        if was_on_negative_strand:
449            self.reverse_complement()  # Convert to positive strand
450            
451        # Snapshot per-batch derived state once instead of recomputing per mutation.
452        valid_index = self.seq_array['index'][self.seq_array['valid']]
453        index_min = int(valid_index[0]) if len(valid_index) else 0
454        index_max = int(valid_index[-1]) if len(valid_index) else -1
455        # Coordinates are sorted (ascending or descending). Bound-check covers both forms.
456        if index_min > index_max:
457            index_min, index_max = index_max, index_min
458
459        # Normalize, classify, and bucket. Pure SNPs (len(ref)==len(alt)==1, neither '-')
460        # share one vectorized fast path; everything else falls through per-mutation.
461        snp_batch: List[Tuple[int, str, str]] = []
462        other: List[Tuple[int, str, str, str]] = []
463        snp_records: List[Dict[str, Any]] = []
464        other_records: List[Dict[str, Any]] = []
465        try:
466            for pos, ref, alt in mutations:
467                while ref != '-' and alt != '-' and ref[0] == alt[0]:
468                    pos += 1
469                    ref = ref[1:] or '-'
470                    alt = alt[1:] or '-'
471                if ref == '-' and alt == '-':
472                    continue
473                if pos < index_min or pos > index_max:
474                    continue
475                if ref != '-' and alt != '-' and len(ref) == 1 and len(alt) == 1:
476                    snp_batch.append((pos, ref, alt))
477                    snp_records.append({'pos': pos, 'ref': ref, 'alt': alt, 'type': 'snp'})
478                else:
479                    typ = self._classify_mutation(ref, alt)
480                    other.append((pos, ref, alt, typ))
481                    other_records.append({'pos': pos, 'ref': ref, 'alt': alt, 'type': typ})
482
483            if snp_batch:
484                applied = self._substitute_batch(snp_batch, permissive_ref)
485                self.mutations.extend(rec for rec, ok in zip(snp_records, applied) if ok)
486
487            for (pos, ref, alt, typ), rec in zip(other, other_records):
488                if typ == 'snp':
489                    self._substitute(pos, ref, alt, permissive_ref)
490                elif typ == 'ins':
491                    self._insert(pos, alt)
492                elif typ == 'del':
493                    self._delete(pos, ref)
494                elif typ == 'complex':
495                    self._delete(pos, ref)
496                    self._insert(pos, alt)
497                self.mutations.append(rec)
498        finally:
499            if was_on_negative_strand:
500                self.reverse_complement()
501
502        # Full refresh is O(seq length); skip when we only ran SNPs (mutated_positions
503        # was already updated in _substitute_batch and mut_type was set in place).
504        if other:
505            self._refresh_mutation_state()
506        return self
507
508    def _substitute_batch(
509        self, snps: List[Tuple[int, str, str]], permissive: bool
510    ) -> np.ndarray:
511        """Apply many single-base substitutions in one vectorized pass.
512
513        All entries must be (pos, ref, alt) with ``len(ref) == len(alt) == 1`` and
514        neither equal to the ``'-'`` sentinel. Returns a bool array of length
515        ``len(snps)`` indicating which mutations were actually applied (positions
516        that don't exist in the index — e.g. removed by splicing — are skipped).
517        """
518        m = len(snps)
519        if m == 0:
520            return np.zeros(0, dtype=bool)
521        positions = np.fromiter((s[0] for s in snps), dtype=np.int64, count=m)
522        refs = np.array([s[1].encode() for s in snps], dtype='S1')
523        alts = np.array([s[2].encode() for s in snps], dtype='S1')
524
525        arr_idx = self.seq_array['index']
526        n = len(arr_idx)
527        applied = np.zeros(m, dtype=bool)
528        if n == 0:
529            return applied
530
531        idxs = np.searchsorted(arr_idx, positions, side='left')
532        bounded = np.clip(idxs, 0, n - 1)
533        present = (idxs < n) & (arr_idx[bounded] == positions)
534        if not present.any():
535            return applied
536
537        sel_idxs = idxs[present]
538        sel_refs = refs[present]
539        sel_alts = alts[present]
540
541        if not permissive:
542            actual = self.seq_array['ref'][sel_idxs]
543            mismatch = actual != sel_refs
544            if mismatch.any():
545                bad_pos = int(positions[present][mismatch][0])
546                raise ValueError(
547                    f"Ref mismatch @{bad_pos}. Use apply_mutations(..., permissive_ref=True) to skip reference validation."
548                )
549
550        self.seq_array['nt'][sel_idxs] = sel_alts
551        self.seq_array['mut_type'][sel_idxs] = b'snp'
552        # Update mutated_positions incrementally — avoids a full O(N) refresh later.
553        self.mutated_positions.update(arr_idx[sel_idxs].tolist())
554        applied[present] = True
555        return applied
556
557    def _substitute(self, pos: int, ref: str, alt: str, permissive: bool) -> None:
558        """Apply a substitution mutation."""
559        arr_idx = self.seq_array['index']
560        i = int(np.searchsorted(arr_idx, pos, side='left'))
561        if i >= len(arr_idx) or arr_idx[i] != pos:
562            return
563        rbytes = np.array(list(ref), dtype='S1')
564        if not permissive and not np.array_equal(self.seq_array["ref"][i : i + len(rbytes)], rbytes):
565            raise ValueError(
566                f"Ref mismatch @{pos}. Use apply_mutations(..., permissive_ref=True) to skip reference validation."
567            )
568        self.seq_array['nt'][i:i+len(rbytes)] = np.array(list(alt), dtype='S1')
569        self.seq_array['mut_type'][i] = b'snp'
570
571    def _insert(self, pos: int, seq: str) -> None:
572        """Apply an insertion mutation."""
573        subid = len(self.insertions[pos]) + 1
574        self.insertions[pos].append((subid, seq))
575        entries = []
576        for nt in seq:
577            e = np.zeros(1, dtype=self.seq_array.dtype)[0]
578            e['nt'] = nt.encode()
579            e['index'] = pos
580            e['subidx'] = subid
581            e['ref'] = b'-'
582            e['cons'] = 0
583            e['mut_type'] = b'ins'
584            e['valid'] = True
585            entries.append(e)
586        i = np.searchsorted(self.seq_array['index'], pos, 'right')
587        # Explicit dtype for NumPy 2.x: list of np.void must be stacked with same dtype
588        entries_arr = np.array(entries, dtype=self.seq_array.dtype)
589        self.seq_array = np.concatenate([self.seq_array[:i], entries_arr, self.seq_array[i:]])
590
591    def _delete(self, pos: int, ref: str) -> None:
592        """Apply a deletion mutation."""
593        for i, base in enumerate(ref):
594            mask = (self.seq_array['index'] == pos + i) & (self.seq_array['subidx'] == 0)
595            self.seq_array['valid'][mask] = False
596            self.seq_array['mut_type'][mask] = b'del'
597
598    @staticmethod
599    def _complement_nt(nt_field: np.ndarray) -> np.ndarray:
600        """Complement nucleotide bytes via single-pass LUT (A<->T, C<->G)."""
601        return _COMPLEMENT_LUT[nt_field.view(np.uint8)].view('S1')
602
603    def _complement_in_place(self) -> None:
604        """Complement ``nt`` (and ``ref``) in place using ``bytes.translate``.
605
606        ~10x faster than numpy fancy indexing for the typical sequence sizes we see.
607        """
608        nt = self.seq_array['nt']
609        nt[:] = np.frombuffer(nt.tobytes().translate(_BYTES_COMPLEMENT), dtype='S1')
610
611    def complement(self, copy: bool = False) -> SeqMat:
612        """Complement the sequence (A<->T, C<->G) in place or return a copy."""
613        target = self.clone() if copy else self
614        target._ensure_writable()
615        target._complement_in_place()
616        return target
617
618    def reverse_complement(self, copy: bool = False) -> SeqMat:
619        """Reverse-complement the sequence in place or return a copy.
620
621        Reverses each column of the structured array (column-by-column is faster
622        than reversing the full 27-byte-per-record array as one slab).
623        """
624        target = self.clone() if copy else self
625        target._ensure_writable()
626        src = target.seq_array
627        # Complement nt via bytes.translate, then reverse all columns into a fresh array.
628        rc_nt = src['nt'].tobytes().translate(_BYTES_COMPLEMENT)[::-1]
629        new = np.empty_like(src)
630        new['nt'] = np.frombuffer(rc_nt, dtype='S1')
631        for col in src.dtype.names:
632            if col == 'nt':
633                continue
634            new[col] = src[col][::-1]
635        target.seq_array = new
636        target.rev = not target.rev
637        return target
638
639    def remove_regions(self, regions: List[Tuple[int, int]]) -> SeqMat:
640        """
641        Excise given genomic intervals (inclusive on both ends).
642
643        Args:
644            regions: List of (start, end) tuples to remove
645
646        Returns:
647            New SeqMat with regions removed
648        """
649        src = self.seq_array
650        indices = src['index']
651        n = len(indices)
652        remove = np.zeros(n, dtype=bool)
653
654        ascending = n < 2 or indices[0] <= indices[-1]
655        sorted_idx = indices if ascending else indices[::-1]
656
657        for lo, hi in regions:
658            lo, hi = min(lo, hi), max(lo, hi)
659            i = np.searchsorted(sorted_idx, lo)
660            j = np.searchsorted(sorted_idx, hi, side='right')
661            if ascending:
662                remove[i:j] = True
663            else:
664                remove[n - j:n - i] = True
665
666        keep = ~remove
667        kept_n = int(keep.sum())
668
669        # Build the new SeqMat directly — avoid the COW clone + per-column structured
670        # boolean-mask copy (which materialises 27 bytes per surviving row). Doing
671        # the keep-filter column by column is ~2-3x faster on large arrays.
672        new = copy.copy(self)
673        new.notes = copy.deepcopy(self.notes)
674        new.insertions = copy.deepcopy(self.insertions)
675        new.mutations = list(self.mutations)
676        new.mutated_positions = set(self.mutated_positions)
677        new._cow = False
678        new_array = np.empty(kept_n, dtype=src.dtype)
679        for col in src.dtype.names:
680            new_array[col] = src[col][keep]
681        new.seq_array = new_array
682        return new
683
684    def summary(self) -> str:
685        """Return a summary of the SeqMat object."""
686        return (f"SeqMat '{self.name}': {len(self)}bp valid, mutations={len(self.mutations)}, "
687                f"inserts at {list(self.insertions.keys())}")
688
689    def to_dict(self) -> Dict[str, Any]:
690        """Convert SeqMat to a dictionary representation."""
691        return {
692            'name': self.name,
693            'sequence': self.seq,
694            'reference': self.reference_seq,
695            'mutations': self.mutations,
696            'length': len(self),
697            'mutated_positions': list(self.mutated_positions)
698        }
699    
700    def to_fasta(self, wrap: int = 80) -> str:
701        """
702        Export sequence in FASTA format.
703        
704        Args:
705            wrap: Line length for sequence wrapping
706            
707        Returns:
708            FASTA-formatted string
709        """
710        header = f">{self.name}"
711        if self.mutations:
712            header += f" mutations={len(self.mutations)}"
713        
714        seq = self.seq
715        lines = [header]
716        for i in range(0, len(seq), wrap):
717            lines.append(seq[i:i+wrap])
718        
719        return '\n'.join(lines)
720    
721    def reset_mutation_vector(self) -> None:
722        """
723        Reset mutation tracking (mutated_positions, mut_type) while preserving
724        the current nucleotides (nt) and reference info.
725        This means the current sequence becomes the new baseline.
726        """
727        self._ensure_writable()
728        # Clear existing mutation annotations
729        self.seq_array['ref'] = self.seq_array['nt']    # new reference = current nt
730        self.seq_array['mut_type'] = b''                # clear mutation type
731        self.insertions.clear()                         # clear insertion tracking
732        self.mutations.clear()                          # clear mutation history
733        self.mutated_positions.clear()                  # clear mutation set
734
735
736    # ---------- Fast I/O ----------
737    def save_seqmat(self, path: Union[str, Path], *, compressed: bool = False) -> Path:
738        """
739        Save the entire SeqMat to a single .npz (fast).
740        - The structured seq_array is stored verbatim (zero copy conversion).
741        - Python containers (insertions, mutations, mutated_positions, notes) are pickled.
742        - If predicted_splicing exists, it is saved to a sibling .parquet file.
743
744        Args:
745            path: target file path ('.npz' will be appended if missing)
746            compressed: if True, use np.savez_compressed (slower, smaller)
747
748        Returns:
749            The written .npz Path
750        """
751        path = Path(path)
752        if path.suffix.lower() != ".npz":
753            path = path.with_suffix(".npz")
754
755        meta = {
756            "name": self.name,
757            "version": self.version,
758            "source": self.source,
759            "rev": self.rev,
760            "has_predicted_splicing": self.predicted_splicing is not None,
761        }
762
763        # For maximum speed, use np.savez (zip, no deflate). Set allow_pickle=True for load.
764        saver = np.savez_compressed if compressed else np.savez
765
766        # Write atomically
767        tmp = path.with_suffix(".npz.tmp")
768        saver(
769            tmp,
770            seq_array=self.seq_array,                                # structured array
771            meta=np.array(meta, dtype=object),                       # small dict (pickled)
772            notes=np.array(self.notes, dtype=object),                # user metadata (pickled)
773            insertions=np.array(dict(self.insertions), dtype=object),# defaultdict(list) (pickled)
774            mutations=np.array(self.mutations, dtype=object),        # list[dict] (pickled)
775            mutated_positions=np.array(list(self.mutated_positions), dtype=np.int64),
776        )
777        tmp.replace(path)
778
779        # Save DataFrame separately if present (fast columnar)
780        if self.predicted_splicing is not None:
781            ppath = path.with_suffix(".parquet")
782            # Use pandas fast parquet writer if available
783            try:
784                self.predicted_splicing.to_parquet(ppath, index=False)
785            except Exception:
786                # Feather as fallback
787                self.predicted_splicing.reset_index(drop=True).to_feather(ppath.with_suffix(".feather"))
788
789        return path
790
791    @classmethod
792    def read_seqmat(cls, path: Union[str, Path]) -> "SeqMat":
793        """
794        Load a SeqMat saved by save_seqmat() (fast).
795        Returns:
796            SeqMat instance with all fields restored.
797        """
798        path = Path(path)
799        if path.suffix.lower() != ".npz":
800            path = path.with_suffix(".npz")
801
802        with np.load(path, allow_pickle=True) as z:
803            seq_array = z["seq_array"]
804            meta = z["meta"].item() if isinstance(z["meta"], np.ndarray) else z["meta"]
805            notes = z["notes"].item() if isinstance(z["notes"], np.ndarray) else z["notes"]
806
807            # Containers
808            ins_obj = z["insertions"].item() if isinstance(z["insertions"], np.ndarray) else z["insertions"]
809            mut_list = z["mutations"].tolist() if hasattr(z["mutations"], "tolist") else z["mutations"]
810            mutated_positions = set(map(int, z["mutated_positions"].tolist()))
811
812        # Build a blank instance and populate directly to avoid recomputing arrays
813        obj = object.__new__(cls)
814        # Metadata
815        obj.name = meta.get("name", "wild_type")
816        obj.version = meta.get("version", "1.0")
817        obj.source = meta.get("source", "Unknown")
818        obj.notes = notes if isinstance(notes, dict) else dict(notes)
819        obj.rev = bool(meta.get("rev", False))
820        obj.predicted_splicing = None
821
822        # Core storage
823        obj.seq_array = seq_array
824        # Ensure types
825        from collections import defaultdict
826        dl = defaultdict(list)
827        dl.update(ins_obj if isinstance(ins_obj, dict) else dict(ins_obj))
828        obj.insertions = dl
829        obj.mutations = list(mut_list) if isinstance(mut_list, (list, tuple)) else []
830        obj.mutated_positions = mutated_positions
831        obj._cow = False
832
833        # Keep slots that are not persisted explicitly in a clean state
834        # (no additional refresh needed; seq_array already contains nt/ref/valid/mut_type)
835        return obj

Sequence matrix with indices, mutations (SNP/ins/del), complement, slicing, remove_regions, from_fasta.

SeqMat( nucleotides: Union[str, numpy.ndarray] = '', indices: Optional[numpy.ndarray] = None, conservation: Optional[numpy.ndarray] = None, reference: Union[str, numpy.ndarray, NoneType] = None, name: str = 'wild_type', source: Optional[str] = None, version: str = '1.0', notes: Optional[dict] = None, rev: bool = False, seq: Optional[str] = None)
 63    def __init__(
 64        self,
 65        nucleotides: Union[str, np.ndarray] = "",
 66        indices: Optional[np.ndarray] = None,
 67        conservation: Optional[np.ndarray] = None,
 68        reference: Optional[Union[str, np.ndarray]] = None,
 69        name: str = 'wild_type',
 70        source: Optional[str] = None,
 71        version: str = '1.0',
 72        notes: Optional[dict] = None,
 73        rev: bool = False,
 74        seq: Optional[str] = None,
 75    ) -> None:
 76        if seq is not None and not nucleotides:
 77            nucleotides = seq
 78        self.name = name
 79        self.version = version
 80        self.source = source or "Unknown"
 81        self.notes = notes or {}
 82        self.rev = rev
 83        self.predicted_splicing = None
 84        self._cow = False
 85        self.insertions = defaultdict(list)
 86        self.mutations = []
 87        self.mutated_positions = set()
 88
 89        # Prepare sequence
 90        if isinstance(nucleotides, str):
 91            nts = np.frombuffer(nucleotides.encode('ascii'), dtype='S1')
 92        else:
 93            nts = np.asarray(nucleotides, dtype='S1')
 94        L = len(nts)
 95
 96        # Indices default to 1-based
 97        if indices is None:
 98            indices = np.arange(1, L+1, dtype=np.int64)
 99        else:
100            indices = np.asarray(indices, dtype=np.int64)
101        if len(indices) != L:
102            raise ValueError(f"Indices length {len(indices)} != sequence length {L}")
103
104        # Structured dtype
105        dtype = np.dtype([
106            ('nt', 'S1'), ('index', np.int64), ('subidx', np.int16),
107            ('ref', 'S1'), ('cons', np.float32), ('mut_type', 'S10'), ('valid', bool)
108        ])
109        arr = np.zeros(L, dtype=dtype)
110        arr['nt'] = nts
111        arr['index'] = indices
112        arr['subidx'] = 0
113
114        # Reference
115        if reference is None:
116            arr['ref'] = nts
117        else:
118            if isinstance(reference, str):
119                ref_arr = np.array(list(reference), dtype='S1')
120            else:
121                ref_arr = np.array(reference, dtype='S1')
122            if len(ref_arr) != L:
123                raise ValueError("Reference length mismatch")
124            arr['ref'] = ref_arr
125
126        # Conservation
127        if conservation is not None:
128            cons = np.asarray(conservation, dtype=np.float32)
129            if len(cons) != L:
130                raise ValueError("Conservation length mismatch")
131            arr['cons'] = cons
132        else:
133            arr['cons'] = 0.0
134
135        arr['mut_type'] = b''
136        arr['valid'] = arr['nt'] != b'-'
137        self.seq_array = arr
name: str
version: str
source: str
notes: dict
seq_array: numpy.ndarray
insertions: Dict[int, List[Tuple[int, str]]]
mutations: List[Dict[str, Any]]
mutated_positions: set[int]
rev: bool
predicted_splicing: Optional[pandas.core.frame.DataFrame]
@classmethod
def from_fasta( cls, genome: str, chrom: str, start: int, end: int, source_fasta: Optional[pathlib.Path] = None, **kwargs) -> SeqMat:
139    @classmethod
140    def from_fasta(
141        cls,
142        genome: str,
143        chrom: str,
144        start: int,
145        end: int,
146        source_fasta: Optional[Path] = None,
147        **kwargs
148    ) -> SeqMat:
149        """
150        Load a genomic interval from FASTA.
151
152        Args:
153            genome: Genome identifier (e.g. 'hg38')
154            chrom: Chromosome name
155            start: Start position (1-based)
156            end: End position (1-based, inclusive)
157            source_fasta: Path to the FASTA file. If None, uses 'fasta_full_genome' from config.
158            **kwargs: Additional arguments for SeqMat constructor
159
160        Returns:
161            SeqMat object containing the requested sequence
162        """
163        if source_fasta is None:
164            config = get_organism_config(genome)
165            # Prefer single full-genome FASTA (no per-chromosome split needed)
166            source_fasta = config.get('fasta_full_genome') or config.get('fasta')
167            if source_fasta is None:
168                # Fall back to per-chromosome files for backward compatibility
169                chrom_source = config.get('CHROM_SOURCE')
170                if chrom_source:
171                    chrom_path = Path(chrom_source)
172                    chrom_file = chrom_path / f"{chrom}.fasta"
173                    if chrom_file.exists():
174                        source_fasta = chrom_file
175            if source_fasta is None:
176                raise ValueError(
177                    f"No FASTA configured for genome '{genome}'. Set 'fasta_full_genome' or 'fasta' in config, "
178                    f"or provide per-chromosome files in CHROM_SOURCE. Run setup_genomics_data() or set_fasta_path()."
179                )
180
181        seq = _fetch_fasta_region(str(source_fasta), chrom, start, end)
182        indices = np.arange(start, end+1, dtype=np.int64)
183        return cls(nucleotides=seq, indices=indices, name=f"{chrom}:{start}-{end}", source=genome, **kwargs)

Load a genomic interval from FASTA.

Arguments:
  • genome: Genome identifier (e.g. 'hg38')
  • chrom: Chromosome name
  • start: Start position (1-based)
  • end: End position (1-based, inclusive)
  • source_fasta: Path to the FASTA file. If None, uses 'fasta_full_genome' from config.
  • **kwargs: Additional arguments for SeqMat constructor
Returns:

SeqMat object containing the requested sequence

@classmethod
def from_fasta_file( cls, fasta_path: Union[str, pathlib.Path], chrom: str, start: int, end: int, **kwargs) -> SeqMat:
185    @classmethod
186    def from_fasta_file(cls, fasta_path: Union[str, Path], chrom: str, start: int, end: int, **kwargs) -> SeqMat:
187        """Load a genomic interval directly from a FASTA file path"""
188        seq = _fetch_fasta_region(str(fasta_path), chrom, start, end)
189        indices = np.arange(start, end+1, dtype=np.int64)
190        return cls(nucleotides=seq, indices=indices, name=f"{chrom}:{start}-{end}", **kwargs)

Load a genomic interval directly from a FASTA file path

seq: str
200    @property
201    def seq(self) -> str:
202        """Return the current sequence as a string (only valid bases)."""
203        return self.seq_array['nt'][self.seq_array['valid']].tobytes().decode()

Return the current sequence as a string (only valid bases).

reference_seq: str
205    @property
206    def reference_seq(self) -> str:
207        """Return the reference sequence as a string (only valid bases)."""
208        return self.seq_array['ref'][self.seq_array['valid']].tobytes().decode()

Return the reference sequence as a string (only valid bases).

def alignment(self) -> Tuple[str, str]:
210    def alignment(self) -> Tuple[str, str]:
211        """Return gapped reference and mutated sequences aligned to each other.
212
213        Deletions appear as '-' in the mutated string, insertions appear as '-'
214        in the reference string, and SNPs show the differing bases at the same
215        position.
216
217        Returns:
218            (reference, mutated) tuple of equal-length strings.
219        """
220        arr = self.seq_array
221        ref = arr['ref'].tobytes().decode()
222        mut = np.where(arr['valid'], arr['nt'], b'-').tobytes().decode()
223        return ref, mut

Return gapped reference and mutated sequences aligned to each other.

Deletions appear as '-' in the mutated string, insertions appear as '-' in the reference string, and SNPs show the differing bases at the same position.

Returns:

(reference, mutated) tuple of equal-length strings.

index: numpy.ndarray
225    @property
226    def index(self) -> np.ndarray:
227        """Return the genomic indices of valid bases."""
228        return self.seq_array['index'][self.seq_array['valid']]

Return the genomic indices of valid bases.

mutation_vector: numpy.ndarray
230    @property
231    def mutation_vector(self) -> np.ndarray:
232        """Return a binary vector indicating mutated positions (1=mutated, 0=not mutated)."""
233        valid_mask = self.seq_array['valid']
234        indices = self.seq_array['index'][valid_mask]
235        mutated_array = np.array(list(self.mutated_positions), dtype=np.int64)
236        return np.isin(indices, mutated_array).astype(np.int8)

Return a binary vector indicating mutated positions (1=mutated, 0=not mutated).

def clone( self, start: Optional[int] = None, end: Optional[int] = None) -> SeqMat:
276    def clone(self, start: Optional[int] = None, end: Optional[int] = None) -> SeqMat:
277        """
278        Create a copy of this SeqMat, optionally sliced to a specific range.
279
280        Args:
281            start: Start position (genomic coordinate)
282            end: End position (genomic coordinate)
283
284        Returns:
285            A new SeqMat object
286        """
287        new = copy.copy(self)
288        new.notes = copy.deepcopy(self.notes)
289        new.insertions = copy.deepcopy(self.insertions)
290        new.mutations = copy.deepcopy(self.mutations)
291        new.mutated_positions = set(self.mutated_positions)
292        if start is not None or end is not None:
293            lo = start if start is not None else self.index.min()
294            hi = end if end is not None else self.index.max()
295            lo, hi = min(lo, hi), max(lo, hi)
296            mask = (self.seq_array['index'] >= lo) & (self.seq_array['index'] <= hi)
297            new.seq_array = self.seq_array[mask].copy()
298            new._cow = False
299        else:
300            # COW: share seq_array, copy on first write
301            new.seq_array = self.seq_array
302            new._cow = True
303            self._cow = True
304        return new

Create a copy of this SeqMat, optionally sliced to a specific range.

Arguments:
  • start: Start position (genomic coordinate)
  • end: End position (genomic coordinate)
Returns:

A new SeqMat object

def apply_mutations( self, mutations: Union[Tuple[int, str, str], List[Tuple[int, str, str]]], *, permissive_ref: bool = False) -> SeqMat:
417    def apply_mutations(
418        self,
419        mutations: Union[Mutation, List[Mutation]],
420        *,
421        permissive_ref: bool = False
422    ) -> SeqMat:
423        """
424        Apply SNPs/insertions/deletions to the sequence.
425        
426        Mutations are always defined relative to the positive strand. If the sequence
427        is currently on the negative strand, it will be temporarily converted to the
428        positive strand for mutation application, then converted back.
429        
430        Args:
431            mutations: Single mutation or list of (pos, ref, alt) tuples
432            permissive_ref: If True, skip reference validation
433            
434        Returns:
435            Self (for chaining)
436        """
437        if isinstance(mutations, tuple):
438            mutations = [mutations]
439        if not self._validate_mutation_batch(mutations):
440            return self
441
442        self._ensure_writable()
443
444        # Track original strand state
445        was_on_negative_strand = self.rev
446        
447        # If on negative strand, convert to positive strand for mutation application
448        if was_on_negative_strand:
449            self.reverse_complement()  # Convert to positive strand
450            
451        # Snapshot per-batch derived state once instead of recomputing per mutation.
452        valid_index = self.seq_array['index'][self.seq_array['valid']]
453        index_min = int(valid_index[0]) if len(valid_index) else 0
454        index_max = int(valid_index[-1]) if len(valid_index) else -1
455        # Coordinates are sorted (ascending or descending). Bound-check covers both forms.
456        if index_min > index_max:
457            index_min, index_max = index_max, index_min
458
459        # Normalize, classify, and bucket. Pure SNPs (len(ref)==len(alt)==1, neither '-')
460        # share one vectorized fast path; everything else falls through per-mutation.
461        snp_batch: List[Tuple[int, str, str]] = []
462        other: List[Tuple[int, str, str, str]] = []
463        snp_records: List[Dict[str, Any]] = []
464        other_records: List[Dict[str, Any]] = []
465        try:
466            for pos, ref, alt in mutations:
467                while ref != '-' and alt != '-' and ref[0] == alt[0]:
468                    pos += 1
469                    ref = ref[1:] or '-'
470                    alt = alt[1:] or '-'
471                if ref == '-' and alt == '-':
472                    continue
473                if pos < index_min or pos > index_max:
474                    continue
475                if ref != '-' and alt != '-' and len(ref) == 1 and len(alt) == 1:
476                    snp_batch.append((pos, ref, alt))
477                    snp_records.append({'pos': pos, 'ref': ref, 'alt': alt, 'type': 'snp'})
478                else:
479                    typ = self._classify_mutation(ref, alt)
480                    other.append((pos, ref, alt, typ))
481                    other_records.append({'pos': pos, 'ref': ref, 'alt': alt, 'type': typ})
482
483            if snp_batch:
484                applied = self._substitute_batch(snp_batch, permissive_ref)
485                self.mutations.extend(rec for rec, ok in zip(snp_records, applied) if ok)
486
487            for (pos, ref, alt, typ), rec in zip(other, other_records):
488                if typ == 'snp':
489                    self._substitute(pos, ref, alt, permissive_ref)
490                elif typ == 'ins':
491                    self._insert(pos, alt)
492                elif typ == 'del':
493                    self._delete(pos, ref)
494                elif typ == 'complex':
495                    self._delete(pos, ref)
496                    self._insert(pos, alt)
497                self.mutations.append(rec)
498        finally:
499            if was_on_negative_strand:
500                self.reverse_complement()
501
502        # Full refresh is O(seq length); skip when we only ran SNPs (mutated_positions
503        # was already updated in _substitute_batch and mut_type was set in place).
504        if other:
505            self._refresh_mutation_state()
506        return self

Apply SNPs/insertions/deletions to the sequence.

Mutations are always defined relative to the positive strand. If the sequence is currently on the negative strand, it will be temporarily converted to the positive strand for mutation application, then converted back.

Arguments:
  • mutations: Single mutation or list of (pos, ref, alt) tuples
  • permissive_ref: If True, skip reference validation
Returns:

Self (for chaining)

def complement(self, copy: bool = False) -> SeqMat:
611    def complement(self, copy: bool = False) -> SeqMat:
612        """Complement the sequence (A<->T, C<->G) in place or return a copy."""
613        target = self.clone() if copy else self
614        target._ensure_writable()
615        target._complement_in_place()
616        return target

Complement the sequence (A<->T, C<->G) in place or return a copy.

def reverse_complement(self, copy: bool = False) -> SeqMat:
618    def reverse_complement(self, copy: bool = False) -> SeqMat:
619        """Reverse-complement the sequence in place or return a copy.
620
621        Reverses each column of the structured array (column-by-column is faster
622        than reversing the full 27-byte-per-record array as one slab).
623        """
624        target = self.clone() if copy else self
625        target._ensure_writable()
626        src = target.seq_array
627        # Complement nt via bytes.translate, then reverse all columns into a fresh array.
628        rc_nt = src['nt'].tobytes().translate(_BYTES_COMPLEMENT)[::-1]
629        new = np.empty_like(src)
630        new['nt'] = np.frombuffer(rc_nt, dtype='S1')
631        for col in src.dtype.names:
632            if col == 'nt':
633                continue
634            new[col] = src[col][::-1]
635        target.seq_array = new
636        target.rev = not target.rev
637        return target

Reverse-complement the sequence in place or return a copy.

Reverses each column of the structured array (column-by-column is faster than reversing the full 27-byte-per-record array as one slab).

def remove_regions(self, regions: List[Tuple[int, int]]) -> SeqMat:
639    def remove_regions(self, regions: List[Tuple[int, int]]) -> SeqMat:
640        """
641        Excise given genomic intervals (inclusive on both ends).
642
643        Args:
644            regions: List of (start, end) tuples to remove
645
646        Returns:
647            New SeqMat with regions removed
648        """
649        src = self.seq_array
650        indices = src['index']
651        n = len(indices)
652        remove = np.zeros(n, dtype=bool)
653
654        ascending = n < 2 or indices[0] <= indices[-1]
655        sorted_idx = indices if ascending else indices[::-1]
656
657        for lo, hi in regions:
658            lo, hi = min(lo, hi), max(lo, hi)
659            i = np.searchsorted(sorted_idx, lo)
660            j = np.searchsorted(sorted_idx, hi, side='right')
661            if ascending:
662                remove[i:j] = True
663            else:
664                remove[n - j:n - i] = True
665
666        keep = ~remove
667        kept_n = int(keep.sum())
668
669        # Build the new SeqMat directly — avoid the COW clone + per-column structured
670        # boolean-mask copy (which materialises 27 bytes per surviving row). Doing
671        # the keep-filter column by column is ~2-3x faster on large arrays.
672        new = copy.copy(self)
673        new.notes = copy.deepcopy(self.notes)
674        new.insertions = copy.deepcopy(self.insertions)
675        new.mutations = list(self.mutations)
676        new.mutated_positions = set(self.mutated_positions)
677        new._cow = False
678        new_array = np.empty(kept_n, dtype=src.dtype)
679        for col in src.dtype.names:
680            new_array[col] = src[col][keep]
681        new.seq_array = new_array
682        return new

Excise given genomic intervals (inclusive on both ends).

Arguments:
  • regions: List of (start, end) tuples to remove
Returns:

New SeqMat with regions removed

def summary(self) -> str:
684    def summary(self) -> str:
685        """Return a summary of the SeqMat object."""
686        return (f"SeqMat '{self.name}': {len(self)}bp valid, mutations={len(self.mutations)}, "
687                f"inserts at {list(self.insertions.keys())}")

Return a summary of the SeqMat object.

def to_dict(self) -> Dict[str, Any]:
689    def to_dict(self) -> Dict[str, Any]:
690        """Convert SeqMat to a dictionary representation."""
691        return {
692            'name': self.name,
693            'sequence': self.seq,
694            'reference': self.reference_seq,
695            'mutations': self.mutations,
696            'length': len(self),
697            'mutated_positions': list(self.mutated_positions)
698        }

Convert SeqMat to a dictionary representation.

def to_fasta(self, wrap: int = 80) -> str:
700    def to_fasta(self, wrap: int = 80) -> str:
701        """
702        Export sequence in FASTA format.
703        
704        Args:
705            wrap: Line length for sequence wrapping
706            
707        Returns:
708            FASTA-formatted string
709        """
710        header = f">{self.name}"
711        if self.mutations:
712            header += f" mutations={len(self.mutations)}"
713        
714        seq = self.seq
715        lines = [header]
716        for i in range(0, len(seq), wrap):
717            lines.append(seq[i:i+wrap])
718        
719        return '\n'.join(lines)

Export sequence in FASTA format.

Arguments:
  • wrap: Line length for sequence wrapping
Returns:

FASTA-formatted string

def reset_mutation_vector(self) -> None:
721    def reset_mutation_vector(self) -> None:
722        """
723        Reset mutation tracking (mutated_positions, mut_type) while preserving
724        the current nucleotides (nt) and reference info.
725        This means the current sequence becomes the new baseline.
726        """
727        self._ensure_writable()
728        # Clear existing mutation annotations
729        self.seq_array['ref'] = self.seq_array['nt']    # new reference = current nt
730        self.seq_array['mut_type'] = b''                # clear mutation type
731        self.insertions.clear()                         # clear insertion tracking
732        self.mutations.clear()                          # clear mutation history
733        self.mutated_positions.clear()                  # clear mutation set

Reset mutation tracking (mutated_positions, mut_type) while preserving the current nucleotides (nt) and reference info. This means the current sequence becomes the new baseline.

def save_seqmat( self, path: Union[str, pathlib.Path], *, compressed: bool = False) -> pathlib.Path:
737    def save_seqmat(self, path: Union[str, Path], *, compressed: bool = False) -> Path:
738        """
739        Save the entire SeqMat to a single .npz (fast).
740        - The structured seq_array is stored verbatim (zero copy conversion).
741        - Python containers (insertions, mutations, mutated_positions, notes) are pickled.
742        - If predicted_splicing exists, it is saved to a sibling .parquet file.
743
744        Args:
745            path: target file path ('.npz' will be appended if missing)
746            compressed: if True, use np.savez_compressed (slower, smaller)
747
748        Returns:
749            The written .npz Path
750        """
751        path = Path(path)
752        if path.suffix.lower() != ".npz":
753            path = path.with_suffix(".npz")
754
755        meta = {
756            "name": self.name,
757            "version": self.version,
758            "source": self.source,
759            "rev": self.rev,
760            "has_predicted_splicing": self.predicted_splicing is not None,
761        }
762
763        # For maximum speed, use np.savez (zip, no deflate). Set allow_pickle=True for load.
764        saver = np.savez_compressed if compressed else np.savez
765
766        # Write atomically
767        tmp = path.with_suffix(".npz.tmp")
768        saver(
769            tmp,
770            seq_array=self.seq_array,                                # structured array
771            meta=np.array(meta, dtype=object),                       # small dict (pickled)
772            notes=np.array(self.notes, dtype=object),                # user metadata (pickled)
773            insertions=np.array(dict(self.insertions), dtype=object),# defaultdict(list) (pickled)
774            mutations=np.array(self.mutations, dtype=object),        # list[dict] (pickled)
775            mutated_positions=np.array(list(self.mutated_positions), dtype=np.int64),
776        )
777        tmp.replace(path)
778
779        # Save DataFrame separately if present (fast columnar)
780        if self.predicted_splicing is not None:
781            ppath = path.with_suffix(".parquet")
782            # Use pandas fast parquet writer if available
783            try:
784                self.predicted_splicing.to_parquet(ppath, index=False)
785            except Exception:
786                # Feather as fallback
787                self.predicted_splicing.reset_index(drop=True).to_feather(ppath.with_suffix(".feather"))
788
789        return path

Save the entire SeqMat to a single .npz (fast).

  • The structured seq_array is stored verbatim (zero copy conversion).
  • Python containers (insertions, mutations, mutated_positions, notes) are pickled.
  • If predicted_splicing exists, it is saved to a sibling .parquet file.
Arguments:
  • path: target file path ('.npz' will be appended if missing)
  • compressed: if True, use np.savez_compressed (slower, smaller)
Returns:

The written .npz Path

@classmethod
def read_seqmat(cls, path: Union[str, pathlib.Path]) -> SeqMat:
791    @classmethod
792    def read_seqmat(cls, path: Union[str, Path]) -> "SeqMat":
793        """
794        Load a SeqMat saved by save_seqmat() (fast).
795        Returns:
796            SeqMat instance with all fields restored.
797        """
798        path = Path(path)
799        if path.suffix.lower() != ".npz":
800            path = path.with_suffix(".npz")
801
802        with np.load(path, allow_pickle=True) as z:
803            seq_array = z["seq_array"]
804            meta = z["meta"].item() if isinstance(z["meta"], np.ndarray) else z["meta"]
805            notes = z["notes"].item() if isinstance(z["notes"], np.ndarray) else z["notes"]
806
807            # Containers
808            ins_obj = z["insertions"].item() if isinstance(z["insertions"], np.ndarray) else z["insertions"]
809            mut_list = z["mutations"].tolist() if hasattr(z["mutations"], "tolist") else z["mutations"]
810            mutated_positions = set(map(int, z["mutated_positions"].tolist()))
811
812        # Build a blank instance and populate directly to avoid recomputing arrays
813        obj = object.__new__(cls)
814        # Metadata
815        obj.name = meta.get("name", "wild_type")
816        obj.version = meta.get("version", "1.0")
817        obj.source = meta.get("source", "Unknown")
818        obj.notes = notes if isinstance(notes, dict) else dict(notes)
819        obj.rev = bool(meta.get("rev", False))
820        obj.predicted_splicing = None
821
822        # Core storage
823        obj.seq_array = seq_array
824        # Ensure types
825        from collections import defaultdict
826        dl = defaultdict(list)
827        dl.update(ins_obj if isinstance(ins_obj, dict) else dict(ins_obj))
828        obj.insertions = dl
829        obj.mutations = list(mut_list) if isinstance(mut_list, (list, tuple)) else []
830        obj.mutated_positions = mutated_positions
831        obj._cow = False
832
833        # Keep slots that are not persisted explicitly in a clean state
834        # (no additional refresh needed; seq_array already contains nt/ref/valid/mut_type)
835        return obj

Load a SeqMat saved by save_seqmat() (fast).

Returns:

SeqMat instance with all fields restored.

class Gene:
 17class Gene:
 18    """
 19    A class representing a Gene, with associated transcripts and metadata.
 20
 21    Attributes:
 22        organism (str): The organism build (e.g. 'hg38').
 23        transcripts (dict): A dictionary of transcript annotations keyed by transcript ID.
 24        gene_name (str): The name of the gene.
 25        gene_id (str): The unique identifier for the gene.
 26        chrm (str): The chromosome on which the gene resides.
 27        rev (bool): Whether the gene is on the reverse strand.
 28    """
 29
 30    def __init__(self, gene_name: str, gene_id: str, rev: bool, chrm: str, 
 31                 transcripts: Optional[Dict[str, Any]] = None, organism: Optional[str] = None):
 32        """
 33        Initialize a Gene instance.
 34
 35        Args:
 36            gene_name: Name of the gene
 37            gene_id: Unique identifier for the gene
 38            rev: Whether gene is on reverse strand
 39            chrm: Chromosome identifier
 40            transcripts: Dictionary of transcript annotations
 41            organism: Organism reference build (default from config)
 42        """
 43        self.gene_name = gene_name
 44        self.gene_id = gene_id
 45        self.rev = rev
 46        self.chrm = chrm
 47        self.organism = organism if organism is not None else get_default_organism()
 48        self.transcripts = transcripts if transcripts is not None else {}
 49
 50    def __repr__(self) -> str:
 51        """Official string representation of the Gene object."""
 52        return f"Gene({self.gene_name})"
 53
 54    def __str__(self) -> str:
 55        """User-friendly string representation of the Gene object."""
 56        return f"Gene: {self.gene_name}, ID: {self.gene_id}, Chr: {self.chrm}, Transcripts: {len(self.transcripts)}"
 57
 58    def __len__(self) -> int:
 59        """Returns the number of transcripts associated with this gene."""
 60        return len(self.transcripts)
 61
 62    def __iter__(self) -> Iterator[Transcript]:
 63        """Allow iteration over the gene's transcripts, yielding Transcript objects."""
 64        for tid, annotations in self.transcripts.items():
 65            yield Transcript(annotations, organism=self.organism)
 66
 67    def __getitem__(self, item: str) -> Optional[Transcript]:
 68        """Get a transcript by ID. Returns ``None`` if the ID isn't an annotated transcript."""
 69        if item not in self.transcripts:
 70            _log.warning("%s is not an annotated transcript of %s", item, self.gene_name)
 71            return None
 72        return Transcript(self.transcripts[item], organism=self.organism)
 73
 74    @classmethod
 75    def _load_dict(cls, gene_name: str, organism: str) -> Optional[Dict[str, Any]]:
 76        """Load the raw gene dict from LMDB → SQLite → per-gene pickle, in that order.
 77
 78        Returns ``None`` when the gene isn't found in any backend.
 79        """
 80        try:
 81            from .lmdb_store import load_gene_from_lmdb
 82            data = load_gene_from_lmdb(gene_name, organism)
 83            if data is not None:
 84                return data
 85        except ImportError:
 86            pass
 87        try:
 88            from .sqlite_store import load_gene_from_sqlite
 89            data = load_gene_from_sqlite(gene_name, organism)
 90            if data is not None:
 91                return data
 92        except ImportError:
 93            pass
 94        try:
 95            config = get_organism_config(organism)
 96        except ValueError as exc:
 97            raise OrganismNotConfiguredError(
 98                f"Organism '{organism}' not configured. Run setup_genomics_data() first."
 99            ) from exc
100        mrna_path = Path(config["MRNA_PATH"])
101        if mrna_path.exists():
102            for biotype_dir in mrna_path.iterdir():
103                if biotype_dir.is_dir():
104                    for pkl in biotype_dir.glob(f"*_{gene_name}.pkl"):
105                        return unload_pickle(pkl)
106        return None
107
108    @classmethod
109    def get(cls, gene_name: str, organism: Optional[str] = None) -> 'Gene':
110        """Load a gene, raising on failure.
111
112        Raises:
113            OrganismNotConfiguredError: when the organism has no data set up.
114            GeneNotFoundError: when the organism is configured but the gene is missing.
115        """
116        if organism is None:
117            organism = get_default_organism()
118        data = cls._load_dict(gene_name, organism)
119        if data is None:
120            raise GeneNotFoundError(
121                f"Gene '{gene_name}' not found in organism '{organism}'."
122            )
123        return cls(
124            gene_name=data.get('gene_name'),
125            gene_id=data.get('gene_id'),
126            rev=data.get('rev'),
127            chrm=data.get('chrm'),
128            transcripts=data.get('transcripts', {}),
129            organism=organism,
130        )
131
132    @classmethod
133    def from_file(cls, gene_name: str, organism: Optional[str] = None) -> Optional['Gene']:
134        """Load a gene by name, returning ``None`` if not found.
135
136        Load order: LMDB (if installed) → SQLite (``genes.db``) → per-gene pickle files.
137        Prefer :meth:`get` if you'd rather have a typed exception than ``None``.
138        """
139        if organism is None:
140            organism = get_default_organism()
141        try:
142            return cls.get(gene_name, organism=organism)
143        except OrganismNotConfiguredError as exc:
144            _log.warning("%s", exc)
145            return None
146        except GeneNotFoundError as exc:
147            _log.warning("%s", exc)
148            return None
149
150    @classmethod
151    def from_position(
152        cls,
153        chrm: str,
154        pos: PosArg,
155        organism: Optional[str] = None,
156    ) -> List['Gene']:
157        """Return all genes overlapping a point or range on a chromosome.
158
159        Args:
160            chrm: Chromosome (e.g. "12", "chr12", "X"). Leading 'chr' is stripped.
161            pos: Either an int position or a (start, end) tuple (inclusive).
162            organism: Organism build (uses default if None).
163
164        Returns:
165            List of Gene objects, possibly empty. Returned in ascending start order.
166        """
167        if organism is None:
168            organism = get_default_organism()
169        names = gene_names_at_position(chrm, pos, organism=organism)
170        genes: List['Gene'] = []
171        for name in names:
172            g = cls.from_file(name, organism=organism)
173            if g is not None:
174                genes.append(g)
175        return genes
176
177    def splice_sites(self) -> Tuple[Counter, Counter]:
178        """Return (Counter of acceptors, Counter of donors) across all transcripts."""
179        acceptors: List[Any] = []
180        donors: List[Any] = []
181        for transcript in self.transcripts.values():
182            acceptors.extend(transcript.get('acceptors', []))
183            donors.extend(transcript.get('donors', []))
184
185        return Counter(acceptors), Counter(donors)
186
187    def transcript(self, tid: Optional[str] = None) -> Optional[Transcript]:
188        """Return Transcript by ID, or primary transcript if tid is None."""
189        if tid is None:
190            tid = self.primary_transcript
191            
192        if tid is None or tid not in self.transcripts:
193            return None
194
195        return Transcript(self.transcripts[tid], organism=self.organism)
196
197    @property
198    def primary_transcript(self) -> Optional[str]:
199        """Primary transcript ID, or first protein-coding transcript, or None."""
200        if hasattr(self, "_primary_transcript"):
201            return self._primary_transcript
202        primary = [k for k, v in self.transcripts.items() if v.get("primary_transcript")]
203        if primary:
204            self._primary_transcript = primary[0]
205            return self._primary_transcript
206        protein_coding = [k for k, v in self.transcripts.items()
207                         if v.get("transcript_biotype") == "protein_coding"]
208        if protein_coding:
209            self._primary_transcript = protein_coding[0]
210            return self._primary_transcript
211        self._primary_transcript = None
212        return None

A class representing a Gene, with associated transcripts and metadata.

Attributes:
  • organism (str): The organism build (e.g. 'hg38').
  • transcripts (dict): A dictionary of transcript annotations keyed by transcript ID.
  • gene_name (str): The name of the gene.
  • gene_id (str): The unique identifier for the gene.
  • chrm (str): The chromosome on which the gene resides.
  • rev (bool): Whether the gene is on the reverse strand.
Gene( gene_name: str, gene_id: str, rev: bool, chrm: str, transcripts: Optional[Dict[str, Any]] = None, organism: Optional[str] = None)
30    def __init__(self, gene_name: str, gene_id: str, rev: bool, chrm: str, 
31                 transcripts: Optional[Dict[str, Any]] = None, organism: Optional[str] = None):
32        """
33        Initialize a Gene instance.
34
35        Args:
36            gene_name: Name of the gene
37            gene_id: Unique identifier for the gene
38            rev: Whether gene is on reverse strand
39            chrm: Chromosome identifier
40            transcripts: Dictionary of transcript annotations
41            organism: Organism reference build (default from config)
42        """
43        self.gene_name = gene_name
44        self.gene_id = gene_id
45        self.rev = rev
46        self.chrm = chrm
47        self.organism = organism if organism is not None else get_default_organism()
48        self.transcripts = transcripts if transcripts is not None else {}

Initialize a Gene instance.

Arguments:
  • gene_name: Name of the gene
  • gene_id: Unique identifier for the gene
  • rev: Whether gene is on reverse strand
  • chrm: Chromosome identifier
  • transcripts: Dictionary of transcript annotations
  • organism: Organism reference build (default from config)
gene_name
gene_id
rev
chrm
organism
transcripts
@classmethod
def get(cls, gene_name: str, organism: Optional[str] = None) -> Gene:
108    @classmethod
109    def get(cls, gene_name: str, organism: Optional[str] = None) -> 'Gene':
110        """Load a gene, raising on failure.
111
112        Raises:
113            OrganismNotConfiguredError: when the organism has no data set up.
114            GeneNotFoundError: when the organism is configured but the gene is missing.
115        """
116        if organism is None:
117            organism = get_default_organism()
118        data = cls._load_dict(gene_name, organism)
119        if data is None:
120            raise GeneNotFoundError(
121                f"Gene '{gene_name}' not found in organism '{organism}'."
122            )
123        return cls(
124            gene_name=data.get('gene_name'),
125            gene_id=data.get('gene_id'),
126            rev=data.get('rev'),
127            chrm=data.get('chrm'),
128            transcripts=data.get('transcripts', {}),
129            organism=organism,
130        )

Load a gene, raising on failure.

Raises:
  • OrganismNotConfiguredError: when the organism has no data set up.
  • GeneNotFoundError: when the organism is configured but the gene is missing.
@classmethod
def from_file( cls, gene_name: str, organism: Optional[str] = None) -> Optional[Gene]:
132    @classmethod
133    def from_file(cls, gene_name: str, organism: Optional[str] = None) -> Optional['Gene']:
134        """Load a gene by name, returning ``None`` if not found.
135
136        Load order: LMDB (if installed) → SQLite (``genes.db``) → per-gene pickle files.
137        Prefer :meth:`get` if you'd rather have a typed exception than ``None``.
138        """
139        if organism is None:
140            organism = get_default_organism()
141        try:
142            return cls.get(gene_name, organism=organism)
143        except OrganismNotConfiguredError as exc:
144            _log.warning("%s", exc)
145            return None
146        except GeneNotFoundError as exc:
147            _log.warning("%s", exc)
148            return None

Load a gene by name, returning None if not found.

Load order: LMDB (if installed) → SQLite (genes.db) → per-gene pickle files. Prefer get() if you'd rather have a typed exception than None.

@classmethod
def from_position( cls, chrm: str, pos: Union[int, Tuple[int, int], List[int]], organism: Optional[str] = None) -> List[Gene]:
150    @classmethod
151    def from_position(
152        cls,
153        chrm: str,
154        pos: PosArg,
155        organism: Optional[str] = None,
156    ) -> List['Gene']:
157        """Return all genes overlapping a point or range on a chromosome.
158
159        Args:
160            chrm: Chromosome (e.g. "12", "chr12", "X"). Leading 'chr' is stripped.
161            pos: Either an int position or a (start, end) tuple (inclusive).
162            organism: Organism build (uses default if None).
163
164        Returns:
165            List of Gene objects, possibly empty. Returned in ascending start order.
166        """
167        if organism is None:
168            organism = get_default_organism()
169        names = gene_names_at_position(chrm, pos, organism=organism)
170        genes: List['Gene'] = []
171        for name in names:
172            g = cls.from_file(name, organism=organism)
173            if g is not None:
174                genes.append(g)
175        return genes

Return all genes overlapping a point or range on a chromosome.

Arguments:
  • chrm: Chromosome (e.g. "12", "chr12", "X"). Leading 'chr' is stripped.
  • pos: Either an int position or a (start, end) tuple (inclusive).
  • organism: Organism build (uses default if None).
Returns:

List of Gene objects, possibly empty. Returned in ascending start order.

def splice_sites(self) -> Tuple[collections.Counter, collections.Counter]:
177    def splice_sites(self) -> Tuple[Counter, Counter]:
178        """Return (Counter of acceptors, Counter of donors) across all transcripts."""
179        acceptors: List[Any] = []
180        donors: List[Any] = []
181        for transcript in self.transcripts.values():
182            acceptors.extend(transcript.get('acceptors', []))
183            donors.extend(transcript.get('donors', []))
184
185        return Counter(acceptors), Counter(donors)

Return (Counter of acceptors, Counter of donors) across all transcripts.

def transcript( self, tid: Optional[str] = None) -> Optional[Transcript]:
187    def transcript(self, tid: Optional[str] = None) -> Optional[Transcript]:
188        """Return Transcript by ID, or primary transcript if tid is None."""
189        if tid is None:
190            tid = self.primary_transcript
191            
192        if tid is None or tid not in self.transcripts:
193            return None
194
195        return Transcript(self.transcripts[tid], organism=self.organism)

Return Transcript by ID, or primary transcript if tid is None.

primary_transcript: Optional[str]
197    @property
198    def primary_transcript(self) -> Optional[str]:
199        """Primary transcript ID, or first protein-coding transcript, or None."""
200        if hasattr(self, "_primary_transcript"):
201            return self._primary_transcript
202        primary = [k for k, v in self.transcripts.items() if v.get("primary_transcript")]
203        if primary:
204            self._primary_transcript = primary[0]
205            return self._primary_transcript
206        protein_coding = [k for k, v in self.transcripts.items()
207                         if v.get("transcript_biotype") == "protein_coding"]
208        if protein_coding:
209            self._primary_transcript = protein_coding[0]
210            return self._primary_transcript
211        self._primary_transcript = None
212        return None

Primary transcript ID, or first protein-coding transcript, or None.

class Transcript:
 42class Transcript:
 43    """Transcript with genomic boundaries, donors/acceptors, pre_mrna, mature_mrna, and optional ORF/protein."""
 44
 45    def __init__(self, d: Dict[str, Any], organism: Optional[str] = None):
 46        """Build Transcript from dict of attributes. Requires transcript_start, transcript_end, rev, chrm."""
 47        self._cons_vector = None
 48        array_fields = {"acceptors", "donors", "cons_vector", "rev"}
 49        for k, v in d.items():
 50            if k == "cons_vector":
 51                self._cons_vector = np.asarray(v, dtype=np.float32) if v is not None else None
 52                continue
 53            if k in array_fields and v is not None:
 54                v = np.array(v)
 55            setattr(self, k, v)
 56
 57        self.organism = organism if organism is not None else get_default_organism()
 58        required_attrs = ["transcript_start", "transcript_end", "rev", "chrm"]
 59        missing = [attr for attr in required_attrs if not hasattr(self, attr)]
 60        if missing:
 61            raise AssertionError(f"Transcript is missing required attributes: {missing}")
 62        if not hasattr(self, "donors") or self.donors is None:
 63            self.donors = np.array([])
 64        if not hasattr(self, "acceptors") or self.acceptors is None:
 65            self.acceptors = np.array([])
 66        if not hasattr(self, "cons_available"):
 67            self.cons_available = False
 68        self.protein_coding = hasattr(self, "TIS") and hasattr(self, "TTS")
 69        self.transcript_upper = max(self.transcript_start, self.transcript_end)
 70        self.transcript_lower = min(self.transcript_start, self.transcript_end)
 71        self._pre_mrna = None
 72        if self.cons_available and hasattr(self, "cons_seq") and self._cons_vector is not None:
 73            if self.cons_seq.endswith("*") and len(self.cons_seq) == len(self._cons_vector):
 74                self._cons_vector = self._cons_vector[:-1]
 75                self.cons_seq = self.cons_seq[:-1]
 76
 77    def __repr__(self) -> str:
 78        """Official string representation."""
 79        return f"Transcript({getattr(self, 'transcript_id', 'unknown_id')})"
 80
 81    def __str__(self) -> str:
 82        """User-friendly string representation of the transcript."""
 83        transcript_biotype = getattr(self, 'transcript_biotype', 'unknown').replace('_', ' ').title()
 84        primary = getattr(self, 'primary_transcript', False)
 85        return f"Transcript {getattr(self, 'transcript_id', 'unknown_id')}, " \
 86               f"Type: {transcript_biotype}, Primary: {primary}"
 87
 88    def __len__(self) -> int:
 89        """Length of the transcript sequence."""
 90        return len(getattr(self, 'transcript_seq', ''))
 91
 92    def __eq__(self, other: object) -> bool:
 93        """Check equality of two transcripts based on their transcript sequences."""
 94        if not isinstance(other, Transcript):
 95            return NotImplemented
 96        return self.transcript_seq == other.transcript_seq
 97
 98    def __contains__(self, subvalue: Any) -> bool:
 99        """Check if a given subsequence is contained within the pre_mRNA."""
100        if not hasattr(subvalue, 'seq_array'):
101            return False
102        return np.all(np.isin(subvalue.index, self.pre_mrna.index))
103
104    def clone(self) -> Transcript:
105        """Returns a deep copy of this Transcript instance."""
106        return copy.deepcopy(self)
107
108    @property
109    def pre_mrna(self) -> SeqMat:
110        """Pre-mRNA sequence. Lazy-loaded from FASTA on first access."""
111        if self._pre_mrna is None:
112            self.generate_pre_mrna()
113        return self._pre_mrna
114
115    @pre_mrna.setter
116    def pre_mrna(self, value: Optional[SeqMat]) -> None:
117        self._pre_mrna = value
118
119    @property
120    def cons_vector(self) -> np.ndarray:
121        """Per-residue conservation scores; ones vector when conservation data not loaded."""
122        if self._cons_vector is not None:
123            return self._cons_vector
124        n = len(getattr(self, "protein", "")) if getattr(self, "protein", "") else len(self.pre_mrna)
125        return np.ones(n, dtype=np.float32)
126
127    @property
128    def exons(self) -> List[Tuple[int, int]]:
129        """Return a list of exon boundary tuples (acceptor, donor)."""
130        exon_starts = np.concatenate(([self.transcript_start], self.acceptors))
131        exon_ends = np.concatenate((self.donors, [self.transcript_end]))
132        return list(zip(exon_starts, exon_ends))
133
134    @property
135    def exons_pos(self) -> List[Tuple[int, int]]:
136        """Return exons with positions adjusted for strand orientation."""
137        exon_positions = self.exons
138        if self.rev:
139            # Reverse order and swap coordinates for reverse strand
140            exon_positions = [(end, start) for start, end in exon_positions[::-1]]
141        return exon_positions
142
143    @property
144    def introns(self) -> List[Tuple[int, int]]:
145        """Return a list of intron boundaries derived from donors and acceptors."""
146        valid_donors = self.donors[self.donors != self.transcript_end]
147        valid_acceptors = self.acceptors[self.acceptors != self.transcript_start]
148        
149        # Adjust intron boundaries to exclude exon splice sites
150        introns = []
151        for donor, acceptor in zip(valid_donors, valid_acceptors):
152            if self.rev:
153                # For reverse strand: intron from (donor-1) to (acceptor+1)
154                intron_start = donor - 1
155                intron_end = acceptor + 1
156            else:
157                # For forward strand: intron from (donor+1) to (acceptor-1)
158                intron_start = donor + 1
159                intron_end = acceptor - 1
160            introns.append((intron_start, intron_end))
161        
162        return introns
163
164    @property
165    def introns_pos(self) -> List[Tuple[int, int]]:
166        """Return introns with positions adjusted for strand orientation."""
167        intron_positions = self.introns
168        if self.rev:
169            intron_positions = [(end, start) for start, end in intron_positions[::-1]]
170        return intron_positions
171
172    def _fix_and_check_introns(self) -> 'Transcript':
173        """
174        Ensure acceptors and donors are sorted and unique, and validate exon/intron structures.
175
176        Raises:
177            ValueError: If there are mismatches or ordering issues in exons/introns
178
179        Returns:
180            The current Transcript object (for chaining)
181        """
182        # Ensure uniqueness and correct ordering based on strand
183        self.acceptors = np.unique(self.acceptors)
184        self.donors = np.unique(self.donors)
185
186        if self.rev:
187            self.acceptors = np.sort(self.acceptors)[::-1]
188            self.donors = np.sort(self.donors)[::-1]
189        else:
190            self.acceptors = np.sort(self.acceptors)
191            self.donors = np.sort(self.donors)
192
193        # Validation checks
194        if self._exon_intron_matchup_flag():
195            raise ValueError("Unequal number of acceptors and donors.")
196
197        if self._exon_intron_order_flag():
198            raise ValueError("Exon/intron order out of position.")
199
200        if self._transcript_boundary_flag():
201            raise ValueError("Transcript boundaries must straddle acceptors and donors.")
202
203        return self
204
205    def _exon_intron_matchup_flag(self) -> bool:
206        """Check if acceptors and donors count match."""
207        return len(self.acceptors) != len(self.donors)
208
209    def _exon_intron_order_flag(self) -> bool:
210        """Check for ordering issues in exon boundaries."""
211        return any(start > end for start, end in self.exons_pos)
212
213    def _transcript_boundary_flag(self) -> bool:
214        """Check if boundaries are within the transcript start/end range."""
215        if not len(self.acceptors) and not len(self.donors):
216            return False
217        min_boundary = np.min(np.concatenate((self.acceptors, self.donors)))
218        max_boundary = np.max(np.concatenate((self.acceptors, self.donors)))
219        return (self.transcript_lower > min_boundary) or (self.transcript_upper < max_boundary)
220
221    @property
222    def exonic_indices(self) -> np.ndarray:
223        """Return the indices covering exons in the transcript."""
224        return np.concatenate([np.arange(a, b + 1) for a, b in self.exons_pos])
225
226    def _resolve_fasta_path(self, fasta_path: Optional[Path] = None) -> Tuple[Path, List[str]]:
227        """Resolve FASTA file path and contig names for this transcript."""
228        if fasta_path is None:
229            config = get_organism_config(self.organism)
230            fasta_path = config.get("fasta_full_genome") or config.get("fasta")
231            if fasta_path is None:
232                fasta_path = config.get("CHROM_SOURCE") and (Path(config["CHROM_SOURCE"]) / f"chr{self.chrm}.fasta")
233            fasta_path = Path(fasta_path) if fasta_path else None
234            if not fasta_path or not fasta_path.exists():
235                raise ValueError(
236                    "No FASTA path in config (fasta_full_genome or fasta). "
237                    "Run setup_genomics_data() to install the full-genome FASTA (e.g. hg38.fa)."
238                )
239        raw_chr = str(self.chrm).replace("chr", "") or "17"
240        with_chr = f"chr{raw_chr}"
241        contigs = [with_chr, raw_chr] if with_chr != raw_chr else [with_chr]
242        return Path(fasta_path), contigs
243
244    def _assemble_mature_from_fasta(self) -> SeqMat:
245        """Fetch exons directly from FASTA and assemble mature mRNA (skips pre-mRNA)."""
246        fasta_path, contigs = self._resolve_fasta_path()
247
248        # Sort exons by ascending genomic position
249        exon_intervals = sorted((min(a, b), max(a, b)) for a, b in self.exons)
250
251        last_err = None
252        for contig in contigs:
253            try:
254                seqs = []
255                all_indices = []
256                for lo, hi in exon_intervals:
257                    seq_str = _fetch_fasta_region(str(fasta_path), contig, lo, hi)
258                    seqs.append(seq_str)
259                    all_indices.append(np.arange(lo, hi + 1, dtype=np.int64))
260
261                mature = SeqMat(''.join(seqs), indices=np.concatenate(all_indices))
262                if self.rev:
263                    mature.reverse_complement()
264                return mature
265            except Exception as e:
266                last_err = e
267        if last_err is not None:
268            raise last_err
269        raise ValueError(f"Could not fetch exons from FASTA for {self.chrm}")
270
271    def pull_pre_mrna_from_fasta(
272        self,
273        fasta_path: Optional[Path] = None,
274        upstream: int = 0,
275        downstream: int = 0,
276        region_start: Optional[int] = None,
277        region_end: Optional[int] = None,
278    ) -> Dict[str, Any]:
279        """Pre-mRNA sequence from the single full-genome FASTA (e.g. hg38.fa). Optional upstream/downstream."""
280        fasta_path, contigs_to_try = self._resolve_fasta_path(fasta_path)
281        if region_start is not None or region_end is not None:
282            start = region_start if region_start is not None else max(1, self.transcript_lower - upstream)
283            end = region_end if region_end is not None else (self.transcript_upper + downstream)
284        else:
285            start = max(1, self.transcript_lower - upstream)
286            end = self.transcript_upper + downstream
287        last_err = None
288        for contig in contigs_to_try:
289            try:
290                seq_mat = SeqMat.from_fasta_file(fasta_path, contig, start, end)
291                if len(seq_mat.seq) > 0:
292                    return {"seq": seq_mat.seq, "indices": seq_mat.index}
293            except Exception as e:
294                last_err = e
295        if last_err is not None:
296            raise last_err
297        raise ValueError(f"Could not fetch {self.chrm}:{start}-{end} from FASTA (tried contigs {contigs_to_try})")
298
299    def generate_pre_mrna(
300        self,
301        upstream: int = 0,
302        downstream: int = 0,
303        region_start: Optional[int] = None,
304        region_end: Optional[int] = None,
305    ) -> "Transcript":
306        """Load pre-mRNA and set self.pre_mrna. Use upstream/downstream to add flanking (bp).
307
308        Optional region_start/region_end restrict the FASTA read to a genomic
309        sub-interval (e.g. ±7500 bp around a mutation site) instead of fetching
310        the entire transcript, which can be orders of magnitude faster for large
311        genes.
312        """
313        try:
314            seq_data = self.pull_pre_mrna_from_fasta(
315                upstream=upstream, downstream=downstream,
316                region_start=region_start, region_end=region_end,
317            )
318            pre_mrna = SeqMat(**seq_data)
319        except Exception:
320            if region_start is not None or region_end is not None:
321                lo = region_start if region_start is not None else max(1, self.transcript_lower - upstream)
322                hi = region_end if region_end is not None else (self.transcript_upper + downstream)
323            else:
324                lo = max(1, self.transcript_lower - upstream)
325                hi = self.transcript_upper + downstream
326            length = hi - lo + 1
327            indices = np.arange(lo, lo + length)
328            pre_mrna = SeqMat("N" * length, indices=indices)
329        if self.rev:
330            pre_mrna.reverse_complement()
331        self.pre_mrna = pre_mrna
332        return self
333
334    def generate_mature_mrna(self, inplace: bool = True) -> Union['Transcript', SeqMat]:
335        """
336        Generate the mature mRNA by concatenating exon regions from pre_mRNA.
337
338        When pre-mRNA has not been loaded yet, exons are fetched directly from
339        the indexed FASTA — skipping the full-gene read entirely.
340
341        Args:
342            inplace: If True, set self.mature_mrna, else return a new SeqMat
343
344        Returns:
345            The Transcript object (if inplace=True) or a SeqMat (if inplace=False)
346        """
347        self._fix_and_check_introns()
348
349        if self._pre_mrna is not None:
350            # Pre-mRNA loaded (possibly mutated): splice from it
351            result = self.pre_mrna.remove_regions(self.introns)
352        else:
353            # Fast path: fetch exons directly, skip full-gene read
354            try:
355                result = self._assemble_mature_from_fasta()
356            except Exception:
357                # Fall back to full pre-mRNA load + splice
358                result = self.pre_mrna.remove_regions(self.introns)
359
360        if inplace:
361            self.mature_mrna = result
362            return self
363        return result
364
365    @property
366    def orf(self) -> Union[SeqMat, 'Transcript']:
367        """
368        Return the ORF (Open Reading Frame) SeqMat object, if TIS and TTS are available.
369
370        Returns:
371            The ORF SeqMat if TIS/TTS are set, else self
372        """
373        if not self.protein_coding:
374            import logging as _logging
375            _logging.getLogger(__name__).warning(
376                "Cannot create protein without set TIS and TTS values."
377            )
378            return self
379
380        # Extract ORF region from mature mRNA via clone (no string re-parse)
381        if hasattr(self, 'mature_mrna') and self.mature_mrna is not None:
382            return self.mature_mrna.clone(self.TIS, self.TTS)
383
384        return self
385
386    def generate_protein(self, inplace: bool = True) -> Union['Transcript', str]:
387        """
388        Translate the ORF into a protein sequence.
389
390        Args:
391            inplace: If True, store protein in self. Otherwise, return it
392
393        Returns:
394            The Transcript object if inplace=True, else the protein sequence
395        """
396        if not self.protein_coding:
397            return self if inplace else ""
398
399        # Translate the ORF to protein
400        orf_seq = self.orf
401        if isinstance(orf_seq, SeqMat):
402            protein = _translate(orf_seq.seq).strip('*')
403        else:
404            protein = ""
405
406        if inplace:
407            self.protein = protein
408            if self._cons_vector is not None and len(self._cons_vector) != len(protein):
409                self._cons_vector = np.ones(len(protein), dtype=np.float32)
410            return self
411        
412        return protein

Transcript with genomic boundaries, donors/acceptors, pre_mrna, mature_mrna, and optional ORF/protein.

Transcript(d: Dict[str, Any], organism: Optional[str] = None)
45    def __init__(self, d: Dict[str, Any], organism: Optional[str] = None):
46        """Build Transcript from dict of attributes. Requires transcript_start, transcript_end, rev, chrm."""
47        self._cons_vector = None
48        array_fields = {"acceptors", "donors", "cons_vector", "rev"}
49        for k, v in d.items():
50            if k == "cons_vector":
51                self._cons_vector = np.asarray(v, dtype=np.float32) if v is not None else None
52                continue
53            if k in array_fields and v is not None:
54                v = np.array(v)
55            setattr(self, k, v)
56
57        self.organism = organism if organism is not None else get_default_organism()
58        required_attrs = ["transcript_start", "transcript_end", "rev", "chrm"]
59        missing = [attr for attr in required_attrs if not hasattr(self, attr)]
60        if missing:
61            raise AssertionError(f"Transcript is missing required attributes: {missing}")
62        if not hasattr(self, "donors") or self.donors is None:
63            self.donors = np.array([])
64        if not hasattr(self, "acceptors") or self.acceptors is None:
65            self.acceptors = np.array([])
66        if not hasattr(self, "cons_available"):
67            self.cons_available = False
68        self.protein_coding = hasattr(self, "TIS") and hasattr(self, "TTS")
69        self.transcript_upper = max(self.transcript_start, self.transcript_end)
70        self.transcript_lower = min(self.transcript_start, self.transcript_end)
71        self._pre_mrna = None
72        if self.cons_available and hasattr(self, "cons_seq") and self._cons_vector is not None:
73            if self.cons_seq.endswith("*") and len(self.cons_seq) == len(self._cons_vector):
74                self._cons_vector = self._cons_vector[:-1]
75                self.cons_seq = self.cons_seq[:-1]

Build Transcript from dict of attributes. Requires transcript_start, transcript_end, rev, chrm.

organism
protein_coding
transcript_upper
transcript_lower
def clone(self) -> Transcript:
104    def clone(self) -> Transcript:
105        """Returns a deep copy of this Transcript instance."""
106        return copy.deepcopy(self)

Returns a deep copy of this Transcript instance.

pre_mrna: SeqMat
108    @property
109    def pre_mrna(self) -> SeqMat:
110        """Pre-mRNA sequence. Lazy-loaded from FASTA on first access."""
111        if self._pre_mrna is None:
112            self.generate_pre_mrna()
113        return self._pre_mrna

Pre-mRNA sequence. Lazy-loaded from FASTA on first access.

cons_vector: numpy.ndarray
119    @property
120    def cons_vector(self) -> np.ndarray:
121        """Per-residue conservation scores; ones vector when conservation data not loaded."""
122        if self._cons_vector is not None:
123            return self._cons_vector
124        n = len(getattr(self, "protein", "")) if getattr(self, "protein", "") else len(self.pre_mrna)
125        return np.ones(n, dtype=np.float32)

Per-residue conservation scores; ones vector when conservation data not loaded.

exons: List[Tuple[int, int]]
127    @property
128    def exons(self) -> List[Tuple[int, int]]:
129        """Return a list of exon boundary tuples (acceptor, donor)."""
130        exon_starts = np.concatenate(([self.transcript_start], self.acceptors))
131        exon_ends = np.concatenate((self.donors, [self.transcript_end]))
132        return list(zip(exon_starts, exon_ends))

Return a list of exon boundary tuples (acceptor, donor).

exons_pos: List[Tuple[int, int]]
134    @property
135    def exons_pos(self) -> List[Tuple[int, int]]:
136        """Return exons with positions adjusted for strand orientation."""
137        exon_positions = self.exons
138        if self.rev:
139            # Reverse order and swap coordinates for reverse strand
140            exon_positions = [(end, start) for start, end in exon_positions[::-1]]
141        return exon_positions

Return exons with positions adjusted for strand orientation.

introns: List[Tuple[int, int]]
143    @property
144    def introns(self) -> List[Tuple[int, int]]:
145        """Return a list of intron boundaries derived from donors and acceptors."""
146        valid_donors = self.donors[self.donors != self.transcript_end]
147        valid_acceptors = self.acceptors[self.acceptors != self.transcript_start]
148        
149        # Adjust intron boundaries to exclude exon splice sites
150        introns = []
151        for donor, acceptor in zip(valid_donors, valid_acceptors):
152            if self.rev:
153                # For reverse strand: intron from (donor-1) to (acceptor+1)
154                intron_start = donor - 1
155                intron_end = acceptor + 1
156            else:
157                # For forward strand: intron from (donor+1) to (acceptor-1)
158                intron_start = donor + 1
159                intron_end = acceptor - 1
160            introns.append((intron_start, intron_end))
161        
162        return introns

Return a list of intron boundaries derived from donors and acceptors.

introns_pos: List[Tuple[int, int]]
164    @property
165    def introns_pos(self) -> List[Tuple[int, int]]:
166        """Return introns with positions adjusted for strand orientation."""
167        intron_positions = self.introns
168        if self.rev:
169            intron_positions = [(end, start) for start, end in intron_positions[::-1]]
170        return intron_positions

Return introns with positions adjusted for strand orientation.

exonic_indices: numpy.ndarray
221    @property
222    def exonic_indices(self) -> np.ndarray:
223        """Return the indices covering exons in the transcript."""
224        return np.concatenate([np.arange(a, b + 1) for a, b in self.exons_pos])

Return the indices covering exons in the transcript.

def pull_pre_mrna_from_fasta( self, fasta_path: Optional[pathlib.Path] = None, upstream: int = 0, downstream: int = 0, region_start: Optional[int] = None, region_end: Optional[int] = None) -> Dict[str, Any]:
271    def pull_pre_mrna_from_fasta(
272        self,
273        fasta_path: Optional[Path] = None,
274        upstream: int = 0,
275        downstream: int = 0,
276        region_start: Optional[int] = None,
277        region_end: Optional[int] = None,
278    ) -> Dict[str, Any]:
279        """Pre-mRNA sequence from the single full-genome FASTA (e.g. hg38.fa). Optional upstream/downstream."""
280        fasta_path, contigs_to_try = self._resolve_fasta_path(fasta_path)
281        if region_start is not None or region_end is not None:
282            start = region_start if region_start is not None else max(1, self.transcript_lower - upstream)
283            end = region_end if region_end is not None else (self.transcript_upper + downstream)
284        else:
285            start = max(1, self.transcript_lower - upstream)
286            end = self.transcript_upper + downstream
287        last_err = None
288        for contig in contigs_to_try:
289            try:
290                seq_mat = SeqMat.from_fasta_file(fasta_path, contig, start, end)
291                if len(seq_mat.seq) > 0:
292                    return {"seq": seq_mat.seq, "indices": seq_mat.index}
293            except Exception as e:
294                last_err = e
295        if last_err is not None:
296            raise last_err
297        raise ValueError(f"Could not fetch {self.chrm}:{start}-{end} from FASTA (tried contigs {contigs_to_try})")

Pre-mRNA sequence from the single full-genome FASTA (e.g. hg38.fa). Optional upstream/downstream.

def generate_pre_mrna( self, upstream: int = 0, downstream: int = 0, region_start: Optional[int] = None, region_end: Optional[int] = None) -> Transcript:
299    def generate_pre_mrna(
300        self,
301        upstream: int = 0,
302        downstream: int = 0,
303        region_start: Optional[int] = None,
304        region_end: Optional[int] = None,
305    ) -> "Transcript":
306        """Load pre-mRNA and set self.pre_mrna. Use upstream/downstream to add flanking (bp).
307
308        Optional region_start/region_end restrict the FASTA read to a genomic
309        sub-interval (e.g. ±7500 bp around a mutation site) instead of fetching
310        the entire transcript, which can be orders of magnitude faster for large
311        genes.
312        """
313        try:
314            seq_data = self.pull_pre_mrna_from_fasta(
315                upstream=upstream, downstream=downstream,
316                region_start=region_start, region_end=region_end,
317            )
318            pre_mrna = SeqMat(**seq_data)
319        except Exception:
320            if region_start is not None or region_end is not None:
321                lo = region_start if region_start is not None else max(1, self.transcript_lower - upstream)
322                hi = region_end if region_end is not None else (self.transcript_upper + downstream)
323            else:
324                lo = max(1, self.transcript_lower - upstream)
325                hi = self.transcript_upper + downstream
326            length = hi - lo + 1
327            indices = np.arange(lo, lo + length)
328            pre_mrna = SeqMat("N" * length, indices=indices)
329        if self.rev:
330            pre_mrna.reverse_complement()
331        self.pre_mrna = pre_mrna
332        return self

Load pre-mRNA and set self.pre_mrna. Use upstream/downstream to add flanking (bp).

Optional region_start/region_end restrict the FASTA read to a genomic sub-interval (e.g. ±7500 bp around a mutation site) instead of fetching the entire transcript, which can be orders of magnitude faster for large genes.

def generate_mature_mrna( self, inplace: bool = True) -> Union[Transcript, SeqMat]:
334    def generate_mature_mrna(self, inplace: bool = True) -> Union['Transcript', SeqMat]:
335        """
336        Generate the mature mRNA by concatenating exon regions from pre_mRNA.
337
338        When pre-mRNA has not been loaded yet, exons are fetched directly from
339        the indexed FASTA — skipping the full-gene read entirely.
340
341        Args:
342            inplace: If True, set self.mature_mrna, else return a new SeqMat
343
344        Returns:
345            The Transcript object (if inplace=True) or a SeqMat (if inplace=False)
346        """
347        self._fix_and_check_introns()
348
349        if self._pre_mrna is not None:
350            # Pre-mRNA loaded (possibly mutated): splice from it
351            result = self.pre_mrna.remove_regions(self.introns)
352        else:
353            # Fast path: fetch exons directly, skip full-gene read
354            try:
355                result = self._assemble_mature_from_fasta()
356            except Exception:
357                # Fall back to full pre-mRNA load + splice
358                result = self.pre_mrna.remove_regions(self.introns)
359
360        if inplace:
361            self.mature_mrna = result
362            return self
363        return result

Generate the mature mRNA by concatenating exon regions from pre_mRNA.

When pre-mRNA has not been loaded yet, exons are fetched directly from the indexed FASTA — skipping the full-gene read entirely.

Arguments:
  • inplace: If True, set self.mature_mrna, else return a new SeqMat
Returns:

The Transcript object (if inplace=True) or a SeqMat (if inplace=False)

orf: Union[SeqMat, Transcript]
365    @property
366    def orf(self) -> Union[SeqMat, 'Transcript']:
367        """
368        Return the ORF (Open Reading Frame) SeqMat object, if TIS and TTS are available.
369
370        Returns:
371            The ORF SeqMat if TIS/TTS are set, else self
372        """
373        if not self.protein_coding:
374            import logging as _logging
375            _logging.getLogger(__name__).warning(
376                "Cannot create protein without set TIS and TTS values."
377            )
378            return self
379
380        # Extract ORF region from mature mRNA via clone (no string re-parse)
381        if hasattr(self, 'mature_mrna') and self.mature_mrna is not None:
382            return self.mature_mrna.clone(self.TIS, self.TTS)
383
384        return self

Return the ORF (Open Reading Frame) SeqMat object, if TIS and TTS are available.

Returns:

The ORF SeqMat if TIS/TTS are set, else self

def generate_protein(self, inplace: bool = True) -> Union[Transcript, str]:
386    def generate_protein(self, inplace: bool = True) -> Union['Transcript', str]:
387        """
388        Translate the ORF into a protein sequence.
389
390        Args:
391            inplace: If True, store protein in self. Otherwise, return it
392
393        Returns:
394            The Transcript object if inplace=True, else the protein sequence
395        """
396        if not self.protein_coding:
397            return self if inplace else ""
398
399        # Translate the ORF to protein
400        orf_seq = self.orf
401        if isinstance(orf_seq, SeqMat):
402            protein = _translate(orf_seq.seq).strip('*')
403        else:
404            protein = ""
405
406        if inplace:
407            self.protein = protein
408            if self._cons_vector is not None and len(self._cons_vector) != len(protein):
409                self._cons_vector = np.ones(len(protein), dtype=np.float32)
410            return self
411        
412        return protein

Translate the ORF into a protein sequence.

Arguments:
  • inplace: If True, store protein in self. Otherwise, return it
Returns:

The Transcript object if inplace=True, else the protein sequence

def get_default_organism() -> str:
115def get_default_organism() -> str:
116    """Default organism: from SEQMAT_DEFAULT_ORGANISM in config-less mode, else from config file."""
117    if get_data_base() is not None:
118        return os.environ.get("SEQMAT_DEFAULT_ORGANISM", "").strip() or "hg38"
119    config = load_config()
120    return config.get('default_organism', DEFAULT_SETTINGS['default_organism'])

Default organism: from SEQMAT_DEFAULT_ORGANISM in config-less mode, else from config file.

def get_data_dir() -> pathlib.Path:
206def get_data_dir() -> Path:
207    """Return user data directory for seqmat (platformdirs)."""
208    return DEFAULT_DATA_DIR

Return user data directory for seqmat (platformdirs).

def get_config_dir() -> pathlib.Path:
211def get_config_dir() -> Path:
212    """Return the directory containing the active config file."""
213    return DEFAULT_CONFIG_DIR

Return the directory containing the active config file.

def get_config_file() -> pathlib.Path:
216def get_config_file() -> Path:
217    """Return the active config file path (for display or override)."""
218    return CONFIG_FILE

Return the active config file path (for display or override).

def get_data_base() -> Optional[pathlib.Path]:
106def get_data_base() -> Optional[Path]:
107    """If SEQMAT_DATA_DIR is set and exists, return that path (config-less mode). Else None."""
108    env = os.environ.get("SEQMAT_DATA_DIR", "").strip()
109    if not env:
110        return None
111    p = Path(env).expanduser().resolve()
112    return p if p.exists() else None

If SEQMAT_DATA_DIR is set and exists, return that path (config-less mode). Else None.

def load_config() -> Dict[str, Any]:
90def load_config() -> Dict[str, Any]:
91    """Load config from CONFIG_FILE, merged with DEFAULT_SETTINGS."""
92    if CONFIG_FILE.exists():
93        with open(CONFIG_FILE, "r") as f:
94            config = json.load(f)
95            merged_config = DEFAULT_SETTINGS.copy()
96            merged_config.update(config)
97            return merged_config
98    return DEFAULT_SETTINGS.copy()

Load config from CONFIG_FILE, merged with DEFAULT_SETTINGS.

def save_config(config: Dict[str, Any]) -> None:
100def save_config(config: Dict[str, Any]) -> None:
101    """Save config to CONFIG_FILE (creates parent dir if needed)."""
102    CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
103    with open(CONFIG_FILE, "w") as f:
104        json.dump(config, f, indent=2)

Save config to CONFIG_FILE (creates parent dir if needed).

def setup_genomics_data( basepath: str, organism: Optional[str] = None, force: bool = False, pickup: bool = False, n_jobs: Optional[int] = None, from_prebuilt: bool = True) -> None:
 72def setup_genomics_data(
 73    basepath: str,
 74    organism: Optional[str] = None,
 75    force: bool = False,
 76    pickup: bool = False,
 77    n_jobs: Optional[int] = None,
 78    from_prebuilt: bool = True,
 79) -> None:
 80    """Set up genomics data for a specific organism.
 81
 82    Args:
 83        basepath: Base directory for storing genomic data.
 84        organism: Organism identifier (e.g. ``"hg38"`` or ``"mm39"``). Uses the configured
 85            default if omitted.
 86        force: Overwrite existing data.
 87        pickup: Resume an interrupted setup, reusing any already-downloaded files.
 88        n_jobs: Workers for GTF parsing (default: CPU count - 1). ``1`` is serial.
 89        from_prebuilt: When ``True`` (default), download prebuilt ``genes.db`` + FASTA from S3.
 90            When ``False``, download GTF/FASTA from Ensembl/UCSC and build ``genes.db`` locally.
 91    """
 92    if organism is None:
 93        organism = get_default_organism()
 94    base_path = Path(basepath) / organism
 95
 96    config_paths = {
 97        "CHROM_SOURCE": str(base_path),
 98        "MRNA_PATH": str(base_path),
 99        "MISSPLICING_PATH": str(base_path / "missplicing"),
100        "ONCOSPLICE_PATH": str(base_path / "oncosplice"),
101        "BASE": str(base_path),
102        "TEMP": str(base_path / "temp"),
103    }
104
105    config = load_config()
106    if organism in config and not force and not pickup:
107        _log.warning(
108            "Organism %s already configured. Use force=True to overwrite or pickup=True to resume.",
109            organism,
110        )
111        return
112
113    if base_path.exists() and any(base_path.iterdir()) and not force and not pickup:
114        _log.warning(
115            "Directory %s not empty. Use force=True to overwrite or pickup=True to resume.",
116            base_path,
117        )
118        return
119
120    _log.info("Setting up genomics data in %s", base_path)
121    base_path.mkdir(parents=True, exist_ok=True)
122
123    if from_prebuilt:
124        _log.info("Downloading prebuilt data for %s from S3...", organism)
125        try:
126            files = download_prebuilt_data(organism, base_path, skip_existing=pickup)
127        except PrebuiltDataUnavailableError as e:
128            _log.error("%s", e)
129            raise
130        config_paths["fasta_full_genome"] = str(files["fasta_file"])
131        config_paths["genes_db"] = str(files["genes_db"])
132    else:
133        _log.info("Downloading source data and building genes.db for %s...", organism)
134        files = download_genome_data(organism, base_path, skip_existing=pickup)
135        config_paths["fasta_full_genome"] = str(files["fasta_file"])
136        cons_data = None
137        if files["cons_file"] is not None:
138            try:
139                cons_data = load_conservation(files["cons_file"])
140            except Exception:
141                cons_data = None
142        annotation_jobs = n_jobs if n_jobs is not None else max(1, cpu_count() - 1)
143        retrieve_and_parse_ensembl_annotations(
144            base_path,
145            files["ensembl_file"],
146            cons_data,
147            gtex_file=files["gtex_file"],
148            n_jobs=annotation_jobs,
149        )
150        config_paths["genes_db"] = str(base_path / "genes.db")
151        if not pickup:
152            for key, file_path in files.items():
153                if key == "fasta_file":
154                    continue
155                if file_path and file_path.exists():
156                    file_path.unlink()
157
158    for path_key in ("MISSPLICING_PATH", "ONCOSPLICE_PATH", "TEMP"):
159        Path(config_paths[path_key]).mkdir(parents=True, exist_ok=True)
160
161    if get_data_base() is None:
162        config[organism] = config_paths
163        save_config(config)
164        _log.info("Configuration saved to: %s", CONFIG_FILE)
165    else:
166        _log.info("SEQMAT_DATA_DIR is set; no config file written.")
167    _append_seqmat_env_to_shell(Path(basepath).resolve(), organism)
168    _log.info("Successfully set up genomics data for %s in %s", organism, basepath)

Set up genomics data for a specific organism.

Arguments:
  • basepath: Base directory for storing genomic data.
  • organism: Organism identifier (e.g. "hg38" or "mm39"). Uses the configured default if omitted.
  • force: Overwrite existing data.
  • pickup: Resume an interrupted setup, reusing any already-downloaded files.
  • n_jobs: Workers for GTF parsing (default: CPU count - 1). 1 is serial.
  • from_prebuilt: When True (default), download prebuilt genes.db + FASTA from S3. When False, download GTF/FASTA from Ensembl/UCSC and build genes.db locally.
def set_fasta_path(fasta_path: str, organism: Optional[str] = None) -> None:
171def set_fasta_path(fasta_path: str, organism: Optional[str] = None) -> None:
172    """Set the full genome FASTA path for an organism in the saved config.
173
174    Useful when you have a FASTA but haven't run the full setup, or need to point
175    SeqMat at a different reference build.
176    """
177    if organism is None:
178        organism = get_default_organism()
179    fa_path = Path(fasta_path)
180    if not fa_path.exists():
181        raise ValueError(f"FASTA file not found: {fa_path}")
182
183    config = load_config()
184    config.setdefault(organism, {})
185    config[organism]["fasta_full_genome"] = str(fa_path)
186    save_config(config)
187    _log.info("Set fasta_full_genome for %s to: %s (config: %s)", organism, fa_path, CONFIG_FILE)

Set the full genome FASTA path for an organism in the saved config.

Useful when you have a FASTA but haven't run the full setup, or need to point SeqMat at a different reference build.

def list_available_organisms() -> List[str]:
26def list_available_organisms() -> List[str]:
27    """Organism IDs that are currently configured/installed."""
28    return get_available_organisms()

Organism IDs that are currently configured/installed.

def list_supported_organisms() -> List[str]:
31def list_supported_organisms() -> List[str]:
32    """Alias of :func:`list_available_organisms` (kept for back-compat)."""
33    return get_available_organisms()

Alias of list_available_organisms() (kept for back-compat).

def get_organism_info(organism: str) -> Dict[str, Any]:
197def get_organism_info(organism: str) -> Dict[str, Any]:
198    """Detailed information about an organism: paths + biotype-keyed gene counts."""
199    try:
200        config = get_organism_config(organism)
201    except ValueError:
202        return {"error": f"Organism '{organism}' not configured"}
203
204    info: Dict[str, Any] = {
205        "organism": organism,
206        "configured": True,
207        "paths": {k: str(v) for k, v in config.items()},
208        "data_available": {},
209    }
210
211    counts = count_genes(organism)
212    if counts:
213        info["data_available"]["biotypes"] = sorted(counts.keys())
214        info["data_available"]["gene_counts"] = counts
215
216    chrom_path = config.get("CHROM_SOURCE")
217    if chrom_path and Path(chrom_path).exists():
218        chrom_files = list(Path(chrom_path).glob("*.fasta"))
219        if chrom_files:
220            info["data_available"]["chromosomes"] = [f.stem for f in chrom_files]
221
222    return info

Detailed information about an organism: paths + biotype-keyed gene counts.

def list_gene_biotypes(organism: Optional[str] = None) -> List[str]:
52def list_gene_biotypes(organism: Optional[str] = None) -> List[str]:
53    """List distinct ``gene_biotype`` values present in ``genes.db``."""
54    conn = _connect(organism)
55    if conn is None:
56        return []
57    try:
58        rows = conn.execute("SELECT DISTINCT biotype FROM genes ORDER BY biotype").fetchall()
59    finally:
60        conn.close()
61    return [r[0] for r in rows if r[0]]

List distinct gene_biotype values present in genes.db.

def count_genes( organism: Optional[str] = None, biotype: Optional[str] = None) -> Dict[str, int]:
64def count_genes(organism: Optional[str] = None, biotype: Optional[str] = None) -> Dict[str, int]:
65    """Count genes per biotype.
66
67    If ``biotype`` is given, returns ``{biotype: count}`` for that one biotype.
68    Otherwise returns a dict of all biotypes -> counts.
69    """
70    conn = _connect(organism)
71    if conn is None:
72        return {}
73    try:
74        if biotype:
75            row = conn.execute(
76                "SELECT COUNT(*) FROM genes WHERE biotype = ?", (biotype,)
77            ).fetchone()
78            return {biotype: int(row[0])} if row else {biotype: 0}
79        rows = conn.execute(
80            "SELECT biotype, COUNT(*) FROM genes GROUP BY biotype ORDER BY biotype"
81        ).fetchall()
82    finally:
83        conn.close()
84    return {bt: int(n) for bt, n in rows if bt}

Count genes per biotype.

If biotype is given, returns {biotype: count} for that one biotype. Otherwise returns a dict of all biotypes -> counts.

def get_gene_list( organism: Optional[str] = None, biotype: Optional[str] = None, limit: Optional[int] = None) -> List[str]:
 87def get_gene_list(
 88    organism: Optional[str] = None,
 89    biotype: Optional[str] = None,
 90    limit: Optional[int] = None,
 91) -> List[str]:
 92    """List gene names (skipping empty Ensembl symbols), optionally filtered by biotype."""
 93    conn = _connect(organism)
 94    if conn is None:
 95        return []
 96    sql = "SELECT gene_name FROM genes WHERE gene_name != ''"
 97    params: tuple = ()
 98    if biotype:
 99        sql += " AND biotype = ?"
100        params = (biotype,)
101    sql += " ORDER BY gene_name"
102    if limit:
103        sql += " LIMIT ?"
104        params = (*params, int(limit))
105    try:
106        rows = conn.execute(sql, params).fetchall()
107    finally:
108        conn.close()
109    return [r[0] for r in rows]

List gene names (skipping empty Ensembl symbols), optionally filtered by biotype.

def get_all_genes( organism: Optional[str] = None, biotype: Optional[str] = None) -> List[Dict[str, str]]:
112def get_all_genes(
113    organism: Optional[str] = None,
114    biotype: Optional[str] = None,
115) -> List[Dict[str, str]]:
116    """Return all genes for an organism as ``[{organism, biotype, gene_name, gene_id}, ...]``."""
117    organism = organism or "<default>"
118    conn = _connect(organism if organism != "<default>" else None)
119    if conn is None:
120        return []
121    sql = "SELECT gene_name, gene_id, biotype FROM genes"
122    params: tuple = ()
123    if biotype:
124        sql += " WHERE biotype = ?"
125        params = (biotype,)
126    sql += " ORDER BY gene_name"
127    try:
128        rows = conn.execute(sql, params).fetchall()
129    finally:
130        conn.close()
131    return [
132        {"organism": organism, "biotype": bt, "gene_name": gn, "gene_id": gid}
133        for gn, gid, bt in rows
134    ]

Return all genes for an organism as [{organism, biotype, gene_name, gene_id}, ...].

def data_summary() -> Dict[str, Any]:
225def data_summary() -> Dict[str, Any]:
226    """A complete data overview across every configured organism."""
227    configured = list_available_organisms()
228    summary: Dict[str, Any] = {
229        "supported_organisms": configured,
230        "configured_organisms": configured,
231        "organisms": {},
232    }
233    for organism in configured:
234        try:
235            summary["organisms"][organism] = get_organism_info(organism)
236        except Exception as e:  # pragma: no cover - defensive
237            summary["organisms"][organism] = {"error": f"Configuration error: {e}"}
238
239    total_genes = 0
240    total_biotypes: set = set()
241    for org_info in summary["organisms"].values():
242        gc = org_info.get("data_available", {}).get("gene_counts", {})
243        for biotype, count in gc.items():
244            total_genes += count
245            total_biotypes.add(biotype)
246    summary["totals"] = {
247        "organisms": len(configured),
248        "biotypes": len(total_biotypes),
249        "genes": total_genes,
250    }
251    return summary

A complete data overview across every configured organism.

def search_genes( organism: Optional[str] = None, query: str = '', biotype: Optional[str] = None, limit: int = 10) -> List[Dict[str, str]]:
137def search_genes(
138    organism: Optional[str] = None,
139    query: str = "",
140    biotype: Optional[str] = None,
141    limit: int = 10,
142) -> List[Dict[str, str]]:
143    """Search genes by ``gene_name`` or ``gene_id`` substring (case-insensitive)."""
144    if not query:
145        return []
146    conn = _connect(organism)
147    if conn is None:
148        return []
149    pattern = f"%{query}%"
150    sql = (
151        "SELECT gene_name, gene_id, biotype FROM genes "
152        "WHERE (UPPER(gene_name) LIKE UPPER(?) OR UPPER(gene_id) LIKE UPPER(?))"
153    )
154    params: tuple = (pattern, pattern)
155    if biotype:
156        sql += " AND biotype = ?"
157        params = (*params, biotype)
158    sql += " ORDER BY gene_name LIMIT ?"
159    params = (*params, int(limit))
160    try:
161        rows = conn.execute(sql, params).fetchall()
162    finally:
163        conn.close()
164    org_label = organism or "<default>"
165    return [
166        {"organism": org_label, "biotype": bt, "gene_name": gn, "gene_id": gid}
167        for gn, gid, bt in rows
168    ]

Search genes by gene_name or gene_id substring (case-insensitive).

def available_genes( organism: str = 'hg38', biotype: Optional[str] = 'protein_coding') -> Iterator[str]:
171def available_genes(organism: str = "hg38", biotype: Optional[str] = "protein_coding") -> Iterator[str]:
172    """Yield distinct gene symbols installed for an organism.
173
174    Defaults to protein-coding genes (matches the legacy filesystem-backed behavior).
175    Pass ``biotype=None`` to stream every biotype. Empty Ensembl symbols are skipped.
176    """
177    conn = _connect(organism)
178    if conn is None:
179        return
180    sql = "SELECT DISTINCT gene_name FROM genes WHERE gene_name != ''"
181    params: tuple = ()
182    if biotype is not None:
183        sql += " AND biotype = ?"
184        params = (biotype,)
185    sql += " ORDER BY gene_name"
186    try:
187        for (name,) in conn.execute(sql, params):
188            yield name
189    finally:
190        conn.close()

Yield distinct gene symbols installed for an organism.

Defaults to protein-coding genes (matches the legacy filesystem-backed behavior). Pass biotype=None to stream every biotype. Empty Ensembl symbols are skipped.

def gene_names_at_position( chrm: str, pos: Union[int, Tuple[int, int], List[int]], organism: Optional[str] = None) -> List[str]:
201def gene_names_at_position(
202    chrm: str, pos: PosArg, organism: Optional[str] = None
203) -> List[str]:
204    """Fast name-only lookup: gene names overlapping a point or range. No BLOB load."""
205    return _get_index(organism).query_names(chrm, pos)

Fast name-only lookup: gene names overlapping a point or range. No BLOB load.

def build_location_index(organism: Optional[str] = None, force: bool = False) -> pathlib.Path:
184def build_location_index(organism: Optional[str] = None, force: bool = False) -> Path:
185    """Build (or rebuild) the on-disk location index for an organism.
186
187    Returns the sidecar path. Use ``force=True`` to overwrite an existing index.
188    """
189    if organism is None:
190        organism = get_default_organism()
191    npz_path = _locations_path(organism)
192    if npz_path.exists() and not force:
193        _INDEX_CACHE.pop(organism, None)
194        _get_index(organism)
195        return npz_path
196    _INDEX_CACHE.pop(organism, None)
197    _get_index(organism, rebuild=True)
198    return npz_path

Build (or rebuild) the on-disk location index for an organism.

Returns the sidecar path. Use force=True to overwrite an existing index.

def build_lmdb( annotations_dir: Optional[str] = None, output_path: Optional[str] = None, organism: Optional[str] = None) -> str:
105def build_lmdb(
106    annotations_dir: Optional[str] = None,
107    output_path: Optional[str] = None,
108    organism: Optional[str] = None,
109) -> str:
110    """Build LMDB from per-gene pickle files. Requires pip install seqmat[lmdb]."""
111    _require_lmdb()
112    if organism is None:
113        organism = get_default_organism()
114    if annotations_dir is None:
115        config = get_organism_config(organism)
116        annotations_dir = str(config["MRNA_PATH"])
117    ann_path = Path(annotations_dir)
118    if not ann_path.exists():
119        raise FileNotFoundError(f"Annotations directory not found: {ann_path}")
120    if output_path is None:
121        output_path = str(ann_path / "genes.lmdb")
122    out = Path(output_path)
123    if out.exists():
124        shutil.rmtree(str(out))
125    pkl_files = sorted(ann_path.glob("**/*.pkl"))
126    if not pkl_files:
127        raise FileNotFoundError(f"No .pkl files found under {ann_path}")
128    total_bytes = sum(f.stat().st_size for f in pkl_files)
129    map_size = int(total_bytes * 1.5) + 10 * 1024 * 1024
130    env = _lmdb.open(str(out), map_size=map_size, max_dbs=0)
131    genes_written = 0
132    skipped = 0
133    total_size = 0
134    with env.begin(write=True) as txn:
135        for pkl_file in pkl_files:
136            try:
137                raw_bytes = pkl_file.read_bytes()
138                gene_name = _gene_name_from_pkl_stem(pkl_file.stem)
139                txn.put(gene_name.encode("utf-8"), raw_bytes)
140                genes_written += 1
141                total_size += len(raw_bytes)
142            except Exception as exc:
143                _log.warning("Skipped %s: %s", pkl_file.name, exc)
144                skipped += 1
145    env.close()
146    _log.info(
147        "LMDB built at %s%s genes, %.1f MB%s",
148        out, f"{genes_written:,}", total_size / (1024 * 1024),
149        f", {skipped} skipped" if skipped else "",
150    )
151    return str(out)

Build LMDB from per-gene pickle files. Requires pip install seqmat[lmdb].

class SeqMatError(builtins.Exception):
10class SeqMatError(Exception):
11    """Base class for all SeqMat exceptions."""

Base class for all SeqMat exceptions.

class GeneNotFoundError(seqmat.SeqMatError, builtins.LookupError):
14class GeneNotFoundError(SeqMatError, LookupError):
15    """Raised when a gene cannot be located in the configured organism's database."""

Raised when a gene cannot be located in the configured organism's database.

class OrganismNotConfiguredError(seqmat.SeqMatError, builtins.ValueError):
18class OrganismNotConfiguredError(SeqMatError, ValueError):
19    """Raised when an organism has no data set up (run ``seqmat setup``)."""

Raised when an organism has no data set up (run seqmat setup).

class DataUnavailableError(seqmat.SeqMatError, builtins.RuntimeError):
22class DataUnavailableError(SeqMatError, RuntimeError):
23    """Raised when prebuilt data (genes.db, FASTA, etc.) cannot be obtained."""

Raised when prebuilt data (genes.db, FASTA, etc.) cannot be obtained.

PrebuiltDataUnavailableError = <class 'DataUnavailableError'>