Skip to content

Instantly share code, notes, and snippets.

@blackwithwhite666
Forked from xfguo/pysnmp_sim_agent.py
Last active December 12, 2015 00:08
Show Gist options
  • Select an option

  • Save blackwithwhite666/4680980 to your computer and use it in GitHub Desktop.

Select an option

Save blackwithwhite666/4680980 to your computer and use it in GitHub Desktop.
from __future__ import absolute_import
import time
import bisect
import select
from threading import Thread
from pyasn1.codec.ber import encoder, decoder
from pysnmp.proto import api
from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher
from pysnmp.carrier.asynsock.dgram import udp
class Instr(object):
    """Abstract MIB instruction.

    An instruction is identified by ``name`` (an OID tuple) and produces a
    value through ``execute``.  Instances are kept in a list sorted by OID
    and located with ``bisect``, so they define an ordering against plain
    OID sequences:

    * an OID that extends ``name`` (a child in the OID tree) sorts *before*
      the instruction, as does the column OID itself when ``isColumn`` is
      true -- this makes ``bisect`` land on the instruction that owns it;
    * every other OID is ordered by ordinary tuple comparison with ``name``.
    """

    # Subclasses set this to True when the instruction serves several
    # instances below ``name`` (``name + (index,)``) instead of one scalar.
    isColumn = False

    @property
    def name(self):
        """OID tuple of this instruction; subclasses must override it."""
        raise NotImplementedError()

    def _compare(self, other):
        """Return a negative, zero or positive int ordering self vs. *other*.

        *other* is any sequence of sub-identifiers; it is normalised to a
        tuple so that tuple-like OID objects compare the same way.
        """
        name = tuple(self.name)
        other = tuple(other)
        if len(other) > len(name) and other[:len(name)] == name:
            # *other* lives below this instruction in the OID tree.
            return 1
        if self.isColumn and other == name:
            return 1
        # Portable replacement for the Python 2 only ``cmp`` builtin.
        return (name > other) - (name < other)

    def __cmp__(self, other):
        # Only consulted by Python 2; Python 3 uses the rich methods below.
        return self._compare(other)

    def __lt__(self, other):
        return self._compare(other) < 0

    def __le__(self, other):
        return self._compare(other) <= 0

    def __gt__(self, other):
        return self._compare(other) > 0

    def __ge__(self, other):
        return self._compare(other) >= 0

    def execute(self, module, *args, **kwargs):
        """Build the value using the protocol *module*; subclasses override."""
        raise NotImplementedError()

    def __call__(self, protoVer, *args, **kwargs):
        """Run ``execute`` with the protocol module registered for *protoVer*."""
        return self.execute(api.protoModules[protoVer], *args, **kwargs)

    def get_next_oid(self, curr_oid):
        """Return the OID to report for a GETNEXT starting at *curr_oid*.

        The base implementation always answers with ``name`` itself.
        """
        return self.name
class SysDescr(Instr):
    """Scalar instruction at 1.3.6.1.2.1.1.1.0 returning a fixed description."""

    name = (1, 3, 6, 1, 2, 1, 1, 1, 0)

    def execute(self, module, oid=None):
        """Return an ``OctetString`` naming this file; *oid* is ignored."""
        description = 'PySNMP example command responder at %s' % __file__
        return module.OctetString(description)
class Uptime(Instr):
    """Scalar instruction at 1.3.6.1.2.1.1.3.0 reporting elapsed time."""

    name = (1, 3, 6, 1, 2, 1, 1, 3, 0)
    # Reference timestamp, taken once when the class body is executed.
    birthday = time.time()

    def execute(self, module, oid=None):
        """Return ``TimeTicks`` of hundredths of a second since ``birthday``."""
        elapsed_seconds = time.time() - self.birthday
        return module.TimeTicks(elapsed_seconds * 100)
class SysORIndex(Instr):
    """Column instruction at 1.3.6.1.2.1.1.9.1.1 whose value is the row index.

    Instances live at ``name + (row,)`` for ``row`` in ``1..maxIndex``.
    """

    name = (1, 3, 6, 1, 2, 1, 1, 9, 1, 1)
    isColumn = True
    # Highest row index served by this column (rows are numbered from 1).
    maxIndex = 3

    def execute(self, module, oid=None):
        """Return ``Integer(row)`` for an existing instance OID, else None.

        *oid* must be exactly ``name + (row,)``; anything else -- including
        an OID of the right length under a different prefix -- yields None.
        """
        base_len = len(self.name)
        if not oid or len(oid) != base_len + 1:
            return None
        if tuple(oid[:base_len]) != self.name:
            return None
        row = oid[-1]
        if not 1 <= row <= self.maxIndex:
            return None
        return module.Integer(row)

    def get_next_oid(self, curr_oid=None):
        """Return the first instance OID after *curr_oid*, or None if exhausted.

        An OID that is missing, equal to the column OID, or outside this
        column maps to the first row.  An OID inside the column (a row OID or
        anything deeper below a row) maps to the following row, so the result
        is always greater than *curr_oid*.
        """
        first_row = self.name + (1,)
        if curr_oid is None:
            return first_row
        base_len = len(self.name)
        if len(curr_oid) <= base_len or tuple(curr_oid[:base_len]) != self.name:
            return first_row
        row = curr_oid[base_len]
        if row >= self.maxIndex:
            # Past the last row: let the caller move on to the next column.
            return None
        return self.name + (row + 1,)
class SysORDescr(Instr):
    """Column instruction at 1.3.6.1.2.1.1.9.1.3 returning ``"OR-<row>"``.

    Instances live at ``name + (row,)`` for ``row`` in ``1..maxIndex``.
    """

    name = (1, 3, 6, 1, 2, 1, 1, 9, 1, 3)
    isColumn = True
    # Highest row index served by this column (rows are numbered from 1).
    maxIndex = 3

    def execute(self, module, oid=None):
        """Return ``OctetString("OR-<row>")`` for an existing instance, else None.

        *oid* must be exactly ``name + (row,)``; anything else -- including
        an OID of the right length under a different prefix -- yields None.
        """
        base_len = len(self.name)
        if not oid or len(oid) != base_len + 1:
            return None
        if tuple(oid[:base_len]) != self.name:
            return None
        row = oid[-1]
        if not 1 <= row <= self.maxIndex:
            return None
        return module.OctetString("OR-" + str(row))

    def get_next_oid(self, curr_oid=None):
        """Return the first instance OID after *curr_oid*, or None if exhausted.

        An OID that is missing, equal to the column OID, or outside this
        column maps to the first row.  An OID inside the column (a row OID or
        anything deeper below a row) maps to the following row, so the result
        is always greater than *curr_oid*.
        """
        first_row = self.name + (1,)
        if curr_oid is None:
            return first_row
        base_len = len(self.name)
        if len(curr_oid) <= base_len or tuple(curr_oid[:base_len]) != self.name:
            return first_row
        row = curr_oid[base_len]
        if row >= self.maxIndex:
            # Past the last row: let the caller move on to the next column.
            return None
        return self.name + (row + 1,)
def test_instr_compare():
    """Check where ``bisect`` places various OIDs among the instructions.

    Each entry pairs an OID with the index ``bisect.bisect`` must return for
    the sorted list ``[SysDescr, Uptime, SysORIndex, SysORDescr]``.
    """
    instr_list = [SysDescr(), Uptime(), SysORIndex(), SysORDescr()]
    expected = [
        ((1, 3, 6, 1, 2, 1, 1, 1), 0),
        ((1, 3, 6, 1, 2, 1, 1, 1, 0), 1),
        ((1, 3, 6, 1, 2, 1, 1, 3), 1),
        ((1, 3, 6, 1, 2, 1, 1, 3, 0), 2),
        ((1, 3, 6, 1, 2, 1, 1, 9, 1, 1), 2),
        ((1, 3, 6, 1, 2, 1, 1, 9, 1, 1, 1), 2),
        ((1, 3, 6, 1, 2, 1, 1, 9, 1, 3), 3),
        ((1, 3, 6, 1, 2, 1, 1, 9, 1, 3, 1), 3),
        ((1, 3, 6, 1, 2, 1, 1, 9, 1, 4), 4),
        ((1, 3, 6, 1, 2, 1, 1, 9, 1, 4, 1), 4),
    ]
    for oid, index in expected:
        # Compare by value: ``is`` on ints only works by accident of caching.
        assert bisect.bisect(instr_list, oid) == index, oid
class Agent(object):
    """Minimal SNMP command responder answering GET and GETNEXT over UDP.

    Instructions are registered with ``registerInstr``; ``prepare`` binds the
    UDP transport and ``start`` runs the dispatcher loop in the calling
    thread until ``stop`` closes it.
    """

    def __init__(self, host, port):
        # Instructions kept sorted by OID so bisect can locate them.
        self._mibInstr = []
        # Exact instruction name (OID tuple) -> instruction.
        self._mibInstrIdx = {}
        self._transportDispatcher = AsynsockDispatcher()
        self.host = host
        self.port = port

    def prepare(self):
        """Open the UDP server transport and hook up the receive callback."""
        address = (self.host, self.port)
        transportDispatcher = self._transportDispatcher
        transportDispatcher.registerTransport(
            udp.domainName, udp.UdpSocketTransport().openServerMode(address)
        )
        transportDispatcher.registerRecvCbFun(self.cbFun)
        transportDispatcher.jobStarted(1)

    def start(self):
        """Run the dispatcher loop; returns when the loop ends."""
        try:
            self._transportDispatcher.runDispatcher()
        except select.error:
            # Deliberately ignored -- presumably raised when stop() closes
            # the transports while the loop is polling them; TODO confirm.
            pass

    def stop(self):
        """Close the dispatcher and its transports."""
        # NOTE(review): the jobStarted(1) issued in prepare() is never
        # balanced by a jobFinished(1); confirm the dispatcher loop exits.
        self._transportDispatcher.closeDispatcher()

    def registerInstr(self, instr):
        """Insert *instr* at its sorted position and index it by name."""
        assert callable(instr)
        mibInstr = self._mibInstr
        mibInstr.insert(bisect.bisect(mibInstr, instr.name), instr)
        self._mibInstrIdx[instr.name] = instr

    def getNext(self, msgVer, oid, val):
        """Search next OID to report.

        Returns ``(nextOid, value)`` for the first registered instance that
        follows *oid*, or None when no such instance exists.
        """
        mibInstr = self._mibInstr
        current = tuple(oid)
        for instr in mibInstr[bisect.bisect(mibInstr, oid):]:
            nextOid = instr.get_next_oid(oid)
            # A GETNEXT answer must move forward; an instruction offering an
            # OID at or before the request is skipped so walks cannot loop.
            if nextOid is not None and tuple(nextOid) > current:
                return (nextOid, instr(msgVer, nextOid))
        return None

    def getOne(self, msgVer, oid, val):
        """Search value for given OID.

        Returns ``(oid, value)`` or None when no instruction provides a
        value for exactly this OID.
        """
        instr = self._mibInstrIdx.get(oid)
        if instr is not None:
            var = instr(msgVer)
        else:
            mibInstr = self._mibInstr
            nextIdx = bisect.bisect(mibInstr, oid)
            if nextIdx >= len(mibInstr):
                return None
            instr = mibInstr[nextIdx]
            base = tuple(instr.name)
            # Only a column that actually contains this OID may answer; the
            # bisect neighbour is otherwise an unrelated instruction.
            if not getattr(instr, 'isColumn', False):
                return None
            if tuple(oid)[:len(base)] != base:
                return None
            var = instr(msgVer, oid)
        if var is None:
            return None
        return (oid, var)

    def _resolveVarBinds(self, msgVer, reqVarBinds, lookup, setError):
        """Resolve each requested var-bind with *lookup*.

        Returns ``(varBinds, pendingErrors)``; every failed lookup adds an
        ``(oid, None)`` placeholder and a ``(setError, errorIndex)`` entry.
        """
        varBinds = []
        pendingErrors = []
        # SNMP error indices count var-binds starting from 1.
        for errorIndex, (oid, val) in enumerate(reqVarBinds, 1):
            varBind = lookup(msgVer, oid, val)
            if varBind is None:
                varBinds.append((oid, None))
                pendingErrors.append((setError, errorIndex))
            else:
                varBinds.append(varBind)
        return varBinds, pendingErrors

    def cbFun(self, transportDispatcher, transportDomain, transportAddress, wholeMsg):
        """Receive callback: decode each request in *wholeMsg* and reply.

        Returns the undecoded remainder, or None when a message uses a
        protocol version that has no registered module.
        """
        while wholeMsg:
            msgVer = api.decodeMessageVersion(wholeMsg)
            if msgVer not in api.protoModules:
                return
            pMod = api.protoModules[msgVer]
            reqMsg, wholeMsg = decoder.decode(
                wholeMsg, asn1Spec=pMod.Message(),
            )
            rspMsg = pMod.apiMessage.getResponse(reqMsg)
            rspPDU = pMod.apiMessage.getPDU(rspMsg)
            reqPDU = pMod.apiMessage.getPDU(reqMsg)
            varBinds = []
            pendingErrors = []
            if reqPDU.isSameTypeWith(pMod.GetNextRequestPDU()):
                # Process GET NEXT request here
                varBinds, pendingErrors = self._resolveVarBinds(
                    msgVer, pMod.apiPDU.getVarBinds(reqPDU),
                    self.getNext, pMod.apiPDU.setEndOfMibError,
                )
            elif reqPDU.isSameTypeWith(pMod.GetRequestPDU()):
                # Process GET request here
                varBinds, pendingErrors = self._resolveVarBinds(
                    msgVer, pMod.apiPDU.getVarBinds(reqPDU),
                    self.getOne, pMod.apiPDU.setNoSuchInstanceError,
                )
            else:
                # Report unsupported request type
                pMod.apiPDU.setErrorStatus(rspPDU, 'genErr')
            pMod.apiPDU.setVarBinds(rspPDU, varBinds)
            # Commit possible error indices to response PDU
            for fn, errorIndex in pendingErrors:
                fn(rspPDU, errorIndex)
            transportDispatcher.sendMessage(
                encoder.encode(rspMsg), transportDomain, transportAddress
            )
        return wholeMsg
class BackgroundAgent(Agent):
    """Agent whose dispatcher loop runs inside a daemon ``Container``."""

    # Factory for the object that runs the loop; must accept ``target=``.
    Container = Thread

    def __init__(self, host, port):
        super(BackgroundAgent, self).__init__(host, port)
        # Running container, or None while the agent is stopped.
        self.container = None

    def start(self):
        """Prepare the transport and launch the dispatcher loop."""
        assert self.container is None
        self.prepare()
        run_loop = super(BackgroundAgent, self).start
        worker = self.Container(target=run_loop)
        self.container = worker
        worker.daemon = True
        worker.start()

    def stop(self):
        """Close the dispatcher, then wait for the container to finish."""
        assert self.container is not None
        super(BackgroundAgent, self).stop()
        worker = self.container
        worker.join()
        self.container = None
if __name__ == "__main__":
    import time

    # Serve the four sample instructions on every interface, UDP port 10161.
    agent = BackgroundAgent("0.0.0.0", 10161)
    for instruction in (SysDescr(), Uptime(), SysORIndex(), SysORDescr()):
        agent.registerInstr(instruction)
    agent.start()

    # The agent runs in a daemon container, so keep the main thread alive.
    while True:
        time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment