Skip to content

Commit d651be0

Browse files
committed
Use per-camera writer threads
1 parent 0c91955 commit d651be0

2 files changed

Lines changed: 129 additions & 72 deletions

File tree

ScopeOneCore/internal/RecordingManager.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,11 @@ class RecordingManager : public QObject
137137
Source source{Source::PreviewStream};
138138
};
139139
struct WriteTask {
140-
QString cameraId;
141140
RecordingFrame frame;
142141
};
143142

144143
struct CameraOutput {
144+
QString cameraId;
145145
QString rawPath;
146146
QString frameInfoPath;
147147
QString metadataFileName;
@@ -150,6 +150,11 @@ class RecordingManager : public QObject
150150
int width{0};
151151
int height{0};
152152
int bits{0};
153+
std::deque<WriteTask> writeQueue;
154+
mutable std::mutex queueMutex;
155+
std::condition_variable writeCondition;
156+
std::thread writerThread;
157+
bool stopRequested{false};
153158
};
154159
struct SessionState {
155160
std::shared_ptr<RecordingSessionData> activeSession;
@@ -158,11 +163,7 @@ class RecordingManager : public QObject
158163
size_t recordedMaxBytes{16ull * 1024 * 1024 * 1024};
159164
size_t pendingWriteBytes{0};
160165
QHash<QString, std::shared_ptr<CameraOutput>> cameraOutputs;
161-
std::deque<WriteTask> writeQueue;
162166
mutable std::mutex writeMutex;
163-
std::condition_variable writeCondition;
164-
std::thread writerThread;
165-
bool writerStopRequested{false};
166167
QString writerError;
167168
RecordingWriterStatus status;
168169
};
@@ -215,8 +216,9 @@ class RecordingManager : public QObject
215216
void emitProgress();
216217
bool startStreamingOutputs(const CapturePlan& plan);
217218
void stopStreamingOutputs();
218-
void writerLoop();
219-
bool writeTask(const WriteTask& task, QString& errorMessage);
219+
void requestWriterStop();
220+
void writerLoop(const std::shared_ptr<CameraOutput>& output);
221+
bool writeTask(CameraOutput& output, const WriteTask& task, QString& errorMessage);
220222
void writeFrameInfoHeader(CameraOutput& output);
221223
QByteArray buildFrameInfoLine(const QString& cameraId, const RecordingFrame& frame) const;
222224
QString formatName(RecordingFormat format) const;

ScopeOneCore/src/RecordingManager.cpp

Lines changed: 120 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -382,9 +382,9 @@ void RecordingManager::emitWriterStatus()
382382
status = m_writerState.status;
383383
status.setPendingWriteBytes(static_cast<qint64>(m_writerState.pendingWriteBytes));
384384
status.setMaxPendingWriteBytes(static_cast<qint64>(m_writerState.recordedMaxBytes));
385-
}
386-
if (m_sessionState.activeSession) {
387-
m_sessionState.activeSession->setWriterStatusSnapshot(status);
385+
if (m_sessionState.activeSession) {
386+
m_sessionState.activeSession->setWriterStatusSnapshot(status);
387+
}
388388
}
389389
emit writerStatusChanged(status);
390390
}
@@ -564,6 +564,7 @@ bool RecordingManager::startStreamingOutputs(const CapturePlan& plan)
564564

565565
for (const QString& cameraId : m_captureState.activeCameraIds) {
566566
auto output = std::make_shared<CameraOutput>();
567+
output->cameraId = cameraId;
567568
output->rawPath = buildSessionFilePath(plan.saveDir,
568569
plan.baseName,
569570
cameraId,
@@ -598,32 +599,61 @@ bool RecordingManager::startStreamingOutputs(const CapturePlan& plan)
598599
{
599600
std::lock_guard<std::mutex> lock(m_writerState.writeMutex);
600601
m_writerState.pendingWriteBytes = 0;
601-
m_writerState.writeQueue.clear();
602-
m_writerState.writerStopRequested = false;
603602
}
604603
emitBufferUsageChanged(0);
605-
m_writerState.writerThread = std::thread([this]() { writerLoop(); });
604+
for (auto it = m_writerState.cameraOutputs.begin(); it != m_writerState.cameraOutputs.end(); ++it) {
605+
const auto& output = it.value();
606+
if (!output) {
607+
continue;
608+
}
609+
{
610+
std::lock_guard<std::mutex> lock(output->queueMutex);
611+
output->writeQueue.clear();
612+
output->stopRequested = false;
613+
}
614+
output->writerThread = std::thread([this, output]() {
615+
writerLoop(output);
616+
});
617+
}
606618
setWriterStatus(RecordingWriterPhase::Writing);
607619
return true;
608620
}
609621

610-
void RecordingManager::stopStreamingOutputs()
622+
void RecordingManager::requestWriterStop()
611623
{
624+
QList<std::shared_ptr<CameraOutput>> outputs;
612625
{
613626
std::lock_guard<std::mutex> lock(m_writerState.writeMutex);
614-
m_writerState.writerStopRequested = true;
627+
outputs = m_writerState.cameraOutputs.values();
615628
}
616-
m_writerState.writeCondition.notify_all();
629+
for (const auto& output : outputs) {
630+
if (!output) {
631+
continue;
632+
}
633+
{
634+
std::lock_guard<std::mutex> lock(output->queueMutex);
635+
output->stopRequested = true;
636+
}
637+
output->writeCondition.notify_all();
638+
}
639+
}
617640

618-
if (m_writerState.writerThread.joinable()) {
619-
m_writerState.writerThread.join();
641+
void RecordingManager::stopStreamingOutputs()
642+
{
643+
QList<std::shared_ptr<CameraOutput>> outputs;
644+
{
645+
std::lock_guard<std::mutex> lock(m_writerState.writeMutex);
646+
outputs = m_writerState.cameraOutputs.values();
620647
}
648+
requestWriterStop();
621649

622-
for (auto it = m_writerState.cameraOutputs.begin(); it != m_writerState.cameraOutputs.end(); ++it) {
623-
auto output = it.value();
650+
for (const auto& output : outputs) {
624651
if (!output) {
625652
continue;
626653
}
654+
if (output->writerThread.joinable()) {
655+
output->writerThread.join();
656+
}
627657
if (output->backend) {
628658
auto* backend = reinterpret_cast<SaveBackend*>(output->backend);
629659
delete backend;
@@ -633,51 +663,54 @@ void RecordingManager::stopStreamingOutputs()
633663
output->frameInfoFile.flush();
634664
output->frameInfoFile.close();
635665
}
666+
{
667+
std::lock_guard<std::mutex> lock(output->queueMutex);
668+
output->writeQueue.clear();
669+
output->stopRequested = false;
670+
}
636671
}
637-
m_writerState.cameraOutputs.clear();
638-
639672
{
640673
std::lock_guard<std::mutex> lock(m_writerState.writeMutex);
641-
m_writerState.writeQueue.clear();
674+
m_writerState.cameraOutputs.clear();
642675
m_writerState.pendingWriteBytes = 0;
643-
m_writerState.writerStopRequested = false;
644676
}
645677
emitBufferUsageChanged(0);
646678
emitWriterStatus();
647679
}
648680

649-
void RecordingManager::writerLoop()
681+
void RecordingManager::writerLoop(const std::shared_ptr<CameraOutput>& output)
650682
{
651-
// Drain queued frames on the writer thread
683+
// Drain queued frames for one camera on its dedicated writer thread
684+
const QString cameraId = output ? output->cameraId : QString{};
652685
while (true) {
653686
WriteTask task;
654687
{
655-
std::unique_lock<std::mutex> lock(m_writerState.writeMutex);
656-
m_writerState.writeCondition.wait(lock, [this]() {
657-
return m_writerState.writerStopRequested || !m_writerState.writeQueue.empty();
688+
std::unique_lock<std::mutex> lock(output->queueMutex);
689+
output->writeCondition.wait(lock, [&output]() {
690+
return output->stopRequested || !output->writeQueue.empty();
658691
});
659-
if (m_writerState.writeQueue.empty()) {
660-
if (m_writerState.writerStopRequested) {
692+
if (output->writeQueue.empty()) {
693+
if (output->stopRequested) {
661694
break;
662695
}
663696
continue;
664697
}
665-
task = std::move(m_writerState.writeQueue.front());
666-
m_writerState.writeQueue.pop_front();
698+
task = std::move(output->writeQueue.front());
699+
output->writeQueue.pop_front();
667700
}
668701

669702
QString errorMessage;
670-
if (!writeTask(task, errorMessage)) {
703+
if (!writeTask(*output, task, errorMessage)) {
671704
{
672705
std::lock_guard<std::mutex> lock(m_writerState.writeMutex);
673706
if (m_writerState.writerError.isEmpty()) {
674707
m_writerState.writerError = errorMessage.isEmpty()
675708
? QStringLiteral("Unknown recording writer error")
676709
: errorMessage;
677710
}
678-
m_writerState.writerStopRequested = true;
679711
m_writerState.status.setPhase(RecordingWriterPhase::Failed, m_writerState.writerError);
680712
}
713+
requestWriterStop();
681714
emitWriterStatus();
682715
QMetaObject::invokeMethod(this, [this]() {
683716
if (m_captureState.isRecording) {
@@ -693,69 +726,62 @@ void RecordingManager::writerLoop()
693726
m_writerState.pendingWriteBytes -= static_cast<size_t>(task.frame.rawData.size());
694727
m_writerState.status.addWrittenFrames(1);
695728
pendingWriteBytes = static_cast<qint64>(m_writerState.pendingWriteBytes);
729+
if (m_sessionState.activeSession) {
730+
const qint64 framesWritten =
731+
m_sessionState.activeSession->ensureFileManifest(cameraId).framesWritten + 1;
732+
m_sessionState.activeSession->setOutputFramesWritten(cameraId, framesWritten);
733+
}
696734
}
697735
emitBufferUsageChanged(pendingWriteBytes);
698-
if (m_sessionState.activeSession) {
699-
const qint64 framesWritten =
700-
m_sessionState.activeSession->ensureFileManifest(task.cameraId).framesWritten + 1;
701-
m_sessionState.activeSession->setOutputFramesWritten(task.cameraId, framesWritten);
702-
}
703736
emitWriterStatus();
704737
}
705738
}
706739

707-
bool RecordingManager::writeTask(const WriteTask& task, QString& errorMessage)
740+
bool RecordingManager::writeTask(CameraOutput& output, const WriteTask& task, QString& errorMessage)
708741
{
709-
const auto it = m_writerState.cameraOutputs.constFind(task.cameraId);
710-
if (it == m_writerState.cameraOutputs.constEnd() || !it.value()) {
711-
errorMessage = QStringLiteral("Missing output for %1").arg(task.cameraId);
712-
return false;
713-
}
714-
715-
const auto output = it.value();
716-
auto* backend = reinterpret_cast<SaveBackend*>(output->backend);
742+
auto* backend = reinterpret_cast<SaveBackend*>(output.backend);
717743
if (!backend) {
718744
SaveBackend::TiffOptions tiffOpts;
719745
tiffOpts.useDeflate = m_captureState.enableCompression;
720746
tiffOpts.zipQuality = m_captureState.compressionLevel;
721747

722748
auto newBackend = std::make_unique<SaveBackend>();
723-
if (!newBackend->startStackRaw(output->rawPath,
749+
if (!newBackend->startStackRaw(output.rawPath,
724750
m_captureState.format,
725751
task.frame.width,
726752
task.frame.height,
727753
task.frame.bits,
728754
tiffOpts)) {
729755
errorMessage = QStringLiteral("Failed to open raw output for %1: %2")
730-
.arg(task.cameraId)
756+
.arg(output.cameraId)
731757
.arg(newBackend->lastError());
732758
return false;
733759
}
734-
output->backend = newBackend.release();
735-
output->width = task.frame.width;
736-
output->height = task.frame.height;
737-
output->bits = task.frame.bits;
738-
backend = reinterpret_cast<SaveBackend*>(output->backend);
739-
} else if (task.frame.width != output->width
740-
|| task.frame.height != output->height
741-
|| task.frame.bits != output->bits) {
742-
errorMessage = QStringLiteral("Frame format changed during recording for %1").arg(task.cameraId);
760+
output.backend = newBackend.release();
761+
output.width = task.frame.width;
762+
output.height = task.frame.height;
763+
output.bits = task.frame.bits;
764+
backend = reinterpret_cast<SaveBackend*>(output.backend);
765+
} else if (task.frame.width != output.width
766+
|| task.frame.height != output.height
767+
|| task.frame.bits != output.bits) {
768+
errorMessage = QStringLiteral("Frame format changed during recording for %1").arg(output.cameraId);
743769
return false;
744770
}
745771

746772
if (!backend->appendRaw(reinterpret_cast<const uchar*>(task.frame.rawData.constData()),
747773
task.frame.rawData.size(),
748-
buildImageDescriptionJson(output->metadataFileName))) {
774+
buildImageDescriptionJson(output.metadataFileName))) {
749775
errorMessage = QStringLiteral("Failed writing raw frame for %1: %2")
750-
.arg(task.cameraId)
776+
.arg(output.cameraId)
751777
.arg(backend->lastError());
752778
return false;
753779
}
754780

755-
if (output->frameInfoFile.isOpen()) {
756-
const QByteArray infoLine = buildFrameInfoLine(task.cameraId, task.frame);
757-
if (output->frameInfoFile.write(infoLine) != infoLine.size()) {
758-
errorMessage = QStringLiteral("Failed writing frame info for %1").arg(task.cameraId);
781+
if (output.frameInfoFile.isOpen()) {
782+
const QByteArray infoLine = buildFrameInfoLine(output.cameraId, task.frame);
783+
if (output.frameInfoFile.write(infoLine) != infoLine.size()) {
784+
errorMessage = QStringLiteral("Failed writing frame info for %1").arg(output.cameraId);
759785
return false;
760786
}
761787
}
@@ -987,24 +1013,53 @@ bool RecordingManager::enqueueFrame(const RecordingFrame& frame, const QString&
9871013
// Stop capture if the writer queue grows too large
9881014
const size_t frameBytes = static_cast<size_t>(frame.rawData.size());
9891015
qint64 pendingWriteBytes = 0;
1016+
std::shared_ptr<CameraOutput> output;
1017+
bool emitStatus = false;
9901018
{
9911019
std::lock_guard<std::mutex> lock(m_writerState.writeMutex);
992-
if (m_writerState.pendingWriteBytes + frameBytes > m_writerState.recordedMaxBytes) {
1020+
const auto it = m_writerState.cameraOutputs.constFind(cameraId);
1021+
if (it == m_writerState.cameraOutputs.constEnd() || !it.value()) {
1022+
if (m_writerState.writerError.isEmpty()) {
1023+
m_writerState.writerError = QStringLiteral("Missing output for %1").arg(cameraId);
1024+
}
1025+
m_writerState.status.setPhase(RecordingWriterPhase::Failed, m_writerState.writerError);
1026+
emitStatus = true;
1027+
} else if (m_writerState.pendingWriteBytes + frameBytes > m_writerState.recordedMaxBytes) {
9931028
if (m_writerState.writerError.isEmpty()) {
9941029
m_writerState.writerError = QStringLiteral("Recording write queue exceeded limit");
9951030
}
9961031
m_writerState.status.setPhase(RecordingWriterPhase::Failed, m_writerState.writerError);
997-
qWarning().noquote() << "Recording write queue full, stopping capture";
1032+
emitStatus = true;
1033+
} else {
1034+
output = it.value();
1035+
m_writerState.pendingWriteBytes += frameBytes;
1036+
pendingWriteBytes = static_cast<qint64>(m_writerState.pendingWriteBytes);
1037+
}
1038+
}
1039+
if (!output) {
1040+
if (emitStatus) {
1041+
qWarning().noquote() << "Recording write queue full or output missing, stopping capture";
9981042
QMetaObject::invokeMethod(this, [this]() { stop(); }, Qt::QueuedConnection);
9991043
emitWriterStatus();
1044+
}
1045+
return false;
1046+
}
1047+
{
1048+
std::lock_guard<std::mutex> lock(output->queueMutex);
1049+
if (output->stopRequested) {
1050+
qint64 revertedPendingBytes = 0;
1051+
{
1052+
std::lock_guard<std::mutex> stateLock(m_writerState.writeMutex);
1053+
m_writerState.pendingWriteBytes -= frameBytes;
1054+
revertedPendingBytes = static_cast<qint64>(m_writerState.pendingWriteBytes);
1055+
}
1056+
emitBufferUsageChanged(revertedPendingBytes);
10001057
return false;
10011058
}
1002-
m_writerState.writeQueue.push_back(WriteTask{cameraId, frame});
1003-
m_writerState.pendingWriteBytes += frameBytes;
1004-
pendingWriteBytes = static_cast<qint64>(m_writerState.pendingWriteBytes);
1059+
output->writeQueue.push_back(WriteTask{frame});
10051060
}
10061061
emitBufferUsageChanged(pendingWriteBytes);
1007-
m_writerState.writeCondition.notify_one();
1062+
output->writeCondition.notify_one();
10081063
emitWriterStatus();
10091064
return true;
10101065
}

0 commit comments

Comments
 (0)