Skip to content

Instantly share code, notes, and snippets.

@simon-anders
Created May 10, 2026 15:16
Show Gist options
  • Select an option

  • Save simon-anders/77245bfdc91cbc3fb6f24e8cf7c5fa4e to your computer and use it in GitHub Desktop.

Select an option

Save simon-anders/77245bfdc91cbc3fb6f24e8cf7c5fa4e to your computer and use it in GitHub Desktop.
import numpy as np
import pysam
import numba
@numba.njit
def _check_cigar_pos(pos, cigar):
if pos > cigar.shape[0]:
raise ValueError("Invalid CIGAR string (1)")
@numba.njit
def _tokenize_cigar_string( cigar ):
# constants
ord_0 = 48 # ord(b'0')
ord_9 = 57 # ord(b'9')
# get length
num_ops = 0
for i in range(len(cigar)):
if cigar[i] < ord_0 or cigar[i] > ord_9:
# not a digit, thus an operation
num_ops += 1
operations = np.zeros(num_ops, np.uint8)
step_sizes = np.zeros(num_ops, np.int32)
cigar_pos = 0
for i in range(num_ops):
# Decode next CIGAR operation. First find number.
step = 0
while cigar[cigar_pos] >= ord_0 and cigar[cigar_pos] <= ord_9:
step = step*10 + cigar[cigar_pos] - ord_0
cigar_pos += 1
_check_cigar_pos(cigar_pos, cigar)
step_sizes[i] = step
# Now get CIGAR operation letter
if cigar_pos >= cigar.shape[0]:
raise ValueError("Invalid CIGAR string (2)")
operations[i] = cigar[cigar_pos]
cigar_pos += 1
if i < num_ops:
_check_cigar_pos(cigar_pos, cigar)
return operations, step_sizes
def tokenize_cigar_string( cigarstring ):
"""Given a CIGAR string, this function returns a pair of arrays:
- an np.uint8 array with the CIGAR operation letters
(as ASCII codes of one the letters M, I, D, N, S, H, P, =, X)
- an np.int32 vector with the sizes of the operations
"""
cigar = np.frombuffer( bytes(cigarstring, 'ascii'), np.uint8 )
return _tokenize_cigar_string( cigar )
_valid_cigar_ops = np.frombuffer( b'MIDNSHP=X', np.uint8 )
_query_consuming_cigar_ops = np.frombuffer( b'MIS=X', np.uint8 )
_reference_consuming_cigar_ops = np.frombuffer( b'MDN=X', np.uint8 )
@numba.njit
def _get_q2r_positions( cigar_ops, cigar_step_sizes, seq_len, ref_start ):
q2rpos = np.empty( seq_len, dtype=np.int64 )
query_pos = 0
ref_pos = ref_start
cigar_pos = 0
for i in range(cigar_ops.shape[0]):
# Check whether letter is valied
if cigar_ops[i] not in _valid_cigar_ops: #b'MIDNSHP=X':
raise ValueError("Invalid CIGAR string (unknown operation)")
# Check whether operation consumes query and/or reference
query_consuming = cigar_ops[i] in _query_consuming_cigar_ops
ref_consuming = cigar_ops[i] in _reference_consuming_cigar_ops
if query_consuming:
step_end = query_pos + cigar_step_sizes[i]
while query_pos < step_end:
if ref_consuming:
q2rpos[query_pos] = ref_pos
ref_pos += 1
else:
q2rpos[query_pos] = -1
query_pos += 1
else:
if ref_consuming:
ref_pos += cigar_step_sizes[i]
if query_pos != seq_len:
raise ValueError("Invalid CIGAR string (Query sequence not exactly consumed)")
return q2rpos
def get_q2r_positions( cigarstring, query_seq_len, ref_start=0 ):
""" Given a CIGAR string, find for each base in the query sequence (i.e., the read)
the position on the reference sequence (i.e., the chromosome) with which the
alignment pairs it.
The function needs as arguments the CIGAR string, the length of the read and
the reference start position of the alignment.
"""
cigar_ops, cigar_step_sizes = tokenize_cigar_string( cigarstring )
return _get_q2r_positions( cigar_ops, cigar_step_sizes, query_seq_len, ref_start )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment