-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathxeon_stream_headless.py
More file actions
164 lines (140 loc) · 5.43 KB
/
xeon_stream_headless.py
File metadata and controls
164 lines (140 loc) · 5.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import socket
import cv2
import numpy as np
import logging
import struct
import time
from flask import Flask, Response
from threading import Thread, Lock
import io
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Configuration variables (match Pi)
CONFIG = {
'HOST': '', # Bind to all interfaces for TCP
'PORT_STREAM': 5001, # Port for video stream
'BUFFER_SIZE': 4096, # Socket buffer size
'FRAME_TIMEOUT': 1.0, # Seconds before dropping frame
'WEB_PORT': 8080 # Port for Flask web server
}
# Initialize Flask app
app = Flask(__name__)
# Store latest edge-detected frames in memory with thread-safe lock
latest_frames = {0: None, 1: None} # cam_id: edge-detected frame
frame_lock = Lock()
def receive_frame(client):
"""Receives a frame with header (cam_id, frame_size) and decodes it."""
try:
# Receive header (8 bytes: 4 for cam_id, 4 for frame_size)
header = client.recv(8)
if len(header) != 8:
logging.warning("Incomplete header received")
return None, None
cam_id, frame_size = struct.unpack('!II', header)
logging.debug(f"Received header: cam_id={cam_id}, frame_size={frame_size}")
# Receive frame data
frame_data = b''
bytes_received = 0
while bytes_received < frame_size:
chunk = client.recv(min(CONFIG['BUFFER_SIZE'], frame_size - bytes_received))
if not chunk:
logging.warning("Connection closed during frame receive")
return None, None
frame_data += chunk
bytes_received += len(chunk)
# Decode JPEG to OpenCV image
frame_array = np.frombuffer(frame_data, dtype=np.uint8)
frame = cv2.imdecode(frame_array, cv2.IMREAD_COLOR)
if frame is None:
logging.error(f"Failed to decode frame for cam {cam_id}")
return None, None
return cam_id, frame
except Exception as e:
logging.error(f"Error receiving frame: {e}")
return None, None
def apply_edge_detection(frame):
"""Applies Canny edge detection to the frame."""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 100, 200) # Thresholds for edge detection
return edges
def generate_mjpeg_stream(cam_id):
"""Generates MJPEG stream for a specific camera."""
while True:
with frame_lock:
frame = latest_frames.get(cam_id)
if frame is not None:
# Encode frame as JPEG
ret, jpeg = cv2.imencode('.jpg', frame)
if ret:
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n')
time.sleep(0.1) # Limit refresh rate to reduce CPU load
@app.route('/')
def index():
"""Simple HTML page to display both camera streams."""
return '''
<html>
<body>
<h1>Stereo Camera Edge Detection</h1>
<h2>Camera 0</h2>
<img src="/video_feed/0" width="320">
<h2>Camera 1</h2>
<img src="/video_feed/1" width="320">
</body>
</html>
'''
@app.route('/video_feed/<int:cam_id>')
def video_feed(cam_id):
"""Serves MJPEG stream for a specific camera."""
return Response(generate_mjpeg_stream(cam_id),
mimetype='multipart/x-mixed-replace; boundary=frame')
def run_flask():
"""Runs Flask web server in a separate thread."""
app.run(host='0.0.0.0', port=CONFIG['WEB_PORT'], threaded=True)
def main():
"""Main loop to receive and process stereo stream, updating web frames."""
# Start Flask server in a separate thread
flask_thread = Thread(target=run_flask)
flask_thread.daemon = True
flask_thread.start()
logging.info(f"Flask web server started at http://0.0.0.0:{CONFIG['WEB_PORT']}")
# Create TCP server socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((CONFIG['HOST'], CONFIG['PORT_STREAM']))
server.listen(1)
server.settimeout(10.0)
logging.info(f"Listening for stream on port {CONFIG['PORT_STREAM']}...")
try:
# Accept connection from Pi
client, addr = server.accept()
logging.info(f"Connected to Pi at {addr}")
# Main receiving loop
last_frame_time = time.time()
while True:
cam_id, frame = receive_frame(client)
if frame is None or cam_id is None:
logging.warning("Skipping invalid frame")
continue
# Apply edge detection
edges = apply_edge_detection(frame)
# Update latest frame in memory (thread-safe)
with frame_lock:
latest_frames[cam_id] = edges
logging.debug(f"Updated edge-detected frame for cam {cam_id}")
# Check for timeout
if time.time() - last_frame_time > CONFIG['FRAME_TIMEOUT']:
logging.warning("Frame timeout, possible lag")
last_frame_time = time.time()
except Exception as e:
logging.error(f"Streaming error: {e}")
finally:
# Clean up
client.close()
server.close()
logging.info("Streaming stopped, resources released")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
logging.info("Streaming server terminated by user")