Created
May 10, 2026 15:16
-
-
Save simon-anders/77245bfdc91cbc3fb6f24e8cf7c5fa4e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| import pysam | |
| import numba | |
| @numba.njit | |
| def _check_cigar_pos(pos, cigar): | |
| if pos > cigar.shape[0]: | |
| raise ValueError("Invalid CIGAR string (1)") | |
| @numba.njit | |
| def _tokenize_cigar_string( cigar ): | |
| # constants | |
| ord_0 = 48 # ord(b'0') | |
| ord_9 = 57 # ord(b'9') | |
| # get length | |
| num_ops = 0 | |
| for i in range(len(cigar)): | |
| if cigar[i] < ord_0 or cigar[i] > ord_9: | |
| # not a digit, thus an operation | |
| num_ops += 1 | |
| operations = np.zeros(num_ops, np.uint8) | |
| step_sizes = np.zeros(num_ops, np.int32) | |
| cigar_pos = 0 | |
| for i in range(num_ops): | |
| # Decode next CIGAR operation. First find number. | |
| step = 0 | |
| while cigar[cigar_pos] >= ord_0 and cigar[cigar_pos] <= ord_9: | |
| step = step*10 + cigar[cigar_pos] - ord_0 | |
| cigar_pos += 1 | |
| _check_cigar_pos(cigar_pos, cigar) | |
| step_sizes[i] = step | |
| # Now get CIGAR operation letter | |
| if cigar_pos >= cigar.shape[0]: | |
| raise ValueError("Invalid CIGAR string (2)") | |
| operations[i] = cigar[cigar_pos] | |
| cigar_pos += 1 | |
| if i < num_ops: | |
| _check_cigar_pos(cigar_pos, cigar) | |
| return operations, step_sizes | |
| def tokenize_cigar_string( cigarstring ): | |
| """Given a CIGAR string, this function returns a pair of arrays: | |
| - an np.uint8 array with the CIGAR operation letters | |
| (as ASCII codes of one the letters M, I, D, N, S, H, P, =, X) | |
| - an np.int32 vector with the sizes of the operations | |
| """ | |
| cigar = np.frombuffer( bytes(cigarstring, 'ascii'), np.uint8 ) | |
| return _tokenize_cigar_string( cigar ) | |
| _valid_cigar_ops = np.frombuffer( b'MIDNSHP=X', np.uint8 ) | |
| _query_consuming_cigar_ops = np.frombuffer( b'MIS=X', np.uint8 ) | |
| _reference_consuming_cigar_ops = np.frombuffer( b'MDN=X', np.uint8 ) | |
| @numba.njit | |
| def _get_q2r_positions( cigar_ops, cigar_step_sizes, seq_len, ref_start ): | |
| q2rpos = np.empty( seq_len, dtype=np.int64 ) | |
| query_pos = 0 | |
| ref_pos = ref_start | |
| cigar_pos = 0 | |
| for i in range(cigar_ops.shape[0]): | |
| # Check whether letter is valied | |
| if cigar_ops[i] not in _valid_cigar_ops: #b'MIDNSHP=X': | |
| raise ValueError("Invalid CIGAR string (unknown operation)") | |
| # Check whether operation consumes query and/or reference | |
| query_consuming = cigar_ops[i] in _query_consuming_cigar_ops | |
| ref_consuming = cigar_ops[i] in _reference_consuming_cigar_ops | |
| if query_consuming: | |
| step_end = query_pos + cigar_step_sizes[i] | |
| while query_pos < step_end: | |
| if ref_consuming: | |
| q2rpos[query_pos] = ref_pos | |
| ref_pos += 1 | |
| else: | |
| q2rpos[query_pos] = -1 | |
| query_pos += 1 | |
| else: | |
| if ref_consuming: | |
| ref_pos += cigar_step_sizes[i] | |
| if query_pos != seq_len: | |
| raise ValueError("Invalid CIGAR string (Query sequence not exactly consumed)") | |
| return q2rpos | |
| def get_q2r_positions( cigarstring, query_seq_len, ref_start=0 ): | |
| """ Given a CIGAR string, find for each base in the query sequence (i.e., the read) | |
| the position on the reference sequence (i.e., the chromosome) with which the | |
| alignment pairs it. | |
| The function needs as arguments the CIGAR string, the length of the read and | |
| the reference start position of the alignment. | |
| """ | |
| cigar_ops, cigar_step_sizes = tokenize_cigar_string( cigarstring ) | |
| return _get_q2r_positions( cigar_ops, cigar_step_sizes, query_seq_len, ref_start ) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment