From 330dd0dfb6b79dd6ba4f6a98bf0d50a1dfcfd42d Mon Sep 17 00:00:00 2001 From: Hitesh Chidambar Kotian Date: Wed, 29 Apr 2026 12:22:54 -0400 Subject: [PATCH 1/3] Adding FDv2 Stream endpoint support in the dev server --- internal/dev_server/model/events.go | 12 +- internal/dev_server/model/override.go | 18 +-- internal/dev_server/model/override_test.go | 8 +- internal/dev_server/model/project.go | 5 +- internal/dev_server/model/project_test.go | 5 +- internal/dev_server/model/restore.go | 5 +- internal/dev_server/model/restore_test.go | 2 +- internal/dev_server/sdk/fdv2.go | 23 +++- internal/dev_server/sdk/routes.go | 1 + internal/dev_server/sdk/stream_server_fdv2.go | 121 ++++++++++++++++++ 10 files changed, 175 insertions(+), 25 deletions(-) create mode 100644 internal/dev_server/sdk/stream_server_fdv2.go diff --git a/internal/dev_server/model/events.go b/internal/dev_server/model/events.go index 43ab7c78..3c39499a 100644 --- a/internal/dev_server/model/events.go +++ b/internal/dev_server/model/events.go @@ -2,13 +2,15 @@ package model // Event for individual flag overrides type OverrideEvent struct { - FlagKey string - ProjectKey string - FlagState FlagState + FlagKey string + ProjectKey string + FlagState FlagState + PayloadVersion int } // Event for full project sync type SyncEvent struct { - ProjectKey string - AllFlagsState FlagsState + ProjectKey string + AllFlagsState FlagsState + PayloadVersion int } diff --git a/internal/dev_server/model/override.go b/internal/dev_server/model/override.go index cfc97c34..92b19448 100644 --- a/internal/dev_server/model/override.go +++ b/internal/dev_server/model/override.go @@ -60,15 +60,16 @@ func UpsertOverride(ctx context.Context, projectKey, flagKey string, value ldval return Override{}, err } - _, err = store.IncrementProjectPayloadVersion(ctx, projectKey) + newPayloadVersion, err := store.IncrementProjectPayloadVersion(ctx, projectKey) if err != nil { return Override{}, errors.Wrap(err, "unable to increment payload version") } GetObserversFromContext(ctx).Notify(OverrideEvent{ - FlagKey: flagKey, - ProjectKey: projectKey, - FlagState: override.Apply(flagState), + FlagKey: flagKey, + ProjectKey: projectKey, + FlagState: override.Apply(flagState), + PayloadVersion: newPayloadVersion, }) return override, nil } @@ -84,7 +85,7 @@ func DeleteOverride(ctx context.Context, projectKey, flagKey string) error { return err } - _, err = store.IncrementProjectPayloadVersion(ctx, projectKey) + newPayloadVersion, err := store.IncrementProjectPayloadVersion(ctx, projectKey) if err != nil { return errors.Wrap(err, "unable to increment payload version") } @@ -97,9 +98,10 @@ func DeleteOverride(ctx context.Context, projectKey, flagKey string) error { Version: version, } GetObserversFromContext(ctx).Notify(OverrideEvent{ - FlagKey: flagKey, - ProjectKey: projectKey, - FlagState: override.Apply(flagState), + FlagKey: flagKey, + ProjectKey: projectKey, + FlagState: override.Apply(flagState), + PayloadVersion: newPayloadVersion, }) return err } diff --git a/internal/dev_server/model/override_test.go b/internal/dev_server/model/override_test.go index b9198336..4ba90c03 100644 --- a/internal/dev_server/model/override_test.go +++ b/internal/dev_server/model/override_test.go @@ -77,9 +77,10 @@ func TestUpsertOverride(t *testing.T) { observer. EXPECT(). Handle(model.OverrideEvent{ - FlagKey: flagKey, - ProjectKey: projKey, - FlagState: model.FlagState{Value: ldvalue.Bool(true), Version: 2, TrackEvents: true}, + FlagKey: flagKey, + ProjectKey: projKey, + FlagState: model.FlagState{Value: ldvalue.Bool(true), Version: 2, TrackEvents: true}, + PayloadVersion: 1, }) o, err := model.UpsertOverride(ctx, projKey, flagKey, ldValue) @@ -139,6 +140,7 @@ func TestDeleteOverride(t *testing.T) { Value: ldvalue.Bool(false), Version: 3, // override version 2 + flag version 1 }, + PayloadVersion: 1, }) err := model.DeleteOverride(ctx, projKey, flagKey) diff --git a/internal/dev_server/model/project.go b/internal/dev_server/model/project.go index ad3e899d..778bd96b 100644 --- a/internal/dev_server/model/project.go +++ b/internal/dev_server/model/project.go @@ -101,8 +101,9 @@ func UpdateProject(ctx context.Context, projectKey string, context *ldcontext.Co } GetObserversFromContext(ctx).Notify(SyncEvent{ - ProjectKey: project.Key, - AllFlagsState: allFlagsWithOverrides, + ProjectKey: project.Key, + AllFlagsState: allFlagsWithOverrides, + PayloadVersion: project.PayloadVersion, }) return *project, nil } diff --git a/internal/dev_server/model/project_test.go b/internal/dev_server/model/project_test.go index 4a6d4a15..2b15f55d 100644 --- a/internal/dev_server/model/project_test.go +++ b/internal/dev_server/model/project_test.go @@ -188,8 +188,9 @@ func TestUpdateProject(t *testing.T) { observer. EXPECT(). Handle(model.SyncEvent{ - ProjectKey: proj.Key, - AllFlagsState: model.FromAllFlags(allFlagsState), + ProjectKey: proj.Key, + AllFlagsState: model.FromAllFlags(allFlagsState), + PayloadVersion: 2, }) project, err := model.UpdateProject(ctx, proj.Key, nil, nil) diff --git a/internal/dev_server/model/restore.go b/internal/dev_server/model/restore.go index 1a59554e..ca786cb2 100644 --- a/internal/dev_server/model/restore.go +++ b/internal/dev_server/model/restore.go @@ -29,8 +29,9 @@ func RestoreDb(ctx context.Context, stream io.Reader) error { return err } observers.Notify(SyncEvent{ - ProjectKey: project.Key, - AllFlagsState: allFlagsWithOverrides, + ProjectKey: project.Key, + AllFlagsState: allFlagsWithOverrides, + PayloadVersion: project.PayloadVersion, }) } diff --git a/internal/dev_server/model/restore_test.go b/internal/dev_server/model/restore_test.go index d522f410..b64ddf1a 100644 --- a/internal/dev_server/model/restore_test.go +++ b/internal/dev_server/model/restore_test.go @@ -53,7 +53,7 @@ func TestRestoreDb(t *testing.T) { store.EXPECT().GetDevProject(gomock.Any(), projKey).Return(&proj, nil) store.EXPECT().GetOverridesForProject(gomock.Any(), projKey).Return(model.Overrides{}, nil) observer := mocks.NewMockObserver(mockController) - observer.EXPECT().Handle(model.SyncEvent{ProjectKey: projKey, AllFlagsState: proj.AllFlagsState}) + observer.EXPECT().Handle(model.SyncEvent{ProjectKey: projKey, AllFlagsState: proj.AllFlagsState, PayloadVersion: proj.PayloadVersion}) observers.RegisterObserver(observer) diff --git a/internal/dev_server/sdk/fdv2.go b/internal/dev_server/sdk/fdv2.go index 4f665a97..e03ea0a0 100644 --- a/internal/dev_server/sdk/fdv2.go +++ b/internal/dev_server/sdk/fdv2.go @@ -14,6 +14,7 @@ const ( fdv2ReasonUpToDate = "up-to-date" fdv2ReasonCantCatchup = "cant-catchup" fdv2ReasonPayloadMissing = "payload-missing" + fdv2ReasonUpdate = "update" ) // parseBasis extracts the payload ID and version from a basis state string of the @@ -122,11 +123,29 @@ func makePutObjectEvent(version int, key string, flagState model.FlagState) (sub return subsystems.RawEvent{Name: subsystems.EventPutObject, Data: data}, nil } +// buildFlagChangeEvents builds the three-event sequence for a single flag update pushed over a stream: +// server-intent(xfer-changes) + put-object(changed flag) + payload-transferred. +func buildFlagChangeEvents(payloadID string, version int, flagKey string, flagState model.FlagState) ([]subsystems.RawEvent, error) { + intentEvent, err := makeServerIntentEvent(payloadID, version, subsystems.IntentTransferChanges, fdv2ReasonUpdate) + if err != nil { + return nil, err + } + putEvent, err := makePutObjectEvent(version, flagKey, flagState) + if err != nil { + return nil, err + } + transferredEvent, err := makePayloadTransferredEvent(payloadID, version) + if err != nil { + return nil, err + } + return []subsystems.RawEvent{intentEvent, putEvent, transferredEvent}, nil +} + func makePayloadTransferredEvent(payloadID string, version int) (subsystems.RawEvent, error) { // The selector state is synthetic and dev-server-specific: the dev server uses the // project key as the payload ID rather than a server-assigned opaque value. The SDK - // echoes this selector back as ?basis on subsequent polls, where parseBasisVersion - // extracts the version from it. + // echoes this selector back as ?basis on subsequent polls, where parseBasis + // extracts the payload ID and version from it. selector := subsystems.NewSelector(fmt.Sprintf("(p:%s:%d)", payloadID, version), version) data, err := json.Marshal(selector) if err != nil { diff --git a/internal/dev_server/sdk/routes.go b/internal/dev_server/sdk/routes.go index fdc221be..b59e0681 100644 --- a/internal/dev_server/sdk/routes.go +++ b/internal/dev_server/sdk/routes.go @@ -22,6 +22,7 @@ func BindRoutes(router *mux.Router) { router.Handle("/all", GetProjectKeyFromAuthorizationHeader(http.HandlerFunc(StreamServerAllPayload))) router.Handle("/sdk/latest-all", GetProjectKeyFromAuthorizationHeader(http.HandlerFunc(LatestAll))) router.Handle("/sdk/poll", GetProjectKeyFromAuthorizationHeader(http.HandlerFunc(PollV2))) + router.Handle("/sdk/stream", GetProjectKeyFromAuthorizationHeader(http.HandlerFunc(StreamV2))) router.PathPrefix("/sdk/flags/{flagKey}"). Methods(http.MethodGet). diff --git a/internal/dev_server/sdk/stream_server_fdv2.go b/internal/dev_server/sdk/stream_server_fdv2.go new file mode 100644 index 00000000..5306a51a --- /dev/null +++ b/internal/dev_server/sdk/stream_server_fdv2.go @@ -0,0 +1,121 @@ +package sdk + +import ( + "fmt" + "log" + "net/http" + "time" + + "github.com/launchdarkly/go-server-sdk/v7/subsystems" + "github.com/launchdarkly/ldcli/internal/dev_server/model" + "github.com/pkg/errors" +) + +func StreamV2(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + projectKey := GetProjectKeyFromContext(ctx) + store := model.StoreFromContext(ctx) + + project, err := store.GetDevProject(ctx, projectKey) + if err != nil { + WriteError(ctx, w, errors.Wrap(err, "failed to get project")) + return + } + + allFlags, err := project.GetFlagStateWithOverridesForProject(ctx) + if err != nil { + WriteError(ctx, w, errors.Wrap(err, "failed to get flag state")) + return + } + + flusher, ok := w.(http.Flusher) + if !ok { + WriteError(ctx, w, errors.New("streaming not supported")) + return + } + + initialPayload, err := buildFullTransferResponse(projectKey, project.PayloadVersion, allFlags, fdv2ReasonPayloadMissing) + if err != nil { + WriteError(ctx, w, errors.Wrap(err, "failed to build initial payload")) + return + } + + // Register observer before writing to the client so that any changes arriving + // during the initial write are queued and delivered immediately after. + updateChan := make(chan []subsystems.RawEvent, 10) + observerID := model.GetObserversFromContext(ctx).RegisterObserver(fdv2StreamObserver{ + updateChan: updateChan, + projectKey: projectKey, + }) + defer func() { + if ok := model.GetObserversFromContext(ctx).DeregisterObserver(observerID); !ok { + log.Printf("unable to deregister fdv2 stream observer") + } + }() + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + + if err := writeFDv2SSEEvents(w, flusher, initialPayload.Events); err != nil { + return + } + + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case events := <-updateChan: + if err := writeFDv2SSEEvents(w, flusher, events); err != nil { + return + } + case <-ticker.C: + // SSE comment line as a keepalive. + if _, err := w.Write([]byte(":\n\n")); err != nil { + return + } + flusher.Flush() + case <-ctx.Done(): + return + } + } +} + +// writeFDv2SSEEvents writes a batch of FDv2 events to the response as individual SSE events. +func writeFDv2SSEEvents(w http.ResponseWriter, flusher http.Flusher, events []subsystems.RawEvent) error { + for _, event := range events { + if _, err := fmt.Fprintf(w, "event:%s\ndata:%s\n\n", event.Name, event.Data); err != nil { + return err + } + } + flusher.Flush() + return nil +} + +type fdv2StreamObserver struct { + updateChan chan<- []subsystems.RawEvent + projectKey string +} + +func (o fdv2StreamObserver) Handle(event interface{}) { + switch event := event.(type) { + case model.OverrideEvent: + if event.ProjectKey != o.projectKey { + return + } + events, err := buildFlagChangeEvents(o.projectKey, event.PayloadVersion, event.FlagKey, event.FlagState) + if err != nil { + panic(errors.Wrap(err, "failed to build flag change events in fdv2 stream observer")) + } + o.updateChan <- events + case model.SyncEvent: + if event.ProjectKey != o.projectKey { + return + } + payload, err := buildFullTransferResponse(o.projectKey, event.PayloadVersion, event.AllFlagsState, fdv2ReasonCantCatchup) + if err != nil { + panic(errors.Wrap(err, "failed to build full transfer in fdv2 stream observer")) + } + o.updateChan <- payload.Events + } +} From 1f4132f7071017a5ed07fffe621b7801def44bfc Mon Sep 17 00:00:00 2001 From: Hitesh Chidambar Kotian Date: Wed, 29 Apr 2026 22:18:24 -0400 Subject: [PATCH 2/3] Fix Openstream message type --- .../dev_server/api/events/events_stream.go | 8 +-- internal/dev_server/sdk/fdv2.go | 2 +- .../dev_server/sdk/stream_client_flags.go | 4 +- internal/dev_server/sdk/stream_server_fdv2.go | 68 +++++-------------- .../dev_server/sdk/stream_server_flags.go | 4 +- internal/dev_server/sdk/streaming.go | 17 ++--- 6 files changed, 36 insertions(+), 67 deletions(-) diff --git a/internal/dev_server/api/events/events_stream.go b/internal/dev_server/api/events/events_stream.go index 2824e84f..d9fac164 100644 --- a/internal/dev_server/api/events/events_stream.go +++ b/internal/dev_server/api/events/events_stream.go @@ -16,10 +16,10 @@ import ( type sdkEventObserver struct { ctx context.Context debugSessionKey string - updateChan chan<- sdk.Message + updateChan chan<- []byte } -func newSdkEventObserver(updateChan chan<- sdk.Message, ctx context.Context) sdkEventObserver { +func newSdkEventObserver(updateChan chan<- []byte, ctx context.Context) sdkEventObserver { debugSessionKey := uuid.New().String() db := model.EventStoreFromContext(ctx) err := db.CreateDebugSession(ctx, debugSessionKey) @@ -54,14 +54,14 @@ func (o sdkEventObserver) Handle(message interface{}) { return } - o.updateChan <- sdk.Message{Event: sdk.TYPE_PUT, Data: str} + o.updateChan <- sdk.Message{Event: sdk.TYPE_PUT, Data: str}.ToPayload() } func SdkEventsTeeHandler(writer http.ResponseWriter, request *http.Request) { updateChan, errChan := sdk.OpenStream( writer, request.Context().Done(), - sdk.Message{Event: sdk.TYPE_PUT, Data: []byte{}}, + sdk.Message{Event: sdk.TYPE_PUT, Data: []byte{}}.ToPayload(), ) defer close(updateChan) observers := model.GetObserversFromContext(request.Context()) diff --git a/internal/dev_server/sdk/fdv2.go b/internal/dev_server/sdk/fdv2.go index e03ea0a0..30180a01 100644 --- a/internal/dev_server/sdk/fdv2.go +++ b/internal/dev_server/sdk/fdv2.go @@ -123,7 +123,7 @@ func makePutObjectEvent(version int, key string, flagState model.FlagState) (sub return subsystems.RawEvent{Name: subsystems.EventPutObject, Data: data}, nil } -// buildFlagChangeEvents builds the three-event sequence for a single flag update pushed over a stream: +// buildFlagChangeEvents builds the events sequence for a single flag update pushed over a stream: // server-intent(xfer-changes) + put-object(changed flag) + payload-transferred. func buildFlagChangeEvents(payloadID string, version int, flagKey string, flagState model.FlagState) ([]subsystems.RawEvent, error) { intentEvent, err := makeServerIntentEvent(payloadID, version, subsystems.IntentTransferChanges, fdv2ReasonUpdate) diff --git a/internal/dev_server/sdk/stream_client_flags.go b/internal/dev_server/sdk/stream_client_flags.go index 38573489..4b2cb371 100644 --- a/internal/dev_server/sdk/stream_client_flags.go +++ b/internal/dev_server/sdk/stream_client_flags.go @@ -25,7 +25,7 @@ func StreamClientFlags(w http.ResponseWriter, r *http.Request) { updateChan, doneChan := OpenStream( w, r.Context().Done(), - Message{Event: TYPE_PUT, Data: jsonBody}, + Message{Event: TYPE_PUT, Data: jsonBody}.ToPayload(), ) defer close(updateChan) projectKey := GetProjectKeyFromContext(ctx) @@ -46,7 +46,7 @@ func StreamClientFlags(w http.ResponseWriter, r *http.Request) { } type clientFlagsObserver struct { - updateChan chan<- Message + updateChan chan<- []byte projectKey string } diff --git a/internal/dev_server/sdk/stream_server_fdv2.go b/internal/dev_server/sdk/stream_server_fdv2.go index 5306a51a..3e8ae1df 100644 --- a/internal/dev_server/sdk/stream_server_fdv2.go +++ b/internal/dev_server/sdk/stream_server_fdv2.go @@ -4,7 +4,6 @@ import ( "fmt" "log" "net/http" - "time" "github.com/launchdarkly/go-server-sdk/v7/subsystems" "github.com/launchdarkly/ldcli/internal/dev_server/model" @@ -28,72 +27,41 @@ func StreamV2(w http.ResponseWriter, r *http.Request) { return } - flusher, ok := w.(http.Flusher) - if !ok { - WriteError(ctx, w, errors.New("streaming not supported")) - return - } - initialPayload, err := buildFullTransferResponse(projectKey, project.PayloadVersion, allFlags, fdv2ReasonPayloadMissing) if err != nil { WriteError(ctx, w, errors.Wrap(err, "failed to build initial payload")) return } - // Register observer before writing to the client so that any changes arriving - // during the initial write are queued and delivered immediately after. - updateChan := make(chan []subsystems.RawEvent, 10) - observerID := model.GetObserversFromContext(ctx).RegisterObserver(fdv2StreamObserver{ - updateChan: updateChan, - projectKey: projectKey, - }) + updateChan, doneChan := OpenStream(w, r.Context().Done(), fdv2SSEPayload(initialPayload.Events)) + defer close(updateChan) + + observer := fdv2StreamObserver{updateChan: updateChan, projectKey: projectKey} + observerID := model.GetObserversFromContext(ctx).RegisterObserver(observer) defer func() { if ok := model.GetObserversFromContext(ctx).DeregisterObserver(observerID); !ok { log.Printf("unable to deregister fdv2 stream observer") } }() - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") - - if err := writeFDv2SSEEvents(w, flusher, initialPayload.Events); err != nil { - return - } - - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - - for { - select { - case events := <-updateChan: - if err := writeFDv2SSEEvents(w, flusher, events); err != nil { - return - } - case <-ticker.C: - // SSE comment line as a keepalive. - if _, err := w.Write([]byte(":\n\n")); err != nil { - return - } - flusher.Flush() - case <-ctx.Done(): - return - } + err = <-doneChan + if err != nil { + WriteError(ctx, w, errors.Wrap(err, "stream failure")) } } -// writeFDv2SSEEvents writes a batch of FDv2 events to the response as individual SSE events. -func writeFDv2SSEEvents(w http.ResponseWriter, flusher http.Flusher, events []subsystems.RawEvent) error { - for _, event := range events { - if _, err := fmt.Fprintf(w, "event:%s\ndata:%s\n\n", event.Name, event.Data); err != nil { - return err - } +// fdv2SSEPayload formats a slice of FDv2 events as raw SSE bytes. +// Each event becomes an individual SSE event in the output. +func fdv2SSEPayload(events []subsystems.RawEvent) []byte { + var buf []byte + for _, e := range events { + buf = append(buf, fmt.Sprintf("event:%s\ndata:%s\n\n", e.Name, e.Data)...) } - flusher.Flush() - return nil + return buf } type fdv2StreamObserver struct { - updateChan chan<- []subsystems.RawEvent + updateChan chan<- []byte projectKey string } @@ -107,7 +75,7 @@ func (o fdv2StreamObserver) Handle(event interface{}) { if err != nil { panic(errors.Wrap(err, "failed to build flag change events in fdv2 stream observer")) } - o.updateChan <- events + o.updateChan <- fdv2SSEPayload(events) case model.SyncEvent: if event.ProjectKey != o.projectKey { return @@ -116,6 +84,6 @@ func (o fdv2StreamObserver) Handle(event interface{}) { if err != nil { panic(errors.Wrap(err, "failed to build full transfer in fdv2 stream observer")) } - o.updateChan <- payload.Events + o.updateChan <- fdv2SSEPayload(payload.Events) } } diff --git a/internal/dev_server/sdk/stream_server_flags.go b/internal/dev_server/sdk/stream_server_flags.go index 82a305ce..a7479d51 100644 --- a/internal/dev_server/sdk/stream_server_flags.go +++ b/internal/dev_server/sdk/stream_server_flags.go @@ -27,7 +27,7 @@ func StreamServerAllPayload(w http.ResponseWriter, r *http.Request) { updateChan, doneChan := OpenStream( w, r.Context().Done(), - Message{Event: TYPE_PUT, Data: jsonBody}, + Message{Event: TYPE_PUT, Data: jsonBody}.ToPayload(), ) defer close(updateChan) observer := serverFlagsObserver{updateChan, projectKey} @@ -47,7 +47,7 @@ func StreamServerAllPayload(w http.ResponseWriter, r *http.Request) { } type serverFlagsObserver struct { - updateChan chan<- Message + updateChan chan<- []byte projectKey string } diff --git a/internal/dev_server/sdk/streaming.go b/internal/dev_server/sdk/streaming.go index 04f650c8..b8a44299 100644 --- a/internal/dev_server/sdk/streaming.go +++ b/internal/dev_server/sdk/streaming.go @@ -28,10 +28,11 @@ func (m Message) ToPayload() []byte { return payload } -// OpenStream sends data to a response using the initial payload and subsequently via the returned write only channel -func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialMessage Message) (chan<- Message, <-chan error) { +// OpenStream sets SSE headers, writes initialPayload, and starts the SSE loop. +// Each []byte sent to the returned channel is written verbatim to the response. +func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialPayload []byte) (chan<- []byte, <-chan error) { errChan := make(chan error) - updateChan := make(chan Message, 10) + updateChan := make(chan []byte, 10) go func() { var err error defer func() { @@ -45,7 +46,7 @@ func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialMessage Mess } w.Header().Set("Content-Type", "text/event-stream") - _, err = w.Write(initialMessage.ToPayload()) + _, err = w.Write(initialPayload) if err != nil { return errors.Wrap(err, "unable to write response") } @@ -60,8 +61,8 @@ func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialMessage Mess return errors.Wrap(err, "unable to write response") } flusher.Flush() - case msg := <-updateChan: - _, err = w.Write(msg.ToPayload()) + case payload := <-updateChan: + _, err = w.Write(payload) if err != nil { return errors.Wrap(err, "unable to write response") } @@ -77,7 +78,7 @@ func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialMessage Mess } func SendMessage( - updateChan chan<- Message, + updateChan chan<- []byte, msgType MessageType, data interface{}, ) error { @@ -89,7 +90,7 @@ func SendMessage( updateChan <- Message{ Event: msgType, Data: payload, - } + }.ToPayload() return nil } From 3b1c96091db0fa2142af57f85622fc47390907c2 Mon Sep 17 00:00:00 2001 From: Hitesh Chidambar Kotian Date: Wed, 29 Apr 2026 22:33:26 -0400 Subject: [PATCH 3/3] handle basis --- internal/dev_server/sdk/fdv2.go | 4 ++-- internal/dev_server/sdk/fdv2_test.go | 12 ++++++------ internal/dev_server/sdk/polling.go | 2 +- internal/dev_server/sdk/stream_server_fdv2.go | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/dev_server/sdk/fdv2.go b/internal/dev_server/sdk/fdv2.go index 30180a01..012d725c 100644 --- a/internal/dev_server/sdk/fdv2.go +++ b/internal/dev_server/sdk/fdv2.go @@ -40,7 +40,7 @@ func parseBasis(basis string) (string, int) { return inner[:lastColon], version } -// buildPollResponse constructs the FDv2 polling response. +// buildInitialResponse constructs the FDv2 initial response for both polling and streaming. // // payloadID is the stable identifier for this payload (the project key). // currentVersion is the project's current PayloadVersion. @@ -49,7 +49,7 @@ func parseBasis(basis string) (string, int) { // // Delta transfers are not supported: stale clients always receive a full payload. // Tracking the change history required for deltas is overkill for a local dev server. -func buildPollResponse(payloadID string, currentVersion int, flags model.FlagsState, basis string) (subsystems.PollingPayload, error) { +func buildInitialResponse(payloadID string, currentVersion int, flags model.FlagsState, basis string) (subsystems.PollingPayload, error) { basisPayloadID, basisVersion := parseBasis(basis) switch { case basisVersion == 0: diff --git a/internal/dev_server/sdk/fdv2_test.go b/internal/dev_server/sdk/fdv2_test.go index 4020f5cb..bec14776 100644 --- a/internal/dev_server/sdk/fdv2_test.go +++ b/internal/dev_server/sdk/fdv2_test.go @@ -53,7 +53,7 @@ func TestBuildPollResponse(t *testing.T) { } t.Run("no basis sends xfer-full with payload-missing", func(t *testing.T) { - resp, err := buildPollResponse(payloadID, currentVersion, flags, "") + resp, err := buildInitialResponse(payloadID, currentVersion, flags, "") require.NoError(t, err) require.GreaterOrEqual(t, len(resp.Events), 3) // server-intent + put-objects + payload-transferred @@ -64,7 +64,7 @@ func TestBuildPollResponse(t *testing.T) { t.Run("up-to-date basis sends none with up-to-date", func(t *testing.T) { basis := fmt.Sprintf("(p:%s:%d)", payloadID, currentVersion) - resp, err := buildPollResponse(payloadID, currentVersion, flags, basis) + resp, err := buildInitialResponse(payloadID, currentVersion, flags, basis) require.NoError(t, err) require.Len(t, resp.Events, 1) @@ -73,7 +73,7 @@ func TestBuildPollResponse(t *testing.T) { t.Run("basis ahead of current version sends full transfer (e.g. project recreated)", func(t *testing.T) { basis := fmt.Sprintf("(p:%s:%d)", payloadID, currentVersion+10) - resp, err := buildPollResponse(payloadID, currentVersion, flags, basis) + resp, err := buildInitialResponse(payloadID, currentVersion, flags, basis) require.NoError(t, err) require.GreaterOrEqual(t, len(resp.Events), 3) @@ -83,7 +83,7 @@ func TestBuildPollResponse(t *testing.T) { t.Run("stale basis sends xfer-full with cant-catchup", func(t *testing.T) { basis := fmt.Sprintf("(p:%s:%d)", payloadID, currentVersion-1) - resp, err := buildPollResponse(payloadID, currentVersion, flags, basis) + resp, err := buildInitialResponse(payloadID, currentVersion, flags, basis) require.NoError(t, err) require.GreaterOrEqual(t, len(resp.Events), 3) @@ -93,7 +93,7 @@ func TestBuildPollResponse(t *testing.T) { t.Run("basis with wrong payload ID sends xfer-full", func(t *testing.T) { basis := fmt.Sprintf("(p:%s:%d)", "other-project", currentVersion) - resp, err := buildPollResponse(payloadID, currentVersion, flags, basis) + resp, err := buildInitialResponse(payloadID, currentVersion, flags, basis) require.NoError(t, err) require.GreaterOrEqual(t, len(resp.Events), 3) @@ -106,7 +106,7 @@ func TestBuildPollResponse(t *testing.T) { "flag-a": model.FlagState{Value: ldvalue.Bool(true), Version: 1}, "flag-b": model.FlagState{Value: ldvalue.String("hello"), Version: 2}, } - resp, err := buildPollResponse(payloadID, currentVersion, multiFlags, "") + resp, err := buildInitialResponse(payloadID, currentVersion, multiFlags, "") require.NoError(t, err) // server-intent + 2 put-objects + payload-transferred diff --git a/internal/dev_server/sdk/polling.go b/internal/dev_server/sdk/polling.go index b3d46989..11fb8042 100644 --- a/internal/dev_server/sdk/polling.go +++ b/internal/dev_server/sdk/polling.go @@ -25,7 +25,7 @@ func PollV2(w http.ResponseWriter, r *http.Request) { return } - response, err := buildPollResponse(projectKey, project.PayloadVersion, allFlags, r.URL.Query().Get("basis")) + response, err := buildInitialResponse(projectKey, project.PayloadVersion, allFlags, r.URL.Query().Get("basis")) if err != nil { WriteError(ctx, w, errors.Wrap(err, "failed to build poll response")) return diff --git a/internal/dev_server/sdk/stream_server_fdv2.go b/internal/dev_server/sdk/stream_server_fdv2.go index 3e8ae1df..159fb29e 100644 --- a/internal/dev_server/sdk/stream_server_fdv2.go +++ b/internal/dev_server/sdk/stream_server_fdv2.go @@ -27,7 +27,7 @@ func StreamV2(w http.ResponseWriter, r *http.Request) { return } - initialPayload, err := buildFullTransferResponse(projectKey, project.PayloadVersion, allFlags, fdv2ReasonPayloadMissing) + initialPayload, err := buildInitialResponse(projectKey, project.PayloadVersion, allFlags, r.URL.Query().Get("basis")) if err != nil { WriteError(ctx, w, errors.Wrap(err, "failed to build initial payload")) return