From 8cc8470fa66d704f69eb49d89127fce1e8b51487 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 30 Mar 2026 10:58:49 +0000 Subject: [PATCH 1/5] Add S3 presigned URL support for blob CloudEvents Objects stored under the cloudevent/blobs/ key prefix are now served via a short-lived presigned S3 GET URL (dataUrl field) instead of being downloaded and embedded inline in the GraphQL response. This avoids ballooning the JSON payload for large binary objects (e.g. scans). - Add Presigner interface + PresignBlobURL method to eventrepo.Service - Export BlobKeyPrefix constant ("cloudevent/blobs/") - Add dataUrl: String field to GraphQL CloudEvent type - Detect blob prefix in LatestCloudEvent / CloudEvents resolvers; skip GetObject and presign against the primary bucket instead - Wire s3.NewPresignClient into both GraphQL and gRPC app paths - Update eventrepo.New signature; pass nil in tests that don't need presigning - Add MockPresigner to generated mock file https://claude.ai/code/session_015ReeLGeCywJfYkkrrng5wU --- internal/app/app.go | 5 +- internal/graph/base.resolvers.go | 58 ++++++++++++++++++--- internal/graph/cloud_event.go | 5 +- internal/graph/generated.go | 76 ++++++++++++++++++++++++++++ pkg/eventrepo/event_repo_test.go | 18 +++---- pkg/eventrepo/eventrepo.go | 33 +++++++++++- pkg/eventrepo/eventrepo_mock_test.go | 45 ++++++++++++++++ schema/base.graphqls | 2 + 8 files changed, 222 insertions(+), 20 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 8c153ef..8986289 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -14,6 +14,7 @@ import ( "github.com/DIMO-Network/fetch-api/internal/limits" "github.com/DIMO-Network/fetch-api/pkg/eventrepo" fetchgrpc "github.com/DIMO-Network/fetch-api/pkg/grpc" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/DIMO-Network/server-garage/pkg/gql/errorhandler" gqlmetrics "github.com/DIMO-Network/server-garage/pkg/gql/metrics" "github.com/DIMO-Network/shared/pkg/middleware/metrics" @@ -42,7 +43,7 @@ func New(settings config.Settings) (*App, error) { } s3Client := s3ClientFromSettings(&settings) buckets := []string{settings.CloudEventBucket, settings.EphemeralBucket, settings.ParquetBucket} - eventService := eventrepo.New(chConn, s3Client, settings.ParquetBucket) + eventService := eventrepo.New(chConn, s3Client, s3.NewPresignClient(s3Client), settings.ParquetBucket) var identityClient identity.Client if settings.IdentityAPIURL != "" { @@ -118,7 +119,7 @@ func CreateGRPCServer(logger *zerolog.Logger, settings *config.Settings) (*grpc. } s3Client := s3ClientFromSettings(settings) - eventService := eventrepo.New(chConn, s3Client, settings.ParquetBucket) + eventService := eventrepo.New(chConn, s3Client, s3.NewPresignClient(s3Client), settings.ParquetBucket) rpcServer := rpc.NewServer([]string{settings.CloudEventBucket, settings.EphemeralBucket, settings.ParquetBucket}, eventService) diff --git a/internal/graph/base.resolvers.go b/internal/graph/base.resolvers.go index e3fb053..4c2498e 100644 --- a/internal/graph/base.resolvers.go +++ b/internal/graph/base.resolvers.go @@ -9,10 +9,12 @@ import ( "context" "database/sql" "errors" + "strings" "github.com/DIMO-Network/cloudevent" "github.com/DIMO-Network/fetch-api/internal/fetch" "github.com/DIMO-Network/fetch-api/internal/graph/model" + "github.com/DIMO-Network/fetch-api/pkg/eventrepo" ) // Header is the resolver for the header field. Returns a pointer into the wrapped event; no copy. @@ -40,6 +42,14 @@ func (r *cloudEventResolver) DataBase64(ctx context.Context, obj *CloudEventWrap return &obj.Raw.DataBase64, nil } +// DataUrl is the resolver for the dataUrl field. Returns a presigned S3 URL for blob payloads. +func (r *cloudEventResolver) DataUrl(ctx context.Context, obj *CloudEventWrapper) (*string, error) { + if obj == nil || obj.DataURL == "" { + return nil, nil + } + return &obj.DataURL, nil +} + // LatestIndex is the resolver for the latestIndex field. func (r *queryResolver) LatestIndex(ctx context.Context, did string, filter *model.CloudEventFilter) (*model.CloudEventIndex, error) { opts, err := r.requireSubjectOptsByDID(ctx, did, filter) @@ -83,6 +93,17 @@ func (r *queryResolver) LatestCloudEvent(ctx context.Context, did string, filter if err != nil { return nil, err } + if strings.HasPrefix(idx.Data.Key, eventrepo.BlobKeyPrefix) { + if len(r.Buckets) == 0 { + return nil, errors.New("no buckets configured") + } + url, err := r.EventService.PresignBlobURL(ctx, idx.Data.Key, r.Buckets[0]) + if err != nil { + return nil, err + } + hdr := idx.CloudEventHeader + return &CloudEventWrapper{Raw: &cloudevent.RawEvent{CloudEventHeader: hdr}, DataURL: url}, nil + } ce, err := fetch.GetCloudEventFromIndex(ctx, r.EventService, &idx, r.Buckets) if err != nil { return nil, err @@ -103,14 +124,39 @@ func (r *queryResolver) CloudEvents(ctx context.Context, did string, limit *int, } return nil, err } - events, err := fetch.ListCloudEventsFromIndexes(ctx, r.EventService, list, r.Buckets) - if err != nil { - return nil, err + + out := make([]*CloudEventWrapper, len(list)) + var nonBlobIdxs []cloudevent.CloudEvent[eventrepo.ObjectInfo] + var nonBlobPos []int + + for i, idx := range list { + if strings.HasPrefix(idx.Data.Key, eventrepo.BlobKeyPrefix) { + if len(r.Buckets) == 0 { + return nil, errors.New("no buckets configured") + } + url, err := r.EventService.PresignBlobURL(ctx, idx.Data.Key, r.Buckets[0]) + if err != nil { + return nil, err + } + hdr := idx.CloudEventHeader + out[i] = &CloudEventWrapper{Raw: &cloudevent.RawEvent{CloudEventHeader: hdr}, DataURL: url} + } else { + nonBlobIdxs = append(nonBlobIdxs, idx) + nonBlobPos = append(nonBlobPos, i) + } } - out := make([]*CloudEventWrapper, len(events)) - for i := range events { - out[i] = &CloudEventWrapper{Raw: &events[i]} + + if len(nonBlobIdxs) > 0 { + events, err := fetch.ListCloudEventsFromIndexes(ctx, r.EventService, nonBlobIdxs, r.Buckets) + if err != nil { + return nil, err + } + for j := range events { + ev := events[j] + out[nonBlobPos[j]] = &CloudEventWrapper{Raw: &ev} + } } + return out, nil } diff --git a/internal/graph/cloud_event.go b/internal/graph/cloud_event.go index 761cd63..d42b262 100644 --- a/internal/graph/cloud_event.go +++ b/internal/graph/cloud_event.go @@ -7,9 +7,10 @@ import ( ) // CloudEventWrapper holds a pointer to a RawEvent so resolvers can expose -// header, data, and dataBase64 without copying the underlying event. +// header, data, dataBase64, and dataUrl without copying the underlying event. type CloudEventWrapper struct { - Raw *cloudevent.RawEvent + Raw *cloudevent.RawEvent + DataURL string // non-empty when the payload is a blob served via presigned URL } // RawJSON is the raw bytes of a JSON value. It implements graphql.Marshaler by diff --git a/internal/graph/generated.go b/internal/graph/generated.go index d7b824f..29fa6b9 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -51,6 +51,7 @@ type ComplexityRoot struct { CloudEvent struct { Data func(childComplexity int) int DataBase64 func(childComplexity int) int + DataUrl func(childComplexity int) int Header func(childComplexity int) int } @@ -94,6 +95,7 @@ type CloudEventResolver interface { Header(ctx context.Context, obj *CloudEventWrapper) (*cloudevent.CloudEventHeader, error) Data(ctx context.Context, obj *CloudEventWrapper) (RawJSON, error) DataBase64(ctx context.Context, obj *CloudEventWrapper) (*string, error) + DataUrl(ctx context.Context, obj *CloudEventWrapper) (*string, error) } type QueryResolver interface { LatestIndex(ctx context.Context, did string, filter *model.CloudEventFilter) (*model.CloudEventIndex, error) @@ -134,6 +136,12 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin } return e.complexity.CloudEvent.DataBase64(childComplexity), true + case "CloudEvent.dataUrl": + if e.complexity.CloudEvent.DataUrl == nil { + break + } + + return e.complexity.CloudEvent.DataUrl(childComplexity), true case "CloudEvent.header": if e.complexity.CloudEvent.Header == nil { break @@ -419,6 +427,8 @@ type CloudEvent { data: JSON """Base64-encoded payload when present. Omitted if not requested.""" dataBase64: String + """Presigned S3 URL for large blob payloads. Set instead of data/dataBase64 when the object key has the blob prefix.""" + dataUrl: String } """ @@ -777,6 +787,35 @@ func (ec *executionContext) fieldContext_CloudEvent_dataBase64(_ context.Context return fc, nil } +func (ec *executionContext) _CloudEvent_dataUrl(ctx context.Context, field graphql.CollectedField, obj *CloudEventWrapper) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_CloudEvent_dataUrl, + func(ctx context.Context) (any, error) { + return ec.resolvers.CloudEvent().DataUrl(ctx, obj) + }, + nil, + ec.marshalOString2áš–string, + true, + false, + ) +} + +func (ec *executionContext) fieldContext_CloudEvent_dataUrl(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "CloudEvent", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _CloudEventHeader_specversion(ctx context.Context, field graphql.CollectedField, obj *cloudevent.CloudEventHeader) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, @@ -1450,6 +1489,8 @@ func (ec *executionContext) fieldContext_Query_latestCloudEvent(ctx context.Cont return ec.fieldContext_CloudEvent_data(ctx, field) case "dataBase64": return ec.fieldContext_CloudEvent_dataBase64(ctx, field) + case "dataUrl": + return ec.fieldContext_CloudEvent_dataUrl(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type CloudEvent", field.Name) }, @@ -1499,6 +1540,8 @@ func (ec *executionContext) fieldContext_Query_cloudEvents(ctx context.Context, return ec.fieldContext_CloudEvent_data(ctx, field) case "dataBase64": return ec.fieldContext_CloudEvent_dataBase64(ctx, field) + case "dataUrl": + return ec.fieldContext_CloudEvent_dataUrl(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type CloudEvent", field.Name) }, @@ -3311,6 +3354,39 @@ func (ec *executionContext) _CloudEvent(ctx context.Context, sel ast.SelectionSe continue } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + case "dataUrl": + field := field + + innerFunc := func(ctx context.Context, _ *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._CloudEvent_dataUrl(ctx, field, obj) + return res + } + + if field.Deferrable != nil { + dfs, ok := deferred[field.Deferrable.Label] + di := 0 + if ok { + dfs.AddField(field) + di = len(dfs.Values) - 1 + } else { + dfs = graphql.NewFieldSet([]graphql.CollectedField{field}) + deferred[field.Deferrable.Label] = dfs + } + dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler { + return innerFunc(ctx, dfs) + }) + + // don't run the out.Concurrently() call below + out.Values[i] = graphql.Null + continue + } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) default: panic("unknown field " + strconv.Quote(field.Name)) diff --git a/pkg/eventrepo/event_repo_test.go b/pkg/eventrepo/event_repo_test.go index 75a879a..83061d8 100644 --- a/pkg/eventrepo/event_repo_test.go +++ b/pkg/eventrepo/event_repo_test.go @@ -93,7 +93,7 @@ func TestGetLatestIndexKey(t *testing.T) { }, } - indexService := eventrepo.New(conn, nil, "") + indexService := eventrepo.New(conn, nil, nil, "") for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -172,7 +172,7 @@ func TestGetDataFromIndex(t *testing.T) { ContentLength: ref(int64(len(content))), }, nil).AnyTimes() - indexService := eventrepo.New(conn, mockS3Client, "") + indexService := eventrepo.New(conn, mockS3Client, nil, "") for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -204,7 +204,7 @@ func TestStoreObject(t *testing.T) { mockS3Client := NewMockObjectGetter(ctrl) mockS3Client.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(&s3.PutObjectOutput{}, nil).AnyTimes() - indexService := eventrepo.New(conn, mockS3Client, "") + indexService := eventrepo.New(conn, mockS3Client, nil, "") content := []byte(`{"vin": "1HGCM82633A123456"}`) did := cloudevent.ERC721DID{ @@ -333,7 +333,7 @@ func TestGetData(t *testing.T) { ctrl := gomock.NewController(t) mockS3Client := NewMockObjectGetter(ctrl) - indexService := eventrepo.New(conn, mockS3Client, "") + indexService := eventrepo.New(conn, mockS3Client, nil, "") // Allow GetObject calls in any order since fetches are concurrent. if len(tt.expectedIndexKeys) > 0 { mockS3Client.EXPECT().GetObject(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { @@ -424,7 +424,7 @@ func TestGetEventWithAllHeaderFields(t *testing.T) { eventDataEnvelope := []byte(`{"data":` + string(eventData) + `}`) // Create service - indexService := eventrepo.New(conn, mockS3Client, "") + indexService := eventrepo.New(conn, mockS3Client, nil, "") // Test retrieving the event t.Run("retrieve event with full headers", func(t *testing.T) { @@ -576,7 +576,7 @@ func TestGetCloudEventFromIndex_ParquetRef(t *testing.T) { ctrl := gomock.NewController(t) mockS3 := mockS3ParquetReader(t, ctrl, parquetBytes) - indexService := eventrepo.New(conn, mockS3, "test-parquet-bucket") + indexService := eventrepo.New(conn, mockS3, nil, "test-parquet-bucket") // Build the index object as GetCloudEventFromIndex expects index := cloudevent.CloudEvent[eventrepo.ObjectInfo]{ @@ -655,7 +655,7 @@ func TestListCloudEventsFromIndexes_ParquetCaching(t *testing.T) { ctrl := gomock.NewController(t) mockS3 := mockS3ParquetReader(t, ctrl, parquetBytes) - indexService := eventrepo.New(conn, mockS3, "test-parquet-bucket") + indexService := eventrepo.New(conn, mockS3, nil, "test-parquet-bucket") indexes := []cloudevent.CloudEvent[eventrepo.ObjectInfo]{ {CloudEventHeader: hdr0, Data: eventrepo.ObjectInfo{Key: indexKeys[0]}}, @@ -747,7 +747,7 @@ func TestListIndexesAdvanced(t *testing.T) { keyTypeStatusSource1Producer3 := insertTestData(t, ctx, conn, eventIdx3) keyTypeStatusSource3Producer4 := insertTestData(t, ctx, conn, eventIdx4) - indexService := eventrepo.New(conn, nil, "") + indexService := eventrepo.New(conn, nil, nil, "") tests := []struct { name string @@ -1032,7 +1032,7 @@ func TestGetCloudEventTypeSummaries(t *testing.T) { insertTestData(t, ctx, conn, status3) insertTestData(t, ctx, conn, fp1) - indexService := eventrepo.New(conn, nil, "") + indexService := eventrepo.New(conn, nil, nil, "") t.Run("no filter returns all types", func(t *testing.T) { opts := &grpc.SearchOptions{ diff --git a/pkg/eventrepo/eventrepo.go b/pkg/eventrepo/eventrepo.go index 752c775..a5f36b1 100644 --- a/pkg/eventrepo/eventrepo.go +++ b/pkg/eventrepo/eventrepo.go @@ -15,6 +15,8 @@ import ( chindexer "github.com/DIMO-Network/cloudevent/clickhouse" "github.com/DIMO-Network/cloudevent/parquet" "github.com/DIMO-Network/fetch-api/pkg/grpc" + "github.com/aws/aws-sdk-go-v2/aws" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/volatiletech/sqlboiler/v4/drivers" "github.com/volatiletech/sqlboiler/v4/queries" @@ -30,6 +32,7 @@ const tagsColumn = "JSONExtract(extras, 'tags', 'Array(String)')" // Service manages and retrieves data messages from indexed objects in S3. type Service struct { objGetter ObjectGetter + presigner Presigner chConn clickhouse.Conn // parquetBucket is the object storage bucket for Iceberg Parquet files. parquetBucket string @@ -46,15 +49,43 @@ type ObjectGetter interface { PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) } +// Presigner generates presigned S3 GET URLs. +type Presigner interface { + PresignGetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error) +} + +// BlobKeyPrefix is the S3 key prefix used for large binary blob objects. +// Keys with this prefix are served via presigned URL instead of inline in the response. +const BlobKeyPrefix = "cloudevent/blobs/" + +// presignTTL is the lifetime of generated presigned S3 URLs. +const presignTTL = 15 * time.Minute + // New creates a new instance of Service. -func New(chConn clickhouse.Conn, objGetter ObjectGetter, parquetBucket string) *Service { +func New(chConn clickhouse.Conn, objGetter ObjectGetter, presigner Presigner, parquetBucket string) *Service { return &Service{ objGetter: objGetter, + presigner: presigner, chConn: chConn, parquetBucket: parquetBucket, } } +// PresignBlobURL returns a short-lived presigned GET URL for the given S3 key and bucket. +func (s *Service) PresignBlobURL(ctx context.Context, key, bucket string) (string, error) { + if s.presigner == nil { + return "", fmt.Errorf("presigner not configured") + } + req, err := s.presigner.PresignGetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }, s3.WithPresignExpires(presignTTL)) + if err != nil { + return "", fmt.Errorf("presign %s/%s: %w", bucket, key, err) + } + return req.URL, nil +} + // GetLatestIndex returns the latest cloud event index that matches the given options. func (s *Service) GetLatestIndex(ctx context.Context, opts *grpc.SearchOptions) (cloudevent.CloudEvent[ObjectInfo], error) { advancedOpts := convertSearchOptionsToAdvanced(opts) diff --git a/pkg/eventrepo/eventrepo_mock_test.go b/pkg/eventrepo/eventrepo_mock_test.go index 50daa38..48a8230 100644 --- a/pkg/eventrepo/eventrepo_mock_test.go +++ b/pkg/eventrepo/eventrepo_mock_test.go @@ -13,6 +13,7 @@ import ( context "context" reflect "reflect" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" s3 "github.com/aws/aws-sdk-go-v2/service/s3" gomock "go.uber.org/mock/gomock" ) @@ -80,3 +81,47 @@ func (mr *MockObjectGetterMockRecorder) PutObject(ctx, params any, optFns ...any varargs := append([]any{ctx, params}, optFns...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutObject", reflect.TypeOf((*MockObjectGetter)(nil).PutObject), varargs...) } + +// MockPresigner is a mock of Presigner interface. +type MockPresigner struct { + ctrl *gomock.Controller + recorder *MockPresignerMockRecorder + isgomock struct{} +} + +// MockPresignerMockRecorder is the mock recorder for MockPresigner. +type MockPresignerMockRecorder struct { + mock *MockPresigner +} + +// NewMockPresigner creates a new mock instance. +func NewMockPresigner(ctrl *gomock.Controller) *MockPresigner { + mock := &MockPresigner{ctrl: ctrl} + mock.recorder = &MockPresignerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPresigner) EXPECT() *MockPresignerMockRecorder { + return m.recorder +} + +// PresignGetObject mocks base method. +func (m *MockPresigner) PresignGetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PresignGetObject", varargs...) + ret0, _ := ret[0].(*v4.PresignedHTTPRequest) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PresignGetObject indicates an expected call of PresignGetObject. +func (mr *MockPresignerMockRecorder) PresignGetObject(ctx, params any, optFns ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PresignGetObject", reflect.TypeOf((*MockPresigner)(nil).PresignGetObject), varargs...) +} diff --git a/schema/base.graphqls b/schema/base.graphqls index aafb0b9..0035913 100644 --- a/schema/base.graphqls +++ b/schema/base.graphqls @@ -18,6 +18,8 @@ type CloudEvent { data: JSON """Base64-encoded payload when present. Omitted if not requested.""" dataBase64: String + """Presigned S3 URL for large blob payloads. Set instead of data/dataBase64 when the object key has the blob prefix.""" + dataUrl: String } """ From a290941f44a1d1873ef19fbbcfe6d0083d3cd0cf Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 31 Mar 2026 17:19:04 +0000 Subject: [PATCH 2/5] Improve dataUrl field description in GraphQL schema Describe the field in terms of behavior (large files) rather than the internal key prefix convention, which is an implementation detail that may change. https://claude.ai/code/session_015ReeLGeCywJfYkkrrng5wU --- internal/graph/generated.go | 2 +- schema/base.graphqls | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/graph/generated.go b/internal/graph/generated.go index 29fa6b9..9a20d14 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -427,7 +427,7 @@ type CloudEvent { data: JSON """Base64-encoded payload when present. Omitted if not requested.""" dataBase64: String - """Presigned S3 URL for large blob payloads. Set instead of data/dataBase64 when the object key has the blob prefix.""" + """Presigned S3 URL for large binary payloads. Populated instead of data/dataBase64 for large files.""" dataUrl: String } diff --git a/schema/base.graphqls b/schema/base.graphqls index 0035913..8a09c4c 100644 --- a/schema/base.graphqls +++ b/schema/base.graphqls @@ -18,7 +18,7 @@ type CloudEvent { data: JSON """Base64-encoded payload when present. Omitted if not requested.""" dataBase64: String - """Presigned S3 URL for large blob payloads. Set instead of data/dataBase64 when the object key has the blob prefix.""" + """Presigned S3 URL for large binary payloads. Populated instead of data/dataBase64 for large files.""" dataUrl: String } From 6f988edb9f4dd3a6b2975b4d30903c7415f8ccaf Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 31 Mar 2026 17:23:46 +0000 Subject: [PATCH 3/5] Add tests for presigned URL generation and blob resolver fields - pkg/eventrepo/presign_test.go: unit tests for PresignBlobURL covering correct bucket/key routing, 15-minute TTL, presigner error propagation, and nil-presigner guard - internal/graph/blob_resolver_test.go: unit tests for the DataUrl resolver (nil wrapper, empty DataURL, populated DataURL) and a composite test that a blob wrapper returns nil for data/dataBase64 and a URL for dataUrl https://claude.ai/code/session_015ReeLGeCywJfYkkrrng5wU --- internal/graph/blob_resolver_test.go | 67 +++++++++++++++++++++++++ pkg/eventrepo/presign_test.go | 74 ++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 internal/graph/blob_resolver_test.go create mode 100644 pkg/eventrepo/presign_test.go diff --git a/internal/graph/blob_resolver_test.go b/internal/graph/blob_resolver_test.go new file mode 100644 index 0000000..490fcc0 --- /dev/null +++ b/internal/graph/blob_resolver_test.go @@ -0,0 +1,67 @@ +package graph + +import ( + "context" + "testing" + + "github.com/DIMO-Network/cloudevent" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestDataUrlResolver verifies the DataUrl field resolver reads CloudEventWrapper.DataURL. +func TestDataUrlResolver(t *testing.T) { + r := &cloudEventResolver{&Resolver{}} + ctx := context.Background() + + t.Run("returns nil when DataURL is empty", func(t *testing.T) { + obj := &CloudEventWrapper{Raw: &cloudevent.RawEvent{}} + result, err := r.DataUrl(ctx, obj) + require.NoError(t, err) + assert.Nil(t, result) + }) + + t.Run("returns URL when DataURL is set", func(t *testing.T) { + const url = "https://s3.amazonaws.com/bucket/cloudevent/blobs/scan.bin?X-Amz-Signature=abc" + obj := &CloudEventWrapper{ + Raw: &cloudevent.RawEvent{}, + DataURL: url, + } + result, err := r.DataUrl(ctx, obj) + require.NoError(t, err) + require.NotNil(t, result) + assert.Equal(t, url, *result) + }) + + t.Run("returns nil for nil wrapper", func(t *testing.T) { + result, err := r.DataUrl(ctx, nil) + require.NoError(t, err) + assert.Nil(t, result) + }) +} + +// TestBlobWrapperFieldsAreNil verifies that a CloudEventWrapper constructed for a +// blob (no inline payload) returns nil from the data and dataBase64 resolvers. +// This mirrors what the query resolver does when it builds a blob wrapper. +func TestBlobWrapperFieldsAreNil(t *testing.T) { + r := &cloudEventResolver{&Resolver{}} + ctx := context.Background() + + blobWrapper := &CloudEventWrapper{ + Raw: &cloudevent.RawEvent{CloudEventHeader: cloudevent.CloudEventHeader{ID: "evt-1"}}, + DataURL: "https://example.com/presigned", + } + + data, err := r.Data(ctx, blobWrapper) + require.NoError(t, err) + assert.Nil(t, data, "data should be nil for a blob wrapper") + + b64, err := r.DataBase64(ctx, blobWrapper) + require.NoError(t, err) + assert.Nil(t, b64, "dataBase64 should be nil for a blob wrapper") + + url, err := r.DataUrl(ctx, blobWrapper) + require.NoError(t, err) + require.NotNil(t, url) + assert.Equal(t, "https://example.com/presigned", *url) +} diff --git a/pkg/eventrepo/presign_test.go b/pkg/eventrepo/presign_test.go new file mode 100644 index 0000000..70dfd23 --- /dev/null +++ b/pkg/eventrepo/presign_test.go @@ -0,0 +1,74 @@ +package eventrepo_test + +import ( + "context" + "fmt" + "testing" + "time" + + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/DIMO-Network/fetch-api/pkg/eventrepo" +) + +func TestPresignBlobURL(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + mockPresigner := NewMockPresigner(ctrl) + + svc := eventrepo.New(nil, nil, mockPresigner, "") + + const ( + bucket = "test-bucket" + key = "cloudevent/blobs/some-scan.bin" + expectedURL = "https://s3.amazonaws.com/test-bucket/cloudevent/blobs/some-scan.bin?X-Amz-Signature=abc123" + ) + + mockPresigner.EXPECT(). + PresignGetObject(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, params *s3.GetObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error) { + assert.Equal(t, bucket, *params.Bucket) + assert.Equal(t, key, *params.Key) + + // Verify the TTL is applied. + opts := s3.PresignOptions{} + for _, fn := range optFns { + fn(&opts) + } + assert.Equal(t, 15*time.Minute, opts.Expires) + + return &v4.PresignedHTTPRequest{URL: expectedURL}, nil + }) + + url, err := svc.PresignBlobURL(context.Background(), key, bucket) + require.NoError(t, err) + assert.Equal(t, expectedURL, url) +} + +func TestPresignBlobURL_PresignerError(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + mockPresigner := NewMockPresigner(ctrl) + + svc := eventrepo.New(nil, nil, mockPresigner, "") + + mockPresigner.EXPECT(). + PresignGetObject(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, fmt.Errorf("signing failure")) + + _, err := svc.PresignBlobURL(context.Background(), "cloudevent/blobs/test.bin", "test-bucket") + require.Error(t, err) + assert.Contains(t, err.Error(), "signing failure") +} + +func TestPresignBlobURL_NilPresigner(t *testing.T) { + t.Parallel() + svc := eventrepo.New(nil, nil, nil, "") + + _, err := svc.PresignBlobURL(context.Background(), "cloudevent/blobs/test.bin", "test-bucket") + require.Error(t, err) +} From 4540f76e8f396b2746afa9a67afce1e393de3ebe Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Tue, 31 Mar 2026 14:05:17 -0400 Subject: [PATCH 4/5] Get rid of these imagined handlers --- internal/graph/base.resolvers.go | 8 ------ internal/graph/generated.go | 48 ++++++-------------------------- 2 files changed, 8 insertions(+), 48 deletions(-) diff --git a/internal/graph/base.resolvers.go b/internal/graph/base.resolvers.go index 4c2498e..0c0f4a1 100644 --- a/internal/graph/base.resolvers.go +++ b/internal/graph/base.resolvers.go @@ -42,14 +42,6 @@ func (r *cloudEventResolver) DataBase64(ctx context.Context, obj *CloudEventWrap return &obj.Raw.DataBase64, nil } -// DataUrl is the resolver for the dataUrl field. Returns a presigned S3 URL for blob payloads. -func (r *cloudEventResolver) DataUrl(ctx context.Context, obj *CloudEventWrapper) (*string, error) { - if obj == nil || obj.DataURL == "" { - return nil, nil - } - return &obj.DataURL, nil -} - // LatestIndex is the resolver for the latestIndex field. func (r *queryResolver) LatestIndex(ctx context.Context, did string, filter *model.CloudEventFilter) (*model.CloudEventIndex, error) { opts, err := r.requireSubjectOptsByDID(ctx, did, filter) diff --git a/internal/graph/generated.go b/internal/graph/generated.go index 9a20d14..0702476 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -51,7 +51,7 @@ type ComplexityRoot struct { CloudEvent struct { Data func(childComplexity int) int DataBase64 func(childComplexity int) int - DataUrl func(childComplexity int) int + DataURL func(childComplexity int) int Header func(childComplexity int) int } @@ -95,7 +95,6 @@ type CloudEventResolver interface { Header(ctx context.Context, obj *CloudEventWrapper) (*cloudevent.CloudEventHeader, error) Data(ctx context.Context, obj *CloudEventWrapper) (RawJSON, error) DataBase64(ctx context.Context, obj *CloudEventWrapper) (*string, error) - DataUrl(ctx context.Context, obj *CloudEventWrapper) (*string, error) } type QueryResolver interface { LatestIndex(ctx context.Context, did string, filter *model.CloudEventFilter) (*model.CloudEventIndex, error) @@ -137,11 +136,11 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin return e.complexity.CloudEvent.DataBase64(childComplexity), true case "CloudEvent.dataUrl": - if e.complexity.CloudEvent.DataUrl == nil { + if e.complexity.CloudEvent.DataURL == nil { break } - return e.complexity.CloudEvent.DataUrl(childComplexity), true + return e.complexity.CloudEvent.DataURL(childComplexity), true case "CloudEvent.header": if e.complexity.CloudEvent.Header == nil { break @@ -794,10 +793,10 @@ func (ec *executionContext) _CloudEvent_dataUrl(ctx context.Context, field graph field, ec.fieldContext_CloudEvent_dataUrl, func(ctx context.Context) (any, error) { - return ec.resolvers.CloudEvent().DataUrl(ctx, obj) + return obj.DataURL, nil }, nil, - ec.marshalOString2áš–string, + ec.marshalOString2string, true, false, ) @@ -807,8 +806,8 @@ func (ec *executionContext) fieldContext_CloudEvent_dataUrl(_ context.Context, f fc = &graphql.FieldContext{ Object: "CloudEvent", Field: field, - IsMethod: true, - IsResolver: true, + IsMethod: false, + IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { return nil, errors.New("field of type String does not have child fields") }, @@ -3356,38 +3355,7 @@ func (ec *executionContext) _CloudEvent(ctx context.Context, sel ast.SelectionSe out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) case "dataUrl": - field := field - - innerFunc := func(ctx context.Context, _ *graphql.FieldSet) (res graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - } - }() - res = ec._CloudEvent_dataUrl(ctx, field, obj) - return res - } - - if field.Deferrable != nil { - dfs, ok := deferred[field.Deferrable.Label] - di := 0 - if ok { - dfs.AddField(field) - di = len(dfs.Values) - 1 - } else { - dfs = graphql.NewFieldSet([]graphql.CollectedField{field}) - deferred[field.Deferrable.Label] = dfs - } - dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler { - return innerFunc(ctx, dfs) - }) - - // don't run the out.Concurrently() call below - out.Values[i] = graphql.Null - continue - } - - out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + out.Values[i] = ec._CloudEvent_dataUrl(ctx, field, obj) default: panic("unknown field " + strconv.Quote(field.Name)) } From e2ce1733fc74d663765d5ae233fcee30d0e2375b Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Tue, 31 Mar 2026 14:08:55 -0400 Subject: [PATCH 5/5] These tests don't do much --- internal/config/settings.go | 14 +++--- internal/graph/blob_resolver_test.go | 67 ---------------------------- 2 files changed, 7 insertions(+), 74 deletions(-) delete mode 100644 internal/graph/blob_resolver_test.go diff --git a/internal/config/settings.go b/internal/config/settings.go index f5b856b..ada6e0d 100644 --- a/internal/config/settings.go +++ b/internal/config/settings.go @@ -7,13 +7,13 @@ import ( // Settings contains the application config. type Settings struct { - Port int `yaml:"PORT"` - MonPort int `yaml:"MON_PORT"` - GRPCPort int `yaml:"GRPC_PORT"` - EnablePprof bool `yaml:"ENABLE_PPROF"` - MaxRequestDuration string `yaml:"MAX_REQUEST_DURATION"` - TokenExchangeJWTKeySetURL string `yaml:"TOKEN_EXCHANGE_JWK_KEY_SET_URL"` - TokenExchangeIssuer string `yaml:"TOKEN_EXCHANGE_ISSUER_URL"` + Port int `yaml:"PORT"` + MonPort int `yaml:"MON_PORT"` + GRPCPort int `yaml:"GRPC_PORT"` + EnablePprof bool `yaml:"ENABLE_PPROF"` + MaxRequestDuration string `yaml:"MAX_REQUEST_DURATION"` + TokenExchangeJWTKeySetURL string `yaml:"TOKEN_EXCHANGE_JWK_KEY_SET_URL"` + TokenExchangeIssuer string `yaml:"TOKEN_EXCHANGE_ISSUER_URL"` CloudEventBucket string `yaml:"CLOUDEVENT_BUCKET"` EphemeralBucket string `yaml:"EPHEMERAL_BUCKET"` ParquetBucket string `yaml:"PARQUET_BUCKET"` diff --git a/internal/graph/blob_resolver_test.go b/internal/graph/blob_resolver_test.go deleted file mode 100644 index 490fcc0..0000000 --- a/internal/graph/blob_resolver_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package graph - -import ( - "context" - "testing" - - "github.com/DIMO-Network/cloudevent" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// TestDataUrlResolver verifies the DataUrl field resolver reads CloudEventWrapper.DataURL. -func TestDataUrlResolver(t *testing.T) { - r := &cloudEventResolver{&Resolver{}} - ctx := context.Background() - - t.Run("returns nil when DataURL is empty", func(t *testing.T) { - obj := &CloudEventWrapper{Raw: &cloudevent.RawEvent{}} - result, err := r.DataUrl(ctx, obj) - require.NoError(t, err) - assert.Nil(t, result) - }) - - t.Run("returns URL when DataURL is set", func(t *testing.T) { - const url = "https://s3.amazonaws.com/bucket/cloudevent/blobs/scan.bin?X-Amz-Signature=abc" - obj := &CloudEventWrapper{ - Raw: &cloudevent.RawEvent{}, - DataURL: url, - } - result, err := r.DataUrl(ctx, obj) - require.NoError(t, err) - require.NotNil(t, result) - assert.Equal(t, url, *result) - }) - - t.Run("returns nil for nil wrapper", func(t *testing.T) { - result, err := r.DataUrl(ctx, nil) - require.NoError(t, err) - assert.Nil(t, result) - }) -} - -// TestBlobWrapperFieldsAreNil verifies that a CloudEventWrapper constructed for a -// blob (no inline payload) returns nil from the data and dataBase64 resolvers. -// This mirrors what the query resolver does when it builds a blob wrapper. -func TestBlobWrapperFieldsAreNil(t *testing.T) { - r := &cloudEventResolver{&Resolver{}} - ctx := context.Background() - - blobWrapper := &CloudEventWrapper{ - Raw: &cloudevent.RawEvent{CloudEventHeader: cloudevent.CloudEventHeader{ID: "evt-1"}}, - DataURL: "https://example.com/presigned", - } - - data, err := r.Data(ctx, blobWrapper) - require.NoError(t, err) - assert.Nil(t, data, "data should be nil for a blob wrapper") - - b64, err := r.DataBase64(ctx, blobWrapper) - require.NoError(t, err) - assert.Nil(t, b64, "dataBase64 should be nil for a blob wrapper") - - url, err := r.DataUrl(ctx, blobWrapper) - require.NoError(t, err) - require.NotNil(t, url) - assert.Equal(t, "https://example.com/presigned", *url) -}