-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcommunicationport.py
More file actions
129 lines (101 loc) · 4.73 KB
/
communicationport.py
File metadata and controls
129 lines (101 loc) · 4.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import serial
import re
import time
import random
import inspect
from threading import RLock
from .commands import *
class CommunicationReadTimeout(serial.SerialException):
    """Error raised when reading from a port times out.

    CommunicationPort.readString catches this exception and re-raises it
    with a message showing the bytes obtained before the timeout.
    """
class CommunicationReadNoMatch(Exception):
    """Error raised when a reply cannot be matched against a pattern."""
class CommunicationReadAlternateMatch(Exception):
    """Error related to checking a reply against an alternate pattern.

    CommunicationPort.writeStringExpectMatchingString raises it with the
    reply string as its only argument.
    """
class CommunicationPort:
    """CommunicationPort class with basic application-level protocol
    functions to write strings and read strings, and abstract away
    the details of the communication.

    Derived classes must implement isOpen, open, close, bytesAvailable,
    flush, readData and writeData. The string-level helpers defined here
    are built only on top of readData and writeData.
    """

    def __init__(self):
        # Held around each individual string read or write.
        self.portLock = RLock()
        # Held around a whole write-then-read exchange. It is an RLock, so
        # helpers that call each other can re-acquire it on the same thread.
        self.transactionLock = RLock()
        # Byte sequence that marks the end of a reply in readString.
        self.terminator = b'\n'

    @staticmethod
    def _mustImplement(fctName):
        """Build the error raised by the stubs a derived class must override."""
        return NotImplementedError("Derived class must implement {0}".format(fctName))

    @staticmethod
    def _matchGroups(replyPattern, reply):
        """Return (reply, groups) for replyPattern found in reply.

        Raises:
            CommunicationReadNoMatch: if replyPattern is not found in reply.
        """
        match = re.search(replyPattern, reply)
        if match is None:
            raise CommunicationReadNoMatch("Unable to match pattern:'{0}' in reply:'{1}'".format(replyPattern, reply))
        return reply, match.groups()

    @property
    def isOpen(self):
        """Whether the port is open. Must be implemented by derived classes."""
        raise self._mustImplement("isOpen")

    @property
    def isNotOpen(self):
        """Logical negation of isOpen."""
        return not self.isOpen

    def open(self):
        """Open the port. Must be implemented by derived classes."""
        raise self._mustImplement("open")

    def close(self):
        """Close the port. Must be implemented by derived classes."""
        raise self._mustImplement("close")

    def bytesAvailable(self) -> int:
        """Return a byte count. Must be implemented by derived classes."""
        raise self._mustImplement("bytesAvailable")

    def flush(self):
        """Flush the port. Must be implemented by derived classes."""
        raise self._mustImplement("flush")

    def readData(self, length, endPoint=None) -> bytearray:
        """Read `length` bytes. Must be implemented by derived classes.

        readString relies on this returning an empty bytes-like object
        when nothing more can be read, or raising
        CommunicationReadTimeout.
        """
        raise self._mustImplement("readData")

    def writeData(self, data, endPoint=None) -> int:
        """Write `data` and return a byte count. Must be implemented by
        derived classes.
        """
        raise self._mustImplement("writeData")

    def readString(self, endPoint=None) -> str:
        """Read bytes one at a time until the terminator and return them
        decoded as UTF-8 (undecodable bytes are replaced).

        Reading stops when the accumulated data ends with self.terminator
        (which may be more than one byte long) or when readData returns
        an empty result. The terminator is included in the returned
        string. `endPoint` is passed unchanged to readData.

        Raises:
            CommunicationReadTimeout: if readData times out; the message
                shows the bytes obtained so far.
        """
        with self.portLock:
            data = bytearray()
            try:
                while True:
                    byte = self.readData(1, endPoint)
                    if byte == b'':
                        break
                    data += byte
                    # Compare the tail of the buffer rather than the last
                    # byte so that multi-byte terminators are detected.
                    if self.terminator and data.endswith(self.terminator):
                        break
            except CommunicationReadTimeout as err:
                raise CommunicationReadTimeout("Only obtained {0}".format(data)) from err
            return data.decode('utf-8', 'replace')

    def writeString(self, string, endPoint=None) -> int:
        """Encode `string` as UTF-8 and send it with writeData.

        `endPoint` is passed unchanged to writeData. Returns whatever
        writeData returns.
        """
        with self.portLock:
            data = bytearray(string, 'utf-8', 'replace')
            return self.writeData(data, endPoint)

    def writeStringExpectMatchingString(self, string, replyPattern, alternatePattern = None, endPoints=(None,None)):
        """Write `string`, read one reply and require it to match
        `replyPattern` (searched with re.search).

        `endPoints` is a (write, read) pair passed to writeString and
        readString respectively. Returns the reply on success.

        Raises:
            CommunicationReadAlternateMatch: if the reply does not match
                `replyPattern` but does match `alternatePattern`; the
                reply is the exception argument.
            CommunicationReadNoMatch: if the reply matches neither
                pattern (or no alternate pattern was given).
        """
        with self.transactionLock:
            self.writeString(string, endPoints[0])
            reply = self.readString(endPoints[1])
            if re.search(replyPattern, reply) is None:
                alternateFound = (
                    alternatePattern is not None
                    and re.search(alternatePattern, reply) is not None
                )
                if alternateFound:
                    raise CommunicationReadAlternateMatch(reply)
                raise CommunicationReadNoMatch("Unable to find first group with pattern:'{0}'".format(replyPattern))
        return reply

    def writeStringReadFirstMatchingGroup(self, string, replyPattern, alternatePattern = None, endPoints=(None,None)):
        """Write `string`, read one reply and return (reply, first group)
        of `replyPattern`.

        Raises:
            CommunicationReadNoMatch: if the reply does not match, or if
                the pattern defines no capture group.
        """
        with self.transactionLock:
            reply, groups = self.writeStringReadMatchingGroups(string, replyPattern, alternatePattern, endPoints)
            if len(groups) >= 1:
                return reply, groups[0]
            raise CommunicationReadNoMatch("Unable to find first group with pattern:'{0}' in {1}".format(replyPattern, groups))

    def writeStringReadMatchingGroups(self, string, replyPattern, alternatePattern = None, endPoints=(None,None)):
        """Write `string`, read one reply and return (reply, groups) of
        `replyPattern`.

        `endPoints` is a (write, read) pair. `alternatePattern` is
        accepted for signature compatibility but is not consulted here.

        Raises:
            CommunicationReadNoMatch: if the reply does not match.
        """
        with self.transactionLock:
            self.writeString(string, endPoints[0])
            reply = self.readString(endPoints[1])
            return self._matchGroups(replyPattern, reply)

    def readMatchingGroups(self, replyPattern, alternatePattern = None, endPoint=None):
        """Read one reply and return (reply, groups) of `replyPattern`.

        `alternatePattern` is accepted for signature compatibility but is
        not consulted here.

        Raises:
            CommunicationReadNoMatch: if the reply does not match.
        """
        reply = self.readString(endPoint=endPoint)
        return self._matchGroups(replyPattern, reply)