Skip to content
129 changes: 121 additions & 8 deletions server/cmd/api/api/display.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"os/exec"
"strconv"
"strings"
"time"

nekooapi "github.com/m1k1o/neko/server/lib/oapi"
"github.com/onkernel/kernel-images/server/lib/logger"
oapi "github.com/onkernel/kernel-images/server/lib/oapi"
"github.com/onkernel/kernel-images/server/lib/recorder"
)

// PatchDisplay updates the display configuration. When require_idle
Expand Down Expand Up @@ -62,18 +64,29 @@ func (s *ApiService) PatchDisplay(ctx context.Context, req oapi.PatchDisplayRequ
requireIdle = *req.Body.RequireIdle
}

// Check if resize is safe (no active sessions or recordings)
// Check if resize is safe (no active live view sessions)
if requireIdle {
live := s.getActiveNekoSessions(ctx)
isRecording := s.anyRecordingActive(ctx)
resizableNow := (live == 0) && !isRecording

log.Info("checking if resize is safe", "live_sessions", live, "is_recording", isRecording, "resizable", resizableNow)

if !resizableNow {
if live > 0 {
log.Info("resize refused: live view or replay active", "live_sessions", live)
return oapi.PatchDisplay409JSONResponse{
ConflictErrorJSONResponse: oapi.ConflictErrorJSONResponse{
Message: "resize refused: live view or recording/replay active",
Message: "resize refused: live view or replay active",
},
}, nil
}

// Gracefully stop active recordings so the resize can proceed.
// They will be restarted (with new segment files) after the resize completes.
stopped, stopErr := s.stopActiveRecordings(ctx)
if len(stopped) > 0 {
defer s.restartRecordings(context.WithoutCancel(ctx), stopped)
}
if stopErr != nil {
log.Error("failed to stop recordings for resize", "error", stopErr)
return oapi.PatchDisplay500JSONResponse{
InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{
Message: fmt.Sprintf("failed to stop recordings for resize: %s", stopErr.Error()),
},
}, nil
}
Expand Down Expand Up @@ -361,6 +374,106 @@ func (s *ApiService) getCurrentResolution(ctx context.Context) (int, int, int, e
return width, height, refreshRate, nil
}

// stoppedRecordingInfo holds state captured from a recording that was stopped
// so it can be restarted after a display resize.
type stoppedRecordingInfo struct {
id string
params recorder.FFmpegRecordingParams
outputPath string
}

// stopActiveRecordings gracefully stops every recording that is currently in
// progress and deregisters them from the manager. It returns info needed to
// restart each recording later. Recordings that were successfully stopped are
// always included in the returned slice, even when a later recording fails to
// stop (so the caller can restart whatever was stopped).
func (s *ApiService) stopActiveRecordings(ctx context.Context) ([]stoppedRecordingInfo, error) {
log := logger.FromContext(ctx)
var stopped []stoppedRecordingInfo
Comment on lines +396 to +402
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is another place where we're breaking the record manager interface. I wonder if it's simpler in thinking of these operations as stop and clone but nbd

also segments is a new concept. you could encode it into the record manager interface and treat the replays as disjoint and perhaps join them back together at the end. unclear


for _, rec := range s.recordManager.ListActiveRecorders(ctx) {
if !rec.IsRecording(ctx) {
continue
}

id := rec.ID()

ffmpegRec, ok := rec.(*recorder.FFmpegRecorder)
if !ok {
log.Warn("cannot capture params from non-FFmpeg recorder, skipping", "id", id)
continue
}
Comment thread
cursor[bot] marked this conversation as resolved.

params := ffmpegRec.Params()
outputPath := ffmpegRec.OutputPath()

log.Info("stopping recording for resize", "id", id)
if err := rec.Stop(ctx); err != nil {
// Stop() returns finalization errors even when the process was
// successfully terminated. Only treat it as a hard failure if
// the process is still running.
if rec.IsRecording(ctx) {
log.Error("failed to stop recording for resize", "id", id, "error", err)
return stopped, fmt.Errorf("failed to stop recording %s: %w", id, err)
}
log.Warn("recording stopped with finalization warning", "id", id, "error", err)
}

if err := s.recordManager.DeregisterRecorder(ctx, rec); err != nil {
log.Error("failed to deregister recorder", "id", id, "error", err)
}

stopped = append(stopped, stoppedRecordingInfo{
id: id,
params: params,
outputPath: outputPath,
})
log.Info("recording stopped and deregistered for resize", "id", id)
}

return stopped, nil
}

// restartRecordings re-creates and starts recordings that were previously
// stopped for a display resize. The old (finalized) recording file is renamed
// to preserve it before the new recording begins at the same output path.
func (s *ApiService) restartRecordings(ctx context.Context, stopped []stoppedRecordingInfo) {
log := logger.FromContext(ctx)

for _, info := range stopped {
// Best-effort: preserve the pre-resize segment by renaming the finalized file.
// If this fails the old file may be overwritten, but we still restart recording.
if _, err := os.Stat(info.outputPath); err == nil {
preservedPath := strings.TrimSuffix(info.outputPath, ".mp4") +
fmt.Sprintf("-before-resize-%d.mp4", time.Now().UnixMilli())
if err := os.Rename(info.outputPath, preservedPath); err != nil {
log.Error("failed to rename pre-resize recording, old file may be overwritten", "id", info.id, "error", err)
} else {
log.Info("preserved pre-resize recording segment", "id", info.id, "path", preservedPath)
}
Comment thread
cursor[bot] marked this conversation as resolved.
}

rec, err := s.factory(info.id, info.params)
if err != nil {
log.Error("failed to create recorder for restart", "id", info.id, "error", err)
continue
}

if err := s.recordManager.RegisterRecorder(ctx, rec); err != nil {
log.Error("failed to register restarted recorder", "id", info.id, "error", err)
continue
}

if err := rec.Start(ctx); err != nil {
log.Error("failed to start restarted recording", "id", info.id, "error", err)
_ = s.recordManager.DeregisterRecorder(ctx, rec)
continue
Comment thread
cursor[bot] marked this conversation as resolved.
}

log.Info("recording restarted after resize", "id", info.id)
}
Comment thread
cursor[bot] marked this conversation as resolved.
}

// isNekoEnabled checks if Neko service is enabled
func (s *ApiService) isNekoEnabled() bool {
return os.Getenv("ENABLE_WEBRTC") == "true"
Expand Down
241 changes: 241 additions & 0 deletions server/cmd/api/api/display_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package api

import (
"context"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/onkernel/kernel-images/server/lib/recorder"
"github.com/onkernel/kernel-images/server/lib/scaletozero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var testMockFFmpegBin = filepath.Join("..", "..", "..", "lib", "recorder", "testdata", "mock_ffmpeg.sh")

func testFFmpegFactory(t *testing.T, tempDir string) recorder.FFmpegRecorderFactory {
t.Helper()
fr := 5
disp := 0
size := 1
config := recorder.FFmpegRecordingParams{
FrameRate: &fr,
DisplayNum: &disp,
MaxSizeInMB: &size,
OutputDir: &tempDir,
}
return recorder.NewFFmpegRecorderFactory(testMockFFmpegBin, config, scaletozero.NewNoopController())
}

func newTestServiceWithFactory(t *testing.T, mgr recorder.RecordManager, factory recorder.FFmpegRecorderFactory) *ApiService {
t.Helper()
svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
require.NoError(t, err)
return svc
}

func TestStopActiveRecordings(t *testing.T) {
t.Run("stops and deregisters active recording", func(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
factory := testFFmpegFactory(t, tempDir)
mgr := recorder.NewFFmpegManager()
svc := newTestServiceWithFactory(t, mgr, factory)

rec, err := factory("test-rec", recorder.FFmpegRecordingParams{})
require.NoError(t, err)
require.NoError(t, mgr.RegisterRecorder(ctx, rec))
require.NoError(t, rec.Start(ctx))
time.Sleep(50 * time.Millisecond)
require.True(t, rec.IsRecording(ctx))

stopped, err := svc.stopActiveRecordings(ctx)
require.NoError(t, err)
require.Len(t, stopped, 1)
assert.Equal(t, "test-rec", stopped[0].id)
assert.NotNil(t, stopped[0].params.FrameRate)
assert.Equal(t, filepath.Join(tempDir, "test-rec.mp4"), stopped[0].outputPath)

_, exists := mgr.GetRecorder("test-rec")
assert.False(t, exists, "recorder should be deregistered after stop")
})

t.Run("stops multiple active recordings", func(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
factory := testFFmpegFactory(t, tempDir)
mgr := recorder.NewFFmpegManager()
svc := newTestServiceWithFactory(t, mgr, factory)

ids := []string{"rec-a", "rec-b"}
for _, id := range ids {
rec, err := factory(id, recorder.FFmpegRecordingParams{})
require.NoError(t, err)
require.NoError(t, mgr.RegisterRecorder(ctx, rec))
require.NoError(t, rec.Start(ctx))
}
time.Sleep(50 * time.Millisecond)

stopped, err := svc.stopActiveRecordings(ctx)
require.NoError(t, err)
assert.Len(t, stopped, 2)

stoppedIDs := map[string]bool{}
for _, s := range stopped {
stoppedIDs[s.id] = true
}
for _, id := range ids {
assert.True(t, stoppedIDs[id], "recording %s should have been stopped", id)
_, exists := mgr.GetRecorder(id)
assert.False(t, exists, "recorder %s should be deregistered", id)
}
})

t.Run("skips non-recording recorders", func(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
factory := testFFmpegFactory(t, tempDir)
mgr := recorder.NewFFmpegManager()
svc := newTestServiceWithFactory(t, mgr, factory)

mock := &mockRecorder{id: "idle-rec", isRecordingFlag: false}
require.NoError(t, mgr.RegisterRecorder(ctx, mock))

stopped, err := svc.stopActiveRecordings(ctx)
require.NoError(t, err)
assert.Empty(t, stopped)

_, exists := mgr.GetRecorder("idle-rec")
assert.True(t, exists, "non-recording recorder should remain registered")
})

t.Run("returns empty when no recorders exist", func(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
factory := testFFmpegFactory(t, tempDir)
mgr := recorder.NewFFmpegManager()
svc := newTestServiceWithFactory(t, mgr, factory)

stopped, err := svc.stopActiveRecordings(ctx)
require.NoError(t, err)
assert.Empty(t, stopped)
})
}

func TestRestartRecordings(t *testing.T) {
t.Run("renames old file and starts new recording", func(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
factory := testFFmpegFactory(t, tempDir)
mgr := recorder.NewFFmpegManager()
svc := newTestServiceWithFactory(t, mgr, factory)

outputPath := filepath.Join(tempDir, "test-rec.mp4")
require.NoError(t, os.WriteFile(outputPath, []byte("fake video data"), 0644))

fr := 5
disp := 0
size := 1
info := stoppedRecordingInfo{
id: "test-rec",
params: recorder.FFmpegRecordingParams{
FrameRate: &fr,
DisplayNum: &disp,
MaxSizeInMB: &size,
OutputDir: &tempDir,
},
outputPath: outputPath,
}

svc.restartRecordings(ctx, []stoppedRecordingInfo{info})

entries, err := os.ReadDir(tempDir)
require.NoError(t, err)

foundRenamed := false
for _, e := range entries {
if strings.Contains(e.Name(), "before-resize") {
foundRenamed = true
data, readErr := os.ReadFile(filepath.Join(tempDir, e.Name()))
require.NoError(t, readErr)
assert.Equal(t, []byte("fake video data"), data)
}
}
assert.True(t, foundRenamed, "pre-resize recording should be preserved with renamed file")

rec, exists := mgr.GetRecorder("test-rec")
require.True(t, exists, "restarted recorder should be registered")
assert.True(t, rec.IsRecording(ctx), "restarted recorder should be recording")

_ = rec.Stop(ctx)
})

t.Run("starts recording even when no old file exists", func(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
factory := testFFmpegFactory(t, tempDir)
mgr := recorder.NewFFmpegManager()
svc := newTestServiceWithFactory(t, mgr, factory)

fr := 5
disp := 0
size := 1
info := stoppedRecordingInfo{
id: "fresh-rec",
params: recorder.FFmpegRecordingParams{
FrameRate: &fr,
DisplayNum: &disp,
MaxSizeInMB: &size,
OutputDir: &tempDir,
},
outputPath: filepath.Join(tempDir, "fresh-rec.mp4"),
}

svc.restartRecordings(ctx, []stoppedRecordingInfo{info})

rec, exists := mgr.GetRecorder("fresh-rec")
require.True(t, exists, "recorder should be registered")
assert.True(t, rec.IsRecording(ctx))

_ = rec.Stop(ctx)
})
}

func TestStopAndRestartRecordings_RoundTrip(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
factory := testFFmpegFactory(t, tempDir)
mgr := recorder.NewFFmpegManager()
svc := newTestServiceWithFactory(t, mgr, factory)

// Start a recording
rec, err := factory("round-trip", recorder.FFmpegRecordingParams{})
require.NoError(t, err)
require.NoError(t, mgr.RegisterRecorder(ctx, rec))
require.NoError(t, rec.Start(ctx))
time.Sleep(50 * time.Millisecond)
require.True(t, rec.IsRecording(ctx))

// Stop all active recordings
stopped, err := svc.stopActiveRecordings(ctx)
require.NoError(t, err)
require.Len(t, stopped, 1)
assert.Equal(t, "round-trip", stopped[0].id)

// Verify the recorder was deregistered
_, exists := mgr.GetRecorder("round-trip")
require.False(t, exists)

// Restart recordings
svc.restartRecordings(ctx, stopped)

// Verify the recording resumed with the same ID
newRec, exists := mgr.GetRecorder("round-trip")
require.True(t, exists, "recorder should be re-registered after restart")
assert.True(t, newRec.IsRecording(ctx), "recorder should be actively recording")

_ = newRec.Stop(ctx)
}
Loading
Loading