def random_weighted_walk( start: str, graph: Graph, end_distance: int, travelled_cache: Optional[Dict]=None, direction: str='outgoing', dist_walked=0) -> str: """ Traverse an ontology, ending up {distance} edges away from the starting node This has a hardcoded weighting for moving towards the root of the ontology Assumes that the starting node is a leaf or leafy, and therefore the first step is always over an outgoing subClassOf, ie towards the root travelled_cache A dictionary of nodes and their distance from the starting node """ # Type conversions if not isinstance(start, URIRef): if start.startswith('http'): start = URIRef(start) else: start = URIRef(expand_uri(start, strict=True)) if travelled_cache is None: travelled_cache = { str(start): 0 } if dist_walked == end_distance: return contract_uri(str(start), strict=True)[0] # Convert curie to IRI root = expand_uri("HP:0000118", strict=True) if direction == 'incoming': nodes = list(graph.subjects(RDFS['subClassOf'], start)) if len(nodes) == 0: # We're on a leaf node and can only go up, reverse course nodes = list(graph.objects(start, RDFS['subClassOf'])) elif direction == 'outgoing': nodes = list(graph.objects(start, RDFS['subClassOf'])) else: raise ValueError("direction can only be incoming or outgoing") # Pick a parent term randomly rand_node = np.random.choice(nodes, 1)[0] if root in [str(n) for n in graph.objects(start, RDFS['subClassOf'])]: return contract_uri(str(start), strict=True)[0] if str(rand_node) == root: raise ValueError("Should not have made it to the root") if str(rand_node) in travelled_cache: dist_walked = travelled_cache[str(rand_node)] else: dist_walked += 1 travelled_cache[str(rand_node)] = dist_walked # Pick a weighted random direction # hardcoded .3 incoming and .7 outgoing for now new_direction = np.random.choice(['incoming', 'outgoing'], 1, p=[.35, .65])[0] return random_weighted_walk( rand_node, graph, end_distance, travelled_cache, new_direction, dist_walked ) # Credit 
# Credit: https://stackoverflow.com/a/36205932
def randn_skew_fast(N, alpha=0.0, loc=0.0, scale=1.0):
    """
    Draw N samples from a skew-normal distribution.

    :param N: number of samples
    :param alpha: skewness parameter (0 yields a standard normal)
    :param loc: location (shift) applied after scaling
    :param scale: scale factor applied before the shift
    :return: np.ndarray of shape (N,)
    """
    sigma = alpha / np.sqrt(1.0 + alpha ** 2)
    u0 = np.random.randn(N)
    v = np.random.randn(N)
    u1 = (sigma * u0 + np.sqrt(1.0 - sigma ** 2) * v) * scale
    # Reflect the samples where u0 < 0; this is what skews the distribution
    u1[u0 < 0] *= -1
    u1 = u1 + loc
    return u1


def _get_noisy_phenotypes() -> List[str]:
    """Return the fixed pool of HP CURIEs used as noise terms."""
    return [
        'HP:0011410', 'HP:0002020', 'HP:0001787', 'HP:0001622', 'HP:0002019',
        'HP:0002015', 'HP:0001344', 'HP:0002099', 'HP:0002376', 'HP:0000739',
        'HP:0002870', 'HP:0002194', 'HP:0001763', 'HP:0000218', 'HP:0002033',
        'HP:0001773', 'HP:0000545', 'HP:0002643', 'HP:0001382', 'HP:0002315',
        'HP:0000938', 'HP:0012450', 'HP:0012378', 'HP:0007018', 'HP:0000822',
        'HP:0030363', 'HP:0002076', 'HP:0011471', 'HP:0001518', 'HP:0001513',
        'HP:0000483', 'HP:0002360', 'HP:0001558', 'HP:0000322', 'HP:0000565',
        'HP:0200055', 'HP:0000716', 'HP:0000577', 'HP:0000964', 'HP:0002829',
        'HP:0002027', 'HP:0010862', 'HP:0001288'
    ]


def simulate_from_gold(phenotypes: List[str], graph: Graph) -> FrozenSet[str]:
    """
    Simulate a noisy patient profile derived from a gold-standard
    phenotype profile. Parameters below are based on analysis of UDN
    patients.

    Transformations applied, in order:

    Omissions/Inclusions: keep a randomly drawn fraction of the gold
      profile (empirical weights; the distribution is boosted to an
      average of ~.25 when the profile has 5 or fewer phenotypes).
      Fractions 0 and .1 are rounded up (ceil); others use Python 3
      round (half to even).

    Imprecision: draw imp_num from a right-skewed normal
      (alpha=1.5, loc=8, scale=5) and add that many terms that are a
      random ontology distance (|Normal(4.822, 2.329)|, rounded) away
      from remaining gold terms, via random_weighted_walk.
      NOTE(review): an earlier comment cited loc=6 / scale=12; the code
      uses loc=8 / scale=5 -- confirm which matches the UDN analysis.

    Noise: add 0-2 terms drawn without replacement from
      _get_noisy_phenotypes().

    :param phenotypes: gold-standard profile as a list of CURIEs
    :param graph: ontology graph, passed through to random_weighted_walk
    :return: frozenset of CURIEs forming the simulated profile
    :raises ValueError: if phenotypes is empty
    """
    # Previously an empty profile crashed deep inside np.random.choice
    # with an opaque numpy error; fail fast with a clear message instead
    if not phenotypes:
        raise ValueError("phenotypes must be a non-empty list")

    simulated_profile = set()

    # Omissions/Inclusions
    # First pick percent overlap/intersection of gold standard profile
    if len(phenotypes) > 5:
        int_perc = np.random.choice(
            [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1],
            1,
            p=[0.132, 0.317, 0.317, 0.150, 0.060, 0.012, 0.005,
               0.003, 0.0015, 0.0015, 0.0010]
        )[0]
    else:
        # If the profile is 5 phenotypes or fewer, boost (avg = .25)
        int_perc = np.random.choice(
            [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1],
            1,
            p=[0.4, 0.0, 0.2, 0.15, 0.05, 0.05, 0.05, 0.04, 0.03, 0.02, 0.01]
        )[0]

    # Round up for 0 and .1, round half to even otherwise
    if int_perc == 0 or int_perc == 0.1:
        int_count = int(math.ceil(int_perc * len(phenotypes)))
    else:
        int_count = int(round(int_perc * len(phenotypes)))

    if int_count != 0:
        for pheno in np.random.choice(phenotypes, int_count, replace=False):
            simulated_profile.add(pheno)

    # Imprecision operates on the gold terms not already kept; if all
    # were kept, fall back to drawing from the full gold profile
    remaining_phenotypes = set(phenotypes) - simulated_profile
    if len(remaining_phenotypes) == 0:
        remaining_phenotypes = set(phenotypes)

    imp_num = int(round(abs(randn_skew_fast(1, 1.5, 8, 5)[0])))
    added_nodes = 0
    # Oversample (imp_num + 1000 draws with replacement) so duplicates of
    # terms already in the profile can be skipped and we still reach imp_num
    for pheno in np.random.choice(list(remaining_phenotypes),
                                  imp_num + 1000, replace=True):
        distance = int(round(abs(np.random.normal(4.822, 2.329, 1)[0])))
        node = random_weighted_walk(pheno, graph, distance)
        if node not in simulated_profile:
            simulated_profile.add(node)
            added_nodes += 1
        if added_nodes == imp_num:
            break

    # Noise: 0-2 terms from the fixed noise pool
    noise_count = np.random.choice([0, 1, 2], 1)[0]
    noisy_terms = np.random.choice(_get_noisy_phenotypes(), noise_count,
                                   replace=False)
    simulated_profile.update(noisy_terms)

    return frozenset(simulated_profile)