-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdoord.py
More file actions
217 lines (170 loc) · 7.76 KB
/
doord.py
File metadata and controls
217 lines (170 loc) · 7.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import json
import sys
import time
import subprocess
import re
import os
import threading
import logging
from syslogger import Syslogger
from base64 import b64encode
from urllib.request import Request, urlopen
VERSION = "2.1.0"
CARD_PATTERN = re.compile("(0x[a-f0-9]+)", re.IGNORECASE)
CARD_READER_BIN = os.getenv("CARD_READER_BIN", default="./nfcreader/nfcreader")
OPEN_DOOR_BIN = os.getenv("OPEN_DOOR_BIN", default="./open-door")
CARD_DATA_URL = os.getenv("CARD_DATA_URL")
CARD_DATA_USERNAME = os.getenv("CARD_DATA_USERNAME")
CARD_DATA_PASSWORD = os.getenv("CARD_DATA_PASSWORD")
CARD_DATA_ID_FIELDS = [
"card_number", # "new"
"access_card", # "old"
]
UPDATE_INTERVAL = 15
CARDS_SAVE_FILE = os.getenv("CARD_DATA_FILE", default="./.card_data")
logger = Syslogger(level=logging.INFO)
DEBUG = bool(os.getenv("DEBUG", False))
if DEBUG:
logger.setLevel(logging.DEBUG)
TESTING = bool(os.getenv("TESTING", False))
if TESTING:
CARD_READER_BIN = "./test/nfcreader-mock"
OPEN_DOOR_BIN = ["echo", "Door opened!"]
CARDS_SAVE_FILE = "/tmp/testfile"
class DoorControl:
def __init__(self):
# Master list of authorized cards, shared between threads
self.authorized_cards = list()
def run(self):
# Attempt to start off from last successful download
self.load_saved_cards()
# Start NFC card reader thread, that also opens the door
logger.info("Starting NFC card reader thread")
nfc_thread = threading.Thread(target=DoorControl.nfc_reader_worker, args=(self,))
nfc_thread.start()
logger.info("Updating list of authorized cards every %d second(s)", UPDATE_INTERVAL)
last_download_failed = True
while nfc_thread.is_alive():
# Download new cards and update list if necessary
try:
fresh_cards = self.download_card_data()
if last_download_failed:
logger.info("Successfully downloaded card data")
last_download_failed = False
if len(fresh_cards) < 1:
logger.error("No cards were returned by Hula, wtf?")
elif fresh_cards == self.authorized_cards:
logger.info("No new cards returned")
elif len(fresh_cards) > 0 and fresh_cards != self.authorized_cards:
logger.info("Now there are %d authorized card(s) (was %d)", len(fresh_cards), len(self.authorized_cards))
self.authorized_cards = fresh_cards
# Persist successfully downloaded lists
self.save_cards()
except Exception as e:
last_download_failed = True
logger.error("Failed to download new card data: %s", e)
time.sleep(UPDATE_INTERVAL)
logger.warning("Card reader thread stopped. Exiting!")
def load_saved_cards(self):
try:
with open(CARDS_SAVE_FILE, "r") as f:
deserialized = f.read().strip().split(",")
if type(deserialized) is list:
self.authorized_cards = deserialized
logger.info("Successfully loaded last used list of cards (%d cards)", len(self.authorized_cards))
else:
logger.error("File with authorized cards was invalid format. Starting from scratch.")
except FileNotFoundError:
logger.info("File with authorized cards not found. Starting from scratch.")
def save_cards(self):
try:
serialized = ",".join(self.authorized_cards)
with open(CARDS_SAVE_FILE, "w") as f:
f.write(serialized)
logger.info("Saved list of authorized cards to file: %s", CARDS_SAVE_FILE)
except Exception as e:
# TODO: Catch a less generic exception. Just not quite sure if it's necessary to be very defensive here
logger.error("An error occured while attempting to save list of cards: %s", e)
def download_card_data(self):
# Build an authenticated request
req = Request(CARD_DATA_URL)
credentials = CARD_DATA_USERNAME + ":" + CARD_DATA_PASSWORD
auth_str = b64encode(credentials.encode()).decode("ascii")
req.add_header("Authorization", "Basic " + auth_str)
cards = list()
with urlopen(req) as res:
user_data = json.loads(res.read().decode())
# Expect an array
if type(user_data) is not list:
raise ValueError("Invalid data format %s. Expected %s", type(user_data), list)
# Extract card numbers from all users that have a registered card
for user in user_data:
for k in CARD_DATA_ID_FIELDS:
cardid = user.get(k)
if cardid:
# Strip leading zeros from IDs
if cardid.startswith("0x"):
cardid = cardid[2:]
cards.append(hex(int(cardid, base=16)))
return cards
def nfc_reader_worker(self):
import pylibnfc
monitor = pylibnfc.NfcMonitor()
try:
while True:
logger.info("Performing new tag poll")
tag_id = monitor.poll_for_tag(5)
if tag_id == 0:
continue
line = hex(tag_id).strip()
logger.info("Raw card: %r" % (line, ))
# Match on successful NFC tag reads
card_match = CARD_PATTERN.search(line)
if card_match is None:
logger.debug("card_id regex pattern did not match: '%s'", line)
continue
# Verify card is authorized
card_id = card_match.group(1)
if card_id not in self.authorized_cards:
logger.warning("Card NOT authorized: %s", card_id)
continue
logger.info("Card authorized: %s", card_id)
# Trigger door lock
try:
with subprocess.Popen(OPEN_DOOR_BIN) as proc:
proc.wait(timeout=10)
if proc.returncode == 0:
logger.info("Door lock trigger script exited successfully")
else:
logger.error("Door lock trigger script exited uncleanly: %d", proc.returncode)
(stdout, stderr) = proc.communicate()
logger.error("door stdout: %s", stdout)
logger.error("door stderr: %s", stderr)
except subprocess.TimeoutExpired:
logger.error("Timed out waiting for lock trigger script to exit")
except Exception as ex:
logger.exception("wtf bro")
raise SystemError(-1)
def main(argv):
import argparse
import stat
parser = argparse.ArgumentParser()
parser.add_argument("--card_ids", type=str, help="Comma-separated list of authorized cards (for testing)")
options = parser.parse_args(args=argv)
if not os.path.isfile(CARD_READER_BIN):
logger.fatal('File specified as CARD_READER_BIN does not exist: %s', CARD_READER_BIN)
sys.exit(1)
if not os.stat(CARD_READER_BIN).st_mode & stat.S_IXUSR:
logger.fatal('File %s is not executable by user', CARD_READER_BIN)
sys.exit(1)
door = DoorControl()
logger.info("%s version %s" % (os.path.basename(__file__), VERSION))
# Use a list of pre-authorized cards, valid up until first load/download
if options.card_ids:
for card_id in options.card_ids.split(","):
door.authorized_cards.append(card_id)
if TESTING:
door.authorized_cards.append("0x1337")
door.run()
if __name__ == "__main__":
main(sys.argv[1:])