-
Notifications
You must be signed in to change notification settings - Fork 52
Expand file tree
/
Copy pathusbserialandroid_ascii.py
More file actions
executable file
·69 lines (52 loc) · 2.25 KB
/
usbserialandroid_ascii.py
File metadata and controls
executable file
·69 lines (52 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/env python3
# Software License Agreement (BSD License)
#
# Author: Adrian Clark <adrian.clark@canterbury.ac.nz>
# Modified from file written by: Duke Fong <duke@ufactory.cc>
import _thread, threading
# Import USB 4 Android and USB Serial 4 Android libraries
from usb4a import usb
from usbserial4a import serial4a
# We need to use the select_port function defined in these scripts as the port selection is slightly different
from ..utils.usbserialandroid_select_serial_port import select_port
from ...utils.log import *
class USBSerialAndroidAscii(threading.Thread):
    """Background reader/writer for a line-based Android USB serial port.

    The instance registers three ports with ``ufc``:

    * ``in``      -- topic; each received message is written to the serial
      port followed by a newline (see :meth:`in_cb`).
    * ``out``     -- topic; every non-empty line read from the serial port
      is published here (see :meth:`run`).
    * ``service`` -- service port; currently a no-op (see :meth:`service_cb`).

    The thread is a daemon and starts itself at the end of ``__init__``.
    """

    def __init__(self, ufc, node, iomap, dev_port=None, baud=115200, filters=None):
        """Register the node, open the serial port and start the reader thread.

        Args:
            ufc: Object providing ``node_init(node, ports, iomap)``.
            node: Node path; ``/`` is replaced by ``.`` to build the logger
                name under the ``uf.`` prefix.
            iomap: Passed through unchanged to ``ufc.node_init``.
            dev_port: Optional port hint forwarded to ``select_port``.
            baud: Baud rate used when opening the serial port.
            filters: Optional filters forwarded to ``select_port``.

        Raises:
            SystemExit: With code 1 when ``select_port`` returns nothing.
            Exception: ``'serial open failed'`` when the port does not report
                itself as open.
        """
        super().__init__(daemon=True)

        self.ports = {
            'in': {'dir': 'in', 'type': 'topic', 'callback': self.in_cb},
            'out': {'dir': 'out', 'type': 'topic'},
            'service': {'dir': 'in', 'type': 'service', 'callback': self.service_cb},
        }
        self.node = node
        self.logger = logging.getLogger('uf.' + node.replace('/', '.'))
        ufc.node_init(node, self.ports, iomap)

        dev_port = select_port(logger=self.logger, dev_port=dev_port, filters=filters)
        if not dev_port:
            # ``quit`` is injected by the ``site`` module and is not available
            # in every interpreter; raising SystemExit directly is equivalent
            # for callers and always works.
            raise SystemExit(1)

        # Open through USB Serial 4 Android; the trailing arguments are
        # presumably 8 data bits, no parity, 1 stop bit -- confirm against
        # the usbserial4a API.
        self.com = serial4a.get_serial_port(dev_port.name, baud, 8, 'N', 1)
        if not self.com.isOpen():
            raise Exception('serial open failed')

        self.alive = True
        self.start()

    def run(self):
        """Read lines from the serial port and publish them until stopped.

        Empty reads are skipped. Each line is decoded one byte per character
        (latin-1), right-stripped, logged at ``logging.VERBOSE`` and published
        on the ``out`` port when a handle is attached. The serial port is
        closed when the loop ends, including when it ends with an exception.
        """
        try:
            while self.alive:
                # read_until is the equivalent of readline here.
                raw = self.com.read_until()
                if not raw:
                    continue

                # latin-1 maps every byte value to the code point of the same
                # number, i.e. the same result as chr() on each byte.
                line = bytes(raw).decode('latin-1').rstrip()
                self.logger.log(logging.VERBOSE, '-> ' + line)

                handle = self.ports['out'].get('handle')
                if handle:
                    handle.publish(line)
        finally:
            # Always release the port, even if reading or publishing raised.
            self.com.close()

    def stop(self):
        """Ask the reader loop to finish and wait for the thread to exit.

        When called from the reader thread itself (for example from a
        publish callback) the join is skipped, because a thread cannot join
        itself.
        """
        self.alive = False
        if threading.current_thread() is not self:
            self.join()

    def in_cb(self, message):
        """Write ``message`` plus a trailing newline to the serial port.

        Characters are encoded one byte per character (latin-1); a character
        above U+00FF raises ``UnicodeEncodeError`` (a ``ValueError``).
        """
        self.logger.log(logging.VERBOSE, '<- ' + message)
        self.com.write((message + '\n').encode('latin-1'))

    def service_cb(self, message):
        """Service port callback; intentionally does nothing."""
        pass