forked from madengr/ham2mon
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchannel_loggers.py
More file actions
182 lines (150 loc) · 5.75 KB
/
channel_loggers.py
File metadata and controls
182 lines (150 loc) · 5.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
'''
Log channel activity in various formats and provide channel activity to scanner.
'''
import logging
import datetime
from frequency_manager import ChannelMessage
from abc import ABC
from dataclasses import dataclass, asdict
from importlib import import_module
import asyncio
@dataclass(kw_only=True)
class ChannelLogParams:
'''
Holds channel log command line options provided by the user
'''
type: str
target: str
timeout: int
class ChannelLogger(ABC):
'''
Base class for all loggers. Also notify scanner of activity.
'''
def __init__(self, params: ChannelLogParams) -> None:
logging.debug(f'Creating {self.__class__.__name__} channel logger')
self.timeout: int = 0 # overridden by child classes
self.log_task: dict[int, asyncio.Task] = {} # activity logging tasks are channel specific
self.params = params
async def log(self, msg: ChannelMessage | None) -> None:
'''
Abstract method to log an event. Also provide message to scanner
with receiver details (files created and/or classified)
Overridden in each child class for specific loggers
'''
if msg is None:
return
@staticmethod
def get_logger(params: ChannelLogParams) -> 'ChannelLogger':
'''
Factory to generate a class instance based on command line options
'''
if params.type == 'fixed-field':
return FixedField(params)
elif params.type == 'json-server':
return JsonToServer(params)
elif params.type == 'debug':
return Debug(params)
else:
return NoOp(params)
def handle_channel_state(self, msg: ChannelMessage) -> None:
'''
Use on/off events to start/stop activity timer
'''
if self.timeout == 0:
return
channel = msg.channel
if msg.state == 'on':
# start reoccurring task to log that channel is active
self.log_task[channel] = asyncio.create_task(self.log_active(msg))
elif msg.state == 'off':
# stop the reoccurring task
if self.log_task[channel]:
was_cancelled = self.log_task[channel].cancel()
if not was_cancelled:
logging.error('Could not cancel logging task')
async def log_active(self, msg: ChannelMessage) -> None:
'''
While the channel is active log at an interval
'''
while True:
await asyncio.sleep(self.timeout)
await self.log(ChannelMessage(state='act',
rf=msg.rf,
channel=msg.channel))
class NoOp(ChannelLogger):
'''
Logger that ignores all events
'''
def __init__(self, params: ChannelLogParams) -> None:
super().__init__(params)
self.timeout: int = 0
async def log(self, msg: ChannelMessage | None) -> None:
if msg is None:
return
await super().log(msg)
class Debug(ChannelLogger):
'''
Send channel events to the debug log (enable with --debug)
'''
def __init__(self, params: ChannelLogParams) -> None:
super().__init__(params)
self.timeout = params.timeout
async def log(self, msg: ChannelMessage | None) -> None:
if msg is None:
return
await super().log(msg)
# for this logger we just write to the debug log
logging.debug(msg)
self.handle_channel_state(msg)
class FixedField(ChannelLogger):
'''
Send channel events to a file with fixed field length records
'''
def __init__(self, params) -> None:
super().__init__(params)
self.file_name = params.target
self.timeout = params.timeout
async def log(self, msg: ChannelMessage | None) -> None:
if msg is None:
return
await super().log(msg)
now = datetime.datetime.now()
with open(self.file_name, 'a') as file:
text = (f'{now.strftime("%Y-%m-%d, %H:%M:%S.%f")}: {msg.state:<4}{msg.rf:<10}'
f'{msg.channel:<2}{msg.priority if msg.priority else "":<2}'
f'{msg.classification if msg.classification else "":<2}'
f'{msg.file if msg.file else "":<50}\n'
)
file.write(text)
self.handle_channel_state(msg)
class JsonToServer(ChannelLogger):
'''
Send channels events as json messages to a remote server
'''
def __init__(self, params) -> None:
super().__init__(params)
self.server = params.target
self.timeout = params.timeout
self.requests = import_module('requests')
# urllib3 is very chatty. Uncomment is log event for every connection is needed.
# remove this if there is a single connection approach
logging.getLogger("urllib3").setLevel(logging.WARNING)
async def log(self, msg: ChannelMessage | None) -> None:
if msg is None:
return
await super().log(msg)
msg_dict = asdict(msg)
logging.debug(f'{msg =}')
try:
# TODO: open the connection once
request = self.requests.post(self.server, json=msg_dict)
request.raise_for_status()
except self.requests.exceptions.HTTPError as errh:
logging.error(f'HTTP Error: {errh.args[0]}')
except self.requests.exceptions.ConnectionError as errc:
logging.error(f'Connection Error: {errc.args[0]}')
except self.requests.exceptions.Timeout as errt:
logging.error(f'Timeout Error: {errt.args[0]}')
except self.requests.exceptions.RequestException as err:
logging.error(f'Some kind of Error: {err.args[0]}')
self.handle_channel_state(msg)