Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,21 @@ jobs:
node-version: 24.10.0
- name: Check for circular dependencies
run: pnpm circular

integration:
name: Integration Tests
runs-on: ubuntu-latest
permissions:
contents: read
timeout-minutes: 15
steps:
- uses: actions/checkout@v4
- name: Install dependencies
uses: ./.github/actions/setup
with:
node-version: 24.10.0
- name: Run integration tests
run: pnpm test:integration
- name: Collect container logs on failure
if: failure()
run: docker compose -p amp-integration-tests logs --tail 200 2>/dev/null || true
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ scratchpad/**/*.md
scratchpad/**/*.ts
!scratchpad/index.ts

# Agent Context
.repos/

# Generated by foundry
out/
broadcast/
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ services:

anvil:
image: ghcr.io/foundry-rs/foundry
entrypoint: ["anvil", "--host", "0.0.0.0"]
entrypoint: ["anvil", "--host", "0.0.0.0", "--block-time", "1"]
ports:
- 8545:8545

Expand Down Expand Up @@ -54,7 +54,7 @@ services:
# Amp is a data engineering layer for Ethereum.
# https://github.com/edgeandnode/amp
amp:
image: ghcr.io/edgeandnode/amp:latest
image: ghcr.io/edgeandnode/amp:canary
command: ["--config", "/var/lib/amp/config.toml", "dev"]
depends_on:
- postgres
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"clean": "node scripts/clean.mjs",
"lint": "oxlint && dprint check",
"lint:fix": "oxlint --fix && dprint fmt",
"test": "vitest"
"test": "vitest",
"test:integration": "pnpm --filter @edgeandnode/amp test:integration"
},
"pnpm": {
"overrides": {
Expand Down Expand Up @@ -41,6 +42,7 @@
"madge": "^8.0.0",
"oxlint": "^1.42.0",
"rulesync": "^6.7.0",
"testcontainers": "^11.12.0",
"ts-patch": "^3.3.0",
"typescript": "^5.9.3",
"vite-tsconfig-paths": "^6.0.5",
Expand Down
1 change: 1 addition & 0 deletions packages/amp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"babel": "babel dist --plugins annotate-pure-calls --out-dir dist --source-maps",
"check": "tsc -b tsconfig.json",
"test": "vitest",
"test:integration": "vitest run --config vitest.integration.config.ts",
"coverage": "vitest --coverage"
},
"peerDependencies": {
Expand Down
22 changes: 21 additions & 1 deletion packages/amp/src/admin/domain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,27 @@ export class GetDatasetVersionResponse extends Schema.Class<GetDatasetVersionRes
export class GetDatasetVersionsResponse extends Schema.Class<GetDatasetVersionsResponse>(
"Amp/AdminApi/GetDatasetVersionsResponse"
)({
versions: Schema.Array(Models.DatasetVersion)
namespace: Models.DatasetNamespace,
name: Models.DatasetName,
versions: Schema.Array(Schema.Struct({
version: Models.DatasetVersion,
manifestHash: Models.DatasetHash.pipe(
Schema.propertySignature,
Schema.fromKey("manifest_hash")
),
createdAt: Schema.DateTimeUtc.pipe(
Schema.propertySignature,
Schema.fromKey("created_at")
),
updatedAt: Schema.DateTimeUtc.pipe(
Schema.propertySignature,
Schema.fromKey("updated_at")
)
})),
specialTags: Schema.Struct({
dev: Schema.optional(Schema.Union(Models.DatasetVersion, Models.DatasetHash)),
latest: Schema.optional(Schema.Union(Models.DatasetVersion, Models.DatasetHash))
}).pipe(Schema.propertySignature, Schema.fromKey("special_tags"))
}, { identifier: "GetDatasetVersionsResponse" }) {}

/**
Expand Down
213 changes: 213 additions & 0 deletions packages/amp/test/integration/fixtures/anvil-manifest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/**
* Anvil evm-rpc manifest for integration tests.
*/
import type * as Models from "@edgeandnode/amp/core"

const blocksFields: Array<Models.ArrowField> = [
{ name: "_block_num", type: "UInt64", nullable: false },
{ name: "block_num", type: "UInt64", nullable: false },
{ name: "timestamp", type: { Timestamp: ["Nanosecond", "+00:00"] }, nullable: false },
{ name: "hash", type: { FixedSizeBinary: 32 }, nullable: false },
{ name: "parent_hash", type: { FixedSizeBinary: 32 }, nullable: false },
{ name: "ommers_hash", type: { FixedSizeBinary: 32 }, nullable: false },
{ name: "miner", type: { FixedSizeBinary: 20 }, nullable: false },
{ name: "state_root", type: { FixedSizeBinary: 32 }, nullable: false },
{ name: "transactions_root", type: { FixedSizeBinary: 32 }, nullable: false },
{ name: "receipt_root", type: { FixedSizeBinary: 32 }, nullable: false },
{ name: "logs_bloom", type: "Binary", nullable: false },
{ name: "difficulty", type: { Decimal128: [38, 0] }, nullable: false },
{ name: "total_difficulty", type: { Decimal128: [38, 0] }, nullable: true },
{ name: "gas_limit", type: "UInt64", nullable: false },
{ name: "gas_used", type: "UInt64", nullable: false },
{ name: "extra_data", type: "Binary", nullable: false },
{ name: "mix_hash", type: { FixedSizeBinary: 32 }, nullable: false },
{ name: "nonce", type: "UInt64", nullable: false },
{ name: "base_fee_per_gas", type: { Decimal128: [38, 0] }, nullable: true },
{ name: "withdrawals_root", type: { FixedSizeBinary: 32 }, nullable: true },
{ name: "blob_gas_used", type: "UInt64", nullable: true },
{ name: "excess_blob_gas", type: "UInt64", nullable: true },
{ name: "parent_beacon_root", type: { FixedSizeBinary: 32 }, nullable: true },
{ name: "requests_hash", type: { FixedSizeBinary: 32 }, nullable: true }
]

const transactionsFields: Array<Models.ArrowField> = [
{ name: "_block_num", type: "UInt64", nullable: false },
{ name: "block_hash", type: { FixedSizeBinary: 32 }, nullable: false },
{ name: "block_num", type: "UInt64", nullable: false },
{ name: "timestamp", type: { Timestamp: ["Nanosecond", "+00:00"] }, nullable: false },
{ name: "tx_index", type: "UInt32", nullable: false },
{ name: "tx_hash", type: { FixedSizeBinary: 32 }, nullable: false },
{ name: "to", type: { FixedSizeBinary: 20 }, nullable: true },
{ name: "nonce", type: "UInt64", nullable: false },
{ name: "gas_price", type: { Decimal128: [38, 0] }, nullable: true },
{ name: "gas_limit", type: "UInt64", nullable: false },
{ name: "value", type: "Utf8", nullable: false },
{ name: "input", type: "Binary", nullable: false },
{ name: "r", type: { FixedSizeBinary: 32 }, nullable: false },
{ name: "s", type: { FixedSizeBinary: 32 }, nullable: false },
{ name: "v_parity", type: "Boolean", nullable: false },
{ name: "chain_id", type: "UInt64", nullable: true },
{ name: "gas_used", type: "UInt64", nullable: false },
{ name: "type", type: "Int32", nullable: false },
{ name: "max_fee_per_gas", type: { Decimal128: [38, 0] }, nullable: true },
{ name: "max_priority_fee_per_gas", type: { Decimal128: [38, 0] }, nullable: true },
{ name: "max_fee_per_blob_gas", type: { Decimal128: [38, 0] }, nullable: true },
{ name: "from", type: { FixedSizeBinary: 20 }, nullable: false },
{ name: "status", type: "Boolean", nullable: false },
{ name: "state_root", type: { FixedSizeBinary: 32 }, nullable: true },
{
name: "access_list",
type: {
List: {
name: "item",
nullable: false,
data_type: {
Struct: [
{
name: "address",
nullable: false,
data_type: { FixedSizeBinary: 20 },
dict_id: 0,
dict_is_ordered: false,
metadata: {}
},
{
name: "storage_keys",
nullable: false,
data_type: {
List: {
name: "item",
nullable: false,
data_type: { FixedSizeBinary: 32 },
dict_id: 0,
dict_is_ordered: false,
metadata: {}
}
},
dict_id: 0,
dict_is_ordered: false,
metadata: {}
}
]
},
dict_id: 0,
dict_is_ordered: false,
metadata: {}
}
},
nullable: true
},
{
name: "blob_versioned_hashes",
type: {
List: {
name: "item",
nullable: false,
data_type: { FixedSizeBinary: 32 },
dict_id: 0,
dict_is_ordered: false,
metadata: {}
}
},
nullable: true
},
{
name: "authorization_list",
type: {
List: {
name: "item",
nullable: false,
data_type: {
Struct: [
{
name: "chain_id",
nullable: false,
data_type: "UInt64",
dict_id: 0,
dict_is_ordered: false,
metadata: {}
},
{
name: "address",
nullable: false,
data_type: { FixedSizeBinary: 20 },
dict_id: 0,
dict_is_ordered: false,
metadata: {}
},
{
name: "nonce",
nullable: false,
data_type: "UInt64",
dict_id: 0,
dict_is_ordered: false,
metadata: {}
},
{
name: "y_parity",
nullable: false,
data_type: "Boolean",
dict_id: 0,
dict_is_ordered: false,
metadata: {}
},
{
name: "r",
nullable: false,
data_type: { FixedSizeBinary: 32 },
dict_id: 0,
dict_is_ordered: false,
metadata: {}
},
{
name: "s",
nullable: false,
data_type: { FixedSizeBinary: 32 },
dict_id: 0,
dict_is_ordered: false,
metadata: {}
}
]
},
dict_id: 0,
dict_is_ordered: false,
metadata: {}
}
},
nullable: true
}
]

const logsFields: Array<Models.ArrowField> = [
{ name: "_block_num", type: "UInt64", nullable: false },
{ name: "block_hash", type: { FixedSizeBinary: 32 }, nullable: false },
{ name: "block_num", type: "UInt64", nullable: false },
{ name: "timestamp", type: { Timestamp: ["Nanosecond", "+00:00"] }, nullable: false },
{ name: "tx_hash", type: { FixedSizeBinary: 32 }, nullable: false },
{ name: "tx_index", type: "UInt32", nullable: false },
{ name: "log_index", type: "UInt32", nullable: false },
{ name: "address", type: { FixedSizeBinary: 20 }, nullable: false },
{ name: "topic0", type: { FixedSizeBinary: 32 }, nullable: true },
{ name: "topic1", type: { FixedSizeBinary: 32 }, nullable: true },
{ name: "topic2", type: { FixedSizeBinary: 32 }, nullable: true },
{ name: "topic3", type: { FixedSizeBinary: 32 }, nullable: true },
{ name: "data", type: "Binary", nullable: false }
]

const network = "anvil" as Models.Network

/**
* Anvil evm-rpc manifest — typed as `DatasetManifest` for direct use with
* `AdminApi.registerDataset`.
*/
export const anvilManifest: Models.DatasetManifest = {
kind: "evm-rpc",
network,
startBlock: 0,
finalizedBlocksOnly: false,
tables: {
blocks: { schema: { arrow: { fields: blocksFields } }, network },
transactions: { schema: { arrow: { fields: transactionsFields } }, network },
logs: { schema: { arrow: { fields: logsFields } }, network }
}
}
75 changes: 75 additions & 0 deletions packages/amp/test/integration/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import * as AdminService from "@edgeandnode/amp/admin/service"
import * as Models from "@edgeandnode/amp/core"
import * as Effect from "effect/Effect"

/**
* Generate a unique dataset name for test isolation.
* Each test group gets its own name so tests don't collide.
*/
export const uniqueDatasetName = (prefix: string): Models.DatasetName =>
Models.DatasetName.make(`${prefix}_${Date.now()}`)

/** Terminal job statuses — polling stops when the job reaches one of these. */
const TERMINAL_STATUSES = new Set<string>(["COMPLETED", "STOPPED", "FAILED", "UNKNOWN"])

/**
* Poll job status until it reaches a terminal state.
* Polls up to 15 times with 1-second spacing (15s total).
* Returns the final `JobInfo`.
*/
export const waitForJob = Effect.fn(function*(jobId: number) {
const admin = yield* AdminService.AdminApi

for (let attempt = 1; attempt <= 15; attempt++) {
if (attempt > 1) {
yield* Effect.sleep("1 second")
}
const job = yield* admin.getJobById(jobId)
if (TERMINAL_STATUSES.has(job.status)) {
return job
}
}

return yield* Effect.die(
new Error(
`Job ${jobId} did not reach terminal state after 15 attempts`
)
)
})

/**
* Poll sync progress until at least one table has blocks.
* Polls up to 60 times with 2-second spacing (120s total).
*
* Resilient to transient errors — the sync progress endpoint may not
* be available immediately after job completion (the SDK converts some
* API errors to defects via `Effect.die`).
*/
export const waitForSync = Effect.fn(
function*(
namespace: Models.DatasetNamespace,
name: Models.DatasetName,
revision: Models.DatasetRevision
) {
const admin = yield* AdminService.AdminApi

for (let attempt = 1; attempt <= 60; attempt++) {
if (attempt > 1) {
yield* Effect.sleep("2 seconds")
}
const synced = yield* admin.getDatasetSyncProgress(namespace, name, revision).pipe(
Effect.map((progress) => progress.tables.some((t) => t.currentBlock !== undefined && t.currentBlock > 0)),
Effect.catchAllCause(() => Effect.succeed(false))
)
if (synced) {
return
}
}

return yield* Effect.die(
new Error(
"Sync progress did not show blocks after 60 attempts"
)
)
}
)
Loading
Loading