Skip to content

Commit 5699c83

Browse files
committed
Multithread jpeg compression in stream sink
1 parent c344989 commit 5699c83

2 files changed

Lines changed: 152 additions & 40 deletions

File tree

src/main/java/io/github/deltacv/visionloop/sink/CanvasViewportSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public void run() {
9292
while (!Thread.interrupted()) {
9393
MatRecycler.RecyclableMat frame = frames.poll();
9494
if (frame == null) {
95+
Thread.yield();
9596
continue;
9697
}
9798

streaming/src/main/java/io/github/deltacv/visionloop/sink/MjpegHttpStreamSink.java

Lines changed: 151 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.github.deltacv.visionloop.tj.TJLoader;
66
import io.javalin.Javalin;
77
import io.javalin.http.Handler;
8+
import org.eclipse.jetty.io.EofException;
89
import org.firstinspires.ftc.robotcore.internal.collections.EvictingBlockingQueue;
910
import org.jetbrains.skia.impl.BufferUtil;
1011
import org.libjpegturbo.turbojpeg.TJ;
@@ -13,11 +14,14 @@
1314
import org.opencv.imgproc.Imgproc;
1415
import org.openftc.easyopencv.MatRecycler;
1516

17+
import java.io.EOFException;
1618
import java.io.OutputStream;
1719
import java.nio.ByteBuffer;
1820
import java.util.HashMap;
19-
import java.util.concurrent.ArrayBlockingQueue;
20-
import java.util.concurrent.Executors;
21+
import java.util.Map;
22+
import java.util.Queue;
23+
import java.util.concurrent.*;
24+
import java.util.concurrent.atomic.AtomicBoolean;
2125

2226
/**
2327
* A receiver that streams MJPEG video over HTTP.
@@ -41,7 +45,9 @@ public class MjpegHttpStreamSink extends CanvasViewportSink {
4145
private static final byte[] contentLengthBytes = ("Content-Length: ").getBytes();
4246
private static final byte[] crlfBytes = "\r\n\r\n".getBytes();
4347

44-
private static final int QUEUE_SIZE = 4;
48+
private static final int QUEUE_SIZE = 5;
49+
private static final int REUSABLE_BUFFER_QUEUE_SIZE = 10;
50+
private static final int COMPRESSION_THREAD_POOL_SIZE = 4; // Number of threads for JPEG compression
4551

4652
private final int port;
4753
private final int quality;
@@ -50,12 +56,34 @@ public class MjpegHttpStreamSink extends CanvasViewportSink {
5056
private volatile boolean getHandlerCalled = false;
5157

5258
private final EvictingBlockingQueue<MatRecycler.RecyclableMat> frames = new EvictingBlockingQueue<>(new ArrayBlockingQueue<>(QUEUE_SIZE));
53-
private final MatRecycler matRecycler = new MatRecycler(QUEUE_SIZE + 2);
59+
private final MatRecycler matRecycler = new MatRecycler(QUEUE_SIZE + 4);
60+
61+
private final Map<Integer, Queue<byte[]>> reusableBuffers = new ConcurrentHashMap<>();
62+
63+
// Thread pool for JPEG compression
64+
private final ExecutorService compressionThreadPool;
65+
66+
// Queue for compressed frames
67+
private final BlockingQueue<CompressedFrame> compressedFrames = new ArrayBlockingQueue<>(QUEUE_SIZE);
68+
private final AtomicBoolean isRunning = new AtomicBoolean(true);
5469

5570
static {
5671
TJLoader.load();
5772
}
5873

74+
/**
75+
* Represents a compressed JPEG frame ready to be sent to clients
76+
*/
77+
private static class CompressedFrame {
78+
final byte[] data;
79+
final int size;
80+
81+
CompressedFrame(byte[] data, int size) {
82+
this.data = data;
83+
this.size = size;
84+
}
85+
}
86+
5987
/**
6088
* Creates a new MjpegHttpStreamerReceiver with the specified port and stream size.
6189
* @param port The port to start the Javalin server on. Specify 0 to use a random port.
@@ -77,8 +105,100 @@ public MjpegHttpStreamSink(int port, Size streamSize, int quality, String descri
77105
this.quality = quality;
78106
this.port = port;
79107

108+
// Create fixed thread pool for compression
109+
this.compressionThreadPool = Executors.newFixedThreadPool(COMPRESSION_THREAD_POOL_SIZE,
110+
r -> {
111+
Thread t = new Thread(r, "JPEG-Compression-Thread");
112+
t.setDaemon(true);
113+
return t;
114+
});
115+
80116
// the frame queue will automatically recycle the Mat objects
81-
frames.setEvictAction(MatRecycler.RecyclableMat::returnMat);
117+
frames.setEvictAction(mat -> {
118+
// Submit compression task when a new frame is available
119+
submitCompressionTask(mat);
120+
// Return the mat to the recycler
121+
mat.returnMat();
122+
});
123+
}
124+
125+
/**
126+
* Submits a compression task to the thread pool
127+
* @param frame The frame to compress
128+
*/
129+
private void submitCompressionTask(MatRecycler.RecyclableMat frame) {
130+
if (!isRunning.get()) return;
131+
132+
MatRecycler.RecyclableMat frameCopy = matRecycler.takeMatOrNull();
133+
134+
if(frameCopy == null) {
135+
return;
136+
}
137+
138+
frame.copyTo(frameCopy);
139+
140+
compressionThreadPool.submit(() -> {
141+
try {
142+
// Create a copy of the frame data to work with
143+
byte[] frameData = getOrCreateReusableBuffer((int) frame.total() * frame.channels());
144+
frameCopy.get(0, 0, frameData);
145+
frameCopy.returnMat();
146+
147+
TJCompressor compressor = new TJCompressor();
148+
try {
149+
compressor.setJPEGQuality(quality);
150+
compressor.setSubsamp(TJ.SAMP_440);
151+
compressor.setSourceImage(frameData, frame.width(), 0, frame.height(), TJ.PF_BGR);
152+
153+
byte[] buffer = getOrCreateReusableBuffer(2_000_000); // Pre-allocate buffer
154+
compressor.compress(buffer, TJ.FLAG_FASTDCT);
155+
156+
returnReusableBuffer(frameData);
157+
158+
int compressedSize = (int) compressor.getCompressedSize();
159+
160+
// Add compressed frame to the output queue if we're still running
161+
if (isRunning.get()) {
162+
CompressedFrame compressedFrame = new CompressedFrame(buffer, compressedSize);
163+
compressedFrames.offer(compressedFrame, 100, TimeUnit.MILLISECONDS);
164+
}
165+
} finally {
166+
compressor.close();
167+
}
168+
} catch (Exception e) {
169+
if (isRunning.get()) {
170+
System.err.println("Error compressing frame: " + e.getMessage());
171+
}
172+
}
173+
});
174+
}
175+
176+
177+
public void returnReusableBuffer(byte[] buffer) {
178+
synchronized (reusableBuffers) {
179+
Queue<byte[]> queue = reusableBuffers.get(buffer.length);
180+
if (queue != null) {
181+
queue.offer(buffer);
182+
} else {
183+
System.err.println("Buffer pool for size " + buffer.length + " is null");
184+
}
185+
}
186+
}
187+
188+
public byte[] getOrCreateReusableBuffer(int size) {
189+
synchronized (reusableBuffers) {
190+
reusableBuffers.computeIfAbsent(size, k -> {
191+
Queue<byte[]> queue = new ArrayBlockingQueue<>(REUSABLE_BUFFER_QUEUE_SIZE);
192+
for (int i = 0; i < REUSABLE_BUFFER_QUEUE_SIZE; i++) {
193+
queue.offer(new byte[size]);
194+
}
195+
return queue;
196+
});
197+
198+
Queue<byte[]> queue = reusableBuffers.get(size);
199+
byte[] buffer = queue.poll();
200+
return (buffer != null) ? buffer : new byte[size];
201+
}
82202
}
83203

84204
/**
@@ -95,7 +215,7 @@ public MjpegHttpStreamSink(int port, Size streamSize, int quality, String descri
95215
*/
96216
public Handler takeHandler() {
97217
if(getHandlerCalled) {
98-
throw new IllegalStateException("getHandler can only be called once");
218+
throw new IllegalStateException("takeHandler can only be called once. Has init() already been called?");
99219
}
100220

101221
getHandlerCalled = true;
@@ -107,43 +227,17 @@ public Handler takeHandler() {
107227
// get the output stream
108228
OutputStream outputStream = ctx.res().getOutputStream();
109229

110-
// reusable instances
111-
byte[] frameArray = new byte[0];
112-
HashMap<Integer, byte[]> bufMap = new HashMap<>();
113-
114-
byte[] bufArray = new byte[2_000_000];
115-
116230
byte[] contentLengthNumberBytes = new byte[16];
117231
int lastContentLengthNumber = 0;
118232

119-
TJCompressor tj = new TJCompressor();
120-
tj.setJPEGQuality(quality);
121-
tj.setSubsamp(TJ.SAMP_440);
122-
123233
try {
124-
while (!Thread.interrupted()) {
125-
// peek at the frame queue
126-
MatRecycler.RecyclableMat frame = frames.peek();
234+
while (!Thread.interrupted() && isRunning.get()) {
235+
// Get compressed frame from the queue
236+
CompressedFrame frame = compressedFrames.poll(500, TimeUnit.MILLISECONDS);
127237

128238
if (frame != null) {
129239
try {
130-
if (frameArray.length < frame.total()) {
131-
// allocate a new buffer if the existing one is too small
132-
frameArray = new byte[(int) frame.width() * (int) frame.height() * 3];
133-
tj.setSourceImage(frameArray, frame.width(), 0, frame.height(), TJ.PF_BGR);
134-
}
135-
136-
frame.get(0, 0, frameArray);
137-
138-
if (bufArray.length < tj.getCompressedSize()) {
139-
// allocate a new buffer if the existing one is too small
140-
bufArray = new byte[(int) tj.getCompressedSize()];
141-
}
142-
143-
// actual one-liner JPEG encoding magic
144-
tj.compress(bufArray, TJ.FLAG_FASTDCT);
145-
146-
int contentLength = (int) tj.getCompressedSize();
240+
int contentLength = frame.size;
147241

148242
// Avoid string conversions: preallocate and reuse byte buffer
149243
if (lastContentLengthNumber != contentLength) {
@@ -159,17 +253,21 @@ public Handler takeHandler() {
159253
outputStream.write(contentLengthBytes);
160254
outputStream.write(contentLengthNumberBytes);
161255
outputStream.write(crlfBytes);
162-
outputStream.write(bufArray, 0, contentLength);
256+
outputStream.write(frame.data, 0, contentLength);
163257

164258
outputStream.flush();
165-
// no need to recycle the Mat, as the frame queue will do it
259+
} catch (EOFException e) {
260+
// ignore
261+
break;
166262
} catch (Exception e) {
167263
throw new RuntimeException(e);
264+
} finally {
265+
returnReusableBuffer(frame.data);
168266
}
169267
}
170268
}
171-
} finally {
172-
tj.close();
269+
} catch (InterruptedException e) {
270+
Thread.currentThread().interrupt();
173271
}
174272
};
175273
}
@@ -241,13 +339,26 @@ public boolean pollViewportTapped() {
241339

242340
@Override
243341
public void close() {
342+
isRunning.set(false);
244343
super.close();
245344

345+
// Stop the compression thread pool
346+
compressionThreadPool.shutdown();
347+
try {
348+
if (!compressionThreadPool.awaitTermination(500, TimeUnit.MILLISECONDS)) {
349+
compressionThreadPool.shutdownNow();
350+
}
351+
} catch (InterruptedException e) {
352+
compressionThreadPool.shutdownNow();
353+
Thread.currentThread().interrupt();
354+
}
355+
246356
// flush flush flush
247357
if(app != null) {
248358
app.stop();
249359
}
250360
frames.clear();
361+
compressedFrames.clear();
251362
matRecycler.releaseAll();
252363
}
253364
}

0 commit comments

Comments
 (0)