Created
January 2, 2019 13:37
-
-
Save whong92/d0df440fbad880bc0221488eeedbb89f to your computer and use it in GitHub Desktop.
A short script of tools for calculating Lamport/vector clock timestamps of the events in a simulated distributed process
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
class node:
    """
    Base class modelling an event as a node in a causality graph.

    Each node keeps parent and child pointers (the events it directly depends
    on, and the events that directly depend on it), a unique integer id, the
    id of the process it belongs to, and a timestamp ``ts`` that is filled in
    by the BFS timestamping functions via ``set_ts``.
    """

    def __init__(self):
        self.id = None
        # Short event-type label; the event subclasses set it ('PI', 'MR', 'MS').
        self.type_name = None
        self.parents = []
        self.childs = []
        self.ts = None
        self.proc_id = None

    def add_parent(self, parent):
        """Register ``parent`` as a direct predecessor; duplicates are ignored."""
        if parent not in self.parents:
            self.parents.append(parent)

    def add_child(self, child):
        """Register ``child`` as a direct successor; duplicates are ignored."""
        if child not in self.childs:
            self.childs.append(child)

    def set_id(self, id):
        self.id = id

    def set_ts(self, ts):
        self.ts = ts

    def set_proc_id(self, proc_id):
        self.proc_id = proc_id

    def get_parents(self):
        return self.parents

    def get_childs(self):
        return self.childs

    def get_name(self):
        """Return the display name of the event; subclasses override this."""
        return None

    def get_id(self):
        return self.id

    def get_ts(self):
        return self.ts

    def get_proc_id(self):
        return self.proc_id

    def __str__(self):
        # __str__ must return a str, but the base get_name() returns None;
        # fall back to the class name so str() works on any node.
        name = self.get_name()
        return name if name is not None else type(self).__name__
class PI(node):
    """
    Process instruction event: a step local to one process that neither
    sends nor receives a message.
    """

    def __init__(self):
        super(PI, self).__init__()
        self.type_name = 'PI'

    def get_name(self):
        # Only the event type is displayed for process instructions.
        return self.type_name
class MR(node):
    """
    Message receive event for message ``mid``.

    Its parents are the matching message send (MS) event with the same
    ``mid`` and, unless it is the first event of its process, the preceding
    event of that process.
    """

    def __init__(self, mid):
        super(MR, self).__init__()
        self.type_name = 'MR'
        self.mid = mid

    def get_name(self):
        # Rendered as e.g. 'MR_2' for the receive of message 2.
        return '_'.join((self.type_name, str(self.mid)))
class MS(node):
    """
    Message send event for message ``mid``.

    Its children are the matching message receive (MR) event with the same
    ``mid`` and, unless it is the last event of its process, the next event
    of that process.
    """

    def __init__(self, mid):
        super(MS, self).__init__()
        self.type_name = 'MS'
        self.mid = mid

    def get_name(self):
        # Rendered as e.g. 'MS_2' for the send of message 2.
        return '_'.join((self.type_name, str(self.mid)))
def BFS_lamport(S, stamped):
    """
    Assign Lamport timestamps to the events reachable from the root event S.

    A breadth-first traversal from S yields a topological order of the
    dependent events. A child is stamped (and enqueued) only once every one of
    its parents already has a timestamp, so its stamp is
    ``max(parent timestamps) + 1``. S itself must have no parents and gets
    timestamp 1.

    :param S: root event (no parents) to start the traversal from.
    :param stamped: dict mapping event id -> bool; set to True in place for
        every event that receives a timestamp.
    """
    from collections import deque

    stamped[S.get_id()] = True
    S.set_ts(1)
    queue = deque([S])
    while queue:
        U = queue.popleft()
        for V in U.get_childs():
            # V is stamped as soon as all its parents *have timestamps*, which
            # can happen before every parent has been dequeued. Reaching an
            # already-stamped V from a later parent is therefore normal.
            if stamped[V.get_id()]:
                continue
            parent_ts = [W.get_ts() for W in V.get_parents()]
            if any(ts is None for ts in parent_ts):
                # Some parent is not stamped yet; V is reached again when that
                # parent is stamped and processed.
                continue
            stamped[V.get_id()] = True
            V.set_ts(max(parent_ts) + 1)
            queue.append(V)
def BFS_vector(S, stamped, num_proc):
    """
    Assign vector-clock timestamps to the events reachable from the root S.

    Events are stamped in a topological order obtained by breadth-first
    search. A child is stamped (and enqueued) only once all of its parents
    carry a timestamp.

    The vector rule (see ``set_TS``) starts from one of three vectors:
    all zeros for an event with no parents; a copy of the single parent's
    vector for PI / MS events; or the element-wise maximum of the parents'
    vectors otherwise (MR events). The entry of the event's own process is
    then incremented.

    :param S: root event (no parents) to start from.
    :param stamped: dict mapping event id -> bool; set to True in place for
        every event that receives a timestamp.
    :param num_proc: number of processes, i.e. the length of each vector.
    """
    from collections import deque

    def set_TS(N, num_proc):
        """Compute and store N's vector timestamp from its already-stamped parents."""
        par = N.get_parents()
        if len(par) == 0:
            TS = np.zeros(shape=(num_proc,), dtype=int)
        elif isinstance(N, (PI, MS)):
            assert len(par)==1, 'Process Instructions or Message Sends can have at most 1 parent'
            # Copy so the increment below does not mutate the parent's vector.
            TS = par[0].get_ts().copy()
        else:
            assert len(par)<=2, 'Message Received can have at most 2 parents!'
            # np.max over a list of vectors returns a new array (no aliasing).
            TS = np.max([p.get_ts() for p in par], axis=0)
        TS[N.get_proc_id()] += 1
        N.set_ts(TS)

    stamped[S.get_id()] = True
    set_TS(S, num_proc)
    queue = deque([S])
    while queue:
        U = queue.popleft()
        for V in U.get_childs():
            # V is stamped as soon as all its parents *have timestamps*, which
            # can happen before every parent has been dequeued. Reaching an
            # already-stamped V from a later parent is therefore normal.
            if stamped[V.get_id()]:
                continue
            if any(W.get_ts() is None for W in V.get_parents()):
                # Some parent is not stamped yet; V is reached again when that
                # parent is stamped and processed.
                continue
            stamped[V.get_id()] = True
            set_TS(V, num_proc)
            queue.append(V)
def is_concur(E1, E2):
    """
    Return True when events E1 and E2 are concurrent under vector clocks.

    Two vector timestamps are concurrent when neither dominates the other,
    i.e. each is strictly smaller than the other in at least one entry.
    Scalar (Lamport) timestamps can never satisfy both conditions, so for
    them the result is always False.
    """
    a, b = E1.get_ts(), E2.get_ts()
    if not np.any(np.less(a, b)):
        return False
    return bool(np.any(np.less(b, a)))
class distProc:
    """
    A distributed process made of several concurrent processes that
    communicate via message passing.

    :param clock: timestamping scheme, either 'lamport' or 'vector'.
    :param procs: list of P lists, one per process. Each inner list is the
        ordered sequence of that process's events (PI, MR or MS instances).
        ``None`` is treated as "no processes".

    Construction gives every event a unique id and its process id, and links
    consecutive events of the same process as parent -> child. Links between
    a message send and its receive are not created here; they must already
    exist (see make_messages).
    """

    def __init__(self, clock='lamport', procs=None):
        assert clock in ['lamport', 'vector'], 'clock must be one of vector clock or lamport!'
        self.clock = clock
        self.procs = [] if procs is None else procs
        self.stamped = {}  # event id -> whether the event already has a timestamp
        event_id = 0
        for ip, proc in enumerate(self.procs):
            prev = None
            for e in proc:
                e.set_proc_id(ip)
                e.set_id(event_id)
                self.stamped[event_id] = False
                event_id += 1
                # Program order: each event causally follows the previous
                # event of the same process.
                if prev is not None:
                    e.add_parent(prev)
                    prev.add_child(e)
                prev = e
        self.is_solved = False

    def solve(self):
        """
        Calculate the timestamps of all events.

        A BFS is started from the first event of every process that has no
        parents and no timestamp yet. Calls after the first one do nothing.
        """
        if self.is_solved:
            return
        for proc in self.procs:
            if not proc:
                continue  # an empty process has no root event
            root = proc[0]
            if root.get_ts() is None and len(root.get_parents()) == 0:
                if self.clock == 'vector':
                    BFS_vector(root, self.stamped, len(self.procs))
                else:
                    BFS_lamport(root, self.stamped)
        self.is_solved = True

    def __str__(self):
        """
        Return one line per process. Each event is rendered as
        ``name(timestamp)`` followed by a space: 'PI' for process instructions,
        'MS_i' / 'MR_i' for the send / receive of message i.
        """
        lines = []
        for proc in self.procs:
            events = ''.join(str(e) + '(' + str(e.get_ts()) + ') ' for e in proc)
            lines.append(events + '\n')
        return ''.join(lines)
def make_messages(N):
    """
    Create N linked message send / message receive event pairs.

    Returns a tuple ``(MRs, MSs)`` of two dicts keyed by message id 0..N-1.
    ``MRs[i]`` is ``MR(i)`` and ``MSs[i]`` is ``MS(i)``. Each ``MRs[i]`` is
    already registered as a child of ``MSs[i]``, and ``MSs[i]`` as its parent.
    For example, ``MRs[4]`` is a child event of ``MSs[4]`` when N > 4.
    """
    receives, sends = {}, {}
    for mid in range(N):
        recv = MR(mid)
        send = MS(mid)
        recv.add_parent(send)
        send.add_child(recv)
        receives[mid] = recv
        sends[mid] = send
    return receives, sends
if __name__ == "__main__":
    # Compute the Lamport and vector timestamps for a distributed process
    # with 3 concurrent processes and 4 messages in total, and print them.

    def build_example(clock):
        # Message events keep the timestamps of a previous solve, so each
        # example gets freshly created (and freshly linked) messages.
        MRs, MSs = make_messages(4)
        return distProc(
            clock=clock,
            procs=[
                [PI(), MSs[0], PI(), MRs[1], MSs[2]],
                [MRs[3], MRs[0], MSs[1]],
                [MSs[3], PI(), MRs[2]],
            ],
        )

    for clock, title in (('lamport', 'Lamport timestamped: '),
                         ('vector', 'Vector timestamped: ')):
        example = build_example(clock)
        example.solve()
        print(title)
        print(example)
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The `distProc` class is used to simulate a distributed process with 3 kinds of events (process instruction `PI`, message send `MS`, and message receive `MR`). It takes two arguments. The first, `clock`, is one of `'vector'` or `'lamport'`: the timestamping system used for the distributed process. To simulate events with
`P` concurrent processes, `distProc` is given a `procs` argument, which is a list of `P` lists. Each list in
`procs` is a list of events. Each event is one of the following: `PI()` - process instruction; the constructor takes no arguments. `MS(mid)` - message send event; the constructor takes a message identifier (`mid`), and this event must be linked to a message receive event with the same `mid`. `MR(mid)` - message receive event; the constructor takes a message identifier (`mid`), and this event must be linked to a message send event with the same `mid`. We will use the following example to illustrate the use of the simulator. This example is taken from the Coursera Cloud Computing Concepts course (Part 1), Lecture 2.4 of week 4.

The events are labelled A-K, and are represented as follows:
- A, C, J: process instruction `PI()`
- B: message 0 send `MS(0)`, G: message 0 receive `MR(0)`
- H: message 1 send `MS(1)`, D: message 1 receive `MR(1)`
- E: message 2 send `MS(2)`, K: message 2 receive `MR(2)`
- I: message 3 send `MS(3)`, F: message 3 receive `MR(3)`

To simulate a distributed process with `P` concurrent processes and `M = 4` messages passed between processes, we start by creating and linking `M` `MS` and `MR` objects. This is done via the `make_messages(M)` function. Calling `MRs, MSs = make_messages(4)` creates 4 messages:
- `MRs` holds 4 message receive events (`MR(0)`, `MR(1)`, `MR(2)`, `MR(3)`), indexed by message id.
- `MSs` holds 4 message send events (`MS(0)`, `MS(1)`, `MS(2)`, `MS(3)`), indexed by message id.

`MRs[i]` is the receive event corresponding to `MSs[i]`, and the two are already linked together.

Now we are ready to define the distributed process shown in the diagram above:
Now that the distributed process is defined, we can proceed to solve it by calling:
My strategy was to convert the distributed process into a graph. Events are represented as nodes, and a directed edge goes from each event to every event that directly depends on it: the next event in the same process, or the receive event of a sent message. For the above process, the generated graph looks like:

Note that this graph is a directed acyclic graph (DAG), provided the events are causally consistent (no event depends, directly or indirectly, on itself). I then use a variant of breadth-first search (BFS) to topologically sort the events into a queue. The timestamp of each queued event is computed from the already-computed timestamps of its parents.
For the above example, the output of the Lamport timestamp calculation is (each timestamp is shown in parentheses next to the event name):

The vector clock calculation gives output:

The results of these calculations agree with the given answers in the lectures, as well as my own calculations by hand. I also used this tool to check my answers in the quiz about timestamps, given as part of the Coursera Cloud Computing Course.