Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 69 additions & 53 deletions src/server_common/snmpWalker.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
from pysnmp.hlapi import *
from pysnmp.smi import builder, compiler, rfc1902, view
import asyncio

from pysnmp.hlapi.v1arch.asyncio import (
CommunityData,
ObjectIdentity,
ObjectType,
SnmpDispatcher,
UdpTransportTarget,
next_cmd,
)
from pysnmp.smi import builder, compiler, error, rfc1902, view

# must import pysnmp_sync_adapter after pysnmp.hlapi to ensure correct v1 functions chosen
# from pysnmp_sync_adapter import create_transport, next_cmd_sync
from server_common.utilities import SEVERITY, print_and_log

# Assemble MIB browser
mibBuilder = builder.MibBuilder()
mibViewController = view.MibViewController(mibBuilder)
MIB_BUILDER = builder.MibBuilder()
MIB_VIEW_CONTROLLER = view.MibViewController(MIB_BUILDER)

# compiler.addMibCompiler(mibBuilder, sources=['https://mibs.pysnmp.com/asn1/@mib@'], destination='snmp/mibs')
# compiler.addMibCompiler(mibBuilder, sources=['https://mibs.pysnmp.com/asn1/@mib@'],
# destination='snmp/mibs')
# The below code needs MIBS to have been downloaded in the below subfolder.
compiler.addMibCompiler(mibBuilder, sources=["file://snmp/mibs"])
compiler.addMibCompiler(MIB_BUILDER, sources=["file://snmp/mibs"])
# Pre-load MIB modules we expect to work with
mibBuilder.loadModules(
MIB_BUILDER.load_modules(
"SNMPv2-MIB", "SNMP-COMMUNITY-MIB", "DISMAN-EXPRESSION-MIB", "RFC1213-MIB", "IF-MIB"
)

Expand All @@ -25,57 +37,61 @@
]


def walk(host, oid, requestedMIBs=INTERESTING_MIBS):
async def walk_async(
host: str, oid: str, requested_mibs: list[str] = INTERESTING_MIBS
) -> dict[str, str]:
mibmap = dict()
for errorIndication, errorStatus, errorIndex, varBinds in nextCmd(
SnmpEngine(),
CommunityData("public", mpModel=0),
UdpTransportTarget((host, 161), timeout=3, retries=0),
ContextData(),
ObjectType(ObjectIdentity(oid)),
lookupMib=False,
lexicographicMode=False,
lookupNames=True,
lookupValues=True,
):
if errorIndication:
## we need to look at later - currently will print forever for a moxa that is
## not on the network. Maybe return status and let caller decide whether to print
# print_and_log(f"Error:: for {host}: {errorIndication}", severity=SEVERITY.MINOR)
break

elif errorStatus:
print_and_log(
"host %s: %s at %s"
% (
host,
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex) - 1][0] or "?",
),
severity=SEVERITY.MAJOR,
current_oid = ObjectType(ObjectIdentity(oid))
with SnmpDispatcher() as snmp_dispatcher:
while True:
error_indication, error_status, error_index, var_binds = await next_cmd(
snmp_dispatcher,
CommunityData("public", mpModel=0),
await UdpTransportTarget.create((host, 161), timeout=5, retries=0),
current_oid,
lookupMib=False,
lexicographicMode=False,
lookupNames=True,
lookupValues=True,
)
break
if error_indication:
## we need to look at later - currently will print forever for a moxa that is
## not on the network. Maybe return status and let caller decide whether to print
print_and_log(f"Error:: for {host}: {error_indication}", severity=SEVERITY.MINOR)
break

else:
# Run var-binds through MIB resolver
# You may want to catch and ignore resolution errors here
varBinds = [
rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]), x[1]).resolveWithMib(
mibViewController
elif error_status:
print_and_log(
"host {}: {} at {}".format(
host,
error_status,
error_index and var_binds[int(error_index) - 1][0] or "?",
),
severity=SEVERITY.MAJOR,
)
for x in varBinds
]
for name, value in varBinds:
mib, exists, port = name.prettyPrint().partition(".")
if not exists:
port = ""
# print(name.prettyPrint(), ' = ', value.prettyPrint())
if mib in requestedMIBs:
mibmap[name.prettyPrint()] = value.prettyPrint()
# print('MIB-->', mib, ' port-->', port, ' = ', value.prettyPrint())
# print_and_log('MIB -->%s, port --> %s, = %s' % (mib, port, value))
break

else:
# Run var-binds through MIB resolver
# We get some `Excessive instance identifier sub-OIDs left at MibTableRow` errors
# which is what the error.SmiError ignore is all about
res_var_binds = []
for x in var_binds:
try:
y = rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]), x[1]).resolveWithMib(
MIB_VIEW_CONTROLLER
)
res_var_binds.append((y[0], y[1]))
except error.SmiError:

Check notice

Code scanning / CodeQL

Empty except Note

'except' clause does nothing but pass and there is no explanatory comment.
pass
for name, value in res_var_binds:
mib, exists, port = name.prettyPrint().partition(".")
if mib in requested_mibs:
mibmap[name.prettyPrint()] = value.prettyPrint()
current_oid = var_binds[0]
return mibmap


# walk('130.246.49.46', '1.3.6.1.2.1')
# for example walk('130.246.49.46', '1.3.6.1.2.1')
def walk(host: str, oid: str, requested_mibs: list[str] = INTERESTING_MIBS) -> dict[str, str]:
return asyncio.run(walk_async(host, oid, requested_mibs))
Loading