Skip to content

Instantly share code, notes, and snippets.

@difranco
Last active May 29, 2018 09:27
Show Gist options
  • Select an option

  • Save difranco/bf5b58603a9f8fa0e8585a6279017edc to your computer and use it in GitHub Desktop.

Select an option

Save difranco/bf5b58603a9f8fa0e8585a6279017edc to your computer and use it in GitHub Desktop.
Petri Net parser and simulator
from lark import Lark, Transformer
grammar = r"""
%import common.WORD
%import common.INT
%import common.WS
%ignore WS
_arrow : "->" | "→"
?name : WORD
place : "(" name ")"
transition : "[" name "]"
arc : name _arrow name [INT]
mark : name _arrow INT
marking : "{" mark ("," mark)* "}"
_bang : "!" | "fire"
fire : _bang name
net : ( place | transition | arc | marking | fire)*
"""
parser = Lark(grammar, start = "net")
test = r"""
(stop)
[change]
(go)
go → change
change → stop 1
{stop → 1, go → 1}
! change
fire change
"""
tree = parser.parse(test)
print(tree.pretty())
from collections import defaultdict
places = set()
transitions = set()
arcs = dict()
marks = defaultdict(int)
import sqlite3
conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute('CREATE TABLE places (name text)')
cur.execute('CREATE TABLE transitions (name text)')
cur.execute('CREATE TABLE arcs (source text, target text, weight int)')
cur.execute('CREATE TABLE marks (name text, weight int DEFAULT 0)')
# there should be foreign key constraints among these to assure integrity
# typed marks would have a reference to another table with tuples of the mark type
# instead of a mere weight
cur.execute('''CREATE TRIGGER default_marks INSERT ON places
BEGIN
INSERT INTO marks VALUES (NEW.name, 0);
END;
'''
)
def put_place(name):
cur.execute("INSERT INTO places (name) VALUES ('{name}')".format(name=name))
def get_places():
r = set()
for row in cur.execute('SELECT name FROM places'):
r.add(row[0])
return r
def put_transition(name):
query = "INSERT INTO transitions (name) VALUES ('{name}')".format(name=name)
cur.execute(query)
def get_transitions():
r = set()
for row in cur.execute('SELECT name FROM transitions'):
r.add(row[0])
return r
def put_arc(source, target, weight):
cur.execute("INSERT INTO arcs (source, target, weight) VALUES ('{s}', '{t}', '{w}')".format(s=source, t=target, w=weight))
def get_arcs():
r = {} # returning a dict of (source, target) → weight
for row in cur.execute('SELECT source, target, weight FROM arcs'):
source, target, weight = row[0], row[1], row[2]
r[(source, target)] = weight
return r
def put_mark(place, weight):
cur.execute("INSERT INTO marks (name, weight) VALUES ('{p}', {w})".format(p=place, w=weight))
def get_mark(name):
return cur.execute("SELECT weight FROM marks WHERE name = '{n}'".format(n=name))[0]
def get_marks():
r = {} # returning a dict of (source, target) → weight
for row in cur.execute('SELECT name, weight FROM marks'):
r[row[0]] = row[1]
return r
def inputs(name):
r = []
for row in cur.execute("SELECT source FROM arcs WHERE target = '{n}'".format(n=name)):
r.append(row[0])
return r
def outputs(name):
r = []
for row in cur.execute("SELECT target FROM arcs WHERE source = '{n}'".format(n=name)):
r.append(row[0])
return r
def enabled(name):
query = "SELECT target FROM arcs, marks WHERE source = marks.name AND arcs.weight >= marks.weight AND marks.name = '{n}'".format(n=name)
all = [row[0] for row in cur.execute(query)]
return name in all
import logging
def fire(name):
if enabled(name):
ins = inputs(name)
outs = outputs(name)
for i in ins:
query = "UPDATE marks, arcs SET marks.weight = marks.weight - arcs.weight WHERE marks.name = '{i}' AND arcs.source = marks.name".format(i=i)
cur.execute(query)
for o in outs:
query = "UPDATE marks, arcs SET marks.weight = marks.weight + arcs.weight WHERE marks.name = '{o}' AND arcs.target = marks.name".format(o=o)
cur.execute(query)
else:
print("Cannot fire transition '"+name+"'")
class NetTransformer(Transformer):
def place(self, matches):
put_place(matches[0][0:])
def transition(self, matches):
put_transition(matches[0][0:])
def arc(self, matches):
left = matches[0][0:]
right = matches[1][0:]
weight = int(matches[2]) if (len(matches) == 3 and matches[2].isnumeric()) else 1
places = get_places()
transitions = get_transitions()
if (left in places and right in transitions) or (left in transitions and right in places):
put_arc(left, right, weight)
def mark(self, matches):
put_mark(matches[0][0:], int(matches[1]))
def fire(self, matches):
name = matches[0][0:]
fire(name)
new_tree = NetTransformer().transform(tree)
print(get_places())
print(get_transitions())
print(get_arcs())
print(get_marks())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment