Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions Testing_Library/CAN-messages/motor_interface_test_script1.29.ino
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#include <Wire.h>

#define THROTTLE_ADDR 0x2F
#define REGEN_ADDR 0x2E
#define THROTTLE_MARKER 0xFF
#define REGEN_MARKER 0xFE
#define SWAP_COMMAND 0x1FF

// Test Mode: 0 = Throttle only, 1 = Regen only
#define TEST_MODE 0

int currentMode = 0; // 0 = Throttle, 1 = Regen

void setup() {
Serial.begin(9600);
while (!Serial);
Serial.println("Arduino Starting...");

// Initialize based on test mode
if (TEST_MODE == 1) {
// Start in regen mode
Wire.begin(REGEN_ADDR);
currentMode = 1;
Serial.println("Initialized on REGEN_ADDR (0x2E)");
} else {
// Start in throttle mode (default)
Wire.begin(THROTTLE_ADDR);
currentMode = 0;
Serial.println("Initialized on THROTTLE_ADDR (0x2F)");
}

Wire.onReceive(receiveEvent);
Serial.println("Ready to receive I2C data");
Serial.print("TEST_MODE: ");
Serial.println(TEST_MODE);
}

void loop() {
delay(100);
}

void swapMode() {
Wire.end();
currentMode = !currentMode;

if (currentMode == 0) {
Wire.begin(THROTTLE_ADDR);
Serial.println("Swapped to THROTTLE mode");
} else {
Wire.begin(REGEN_ADDR);
Serial.println("Swapped to REGEN mode");
}
Wire.onReceive(receiveEvent);
}

void receiveEvent(int howMany) {
if (Wire.available() < 2) return;

// Read 2 bytes (16-bit value)
int highByte = Wire.read();
int lowByte = Wire.read();
int value = ((highByte << 8) | lowByte) & 0x1FF;

// Handle swap command
if (value == SWAP_COMMAND) {
if (TEST_MODE == 2) { // Only swap if in mode 2 (both)
swapMode();
}
return;
}

// Clamp value to valid range [0, 256]
if (value > 256) value = 256;


// Ensure value stays in valid range after inversion
if (value > 256) value = 256;
if (value < 0) value = 0;

// Send based on current mode
if (currentMode == 0) {
Serial.write(THROTTLE_MARKER);
Serial.write(value & 0xFF); // Low byte
Serial.write((value >> 8) & 0x01); // Bit 8
} else {
Serial.write(REGEN_MARKER);
Serial.write(value & 0xFF); // Low byte
Serial.write((value >> 8) & 0x01); // Bit 8
}
}
108 changes: 108 additions & 0 deletions Testing_Library/MotorInterfaceTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import sys
import serial
import time
import threading


sys.path.append('/home/solarcar/solarcar/HiL_Testing/Server')

THROTTLE_ADDR = 0x2F
REGEN_ADDR = 0x2E
THROTTLE_START_BYTE = b'\xFF'
REGEN_START_BYTE = b'\xFE'
'''
while True:
byte_read = ser.read(1)
if byte_read == b'\xFF':
data = ser.read(3) # Read 3 bytes for two 9-bit values
if len(data) == 3:
# Unpack two 9-bit values from 3 bytes
# Byte 0: throttle[7:0]
# Byte 1: throttle[8] in bit 0, regen[6:0] in bits 7:1
# Byte 2: regen[8:7] in bits 1:0

# Extracting throttle and regen
throttle = data[0] | ((data[1] & 0x01) << 8)
regen = ((data[1] >> 1) & 0x7F) | ((data[2] & 0x03) << 7)

print(f"Throttle: {throttle}%) | Regen: {regen})")
else:
print(f"Incomplete data: got {len(data)} bytes instead of 3")
elif byte_read:
print(f"Unexpected byte:{byte_read}")
'''

class MotorInterfaceTest:
"""API for test Motor interface"""
def __init__(self, port='/dev/ttyACM0', baudrate=9600):
self.stop_thread = False
self.mru_throttle = None
self.mru_regen = None
self.THROTTLE_ADDR = 0x2F
self.REGEN_ADDR = 0x2E
self.Lock = threading.Lock()
self.ser = serial.Serial(
port=port,
baudrate=baudrate,
timeout=1
)
print("Waiting for Arduino to start...")
time.sleep(2) # Give Arduino time to boot
self.startReadThread()


#Run infinite thread to read in Throttle and Regen Values
def startReadThread(self):
read_thread = threading.Thread(target=self.readThrottleAndRegen)
read_thread.daemon = True
try:
read_thread.start()
except ValueError as e:
print(f"Exception")
def readThrottleAndRegen(self):
while(not self.stop_thread):
with self.Lock:
try:
byte_read = self.ser.read(1)
except serial.SerialException as e:
return
if byte_read == THROTTLE_START_BYTE: # Throttle start marker
data = self.ser.read(2) # Read 2 bytes for 9-bit value
if len(data) == 2:
# motorinterface.cpp sends (0x100 - throttle)
# first byte contains lower 8 bits, second byte LSB is the 9th bit
raw_from_arduino = data[0] | ((data[1] & 0x01) << 8)
self.mru_throttle = 256 - raw_from_arduino # PowerBoard sends (0x100 - throttle) over I2C
print(f"Throttle: {self.mru_throttle}")
elif byte_read == REGEN_START_BYTE:
data = self.ser.read(2) # Read 2 bytes for 9-bit value
if len(data) == 2:
# motorinterface.cpp sends (0x100 - regen)
# first byte contains lower 8 bits, second byte LSB is the 9th bit
raw_from_arduino = data[0] | ((data[1] & 0x01) << 8)
self.mru_regen = 256 - raw_from_arduino # PowerBoard sends (0x100 - regen) over I2C
print(f"Regen: {self.mru_regen}")

def get_throttle(self) -> float:
# Normalized between [0-1.0]
if self.mru_throttle is None:
return None
return self.mru_throttle/256.0

def get_throttle_raw(self) -> int:
# raw between [0-256]
if self.mru_throttle is None:
return None
return self.mru_throttle

def get_regen(self) -> float:
# Normalized between [0,1.0]
if self.mru_regen is None:
return None
return self.mru_regen/256.0

def get_regen_raw(self) -> int:
# raw between [0-256]
if self.mru_regen is None:
return None
return self.mru_regen
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#include <Wire.h>

#define THROTTLE_ADDR 0x2F
#define REGEN_ADDR 0x2E
#define THROTTLE_MARKER 0xFF
#define REGEN_MARKER 0xFE
#define SWAP_COMMAND 0x1FF

// Test Mode: 0 = Throttle only (listen on 0x2F), 1 = Regen only (listen on 0x2E).
// Must match PowerBoard main.cpp I2C_TEST_MODE: test_throttle → 0, test_regen → 1.
#define TEST_MODE 1

int currentMode = 1; // 0 = Throttle, 1 = Regen

void setup() {
Serial.begin(9600);
while (!Serial);
Serial.println("Arduino Starting...");

// Initialize based on test mode
if (TEST_MODE == 1) {
// Start in regen mode
Wire.begin(REGEN_ADDR);
currentMode = 1;
Serial.println("Initialized on REGEN_ADDR (0x2E)");
} else {
// Start in throttle mode (default)
Wire.begin(THROTTLE_ADDR);
currentMode = 0;
Serial.println("Initialized on THROTTLE_ADDR (0x2F)");
}

Wire.onReceive(receiveEvent);
Serial.println("Ready to receive I2C data");
Serial.print("TEST_MODE: ");
Serial.println(TEST_MODE);
}

void loop() {
delay(100);
}

void receiveEvent(int howMany) {
if (Wire.available() < 2) return;

// Read 2 bytes (16-bit value)
int highByte = Wire.read();
int lowByte = Wire.read();
int value = ((highByte << 8) | lowByte) & 0x1FF;

// Handle swap command
if (value == SWAP_COMMAND) {
//TODO:
return;
}
// Clamp value to valid range [0, 256]
if (value > 256) value = 256;


// Ensure value stays in valid range after inversion
if (value > 256) value = 256;
if (value < 0) value = 0;

// Send based on current mode
if (currentMode == 0) {
Serial.write(THROTTLE_MARKER);
Serial.write(value & 0xFF); // Low byte
Serial.write((value >> 8) & 0x01); // Bit 8
} else {
Serial.write(REGEN_MARKER);
Serial.write(value & 0xFF); // Low byte
Serial.write((value >> 8) & 0x01); // Bit 8
}
}
106 changes: 106 additions & 0 deletions Testing_Library/test_all_comms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import sys
sys.path.append('/home/solarcar/solarcar/HiL_Testing/Server')

from CANBus import CANBus
from CANMessage import CanMessage
from CANPi import readIn
import time
import serial
from gpioPins import DigitalInput, DigitalOutput
import random
import gpiozero
from gpioPins import AnalogOutput

# Global Pins for Synchronization
pinOut = DigitalOutput("2")
pinIn = DigitalInput("11")

# -----------Testing CAN Comms-----------

def testAddMethod(canBus : CANBus):
canBus.printCANBus()

input("WAITING")
#Example CANMessage
name = "BPSError"
id = 0x106
signals = {"internal_communications_fault" : 1}
timestamp = 1.0
can_message = CanMessage(name, id, signals, timestamp)
canBus.addToCANBus(can_message)
canBus.printCANBus()

def testReadThread(canBus : CANBus):
time.sleep(1) # Wait for Nucleo to send CAN
counter = 0
while (counter < 5):
print(f'Reading number {counter}')
canBus.printCANBus()
time.sleep(0.5)
counter = counter + 1

#Send BPSError to Nucleo (currently turn LD2 off and on)
def testWrite(canBus : CANBus):
#Example CANMessage
name = "BPSError"
id = 0x106
signals = {"internal_communications_fault" : 1}
timestamp = 1.0
can_message = CanMessage(name, id, signals, timestamp)
counter = 0
canBus.sendMessage(can_message)

def testCAN():
canBus = CANBus()
testWrite(canBus)

pinOut.value = 0 # Block nucleo from proceeding
# Check that nucleo modified pin to 1 to switch to receiving
while pinIn.value == 0:
pinOut.value = 0
time.sleep(0.1)

testReadThread(canBus)

#----------------Testing Analog Comms---------------

#Test case without the wrapper api
def send_to_nucleo():
pinOut = gpiozero.PWMOutputDevice("GPIO5", frequency=5000)

#Sending analog value to nucleo
while True:
pinOut.value = 0.64
print(f"SENDING {pinOut.value}")
#pinOut.pulse() #Useful function to sweep all ranges

time.sleep(2)

#Test the wrapper api
def test_analog_class(pinOut):
value = round(random.uniform(1.0, 100.0), 2) # Generate random 2 d.p. float

pinOut.write_voltage(value)
print(f'READING IN PIN VALUE: {pinOut.read()}')

def testAnalog():
pinOut = AnalogOutput("5")
test_analog_class(pinOut)

if __name__ == "__main__":
# Wait for nucleo
while pinIn.value == 1:
pinOut.value = 0
time.sleep(0.1)

pinOut.value = 1
testCAN()

# Make sure finished sending
while pinIn.value == 1:
pinOut.value = 0
print("THERE WAS A SYNC ERROR WITH RECEIVING CAN")
time.sleep(0.1)

pinOut.value = 1
testAnalog()