Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/DIMO-Network/fetch-api/internal/limits"
"github.com/DIMO-Network/fetch-api/pkg/eventrepo"
fetchgrpc "github.com/DIMO-Network/fetch-api/pkg/grpc"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/DIMO-Network/server-garage/pkg/gql/errorhandler"
gqlmetrics "github.com/DIMO-Network/server-garage/pkg/gql/metrics"
"github.com/DIMO-Network/shared/pkg/middleware/metrics"
Expand Down Expand Up @@ -42,7 +43,7 @@ func New(settings config.Settings) (*App, error) {
}
s3Client := s3ClientFromSettings(&settings)
buckets := []string{settings.CloudEventBucket, settings.EphemeralBucket, settings.ParquetBucket}
eventService := eventrepo.New(chConn, s3Client, settings.ParquetBucket)
eventService := eventrepo.New(chConn, s3Client, s3.NewPresignClient(s3Client), settings.ParquetBucket)

var identityClient identity.Client
if settings.IdentityAPIURL != "" {
Expand Down Expand Up @@ -118,7 +119,7 @@ func CreateGRPCServer(logger *zerolog.Logger, settings *config.Settings) (*grpc.
}

s3Client := s3ClientFromSettings(settings)
eventService := eventrepo.New(chConn, s3Client, settings.ParquetBucket)
eventService := eventrepo.New(chConn, s3Client, s3.NewPresignClient(s3Client), settings.ParquetBucket)

rpcServer := rpc.NewServer([]string{settings.CloudEventBucket, settings.EphemeralBucket, settings.ParquetBucket}, eventService)

Expand Down
14 changes: 7 additions & 7 deletions internal/config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (

// Settings contains the application config.
type Settings struct {
Port int `yaml:"PORT"`
MonPort int `yaml:"MON_PORT"`
GRPCPort int `yaml:"GRPC_PORT"`
EnablePprof bool `yaml:"ENABLE_PPROF"`
MaxRequestDuration string `yaml:"MAX_REQUEST_DURATION"`
TokenExchangeJWTKeySetURL string `yaml:"TOKEN_EXCHANGE_JWK_KEY_SET_URL"`
TokenExchangeIssuer string `yaml:"TOKEN_EXCHANGE_ISSUER_URL"`
Port int `yaml:"PORT"`
MonPort int `yaml:"MON_PORT"`
GRPCPort int `yaml:"GRPC_PORT"`
EnablePprof bool `yaml:"ENABLE_PPROF"`
MaxRequestDuration string `yaml:"MAX_REQUEST_DURATION"`
TokenExchangeJWTKeySetURL string `yaml:"TOKEN_EXCHANGE_JWK_KEY_SET_URL"`
TokenExchangeIssuer string `yaml:"TOKEN_EXCHANGE_ISSUER_URL"`
CloudEventBucket string `yaml:"CLOUDEVENT_BUCKET"`
EphemeralBucket string `yaml:"EPHEMERAL_BUCKET"`
ParquetBucket string `yaml:"PARQUET_BUCKET"`
Expand Down
50 changes: 44 additions & 6 deletions internal/graph/base.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions internal/graph/cloud_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
)

// CloudEventWrapper holds a pointer to a RawEvent so resolvers can expose
// header, data, and dataBase64 without copying the underlying event.
// header, data, dataBase64, and dataUrl without copying the underlying event.
type CloudEventWrapper struct {
Raw *cloudevent.RawEvent
Raw *cloudevent.RawEvent
DataURL string // non-empty when the payload is a blob served via presigned URL
}

// RawJSON is the raw bytes of a JSON value. It implements graphql.Marshaler by
Expand Down
44 changes: 44 additions & 0 deletions internal/graph/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions pkg/eventrepo/event_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestGetLatestIndexKey(t *testing.T) {
},
}

indexService := eventrepo.New(conn, nil, "")
indexService := eventrepo.New(conn, nil, nil, "")

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestGetDataFromIndex(t *testing.T) {
ContentLength: ref(int64(len(content))),
}, nil).AnyTimes()

indexService := eventrepo.New(conn, mockS3Client, "")
indexService := eventrepo.New(conn, mockS3Client, nil, "")

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestStoreObject(t *testing.T) {
mockS3Client := NewMockObjectGetter(ctrl)
mockS3Client.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(&s3.PutObjectOutput{}, nil).AnyTimes()

indexService := eventrepo.New(conn, mockS3Client, "")
indexService := eventrepo.New(conn, mockS3Client, nil, "")

content := []byte(`{"vin": "1HGCM82633A123456"}`)
did := cloudevent.ERC721DID{
Expand Down Expand Up @@ -333,7 +333,7 @@ func TestGetData(t *testing.T) {
ctrl := gomock.NewController(t)
mockS3Client := NewMockObjectGetter(ctrl)

indexService := eventrepo.New(conn, mockS3Client, "")
indexService := eventrepo.New(conn, mockS3Client, nil, "")
// Allow GetObject calls in any order since fetches are concurrent.
if len(tt.expectedIndexKeys) > 0 {
mockS3Client.EXPECT().GetObject(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestGetEventWithAllHeaderFields(t *testing.T) {
eventDataEnvelope := []byte(`{"data":` + string(eventData) + `}`)

// Create service
indexService := eventrepo.New(conn, mockS3Client, "")
indexService := eventrepo.New(conn, mockS3Client, nil, "")

// Test retrieving the event
t.Run("retrieve event with full headers", func(t *testing.T) {
Expand Down Expand Up @@ -576,7 +576,7 @@ func TestGetCloudEventFromIndex_ParquetRef(t *testing.T) {
ctrl := gomock.NewController(t)
mockS3 := mockS3ParquetReader(t, ctrl, parquetBytes)

indexService := eventrepo.New(conn, mockS3, "test-parquet-bucket")
indexService := eventrepo.New(conn, mockS3, nil, "test-parquet-bucket")

// Build the index object as GetCloudEventFromIndex expects
index := cloudevent.CloudEvent[eventrepo.ObjectInfo]{
Expand Down Expand Up @@ -655,7 +655,7 @@ func TestListCloudEventsFromIndexes_ParquetCaching(t *testing.T) {
ctrl := gomock.NewController(t)
mockS3 := mockS3ParquetReader(t, ctrl, parquetBytes)

indexService := eventrepo.New(conn, mockS3, "test-parquet-bucket")
indexService := eventrepo.New(conn, mockS3, nil, "test-parquet-bucket")

indexes := []cloudevent.CloudEvent[eventrepo.ObjectInfo]{
{CloudEventHeader: hdr0, Data: eventrepo.ObjectInfo{Key: indexKeys[0]}},
Expand Down Expand Up @@ -747,7 +747,7 @@ func TestListIndexesAdvanced(t *testing.T) {
keyTypeStatusSource1Producer3 := insertTestData(t, ctx, conn, eventIdx3)
keyTypeStatusSource3Producer4 := insertTestData(t, ctx, conn, eventIdx4)

indexService := eventrepo.New(conn, nil, "")
indexService := eventrepo.New(conn, nil, nil, "")

tests := []struct {
name string
Expand Down Expand Up @@ -1032,7 +1032,7 @@ func TestGetCloudEventTypeSummaries(t *testing.T) {
insertTestData(t, ctx, conn, status3)
insertTestData(t, ctx, conn, fp1)

indexService := eventrepo.New(conn, nil, "")
indexService := eventrepo.New(conn, nil, nil, "")

t.Run("no filter returns all types", func(t *testing.T) {
opts := &grpc.SearchOptions{
Expand Down
33 changes: 32 additions & 1 deletion pkg/eventrepo/eventrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
chindexer "github.com/DIMO-Network/cloudevent/clickhouse"
"github.com/DIMO-Network/cloudevent/parquet"
"github.com/DIMO-Network/fetch-api/pkg/grpc"
"github.com/aws/aws-sdk-go-v2/aws"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/volatiletech/sqlboiler/v4/drivers"
"github.com/volatiletech/sqlboiler/v4/queries"
Expand All @@ -30,6 +32,7 @@ const tagsColumn = "JSONExtract(extras, 'tags', 'Array(String)')"
// Service manages and retrieves data messages from indexed objects in S3.
type Service struct {
objGetter ObjectGetter
presigner Presigner
chConn clickhouse.Conn
// parquetBucket is the object storage bucket for Iceberg Parquet files.
parquetBucket string
Expand All @@ -46,15 +49,43 @@ type ObjectGetter interface {
PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}

// Presigner generates presigned S3 GET URLs.
//
// It is the single-method seam Service uses to sign GetObject requests; the
// application wiring passes the result of s3.NewPresignClient here. A nil
// Presigner is accepted by New, in which case PresignBlobURL returns an error
// instead of a URL.
type Presigner interface {
// PresignGetObject signs a GetObject request described by params and
// returns the presigned HTTP request; optFns adjust presign options
// such as the expiry.
PresignGetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error)
}

// BlobKeyPrefix is the S3 key prefix used for large binary blob objects.
// Keys with this prefix are served via presigned URL instead of inline in the response.
// NOTE(review): presumably callers detect blobs with a prefix match on the
// object key — confirm against the resolver that populates the data URL.
const BlobKeyPrefix = "cloudevent/blobs/"

// presignTTL is the lifetime of generated presigned S3 URLs.
// PresignBlobURL passes it to s3.WithPresignExpires, so every URL it returns
// is signed with this same fixed expiry.
const presignTTL = 15 * time.Minute

// New creates a new instance of Service.
//
// The arguments are stored on the Service as given, without validation:
// chConn is the ClickHouse connection, objGetter is the S3 object client,
// presigner signs GET URLs for PresignBlobURL, and parquetBucket names the
// object storage bucket for Parquet files. presigner may be nil; in that case
// PresignBlobURL reports "presigner not configured" rather than panicking.
func New(chConn clickhouse.Conn, objGetter ObjectGetter, parquetBucket string) *Service {
func New(chConn clickhouse.Conn, objGetter ObjectGetter, presigner Presigner, parquetBucket string) *Service {
return &Service{
objGetter: objGetter,
presigner: presigner,
chConn: chConn,
parquetBucket: parquetBucket,
}
}

// PresignBlobURL produces a time-limited presigned GET URL for the object
// identified by key inside bucket.
//
// The URL is signed with an expiry of presignTTL. An error is returned when
// the Service was built without a presigner, or when signing fails; in the
// latter case the underlying error is wrapped together with the bucket and key.
func (s *Service) PresignBlobURL(ctx context.Context, key, bucket string) (string, error) {
	// Tests and some callers construct the Service with a nil presigner.
	if s.presigner == nil {
		return "", fmt.Errorf("presigner not configured")
	}

	input := &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}
	expiry := s3.WithPresignExpires(presignTTL)

	presigned, err := s.presigner.PresignGetObject(ctx, input, expiry)
	if err != nil {
		return "", fmt.Errorf("presign %s/%s: %w", bucket, key, err)
	}
	return presigned.URL, nil
}

// GetLatestIndex returns the latest cloud event index that matches the given options.
func (s *Service) GetLatestIndex(ctx context.Context, opts *grpc.SearchOptions) (cloudevent.CloudEvent[ObjectInfo], error) {
advancedOpts := convertSearchOptionsToAdvanced(opts)
Expand Down
Loading
Loading