-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinficon.py
More file actions
executable file
·204 lines (169 loc) · 5.65 KB
/
inficon.py
File metadata and controls
executable file
·204 lines (169 loc) · 5.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/env python
'''
Implement the Inficon protocol for retrieving gauge status. Errors are written to STDERR (not logged).
'''
# The default serial port is /dev/ttyUSB0, which is typically the first USB serial device.
# If you're using an on-board RS232 port, you will want to use eg. /dev/ttyS0
# If you are using a USB to serial dongle, you'll need the correct device name. To list all available ports on your system, try:
# $ python -m serial.tools.list_ports
# Common serial names include:
# * /dev/cu.PL2303-*
# * /dev/cu.UC-232AC
# * /dev/ttyAMA0 (Raspberry Pi)
import serial
import sys
import os
import re
import datetime
import argparse
from time import sleep
opts = {
'port': { 'value': '/dev/ttyUSB0', 'help': 'serial port', 'type': str },
'baudrate': { 'value': 9600, 'help': 'serial baud rate', 'type': int },
'databits': { 'value': 8, 'help': 'data bits', 'type': int },
'parity': { 'value': 'N', 'help': 'parity', 'type': str },
'stopbits': { 'value': 1, 'help': 'stop bits', 'type': int },
'timeout': { 'value': 5, 'help': 'serial timeout', 'type': int },
'softflow': { 'value': False, 'help': 'software flow control', 'type': bool },
'hardflow': { 'value': False, 'help': 'hardware flow control', 'type': bool },
'log': { 'value': 'STDOUT', 'help': 'Log file to append to. STDOUT writes to the console.', 'type': str },
'interactive': { 'value': False, 'help': 'Interactive mode. Prompts for INFICON commands and returns any reply.', 'type': bool },
'poll': { 'value': 60, 'help': 'Seconds to wait between polls', 'type': int },
'oneshot': { 'value': False, 'help': 'Poll all sensors once and exit', 'type': bool },
'gauges': { 'value': '0,1', 'help': 'Comma-separated list of ports to poll', 'type': str },
}
STX = chr(2)
def get_terminal_width(margin=5):
'''
Return the current terminal width minus a nice margin.
In python 3.3+, use shutil.get_terminal_size() instead.
'''
if not sys.stdout.isatty():
return 80
import struct
from fcntl import ioctl
from termios import TIOCGWINSZ
reply = ioctl(sys.stdout, TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0))
return struct.unpack("HHHH", reply)[0:2][1] - margin
def checksum(line):
''' Inficon checksum algorithm '''
chk = 0
for char in line:
chk += ord(char)
return chr(chk & 255)
def send(port, cmd):
''' Send a single line, with checksum. '''
try:
port.write(STX + chr(len(cmd)) + cmd + str(checksum(cmd)))
except serial.SerialTimeoutException:
sys.stderr.write('%s: serial timeout sending command: %s'.format(str(datetime.datetime.now()), cmd))
def receive(port, retries=5):
''' Receive up to one full response message. Returns the message or an appropriate error. '''
start = None
attempt = 1
# Skip anything that isn't STX
while start != STX:
start = port.read(1)
if start != STX and attempt > retries:
sys.stderr.write('Timeout while waiting for STX\n')
return ''
attempt += 1
# Next byte is size
size = ord(port.read(1))
# Read size data bytes
data = port.read(size)
if len(data) != size:
sys.stderr.write('%s: serial timeout while receiving data: %s'.format(str(datetime.datetime.now()), data))
return ''
# Grab the checksum
chk = port.read(1)
if chk == checksum(data):
return data
else:
sys.stderr.write('Invalid message: ' + data + '\n')
if __name__ == '__main__':
# argparse foolishly relies on the COLUMNS environment variable to determine the terminal width.
# This is only used in Bash-like shells, and even then it isn't exported by default, so argparse
# defaults to a paltry 80 columns.
#
# This gets the real terminal width if COLUMNS isn't already available.
if not 'COLUMNS' in os.environ:
os.environ['COLUMNS'] = str(get_terminal_width())
# Build out a dynamic argument list based on opts
PARSER = argparse.ArgumentParser(description=__doc__)
for opt in opts:
this_type = opts[opt]['type']
this_val = this_type(opts[opt]['value'])
if this_type == bool:
action = 'store_false' if this_val == True else 'store_true'
PARSER.add_argument(
'--' + opt,
default=this_val,
action=action,
help='%s (default: %s)' % (opts[opt]['help'], str(this_val))
)
else:
PARSER.add_argument(
'--' + opt,
default=this_val,
type=this_type,
action='store',
help='%s (default: %s)' % (opts[opt]['help'], str(this_val))
)
ARGS = PARSER.parse_args()
# Gauge sanity check
gauges = ARGS.gauges.split(',')
for gauge in gauges:
try:
assert int(gauge) in range(10)
except ValueError:
print 'Invalid gauge: ' + gauge
sys.exit(1)
except AssertionError:
print 'Invalid gauge %s (should be 0-9)' % gauge
sys.exit(1)
# Write to STDOUT, or append to the specified log.
print 'datetime,' + ARGS.gauges
if ARGS.log == 'STDOUT':
log = sys.stdout
else:
log = open(ARGS.log, 'a', 0)
# Add a nice CSV header if appropriate.
if log.tell() == 0:
log.write('datetime,' + ARGS.gauges + '\n')
ser = serial.Serial(
port=ARGS.port,
baudrate=ARGS.baudrate,
bytesize=ARGS.databits,
parity=ARGS.parity,
stopbits=ARGS.stopbits,
timeout=ARGS.timeout,
xonxoff=ARGS.softflow,
rtscts=ARGS.hardflow,
# dsrdtr follows rtscts
dsrdtr=None
)
# Until ^C
while True:
try:
if ARGS.interactive:
send(ser, raw_input('> ').rstrip())
print receive(ser)
else:
readings = []
for gauge in gauges:
send(ser, "S0" + gauge)
match = re.match(r"(.*-\d+)", receive(ser))
if(match):
readings.append(match.group(1))
else:
readings.append('')
msg = ','.join([str(datetime.datetime.now())] + readings)
print msg
log.write(msg + "\n")
if ARGS.oneshot:
sys.exit(0)
sleep(ARGS.poll)
except (KeyboardInterrupt, EOFError):
print
sys.exit(0)