-
Notifications
You must be signed in to change notification settings - Fork 670
Expand file tree
/
Copy pathmethods.py
More file actions
90 lines (77 loc) · 2.78 KB
/
methods.py
File metadata and controls
90 lines (77 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""
High level method related functions
"""
import functools

from opcua import ua
from opcua.common import node
def call_method(parent, methodid, *args):
    """
    Call an OPC-UA method on parent and unwrap its output.

    methodid is the browse name of a child method, or the nodeid of the
    method as a NodeId object.
    args are variants or python objects convertible to variants; they may
    be of different types.

    Returns None when the method produced no output argument, the bare
    value when it produced exactly one, and the list of values otherwise.
    """
    outputs = call_method_full(parent, methodid, *args).OutputArguments
    count = len(outputs)
    if count == 0:
        return None
    if count == 1:
        # A single output is handed back unwrapped rather than as a list.
        return outputs[0]
    return outputs
def call_method_full(parent, methodid, *args):
    """
    Call an OPC-UA method on parent and return the complete call result.

    methodid is the browse name of a child method (str or QualifiedName),
    a Node, or the nodeid of the method as a NodeId object.
    args are variants or python objects convertible to variants; they may
    be of different types.

    Returns the CallMethodResult object, with its OutputArguments replaced
    by the plain values taken from each returned variant.
    """
    if isinstance(methodid, (str, ua.uatypes.QualifiedName)):
        # A browse name: resolve it to the matching child of parent.
        method_nodeid = parent.get_child(methodid).nodeid
    elif isinstance(methodid, node.Node):
        method_nodeid = methodid.nodeid
    else:
        # Anything else is passed through untouched as the method id.
        method_nodeid = methodid

    server = parent.session_server
    object_nodeid = parent.nodeid
    input_variants = to_variant(*args)
    call_result = _call_method(server, object_nodeid, method_nodeid, input_variants)

    # Strip the variant wrappers so callers receive the contained values.
    plain_values = []
    for variant in call_result.OutputArguments:
        plain_values.append(variant.Value)
    call_result.OutputArguments = plain_values
    return call_result
def _call_method(session_server, parentnodeid, methodid, arguments):
    """
    Send a single method call through session_server and return its result.

    Builds one CallMethodRequest targeting methodid on the object
    parentnodeid with the given input arguments, submits it alone, and
    returns the first entry of the reply after checking its status code.
    """
    call_request = ua.CallMethodRequest()
    call_request.ObjectId = parentnodeid
    call_request.MethodId = methodid
    call_request.InputArguments = arguments

    # The call API takes a list of requests; only one is sent here.
    first_result = session_server.call([call_request])[0]
    # NOTE(review): check() presumably raises on a bad status - confirm
    # against ua.StatusCode.
    first_result.StatusCode.check()
    return first_result
def uamethod(func):
    """
    Method decorator to automatically convert
    arguments and output to and from variants

    The returned wrapper is called with the parent id followed by the
    input arguments as variants; func receives the ``Value`` of each of
    them instead. If the first positional argument is not a ua.NodeId it
    is treated as the instance func is bound to, and the parent id is
    taken from the next argument.

    The value returned by func is converted as follows:
    - None -> an empty list
    - ua.CallMethodResult -> returned with OutputArguments as variants
    - ua.StatusCode -> returned unchanged
    - tuple -> a list with one variant per element
    - anything else -> a one-element list holding a single variant

    The wrapper keeps the name and docstring of func.
    """
    # functools.wraps preserves func's metadata (__name__, __doc__, ...)
    # so decorated callbacks stay identifiable in logs and tracebacks.
    @functools.wraps(func)
    def wrapper(parent, *args):
        if isinstance(parent, ua.NodeId):
            # Plain function: (parent_nodeid, *variants)
            result = func(parent, *[arg.Value for arg in args])
        else:
            # First argument is not a NodeId: presumably the wrapper was
            # invoked as a method, so shift the arguments by one.
            self = parent
            parent = args[0]
            args = args[1:]
            result = func(self, parent, *[arg.Value for arg in args])
        if result is None:
            return []
        elif isinstance(result, ua.CallMethodResult):
            result.OutputArguments = to_variant(*result.OutputArguments)
            return result
        elif isinstance(result, ua.StatusCode):
            return result
        elif isinstance(result, tuple):
            # A tuple means several output arguments.
            return to_variant(*result)
        else:
            return to_variant(result)
    return wrapper
def to_variant(*args):
    """
    Return a new list holding every positional argument as a ua.Variant.

    Arguments that already are ua.Variant instances are kept as they are;
    every other argument is wrapped with ua.Variant(arg). The order of
    the arguments is preserved.
    """
    return [
        item if isinstance(item, ua.Variant) else ua.Variant(item)
        for item in args
    ]