diff --git a/internal/app/app.go b/internal/app/app.go index e410ec4..9d82b5b 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -3,6 +3,8 @@ package app import ( "fmt" "net/http" + "time" + "github.com/99designs/gqlgen/graphql/handler" "github.com/99designs/gqlgen/graphql/handler/extension" "github.com/99designs/gqlgen/graphql/handler/transport" @@ -21,6 +23,7 @@ import ( grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/rs/zerolog" "google.golang.org/grpc" ) @@ -34,6 +37,9 @@ type App struct { cleanup func() } +// defaultPresignExpiry is used when PRESIGNED_URL_EXPIRY is not configured. +const defaultPresignExpiry = 15 * time.Minute + // New creates a new application with GraphQL handler and middleware. func New(settings config.Settings) (*App, error) { chConn, err := chClientFromSettings(&settings) @@ -41,15 +47,25 @@ func New(settings config.Settings) (*App, error) { return nil, fmt.Errorf("failed to create ClickHouse connection: %w", err) } s3Client := s3ClientFromSettings(&settings) + presignClient := s3.NewPresignClient(s3Client) buckets := []string{settings.CloudEventBucket, settings.EphemeralBucket} eventService := eventrepo.New(chConn, s3Client, settings.ParquetBucket) + presignExpiry := defaultPresignExpiry + if settings.PresignedURLExpiry != "" { + d, err := time.ParseDuration(settings.PresignedURLExpiry) + if err != nil { + return nil, fmt.Errorf("invalid PRESIGNED_URL_EXPIRY %q: %w", settings.PresignedURLExpiry, err) + } + presignExpiry = d + } + var identityClient identity.Client if settings.IdentityAPIURL != "" { identityClient = identity.New(settings.IdentityAPIURL) } - gqlSrv := newGraphQLHandler(&settings, eventService, buckets, identityClient) + gqlSrv := newGraphQLHandler(&settings, eventService, buckets, identityClient, 
presignClient, presignExpiry) jwtMiddleware, err := auth.NewJWTMiddleware(settings.TokenExchangeIssuer, settings.TokenExchangeJWTKeySetURL) if err != nil { @@ -91,11 +107,13 @@ func (a *App) Cleanup() { } // newGraphQLHandler creates a configured gqlgen handler.Server. -func newGraphQLHandler(settings *config.Settings, eventService *eventrepo.Service, buckets []string, identityClient identity.Client) *handler.Server { +func newGraphQLHandler(settings *config.Settings, eventService *eventrepo.Service, buckets []string, identityClient identity.Client, presignClient *s3.PresignClient, presignExpiry time.Duration) *handler.Server { resolver := &graph.Resolver{ EventService: eventService, Buckets: buckets, IdentityClient: identityClient, + Presigner: presignClient, + PresignExpiry: presignExpiry, } cfg := graph.Config{Resolvers: resolver} diff --git a/internal/config/settings.go b/internal/config/settings.go index f5b856b..8094e43 100644 --- a/internal/config/settings.go +++ b/internal/config/settings.go @@ -17,6 +17,7 @@ type Settings struct { CloudEventBucket string `yaml:"CLOUDEVENT_BUCKET"` EphemeralBucket string `yaml:"EPHEMERAL_BUCKET"` ParquetBucket string `yaml:"PARQUET_BUCKET"` + PresignedURLExpiry string `yaml:"PRESIGNED_URL_EXPIRY"` // e.g. "15m"; defaults to 15m if empty S3AWSRegion string `yaml:"S3_AWS_REGION"` S3AWSAccessKeyID string `yaml:"S3_AWS_ACCESS_KEY_ID"` S3AWSSecretAccessKey string `yaml:"S3_AWS_SECRET_ACCESS_KEY"` diff --git a/internal/graph/base.resolvers.go b/internal/graph/base.resolvers.go index 0946a91..61382fb 100644 --- a/internal/graph/base.resolvers.go +++ b/internal/graph/base.resolvers.go @@ -13,6 +13,8 @@ import ( "github.com/DIMO-Network/cloudevent" "github.com/DIMO-Network/fetch-api/internal/fetch" "github.com/DIMO-Network/fetch-api/internal/graph/model" + "github.com/DIMO-Network/fetch-api/pkg/eventrepo" + fetchgrpc "github.com/DIMO-Network/fetch-api/pkg/grpc" ) // Header is the resolver for the header field. 
Returns a pointer into the wrapped event; no copy. @@ -40,6 +42,15 @@ func (r *cloudEventResolver) DataBase64(ctx context.Context, obj *CloudEventWrap return &obj.Raw.DataBase64, nil } +// PresignedUrl is the resolver for the presignedUrl field. +// Returns a short-lived pre-signed S3 URL for large single-file events. +func (r *cloudEventResolver) PresignedUrl(ctx context.Context, obj *CloudEventWrapper) (*string, error) { + if obj == nil || obj.PresignedURL == "" { + return nil, nil + } + return &obj.PresignedURL, nil +} + // LatestIndex is the resolver for the latestIndex field. func (r *queryResolver) LatestIndex(ctx context.Context, did string, filter *model.CloudEventFilter) (*model.CloudEventIndex, error) { opts, err := r.requireSubjectOptsByDID(ctx, did, filter) @@ -83,6 +94,15 @@ func (r *queryResolver) LatestCloudEvent(ctx context.Context, did string, filter if err != nil { return nil, err } + if eventrepo.IsSingleEventRef(idx.Data.Key) { + url, err := r.presignSingleEvent(ctx, idx.Data.Key) + if err != nil { + return nil, err + } + hdr := idx.CloudEventHeader + hdr.Tags = fetchgrpc.TagsOrEmpty(hdr.Tags) + return &CloudEventWrapper{Raw: &cloudevent.RawEvent{CloudEventHeader: hdr}, PresignedURL: url}, nil + } ce, err := fetch.GetCloudEventFromIndex(ctx, r.EventService, &idx, r.Buckets) if err != nil { return nil, err @@ -103,14 +123,37 @@ func (r *queryResolver) CloudEvents(ctx context.Context, did string, limit *int, } return nil, err } - events, err := fetch.ListCloudEventsFromIndexes(ctx, r.EventService, list, r.Buckets) - if err != nil { - return nil, err + + out := make([]*CloudEventWrapper, len(list)) + + // Separate single-event refs (presign only) from regular refs (fetch from S3). 
+ var fetchIndexes []cloudevent.CloudEvent[eventrepo.ObjectInfo] + var fetchPositions []int + for i, idx := range list { + if eventrepo.IsSingleEventRef(idx.Data.Key) { + url, err := r.presignSingleEvent(ctx, idx.Data.Key) + if err != nil { + return nil, err + } + hdr := idx.CloudEventHeader + hdr.Tags = fetchgrpc.TagsOrEmpty(hdr.Tags) + out[i] = &CloudEventWrapper{Raw: &cloudevent.RawEvent{CloudEventHeader: hdr}, PresignedURL: url} + } else { + fetchIndexes = append(fetchIndexes, idx) + fetchPositions = append(fetchPositions, i) + } } - out := make([]*CloudEventWrapper, len(events)) - for i := range events { - out[i] = &CloudEventWrapper{Raw: &events[i]} + + if len(fetchIndexes) > 0 { + events, err := fetch.ListCloudEventsFromIndexes(ctx, r.EventService, fetchIndexes, r.Buckets) + if err != nil { + return nil, err + } + for i, pos := range fetchPositions { + out[pos] = &CloudEventWrapper{Raw: &events[i]} + } } + return out, nil } diff --git a/internal/graph/cloud_event.go b/internal/graph/cloud_event.go index 761cd63..8557ce2 100644 --- a/internal/graph/cloud_event.go +++ b/internal/graph/cloud_event.go @@ -8,8 +8,10 @@ import ( // CloudEventWrapper holds a pointer to a RawEvent so resolvers can expose // header, data, and dataBase64 without copying the underlying event. +// For large single-file events, PresignedURL is set and Data/DataBase64 are null. type CloudEventWrapper struct { - Raw *cloudevent.RawEvent + Raw *cloudevent.RawEvent + PresignedURL string // non-empty for large single-event refs; clients should fetch from this URL } // RawJSON is the raw bytes of a JSON value. 
It implements graphql.Marshaler by diff --git a/internal/graph/generated.go b/internal/graph/generated.go index 59140a5..9f57a65 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -49,9 +49,10 @@ type DirectiveRoot struct { type ComplexityRoot struct { CloudEvent struct { - Data func(childComplexity int) int - DataBase64 func(childComplexity int) int - Header func(childComplexity int) int + Data func(childComplexity int) int + DataBase64 func(childComplexity int) int + Header func(childComplexity int) int + PresignedUrl func(childComplexity int) int } CloudEventHeader struct { @@ -86,6 +87,7 @@ type CloudEventResolver interface { Header(ctx context.Context, obj *CloudEventWrapper) (*cloudevent.CloudEventHeader, error) Data(ctx context.Context, obj *CloudEventWrapper) (RawJSON, error) DataBase64(ctx context.Context, obj *CloudEventWrapper) (*string, error) + PresignedUrl(ctx context.Context, obj *CloudEventWrapper) (*string, error) } type QueryResolver interface { LatestIndex(ctx context.Context, did string, filter *model.CloudEventFilter) (*model.CloudEventIndex, error) @@ -125,6 +127,12 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin } return e.complexity.CloudEvent.DataBase64(childComplexity), true + case "CloudEvent.presignedUrl": + if e.complexity.CloudEvent.PresignedUrl == nil { + break + } + + return e.complexity.CloudEvent.PresignedUrl(childComplexity), true case "CloudEvent.header": if e.complexity.CloudEvent.Header == nil { break @@ -370,10 +378,12 @@ Full CloudEvent: selectable header, data (JSON), and optional data_base64. type CloudEvent { """CloudEvents header fields. Request only the fields you need.""" header: CloudEventHeader! - """JSON payload. Omitted if not requested.""" + """JSON payload. Omitted if not requested. Null for large single-file events (see presignedUrl).""" data: JSON """Base64-encoded payload when present. 
Omitted if not requested.""" dataBase64: String + """Pre-signed S3 URL for large single-file events. When present, data and dataBase64 are null. The URL is valid for a short time.""" + presignedUrl: String } """ @@ -697,6 +707,35 @@ func (ec *executionContext) fieldContext_CloudEvent_dataBase64(_ context.Context return fc, nil } +func (ec *executionContext) _CloudEvent_presignedUrl(ctx context.Context, field graphql.CollectedField, obj *CloudEventWrapper) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_CloudEvent_presignedUrl, + func(ctx context.Context) (any, error) { + return ec.resolvers.CloudEvent().PresignedUrl(ctx, obj) + }, + nil, + ec.marshalOString2ᚖstring, + true, + false, + ) +} + +func (ec *executionContext) fieldContext_CloudEvent_presignedUrl(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "CloudEvent", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _CloudEventHeader_specversion(ctx context.Context, field graphql.CollectedField, obj *cloudevent.CloudEventHeader) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, @@ -1254,6 +1293,8 @@ func (ec *executionContext) fieldContext_Query_latestCloudEvent(ctx context.Cont return ec.fieldContext_CloudEvent_data(ctx, field) case "dataBase64": return ec.fieldContext_CloudEvent_dataBase64(ctx, field) + case "presignedUrl": + return ec.fieldContext_CloudEvent_presignedUrl(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type CloudEvent", field.Name) }, @@ -1303,6 +1344,8 @@ func (ec *executionContext) fieldContext_Query_cloudEvents(ctx context.Context, return ec.fieldContext_CloudEvent_data(ctx, field) 
case "dataBase64": return ec.fieldContext_CloudEvent_dataBase64(ctx, field) + case "presignedUrl": + return ec.fieldContext_CloudEvent_presignedUrl(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type CloudEvent", field.Name) }, @@ -3064,6 +3107,39 @@ func (ec *executionContext) _CloudEvent(ctx context.Context, sel ast.SelectionSe continue } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + case "presignedUrl": + field := field + + innerFunc := func(ctx context.Context, _ *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._CloudEvent_presignedUrl(ctx, field, obj) + return res + } + + if field.Deferrable != nil { + dfs, ok := deferred[field.Deferrable.Label] + di := 0 + if ok { + dfs.AddField(field) + di = len(dfs.Values) - 1 + } else { + dfs = graphql.NewFieldSet([]graphql.CollectedField{field}) + deferred[field.Deferrable.Label] = dfs + } + dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler { + return innerFunc(ctx, dfs) + }) + + // don't run the out.Concurrently() call below + out.Values[i] = graphql.Null + continue + } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) default: panic("unknown field " + strconv.Quote(field.Name)) diff --git a/internal/graph/presign_test.go b/internal/graph/presign_test.go new file mode 100644 index 0000000..c949c15 --- /dev/null +++ b/internal/graph/presign_test.go @@ -0,0 +1,239 @@ +package graph + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/DIMO-Network/cloudevent" + "github.com/DIMO-Network/fetch-api/pkg/eventrepo" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/assert" + 
"github.com/stretchr/testify/require" +) + +// stubObjectGetter is a minimal test double for eventrepo.ObjectGetter. +// Only HeadObject is exercised by the presign path; the other methods panic +// if called so tests fail loudly if unexpected code paths are hit. +type stubObjectGetter struct { + headObject func(bucket, key string) error +} + +func (s *stubObjectGetter) GetObject(_ context.Context, _ *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + panic("GetObject called unexpectedly in presign test") +} + +func (s *stubObjectGetter) PutObject(_ context.Context, _ *s3.PutObjectInput, _ ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + panic("PutObject called unexpectedly in presign test") +} + +func (s *stubObjectGetter) HeadObject(_ context.Context, params *s3.HeadObjectInput, _ ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return &s3.HeadObjectOutput{}, s.headObject(aws.ToString(params.Bucket), aws.ToString(params.Key)) +} + +// fakePresignClient creates an s3.PresignClient backed by static fake +// credentials. Presigning is pure HMAC computation — no network calls occur. +func fakePresignClient() *s3.PresignClient { + s3c := s3.NewFromConfig(aws.Config{ + Region: "us-east-1", + Credentials: credentials.NewStaticCredentialsProvider("TESTAKID", "TESTSECRET", ""), + }) + return s3.NewPresignClient(s3c) +} + +// presignQueryResolver builds a queryResolver wired with the given service, +// buckets, and expiry using the shared fake presign client. 
+func presignQueryResolver(svc *eventrepo.Service, buckets []string, expiry time.Duration) *queryResolver { + return &queryResolver{&Resolver{ + EventService: svc, + Buckets: buckets, + Presigner: fakePresignClient(), + PresignExpiry: expiry, + }} +} + +// --- PresignedUrl field resolver --- + +func TestPresignedUrlResolver(t *testing.T) { + t.Parallel() + r := &cloudEventResolver{&Resolver{}} + + t.Run("nil wrapper returns nil", func(t *testing.T) { + t.Parallel() + url, err := r.PresignedUrl(context.Background(), nil) + require.NoError(t, err) + assert.Nil(t, url) + }) + + t.Run("wrapper with empty PresignedURL returns nil", func(t *testing.T) { + t.Parallel() + w := &CloudEventWrapper{Raw: &cloudevent.RawEvent{}} + url, err := r.PresignedUrl(context.Background(), w) + require.NoError(t, err) + assert.Nil(t, url) + }) + + t.Run("wrapper with PresignedURL returns pointer to URL", func(t *testing.T) { + t.Parallel() + const expected = "https://s3.example.com/bucket/single-xyz.json?sig=abc" + w := &CloudEventWrapper{ + Raw: &cloudevent.RawEvent{}, + PresignedURL: expected, + } + url, err := r.PresignedUrl(context.Background(), w) + require.NoError(t, err) + require.NotNil(t, url) + assert.Equal(t, expected, *url) + }) +} + +// --- Header resolver with a presigned wrapper --- +// Verifies that header fields are still accessible when data is replaced by +// a presigned URL (the Raw event carries the header from the index). 
+ +func TestHeaderResolverWithPresignedWrapper(t *testing.T) { + t.Parallel() + r := &cloudEventResolver{&Resolver{}} + + hdr := cloudevent.CloudEventHeader{ + ID: "evt-001", + Source: "0xABC", + Subject: "did:eth:137:0x1234:42", + Type: "dimo.status", + } + w := &CloudEventWrapper{ + Raw: &cloudevent.RawEvent{CloudEventHeader: hdr}, + PresignedURL: "https://s3.example.com/bucket/single-uuid.json?sig=x", + } + + got, err := r.Header(context.Background(), w) + require.NoError(t, err) + require.NotNil(t, got) + assert.Equal(t, "evt-001", got.ID) + assert.Equal(t, "0xABC", got.Source) + assert.Equal(t, "dimo.status", got.Type) + + // data and dataBase64 must be nil when only a presigned URL is present + data, err := r.Data(context.Background(), w) + require.NoError(t, err) + assert.Nil(t, data) + + b64, err := r.DataBase64(context.Background(), w) + require.NoError(t, err) + assert.Nil(t, b64) +} + +// --- presignSingleEvent helper --- + +func TestPresignSingleEvent(t *testing.T) { + t.Parallel() + + const ( + bucket1 = "cloud-events" + bucket2 = "ephemeral-events" + key = "2024/01/15/single-abc123.json" + ) + + t.Run("object in first bucket returns presigned URL for that bucket", func(t *testing.T) { + t.Parallel() + stub := &stubObjectGetter{ + headObject: func(bucket, k string) error { + if bucket == bucket1 && k == key { + return nil + } + return &s3types.NoSuchKey{} + }, + } + svc := eventrepo.New(nil, stub, "") + q := presignQueryResolver(svc, []string{bucket1, bucket2}, 15*time.Minute) + + url, err := q.presignSingleEvent(context.Background(), key) + require.NoError(t, err) + assert.True(t, strings.HasPrefix(url, "https://"), "URL should be HTTPS") + assert.Contains(t, url, bucket1) + assert.Contains(t, url, "single-abc123.json") + }) + + t.Run("object absent from first bucket falls back to second", func(t *testing.T) { + t.Parallel() + stub := &stubObjectGetter{ + headObject: func(bucket, k string) error { + if bucket == bucket2 && k == key { + return nil // 
only in bucket2 + } + return &s3types.NoSuchKey{} + }, + } + svc := eventrepo.New(nil, stub, "") + q := presignQueryResolver(svc, []string{bucket1, bucket2}, 15*time.Minute) + + url, err := q.presignSingleEvent(context.Background(), key) + require.NoError(t, err) + assert.Contains(t, url, bucket2) + assert.Contains(t, url, "single-abc123.json") + }) + + t.Run("object in neither bucket returns error mentioning the key", func(t *testing.T) { + t.Parallel() + stub := &stubObjectGetter{ + headObject: func(_, _ string) error { + return &s3types.NoSuchKey{} + }, + } + svc := eventrepo.New(nil, stub, "") + q := presignQueryResolver(svc, []string{bucket1, bucket2}, 15*time.Minute) + + url, err := q.presignSingleEvent(context.Background(), key) + require.Error(t, err) + assert.Empty(t, url) + assert.Contains(t, err.Error(), key) + }) + + t.Run("HeadObject error other than NoSuchKey still skips bucket", func(t *testing.T) { + t.Parallel() + // Any error (not just NoSuchKey) causes the bucket to be skipped. 
+ stub := &stubObjectGetter{ + headObject: func(bucket, k string) error { + if bucket == bucket1 { + return fmt.Errorf("access denied") + } + return nil // bucket2 succeeds + }, + } + svc := eventrepo.New(nil, stub, "") + q := presignQueryResolver(svc, []string{bucket1, bucket2}, 15*time.Minute) + + url, err := q.presignSingleEvent(context.Background(), key) + require.NoError(t, err) + assert.Contains(t, url, bucket2) + }) + + t.Run("presigned URL embeds the configured expiry", func(t *testing.T) { + t.Parallel() + const expiry = 30 * time.Minute // 1800 seconds + stub := &stubObjectGetter{ + headObject: func(_, _ string) error { return nil }, + } + svc := eventrepo.New(nil, stub, "") + q := presignQueryResolver(svc, []string{bucket1}, expiry) + + url, err := q.presignSingleEvent(context.Background(), key) + require.NoError(t, err) + assert.Contains(t, url, "X-Amz-Expires=1800") + }) + + t.Run("no buckets configured returns error", func(t *testing.T) { + t.Parallel() + svc := eventrepo.New(nil, &stubObjectGetter{headObject: func(_, _ string) error { return nil }}, "") + q := presignQueryResolver(svc, nil, 15*time.Minute) + + url, err := q.presignSingleEvent(context.Background(), key) + require.Error(t, err) + assert.Empty(t, url) + }) +} diff --git a/internal/graph/resolver.go b/internal/graph/resolver.go index 9ea5404..368a486 100644 --- a/internal/graph/resolver.go +++ b/internal/graph/resolver.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "slices" + "time" "github.com/DIMO-Network/cloudevent" "github.com/DIMO-Network/fetch-api/internal/graph/model" @@ -11,6 +12,8 @@ import ( "github.com/DIMO-Network/fetch-api/pkg/eventrepo" "github.com/DIMO-Network/fetch-api/pkg/grpc" "github.com/DIMO-Network/token-exchange-api/pkg/tokenclaims" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" ) // This file will not be regenerated automatically. 
@@ -24,6 +27,8 @@ type Resolver struct { EventService *eventrepo.Service Buckets []string IdentityClient identity.Client + Presigner *s3.PresignClient + PresignExpiry time.Duration } const ( @@ -65,6 +70,26 @@ func requireRawDataToken(ctx context.Context) (*tokenclaims.Token, error) { return tok, nil } +// presignSingleEvent finds which configured bucket contains the large single-event +// object at key and returns a pre-signed S3 GET URL valid for r.PresignExpiry. +func (r *queryResolver) presignSingleEvent(ctx context.Context, key string) (string, error) { + for _, bucket := range r.Buckets { + _, err := r.EventService.HeadObject(ctx, bucket, key) + if err != nil { + continue + } + req, err := r.Presigner.PresignGetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }, func(o *s3.PresignOptions) { o.Expires = r.PresignExpiry }) + if err != nil { + return "", fmt.Errorf("presign %s/%s: %w", bucket, key, err) + } + return req.URL, nil + } + return "", fmt.Errorf("single event not found in any bucket: %s", key) +} + // ensureRequestedDIDLinkedToPermissionedSubject verifies the client-requested DID is allowed by the token. // requestedDID: the DID from the query (e.g. cloudEvents(did: "...")). // tokenSubjectDID: the DID the JWT grants access to (tok.Asset). 
diff --git a/pkg/eventrepo/eventrepo.go b/pkg/eventrepo/eventrepo.go index a18de4d..57327d9 100644 --- a/pkg/eventrepo/eventrepo.go +++ b/pkg/eventrepo/eventrepo.go @@ -14,6 +14,7 @@ import ( chindexer "github.com/DIMO-Network/cloudevent/clickhouse" "github.com/DIMO-Network/cloudevent/parquet" "github.com/DIMO-Network/fetch-api/pkg/grpc" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/volatiletech/sqlboiler/v4/drivers" "github.com/volatiletech/sqlboiler/v4/queries" @@ -43,6 +44,14 @@ type ObjectInfo struct { type ObjectGetter interface { GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) + HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) +} + +// IsSingleEventRef reports whether key is a large single-event JSON object +// (filename of the form "single-*.json") rather than a parquet batch ref or legacy key. +func IsSingleEventRef(key string) bool { + base := key[strings.LastIndexByte(key, '/')+1:] + return strings.HasPrefix(base, "single-") && strings.HasSuffix(base, ".json") } // New creates a new instance of Service. @@ -421,6 +430,16 @@ func (s *Service) StoreObject(ctx context.Context, bucketName string, cloudHeade return nil } +// HeadObject returns the S3 object metadata for the given bucket and key. +// It is used to discover which bucket contains a large single-event object +// before generating a pre-signed URL. +func (s *Service) HeadObject(ctx context.Context, bucket, key string) (*s3.HeadObjectOutput, error) { + return s.objGetter.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) +} + // toCloudEvent extracts only the data and data_base64 fields from the stored // CloudEvent JSON, then overlays the index header. 
We skip parsing header fields // since they come from the DB index. When data_base64 is present, Data is left nil. diff --git a/pkg/eventrepo/eventrepo_mock_test.go b/pkg/eventrepo/eventrepo_mock_test.go index 50daa38..db33a32 100644 --- a/pkg/eventrepo/eventrepo_mock_test.go +++ b/pkg/eventrepo/eventrepo_mock_test.go @@ -80,3 +80,23 @@ func (mr *MockObjectGetterMockRecorder) PutObject(ctx, params any, optFns ...any varargs := append([]any{ctx, params}, optFns...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutObject", reflect.TypeOf((*MockObjectGetter)(nil).PutObject), varargs...) } + +// HeadObject mocks base method. +func (m *MockObjectGetter) HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "HeadObject", varargs...) + ret0, _ := ret[0].(*s3.HeadObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HeadObject indicates an expected call of HeadObject. +func (mr *MockObjectGetterMockRecorder) HeadObject(ctx, params any, optFns ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeadObject", reflect.TypeOf((*MockObjectGetter)(nil).HeadObject), varargs...) 
+} diff --git a/pkg/eventrepo/single_ref_test.go b/pkg/eventrepo/single_ref_test.go new file mode 100644 index 0000000..1967330 --- /dev/null +++ b/pkg/eventrepo/single_ref_test.go @@ -0,0 +1,88 @@ +package eventrepo_test + +import ( + "testing" + + "github.com/DIMO-Network/fetch-api/pkg/eventrepo" + "github.com/stretchr/testify/assert" +) + +func TestIsSingleEventRef(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + key string + expected bool + }{ + // Positive cases + { + name: "date-prefixed single event", + key: "2024/01/15/single-abc123.json", + expected: true, + }, + { + name: "no path prefix", + key: "single-abc123.json", + expected: true, + }, + { + name: "deep path prefix", + key: "did:eth:153:0x1234/events/single-uuid.json", + expected: true, + }, + { + name: "single- prefix with UUID-style name", + key: "2025/06/01/single-f47ac10b-58cc-4372-a567-0e02b2c3d479.json", + expected: true, + }, + // Negative cases + { + name: "parquet batch ref", + key: "batch-abc123.parquet#42", + expected: false, + }, + { + name: "legacy JSON key (DID path)", + key: "did:eth:153:0x1234/2024-01-15T10:00:00Z", + expected: false, + }, + { + name: "single- prefix but wrong extension", + key: "single-abc123.parquet", + expected: false, + }, + { + name: "single- prefix but no extension", + key: "single-abc123", + expected: false, + }, + { + name: "basename does not start with single-", + key: "2024/01/15/large-abc123.json", + expected: false, + }, + { + name: "contains single- but not at start of basename", + key: "path/not-single-abc.json", + expected: false, + }, + { + name: "empty key", + key: "", + expected: false, + }, + { + name: "only a slash", + key: "/", + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tt.expected, eventrepo.IsSingleEventRef(tt.key)) + }) + } +} diff --git a/schema/base.graphqls b/schema/base.graphqls index 91b552e..a50be9f 100644 --- 
a/schema/base.graphqls +++ b/schema/base.graphqls @@ -14,10 +14,12 @@ Full CloudEvent: selectable header, data (JSON), and optional data_base64. type CloudEvent { """CloudEvents header fields. Request only the fields you need.""" header: CloudEventHeader! - """JSON payload. Omitted if not requested.""" + """JSON payload. Omitted if not requested. Null for large single-file events (see presignedUrl).""" data: JSON """Base64-encoded payload when present. Omitted if not requested.""" dataBase64: String + """Pre-signed S3 URL for large single-file events. When present, data and dataBase64 are null. The URL is valid for a short time.""" + presignedUrl: String } """