-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample_runtime.py
More file actions
196 lines (161 loc) · 10.8 KB
/
example_runtime.py
File metadata and controls
196 lines (161 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""Demonstrates the use of MicroControllerInterface with custom ModuleInterface classes.
Showcases how to use the PC client to control custom hardware modules running on the Arduino or Teensy
microcontroller in real time. Also demonstrates how to access the data received from the microcontroller that is
saved to disk via the DataLogger instance. The example is intentionally kept simple and does not cover all possible
use cases.
This example is intended to be used together with a microcontroller running the module_integration.cpp from the
companion ataraxis-micro-controller library: https://github.com/Sun-Lab-NBB/ataraxis-micro-controller#quickstart
See https://github.com/Sun-Lab-NBB/ataraxis-communication-interface#quickstart for more details.
API documentation: https://ataraxis-communication-interface-api.netlify.app/
Authors: Ivan Kondratyev (Inkaros), Jacob Groner
"""
import tempfile
from pathlib import Path
import numpy as np
from ataraxis_base_utilities import LogLevel, console
from ataraxis_data_structures import DataLogger, assemble_log_archives
from ataraxis_time import PrecisionTimer, TimerPrecisions
from example_interface import TestModuleInterface
from ataraxis_communication_interface import (
MICROCONTROLLER_MANIFEST_FILENAME,
MicroControllerInterface,
ModuleExtractionConfig,
KernelExtractionConfig,
ControllerExtractionConfig,
create_extraction_config,
run_log_processing_pipeline,
)
# Guards the runtime to support MicroControllerInterface's multiprocessing architecture.
if __name__ == "__main__":
# Enables the console module to communicate the example's runtime progress via the terminal.
console.enable()
# Specifies the directory where to save all incoming and outgoing messages processed by the MicroControllerInterface
# instance for each hardware module.
tempdir = tempfile.TemporaryDirectory() # Creates a temporary directory for illustration purposes.
output_directory = Path(tempdir.name)
# Instantiates the DataLogger, which is used to save all incoming and outgoing MicroControllerInterface messages
# to disk. See https://github.com/Sun-Lab-NBB/ataraxis-data-structures for more details on DataLogger class.
data_logger = DataLogger(output_directory=output_directory, instance_name="AMC")
data_logger.start() # Starts the DataLogger before it can save any log entries.
# Defines two interface instances, one for each TestModule used at the same time. Note that each instance uses
# different module_id codes, but the same type (family) id code.
interface_1 = TestModuleInterface(module_type=np.uint8(1), module_id=np.uint8(1))
interface_2 = TestModuleInterface(module_type=np.uint8(1), module_id=np.uint8(2))
interfaces = (interface_1, interface_2)
# Instantiates the MicroControllerInterface. Functions similar to the Kernel class from the
# ataraxis-micro-controller library and abstracts most inner-workings of the library. Expects a Teensy 4.1
# microcontroller, and the parameters defined below may not be optimal for all supported microcontrollers.
mc_interface = MicroControllerInterface(
controller_id=np.uint8(222),
buffer_size=8192,
port="/dev/ttyACM1",
data_logger=data_logger,
module_interfaces=interfaces,
name="test_controller",
baudrate=115200,
keepalive_interval=5000,
)
console.echo(message="Initializing the communication process...")
# Starts the serial communication with the microcontroller by initializing a separate process that handles the
# communication. This method may take up to 15 seconds to execute, as it verifies that the microcontroller is
# configured correctly, given the MicroControllerInterface configuration.
mc_interface.start()
console.echo(message="Communication process: Initialized.", level=LogLevel.SUCCESS)
console.echo(message="Updating hardware module runtime parameters...")
# Due to the current SharedMemoryArray implementation, the shared memory instances require additional setup after
# the communication process is started.
interface_1.start_shared_memory_array()
interface_2.start_shared_memory_array()
# Generates and sends new runtime parameters to both hardware module instances running on the microcontroller.
# On and Off durations are in microseconds.
interface_1.set_parameters(
on_duration=np.uint32(1000000), off_duration=np.uint32(1000000), echo_value=np.uint16(121)
)
interface_2.set_parameters(
on_duration=np.uint32(5000000), off_duration=np.uint32(5000000), echo_value=np.uint16(333)
)
console.echo(message="Hardware module runtime parameters: Updated.", level=LogLevel.SUCCESS)
console.echo(message="Sending the 'echo' command to the TestModule 1...")
# Requests instance 1 to return its echo value. By default, the echo command only runs once.
interface_1.echo()
# Waits until the microcontroller responds to the echo command. The interface is configured to update shared
# memory array index 2 with the received echo value when it receives the response from the microcontroller.
while interface_1.shared_memory[2] == 0:
continue
# Retrieves and prints the microcontroller's response. The returned value should match the parameter set above: 121.
console.echo(message=f"TestModule 1 echo value: {interface_1.shared_memory[2]}.", level=LogLevel.SUCCESS)
# Demonstrates the use of non-blocking recurrent commands.
console.echo(message="Executing the example non-blocking runtime, standby for ~5 seconds...")
# Instructs the first TestModule instance to start pulsing the managed pin (Pin 5 by default). With the parameters
# sent earlier, it keeps the pin ON for 1 second and keeps it off for ~ 2 seconds (1 from off_duration,
# 1 from waiting before repeating the command). The microcontroller repeats this command at regular intervals
# until it is given a new command or receives a 'dequeue' command (see below).
interface_1.pulse(repetition_delay=np.uint32(1000000), noblock=True)
# Instructs the second TestModule instance to start sending its echo value to the PC every 500 milliseconds.
interface_2.echo(repetition_delay=np.uint32(500000))
# Delays for 5 seconds, accumulating echo values from TestModule 2 and pin On / Off notifications from TestModule
# 1. Uses the PrecisionTimer instance to delay the main process for 5 seconds.
delay_timer = PrecisionTimer(precision=TimerPrecisions.SECOND)
delay_timer.delay(delay=5, block=False)
# Cancels both recurrent commands by issuing a dequeue command. Note, the dequeue command does not interrupt already
# running commands, it only prevents further command repetitions.
interface_1.reset_command_queue()
interface_2.reset_command_queue()
# The result seen here depends on the communication speed between the PC and the microcontroller and the precision
# of the microcontroller's clock. For Teensy 4.1, which was used to write this example, the pin is expected to
# pulse ~2 times and the echo value is expected to be transmitted ~10 times during the test period.
console.echo(message="Non-blocking runtime: Complete.", level=LogLevel.SUCCESS)
console.echo(message=f"TestModule 1 Pin pulses: {interface_1.shared_memory[0]}")
console.echo(message=f"TestModule 2 Echo values: {interface_2.shared_memory[1]}")
# Resets the pulse and echo counters before executing the demonstration below.
interface_1.shared_memory[0] = 0
interface_2.shared_memory[1] = 0
# Repeats the example above, but now uses blocking commands instead of non-blocking.
console.echo(message="Executing the example blocking runtime, standby for ~5 seconds...")
interface_1.pulse(repetition_delay=np.uint32(1000000), noblock=False)
interface_2.echo(repetition_delay=np.uint32(500000))
delay_timer.delay(delay=5, block=False) # Reuses the same delay timer
interface_1.reset_command_queue()
interface_2.reset_command_queue()
# This time, since the pin pulsing performed by module 1 interferes with the echo command performed by module 2,
# both pulse and echo counters are expected to be ~5.
console.echo(message="Blocking runtime: Complete.", level=LogLevel.SUCCESS)
console.echo(message=f"TestModule 1 Pin pulses: {interface_1.shared_memory[0]}")
console.echo(message=f"TestModule 2 Echo values: {interface_2.shared_memory[1]}")
# Stops the communication process and releases all resources used during runtime.
mc_interface.stop()
console.echo(message="Communication process: Stopped.", level=LogLevel.SUCCESS)
# Stops the DataLogger and assembles all logged data into a single .npz archive file. This step is required to be
# able to extract the logged message data for further analysis.
data_logger.stop()
console.echo(message="Assembling the message log archive...")
assemble_log_archives(log_directory=data_logger.output_directory, remove_sources=True, verbose=True)
# To process the data logged during runtime, first generate a precursor extraction configuration from the
# microcontroller manifest. The manifest is automatically created by MicroControllerInterface during start().
console.echo(message="Creating extraction configuration from manifest...")
manifest_path = data_logger.output_directory / MICROCONTROLLER_MANIFEST_FILENAME
config = create_extraction_config(manifest_path=manifest_path)
# The generated config has empty event_codes for each module. Fill in the event codes that should be extracted.
# Event codes 52 (kHigh), 53 (kLow), and 54 (kEcho) are the TestModule event codes demonstrated above.
config.controllers[0] = ControllerExtractionConfig(
controller_id=222,
modules=(
ModuleExtractionConfig(module_type=1, module_id=1, event_codes=(52, 53, 54)),
ModuleExtractionConfig(module_type=1, module_id=2, event_codes=(52, 53, 54)),
),
kernel=KernelExtractionConfig(event_codes=(2,)), # Extracts kernel status code 2 (module setup) events.
)
# Saves the filled-in config to disk. The pipeline reads it from disk to support both CLI and API usage.
config_path = data_logger.output_directory / "extraction_config.yaml"
config.save(file_path=config_path)
console.echo(message=f"Extraction config written to: {config_path}", level=LogLevel.SUCCESS)
# Runs the log processing pipeline. Extracts hardware module and kernel message data from the log archives and
# writes the results to feather (IPC) files for downstream analysis.
console.echo(message="Processing the logged message data...")
output_path = Path(tempfile.mkdtemp())
run_log_processing_pipeline(
log_directory=data_logger.output_directory,
output_directory=output_path,
config=config_path,
)
console.echo(message=f"Processing complete. Feather output written to: {output_path}", level=LogLevel.SUCCESS)