Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 23 additions & 22 deletions modules/target_tracking/stereo_node.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,50 @@
"""
Module for initializing and configuring the StereoDepth node.
This setup aligns the depth map to the RGB camera for spatial logic.
Updated to depthai v3 API.
"""

import depthai as dai


def create_stereo_depth(pipeline: dai.Pipeline) -> dai.node.StereoDepth:
    """
    Creates the StereoDepth node and links it to the stereo cameras.

    The pipeline must be created with a device:
        device = dai.Device()
        with dai.Pipeline(device) as pipeline:
            stereo = create_stereo_depth(pipeline)

    Args:
        pipeline (dai.Pipeline): The DepthAI pipeline object (created with device).

    Returns:
        dai.node.StereoDepth: The configured stereo node.
    """
    # --- 1. Define Sources ---
    # v3 API: CAM_B / CAM_C are the left / right stereo sockets on OAK-D
    # devices (replaces the v2 MonoCamera LEFT / RIGHT sockets).
    cam_left = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_B)
    cam_right = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_C)

    # --- 2. Define the Processor ---
    stereo = pipeline.create(dai.node.StereoDepth)

    # --- 3. Configuration ---
    # Rectification undistorts/aligns the image pair so disparity matching works.
    stereo.setRectification(True)

    # Align depth to RGB camera (CAM_A) for spatial logic
    stereo.setDepthAlign(dai.CameraBoardSocket.CAM_A)

    # Improve quality
    stereo.setSubpixel(True)  # Sub-pixel interpolation for smoother depth
    stereo.setLeftRightCheck(True)  # Removes ghost pixels at edges
    # Change to True if <50cm tracking needed
    stereo.setExtendedDisparity(False)

    # --- 4. Linking ---
    # 640x400 GRAY8 matches the classic 400p mono-camera resolution.
    left_out = cam_left.requestOutput((640, 400), dai.ImgFrame.Type.GRAY8)
    right_out = cam_right.requestOutput((640, 400), dai.ImgFrame.Type.GRAY8)

    left_out.link(stereo.left)
    right_out.link(stereo.right)

    return stereo
129 changes: 129 additions & 0 deletions utilities/data_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""
Data Collection Utility for OAK-D
Captures synchronized Left, Right, and RGB images for stereo depth calibration and testing.

Usage:
python utilities/data_collector.py --interval 2 --out dataset_01
"""

import cv2
import depthai as dai
import time
import os
import argparse
import shutil


def parse_args(argv=None) -> argparse.Namespace:
    """Parse command-line options for the data collector.

    Args:
        argv: Optional list of argument strings (useful for testing).
            Defaults to ``None``, which makes argparse read ``sys.argv[1:]``.

    Returns:
        argparse.Namespace: Parsed options — ``interval`` (float, seconds
        between captures) and ``out`` (str, output directory prefix).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-i", "--interval", type=float, default=3.0, help="Time in seconds between captures"
    )
    parser.add_argument("-o", "--out", type=str, default="dataset", help="Output directory path")
    return parser.parse_args(argv)


def main() -> None:
    """Run the capture loop.

    Shows a live RGB preview with a countdown overlay and saves a
    synchronized left/right/RGB image set every ``--interval`` seconds
    into a timestamped session directory. Press 'q' in the preview
    window to stop.
    """
    args = parse_args()

    # Create unique session folder to prevent overwriting earlier runs
    timestamp_start = int(time.time())
    session_dir = f"{args.out}_{timestamp_start}"

    for stream in ["left", "right", "rgb"]:
        path = os.path.join(session_dir, stream)
        os.makedirs(path, exist_ok=True)

    print(f"Starting Data Collector... Saving to '{session_dir}'")

    # In depthai v3.x the device is created first. Context managers ensure
    # the device handle is released even if the capture loop raises.
    with dai.Device() as device:
        with dai.Pipeline(device) as pipeline:
            # Define camera sources using the v3 Camera node
            camLeft = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_B)  # Left mono
            camRight = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_C)  # Right mono
            camRgb = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_A)  # RGB center

            # Request outputs and create host-side output queues
            qLeft = camLeft.requestOutput((640, 400), dai.ImgFrame.Type.GRAY8).createOutputQueue()
            qRight = camRight.requestOutput((640, 400), dai.ImgFrame.Type.GRAY8).createOutputQueue()
            qRgb = camRgb.requestOutput((640, 400), dai.ImgFrame.Type.BGR888p).createOutputQueue()

            pipeline.start()

            count = 0
            last_capture_time = time.time()
            last_capture_display_time = 0.0  # When the last "CAPTURED!" flash started

            # Latest frame per camera — streams deliver at independent rates,
            # so we keep the most recent message from each.
            latestLeft = None
            latestRight = None
            latestRgb = None

            while pipeline.isRunning():
                # Non-blocking polls: tryGet() returns None when no new frame
                inLeft = qLeft.tryGet()
                inRight = qRight.tryGet()
                inRgb = qRgb.tryGet()

                # Explicit None checks — don't rely on message truthiness
                if inLeft is not None:
                    latestLeft = inLeft
                if inRight is not None:
                    latestRight = inRight
                if inRgb is not None:
                    latestRgb = inRgb

                if latestRgb is not None:
                    frameRgb = latestRgb.getCvFrame().copy()  # Copy so overlay isn't saved to disk

                    # --- VISUAL COUNTDOWN LOGIC ---
                    time_since_last = time.time() - last_capture_time
                    time_remaining = args.interval - time_since_last
                    time_since_capture_display = time.time() - last_capture_display_time

                    if time_since_capture_display < 0.5:
                        # Show "CAPTURED!" for 0.5 seconds after capture
                        text = "CAPTURED!"
                        color = (0, 255, 0)  # Green
                    elif time_remaining > 0:
                        # Draw countdown on screen
                        text = f"Capture in: {int(time_remaining) + 1}"
                        color = (0, 255, 255)  # Yellow
                    else:
                        text = "CAPTURING..."
                        color = (0, 0, 255)  # Red

                    cv2.putText(frameRgb, text, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
                    cv2.imshow("Data Collector (Preview)", frameRgb)

                # Save only when all three streams have produced at least one
                # frame AND the capture interval has elapsed
                if (
                    latestLeft is not None
                    and latestRight is not None
                    and latestRgb is not None
                    and (time.time() - last_capture_time > args.interval)
                ):
                    timestamp = int(time.time() * 1000)

                    # Save frames
                    cv2.imwrite(f"{session_dir}/left/{timestamp}.png", latestLeft.getCvFrame())
                    cv2.imwrite(f"{session_dir}/right/{timestamp}.png", latestRight.getCvFrame())
                    cv2.imwrite(f"{session_dir}/rgb/{timestamp}.png", latestRgb.getCvFrame())

                    print(f"[{count}] Saved set at {timestamp}ms")
                    count += 1
                    last_capture_time = time.time()  # Reset timer
                    last_capture_display_time = time.time()  # Trigger "CAPTURED!" display

                key = cv2.waitKey(10)  # 10 ms keeps the HighGUI event loop responsive
                if key == ord("q"):
                    break

    print("Data collection finished.")
    cv2.destroyAllWindows()


# Script entry point: run the collector only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()
Loading