Skip to content

Instantly share code, notes, and snippets.

@vog3lm
Created April 17, 2018 10:26
Show Gist options
  • Select an option

  • Save vog3lm/a7a1066299af4b1bd57eb0935048f5af to your computer and use it in GitHub Desktop.

Select an option

Save vog3lm/a7a1066299af4b1bd57eb0935048f5af to your computer and use it in GitHub Desktop.
playstation controller interface; depends on pygame, xboxdrv; listen on button and axis events
import signal, os, sys, psc, logging
# @id ::
# @job ::
# @res ::
class Engine(psc.PscIocJob):
    """Demo drive job: prints a track command for the shoulder buttons.

    R1/R2 address the right side (backwards/forwards) and L1/L2 the left
    side. A value of 1 means "go"; any other value means "stop".
    """

    def __init__(self):
        psc.PscIocJob.__init__(self)

    def execute(self, key, value):
        """Print the command that belongs to ``key``.

        key   -- one of the psc.PscJobKeys constants
        value -- 1 while the button is pressed, anything else on release

        Keys this job does not know print "full stop".
        """
        keys = psc.PscJobKeys
        # (key, message while pressed, message on release)
        commands = (
            (keys.R1, "right backwards go", "right stop"),
            (keys.R2, "right forwards go", "right stop"),
            # releasing L1 used to announce "right stop" (copy-paste slip)
            (keys.L1, "left backwards go", "left stop"),
            (keys.L2, "left forwards go", "left stop"),
        )
        for known, go, stop in commands:
            if known == key:
                print(go if value == 1 else stop)
                return
        print("full stop")
# @id ::
# @job ::
# @res ::
class Launcher(psc.PscIocJob):
    """Demo launcher job bound to the right stick (RT, RTX, RTY)."""

    def __init__(self):
        psc.PscIocJob.__init__(self)

    def execute(self, key, value):
        """Print the launcher action for ``key``.

        RT prints "launch" whatever ``value`` is (the value is not
        inspected). RTX/RTY print a direction chosen by the sign of
        ``value``; a value that is neither above nor below zero prints the
        stop message.
        """
        keys = psc.PscJobKeys
        if key == keys.RT:
            print("launch")
        # (axis key, message for value > 0, message for value < 0, otherwise)
        axes = (
            (keys.RTX, "Launcher x right", "Launcher x left", "Launcher x stop"),
            (keys.RTY, "Launcher y down", "Launcher y up", "Launcher y stop"),
        )
        for axis_key, positive, negative, idle in axes:
            if key != axis_key:
                continue
            if value > 0:
                print(positive)
            elif value < 0:
                print(negative)
            else:
                print(idle)
# @id ::
# @job ::
# @res ::
class Gimbal(psc.PscIocJob):
    """Demo camera-gimbal job bound to the left stick (LT, LTX, LTY)."""

    def __init__(self):
        psc.PscIocJob.__init__(self)

    def execute(self, key, value):
        """Print the gimbal action for ``key``.

        LT prints "snapshot" whatever ``value`` is (the value is not
        inspected). LTX/LTY print a direction chosen by the sign of
        ``value``; a value that is neither above nor below zero prints the
        stop message.
        """
        keys = psc.PscJobKeys
        if key == keys.LT:
            print("snapshot")
        # (axis key, message for value > 0, message for value < 0, otherwise)
        axes = (
            (keys.LTX, "gimbal x right", "gimbal x left", "gimbal x stop"),
            (keys.LTY, "gimbal y down", "gimbal y up", "gimbal y stop"),
        )
        for axis_key, positive, negative, idle in axes:
            if key != axis_key:
                continue
            if value > 0:
                print(positive)
            elif value < 0:
                print(negative)
            else:
                print(idle)
# @id :: main
# @job :: start application
# @res :: running application
if __name__ == '__main__':
    # Default SIGINT disposition: Ctrl+C ends the process directly.
    # NOTE(review): with SIG_DFL installed the KeyboardInterrupt handler
    # below is not expected to run for Ctrl+C; it is kept as a safety net.
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    # Work relative to the directory this script lives in.
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    # Pessimistic default: only a start() that reports a status changes it.
    # (Named exit_code so the builtin exit() is not shadowed.)
    exit_code = 1

    # Root logger: drop the last pre-installed handler (if any) and log to
    # the default stream with the originating function appended.
    logger = logging.getLogger()
    if logger.handlers:
        logger.handlers.pop()
    formatter = logging.Formatter(fmt='%(message)s [%(module)s.%(funcName)s():%(lineno)d]', datefmt='%Y-%m-%d %H:%M:%S')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.ERROR)
    if '--debug' in sys.argv:
        logger.setLevel(logging.DEBUG)

    try:
        launcher = Launcher()
        gimbal = Gimbal()
        engine = Engine()
        keys = psc.PscJobKeys
        # One job object serves every key of its control group.
        jobs = {
            keys.R1: engine, keys.R2: engine, keys.L1: engine, keys.L2: engine,
            keys.RTX: launcher, keys.RTY: launcher, keys.RT: launcher,
            keys.LTX: gimbal, keys.LTY: gimbal, keys.LT: gimbal,
        }
        status = psc.Psc4p1Objects().arguments(jobs).start()
        # start() yields None when it only logged a failure; sys.exit(None)
        # would report success, so map that case to a failing status.
        exit_code = 1 if status is None else status
    except KeyboardInterrupt as e:
        logging.debug('system exit keyboard interrupt %s'%str(e))
    except Exception as e:
        import traceback
        logging.error(str(traceback.format_exc()))
        logging.error(str(e))
    sys.exit(exit_code)
import pygame, logging, os, threading, time
from pygame.locals import *
# @id :: nullstate.pattern
# @job ::
# @get :: id
# @res ::
class Pattern(object):
    """Base for objects configured through a validated ``args`` dict.

    Subclasses pre-populate ``self.args`` with every key they accept.
    A value equal to ``self.fail`` marks a required argument that was never
    supplied; a value equal to ``self.ignore`` marks one that is skipped.
    """

    def __init__(self):
        self.ignore = 'ignore'
        self.fail = 'unset'
        self.errors = []
        self.warnings = []
        self.args = {'id': self.fail}

    def argument(self, key, value):
        """Store ``value`` under ``key`` if the key is declared; return self.

        Undeclared keys and the reserved key 'id' are silently left alone.
        """
        if key in self.args and key != 'id':
            self.args[key] = value
        return self

    def arguments(self, options):
        """Feed every item of ``options`` through argument(); return self."""
        for name, value in options.items():
            self.argument(name, value)
        return self

    def error(self):
        """Return True when the last validate() recorded any error."""
        return bool(self.errors)

    def exception(self, E):
        """Raise ``E(self.errors)`` if errors are recorded, else return self."""
        if self.errors:
            raise E(self.errors)
        return self

    def validate(self):
        """Rebuild ``errors`` and ``warnings`` from ``args``; return self.

        A value of ``self.fail`` or None is an error; a value of
        ``self.ignore`` or None is a warning (None therefore yields both).
        """
        self.errors = []
        self.warnings = []
        ident = self.args['id']
        for name, value in self.args.items():
            if value in (self.fail, None):
                self.errors.append('%s argument Error! %s not Found!'%(ident,name))
            if value in (self.ignore, None):
                self.warnings.append('%s argument Warning! %s is ignored!'%(ident,name))
        return self
# @id :: module
# @sos ::
#
# L1 L2 R1 R2
# _=====_ _=====_
# / _____ \ / _____ \
# +.-'_____'-.---------------------------.-'_____'-.+
# / | | '. .' | | \
# / ___| UP |___ \ / ___| Y |___ \
# / | | ; ____ ____ ; | | ;
# | | LEFT RIGHT | | |____| |____| | | X B | |
# | |___ ___| ; SELECT START ; |___ ___| ;
# |\ |DOWN | / _ _ \ | A | /|
# | \ |_____| .','" "', ,'" "', '. |_____| .' |
# | '-.______.-' / \ / \ '-._____.-' |
# | | LT |------| RT | |
# | /\ / \ /\ |
# | / '.___.' '.___.' \ |
# | / \ |
# \ / --LTX-- --RTX-- \ /
# \________/ Y Y \_________/
#
# @eos ::
# @id :: exception
# @job ::
# @get ::
# @res ::
class PscDeamonException(Exception):
    """Raised when a psc component fails validation or set-up.

    ``errors`` holds whatever the raiser passed in; in this module that is
    a list of human-readable messages.
    """

    def __init__(self, errors):
        Exception.__init__(self, errors)
        self.errors = errors

    def toString(self):
        """Return the collected errors as text suitable for logging.

        The former ASCII encode step is dropped: it produced a bytes object
        on Python 3 and could raise on non-ASCII text while an error was
        already being reported.
        """
        return str(self.errors)
# @id :: stop
# @job ::
# @get ::
# @res ::
class PscEngineStop(Exception):
    """Raised by a job to make the engine leave its event loop."""
# @id :: keys
# @job :: generalize job key access
# @get ::
# @res ::
class PscJobKeys(object):
    """Symbolic job keys, one per controller input.

    The trailing numbers are the author's notes on the matching event ids.
    NOTE(review): A/B are annotated 2/1 here while PscPygameEngine.btns maps
    ids 0/1 to A/B -- confirm which is right for the target controller.
    """

    # stick axes
    LTX = "ltx"
    LTY = "lty"
    RTX = "rtx"
    RTY = "rty"

    # stick buttons
    LT = "lt"          # 13
    RT = "rt"          # 14

    # face buttons
    B = "b"            # 1
    A = "a"            # 2
    X = "x"            # 3
    Y = "y"            # 4

    # menu buttons
    START = "start"    # 11
    SELECT = "select"  # 10

    # shoulder buttons
    L1 = "l1"          # 6
    L2 = "l2"          # 8
    R1 = "r1"          # 7
    R2 = "r2"          # 9

    # hat / d-pad
    RIGHT = "right"    # 99
    UP = "up"          # 98
    LEFT = "left"      # 97
    DOWN = "down"      # 96
    HAT = "hat"        # 95
# @id :: ioc
# @job :: generalize job logic
# @res ::
class PscIocJob(Pattern):
    """Default job: subclass it and override execute() to react to a key."""

    def __init__(self):
        super(PscIocJob, self).__init__()
        self.args['id'] = 'ioc'
        self.args['psc'] = 'ioc'

    def execute(self, key, value):
        """Fallback handler: only records the unhandled key at debug level."""
        message = 'not implemented key: %s val: %s'%(key,value)
        logging.debug(message)
# @id :: stop job
# @job ::
# @res ::
class Stop(Pattern):
    """Job that ends the engine loop by raising PscEngineStop."""

    def __init__(self):
        super(Stop, self).__init__()
        self.args['id'] = 'stop'
        self.args['psc'] = 'stop'

    def execute(self, key, value):
        """Log the stop request, then raise PscEngineStop('stop')."""
        logging.debug('psc stop.')
        stop_signal = PscEngineStop('stop')
        raise stop_signal
# @id :: linker
# @job :: execute valid event jobs
# @get :: jobs, exception
# @res ::
class PscJobCenter(Pattern):
    """Linker: routes a job key to the job object registered for it.

    Arguments:
      jobs -- mapping of job key to an object offering validate(),
              exception() and execute(key, value)
      oops -- exception class raised on validation and look-up failures
    """

    def __init__(self):
        Pattern.__init__(self)
        self.args.update({'id':'linker','jobs':self.fail,'oops':self.fail})

    def create(self):
        """Validate the arguments and cache them; returns self.

        Raises the configured 'oops' exception if an argument is missing.
        """
        self.validate()
        self.exception(self.args['oops'])
        self.jobs = self.args['jobs']
        self.keys = self.jobs.keys()
        # Kept apart from self.fail: that attribute is the "unset" sentinel
        # validate() compares against, and overwriting it with the exception
        # class made every later validate() report 'oops' as missing.
        self.oops = self.args['oops']
        return self

    def execute(self, key, value):
        """Validate the job registered for ``key`` and run it.

        Raises the configured 'oops' exception for an unknown key or a job
        that fails its own validation. Exceptions raised by the job itself
        propagate unchanged.
        """
        if key not in self.jobs:
            raise self.oops(['unknown job key %s'%key])
        self.jobs[key].validate().exception(self.oops).execute(key,value)
# @id :: engine
# @job :: keep it processing
# @get :: linker, exception
# @res :: react on pygame joystick events and process them to linker
class PscPygameEngine(Pattern):
    """Event loop: polls pygame joystick events and forwards them to the linker.

    Arguments:
      timer    -- seconds to sleep between two polls (default 0.1)
      joystick -- pygame joystick index (default 0)
      linker   -- object whose execute(key, value) receives decoded events
      oops     -- exception class raised on validation and set-up failures
    """

    # hat position -> synthetic button id (see self.btns); every other
    # position, centred or diagonal, is reported as id 95 with value 0
    _HAT_IDS = {(1, 0): 99, (-1, 0): 97, (0, 1): 98, (0, -1): 96}

    def __init__(self):
        Pattern.__init__(self)
        self.args.update({'id':'engine','timer':0.1,'joystick':0,'linker':self.fail,'oops':self.fail})
        self.lock = threading.Event()
        self.true = True  # loop flag: execute() runs while this is truthy
        # pygame controller event ids
        self.axis = {0:PscJobKeys.LTX,1:PscJobKeys.LTY,2:PscJobKeys.RTX,3:PscJobKeys.RTY}
        self.btns = {0:PscJobKeys.A,1:PscJobKeys.B,3:PscJobKeys.X,4:PscJobKeys.Y
                    #,2:PscJobKeys.LB,5:PscJobKeys.RB
                    ,6:PscJobKeys.L1,7:PscJobKeys.R1,8:PscJobKeys.L2,9:PscJobKeys.R2
                    ,10:PscJobKeys.SELECT,11:PscJobKeys.START
                    ,13:PscJobKeys.LT,14:PscJobKeys.RT
                    ,99:PscJobKeys.RIGHT,98:PscJobKeys.UP,97:PscJobKeys.LEFT,96:PscJobKeys.DOWN,95:PscJobKeys.HAT}
        # event type -> id table used to translate the decoded id into a job key
        self.internal = {JOYAXISMOTION:self.axis,JOYHATMOTION:self.btns,JOYBUTTONUP:self.btns,JOYBUTTONDOWN:self.btns} # 7: 9: 10: 11:
        # event type -> decoder returning (id, value) for an event
        self.evaluate = {JOYAXISMOTION:self.axe,JOYHATMOTION:self.hat,JOYBUTTONUP:self.up,JOYBUTTONDOWN:self.down} # 7: 9: 10: 11:

    def create(self):
        """Initialise pygame and the selected joystick; returns self.

        Raises the configured 'oops' exception if an argument is missing or
        the joystick index is not available.
        """
        self.validate()
        self.exception(self.args['oops'])
        # set SDL to use dummy NULL video driver
        # it doesn't need a windowing system.
        os.environ["SDL_VIDEODRIVER"] = "dummy"
        pygame.init()
        # create unused 1x1 pixel screen
        pygame.display.set_mode((1, 1))
        pygame.joystick.init()
        index = self.args['joystick']
        if pygame.joystick.get_count() <= index:
            raise self.args['oops'](['invalid joystick index %s'%index])
        joy = pygame.joystick.Joystick(index)
        joy.init()
        # Keep a reference for the lifetime of the engine.
        # NOTE(review): newer pygame releases may close the device once the
        # Joystick object is garbage-collected -- confirm.
        self._joystick = joy
        return self

    def axe(self, event):
        """Decode an axis event into (axis id, axis value)."""
        return event.axis, event.value

    def hat(self, event):  # right:(1,0) up:(0,1) left:(-1,0) down:(0,-1) clear:(0,0)
        """Decode a hat event into (synthetic button id, value).

        The four cardinal positions give their own id with value 1; any
        other position gives the generic hat id 95 with value 0.
        """
        key = self._HAT_IDS.get(tuple(event.value))
        if key is None:
            return 95, 0
        return key, 1

    def up(self, event):
        """Decode a button release into (button id, 0)."""
        return event.button, 0

    def down(self, event):
        """Decode a button press into (button id, 1)."""
        return event.button, 1

    def execute(self):
        """Poll and dispatch events until stopped; returns 0.

        The loop ends when ``self.true`` becomes falsy or when a job raises
        PscEngineStop. Any other exception raised by the linker propagates.
        """
        self.validate().exception(self.args['oops'])
        # float, not int: int() cut the 0.1 s default down to 0 and turned
        # the loop into a busy spin.
        timer = float(self.args['timer'])
        link = self.args['linker']
        try:
            while self.true:
                for evt in pygame.event.get():
                    decode = self.evaluate.get(evt.type)
                    table = self.internal.get(evt.type)
                    if decode is None or table is None:
                        continue  # not a joystick event this engine handles
                    key, value = decode(evt)
                    logging.debug('engine type %s key %s value %s'%(evt.type,key,value))
                    if key in table:
                        link.execute(table[key],value)
                # pause once per poll so the loop does not hog the CPU
                time.sleep(timer)
            logging.info('Controller %s Flag Stop.'%(self.args['id']))
        except PscEngineStop:
            logging.info('Controller %s Exception Stop.'%(self.args['id']))
        return 0
# @id :: linker
# @job ::
# @res ::
class Psc4p1Objects(object):
    """Wires jobs, linker and engine together and runs the controller loop."""

    def __init__(self):
        self.reset()
        self.linker = PscJobCenter()
        self.engine = PscPygameEngine()

    def arguments(self, injection):
        """Replace default jobs with the ones given in ``injection``; returns self.

        injection -- mapping of job key to job object. The job table is
        reset first, so keys missing from ``injection`` fall back to their
        defaults; keys that are not part of the table are ignored.
        """
        self.reset()
        for key in self.jobs:
            if key in injection:
                self.jobs[key] = injection[key]
        return self

    def reset(self):
        """Restore the default job table.

        Every key shares a single PscIocJob instance, except START, which
        gets a Stop job.
        """
        tmp = PscIocJob()
        keys = PscJobKeys
        self.jobs = {
            keys.LTX: tmp, keys.LTY: tmp, keys.RTX: tmp, keys.RTY: tmp,
            keys.LT: tmp, keys.RT: tmp,
            keys.L1: tmp, keys.L2: tmp, keys.R1: tmp, keys.R2: tmp,
            keys.A: tmp, keys.B: tmp, keys.X: tmp, keys.Y: tmp,
            keys.RIGHT: tmp, keys.UP: tmp, keys.LEFT: tmp, keys.DOWN: tmp,
            keys.HAT: tmp,
            keys.START: Stop(), keys.SELECT: tmp,
        }

    def start(self):
        """Create linker and engine, then run the engine loop.

        Returns the engine's result on a normal stop. A PscDeamonException
        is logged and answered with 1, so a caller that passes the result to
        sys.exit() reports the failure instead of a success status.
        """
        try:
            self.linker.arguments({'jobs':self.jobs,'oops':PscDeamonException}).create()
            self.engine.arguments({'linker':self.linker,'oops':PscDeamonException}).create()
            return self.engine.execute()
        except PscDeamonException as e:
            logging.error(e.toString())
            return 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment