Last active
May 29, 2018 09:27
-
-
Save difranco/bf5b58603a9f8fa0e8585a6279017edc to your computer and use it in GitHub Desktop.
Petri Net parser and simulator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from lark import Lark, Transformer | |
# Lark grammar of the net description language.  A document ("net") is any
# sequence of:
#   (name)             a place declaration
#   [name]             a transition declaration
#   a -> b [n]         an arc with an optional integer weight ("→" also works)
#   {p -> n, ...}      a marking block
#   ! name / fire name a request to fire a transition
# Rules and terminals whose names start with "_" are filtered out of the
# parse tree, and "?name" is inlined, so names reach the transformer as bare
# WORD tokens.
grammar = r"""
%import common.WORD
%import common.INT
%import common.WS
%ignore WS
_arrow : "->" | "→"
?name : WORD
place : "(" name ")"
transition : "[" name "]"
arc : name _arrow name [INT]
mark : name _arrow INT
marking : "{" mark ("," mark)* "}"
_bang : "!" | "fire"
fire : _bang name
net : ( place | transition | arc | marking | fire)*
"""

# Parser for whole documents; parsing starts at the `net` rule.
parser = Lark(grammar, start="net")
# Sample document: two places, one transition, an arc into and out of the
# transition, an initial marking, and two fire requests for the transition.
test = r"""
(stop)
[change]
(go)
go → change
change → stop 1
{stop → 1, go → 1}
! change
fire change
"""

# Parse the sample and dump the raw parse tree for inspection.
tree = parser.parse(test)
print(tree.pretty())
| from collections import defaultdict | |
# Module-level in-memory containers for the net.  Nothing in the visible part
# of the file reads them (the SQLite tables hold the state); they are kept so
# any importer that refers to these names keeps working.
places, transitions = set(), set()
arcs = {}
marks = defaultdict(int)  # a missing key reads as 0
| import sqlite3 | |
# All net state is kept in a throwaway in-memory SQLite database that the
# functions in this file reach through one shared module-level cursor.
conn = sqlite3.connect(':memory:')
cur = conn.cursor()

# Schema: one table per concept.
# TODO: add foreign key constraints between these tables to guarantee
# integrity.  Typed marks would reference another table holding tuples of the
# mark type instead of carrying a plain integer weight.
for _ddl in (
    'CREATE TABLE places (name text)',
    'CREATE TABLE transitions (name text)',
    'CREATE TABLE arcs (source text, target text, weight int)',
    'CREATE TABLE marks (name text, weight int DEFAULT 0)',
):
    cur.execute(_ddl)

# Inserting a place also inserts a zero-weight marks row for it.
cur.execute('''CREATE TRIGGER default_marks INSERT ON places
BEGIN
INSERT INTO marks VALUES (NEW.name, 0);
END;
'''
)
def put_place(name):
    """Record a place called *name* in the ``places`` table.

    The name is passed as a bound parameter rather than formatted into the
    SQL text, so names containing quotes are stored verbatim and cannot alter
    the statement.  The ``default_marks`` trigger on ``places`` adds a
    zero-weight ``marks`` row for the new place as a side effect.
    """
    cur.execute("INSERT INTO places (name) VALUES (?)", (name,))
def get_places():
    """Return the set of all names stored in the ``places`` table."""
    return {record[0] for record in cur.execute('SELECT name FROM places')}
def put_transition(name):
    """Record a transition called *name* in the ``transitions`` table.

    The name is passed as a bound parameter rather than formatted into the
    SQL text, so names containing quotes are stored verbatim and cannot alter
    the statement.
    """
    cur.execute("INSERT INTO transitions (name) VALUES (?)", (name,))
def get_transitions():
    """Return the set of all names stored in the ``transitions`` table."""
    return {record[0] for record in cur.execute('SELECT name FROM transitions')}
def put_arc(source, target, weight):
    """Record an arc from *source* to *target* carrying *weight*.

    All three values are passed as bound parameters rather than formatted
    into the SQL text, so they cannot alter the statement.  No check is made
    here that the endpoints exist or that the arc joins a place to a
    transition; callers are expected to validate that first.
    """
    cur.execute(
        "INSERT INTO arcs (source, target, weight) VALUES (?, ?, ?)",
        (source, target, weight),
    )
def get_arcs():
    """Return every arc as a dict mapping ``(source, target)`` to its weight.

    If the table holds several rows for the same pair, the one read last is
    the one kept.
    """
    return {
        (src, dst): w
        for src, dst, w in cur.execute('SELECT source, target, weight FROM arcs')
    }
def put_mark(place, weight):
    """Set the marking of *place* to *weight*.

    The ``default_marks`` trigger already creates a zero-weight row in
    ``marks`` whenever a place is inserted, so blindly inserting here would
    leave two rows for the same place.  Existing rows are therefore updated
    in place, and a new row is inserted only when the name has none yet.
    Values are passed as bound parameters so they cannot alter the SQL.
    """
    cur.execute("UPDATE marks SET weight = ? WHERE name = ?", (weight, place))
    if cur.rowcount == 0:
        # No row to update: the name was never given a default marking.
        cur.execute(
            "INSERT INTO marks (name, weight) VALUES (?, ?)", (place, weight)
        )
def get_mark(name):
    """Return the current marking (weight) stored for *name*.

    ``Cursor.execute`` returns a cursor, which cannot be indexed, so the row
    is fetched explicitly.  ``marks`` has no uniqueness constraint and may
    hold more than one row for a name; the most recently inserted row is
    used.  A name with no row at all is reported as 0.
    """
    row = cur.execute(
        "SELECT weight FROM marks WHERE name = ? ORDER BY rowid DESC LIMIT 1",
        (name,),
    ).fetchone()
    return row[0] if row is not None else 0
def get_marks():
    """Return the whole marking as a dict mapping name to weight.

    If the table holds several rows for the same name, the one read last is
    the one kept.
    """
    # Each row is a (name, weight) pair, which dict() consumes directly.
    return dict(cur.execute('SELECT name, weight FROM marks'))
def inputs(name):
    """Return the sources of all arcs whose target is *name*, as a list.

    The name is passed as a bound parameter rather than formatted into the
    SQL text, so it cannot alter the statement.
    """
    rows = cur.execute("SELECT source FROM arcs WHERE target = ?", (name,))
    return [row[0] for row in rows]
def outputs(name):
    """Return the targets of all arcs whose source is *name*, as a list.

    The name is passed as a bound parameter rather than formatted into the
    SQL text, so it cannot alter the statement.
    """
    rows = cur.execute("SELECT target FROM arcs WHERE source = ?", (name,))
    return [row[0] for row in rows]
def enabled(name):
    """Return True when the transition *name* can fire.

    A transition is enabled when it exists in ``transitions`` and every arc
    leading into it starts at a place holding at least as many tokens as the
    arc's weight.  A transition without incoming arcs is therefore always
    enabled; an unknown name never is.

    ``marks`` may hold more than one row for a place; the most recently
    inserted row is treated as the current marking, and a place with no row
    counts as holding 0 tokens.
    """
    known = cur.execute(
        "SELECT 1 FROM transitions WHERE name = ?", (name,)
    ).fetchone()
    if known is None:
        return False

    # Materialise the arcs first: the shared cursor is reused for the
    # per-place lookups below, which would otherwise cut this result short.
    incoming = cur.execute(
        "SELECT source, weight FROM arcs WHERE target = ?", (name,)
    ).fetchall()

    for place, needed in incoming:
        row = cur.execute(
            "SELECT weight FROM marks WHERE name = ? "
            "ORDER BY rowid DESC LIMIT 1",
            (place,),
        ).fetchone()
        available = row[0] if row is not None else 0
        if available < needed:
            return False
    return True
| import logging | |
def fire(name):
    """Fire the transition *name*, moving tokens along its arcs.

    When ``enabled(name)`` is true, each incoming arc removes its weight from
    the marking of its source place and each outgoing arc adds its weight to
    the marking of its target place.  Otherwise nothing changes and a message
    is printed.

    SQLite has no multi-table ``UPDATE a, b SET ...`` form, so the arc
    weights are read first and then applied with one single-table UPDATE per
    arc.  Only arcs that touch this transition are considered.
    """
    if not enabled(name):
        print("Cannot fire transition '"+name+"'")
        return

    # Read both arc lists before issuing any UPDATE: all statements share one
    # cursor, and a new execute() discards the previous result set.
    consumed = cur.execute(
        "SELECT source, weight FROM arcs WHERE target = ?", (name,)
    ).fetchall()
    produced = cur.execute(
        "SELECT target, weight FROM arcs WHERE source = ?", (name,)
    ).fetchall()

    for place, weight in consumed:
        cur.execute(
            "UPDATE marks SET weight = weight - ? WHERE name = ?",
            (weight, place),
        )
    for place, weight in produced:
        cur.execute(
            "UPDATE marks SET weight = weight + ? WHERE name = ?",
            (weight, place),
        )
        if cur.rowcount == 0:
            # The target has no marking row yet; create one so the produced
            # tokens are not silently dropped.
            cur.execute(
                "INSERT INTO marks (name, weight) VALUES (?, ?)",
                (place, weight),
            )
class NetTransformer(Transformer):
    """Apply a parsed net description to the database.

    Each method handles the grammar rule of the same name and is called with
    the list of that rule's children.  The methods work purely by side effect
    (they call the module-level ``put_*`` and ``fire`` functions) and return
    None.
    """

    def place(self, matches):
        """Register the place named by the single child token."""
        put_place(str(matches[0]))

    def transition(self, matches):
        """Register the transition named by the single child token."""
        put_transition(str(matches[0]))

    def arc(self, matches):
        """Register an arc ``left -> right`` with an optional weight.

        The weight defaults to 1.  Depending on the Lark version and its
        ``maybe_placeholders`` setting, an omitted ``[INT]`` is either absent
        from *matches* or present as ``None``; both cases are handled.  The
        arc is stored only if it joins a known place to a known transition
        (in either direction); anything else is ignored.
        """
        left = str(matches[0])
        right = str(matches[1])
        weight_token = matches[2] if len(matches) > 2 else None
        weight = int(weight_token) if weight_token is not None else 1

        known_places = get_places()
        known_transitions = get_transitions()
        place_to_transition = left in known_places and right in known_transitions
        transition_to_place = left in known_transitions and right in known_places
        if place_to_transition or transition_to_place:
            put_arc(left, right, weight)

    def mark(self, matches):
        """Store the marking ``name -> INT`` for one place."""
        put_mark(str(matches[0]), int(matches[1]))

    def fire(self, matches):
        """Fire the named transition via the module-level ``fire``."""
        # Inside a method body the bare name resolves to the module-level
        # function, not to this method.
        fire(str(matches[0]))
# Walk the parse tree; the transformer's callbacks load the net into the
# database and carry out the fire requests as they are visited.
new_tree = NetTransformer().transform(tree)

# Dump the resulting state, one collection per line.
print(get_places(), get_transitions(), get_arcs(), get_marks(), sep="\n")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment