From 3957ac536e8ed8d12e54c7e308802f1a591ef2b8 Mon Sep 17 00:00:00 2001 From: Freddie Akeroyd Date: Thu, 14 May 2026 01:07:22 +0100 Subject: [PATCH 1/2] Update to use new pysnmp --- src/server_common/snmpWalker.py | 71 ++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/src/server_common/snmpWalker.py b/src/server_common/snmpWalker.py index d083d59..58a973d 100644 --- a/src/server_common/snmpWalker.py +++ b/src/server_common/snmpWalker.py @@ -1,17 +1,27 @@ -from pysnmp.hlapi import * -from pysnmp.smi import builder, compiler, rfc1902, view +import asyncio + +from pysnmp.hlapi.v1arch.asyncio import ( + CommunityData, + ObjectIdentity, + ObjectType, + SnmpDispatcher, + UdpTransportTarget, + walk_cmd, +) +from pysnmp.smi import builder, compiler, error, rfc1902, view from server_common.utilities import SEVERITY, print_and_log # Assemble MIB browser -mibBuilder = builder.MibBuilder() -mibViewController = view.MibViewController(mibBuilder) +MIB_BUILDER = builder.MibBuilder() +MIB_VIEW_CONTROLLER = view.MibViewController(MIB_BUILDER) -# compiler.addMibCompiler(mibBuilder, sources=['https://mibs.pysnmp.com/asn1/@mib@'], destination='snmp/mibs') +# compiler.addMibCompiler(mibBuilder, sources=['https://mibs.pysnmp.com/asn1/@mib@'], +# destination='snmp/mibs') # The below code needs MIBS to have been downloaded in the below subfolder. -compiler.addMibCompiler(mibBuilder, sources=["file://snmp/mibs"]) +compiler.addMibCompiler(MIB_BUILDER, sources=["file://snmp/mibs"]) # Pre-load MIB modules we expect to work with -mibBuilder.loadModules( +MIB_BUILDER.load_modules( "SNMPv2-MIB", "SNMP-COMMUNITY-MIB", "DISMAN-EXPRESSION-MIB", "RFC1213-MIB", "IF-MIB" ) @@ -25,32 +35,32 @@ ] -def walk(host, oid, requestedMIBs=INTERESTING_MIBS): +async def walkasync( + host: str, oid: str, requested_mibs: list[str] = INTERESTING_MIBS +) -> dict[str, str]: mibmap = dict() - for errorIndication, errorStatus, errorIndex, varBinds in nextCmd( - SnmpEngine(), + async for error_indication, error_status, error_index, var_binds in walk_cmd( + SnmpDispatcher(), CommunityData("public", mpModel=0), - UdpTransportTarget((host, 161), timeout=3, retries=0), - ContextData(), + await UdpTransportTarget.create((host, 161), timeout=3, retries=0), ObjectType(ObjectIdentity(oid)), lookupMib=False, lexicographicMode=False, lookupNames=True, lookupValues=True, ): - if errorIndication: + if error_indication: ## we need to look at later - currently will print forever for a moxa that is ## not on the network. Maybe return status and let caller decide whether to print # print_and_log(f"Error:: for {host}: {errorIndication}", severity=SEVERITY.MINOR) break - elif errorStatus: + elif error_status: print_and_log( - "host %s: %s at %s" - % ( + "host {}: {} at {}".format( host, - errorStatus.prettyPrint(), - errorIndex and varBinds[int(errorIndex) - 1][0] or "?", + error_status, + error_index and var_binds[int(error_index) - 1][0] or "?", ), severity=SEVERITY.MAJOR, ) @@ -59,18 +69,19 @@ def walk(host, oid, requestedMIBs=INTERESTING_MIBS): else: # Run var-binds through MIB resolver # You may want to catch and ignore resolution errors here - varBinds = [ - rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]), x[1]).resolveWithMib( - mibViewController - ) - for x in varBinds - ] - for name, value in varBinds: + res_var_binds = [] + for x in var_binds: + try: + y = rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]), x[1]).resolveWithMib( + MIB_VIEW_CONTROLLER + ) + res_var_binds.append((y[0], y[1])) + except error.SmiError: + pass + for name, value in res_var_binds: mib, exists, port = name.prettyPrint().partition(".") - if not exists: - port = "" # print(name.prettyPrint(), ' = ', value.prettyPrint()) - if mib in requestedMIBs: + if mib in requested_mibs: mibmap[name.prettyPrint()] = value.prettyPrint() # print('MIB-->', mib, ' port-->', port, ' = ', value.prettyPrint()) # print_and_log('MIB -->%s, port --> %s, = %s' % (mib, port, value)) @@ -78,4 +89,8 @@ def walk(host, oid, requestedMIBs=INTERESTING_MIBS): return mibmap +def walk(host: str, oid: str, requested_mibs: list[str] = INTERESTING_MIBS) -> dict[str, str]: + return asyncio.run(walkasync(host, oid, requested_mibs=requested_mibs)) + + # walk('130.246.49.46', '1.3.6.1.2.1') From d4d3eb70c5f7c29fa0acc6a67d247b6677d6184d Mon Sep 17 00:00:00 2001 From: Freddie Akeroyd Date: Sat, 16 May 2026 01:30:13 +0100 Subject: [PATCH 2/2] Update --- src/server_common/snmpWalker.py | 101 ++++++++++++++++---------------- 1 file changed, 51 insertions(+), 50 deletions(-) diff --git a/src/server_common/snmpWalker.py b/src/server_common/snmpWalker.py index 58a973d..f1f81fe 100644 --- a/src/server_common/snmpWalker.py +++ b/src/server_common/snmpWalker.py @@ -6,10 +6,12 @@ ObjectType, SnmpDispatcher, UdpTransportTarget, - walk_cmd, + next_cmd, ) from pysnmp.smi import builder, compiler, error, rfc1902, view +# must import pysnmp_sync_adapter after pysnmp.hlapi to ensure correct v1 functions chosen +# from pysnmp_sync_adapter import create_transport, next_cmd_sync from server_common.utilities import SEVERITY, print_and_log # Assemble MIB browser @@ -35,62 +37,61 @@ ] -async def walkasync( +async def walk_async( host: str, oid: str, requested_mibs: list[str] = INTERESTING_MIBS ) -> dict[str, str]: mibmap = dict() - async for error_indication, error_status, error_index, var_binds in walk_cmd( - SnmpDispatcher(), - CommunityData("public", mpModel=0), - await UdpTransportTarget.create((host, 161), timeout=3, retries=0), - ObjectType(ObjectIdentity(oid)), - lookupMib=False, - lexicographicMode=False, - lookupNames=True, - lookupValues=True, - ): - if error_indication: - ## we need to look at later - currently will print forever for a moxa that is - ## not on the network. Maybe return status and let caller decide whether to print - # print_and_log(f"Error:: for {host}: {errorIndication}", severity=SEVERITY.MINOR) - break - - elif error_status: - print_and_log( - "host {}: {} at {}".format( - host, - error_status, - error_index and var_binds[int(error_index) - 1][0] or "?", - ), - severity=SEVERITY.MAJOR, + current_oid = ObjectType(ObjectIdentity(oid)) + with SnmpDispatcher() as snmp_dispatcher: + while True: + error_indication, error_status, error_index, var_binds = await next_cmd( + snmp_dispatcher, + CommunityData("public", mpModel=0), + await UdpTransportTarget.create((host, 161), timeout=5, retries=0), + current_oid, + lookupMib=False, + lexicographicMode=False, + lookupNames=True, + lookupValues=True, ) - break + if error_indication: + ## we need to look at later - currently will print forever for a moxa that is + ## not on the network. Maybe return status and let caller decide whether to print + print_and_log(f"Error:: for {host}: {error_indication}", severity=SEVERITY.MINOR) + break - else: - # Run var-binds through MIB resolver - # You may want to catch and ignore resolution errors here - res_var_binds = [] - for x in var_binds: - try: - y = rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]), x[1]).resolveWithMib( - MIB_VIEW_CONTROLLER - ) - res_var_binds.append((y[0], y[1])) - except error.SmiError: - pass - for name, value in res_var_binds: - mib, exists, port = name.prettyPrint().partition(".") - # print(name.prettyPrint(), ' = ', value.prettyPrint()) - if mib in requested_mibs: - mibmap[name.prettyPrint()] = value.prettyPrint() - # print('MIB-->', mib, ' port-->', port, ' = ', value.prettyPrint()) - # print_and_log('MIB -->%s, port --> %s, = %s' % (mib, port, value)) + elif error_status: + print_and_log( + "host {}: {} at {}".format( + host, + error_status, + error_index and var_binds[int(error_index) - 1][0] or "?", + ), + severity=SEVERITY.MAJOR, + ) + break + else: + # Run var-binds through MIB resolver + # We get some `Excessive instance identifier sub-OIDs left at MibTableRow` errors + # which is what the error.SmiError ignore is all about + res_var_binds = [] + for x in var_binds: + try: + y = rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]), x[1]).resolveWithMib( + MIB_VIEW_CONTROLLER + ) + res_var_binds.append((y[0], y[1])) + except error.SmiError: + pass + for name, value in res_var_binds: + mib, exists, port = name.prettyPrint().partition(".") + if mib in requested_mibs: + mibmap[name.prettyPrint()] = value.prettyPrint() + current_oid = var_binds[0] return mibmap +# for example walk('130.246.49.46', '1.3.6.1.2.1') def walk(host: str, oid: str, requested_mibs: list[str] = INTERESTING_MIBS) -> dict[str, str]: - return asyncio.run(walkasync(host, oid, requested_mibs=requested_mibs)) - - -# walk('130.246.49.46', '1.3.6.1.2.1') + return asyncio.run(walk_async(host, oid, requested_mibs))