diff --git a/.DS_Store b/.DS_Store index 46ef8b1..7a45687 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/Server/TestRunner.py b/Server/TestRunner.py index fa2aa42..765ce8a 100644 --- a/Server/TestRunner.py +++ b/Server/TestRunner.py @@ -10,23 +10,15 @@ sys.path.append(new_root) sys.path.append(os.getcwd()) -from upload_wrapper import upload_firmware - -# Custom TestSuite that overrides the run method to run setup and teardown code; added code to maek firmware upload once per board, not test +# Custom TestSuite that overrides the run method to run setup and teardown code class CustomTestSuite(unittest.TestSuite): - def __init__(self, tests, board_name): - super().__init__(tests) - self.board_name = board_name - def run(self, result, debug=False): - setup_suite(self.board_name) + setup_suite() super().run(result, debug) - # teardown_suite(self.board_name) # optional - -def setup_suite(board_name): - print(f"[SUITE] Uploading firmware for {board_name}") - upload_firmware(board_name) + teardown_suite() +def setup_suite(): + print("Setting up suite...") # replace with setup code def teardown_suite(): print("Tearing down suite...") # replace with teardown code @@ -44,8 +36,6 @@ def make_suite(board_folder): def run_tests() -> None: board_names = config.REPO_CONFIG["boards"].keys() - print("DEBUG: Boards in config:", board_names) - all_suites = unittest.TestSuite() @@ -55,13 +45,11 @@ def run_tests() -> None: if os.path.isdir(board_folder): suite = make_suite(board_folder) - custom_suite = CustomTestSuite([suite], board) - all_suites.addTest(custom_suite) - print("Added test to custom suite") + custom_suite = CustomTestSuite([suite]) + all_suites.addTests(custom_suite) else: print(f"Warning: Folder for board '{board}' not found at path: {board_folder}") with open("test-results.xml", "wb") as output: runner = xmlrunner.XMLTestRunner(output=output, buffer=True) runner.run(all_suites) - diff --git a/Testing_Library/MotorInterfaceTest.py b/Testing_Library/MotorInterfaceTest.py new file mode 100644 index 0000000..9ba17a6 --- /dev/null +++ b/Testing_Library/MotorInterfaceTest.py @@ -0,0 +1,122 @@ +import sys +import serial +import time +import threading + +sys.path.append('/home/solarcar/solarcar/HiL_Testing/Server') + +THROTTLE_ADDR = 0x2F +REGEN_ADDR = 0x2E +THROTTLE_START_BYTE = b'\xFF' +REGEN_START_BYTE = b'\xFE' +''' +while True: + byte_read = ser.read(1) + if byte_read == b'\xFF': + data = ser.read(3) # Read 3 bytes for two 9-bit values + if len(data) == 3: + # Unpack two 9-bit values from 3 bytes + # Byte 0: throttle[7:0] + # Byte 1: throttle[8] in bit 0, regen[6:0] in bits 7:1 + # Byte 2: regen[8:7] in bits 1:0 + + # Extracting throttle and regen + throttle = data[0] | ((data[1] & 0x01) << 8) + regen = ((data[1] >> 1) & 0x7F) | ((data[2] & 0x03) << 7) + + print(f"Throttle: {throttle}%) | Regen: {regen})") + else: + print(f"Incomplete data: got {len(data)} bytes instead of 3") + elif byte_read: + print(f"Unexpected byte:{byte_read}") +''' + +class MotorInterfaceTest: + """API for test Motor interface""" + def __init__(self, port='/dev/ttyACM0', baudrate=9600): + self.stop_thread = False + self.mru_throttle = None # Most Recently Used Throttle Value + self.mru_regen = None # Most Recently Used Regen Value + self.Lock = threading.Lock() + self.ser = serial.Serial( + port=port, + baudrate=baudrate, + timeout=1 + ) + print("Waiting for Arduino to start...") + time.sleep(2) # Give Arduino time to boot + self.startReadThread() + + + # Run infinite thread to read in Throttle and Regen Values + def startReadThread(self): + read_thread = threading.Thread(target=self.readThrottleAndRegen) + read_thread.daemon = True + try: + read_thread.start() + except RuntimeError as e: + raise RuntimeError("Thread Was not Started") + + # Constantly read thorttle and regen values + def readThrottleAndRegen(self): + while(not self.stop_thread): + try: + byte_read = self.ser.read(1) + + if byte_read == THROTTLE_START_BYTE: # Throttle start marker + data = self.ser.read(2) # Read 2 bytes for 9-bit value + if len(data) == 2: + # motorinterface.cpp sends (0x100 - throttle) + # first byte contains lower 8 bits, second byte LSB is the 9th bit + raw_from_arduino = data[0] | ((data[1] & 0x01) << 8) + with self.Lock: + self.mru_throttle = 256 - raw_from_arduino # PowerBoard sends (0x100 - throttle) over I2C + + elif byte_read == REGEN_START_BYTE: + data = self.ser.read(2) # Read 2 bytes for 9-bit value + if len(data) == 2: + # motorinterface.cpp sends (0x100 - regen) + # first byte contains lower 8 bits, second byte LSB is the 9th bit + raw_from_arduino = data[0] | ((data[1] & 0x01) << 8) + with self.Lock: + self.mru_regen = 256 - raw_from_arduino # PowerBoard sends (0x100 - regen) over I2C + + except serial.SerialException as e: + print(e) + # setting Both to None so that Exception is triggered within the getter methods + self.mru_throttle = None + self.mru_regen = None + + def get_throttle(self) -> float: + # Normalized between [0-1.0] + with self.Lock: + if self.mru_throttle is None: + raise ValueError('MRU-Throttle Value is None') + return self.mru_throttle / 256.0 + + def get_throttle_raw(self) -> int: + # raw between [0-256] + with self.Lock: + if self.mru_throttle is None: + raise ValueError('MRU-Raw-Throttle Value is None') + return self.mru_throttle + + def get_regen(self) -> float: + # Normalized between [0,1.0] + with self.Lock: + if self.mru_regen is None: + raise ValueError('MRU-Regen Value is None') + return self.mru_regen / 256.0 + + def get_regen_raw(self) -> int: + # raw between [0-256] + with self.Lock: + if self.mru_regen is None: + raise ValueError('MRU-Regen-Raw Value is None') + return self.mru_regen + + def close(self): + self.stop_thread = True + if self.ser and self.ser.is_open: + self.ser.close() + diff --git a/Testing_Library/motor_interface_test_script1.29/motor_interface_test_script1.29.ino b/Testing_Library/motor_interface_test_script1.29/motor_interface_test_script1.29.ino new file mode 100644 index 0000000..1a32eeb --- /dev/null +++ b/Testing_Library/motor_interface_test_script1.29/motor_interface_test_script1.29.ino @@ -0,0 +1,63 @@ +#include + +#define THROTTLE_ADDR 0x2F +#define REGEN_ADDR 0x2E +#define THROTTLE_MARKER 0xFF +#define REGEN_MARKER 0xFE +#define SWAP_COMMAND 0x1FF + +// Test Mode: 0 = Throttle only (listen on 0x2F), 1 = Regen only (listen on 0x2E). +// Must match PowerBoard main.cpp I2C_TEST_MODE: test_throttle → 0, test_regen → 1. +#define TEST_MODE 1 + +int currentMode = 1; // 0 = Throttle, 1 = Regen + +void setup() { + Serial.begin(9600); + while (!Serial); + + // Initialize based on test mode + if (TEST_MODE == 1) { + // Start in regen mode + Wire.begin(REGEN_ADDR); + currentMode = 1; + } else { + // Start in throttle mode (default) + Wire.begin(THROTTLE_ADDR); + currentMode = 0; + } + + Wire.onReceive(receiveEvent); +} + +void loop() { + delay(100); +} + +void receiveEvent(int howMany) { + if (Wire.available() < 2) return; + + // Read 2 bytes (16-bit value) + int highByte = Wire.read(); + int lowByte = Wire.read(); + int value = ((highByte << 8) | lowByte) & 0x1FF; + + // Clamp value to valid range [0, 256] + if (value > 256) value = 256; + + + // Ensure value stays in valid range after inversion + if (value > 256) value = 256; + if (value < 0) value = 0; + + // Send based on current mode + if (currentMode == 0) { + Serial.write(THROTTLE_MARKER); + Serial.write(value & 0xFF); // Low byte + Serial.write((value >> 8) & 0x01); // Bit 8 + } else { + Serial.write(REGEN_MARKER); + Serial.write(value & 0xFF); // Low byte + Serial.write((value >> 8) & 0x01); // Bit 8 + } +} diff --git a/Testing_Library/test_all_comms.py b/Testing_Library/test_all_comms.py new file mode 100644 index 0000000..0e1b3ea --- /dev/null +++ b/Testing_Library/test_all_comms.py @@ -0,0 +1,106 @@ +import sys +sys.path.append('/home/solarcar/solarcar/HiL_Testing/Server') + +from CANBus import CANBus +from CANMessage import CanMessage +from CANPi import readIn +import time +import serial +from gpioPins import DigitalInput, DigitalOutput +import random +import gpiozero +from gpioPins import AnalogOutput + +# Global Pins for Synchronization +pinOut = DigitalOutput("2") +pinIn = DigitalInput("11") + +# -----------Testing CAN Comms----------- + +def testAddMethod(canBus : CANBus): + canBus.printCANBus() + + input("WAITING") + #Example CANMessage + name = "BPSError" + id = 0x106 + signals = {"internal_communications_fault" : 1} + timestamp = 1.0 + can_message = CanMessage(name, id, signals, timestamp) + canBus.addToCANBus(can_message) + canBus.printCANBus() + +def testReadThread(canBus : CANBus): + time.sleep(1) # Wait for Nucleo to send CAN + counter = 0 + while (counter < 5): + print(f'Reading number {counter}') + canBus.printCANBus() + time.sleep(0.5) + counter = counter + 1 + +#Send BPSError to Nucleo (currently turn LD2 off and on) +def testWrite(canBus : CANBus): + #Example CANMessage + name = "BPSError" + id = 0x106 + signals = {"internal_communications_fault" : 1} + timestamp = 1.0 + can_message = CanMessage(name, id, signals, timestamp) + counter = 0 + canBus.sendMessage(can_message) + +def testCAN(): + canBus = CANBus() + testWrite(canBus) + + pinOut.write(0) # Block nucleo from proceeding + # Check that nucleo modified pin to 1 to switch to receiving + while pinIn.read() == 0: + pinOut.write(0) + time.sleep(0.5) + + testReadThread(canBus) + +#----------------Testing Analog Comms--------------- + +#Test case without the wrapper api +def send_to_nucleo(): + pinOut = gpiozero.PWMOutputDevice("GPIO5", frequency=5000) + + #Sending analog value to nucleo + while True: + pinOut.value = 0.64 + print(f"SENDING {pinOut.value}") + #pinOut.pulse() #Useful function to sweep all ranges + + time.sleep(2) + +#Test the wrapper api +def test_analog_class(pinOut): + value = round(random.uniform(1.0, 100.0), 2) # Generate random 2 d.p. float + + pinOut.write_voltage(value) + print(f'READING IN PIN VALUE: {pinOut.read()}') + +def testAnalog(): + pinOut = AnalogOutput("5") + test_analog_class(pinOut) + +if __name__ == "__main__": + # Wait for nucleo + while pinIn.read() == 1: + pinOut.write(0) + time.sleep(0.5) + + pinOut.write(1) + testCAN() + + # Make sure finished sending + while pinIn.read() == 1: + pinOut.write(0) + print("THERE WAS A SYNC ERROR WITH RECEIVING CAN") + time.sleep(0.5) + + pinOut.write(1) + testAnalog()