Created
April 17, 2018 10:26
-
-
Save vog3lm/a7a1066299af4b1bd57eb0935048f5af to your computer and use it in GitHub Desktop.
playstation controller interface; depends on pygame, xboxdrv; listen on button and axis events
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import signal, os, sys, psc, logging | |
| # @id :: | |
| # @job :: | |
| # @res :: | |
| class Engine(psc.PscIocJob): | |
| def __init__(self): | |
| psc.PscIocJob.__init__(self) | |
| def execute(self,key,value): | |
| if(psc.PscJobKeys.R1 == key): | |
| if(1 == value): | |
| print "right backwards go" | |
| else: | |
| print "right stop" | |
| elif(psc.PscJobKeys.R2 == key): | |
| if(1 == value): | |
| print "right forwards go" | |
| else: | |
| print "right stop" | |
| elif(psc.PscJobKeys.L1 == key): | |
| if(1 == value): | |
| print "left backwards go" | |
| else: | |
| print "right stop" | |
| elif(psc.PscJobKeys.L2 == key): | |
| if(1 == value): | |
| print "left forwards go" | |
| else: | |
| print "left stop" | |
| else: | |
| print "full stop" | |
| # @id :: | |
| # @job :: | |
| # @res :: | |
| class Launcher(psc.PscIocJob): | |
| def __init__(self): | |
| psc.PscIocJob.__init__(self) | |
| def execute(self,key,value): | |
| if(psc.PscJobKeys.RT == key): | |
| print "launch" | |
| if(psc.PscJobKeys.RTX == key): | |
| if(0 < value): | |
| print "Launcher x right" | |
| elif(0 > value): | |
| print "Launcher x left" | |
| else: | |
| print "Launcher x stop" | |
| if(psc.PscJobKeys.RTY == key): | |
| if(0 < value): | |
| print "Launcher y down" | |
| elif(0 > value): | |
| print "Launcher y up" | |
| else: | |
| print "Launcher y stop" | |
| # @id :: | |
| # @job :: | |
| # @res :: | |
| class Gimbal(psc.PscIocJob): | |
| def __init__(self): | |
| psc.PscIocJob.__init__(self) | |
| def execute(self,key,value): | |
| if(psc.PscJobKeys.LT == key): | |
| print "snapshot" | |
| if(psc.PscJobKeys.LTX == key): | |
| if(0 < value): | |
| print "gimbal x right" | |
| elif(0 > value): | |
| print "gimbal x left" | |
| else: | |
| print "gimbal x stop" | |
| if(psc.PscJobKeys.LTY == key): | |
| if(0 < value): | |
| print "gimbal y down" | |
| elif(0 > value): | |
| print "gimbal y up" | |
| else: | |
| print "gimbal y stop" | |
| # @id :: main | |
| # @job :: start application | |
| # @res :: running application | |
| if __name__ == '__main__': | |
| signal.signal(signal.SIGINT, signal.SIG_DFL) | |
| os.chdir(os.path.dirname(os.path.abspath(__file__))) | |
| exit = 1 | |
| logger = logging.getLogger() | |
| if(logger.handlers): | |
| logger.handlers.pop() | |
| formatter = logging.Formatter(fmt='%(message)s [%(module)s.%(funcName)s():%(lineno)d]', datefmt='%Y-%m-%d %H:%M:%S') | |
| handler = logging.StreamHandler() | |
| handler.setFormatter(formatter) | |
| logger.addHandler(handler) | |
| logger.setLevel(logging.ERROR) | |
| if('--debug' in sys.argv): | |
| logger.setLevel(logging.DEBUG) | |
| try: | |
| launcher = Launcher() | |
| gimbal = Gimbal() | |
| engine = Engine() | |
| jobs = {psc.PscJobKeys.R1:engine,psc.PscJobKeys.R2:engine,psc.PscJobKeys.L1:engine,psc.PscJobKeys.L2:engine | |
| ,psc.PscJobKeys.RTX:launcher,psc.PscJobKeys.RTY:launcher,psc.PscJobKeys.RT:launcher | |
| ,psc.PscJobKeys.LTX:gimbal,psc.PscJobKeys.LTY:gimbal,psc.PscJobKeys.LT:gimbal} | |
| exit = psc.Psc4p1Objects().arguments(jobs).start() | |
| except KeyboardInterrupt as e: | |
| logging.debug('system exit keyboard interrupt %s'%str(e)) | |
| except Exception as e: | |
| import traceback | |
| logging.error(str(traceback.format_exc())) | |
| logging.error(str(e)) | |
| sys.exit(exit) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pygame, logging, os, threading, time | |
| from pygame.locals import * | |
| # @id :: nullstate.pattern | |
| # @job :: | |
| # @get :: id | |
| # @res :: | |
| class Pattern(object): | |
| def __init__(self): | |
| self.ignore = 'ignore' | |
| self.fail = 'unset' | |
| self.errors = [] | |
| self.warnings = [] | |
| self.args = {'id':self.fail} | |
| def argument(self,key,value): | |
| if(key in self.args.keys() and not 'id' == key): | |
| self.args[key] = value | |
| return self | |
| def arguments(self,options): | |
| for k in options.keys(): | |
| self.argument(k,options[k]) | |
| return self | |
| def error(self): | |
| if(len(self.errors) == 0): return False | |
| else: return True | |
| def exception(self,E): | |
| if(len(self.errors) == 0): return self | |
| else: raise E(self.errors) | |
| def validate(self): | |
| self.errors = [] | |
| self.warnings = [] | |
| for k in self.args.keys(): | |
| if(self.fail == self.args[k] or None == self.args[k]): | |
| self.errors.append('%s argument Error! %s not Found!'%(self.args['id'],k)) | |
| if(self.ignore == self.args[k] or None == self.args[k]): | |
| self.warnings.append('%s argument Warning! %s is ignored!'%(self.args['id'],k)) | |
| return self | |
| # @id :: module | |
| # @sos :: | |
| # | |
| # L1 L2 R1 R2 | |
| # _=====_ _=====_ | |
| # / _____ \ / _____ \ | |
| # +.-'_____'-.---------------------------.-'_____'-.+ | |
| # / | | '. .' | | \ | |
| # / ___| UP |___ \ / ___| Y |___ \ | |
| # / | | ; ____ ____ ; | | ; | |
| # | | LEFT RIGHT | | |____| |____| | | X B | | | |
| # | |___ ___| ; SELECT START ; |___ ___| ; | |
| # |\ |DOWN | / _ _ \ | A | /| | |
| # | \ |_____| .','" "', ,'" "', '. |_____| .' | | |
| # | '-.______.-' / \ / \ '-._____.-' | | |
| # | | LT |------| RT | | | |
| # | /\ / \ /\ | | |
| # | / '.___.' '.___.' \ | | |
| # | / \ | | |
| # \ / --LTX-- --RTX-- \ / | |
| # \________/ Y Y \_________/ | |
| # | |
| # @eos :: | |
| # @id :: exception | |
| # @job :: | |
| # @get :: | |
| # @res :: | |
| class PscDeamonException(Exception): | |
| def __init__(self,errors): | |
| self.errors = errors | |
| def toString(self): | |
| return str(self.errors).encode('ascii') | |
| # @id :: stop | |
| # @job :: | |
| # @get :: | |
| # @res :: | |
| class PscEngineStop(Exception): | |
| pass | |
| # @id :: keys | |
| # @job :: generalize job key access | |
| # @get :: | |
| # @res :: | |
| class PscJobKeys(object): | |
| LTX = 'ltx' | |
| LTY = 'lty' | |
| RTX = 'rtx' | |
| RTY = 'rty' | |
| LT = 'lt' # 13 | |
| RT = 'rt' # 14 | |
| B = 'b' # 1 | |
| A = 'a' # 2 | |
| X = 'x' # 3 | |
| Y = 'y' # 4 | |
| START = 'start' # 11 | |
| SELECT= 'select' # 10 | |
| L1 = 'l1' # 6 | |
| L2 = 'l2' # 8 | |
| R1 = 'r1' # 7 | |
| R2 = 'r2' # 9 | |
| RIGHT = 'right' # 99 | |
| UP = 'up' # 98 | |
| LEFT = 'left' # 97 | |
| DOWN = 'down' # 96 | |
| HAT = 'hat' # 95 | |
| # @id :: ioc | |
| # @job :: generalize job logic | |
| # @res :: | |
| class PscIocJob(Pattern): | |
| def __init__(self): | |
| Pattern.__init__(self) | |
| self.args.update({'id':'ioc','psc':'ioc'}) | |
| def execute(self,key,value): | |
| logging.debug('not implemented key: %s val: %s'%(key,value)) | |
| # @id :: stop job | |
| # @job :: | |
| # @res :: | |
| class Stop(Pattern): | |
| def __init__(self): | |
| Pattern.__init__(self) | |
| self.args.update({'id':'stop','psc':'stop'}) | |
| def execute(self,key,value): | |
| logging.debug('psc stop.') | |
| raise PscEngineStop('stop') | |
| # @id :: linker | |
| # @job :: execute valide event jobs | |
| # @get :: jobs, exception | |
| # @res :: | |
| class PscJobCenter(Pattern): | |
| def __init__(self): | |
| Pattern.__init__(self) | |
| self.args.update({'id':'linker','jobs':self.fail,'oops':self.fail}) | |
| def create(self): | |
| self.validate() | |
| self.exception(self.args['oops']) | |
| self.jobs = self.args['jobs'] | |
| self.keys = self.jobs.keys() | |
| self.fail = self.args['oops'] | |
| def execute(self,key,value): | |
| if(key in self.keys): | |
| val = self.jobs[key].validate().exception(self.fail).execute(key,value) | |
| else: | |
| raise self.fail(['unknown job key %s'%key]) | |
| # @id :: engine | |
| # @job :: keep it processing | |
| # @get :: linker, exception | |
| # @res :: react on pygame joystick events and process them to linker | |
| class PscPygameEngine(Pattern): | |
| def __init__(self): | |
| Pattern.__init__(self) | |
| self.args.update({'id':'engine','timer':0.1,'joystick':0,'linker':self.fail,'oops':self.fail}) | |
| self.lock = threading.Event() | |
| self.true = True | |
| # pygame controller event ids | |
| self.axis = {0:PscJobKeys.LTX,1:PscJobKeys.LTY,2:PscJobKeys.RTX,3:PscJobKeys.RTY} | |
| self.btns = {0:PscJobKeys.A,1:PscJobKeys.B,3:PscJobKeys.X,4:PscJobKeys.Y | |
| #,2:PscJobKeys.LB,5:PscJobKeys.RB | |
| ,6:PscJobKeys.L1,7:PscJobKeys.R1,8:PscJobKeys.L2,9:PscJobKeys.R2 | |
| ,10:PscJobKeys.SELECT,11:PscJobKeys.START | |
| ,13:PscJobKeys.LT,14:PscJobKeys.RT | |
| ,99:PscJobKeys.RIGHT,98:PscJobKeys.UP,97:PscJobKeys.LEFT,96:PscJobKeys.DOWN,95:PscJobKeys.HAT} | |
| self.internal = {JOYAXISMOTION:self.axis,JOYHATMOTION:self.btns,JOYBUTTONUP:self.btns,JOYBUTTONDOWN:self.btns} # 7: 9: 10: 11: | |
| self.evaluate = {JOYAXISMOTION:self.axe,JOYHATMOTION:self.hat,JOYBUTTONUP:self.up,JOYBUTTONDOWN:self.down} # 7: 9: 10: 11: | |
| def create(self): | |
| self.validate() | |
| self.exception(self.args['oops']) | |
| # set SDL to use dummy NULL video driver | |
| # it doesn't need a windowing system. | |
| os.environ["SDL_VIDEODRIVER"] = "dummy" | |
| pygame.init() | |
| # create unused 1x1 pixel screen | |
| screen = pygame.display.set_mode((1, 1)) | |
| pygame.joystick.init() | |
| index = self.args['joystick'] | |
| if(pygame.joystick.get_count() <= index): | |
| raise self.args['oops'](['invalid joystick index %s'%index]) | |
| joy = pygame.joystick.Joystick(index) | |
| joy.init() | |
| return self | |
| def axe(self,event): | |
| return event.axis, event.value | |
| def hat(self,event): # right:(1,0) up:(0,1) left:(-1,0) down:(0.-1) clear:(0:0) | |
| value = 1 | |
| key = None | |
| if(cmp((1,0),event.value) == 0): | |
| key = 99 | |
| elif(cmp((-1,0),event.value) == 0): | |
| key = 97 | |
| elif(cmp((0,1),event.value) == 0): | |
| key = 98 | |
| elif(cmp((0,-1),event.value) == 0): | |
| key = 96 | |
| else: | |
| key = 95 | |
| value = 0 | |
| return key, value | |
| def up(self,event): | |
| return event.button, 0 | |
| def down(self,event): | |
| return event.button, 1 | |
| def execute(self): | |
| try: | |
| timer = int(self.args['timer']) | |
| self.validate().exception(self.args['oops']) | |
| evts = self.internal.keys() | |
| vals = self.evaluate.keys() | |
| link = self.args['linker'] | |
| while self.true: | |
| for evt in pygame.event.get(): | |
| if(evt.type in evts and evt.type in vals): | |
| key, value = self.evaluate[evt.type](evt) | |
| tmp = self.internal[evt.type] | |
| logging.debug('engine type %s key %s value %s'%(evt.type,key,value)) | |
| if(key in tmp.keys()): | |
| link.execute(tmp[key],value) | |
| time.sleep(timer) | |
| logging.info('Controller %s Flag Stop.'%(self.args['id'])) | |
| except PscEngineStop as e: | |
| logging.info('Controller %s Exception Stop.'%(self.args['id'])) | |
| return 0 | |
| # @id :: linker | |
| # @job :: | |
| # @res :: | |
| class Psc4p1Objects(object): | |
| def __init__(self): | |
| self.reset() | |
| self.linker = PscJobCenter() | |
| self.engine = PscPygameEngine() | |
| def arguments(self,injection): | |
| self.reset() | |
| for key in self.jobs.keys(): | |
| keys = injection.keys() | |
| if(key in keys): | |
| self.jobs[key] = injection[key] | |
| return self | |
| def reset(self): | |
| tmp = PscIocJob() | |
| self.jobs = {PscJobKeys.LTX:tmp,PscJobKeys.LTY:tmp,PscJobKeys.RTX:tmp,PscJobKeys.RTY:tmp,PscJobKeys.LT:tmp,PscJobKeys.RT:tmp | |
| ,PscJobKeys.L1:tmp,PscJobKeys.L2:tmp,PscJobKeys.R1:tmp,PscJobKeys.R2:tmp | |
| ,PscJobKeys.A:tmp,PscJobKeys.B:tmp,PscJobKeys.X:tmp,PscJobKeys.Y:tmp | |
| ,PscJobKeys.RIGHT:tmp,PscJobKeys.UP:tmp,PscJobKeys.LEFT:tmp,PscJobKeys.DOWN:tmp,PscJobKeys.HAT:tmp | |
| ,PscJobKeys.START:Stop(),PscJobKeys.SELECT:tmp} | |
| def start(self): | |
| try: | |
| self.linker.arguments({'jobs':self.jobs,'oops':PscDeamonException}).create() | |
| self.engine.arguments({'linker':self.linker,'oops':PscDeamonException}).create() | |
| return self.engine.execute() | |
| except PscDeamonException as e: | |
| logging.error(e.toString()) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment