Last active
October 13, 2020 00:01
-
-
Save kshefchek/dc9324023f9cc54333298658e1f9f49e to your computer and use it in GitHub Desktop.
python implementation of phenodigm for HPO only
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from typing import Set, Union, List, Optional, Sequence, Num | |
| from functools import reduce, partial | |
| from collections import Hashable | |
| #from decorators import memoized | |
| import copy | |
| from itertools import chain | |
| from rdflib import URIRef, BNode, Literal, Graph, RDFS | |
| # Union types | |
| Num = Union[int, float] | |
| # I/O | |
| ic_fh = open("/home/kshefchek/ic-cache.tsv", "r") | |
| ic_map = {} | |
| for line in ic_fh.readlines(): | |
| hpo_id, ic = line.rstrip("\n").split("\t") | |
| ic_map[hpo_id] = float(ic) | |
| def jaccard(set1: Set, set2: Set) -> float: | |
| return len(set1.intersection(set2))/len(set1.union(set2)) | |
| def geometric_mean(values: Sequence[Num]) -> float: | |
| return reduce(lambda x,y: x*y, values)**(1/(len(values))) | |
| def mean(values: Sequence[Num]) -> float: | |
| return reduce(lambda x,y: x+y, values)/(len(values)) | |
| def hp_iri2curie(iri:str) -> str: | |
| return iri.replace("http://purl.obolibrary.org/obo/HP_", "HP:") | |
| def hp_curie2iri(curie:str) -> URIRef: | |
| return URIRef(curie.replace("HP:", "http://purl.obolibrary.org/obo/HP_")) | |
| def flip_matrix(matrix: Sequence[Sequence]) -> Sequence[Sequence]: | |
| """ | |
| swap rows and columns in a list of lists | |
| """ | |
| flipped_matrix = [[0 for column in range(len(row))] for row in matrix] | |
| for row_index, row in enumerate(matrix): | |
| for col_index, cell in enumerate(row): | |
| flipped_matrix[col_index][row_index] = cell | |
| return flipped_matrix | |
| def _get_closure(graph: Graph, | |
| node: str, | |
| edge: Optional[URIRef]=None, | |
| root: Optional[str]=None, | |
| reflexive: Optional[bool] = True) -> Set[str]: | |
| nodes = set() | |
| root_seen = {} | |
| if node is not None: | |
| node = hp_curie2iri(node) | |
| if root is not None: | |
| root = hp_curie2iri(root) | |
| root_seen = {root: 1} | |
| for obj in graph.transitive_objects(node, edge, root_seen): | |
| if isinstance(obj, Literal) or isinstance(obj, BNode): | |
| continue | |
| if not reflexive and node == obj: | |
| continue | |
| nodes.add(hp_iri2curie(str(obj))) | |
| # Add root to graph | |
| if root is not None: | |
| nodes.add(hp_iri2curie(str(root))) | |
| return nodes | |
| def get_closure(graph: Graph, | |
| node: str, | |
| edge: Optional[URIRef]=None, | |
| root: Optional[str]=None, | |
| reflexive: Optional[bool] = True) -> Set[str]: | |
| # @memoized | |
| def get_closure_memoized(node: int) -> List[int]: | |
| node = "HP:" + str(node).zfill(7) | |
| closure = _get_closure(graph, node, edge, root, reflexive) | |
| return [int(id.replace("HP:","")) for id in closure] | |
| hp_id = int(node.replace("HP:", "")) | |
| closure = get_closure_memoized(hp_id) | |
| return {"HP:" + str(node).zfill(7) for node in closure} | |
| def get_mica_ic(phenotype1: str, phenotype2: str, graph: Graph) -> Num: | |
| ROOT = "HP:0000118" | |
| p1_closure = get_closure(graph, phenotype1, RDFS['subClassOf'], ROOT) | |
| p2_closure = get_closure(graph, phenotype2, RDFS['subClassOf'], ROOT) | |
| return max([ic_map[parent] for parent in p1_closure.intersection(p2_closure)]) | |
| def jac_ic_geomean(pheno1, pheno2, graph: Graph): | |
| ROOT = "HP:0000118" | |
| jaccard_sim = jaccard( | |
| get_closure(graph, pheno1, RDFS['subClassOf'], ROOT), | |
| get_closure(graph, pheno2, RDFS['subClassOf'], ROOT) | |
| ) | |
| mica = get_mica_ic(pheno1, pheno2, graph) | |
| return geometric_mean([jaccard_sim, mica]) | |
| def get_score_matrix(profile1, profile2, graph:Graph): | |
| score_matrix = [[]] | |
| for index, pheno1 in enumerate(profile1): | |
| if index == len(score_matrix): | |
| score_matrix.append([]) | |
| for pheno2 in profile2: | |
| score_matrix[index].append(jac_ic_geomean(pheno1, pheno2, graph)) | |
| return score_matrix | |
| def get_optimal_matrix(profile:Sequence[str]): | |
| """ | |
| Only implemented for same species comparisons | |
| """ | |
| score_matrix = [] | |
| for pheno in profile: | |
| score_matrix.append([geometric_mean([1, ic_map[pheno]])]) | |
| return score_matrix | |
| def sym_bma_score(matrix: Sequence[Sequence[Num]]) -> float: | |
| """ | |
| best max average score | |
| """ | |
| forwards = [max(row) for row in matrix] | |
| flipped = flip_matrix(matrix) | |
| backwards = [max(row) for row in flipped] | |
| combined_maxes = forwards + backwards | |
| return mean(combined_maxes) | |
| def max_score(matrix): | |
| return max(list(chain.from_iterable(matrix))) | |
| def bma_score(matrix): | |
| return mean([max(row) for row in matrix]) | |
| def max_percentage_score(query_matrix, optimal_matrix): | |
| return max_score(query_matrix) / max_score(optimal_matrix) | |
| def bma_percentage_score(query_matrix, optimal_matrix): | |
| return sym_bma_score(query_matrix) / sym_bma_score(optimal_matrix) | |
| def compute_phenodigm_score(query_matrix: Sequence[Sequence], | |
| optimal_matrix: Sequence[Sequence]) -> float: | |
| return 100 * mean([max_percentage_score(query_matrix, optimal_matrix), | |
| bma_percentage_score(query_matrix, optimal_matrix)]) | |
| def phenodigm_compare(profile_a: Sequence[str], | |
| profile_b: Sequence[str], | |
| graph: Graph, | |
| is_same_species: Optional[bool]=True) -> float: | |
| query_matrix = get_score_matrix(profile_a, profile_b, graph) | |
| if is_same_species: | |
| optimal_matrix = get_optimal_matrix(profile_a) | |
| else: | |
| raise NotImplementedError | |
| return compute_phenodigm_score(query_matrix, optimal_matrix) | |
| hp_graph = Graph() | |
| hp_graph.parse('http://purl.obolibrary.org/obo/hp.owl', format='xml') | |
| pheno_profile1 = ["HP:0001595", "HP:0002360"] | |
| pheno_profile2 = ["HP:0002219", "HP:0002360"] | |
| print(phenodigm_compare(pheno_profile1, pheno_profile2, hp_graph)) | |
| print(phenodigm_compare(pheno_profile2, pheno_profile1, hp_graph)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Kent,
I was trying to calculate the phenodigm scores for two sets of HPO terms, and your script would be very useful for that. Would you mind also sharing the file "/home/kshefchek/ic-cache.tsv"?
Best,
Emily