diff --git a/internal/app/app.go b/internal/app/app.go index 8c153ef..8986289 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -14,6 +14,7 @@ import ( "github.com/DIMO-Network/fetch-api/internal/limits" "github.com/DIMO-Network/fetch-api/pkg/eventrepo" fetchgrpc "github.com/DIMO-Network/fetch-api/pkg/grpc" "github.com/DIMO-Network/server-garage/pkg/gql/errorhandler" gqlmetrics "github.com/DIMO-Network/server-garage/pkg/gql/metrics" "github.com/DIMO-Network/shared/pkg/middleware/metrics" + "github.com/aws/aws-sdk-go-v2/service/s3" @@ -42,7 +43,7 @@ func New(settings config.Settings) (*App, error) { } s3Client := s3ClientFromSettings(&settings) buckets := []string{settings.CloudEventBucket, settings.EphemeralBucket, settings.ParquetBucket} - eventService := eventrepo.New(chConn, s3Client, settings.ParquetBucket) + eventService := eventrepo.New(chConn, s3Client, s3.NewPresignClient(s3Client), settings.ParquetBucket) var identityClient identity.Client if settings.IdentityAPIURL != "" { @@ -118,7 +119,7 @@ func CreateGRPCServer(logger *zerolog.Logger, settings *config.Settings) (*grpc. } s3Client := s3ClientFromSettings(settings) - eventService := eventrepo.New(chConn, s3Client, settings.ParquetBucket) + eventService := eventrepo.New(chConn, s3Client, s3.NewPresignClient(s3Client), settings.ParquetBucket) rpcServer := rpc.NewServer([]string{settings.CloudEventBucket, settings.EphemeralBucket, settings.ParquetBucket}, eventService) diff --git a/internal/config/settings.go b/internal/config/settings.go index f5b856b..ada6e0d 100644 --- a/internal/config/settings.go +++ b/internal/config/settings.go @@ -7,13 +7,13 @@ import ( // Settings contains the application config. 
type Settings struct { - Port int `yaml:"PORT"` - MonPort int `yaml:"MON_PORT"` - GRPCPort int `yaml:"GRPC_PORT"` - EnablePprof bool `yaml:"ENABLE_PPROF"` - MaxRequestDuration string `yaml:"MAX_REQUEST_DURATION"` - TokenExchangeJWTKeySetURL string `yaml:"TOKEN_EXCHANGE_JWK_KEY_SET_URL"` - TokenExchangeIssuer string `yaml:"TOKEN_EXCHANGE_ISSUER_URL"` + Port int `yaml:"PORT"` + MonPort int `yaml:"MON_PORT"` + GRPCPort int `yaml:"GRPC_PORT"` + EnablePprof bool `yaml:"ENABLE_PPROF"` + MaxRequestDuration string `yaml:"MAX_REQUEST_DURATION"` + TokenExchangeJWTKeySetURL string `yaml:"TOKEN_EXCHANGE_JWK_KEY_SET_URL"` + TokenExchangeIssuer string `yaml:"TOKEN_EXCHANGE_ISSUER_URL"` CloudEventBucket string `yaml:"CLOUDEVENT_BUCKET"` EphemeralBucket string `yaml:"EPHEMERAL_BUCKET"` ParquetBucket string `yaml:"PARQUET_BUCKET"` diff --git a/internal/graph/base.resolvers.go b/internal/graph/base.resolvers.go index e3fb053..0c0f4a1 100644 --- a/internal/graph/base.resolvers.go +++ b/internal/graph/base.resolvers.go @@ -9,10 +9,12 @@ import ( "context" "database/sql" "errors" + "strings" "github.com/DIMO-Network/cloudevent" "github.com/DIMO-Network/fetch-api/internal/fetch" "github.com/DIMO-Network/fetch-api/internal/graph/model" + "github.com/DIMO-Network/fetch-api/pkg/eventrepo" ) // Header is the resolver for the header field. Returns a pointer into the wrapped event; no copy. 
@@ -83,6 +85,17 @@ func (r *queryResolver) LatestCloudEvent(ctx context.Context, did string, filter if err != nil { return nil, err } + if strings.HasPrefix(idx.Data.Key, eventrepo.BlobKeyPrefix) { + if len(r.Buckets) == 0 { + return nil, errors.New("no buckets configured") + } + url, err := r.EventService.PresignBlobURL(ctx, idx.Data.Key, r.Buckets[0]) + if err != nil { + return nil, err + } + hdr := idx.CloudEventHeader + return &CloudEventWrapper{Raw: &cloudevent.RawEvent{CloudEventHeader: hdr}, DataURL: url}, nil + } ce, err := fetch.GetCloudEventFromIndex(ctx, r.EventService, &idx, r.Buckets) if err != nil { return nil, err @@ -103,14 +116,39 @@ func (r *queryResolver) CloudEvents(ctx context.Context, did string, limit *int, } return nil, err } - events, err := fetch.ListCloudEventsFromIndexes(ctx, r.EventService, list, r.Buckets) - if err != nil { - return nil, err + + out := make([]*CloudEventWrapper, len(list)) + var nonBlobIdxs []cloudevent.CloudEvent[eventrepo.ObjectInfo] + var nonBlobPos []int + + for i, idx := range list { + if strings.HasPrefix(idx.Data.Key, eventrepo.BlobKeyPrefix) { + if len(r.Buckets) == 0 { + return nil, errors.New("no buckets configured") + } + url, err := r.EventService.PresignBlobURL(ctx, idx.Data.Key, r.Buckets[0]) + if err != nil { + return nil, err + } + hdr := idx.CloudEventHeader + out[i] = &CloudEventWrapper{Raw: &cloudevent.RawEvent{CloudEventHeader: hdr}, DataURL: url} + } else { + nonBlobIdxs = append(nonBlobIdxs, idx) + nonBlobPos = append(nonBlobPos, i) + } } - out := make([]*CloudEventWrapper, len(events)) - for i := range events { - out[i] = &CloudEventWrapper{Raw: &events[i]} + + if len(nonBlobIdxs) > 0 { + events, err := fetch.ListCloudEventsFromIndexes(ctx, r.EventService, nonBlobIdxs, r.Buckets) + if err != nil { + return nil, err + } + for j := range events { + ev := events[j] + out[nonBlobPos[j]] = &CloudEventWrapper{Raw: &ev} + } } + return out, nil } diff --git a/internal/graph/cloud_event.go 
b/internal/graph/cloud_event.go index 761cd63..d42b262 100644 --- a/internal/graph/cloud_event.go +++ b/internal/graph/cloud_event.go @@ -7,9 +7,10 @@ import ( ) // CloudEventWrapper holds a pointer to a RawEvent so resolvers can expose -// header, data, and dataBase64 without copying the underlying event. +// header, data, dataBase64, and dataUrl without copying the underlying event. type CloudEventWrapper struct { - Raw *cloudevent.RawEvent + Raw *cloudevent.RawEvent + DataURL string // non-empty when the payload is a blob served via presigned URL } // RawJSON is the raw bytes of a JSON value. It implements graphql.Marshaler by diff --git a/internal/graph/generated.go b/internal/graph/generated.go index d7b824f..0702476 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -51,6 +51,7 @@ type ComplexityRoot struct { CloudEvent struct { Data func(childComplexity int) int DataBase64 func(childComplexity int) int + DataURL func(childComplexity int) int Header func(childComplexity int) int } @@ -134,6 +135,12 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin } return e.complexity.CloudEvent.DataBase64(childComplexity), true + case "CloudEvent.dataUrl": + if e.complexity.CloudEvent.DataURL == nil { + break + } + + return e.complexity.CloudEvent.DataURL(childComplexity), true case "CloudEvent.header": if e.complexity.CloudEvent.Header == nil { break @@ -419,6 +426,8 @@ type CloudEvent { data: JSON """Base64-encoded payload when present. Omitted if not requested.""" dataBase64: String + """Presigned S3 URL for large binary payloads. 
Populated instead of data/dataBase64 for large files.""" + dataUrl: String } """ @@ -777,6 +786,35 @@ func (ec *executionContext) fieldContext_CloudEvent_dataBase64(_ context.Context return fc, nil } +func (ec *executionContext) _CloudEvent_dataUrl(ctx context.Context, field graphql.CollectedField, obj *CloudEventWrapper) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_CloudEvent_dataUrl, + func(ctx context.Context) (any, error) { + return obj.DataURL, nil + }, + nil, + ec.marshalOString2string, + true, + false, + ) +} + +func (ec *executionContext) fieldContext_CloudEvent_dataUrl(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "CloudEvent", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _CloudEventHeader_specversion(ctx context.Context, field graphql.CollectedField, obj *cloudevent.CloudEventHeader) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, @@ -1450,6 +1488,8 @@ func (ec *executionContext) fieldContext_Query_latestCloudEvent(ctx context.Cont return ec.fieldContext_CloudEvent_data(ctx, field) case "dataBase64": return ec.fieldContext_CloudEvent_dataBase64(ctx, field) + case "dataUrl": + return ec.fieldContext_CloudEvent_dataUrl(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type CloudEvent", field.Name) }, @@ -1499,6 +1539,8 @@ func (ec *executionContext) fieldContext_Query_cloudEvents(ctx context.Context, return ec.fieldContext_CloudEvent_data(ctx, field) case "dataBase64": return ec.fieldContext_CloudEvent_dataBase64(ctx, field) + case "dataUrl": + return ec.fieldContext_CloudEvent_dataUrl(ctx, field) } return nil, fmt.Errorf("no field 
named %q was found under type CloudEvent", field.Name) }, @@ -3312,6 +3354,8 @@ func (ec *executionContext) _CloudEvent(ctx context.Context, sel ast.SelectionSe } out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + case "dataUrl": + out.Values[i] = ec._CloudEvent_dataUrl(ctx, field, obj) default: panic("unknown field " + strconv.Quote(field.Name)) } diff --git a/pkg/eventrepo/event_repo_test.go b/pkg/eventrepo/event_repo_test.go index 75a879a..83061d8 100644 --- a/pkg/eventrepo/event_repo_test.go +++ b/pkg/eventrepo/event_repo_test.go @@ -93,7 +93,7 @@ func TestGetLatestIndexKey(t *testing.T) { }, } - indexService := eventrepo.New(conn, nil, "") + indexService := eventrepo.New(conn, nil, nil, "") for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -172,7 +172,7 @@ func TestGetDataFromIndex(t *testing.T) { ContentLength: ref(int64(len(content))), }, nil).AnyTimes() - indexService := eventrepo.New(conn, mockS3Client, "") + indexService := eventrepo.New(conn, mockS3Client, nil, "") for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -204,7 +204,7 @@ func TestStoreObject(t *testing.T) { mockS3Client := NewMockObjectGetter(ctrl) mockS3Client.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(&s3.PutObjectOutput{}, nil).AnyTimes() - indexService := eventrepo.New(conn, mockS3Client, "") + indexService := eventrepo.New(conn, mockS3Client, nil, "") content := []byte(`{"vin": "1HGCM82633A123456"}`) did := cloudevent.ERC721DID{ @@ -333,7 +333,7 @@ func TestGetData(t *testing.T) { ctrl := gomock.NewController(t) mockS3Client := NewMockObjectGetter(ctrl) - indexService := eventrepo.New(conn, mockS3Client, "") + indexService := eventrepo.New(conn, mockS3Client, nil, "") // Allow GetObject calls in any order since fetches are concurrent. 
if len(tt.expectedIndexKeys) > 0 { mockS3Client.EXPECT().GetObject(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { @@ -424,7 +424,7 @@ func TestGetEventWithAllHeaderFields(t *testing.T) { eventDataEnvelope := []byte(`{"data":` + string(eventData) + `}`) // Create service - indexService := eventrepo.New(conn, mockS3Client, "") + indexService := eventrepo.New(conn, mockS3Client, nil, "") // Test retrieving the event t.Run("retrieve event with full headers", func(t *testing.T) { @@ -576,7 +576,7 @@ func TestGetCloudEventFromIndex_ParquetRef(t *testing.T) { ctrl := gomock.NewController(t) mockS3 := mockS3ParquetReader(t, ctrl, parquetBytes) - indexService := eventrepo.New(conn, mockS3, "test-parquet-bucket") + indexService := eventrepo.New(conn, mockS3, nil, "test-parquet-bucket") // Build the index object as GetCloudEventFromIndex expects index := cloudevent.CloudEvent[eventrepo.ObjectInfo]{ @@ -655,7 +655,7 @@ func TestListCloudEventsFromIndexes_ParquetCaching(t *testing.T) { ctrl := gomock.NewController(t) mockS3 := mockS3ParquetReader(t, ctrl, parquetBytes) - indexService := eventrepo.New(conn, mockS3, "test-parquet-bucket") + indexService := eventrepo.New(conn, mockS3, nil, "test-parquet-bucket") indexes := []cloudevent.CloudEvent[eventrepo.ObjectInfo]{ {CloudEventHeader: hdr0, Data: eventrepo.ObjectInfo{Key: indexKeys[0]}}, @@ -747,7 +747,7 @@ func TestListIndexesAdvanced(t *testing.T) { keyTypeStatusSource1Producer3 := insertTestData(t, ctx, conn, eventIdx3) keyTypeStatusSource3Producer4 := insertTestData(t, ctx, conn, eventIdx4) - indexService := eventrepo.New(conn, nil, "") + indexService := eventrepo.New(conn, nil, nil, "") tests := []struct { name string @@ -1032,7 +1032,7 @@ func TestGetCloudEventTypeSummaries(t *testing.T) { insertTestData(t, ctx, conn, status3) insertTestData(t, ctx, conn, fp1) - indexService := eventrepo.New(conn, nil, 
"") + indexService := eventrepo.New(conn, nil, nil, "") t.Run("no filter returns all types", func(t *testing.T) { opts := &grpc.SearchOptions{ diff --git a/pkg/eventrepo/eventrepo.go b/pkg/eventrepo/eventrepo.go index 752c775..a5f36b1 100644 --- a/pkg/eventrepo/eventrepo.go +++ b/pkg/eventrepo/eventrepo.go @@ -15,6 +15,8 @@ import ( chindexer "github.com/DIMO-Network/cloudevent/clickhouse" "github.com/DIMO-Network/cloudevent/parquet" "github.com/DIMO-Network/fetch-api/pkg/grpc" + "github.com/aws/aws-sdk-go-v2/aws" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/volatiletech/sqlboiler/v4/drivers" "github.com/volatiletech/sqlboiler/v4/queries" @@ -30,6 +32,7 @@ const tagsColumn = "JSONExtract(extras, 'tags', 'Array(String)')" // Service manages and retrieves data messages from indexed objects in S3. type Service struct { objGetter ObjectGetter + presigner Presigner chConn clickhouse.Conn // parquetBucket is the object storage bucket for Iceberg Parquet files. parquetBucket string @@ -46,15 +49,43 @@ type ObjectGetter interface { PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) } +// Presigner generates presigned S3 GET URLs. +type Presigner interface { + PresignGetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error) +} + +// BlobKeyPrefix is the S3 key prefix used for large binary blob objects. +// Keys with this prefix are served via presigned URL instead of inline in the response. +const BlobKeyPrefix = "cloudevent/blobs/" + +// presignTTL is the lifetime of generated presigned S3 URLs. +const presignTTL = 15 * time.Minute + // New creates a new instance of Service. 
-func New(chConn clickhouse.Conn, objGetter ObjectGetter, parquetBucket string) *Service { +func New(chConn clickhouse.Conn, objGetter ObjectGetter, presigner Presigner, parquetBucket string) *Service { return &Service{ objGetter: objGetter, + presigner: presigner, chConn: chConn, parquetBucket: parquetBucket, } } +// PresignBlobURL returns a short-lived presigned GET URL for the given S3 key and bucket. +func (s *Service) PresignBlobURL(ctx context.Context, key, bucket string) (string, error) { + if s.presigner == nil { + return "", fmt.Errorf("presigner not configured") + } + req, err := s.presigner.PresignGetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }, s3.WithPresignExpires(presignTTL)) + if err != nil { + return "", fmt.Errorf("presign %s/%s: %w", bucket, key, err) + } + return req.URL, nil +} + // GetLatestIndex returns the latest cloud event index that matches the given options. func (s *Service) GetLatestIndex(ctx context.Context, opts *grpc.SearchOptions) (cloudevent.CloudEvent[ObjectInfo], error) { advancedOpts := convertSearchOptionsToAdvanced(opts) diff --git a/pkg/eventrepo/eventrepo_mock_test.go b/pkg/eventrepo/eventrepo_mock_test.go index 50daa38..48a8230 100644 --- a/pkg/eventrepo/eventrepo_mock_test.go +++ b/pkg/eventrepo/eventrepo_mock_test.go @@ -13,6 +13,7 @@ import ( context "context" reflect "reflect" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" s3 "github.com/aws/aws-sdk-go-v2/service/s3" gomock "go.uber.org/mock/gomock" ) @@ -80,3 +81,47 @@ func (mr *MockObjectGetterMockRecorder) PutObject(ctx, params any, optFns ...any varargs := append([]any{ctx, params}, optFns...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutObject", reflect.TypeOf((*MockObjectGetter)(nil).PutObject), varargs...) } + +// MockPresigner is a mock of Presigner interface. 
+type MockPresigner struct { + ctrl *gomock.Controller + recorder *MockPresignerMockRecorder + isgomock struct{} +} + +// MockPresignerMockRecorder is the mock recorder for MockPresigner. +type MockPresignerMockRecorder struct { + mock *MockPresigner +} + +// NewMockPresigner creates a new mock instance. +func NewMockPresigner(ctrl *gomock.Controller) *MockPresigner { + mock := &MockPresigner{ctrl: ctrl} + mock.recorder = &MockPresignerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPresigner) EXPECT() *MockPresignerMockRecorder { + return m.recorder +} + +// PresignGetObject mocks base method. +func (m *MockPresigner) PresignGetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PresignGetObject", varargs...) + ret0, _ := ret[0].(*v4.PresignedHTTPRequest) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PresignGetObject indicates an expected call of PresignGetObject. +func (mr *MockPresignerMockRecorder) PresignGetObject(ctx, params any, optFns ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PresignGetObject", reflect.TypeOf((*MockPresigner)(nil).PresignGetObject), varargs...) 
+} diff --git a/pkg/eventrepo/presign_test.go b/pkg/eventrepo/presign_test.go new file mode 100644 index 0000000..70dfd23 --- /dev/null +++ b/pkg/eventrepo/presign_test.go @@ -0,0 +1,74 @@ +package eventrepo_test + +import ( + "context" + "fmt" + "testing" + "time" + + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/DIMO-Network/fetch-api/pkg/eventrepo" +) + +func TestPresignBlobURL(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + mockPresigner := NewMockPresigner(ctrl) + + svc := eventrepo.New(nil, nil, mockPresigner, "") + + const ( + bucket = "test-bucket" + key = "cloudevent/blobs/some-scan.bin" + expectedURL = "https://s3.amazonaws.com/test-bucket/cloudevent/blobs/some-scan.bin?X-Amz-Signature=abc123" + ) + + mockPresigner.EXPECT(). + PresignGetObject(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, params *s3.GetObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error) { + assert.Equal(t, bucket, *params.Bucket) + assert.Equal(t, key, *params.Key) + + // Verify the TTL is applied. + opts := s3.PresignOptions{} + for _, fn := range optFns { + fn(&opts) + } + assert.Equal(t, 15*time.Minute, opts.Expires) + + return &v4.PresignedHTTPRequest{URL: expectedURL}, nil + }) + + url, err := svc.PresignBlobURL(context.Background(), key, bucket) + require.NoError(t, err) + assert.Equal(t, expectedURL, url) +} + +func TestPresignBlobURL_PresignerError(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + mockPresigner := NewMockPresigner(ctrl) + + svc := eventrepo.New(nil, nil, mockPresigner, "") + + mockPresigner.EXPECT(). + PresignGetObject(gomock.Any(), gomock.Any(), gomock.Any()). 
+ Return(nil, fmt.Errorf("signing failure")) + + _, err := svc.PresignBlobURL(context.Background(), "cloudevent/blobs/test.bin", "test-bucket") + require.Error(t, err) + assert.Contains(t, err.Error(), "signing failure") +} + +func TestPresignBlobURL_NilPresigner(t *testing.T) { + t.Parallel() + svc := eventrepo.New(nil, nil, nil, "") + + _, err := svc.PresignBlobURL(context.Background(), "cloudevent/blobs/test.bin", "test-bucket") + require.Error(t, err) +} diff --git a/schema/base.graphqls b/schema/base.graphqls index aafb0b9..8a09c4c 100644 --- a/schema/base.graphqls +++ b/schema/base.graphqls @@ -18,6 +18,8 @@ type CloudEvent { data: JSON """Base64-encoded payload when present. Omitted if not requested.""" dataBase64: String + """Presigned S3 URL for large binary payloads. Populated instead of data/dataBase64 for large files.""" + dataUrl: String } """