diff --git a/internal/dev_server/api/events/events_stream.go b/internal/dev_server/api/events/events_stream.go index 2824e84f..d9fac164 100644 --- a/internal/dev_server/api/events/events_stream.go +++ b/internal/dev_server/api/events/events_stream.go @@ -16,10 +16,10 @@ import ( type sdkEventObserver struct { ctx context.Context debugSessionKey string - updateChan chan<- sdk.Message + updateChan chan<- []byte } -func newSdkEventObserver(updateChan chan<- sdk.Message, ctx context.Context) sdkEventObserver { +func newSdkEventObserver(updateChan chan<- []byte, ctx context.Context) sdkEventObserver { debugSessionKey := uuid.New().String() db := model.EventStoreFromContext(ctx) err := db.CreateDebugSession(ctx, debugSessionKey) @@ -54,14 +54,14 @@ func (o sdkEventObserver) Handle(message interface{}) { return } - o.updateChan <- sdk.Message{Event: sdk.TYPE_PUT, Data: str} + o.updateChan <- sdk.Message{Event: sdk.TYPE_PUT, Data: str}.ToPayload() } func SdkEventsTeeHandler(writer http.ResponseWriter, request *http.Request) { updateChan, errChan := sdk.OpenStream( writer, request.Context().Done(), - sdk.Message{Event: sdk.TYPE_PUT, Data: []byte{}}, + sdk.Message{Event: sdk.TYPE_PUT, Data: []byte{}}.ToPayload(), ) defer close(updateChan) observers := model.GetObserversFromContext(request.Context()) diff --git a/internal/dev_server/model/events.go b/internal/dev_server/model/events.go index 43ab7c78..3c39499a 100644 --- a/internal/dev_server/model/events.go +++ b/internal/dev_server/model/events.go @@ -2,13 +2,15 @@ package model // Event for individual flag overrides type OverrideEvent struct { - FlagKey string - ProjectKey string - FlagState FlagState + FlagKey string + ProjectKey string + FlagState FlagState + PayloadVersion int } // Event for full project sync type SyncEvent struct { - ProjectKey string - AllFlagsState FlagsState + ProjectKey string + AllFlagsState FlagsState + PayloadVersion int } diff --git a/internal/dev_server/model/override.go b/internal/dev_server/model/override.go index cfc97c34..92b19448 100644 --- a/internal/dev_server/model/override.go +++ b/internal/dev_server/model/override.go @@ -60,15 +60,16 @@ func UpsertOverride(ctx context.Context, projectKey, flagKey string, value ldval return Override{}, err } - _, err = store.IncrementProjectPayloadVersion(ctx, projectKey) + newPayloadVersion, err := store.IncrementProjectPayloadVersion(ctx, projectKey) if err != nil { return Override{}, errors.Wrap(err, "unable to increment payload version") } GetObserversFromContext(ctx).Notify(OverrideEvent{ - FlagKey: flagKey, - ProjectKey: projectKey, - FlagState: override.Apply(flagState), + FlagKey: flagKey, + ProjectKey: projectKey, + FlagState: override.Apply(flagState), + PayloadVersion: newPayloadVersion, }) return override, nil } @@ -84,7 +85,7 @@ func DeleteOverride(ctx context.Context, projectKey, flagKey string) error { return err } - _, err = store.IncrementProjectPayloadVersion(ctx, projectKey) + newPayloadVersion, err := store.IncrementProjectPayloadVersion(ctx, projectKey) if err != nil { return errors.Wrap(err, "unable to increment payload version") } @@ -97,9 +98,10 @@ func DeleteOverride(ctx context.Context, projectKey, flagKey string) error { Version: version, } GetObserversFromContext(ctx).Notify(OverrideEvent{ - FlagKey: flagKey, - ProjectKey: projectKey, - FlagState: override.Apply(flagState), + FlagKey: flagKey, + ProjectKey: projectKey, + FlagState: override.Apply(flagState), + PayloadVersion: newPayloadVersion, }) return err } diff --git a/internal/dev_server/model/override_test.go b/internal/dev_server/model/override_test.go index b9198336..4ba90c03 100644 --- a/internal/dev_server/model/override_test.go +++ b/internal/dev_server/model/override_test.go @@ -77,9 +77,10 @@ func TestUpsertOverride(t *testing.T) { observer. EXPECT(). Handle(model.OverrideEvent{ - FlagKey: flagKey, - ProjectKey: projKey, - FlagState: model.FlagState{Value: ldvalue.Bool(true), Version: 2, TrackEvents: true}, + FlagKey: flagKey, + ProjectKey: projKey, + FlagState: model.FlagState{Value: ldvalue.Bool(true), Version: 2, TrackEvents: true}, + PayloadVersion: 1, }) o, err := model.UpsertOverride(ctx, projKey, flagKey, ldValue) @@ -139,6 +140,7 @@ func TestDeleteOverride(t *testing.T) { Value: ldvalue.Bool(false), Version: 3, // override version 2 + flag version 1 }, + PayloadVersion: 1, }) err := model.DeleteOverride(ctx, projKey, flagKey) diff --git a/internal/dev_server/model/project.go b/internal/dev_server/model/project.go index ad3e899d..778bd96b 100644 --- a/internal/dev_server/model/project.go +++ b/internal/dev_server/model/project.go @@ -101,8 +101,9 @@ func UpdateProject(ctx context.Context, projectKey string, context *ldcontext.Co } GetObserversFromContext(ctx).Notify(SyncEvent{ - ProjectKey: project.Key, - AllFlagsState: allFlagsWithOverrides, + ProjectKey: project.Key, + AllFlagsState: allFlagsWithOverrides, + PayloadVersion: project.PayloadVersion, }) return *project, nil } diff --git a/internal/dev_server/model/project_test.go b/internal/dev_server/model/project_test.go index 4a6d4a15..2b15f55d 100644 --- a/internal/dev_server/model/project_test.go +++ b/internal/dev_server/model/project_test.go @@ -188,8 +188,9 @@ func TestUpdateProject(t *testing.T) { observer. EXPECT(). Handle(model.SyncEvent{ - ProjectKey: proj.Key, - AllFlagsState: model.FromAllFlags(allFlagsState), + ProjectKey: proj.Key, + AllFlagsState: model.FromAllFlags(allFlagsState), + PayloadVersion: 2, }) project, err := model.UpdateProject(ctx, proj.Key, nil, nil) diff --git a/internal/dev_server/model/restore.go b/internal/dev_server/model/restore.go index 1a59554e..ca786cb2 100644 --- a/internal/dev_server/model/restore.go +++ b/internal/dev_server/model/restore.go @@ -29,8 +29,9 @@ func RestoreDb(ctx context.Context, stream io.Reader) error { return err } observers.Notify(SyncEvent{ - ProjectKey: project.Key, - AllFlagsState: allFlagsWithOverrides, + ProjectKey: project.Key, + AllFlagsState: allFlagsWithOverrides, + PayloadVersion: project.PayloadVersion, }) } diff --git a/internal/dev_server/model/restore_test.go b/internal/dev_server/model/restore_test.go index d522f410..b64ddf1a 100644 --- a/internal/dev_server/model/restore_test.go +++ b/internal/dev_server/model/restore_test.go @@ -53,7 +53,7 @@ func TestRestoreDb(t *testing.T) { store.EXPECT().GetDevProject(gomock.Any(), projKey).Return(&proj, nil) store.EXPECT().GetOverridesForProject(gomock.Any(), projKey).Return(model.Overrides{}, nil) observer := mocks.NewMockObserver(mockController) - observer.EXPECT().Handle(model.SyncEvent{ProjectKey: projKey, AllFlagsState: proj.AllFlagsState}) + observer.EXPECT().Handle(model.SyncEvent{ProjectKey: projKey, AllFlagsState: proj.AllFlagsState, PayloadVersion: proj.PayloadVersion}) observers.RegisterObserver(observer) diff --git a/internal/dev_server/sdk/fdv2.go b/internal/dev_server/sdk/fdv2.go index 4f665a97..012d725c 100644 --- a/internal/dev_server/sdk/fdv2.go +++ b/internal/dev_server/sdk/fdv2.go @@ -14,6 +14,7 @@ const ( fdv2ReasonUpToDate = "up-to-date" fdv2ReasonCantCatchup = "cant-catchup" fdv2ReasonPayloadMissing = "payload-missing" + fdv2ReasonUpdate = "update" ) // parseBasis extracts the payload ID and version from a basis state string of the @@ -39,7 +40,7 @@ func parseBasis(basis string) (string, int) { return inner[:lastColon], version } -// buildPollResponse constructs the FDv2 polling response. +// buildInitialResponse constructs the FDv2 initial response for both polling and streaming. // // payloadID is the stable identifier for this payload (the project key). // currentVersion is the project's current PayloadVersion. @@ -48,7 +49,7 @@ func parseBasis(basis string) (string, int) { // // Delta transfers are not supported: stale clients always receive a full payload. // Tracking the change history required for deltas is overkill for a local dev server. -func buildPollResponse(payloadID string, currentVersion int, flags model.FlagsState, basis string) (subsystems.PollingPayload, error) { +func buildInitialResponse(payloadID string, currentVersion int, flags model.FlagsState, basis string) (subsystems.PollingPayload, error) { basisPayloadID, basisVersion := parseBasis(basis) switch { case basisVersion == 0: @@ -122,11 +123,29 @@ func makePutObjectEvent(version int, key string, flagState model.FlagState) (sub return subsystems.RawEvent{Name: subsystems.EventPutObject, Data: data}, nil } +// buildFlagChangeEvents builds the events sequence for a single flag update pushed over a stream: +// server-intent(xfer-changes) + put-object(changed flag) + payload-transferred. +func buildFlagChangeEvents(payloadID string, version int, flagKey string, flagState model.FlagState) ([]subsystems.RawEvent, error) { + intentEvent, err := makeServerIntentEvent(payloadID, version, subsystems.IntentTransferChanges, fdv2ReasonUpdate) + if err != nil { + return nil, err + } + putEvent, err := makePutObjectEvent(version, flagKey, flagState) + if err != nil { + return nil, err + } + transferredEvent, err := makePayloadTransferredEvent(payloadID, version) + if err != nil { + return nil, err + } + return []subsystems.RawEvent{intentEvent, putEvent, transferredEvent}, nil +} + func makePayloadTransferredEvent(payloadID string, version int) (subsystems.RawEvent, error) { // The selector state is synthetic and dev-server-specific: the dev server uses the // project key as the payload ID rather than a server-assigned opaque value. The SDK - // echoes this selector back as ?basis on subsequent polls, where parseBasisVersion - // extracts the version from it. + // echoes this selector back as ?basis on subsequent polls, where parseBasis + // extracts the payload ID and version from it. selector := subsystems.NewSelector(fmt.Sprintf("(p:%s:%d)", payloadID, version), version) data, err := json.Marshal(selector) if err != nil { diff --git a/internal/dev_server/sdk/fdv2_test.go b/internal/dev_server/sdk/fdv2_test.go index 4020f5cb..bec14776 100644 --- a/internal/dev_server/sdk/fdv2_test.go +++ b/internal/dev_server/sdk/fdv2_test.go @@ -53,7 +53,7 @@ func TestBuildPollResponse(t *testing.T) { } t.Run("no basis sends xfer-full with payload-missing", func(t *testing.T) { - resp, err := buildPollResponse(payloadID, currentVersion, flags, "") + resp, err := buildInitialResponse(payloadID, currentVersion, flags, "") require.NoError(t, err) require.GreaterOrEqual(t, len(resp.Events), 3) // server-intent + put-objects + payload-transferred @@ -64,7 +64,7 @@ func TestBuildPollResponse(t *testing.T) { t.Run("up-to-date basis sends none with up-to-date", func(t *testing.T) { basis := fmt.Sprintf("(p:%s:%d)", payloadID, currentVersion) - resp, err := buildPollResponse(payloadID, currentVersion, flags, basis) + resp, err := buildInitialResponse(payloadID, currentVersion, flags, basis) require.NoError(t, err) require.Len(t, resp.Events, 1) @@ -73,7 +73,7 @@ func TestBuildPollResponse(t *testing.T) { t.Run("basis ahead of current version sends full transfer (e.g. project recreated)", func(t *testing.T) { basis := fmt.Sprintf("(p:%s:%d)", payloadID, currentVersion+10) - resp, err := buildPollResponse(payloadID, currentVersion, flags, basis) + resp, err := buildInitialResponse(payloadID, currentVersion, flags, basis) require.NoError(t, err) require.GreaterOrEqual(t, len(resp.Events), 3) @@ -83,7 +83,7 @@ func TestBuildPollResponse(t *testing.T) { t.Run("stale basis sends xfer-full with cant-catchup", func(t *testing.T) { basis := fmt.Sprintf("(p:%s:%d)", payloadID, currentVersion-1) - resp, err := buildPollResponse(payloadID, currentVersion, flags, basis) + resp, err := buildInitialResponse(payloadID, currentVersion, flags, basis) require.NoError(t, err) require.GreaterOrEqual(t, len(resp.Events), 3) @@ -93,7 +93,7 @@ func TestBuildPollResponse(t *testing.T) { t.Run("basis with wrong payload ID sends xfer-full", func(t *testing.T) { basis := fmt.Sprintf("(p:%s:%d)", "other-project", currentVersion) - resp, err := buildPollResponse(payloadID, currentVersion, flags, basis) + resp, err := buildInitialResponse(payloadID, currentVersion, flags, basis) require.NoError(t, err) require.GreaterOrEqual(t, len(resp.Events), 3) @@ -106,7 +106,7 @@ func TestBuildPollResponse(t *testing.T) { "flag-a": model.FlagState{Value: ldvalue.Bool(true), Version: 1}, "flag-b": model.FlagState{Value: ldvalue.String("hello"), Version: 2}, } - resp, err := buildPollResponse(payloadID, currentVersion, multiFlags, "") + resp, err := buildInitialResponse(payloadID, currentVersion, multiFlags, "") require.NoError(t, err) // server-intent + 2 put-objects + payload-transferred diff --git a/internal/dev_server/sdk/polling.go b/internal/dev_server/sdk/polling.go index b3d46989..11fb8042 100644 --- a/internal/dev_server/sdk/polling.go +++ b/internal/dev_server/sdk/polling.go @@ -25,7 +25,7 @@ func PollV2(w http.ResponseWriter, r *http.Request) { return } - response, err := buildPollResponse(projectKey, project.PayloadVersion, allFlags, r.URL.Query().Get("basis")) + response, err := buildInitialResponse(projectKey, project.PayloadVersion, allFlags, r.URL.Query().Get("basis")) if err != nil { WriteError(ctx, w, errors.Wrap(err, "failed to build poll response")) return diff --git a/internal/dev_server/sdk/routes.go b/internal/dev_server/sdk/routes.go index fdc221be..b59e0681 100644 --- a/internal/dev_server/sdk/routes.go +++ b/internal/dev_server/sdk/routes.go @@ -22,6 +22,7 @@ func BindRoutes(router *mux.Router) { router.Handle("/all", GetProjectKeyFromAuthorizationHeader(http.HandlerFunc(StreamServerAllPayload))) router.Handle("/sdk/latest-all", GetProjectKeyFromAuthorizationHeader(http.HandlerFunc(LatestAll))) router.Handle("/sdk/poll", GetProjectKeyFromAuthorizationHeader(http.HandlerFunc(PollV2))) + router.Handle("/sdk/stream", GetProjectKeyFromAuthorizationHeader(http.HandlerFunc(StreamV2))) router.PathPrefix("/sdk/flags/{flagKey}"). Methods(http.MethodGet). diff --git a/internal/dev_server/sdk/stream_client_flags.go b/internal/dev_server/sdk/stream_client_flags.go index 38573489..4b2cb371 100644 --- a/internal/dev_server/sdk/stream_client_flags.go +++ b/internal/dev_server/sdk/stream_client_flags.go @@ -25,7 +25,7 @@ func StreamClientFlags(w http.ResponseWriter, r *http.Request) { updateChan, doneChan := OpenStream( w, r.Context().Done(), - Message{Event: TYPE_PUT, Data: jsonBody}, + Message{Event: TYPE_PUT, Data: jsonBody}.ToPayload(), ) defer close(updateChan) projectKey := GetProjectKeyFromContext(ctx) @@ -46,7 +46,7 @@ func StreamClientFlags(w http.ResponseWriter, r *http.Request) { } type clientFlagsObserver struct { - updateChan chan<- Message + updateChan chan<- []byte projectKey string } diff --git a/internal/dev_server/sdk/stream_server_fdv2.go b/internal/dev_server/sdk/stream_server_fdv2.go new file mode 100644 index 00000000..159fb29e --- /dev/null +++ b/internal/dev_server/sdk/stream_server_fdv2.go @@ -0,0 +1,89 @@ +package sdk + +import ( + "fmt" + "log" + "net/http" + + "github.com/launchdarkly/go-server-sdk/v7/subsystems" + "github.com/launchdarkly/ldcli/internal/dev_server/model" + "github.com/pkg/errors" +) + +func StreamV2(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + projectKey := GetProjectKeyFromContext(ctx) + store := model.StoreFromContext(ctx) + + project, err := store.GetDevProject(ctx, projectKey) + if err != nil { + WriteError(ctx, w, errors.Wrap(err, "failed to get project")) + return + } + + allFlags, err := project.GetFlagStateWithOverridesForProject(ctx) + if err != nil { + WriteError(ctx, w, errors.Wrap(err, "failed to get flag state")) + return + } + + initialPayload, err := buildInitialResponse(projectKey, project.PayloadVersion, allFlags, r.URL.Query().Get("basis")) + if err != nil { + WriteError(ctx, w, errors.Wrap(err, "failed to build initial payload")) + return + } + + updateChan, doneChan := OpenStream(w, r.Context().Done(), fdv2SSEPayload(initialPayload.Events)) + defer close(updateChan) + + observer := fdv2StreamObserver{updateChan: updateChan, projectKey: projectKey} + observerID := model.GetObserversFromContext(ctx).RegisterObserver(observer) + defer func() { + if ok := model.GetObserversFromContext(ctx).DeregisterObserver(observerID); !ok { + log.Printf("unable to deregister fdv2 stream observer") + } + }() + + err = <-doneChan + if err != nil { + WriteError(ctx, w, errors.Wrap(err, "stream failure")) + } +} + +// fdv2SSEPayload formats a slice of FDv2 events as raw SSE bytes. +// Each event becomes an individual SSE event in the output. +func fdv2SSEPayload(events []subsystems.RawEvent) []byte { + var buf []byte + for _, e := range events { + buf = append(buf, fmt.Sprintf("event:%s\ndata:%s\n\n", e.Name, e.Data)...) + } + return buf +} + +type fdv2StreamObserver struct { + updateChan chan<- []byte + projectKey string +} + +func (o fdv2StreamObserver) Handle(event interface{}) { + switch event := event.(type) { + case model.OverrideEvent: + if event.ProjectKey != o.projectKey { + return + } + events, err := buildFlagChangeEvents(o.projectKey, event.PayloadVersion, event.FlagKey, event.FlagState) + if err != nil { + panic(errors.Wrap(err, "failed to build flag change events in fdv2 stream observer")) + } + o.updateChan <- fdv2SSEPayload(events) + case model.SyncEvent: + if event.ProjectKey != o.projectKey { + return + } + payload, err := buildFullTransferResponse(o.projectKey, event.PayloadVersion, event.AllFlagsState, fdv2ReasonCantCatchup) + if err != nil { + panic(errors.Wrap(err, "failed to build full transfer in fdv2 stream observer")) + } + o.updateChan <- fdv2SSEPayload(payload.Events) + } +} diff --git a/internal/dev_server/sdk/stream_server_flags.go b/internal/dev_server/sdk/stream_server_flags.go index 82a305ce..a7479d51 100644 --- a/internal/dev_server/sdk/stream_server_flags.go +++ b/internal/dev_server/sdk/stream_server_flags.go @@ -27,7 +27,7 @@ func StreamServerAllPayload(w http.ResponseWriter, r *http.Request) { updateChan, doneChan := OpenStream( w, r.Context().Done(), - Message{Event: TYPE_PUT, Data: jsonBody}, + Message{Event: TYPE_PUT, Data: jsonBody}.ToPayload(), ) defer close(updateChan) observer := serverFlagsObserver{updateChan, projectKey} @@ -47,7 +47,7 @@ func StreamServerAllPayload(w http.ResponseWriter, r *http.Request) { } type serverFlagsObserver struct { - updateChan chan<- Message + updateChan chan<- []byte projectKey string } diff --git a/internal/dev_server/sdk/streaming.go b/internal/dev_server/sdk/streaming.go index 04f650c8..b8a44299 100644 --- a/internal/dev_server/sdk/streaming.go +++ b/internal/dev_server/sdk/streaming.go @@ -28,10 +28,11 @@ func (m Message) ToPayload() []byte { return payload } -// OpenStream sends data to a response using the initial payload and subsequently via the returned write only channel -func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialMessage Message) (chan<- Message, <-chan error) { +// OpenStream sets SSE headers, writes initialPayload, and starts the SSE loop. +// Each []byte sent to the returned channel is written verbatim to the response. +func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialPayload []byte) (chan<- []byte, <-chan error) { errChan := make(chan error) - updateChan := make(chan Message, 10) + updateChan := make(chan []byte, 10) go func() { var err error defer func() { @@ -45,7 +46,7 @@ func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialMessage Mess } w.Header().Set("Content-Type", "text/event-stream") - _, err = w.Write(initialMessage.ToPayload()) + _, err = w.Write(initialPayload) if err != nil { return errors.Wrap(err, "unable to write response") } @@ -60,8 +61,8 @@ func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialMessage Mess return errors.Wrap(err, "unable to write response") } flusher.Flush() - case msg := <-updateChan: - _, err = w.Write(msg.ToPayload()) + case payload := <-updateChan: + _, err = w.Write(payload) if err != nil { return errors.Wrap(err, "unable to write response") } @@ -77,7 +78,7 @@ func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialMessage Mess } func SendMessage( - updateChan chan<- Message, + updateChan chan<- []byte, msgType MessageType, data interface{}, ) error { @@ -89,7 +90,7 @@ func SendMessage( updateChan <- Message{ Event: msgType, Data: payload, - } + }.ToPayload() return nil }