diff --git a/api/new_chain_flusher.go b/api/new_chain_flusher.go new file mode 100644 index 00000000..37b1de77 --- /dev/null +++ b/api/new_chain_flusher.go @@ -0,0 +1,224 @@ +package api + +import ( + "context" + "crypto/tls" + "net" + "net/http" + "strings" + "time" + + "api.audius.co/config" + "connectrpc.com/connect" + v1 "github.com/OpenAudio/go-openaudio/pkg/api/core/v1" + corev1connect "github.com/OpenAudio/go-openaudio/pkg/api/core/v1/v1connect" + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgxpool" + "go.uber.org/zap" + "golang.org/x/net/http2" + "google.golang.org/protobuf/proto" +) + +// NewChainFlusher reads rows from new_chain_queue and forwards them to the new +// Core chain. On startup it deletes rows covered by the backfill (confirmed_block +// < cfg.NewChainFlushFromBlock), then sends the rest in id order, one at a time. +// Sequential processing is required to preserve transaction ordering across users +// (dev apps can act on behalf of other users, creating cross-user dependencies). 
+type NewChainFlusher struct { + cfg *config.Config + writePool *pgxpool.Pool + chainClient corev1connect.CoreServiceClient + logger *zap.Logger +} + +func NewNewChainFlusher(cfg *config.Config, writePool *pgxpool.Pool, logger *zap.Logger) *NewChainFlusher { + chainURL := cfg.NewChainURL + if !strings.HasPrefix(chainURL, "http://") && !strings.HasPrefix(chainURL, "https://") { + chainURL = "https://" + chainURL + } + + var httpClient *http.Client + if strings.HasPrefix(chainURL, "http://") { + // h2c: plain HTTP/2, bypasses nginx and talks directly to the gRPC port + httpClient = &http.Client{ + Transport: &http2.Transport{ + AllowHTTP: true, + DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) { + return net.Dial(network, addr) + }, + }, + } + } else if cfg.NewChainInsecureSkipVerify { + httpClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } + } else { + httpClient = http.DefaultClient + } + + return newFlusherWithClient(cfg, writePool, corev1connect.NewCoreServiceClient(httpClient, chainURL), logger) +} + +// newFlusherWithClient constructs a flusher with a pre-built chain client. +// Used in tests to inject a plain HTTP/1.1 client against httptest.Server. +func newFlusherWithClient(cfg *config.Config, writePool *pgxpool.Pool, client corev1connect.CoreServiceClient, logger *zap.Logger) *NewChainFlusher { + return &NewChainFlusher{ + cfg: cfg, + writePool: writePool, + chainClient: client, + logger: logger.With(zap.String("component", "new_chain_flusher")), + } +} + +// Start trims backfill-covered rows, then continuously drains new_chain_queue. +// Runs until ctx is cancelled. 
+func (f *NewChainFlusher) Start(ctx context.Context) { + if err := f.trimBackfillRows(ctx); err != nil { + f.logger.Error("trim failed", zap.Error(err)) + // non-fatal: continue flushing with whatever rows remain + } + + f.logger.Info("starting flush loop", + zap.String("new_chain_url", f.cfg.NewChainURL), + ) + + const batchSize = 500 + + for { + if ctx.Err() != nil { + return + } + + rows, err := f.fetchBatch(ctx, batchSize) + if err != nil { + if ctx.Err() != nil { + return + } + f.logger.Error("fetch batch failed", zap.Error(err)) + sleep(ctx, 2*time.Second) + continue + } + + if len(rows) == 0 { + // Queue is empty; pause before checking again. + sleep(ctx, 500*time.Millisecond) + continue + } + + for _, row := range rows { + if ctx.Err() != nil { + return + } + if err := f.flushRow(ctx, row); err != nil && ctx.Err() == nil { + // Do not advance past a failed row: ordering requires every tx + // to land before the next one is sent. Sleep and let the outer + // loop re-fetch the same row. + f.logger.Error("flush row failed, retrying", zap.Int64("id", row.id), zap.Error(err)) + sleep(ctx, 2*time.Second) + break + } + } + } +} + +type queueRow struct { + id int64 + txRaw []byte +} + +func (f *NewChainFlusher) fetchBatch(ctx context.Context, limit int) ([]queueRow, error) { + rows, err := f.writePool.Query(ctx, + `SELECT id, tx_data FROM new_chain_queue ORDER BY id LIMIT $1`, + limit, + ) + if err != nil { + return nil, err + } + defer rows.Close() + + var result []queueRow + for rows.Next() { + var r queueRow + if err := rows.Scan(&r.id, &r.txRaw); err != nil { + return nil, err + } + result = append(result, r) + } + return result, rows.Err() +} + +func (f *NewChainFlusher) flushRow(ctx context.Context, row queueRow) error { + var tx v1.ManageEntityLegacy + if err := proto.Unmarshal(row.txRaw, &tx); err != nil { + // Corrupt row — delete it and move on rather than retrying forever. 
+ f.logger.Error("corrupt queue row, deleting", zap.Int64("id", row.id), zap.Error(err)) + _, _ = f.writePool.Exec(ctx, `DELETE FROM new_chain_queue WHERE id = $1`, row.id) + return nil + } + + req := connect.NewRequest(&v1.ForwardTransactionRequest{ + Transaction: &v1.SignedTransaction{ + RequestId: uuid.NewString(), + Transaction: &v1.SignedTransaction_ManageEntity{ + ManageEntity: &tx, + }, + }, + }) + + const ( + baseDelay = 500 * time.Millisecond + maxDelay = 15 * time.Second + ) + delay := baseDelay + for { + _, err := f.chainClient.ForwardTransaction(ctx, req) + if err == nil { + break + } + if !strings.Contains(err.Error(), "mempool full") { + return err + } + f.logger.Warn("mempool full, pausing flush", zap.Duration("wait", delay)) + sleep(ctx, delay) + if ctx.Err() != nil { + return ctx.Err() + } + delay *= 2 + if delay > maxDelay { + delay = maxDelay + } + } + + _, err := f.writePool.Exec(ctx, `DELETE FROM new_chain_queue WHERE id = $1`, row.id) + return err +} + +// trimBackfillRows deletes all queue rows whose confirmed_block is before the +// configured flush-from block, i.e. rows already covered by the genesis backfill. +// Rows with a NULL confirmed_block are kept: NULL < $1 evaluates to NULL (falsy) in SQL. 
func (f *NewChainFlusher) trimBackfillRows(ctx context.Context) error {
	// No-op unless a flush-from block has been configured.
	if f.cfg.NewChainFlushFromBlock <= 0 {
		return nil
	}
	tag, err := f.writePool.Exec(ctx,
		`DELETE FROM new_chain_queue WHERE confirmed_block < $1`,
		f.cfg.NewChainFlushFromBlock,
	)
	if err != nil {
		return err
	}
	f.logger.Info("trimmed backfill-covered rows",
		zap.Int64("deleted", tag.RowsAffected()),
		zap.Int64("flush_from_block", f.cfg.NewChainFlushFromBlock),
	)
	return nil
}

// sleep blocks for d or until ctx is cancelled, whichever comes first.
func sleep(ctx context.Context, d time.Duration) {
	select {
	case <-ctx.Done():
	case <-time.After(d):
	}
}

diff --git a/api/new_chain_flusher_test.go b/api/new_chain_flusher_test.go
new file mode 100644
index 00000000..674f01ce
--- /dev/null
+++ b/api/new_chain_flusher_test.go
@@ -0,0 +1,246 @@
package api

import (
	"context"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"testing"
	"time"

	"api.audius.co/config"
	"api.audius.co/database"
	"connectrpc.com/connect"
	corev1 "github.com/OpenAudio/go-openaudio/pkg/api/core/v1"
	corev1connect "github.com/OpenAudio/go-openaudio/pkg/api/core/v1/v1connect"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"
)

// mockCoreService implements only ForwardTransaction; all other methods return Unimplemented.
+type mockCoreService struct { + corev1connect.UnimplementedCoreServiceHandler + received []*corev1.ManageEntityLegacy + calls atomic.Int32 +} + +func (m *mockCoreService) ForwardTransaction(_ context.Context, req *connect.Request[corev1.ForwardTransactionRequest]) (*connect.Response[corev1.ForwardTransactionResponse], error) { + m.calls.Add(1) + if me := req.Msg.Transaction.GetManageEntity(); me != nil { + m.received = append(m.received, me) + } + return connect.NewResponse(&corev1.ForwardTransactionResponse{}), nil +} + +func newTestFlusher(t *testing.T, cfg *config.Config) (*NewChainFlusher, *mockCoreService) { + t.Helper() + mock := &mockCoreService{} + mux := http.NewServeMux() + path, handler := corev1connect.NewCoreServiceHandler(mock) + mux.Handle(path, handler) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + pool := database.CreateTestDatabase(t, "test_api") + + // Create the new_chain_queue table (not in template DB yet). + _, err := pool.Exec(context.Background(), ` + CREATE TABLE IF NOT EXISTS new_chain_queue ( + id bigserial PRIMARY KEY, + created_at timestamptz NOT NULL DEFAULT now(), + tx_data bytea NOT NULL, + confirmed_block bigint + ) + `) + require.NoError(t, err) + + cfg.NewChainURL = srv.URL + + logger, _ := zap.NewDevelopment() + client := corev1connect.NewCoreServiceClient(http.DefaultClient, srv.URL) + f := newFlusherWithClient(cfg, pool, client, logger) + return f, mock +} + +func sampleTx(entityID int64) *corev1.ManageEntityLegacy { + return &corev1.ManageEntityLegacy{ + UserId: 1, + EntityType: "Track", + EntityId: entityID, + Action: "Create", + Metadata: "{}", + Nonce: "0xdeadbeef", + } +} + +func insertQueueRow(t *testing.T, f *NewChainFlusher, tx *corev1.ManageEntityLegacy, confirmedBlock *int64) { + t.Helper() + b, err := proto.Marshal(tx) + require.NoError(t, err) + _, err = f.writePool.Exec(context.Background(), + `INSERT INTO new_chain_queue (tx_data, confirmed_block) VALUES ($1, $2)`, + b, confirmedBlock, + ) + 
require.NoError(t, err) +} + +func queueDepth(t *testing.T, f *NewChainFlusher) int { + t.Helper() + var n int + err := f.writePool.QueryRow(context.Background(), `SELECT count(*) FROM new_chain_queue`).Scan(&n) + require.NoError(t, err) + return n +} + +// TestEnqueueForNewChain verifies that enqueueForNewChain inserts a row with the +// correct tx_data and confirmed_block. +func TestEnqueueForNewChain(t *testing.T) { + app := emptyTestApp(t) + + // Add new_chain_queue to the test DB. + _, err := app.writePool.Exec(context.Background(), ` + CREATE TABLE IF NOT EXISTS new_chain_queue ( + id bigserial PRIMARY KEY, + created_at timestamptz NOT NULL DEFAULT now(), + tx_data bytea NOT NULL, + confirmed_block bigint + ) + `) + require.NoError(t, err) + + tx := sampleTx(42) + app.enqueueForNewChain(tx, 999) + + // enqueueForNewChain is fire-and-forget; give it a moment to write. + require.Eventually(t, func() bool { + var n int + app.writePool.QueryRow(context.Background(), `SELECT count(*) FROM new_chain_queue`).Scan(&n) //nolint:errcheck + return n == 1 + }, 2*time.Second, 50*time.Millisecond) + + var txData []byte + var confirmedBlock int64 + err = app.writePool.QueryRow(context.Background(), + `SELECT tx_data, confirmed_block FROM new_chain_queue LIMIT 1`, + ).Scan(&txData, &confirmedBlock) + require.NoError(t, err) + require.Equal(t, int64(999), confirmedBlock) + + var got corev1.ManageEntityLegacy + require.NoError(t, proto.Unmarshal(txData, &got)) + require.Equal(t, int64(42), got.EntityId) + require.Equal(t, "Track", got.EntityType) +} + +// TestNewChainFlusherTrim verifies that rows with confirmed_block < FlushFromBlock +// are deleted on startup, and rows at or above the threshold are kept. 
+func TestNewChainFlusherTrim(t *testing.T) { + cfg := &config.Config{NewChainFlushFromBlock: 100} + f, _ := newTestFlusher(t, cfg) + + block50 := int64(50) + block99 := int64(99) + block100 := int64(100) + block200 := int64(200) + insertQueueRow(t, f, sampleTx(1), &block50) // should be trimmed + insertQueueRow(t, f, sampleTx(2), &block99) // should be trimmed + insertQueueRow(t, f, sampleTx(3), &block100) // kept (boundary) + insertQueueRow(t, f, sampleTx(4), &block200) // kept + insertQueueRow(t, f, sampleTx(5), nil) // NULL confirmed_block — kept + + require.Equal(t, 5, queueDepth(t, f)) + + err := f.trimBackfillRows(context.Background()) + require.NoError(t, err) + + require.Equal(t, 3, queueDepth(t, f)) + + // Verify the surviving entity IDs. + rows, err := f.writePool.Query(context.Background(), + `SELECT (tx_data) FROM new_chain_queue ORDER BY id`, + ) + require.NoError(t, err) + defer rows.Close() + + var ids []int64 + for rows.Next() { + var b []byte + require.NoError(t, rows.Scan(&b)) + var me corev1.ManageEntityLegacy + require.NoError(t, proto.Unmarshal(b, &me)) + ids = append(ids, me.EntityId) + } + require.Equal(t, []int64{3, 4, 5}, ids) +} + +// TestNewChainFlusherSends verifies that the flusher forwards all queued rows to +// the new chain and deletes them on success. 
+func TestNewChainFlusherSends(t *testing.T) { + cfg := &config.Config{} // no trim + f, mock := newTestFlusher(t, cfg) + + block10 := int64(10) + block20 := int64(20) + block30 := int64(30) + insertQueueRow(t, f, sampleTx(1), &block10) + insertQueueRow(t, f, sampleTx(2), &block20) + insertQueueRow(t, f, sampleTx(3), &block30) + + require.Equal(t, 3, queueDepth(t, f)) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go f.Start(ctx) + + require.Eventually(t, func() bool { + return mock.calls.Load() == 3 + }, 5*time.Second, 50*time.Millisecond, "expected 3 ForwardTransaction calls") + + require.Eventually(t, func() bool { + return queueDepth(t, f) == 0 + }, 5*time.Second, 50*time.Millisecond, "expected queue to drain") + + // Verify all three entity IDs were forwarded. + receivedIDs := make([]int64, len(mock.received)) + for i, me := range mock.received { + receivedIDs[i] = me.EntityId + } + require.ElementsMatch(t, []int64{1, 2, 3}, receivedIDs) +} + +// TestNewChainFlusherTrimThenSend verifies the combined trim+flush flow: +// rows before the cutoff block are dropped; rows after are forwarded. 
+func TestNewChainFlusherTrimThenSend(t *testing.T) { + cfg := &config.Config{NewChainFlushFromBlock: 50} + f, mock := newTestFlusher(t, cfg) + + block10 := int64(10) // pre-backfill — trimmed + block49 := int64(49) // pre-backfill — trimmed + block50 := int64(50) // post-backfill — flushed + block100 := int64(100) // post-backfill — flushed + insertQueueRow(t, f, sampleTx(1), &block10) + insertQueueRow(t, f, sampleTx(2), &block49) + insertQueueRow(t, f, sampleTx(3), &block50) + insertQueueRow(t, f, sampleTx(4), &block100) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go f.Start(ctx) + + require.Eventually(t, func() bool { + return mock.calls.Load() == 2 + }, 5*time.Second, 50*time.Millisecond, "expected 2 ForwardTransaction calls (post-trim)") + + require.Eventually(t, func() bool { + return queueDepth(t, f) == 0 + }, 5*time.Second, 50*time.Millisecond, "expected queue to drain") + + receivedIDs := make([]int64, len(mock.received)) + for i, me := range mock.received { + receivedIDs[i] = me.EntityId + } + require.ElementsMatch(t, []int64{3, 4}, receivedIDs) +} diff --git a/api/relay.go b/api/relay.go index b07204b9..c702a8a1 100644 --- a/api/relay.go +++ b/api/relay.go @@ -19,6 +19,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/gofiber/fiber/v2" "go.uber.org/zap" + "google.golang.org/protobuf/proto" ) const ( @@ -285,6 +286,13 @@ func (app *ApiServer) handleRelay(ctx context.Context, logger *zap.Logger, decod msg := res.Msg.Transaction endpointLogger.Info("transaction confirmed", zap.String("hash", msg.GetHash())) + + // Enqueue for new chain if dual-write is enabled. Fire-and-forget: + // a queue failure must never fail the relay response to the client. 
+ if app.config.NewChainQueueEnabled && app.writePool != nil { + go app.enqueueForNewChain(decodedTx, msg.GetHeight()) + } + return msg, nil } @@ -292,6 +300,23 @@ func (app *ApiServer) handleRelay(ctx context.Context, logger *zap.Logger, decod return nil, fmt.Errorf("all endpoints failed, last error: %w", lastErr) } +// enqueueForNewChain serializes a confirmed ManageEntityLegacy transaction and +// inserts it into new_chain_queue. Called asynchronously; logs errors but never panics. +func (app *ApiServer) enqueueForNewChain(tx *v1.ManageEntityLegacy, confirmedBlock int64) { + b, err := proto.Marshal(tx) + if err != nil { + app.logger.Warn("new_chain_queue: marshal failed", zap.Error(err)) + return + } + _, err = app.writePool.Exec(context.Background(), + `INSERT INTO new_chain_queue (tx_data, confirmed_block) VALUES ($1, $2)`, + b, confirmedBlock, + ) + if err != nil { + app.logger.Warn("new_chain_queue: insert failed", zap.Error(err)) + } +} + func transactionToReceipt(tx *v1.Transaction, wallet string) map[string]interface{} { return map[string]interface{}{ "transactionHash": tx.GetHash(), diff --git a/api/server.go b/api/server.go index e6bf1f4f..ea9a21b1 100644 --- a/api/server.go +++ b/api/server.go @@ -219,6 +219,11 @@ func NewApiServer(config config.Config) *ApiServer { panic(err) } + var newChainFlusher *NewChainFlusher + if config.NewChainFlushEnabled && config.NewChainURL != "" && writePool != nil { + newChainFlusher = NewNewChainFlusher(&config, writePool, logger) + } + app := &ApiServer{ App: fiber.New(fiber.Config{ JSONEncoder: json.Marshal, @@ -258,6 +263,7 @@ func NewApiServer(config config.Config) *ApiServer { birdeyeClient: birdeye.New(config.BirdeyeToken), solanaRpcClient: solanaRpc, meteoraDbcClient: meteoraDbcClient, + newChainFlusher: newChainFlusher, } // Set up a custom decoder for HashIds so they can be parsed in lists @@ -793,6 +799,7 @@ type ApiServer struct { meteoraDbcClient *meteora_dbc.Client validators *Nodes openAudioPool 
*OpenAudioPool + newChainFlusher *NewChainFlusher } func (app *ApiServer) home(c *fiber.Ctx) error { @@ -873,6 +880,14 @@ func (app *ApiServer) Serve() { app.logger.Info("Started validators poller") }() + if app.newChainFlusher != nil { + go func() { + app.logger.Info("Starting new chain flusher") + app.newChainFlusher.Start(ctx) + app.logger.Info("New chain flusher stopped") + }() + } + // Bind to both ipv4 and ipv6 listener, err := net.Listen("tcp", "[::]:1323") if err != nil { diff --git a/config/config.go b/config/config.go index 29cbe594..32722aea 100644 --- a/config/config.go +++ b/config/config.go @@ -58,6 +58,20 @@ type Config struct { AudiusApiSecret string // Shared secret for notifications-dashboard (or other internal jobs) to read notification campaign push open counts NotificationCampaignOpenMetricsSecret string + + // Genesis migration dual-write queue. + // NewChainURL is the bootstrap chain gRPC endpoint (e.g. http://bootstrap-node:50051). + // NewChainQueueEnabled turns on enqueuing of relayed txs for the new chain. + // NewChainFlushEnabled turns on the background flusher goroutine. + // NewChainFlushFromBlock, when set, causes the flusher to delete all queued rows with + // confirmed_block < NewChainFlushFromBlock before sending — trimming rows already + // covered by the backfill. + // NewChainInsecureSkipVerify disables TLS verification for the new chain endpoint (e.g. localstack). 
+ NewChainURL string + NewChainQueueEnabled bool + NewChainFlushEnabled bool + NewChainFlushFromBlock int64 + NewChainInsecureSkipVerify bool } var Cfg = Config{ @@ -272,4 +286,17 @@ func init() { if v := os.Getenv("archiverNodes"); v != "" { Cfg.ArchiverNodes = strings.Split(v, ",") } + + // Genesis migration dual-write queue + Cfg.NewChainURL = os.Getenv("newChainUrl") + Cfg.NewChainQueueEnabled = os.Getenv("newChainQueueEnabled") == "true" + Cfg.NewChainFlushEnabled = os.Getenv("newChainFlushEnabled") == "true" + Cfg.NewChainInsecureSkipVerify = os.Getenv("newChainInsecureSkipVerify") == "true" + if v := os.Getenv("newChainFlushFromBlock"); v != "" { + n, err := strconv.ParseInt(v, 10, 64) + if err != nil { + panic("Invalid newChainFlushFromBlock: " + err.Error()) + } + Cfg.NewChainFlushFromBlock = n + } } diff --git a/ddl/migrations/0180_new_chain_queue.sql b/ddl/migrations/0180_new_chain_queue.sql new file mode 100644 index 00000000..c9de18c5 --- /dev/null +++ b/ddl/migrations/0180_new_chain_queue.sql @@ -0,0 +1,16 @@ +BEGIN; + +CREATE TABLE IF NOT EXISTS new_chain_queue ( + id bigserial PRIMARY KEY, + created_at timestamptz NOT NULL DEFAULT now(), + tx_data bytea NOT NULL, + confirmed_block bigint +); + +COMMENT ON TABLE new_chain_queue IS 'Queue of ManageEntity transactions to be forwarded to the new Core chain (audius-mainnet-v2) during genesis migration.'; +COMMENT ON COLUMN new_chain_queue.tx_data IS 'Protobuf-serialized ManageEntityLegacy message.'; +COMMENT ON COLUMN new_chain_queue.confirmed_block IS 'Block height on the old chain where this transaction was confirmed. NULL if confirmation was not recorded (e.g. relay restart).'; + +CREATE INDEX IF NOT EXISTS new_chain_queue_confirmed_block_idx ON new_chain_queue (confirmed_block); + +COMMIT;