Created
December 10, 2019 01:15
-
-
Save kshefchek/94f1b9fb2bda98994819188ed54e0aca to your computer and use it in GitHub Desktop.
Synthetic patient code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def random_weighted_walk( | |
| start: str, | |
| graph: Graph, | |
| end_distance: int, | |
| travelled_cache: Optional[Dict]=None, | |
| direction: str='outgoing', | |
| dist_walked=0) -> str: | |
| """ | |
| Traverse an ontology, ending up {distance} edges away from the | |
| starting node | |
| This has a hardcoded weighting for moving towards the root of the ontology | |
| Assumes that the starting node is a leaf or leafy, and therefore the | |
| first step is always over an outgoing subClassOf, ie towards the root | |
| travelled_cache | |
| A dictionary of nodes and their distance from the starting node | |
| """ | |
| # Type conversions | |
| if not isinstance(start, URIRef): | |
| if start.startswith('http'): | |
| start = URIRef(start) | |
| else: | |
| start = URIRef(expand_uri(start, strict=True)) | |
| if travelled_cache is None: | |
| travelled_cache = { | |
| str(start): 0 | |
| } | |
| if dist_walked == end_distance: | |
| return contract_uri(str(start), strict=True)[0] | |
| # Convert curie to IRI | |
| root = expand_uri("HP:0000118", strict=True) | |
| if direction == 'incoming': | |
| nodes = list(graph.subjects(RDFS['subClassOf'], start)) | |
| if len(nodes) == 0: | |
| # We're on a leaf node and can only go up, reverse course | |
| nodes = list(graph.objects(start, RDFS['subClassOf'])) | |
| elif direction == 'outgoing': | |
| nodes = list(graph.objects(start, RDFS['subClassOf'])) | |
| else: | |
| raise ValueError("direction can only be incoming or outgoing") | |
| # Pick a parent term randomly | |
| rand_node = np.random.choice(nodes, 1)[0] | |
| if root in [str(n) for n in graph.objects(start, RDFS['subClassOf'])]: | |
| return contract_uri(str(start), strict=True)[0] | |
| if str(rand_node) == root: | |
| raise ValueError("Should not have made it to the root") | |
| if str(rand_node) in travelled_cache: | |
| dist_walked = travelled_cache[str(rand_node)] | |
| else: | |
| dist_walked += 1 | |
| travelled_cache[str(rand_node)] = dist_walked | |
| # Pick a weighted random direction | |
| # hardcoded .3 incoming and .7 outgoing for now | |
| new_direction = np.random.choice(['incoming', 'outgoing'], 1, p=[.35, .65])[0] | |
| return random_weighted_walk( | |
| rand_node, graph, end_distance, travelled_cache, new_direction, dist_walked | |
| ) | |
| # Credit https://stackoverflow.com/a/36205932 | |
| def randn_skew_fast(N, alpha=0.0, loc=0.0, scale=1.0): | |
| sigma = alpha / np.sqrt(1.0 + alpha ** 2) | |
| u0 = np.random.randn(N) | |
| v = np.random.randn(N) | |
| u1 = (sigma * u0 + np.sqrt(1.0 - sigma ** 2) * v) * scale | |
| u1[u0 < 0] *= -1 | |
| u1 = u1 + loc | |
| return u1 | |
| def _get_noisy_phenotypes() -> List[str]: | |
| return [ | |
| 'HP:0011410', | |
| 'HP:0002020', | |
| 'HP:0001787', | |
| 'HP:0001622', | |
| 'HP:0002019', | |
| 'HP:0002015', | |
| 'HP:0001344', | |
| 'HP:0002099', | |
| 'HP:0002376', | |
| 'HP:0000739', | |
| 'HP:0002870', | |
| 'HP:0002194', | |
| 'HP:0001763', | |
| 'HP:0000218', | |
| 'HP:0002033', | |
| 'HP:0001773', | |
| 'HP:0000545', | |
| 'HP:0002643', | |
| 'HP:0001382', | |
| 'HP:0002315', | |
| 'HP:0000938', | |
| 'HP:0012450', | |
| 'HP:0012378', | |
| 'HP:0007018', | |
| 'HP:0000822', | |
| 'HP:0030363', | |
| 'HP:0002076', | |
| 'HP:0011471', | |
| 'HP:0001518', | |
| 'HP:0001513', | |
| 'HP:0000483', | |
| 'HP:0002360', | |
| 'HP:0001558', | |
| 'HP:0000322', | |
| 'HP:0000565', | |
| 'HP:0200055', | |
| 'HP:0000716', | |
| 'HP:0000577', | |
| 'HP:0000964', | |
| 'HP:0002829', | |
| 'HP:0002027', | |
| 'HP:0010862', | |
| 'HP:0001288' | |
| ] | |
| def simulate_from_gold(phenotypes: List[str], graph: Graph) -> FrozenSet[str]: | |
| """ | |
| Based on analysis of UDN patients | |
| Omissions: | |
| np.random.choice( | |
| [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1], | |
| p=[0.132, 0.317, 0.317, 0.150, 0.060, 0.012, 0.005, | |
| 0.003, 0.0015, 0.0015, 0.0010]) | |
| Round up for 0, .1, half to even otherwise | |
| Imprecision: | |
| Right normal skewed | |
| alpha: 1.5, loc (mean): 6, scale (std dev): 12 | |
| imp_num = round(abs(randn_skew_fast(1, 1.5, 6, 12))) | |
| imp_phenotypes = np.random.choice(phenotypes, imp_num, replace=True) | |
| Distance from node: | |
| round(abs(np.random.normal(4.822, 2.329, 1)[0])) | |
| Noise: | |
| noise_count = np.random.choice([0,1,2], 1) | |
| noisy_terms = np.random.choice(noise, replace=False) | |
| """ | |
| simulated_profile = set() | |
| # Omissions/Inclusions | |
| # First pick percent overlap/intersection of gold standard profile | |
| if len(phenotypes) > 5: | |
| int_perc = np.random.choice( | |
| [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1], | |
| 1, | |
| p=[0.132, 0.317, 0.317, 0.150, 0.060, 0.012, 0.005, | |
| 0.003, 0.0015, 0.0015, 0.0010] | |
| )[0] | |
| else: | |
| # If the profile is 5 phenotypes or fewer, boost (avg = .25) | |
| int_perc = np.random.choice( | |
| [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1], | |
| 1, | |
| p=[0.4, 0.0, 0.2, 0.15, 0.05, 0.05, 0.05, 0.04, 0.03, 0.02, 0.01] | |
| )[0] | |
| int_count = 0 | |
| if int_perc == 0 or int_perc == 0.1: | |
| int_count = int(math.ceil(int_perc * len(phenotypes))) | |
| else: | |
| int_count = int(round(int_perc * len(phenotypes))) | |
| if int_count != 0: | |
| for pheno in np.random.choice(phenotypes, int_count, replace=False): | |
| simulated_profile.add(pheno) | |
| remaining_phenotypes = set(phenotypes) - simulated_profile | |
| if len(remaining_phenotypes) == 0: | |
| remaining_phenotypes = set(phenotypes) | |
| imp_num = int(round(abs(randn_skew_fast(1, 1.5, 8, 5)[0]))) | |
| added_nodes = 0 | |
| for pheno in np.random.choice(list(remaining_phenotypes), imp_num+1000, replace=True): | |
| distance = int(round(abs(np.random.normal(4.822, 2.329, 1)[0]))) | |
| node = random_weighted_walk(pheno, graph, distance) | |
| if node not in simulated_profile: | |
| simulated_profile.add(node) | |
| added_nodes += 1 | |
| if added_nodes == imp_num: | |
| break | |
| noise_count = np.random.choice([0, 1, 2], 1)[0] | |
| noisy_terms = np.random.choice(_get_noisy_phenotypes(), noise_count, replace=False) | |
| for noise in noisy_terms: | |
| simulated_profile.add(noise) | |
| return frozenset(simulated_profile) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment