Skip to content

Instantly share code, notes, and snippets.

@kshefchek
Created December 10, 2019 01:15
Show Gist options
  • Select an option

  • Save kshefchek/94f1b9fb2bda98994819188ed54e0aca to your computer and use it in GitHub Desktop.

Select an option

Save kshefchek/94f1b9fb2bda98994819188ed54e0aca to your computer and use it in GitHub Desktop.
Synthetic patient code
def random_weighted_walk(
start: str,
graph: Graph,
end_distance: int,
travelled_cache: Optional[Dict]=None,
direction: str='outgoing',
dist_walked=0) -> str:
"""
Traverse an ontology, ending up {distance} edges away from the
starting node
This has a hardcoded weighting for moving towards the root of the ontology
Assumes that the starting node is a leaf or leafy, and therefore the
first step is always over an outgoing subClassOf, ie towards the root
travelled_cache
A dictionary of nodes and their distance from the starting node
"""
# Type conversions
if not isinstance(start, URIRef):
if start.startswith('http'):
start = URIRef(start)
else:
start = URIRef(expand_uri(start, strict=True))
if travelled_cache is None:
travelled_cache = {
str(start): 0
}
if dist_walked == end_distance:
return contract_uri(str(start), strict=True)[0]
# Convert curie to IRI
root = expand_uri("HP:0000118", strict=True)
if direction == 'incoming':
nodes = list(graph.subjects(RDFS['subClassOf'], start))
if len(nodes) == 0:
# We're on a leaf node and can only go up, reverse course
nodes = list(graph.objects(start, RDFS['subClassOf']))
elif direction == 'outgoing':
nodes = list(graph.objects(start, RDFS['subClassOf']))
else:
raise ValueError("direction can only be incoming or outgoing")
# Pick a parent term randomly
rand_node = np.random.choice(nodes, 1)[0]
if root in [str(n) for n in graph.objects(start, RDFS['subClassOf'])]:
return contract_uri(str(start), strict=True)[0]
if str(rand_node) == root:
raise ValueError("Should not have made it to the root")
if str(rand_node) in travelled_cache:
dist_walked = travelled_cache[str(rand_node)]
else:
dist_walked += 1
travelled_cache[str(rand_node)] = dist_walked
# Pick a weighted random direction
# hardcoded .3 incoming and .7 outgoing for now
new_direction = np.random.choice(['incoming', 'outgoing'], 1, p=[.35, .65])[0]
return random_weighted_walk(
rand_node, graph, end_distance, travelled_cache, new_direction, dist_walked
)
# Credit https://stackoverflow.com/a/36205932
def randn_skew_fast(N, alpha=0.0, loc=0.0, scale=1.0):
sigma = alpha / np.sqrt(1.0 + alpha ** 2)
u0 = np.random.randn(N)
v = np.random.randn(N)
u1 = (sigma * u0 + np.sqrt(1.0 - sigma ** 2) * v) * scale
u1[u0 < 0] *= -1
u1 = u1 + loc
return u1
def _get_noisy_phenotypes() -> List[str]:
return [
'HP:0011410',
'HP:0002020',
'HP:0001787',
'HP:0001622',
'HP:0002019',
'HP:0002015',
'HP:0001344',
'HP:0002099',
'HP:0002376',
'HP:0000739',
'HP:0002870',
'HP:0002194',
'HP:0001763',
'HP:0000218',
'HP:0002033',
'HP:0001773',
'HP:0000545',
'HP:0002643',
'HP:0001382',
'HP:0002315',
'HP:0000938',
'HP:0012450',
'HP:0012378',
'HP:0007018',
'HP:0000822',
'HP:0030363',
'HP:0002076',
'HP:0011471',
'HP:0001518',
'HP:0001513',
'HP:0000483',
'HP:0002360',
'HP:0001558',
'HP:0000322',
'HP:0000565',
'HP:0200055',
'HP:0000716',
'HP:0000577',
'HP:0000964',
'HP:0002829',
'HP:0002027',
'HP:0010862',
'HP:0001288'
]
def simulate_from_gold(phenotypes: List[str], graph: Graph) -> FrozenSet[str]:
"""
Based on analysis of UDN patients
Omissions:
np.random.choice(
[0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1],
p=[0.132, 0.317, 0.317, 0.150, 0.060, 0.012, 0.005,
0.003, 0.0015, 0.0015, 0.0010])
Round up for 0, .1, half to even otherwise
Imprecision:
Right normal skewed
alpha: 1.5, loc (mean): 6, scale (std dev): 12
imp_num = round(abs(randn_skew_fast(1, 1.5, 6, 12)))
imp_phenotypes = np.random.choice(phenotypes, imp_num, replace=True)
Distance from node:
round(abs(np.random.normal(4.822, 2.329, 1)[0]))
Noise:
noise_count = np.random.choice([0,1,2], 1)
noisy_terms = np.random.choice(noise, replace=False)
"""
simulated_profile = set()
# Omissions/Inclusions
# First pick percent overlap/intersection of gold standard profile
if len(phenotypes) > 5:
int_perc = np.random.choice(
[0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1],
1,
p=[0.132, 0.317, 0.317, 0.150, 0.060, 0.012, 0.005,
0.003, 0.0015, 0.0015, 0.0010]
)[0]
else:
# If the profile is 5 phenotypes or fewer, boost (avg = .25)
int_perc = np.random.choice(
[0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1],
1,
p=[0.4, 0.0, 0.2, 0.15, 0.05, 0.05, 0.05, 0.04, 0.03, 0.02, 0.01]
)[0]
int_count = 0
if int_perc == 0 or int_perc == 0.1:
int_count = int(math.ceil(int_perc * len(phenotypes)))
else:
int_count = int(round(int_perc * len(phenotypes)))
if int_count != 0:
for pheno in np.random.choice(phenotypes, int_count, replace=False):
simulated_profile.add(pheno)
remaining_phenotypes = set(phenotypes) - simulated_profile
if len(remaining_phenotypes) == 0:
remaining_phenotypes = set(phenotypes)
imp_num = int(round(abs(randn_skew_fast(1, 1.5, 8, 5)[0])))
added_nodes = 0
for pheno in np.random.choice(list(remaining_phenotypes), imp_num+1000, replace=True):
distance = int(round(abs(np.random.normal(4.822, 2.329, 1)[0])))
node = random_weighted_walk(pheno, graph, distance)
if node not in simulated_profile:
simulated_profile.add(node)
added_nodes += 1
if added_nodes == imp_num:
break
noise_count = np.random.choice([0, 1, 2], 1)[0]
noisy_terms = np.random.choice(_get_noisy_phenotypes(), noise_count, replace=False)
for noise in noisy_terms:
simulated_profile.add(noise)
return frozenset(simulated_profile)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment