diff --git a/.cspell.json b/.cspell.json index 9182b0d..779f066 100644 --- a/.cspell.json +++ b/.cspell.json @@ -103,6 +103,8 @@ "unintuitive", "unpruned", "upserting", + "UTXO", + "UTXOs", "verack", "xprivkey", "xpubkey" diff --git a/.gitignore b/.gitignore index aa314ef..249d16d 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ package-lock.json data .env client/generated +.scratch diff --git a/charts/chaingraph/templates/agent.yaml b/charts/chaingraph/templates/agent.yaml index 5b1c17f..2388432 100644 --- a/charts/chaingraph/templates/agent.yaml +++ b/charts/chaingraph/templates/agent.yaml @@ -41,6 +41,8 @@ spec: value: {{ .Values.agent.synchronousCommit | quote }} - name: CHAINGRAPH_BLOCK_BUFFER_TARGET_SIZE_MB value: {{ .Values.agent.blockBufferTargetSizeMb | quote }} + - name: CHAINGRAPH_INCOMPLETE_BLOCK_REPAIR_BATCH_SIZE + value: {{ .Values.agent.repairBlocksBatchSize | quote }} - name: CHAINGRAPH_GENESIS_BLOCKS value: {{ .Values.agent.genesisBlocks | quote }} - name: CHAINGRAPH_TRUSTED_NODES diff --git a/charts/chaingraph/values.yaml b/charts/chaingraph/values.yaml index 8887127..68616ce 100644 --- a/charts/chaingraph/values.yaml +++ b/charts/chaingraph/values.yaml @@ -18,12 +18,15 @@ agent: # The maximum number of connections the agent should maintain to the database. # For best performance, this should be set to the number of CPUs available to Postgres. If not set, Chaingraph will assume that Postgres is running on hardware equivalent to its own. (This is ideal if Postgres is running on either the same machine or an equivalent one from a homogenous Kubernetes node pool.) maxConnections: '' -# If set to false, the Postgres database will be configured to use "synchronous_commit = off" during initial sync. -# In real-world testing, this usually reduces the speed of Chaingraph's initial sync, so Chaingraph leaves "synchronous_commit = on" by default. + # If set to false, the Postgres database will be configured to use "synchronous_commit = off" during initial sync. + # In real-world testing, this usually reduces the speed of Chaingraph's initial sync, so Chaingraph leaves "synchronous_commit = on" by default. synchronousCommit: true # The target size (in MB) of the buffer which holds downloaded blocks waiting to be saved to the database. This primarily affects memory usage during the initial chain sync. # For best performance, this should be around `maxConnections * maximum block size`, while leaving enough memory available to the host machine. If left unset (recommended), Chaingraph will measure free memory at startup and attempt to select a reasonable value. blockBufferTargetSizeMb: '' + # Self-healing for https://github.com/bitauth/chaingraph/issues/74: after initial sync, the agent scans saved blocks in batches to find blocks whose linked transactions don't sum to the saved block size, then repairs those blocks by re-requesting them from trusted nodes. + # Set to 0 to disable the startup repair task. + repairBlocksBatchSize: 10000 # A mapping of network magic bytes to hex-encoded genesis blocks. # Format: `NETWORK_MAGIC:RAW_GENESIS_BLOCK_HEX`, comma separated. # E.g. CHAINGRAPH_GENESIS_BLOCKS=e3e1f3e8:rawblockhex,deadbeef:rawblockhex diff --git a/defaults.env b/defaults.env index f6f87fc..efb79e7 100644 --- a/defaults.env +++ b/defaults.env @@ -16,6 +16,21 @@ CHAINGRAPH_POSTGRES_SYNCHRONOUS_COMMIT=true # For best performance, this should be around `CHAINGRAPH_POSTGRES_MAX_CONNECTIONS * maximum block size`, while leaving enough memory available to the host machine.
If not set, Chaingraph will measure free memory at startup and attempt to select a reasonable value. CHAINGRAPH_BLOCK_BUFFER_TARGET_SIZE_MB= +# Self-healing for https://github.com/bitauth/chaingraph/issues/74: after +# initial sync, the agent scans saved blocks in batches to find blocks whose +# linked transactions don't sum to the saved block size, then repairs those +# blocks by re-requesting them from trusted nodes. Set to 0 to disable the +# startup repair task. +CHAINGRAPH_INCOMPLETE_BLOCK_REPAIR_BATCH_SIZE=10000 + +# BCHN expires transactions from the mempool after 336 hours (14 days) but does not +# announce those expirations. At initial sync, and periodically thereafter, the agent +# scans for node_transaction rows expiring soon and schedules each row to be archived +# to node_transaction_history at its exact expiry time. The existing mempool +# invalidation cascade archives same-node descendants. See bitauth/chaingraph#73. +CHAINGRAPH_MEMPOOL_TRANSACTION_EXPIRATION_MS=1209600000 +CHAINGRAPH_MEMPOOL_TRANSACTION_EXPIRATION_SCAN_INTERVAL_MS=3600000 + # A mapping of network magic bytes to hex-encoded genesis blocks. # Format: `NETWORK_MAGIC:RAW_GENESIS_BLOCK_HEX`, comma separated. # E.g. CHAINGRAPH_GENESIS_BLOCKS=e3e1f3e8:rawblockhex,deadbeef:rawblockhex diff --git a/images/bitcoin-cash-node/Dockerfile b/images/bitcoin-cash-node/Dockerfile index 55008df..b84ac54 100644 --- a/images/bitcoin-cash-node/Dockerfile +++ b/images/bitcoin-cash-node/Dockerfile @@ -7,9 +7,9 @@ RUN set -ex \ && apt-get install -qq --no-install-recommends ca-certificates gosu wget \ && rm -rf /var/lib/apt/lists/* -ENV BITCOIN_VERSION 28.0.2 -ENV BITCOIN_URL https://download.bitcoincashnode.org/misc/builds/upgrade12_temp/linux/bitcoin-cash-node-28.0.2-x86_64-linux-gnu.tar.gz -ENV BITCOIN_SHA256 140b44fd76a4f9428354bfbec4800d58fd39fb723320e761a035f15c2dd43596 +ENV BITCOIN_VERSION 29.0.0 +ENV BITCOIN_URL https://github.com/bitcoin-cash-node/bitcoin-cash-node/releases/download/v29.0.0/bitcoin-cash-node-29.0.0-x86_64-linux-gnu.tar.gz +ENV BITCOIN_SHA256 6125d1cbecc1db476f2b6b7b91da5acde92d2311b8e738124e3db64ca84b33e1 # install bitcoin binaries RUN set -ex \ diff --git a/images/hasura/hasura-data/migrations/default/1778151011521_cascade_invalidate_mempool_descendants/down.sql b/images/hasura/hasura-data/migrations/default/1778151011521_cascade_invalidate_mempool_descendants/down.sql new file mode 100644 index 0000000..6def7fc --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778151011521_cascade_invalidate_mempool_descendants/down.sql @@ -0,0 +1,2 @@ +DROP TRIGGER IF EXISTS trigger_public_node_transaction_history_insert ON node_transaction_history; +DROP FUNCTION IF EXISTS trigger_node_transaction_history_insert(); diff --git a/images/hasura/hasura-data/migrations/default/1778151011521_cascade_invalidate_mempool_descendants/up.sql b/images/hasura/hasura-data/migrations/default/1778151011521_cascade_invalidate_mempool_descendants/up.sql new file mode 100644 index 0000000..b837b5b --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778151011521_cascade_invalidate_mempool_descendants/up.sql @@ -0,0 +1,105 @@ +CREATE OR REPLACE FUNCTION trigger_node_transaction_history_insert() RETURNS trigger + LANGUAGE plpgsql +AS $$ +BEGIN + /* + * The recursive CTE archives the full descendant set in one pass. The + * archive INSERT below re-fires this trigger, but that re-entry should return + * immediately rather than attempting another cascade.
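+ * (The suppression flag is set below via set_config(..., 'on', true); the final + * argument makes it transaction-local, so it cannot leak into other sessions.)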
+ */ + IF current_setting('chaingraph.suppress_mempool_descendant_cascade', true) = 'on' THEN + RETURN NULL; + END IF; + + -- Confirmations and empty batches do not invalidate descendants. + IF NOT EXISTS (SELECT 1 FROM new_table WHERE replaced_at IS NOT NULL) THEN + RETURN NULL; + END IF; + + PERFORM set_config('chaingraph.suppress_mempool_descendant_cascade', 'on', true); + + BEGIN + /* + * If another session deletes matching descendants before this DELETE + * reaches them, this archives zero rows. Suppression still prevents empty + * self-reentry. + */ + WITH RECURSIVE descendant_transactions AS ( + -- Seed: mempool transactions spending outputs of newly replaced parents. + SELECT nt.node_internal_id, + nt.transaction_internal_id, + nt.validated_at, + replaced_parents.replaced_at + FROM new_table replaced_parents + INNER JOIN transaction parent_transaction + ON parent_transaction.internal_id = replaced_parents.transaction_internal_id + INNER JOIN output parent_output + ON parent_output.transaction_hash = parent_transaction.hash + INNER JOIN input + ON input.outpoint_transaction_hash = parent_output.transaction_hash + AND input.outpoint_index = parent_output.output_index + INNER JOIN node_transaction nt + ON nt.transaction_internal_id = input.transaction_internal_id + AND nt.node_internal_id = replaced_parents.node_internal_id + WHERE replaced_parents.replaced_at IS NOT NULL + + UNION + + -- Recursive step: mempool transactions spending outputs of descendants. + SELECT child_nt.node_internal_id, + child_nt.transaction_internal_id, + child_nt.validated_at, + parent_descendants.replaced_at + FROM descendant_transactions parent_descendants + INNER JOIN transaction parent_transaction + ON parent_transaction.internal_id = parent_descendants.transaction_internal_id + INNER JOIN output parent_output + ON parent_output.transaction_hash = parent_transaction.hash + INNER JOIN input + ON input.outpoint_transaction_hash = parent_output.transaction_hash + AND input.outpoint_index = parent_output.output_index + INNER JOIN node_transaction child_nt + ON child_nt.transaction_internal_id = input.transaction_internal_id + AND child_nt.node_internal_id = parent_descendants.node_internal_id + ), + descendants AS ( + -- If reachable through multiple replaced parents, use earliest invalidation. 
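+ -- (UNION above already removes fully identical rows; this GROUP BY additionally + -- collapses rows that differ only in replaced_at, keeping the minimum.)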
+ SELECT node_internal_id, + transaction_internal_id, + validated_at, + MIN(replaced_at) AS replaced_at + FROM descendant_transactions + GROUP BY node_internal_id, transaction_internal_id, validated_at + ), + deleted_descendants AS ( + DELETE FROM node_transaction + USING descendants + WHERE node_transaction.node_internal_id = descendants.node_internal_id + AND node_transaction.transaction_internal_id = descendants.transaction_internal_id + RETURNING node_transaction.node_internal_id, + node_transaction.transaction_internal_id, + node_transaction.validated_at, + descendants.replaced_at + ) + INSERT INTO node_transaction_history (node_internal_id, transaction_internal_id, validated_at, replaced_at) + SELECT node_internal_id, transaction_internal_id, validated_at, replaced_at + FROM deleted_descendants; + EXCEPTION WHEN OTHERS THEN + PERFORM set_config('chaingraph.suppress_mempool_descendant_cascade', 'off', true); + RAISE; + END; + + PERFORM set_config('chaingraph.suppress_mempool_descendant_cascade', 'off', true); + RETURN NULL; +END; +$$; + +CREATE TRIGGER trigger_public_node_transaction_history_insert + AFTER INSERT ON node_transaction_history + REFERENCING NEW TABLE AS new_table + FOR EACH STATEMENT EXECUTE FUNCTION trigger_node_transaction_history_insert(); +COMMENT ON TRIGGER trigger_public_node_transaction_history_insert ON node_transaction_history + IS 'Cascades mempool invalidation recursively: when a node_transaction is archived to history with replaced_at set, all same-node descendants still present in node_transaction are archived with a deterministic replaced_at timestamp.'; + +-- disabled until initial sync is complete (when mempool transactions begin to be accepted) +ALTER TABLE node_transaction_history DISABLE TRIGGER trigger_public_node_transaction_history_insert; diff --git a/images/hasura/hasura-data/migrations/default/1778158619747_backfill_orphan_mempool_descendants/down.sql b/images/hasura/hasura-data/migrations/default/1778158619747_backfill_orphan_mempool_descendants/down.sql new file mode 100644 index 0000000..918fd2d --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778158619747_backfill_orphan_mempool_descendants/down.sql @@ -0,0 +1,2 @@ +-- This migration archives orphaned mempool descendants into +-- node_transaction_history. The data move is intentionally not reversible. diff --git a/images/hasura/hasura-data/migrations/default/1778158619747_backfill_orphan_mempool_descendants/up.sql b/images/hasura/hasura-data/migrations/default/1778158619747_backfill_orphan_mempool_descendants/up.sql new file mode 100644 index 0000000..d90a627 --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778158619747_backfill_orphan_mempool_descendants/up.sql @@ -0,0 +1,88 @@ +DO $$ +DECLARE + previous_suppression text := current_setting('chaingraph.suppress_mempool_descendant_cascade', true); +BEGIN + /* + * The backfill archives the full existing orphan set itself. If the + * node_transaction_history trigger is enabled while this migration runs, + * suppress its re-entry from the archive INSERT below. + */ + PERFORM set_config('chaingraph.suppress_mempool_descendant_cascade', 'on', true); + + BEGIN + WITH RECURSIVE orphan_transactions AS ( + -- Seed: mempool transactions whose parent was already archived as replaced. 
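+ -- (These rows were orphaned before the cascade trigger from the previous + -- migration existed; the same recursive walk archives them retroactively.)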
+ SELECT nt.node_internal_id, + nt.transaction_internal_id, + nt.validated_at, + nth.replaced_at + FROM node_transaction nt + INNER JOIN input child_input + ON child_input.transaction_internal_id = nt.transaction_internal_id + INNER JOIN transaction parent_transaction + ON parent_transaction.hash = child_input.outpoint_transaction_hash + INNER JOIN output parent_output + ON parent_output.transaction_hash = parent_transaction.hash + AND parent_output.output_index = child_input.outpoint_index + INNER JOIN node_transaction_history nth + ON nth.transaction_internal_id = parent_transaction.internal_id + AND nth.node_internal_id = nt.node_internal_id + WHERE nth.replaced_at IS NOT NULL + + UNION + + -- Recursive step: mempool transactions spending outputs of known orphans. + SELECT child_nt.node_internal_id, + child_nt.transaction_internal_id, + child_nt.validated_at, + parent_orphans.replaced_at + FROM orphan_transactions parent_orphans + INNER JOIN transaction parent_transaction + ON parent_transaction.internal_id = parent_orphans.transaction_internal_id + INNER JOIN output parent_output + ON parent_output.transaction_hash = parent_transaction.hash + INNER JOIN input child_input + ON child_input.outpoint_transaction_hash = parent_output.transaction_hash + AND child_input.outpoint_index = parent_output.output_index + INNER JOIN node_transaction child_nt + ON child_nt.transaction_internal_id = child_input.transaction_internal_id + AND child_nt.node_internal_id = parent_orphans.node_internal_id + ), + orphans AS ( + -- If reachable through multiple replaced parents, use earliest invalidation. + SELECT node_internal_id, + transaction_internal_id, + validated_at, + MIN(replaced_at) AS replaced_at + FROM orphan_transactions + GROUP BY node_internal_id, transaction_internal_id, validated_at + ), + deleted_orphans AS ( + DELETE FROM node_transaction + USING orphans + WHERE node_transaction.node_internal_id = orphans.node_internal_id + AND node_transaction.transaction_internal_id = orphans.transaction_internal_id + RETURNING node_transaction.node_internal_id, + node_transaction.transaction_internal_id, + node_transaction.validated_at, + orphans.replaced_at + ) + INSERT INTO node_transaction_history (node_internal_id, transaction_internal_id, validated_at, replaced_at) + SELECT node_internal_id, transaction_internal_id, validated_at, replaced_at + FROM deleted_orphans; + EXCEPTION WHEN OTHERS THEN + PERFORM set_config( + 'chaingraph.suppress_mempool_descendant_cascade', + COALESCE(previous_suppression, 'off'), + true + ); + RAISE; + END; + + PERFORM set_config( + 'chaingraph.suppress_mempool_descendant_cascade', + COALESCE(previous_suppression, 'off'), + true + ); +END; +$$; diff --git a/images/hasura/hasura-data/migrations/default/1778351597200_fix_block_encoded_transaction_count/down.sql b/images/hasura/hasura-data/migrations/default/1778351597200_fix_block_encoded_transaction_count/down.sql new file mode 100644 index 0000000..6eb242d --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778351597200_fix_block_encoded_transaction_count/down.sql @@ -0,0 +1,25 @@ +CREATE OR REPLACE FUNCTION encode_block(block_row block) RETURNS bytea + LANGUAGE plpgsql IMMUTABLE +AS $$ +DECLARE + transactions CURSOR FOR SELECT transaction.* FROM transaction + INNER JOIN block_transaction ON transaction.internal_id = block_transaction.transaction_internal_id + WHERE block_transaction.block_internal_id = block_row.internal_id + ORDER BY block_transaction.transaction_index ASC; + encoded_block bytea := 
encode_block_header(block_row) || encode_compact_uint(COUNT(transactions)); +BEGIN + FOR transaction_row IN transactions + LOOP + encoded_block := encoded_block || + encode_transaction(ROW( + transaction_row.internal_id, + transaction_row.hash, + transaction_row.version, + transaction_row.locktime, + transaction_row.size_bytes, + transaction_row.is_coinbase)::transaction + ); + END LOOP; + RETURN encoded_block; +END; +$$; diff --git a/images/hasura/hasura-data/migrations/default/1778351597200_fix_block_encoded_transaction_count/up.sql b/images/hasura/hasura-data/migrations/default/1778351597200_fix_block_encoded_transaction_count/up.sql new file mode 100644 index 0000000..4e572ee --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778351597200_fix_block_encoded_transaction_count/up.sql @@ -0,0 +1,30 @@ +CREATE OR REPLACE FUNCTION encode_block(block_row block) RETURNS bytea + LANGUAGE plpgsql IMMUTABLE +AS $$ +DECLARE + transactions CURSOR FOR SELECT transaction.* FROM transaction + INNER JOIN block_transaction ON transaction.internal_id = block_transaction.transaction_internal_id + WHERE block_transaction.block_internal_id = block_row.internal_id + ORDER BY block_transaction.transaction_index ASC; + encoded_block bytea := encode_block_header(block_row); + transaction_count bigint := 0; +BEGIN + SELECT COUNT(*) INTO transaction_count + FROM block_transaction + WHERE block_transaction.block_internal_id = block_row.internal_id; + encoded_block := encoded_block || encode_compact_uint(transaction_count); + FOR transaction_row IN transactions + LOOP + encoded_block := encoded_block || + encode_transaction(ROW( + transaction_row.internal_id, + transaction_row.hash, + transaction_row.version, + transaction_row.locktime, + transaction_row.size_bytes, + transaction_row.is_coinbase)::transaction + ); + END LOOP; + RETURN encoded_block; +END; +$$; diff --git a/images/hasura/hasura-data/migrations/default/1778415174939_fix_data_carrier_outputs_empty_bytecode/down.sql b/images/hasura/hasura-data/migrations/default/1778415174939_fix_data_carrier_outputs_empty_bytecode/down.sql new file mode 100644 index 0000000..7608b37 --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778415174939_fix_data_carrier_outputs_empty_bytecode/down.sql @@ -0,0 +1,5 @@ +CREATE OR REPLACE FUNCTION transaction_data_carrier_outputs(transaction_row transaction) RETURNS SETOF output + LANGUAGE sql IMMUTABLE +AS $$ + SELECT * FROM output WHERE transaction_hash = $1.hash AND (value_satoshis = 0 OR get_byte(locking_bytecode, 0) = 106); +$$; diff --git a/images/hasura/hasura-data/migrations/default/1778415174939_fix_data_carrier_outputs_empty_bytecode/up.sql b/images/hasura/hasura-data/migrations/default/1778415174939_fix_data_carrier_outputs_empty_bytecode/up.sql new file mode 100644 index 0000000..b1cdd13 --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778415174939_fix_data_carrier_outputs_empty_bytecode/up.sql @@ -0,0 +1,5 @@ +CREATE OR REPLACE FUNCTION transaction_data_carrier_outputs(transaction_row transaction) RETURNS SETOF output + LANGUAGE sql IMMUTABLE +AS $$ + SELECT * FROM output WHERE transaction_hash = $1.hash AND (value_satoshis = 0 OR (octet_length(locking_bytecode) > 0 AND get_byte(locking_bytecode, 0) = 106)); +$$; diff --git a/images/hasura/hasura-data/migrations/default/1778429124205_fix_coinbase_only_value_aggregates/down.sql b/images/hasura/hasura-data/migrations/default/1778429124205_fix_coinbase_only_value_aggregates/down.sql new file mode 100644 index 
0000000..1562de5 --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778429124205_fix_coinbase_only_value_aggregates/down.sql @@ -0,0 +1,59 @@ +CREATE OR REPLACE FUNCTION transaction_input_value_satoshis(transaction_row transaction) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT SUM(inputs.input_value_satoshis)::bigint FROM ( + SELECT input_value_satoshis (input) FROM input WHERE transaction_internal_id = transaction_row.internal_id + ) as "inputs" +$$; + +CREATE OR REPLACE FUNCTION block_input_count(block_row block) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT SUM(transactions.transaction_input_count)::bigint FROM ( + SELECT transaction_input_count (transaction) FROM transaction WHERE internal_id IN (SELECT transaction_internal_id from block_transaction WHERE block_internal_id = block_row.internal_id) + ) as "transactions" +$$; + +CREATE OR REPLACE FUNCTION block_input_value_satoshis(block_row block) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT SUM(transactions.transaction_input_value_satoshis)::bigint FROM ( + SELECT transaction_input_value_satoshis (transaction) FROM transaction WHERE internal_id IN (SELECT transaction_internal_id from block_transaction WHERE block_internal_id = block_row.internal_id) + ) as "transactions" +$$; + +CREATE OR REPLACE FUNCTION block_output_count(block_row block) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT SUM(transactions.transaction_output_count)::bigint FROM ( + SELECT transaction_output_count (transaction) FROM transaction WHERE internal_id IN (SELECT transaction_internal_id from block_transaction WHERE block_internal_id = block_row.internal_id) + ) as "transactions" +$$; + +CREATE OR REPLACE FUNCTION block_output_value_satoshis(block_row block) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT SUM(transactions.transaction_output_value_satoshis)::bigint FROM ( + SELECT transaction_output_value_satoshis (transaction) FROM transaction WHERE internal_id IN (SELECT transaction_internal_id from block_transaction WHERE block_internal_id = block_row.internal_id) + ) as "transactions" +$$; + +CREATE OR REPLACE FUNCTION transaction_fee_satoshis(transaction_row transaction) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT transaction_input_value_satoshis(transaction_row) - transaction_output_value_satoshis(transaction_row) +$$; + +CREATE OR REPLACE FUNCTION block_fee_satoshis(block_row block) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT SUM(transactions.transaction_fee_satoshis)::bigint FROM ( + SELECT transaction_fee_satoshis (transaction) FROM transaction WHERE internal_id IN (SELECT transaction_internal_id from block_transaction WHERE block_internal_id = block_row.internal_id ) + ) as "transactions" +$$; + +CREATE OR REPLACE FUNCTION block_generated_value_satoshis(block_row block) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT (block_output_value_satoshis(block) - block_input_value_satoshis(block))::bigint FROM block WHERE internal_id = block_row.internal_id; +$$; diff --git a/images/hasura/hasura-data/migrations/default/1778429124205_fix_coinbase_only_value_aggregates/up.sql b/images/hasura/hasura-data/migrations/default/1778429124205_fix_coinbase_only_value_aggregates/up.sql new file mode 100644 index 0000000..b604537 --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778429124205_fix_coinbase_only_value_aggregates/up.sql @@ -0,0 +1,62 @@ +CREATE OR REPLACE FUNCTION transaction_input_value_satoshis(transaction_row transaction) RETURNS bigint + LANGUAGE 
sql IMMUTABLE +AS $$ + SELECT COALESCE(SUM(inputs.input_value_satoshis), 0)::bigint FROM ( + SELECT input_value_satoshis (input) FROM input WHERE transaction_internal_id = transaction_row.internal_id + ) as "inputs" +$$; + +CREATE OR REPLACE FUNCTION block_input_count(block_row block) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT COALESCE(SUM(transactions.transaction_input_count), 0)::bigint FROM ( + SELECT transaction_input_count (transaction) FROM transaction WHERE internal_id IN (SELECT transaction_internal_id from block_transaction WHERE block_internal_id = block_row.internal_id) + ) as "transactions" +$$; + +CREATE OR REPLACE FUNCTION block_input_value_satoshis(block_row block) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT COALESCE(SUM(transactions.transaction_input_value_satoshis), 0)::bigint FROM ( + SELECT transaction_input_value_satoshis (transaction) FROM transaction WHERE internal_id IN (SELECT transaction_internal_id from block_transaction WHERE block_internal_id = block_row.internal_id) + ) as "transactions" +$$; + +CREATE OR REPLACE FUNCTION block_output_count(block_row block) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT COALESCE(SUM(transactions.transaction_output_count), 0)::bigint FROM ( + SELECT transaction_output_count (transaction) FROM transaction WHERE internal_id IN (SELECT transaction_internal_id from block_transaction WHERE block_internal_id = block_row.internal_id) + ) as "transactions" +$$; + +CREATE OR REPLACE FUNCTION block_output_value_satoshis(block_row block) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT COALESCE(SUM(transactions.transaction_output_value_satoshis), 0)::bigint FROM ( + SELECT transaction_output_value_satoshis (transaction) FROM transaction WHERE internal_id IN (SELECT transaction_internal_id from block_transaction WHERE block_internal_id = block_row.internal_id) + ) as "transactions" +$$; + +CREATE OR REPLACE FUNCTION transaction_fee_satoshis(transaction_row transaction) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT CASE + WHEN transaction_row.is_coinbase THEN 0 + ELSE transaction_input_value_satoshis(transaction_row) - transaction_output_value_satoshis(transaction_row) + END +$$; + +CREATE OR REPLACE FUNCTION block_fee_satoshis(block_row block) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT COALESCE(SUM(transactions.transaction_fee_satoshis), 0)::bigint FROM ( + SELECT transaction_fee_satoshis (transaction) FROM transaction WHERE internal_id IN (SELECT transaction_internal_id from block_transaction WHERE block_internal_id = block_row.internal_id ) + ) as "transactions" +$$; + +CREATE OR REPLACE FUNCTION block_generated_value_satoshis(block_row block) RETURNS bigint + LANGUAGE sql IMMUTABLE +AS $$ + SELECT (block_output_value_satoshis(block) - block_input_value_satoshis(block))::bigint FROM block WHERE internal_id = block_row.internal_id; +$$; diff --git a/images/hasura/hasura-data/migrations/default/1778435997270_add_node_transaction_history_primary_key/down.sql b/images/hasura/hasura-data/migrations/default/1778435997270_add_node_transaction_history_primary_key/down.sql new file mode 100644 index 0000000..6759492 --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778435997270_add_node_transaction_history_primary_key/down.sql @@ -0,0 +1,2 @@ +ALTER TABLE ONLY node_transaction_history + DROP CONSTRAINT node_transaction_history_pkey; diff --git a/images/hasura/hasura-data/migrations/default/1778435997270_add_node_transaction_history_primary_key/up.sql 
b/images/hasura/hasura-data/migrations/default/1778435997270_add_node_transaction_history_primary_key/up.sql new file mode 100644 index 0000000..8428325 --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778435997270_add_node_transaction_history_primary_key/up.sql @@ -0,0 +1,2 @@ +ALTER TABLE ONLY node_transaction_history + ADD CONSTRAINT node_transaction_history_pkey PRIMARY KEY (internal_id); diff --git a/images/hasura/hasura-data/migrations/default/1778437612917_fix_zero_length_pushdata_patterns/down.sql b/images/hasura/hasura-data/migrations/default/1778437612917_fix_zero_length_pushdata_patterns/down.sql new file mode 100644 index 0000000..6e599ca --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778437612917_fix_zero_length_pushdata_patterns/down.sql @@ -0,0 +1,52 @@ +CREATE OR REPLACE FUNCTION parse_bytecode_pattern_with_pushdata_lengths(bytecode bytea) RETURNS bytea + LANGUAGE plpgsql IMMUTABLE +AS $$ +DECLARE + pattern bytea := '\x'::bytea; + selected_byte integer; + scratch bytea; + i integer := 0; + bytecode_length integer := octet_length(bytecode); +BEGIN + WHILE i < bytecode_length LOOP + selected_byte := get_byte(bytecode, i); + pattern := pattern || substring(bytecode from (i + 1) for 1); + IF selected_byte > 78 OR selected_byte = 0 THEN + -- OP_0 (0) and all opcodes after OP_PUSHDATA_4 (78) are single-byte instructions + i := i + 1; + ELSIF selected_byte > 0 AND selected_byte <= 75 THEN + -- OP_PUSHBYTES_1 (1) through OP_PUSHBYTES_75 (75) directly indicate the length of pushed data + i := i + 1 + selected_byte; + ELSIF selected_byte = 76 THEN + IF bytecode_length - i < 3 THEN + -- malformed, return immediately + RETURN pattern; + END IF; + -- OP_PUSHDATA_1 reads one length-byte + pattern := pattern || substring(bytecode from (i + 2) for 1); -- append length byte + i := i + 2 + get_byte(bytecode, (i + 1)); + ELSIF selected_byte = 77 THEN + IF bytecode_length - i < 4 THEN + -- malformed, return immediately + RETURN pattern; + END IF; + -- OP_PUSHDATA_2 reads two length-bytes + scratch := substring(bytecode from (i + 2) for 2); + pattern := pattern || scratch; -- append length bytes + -- parse scratch as unsigned, two byte, little-endian number: + i := i + 3 + ((get_byte(scratch, 1) << 8) | get_byte(scratch, 0)); + ELSIF selected_byte = 78 THEN + IF bytecode_length - i < 6 THEN + -- malformed, return immediately + RETURN pattern; + END IF; + -- OP_PUSHDATA_4 reads four length-bytes + scratch := substring(bytecode from (i + 2) for 4); + pattern := pattern || scratch; -- append length bytes + -- parse scratch as unsigned, four byte, little-endian number: + i := i + 5 + ((get_byte(scratch, 3) << 24) | (get_byte(scratch, 2) << 16) | (get_byte(scratch, 1) << 8) | get_byte(scratch, 0)); + END IF; + END LOOP; + RETURN pattern; +END; +$$; diff --git a/images/hasura/hasura-data/migrations/default/1778437612917_fix_zero_length_pushdata_patterns/up.sql b/images/hasura/hasura-data/migrations/default/1778437612917_fix_zero_length_pushdata_patterns/up.sql new file mode 100644 index 0000000..34b4876 --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778437612917_fix_zero_length_pushdata_patterns/up.sql @@ -0,0 +1,52 @@ +CREATE OR REPLACE FUNCTION parse_bytecode_pattern_with_pushdata_lengths(bytecode bytea) RETURNS bytea + LANGUAGE plpgsql IMMUTABLE +AS $$ +DECLARE + pattern bytea := '\x'::bytea; + selected_byte integer; + scratch bytea; + i integer := 0; + bytecode_length integer := octet_length(bytecode); +BEGIN + WHILE i < bytecode_length LOOP 
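+ -- The opcode sits at offset i; any length bytes follow at i + 1. A zero-length + -- OP_PUSHDATA at the very end of bytecode consists of only the opcode plus its + -- length bytes, so the minimum-remaining checks below require 2/3/5 bytes rather + -- than the previous 3/4/6, which misclassified such pushes as malformed.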
+ selected_byte := get_byte(bytecode, i); + pattern := pattern || substring(bytecode from (i + 1) for 1); + IF selected_byte > 78 OR selected_byte = 0 THEN + -- OP_0 (0) and all opcodes after OP_PUSHDATA_4 (78) are single-byte instructions + i := i + 1; + ELSIF selected_byte > 0 AND selected_byte <= 75 THEN + -- OP_PUSHBYTES_1 (1) through OP_PUSHBYTES_75 (75) directly indicate the length of pushed data + i := i + 1 + selected_byte; + ELSIF selected_byte = 76 THEN + IF bytecode_length - i < 2 THEN + -- malformed, return immediately + RETURN pattern; + END IF; + -- OP_PUSHDATA_1 reads one length-byte + pattern := pattern || substring(bytecode from (i + 2) for 1); -- append length byte + i := i + 2 + get_byte(bytecode, (i + 1)); + ELSIF selected_byte = 77 THEN + IF bytecode_length - i < 3 THEN + -- malformed, return immediately + RETURN pattern; + END IF; + -- OP_PUSHDATA_2 reads two length-bytes + scratch := substring(bytecode from (i + 2) for 2); + pattern := pattern || scratch; -- append length bytes + -- parse scratch as unsigned, two byte, little-endian number: + i := i + 3 + ((get_byte(scratch, 1) << 8) | get_byte(scratch, 0)); + ELSIF selected_byte = 78 THEN + IF bytecode_length - i < 5 THEN + -- malformed, return immediately + RETURN pattern; + END IF; + -- OP_PUSHDATA_4 reads four length-bytes + scratch := substring(bytecode from (i + 2) for 4); + pattern := pattern || scratch; -- append length bytes + -- parse scratch as unsigned, four byte, little-endian number: + i := i + 5 + ((get_byte(scratch, 3) << 24) | (get_byte(scratch, 2) << 16) | (get_byte(scratch, 1) << 8) | get_byte(scratch, 0)); + END IF; + END LOOP; + RETURN pattern; +END; +$$; diff --git a/images/hasura/hasura-data/migrations/default/1778438318512_fix_redeem_bytecode_parser/down.sql b/images/hasura/hasura-data/migrations/default/1778438318512_fix_redeem_bytecode_parser/down.sql new file mode 100644 index 0000000..beea754 --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778438318512_fix_redeem_bytecode_parser/down.sql @@ -0,0 +1,49 @@ +CREATE OR REPLACE FUNCTION parse_bytecode_pattern_redeem(bytecode bytea) RETURNS bytea + LANGUAGE plpgsql IMMUTABLE +AS $$ +DECLARE + maybe_redeem bytea; + selected_byte integer; + length_value integer; + scratch bytea; + i integer := 0; + bytecode_length integer := octet_length(bytecode); +BEGIN + WHILE i < bytecode_length LOOP + selected_byte := get_byte(bytecode, i); + IF selected_byte > 78 OR selected_byte = 0 THEN + -- OP_0 (0) and all opcodes after OP_PUSHDATA_4 (78) are single-byte instructions + i := i + 1; + maybe_redeem := NULL; + ELSIF selected_byte > 0 AND selected_byte <= 75 THEN + -- OP_PUSHBYTES_1 (1) through OP_PUSHBYTES_75 (75) directly indicate the length of pushed data + maybe_redeem := substring(bytecode from (i + 2) for selected_byte); + i := i + 1 + selected_byte; + ELSIF selected_byte = 76 THEN + -- OP_PUSHDATA_1 reads one length-byte + length_value := get_byte(bytecode, (i + 1)); + maybe_redeem := substring(bytecode from (i + 3) for length_value); + i := i + 2 + length_value; + ELSIF selected_byte = 77 THEN + -- OP_PUSHDATA_2 reads two length-bytes + scratch := substring(bytecode from (i + 2) for 2); + -- parse scratch as unsigned, two byte, little-endian number: + length_value := ((get_byte(scratch, 1) << 8) | get_byte(scratch, 0)); + maybe_redeem := substring(bytecode from (i + 4) for length_value); + i := i + 3 + length_value; + ELSIF selected_byte = 78 THEN + -- OP_PUSHDATA_4 reads four length-bytes + scratch := substring(bytecode from (i + 2) 
for 4); + -- parse scratch as unsigned, four byte, little-endian number: + length_value := ((get_byte(scratch, 3) << 24) | (get_byte(scratch, 2) << 16) | (get_byte(scratch, 1) << 8) | get_byte(scratch, 0)); + maybe_redeem := substring(bytecode from (i + 6) for length_value); + i := i + 5 + length_value; + END IF; + END LOOP; + IF maybe_redeem = NULL THEN + RETURN maybe_redeem; + ELSE + RETURN parse_bytecode_pattern(maybe_redeem); + END IF; +END; +$$; diff --git a/images/hasura/hasura-data/migrations/default/1778438318512_fix_redeem_bytecode_parser/up.sql b/images/hasura/hasura-data/migrations/default/1778438318512_fix_redeem_bytecode_parser/up.sql new file mode 100644 index 0000000..982c46b --- /dev/null +++ b/images/hasura/hasura-data/migrations/default/1778438318512_fix_redeem_bytecode_parser/up.sql @@ -0,0 +1,81 @@ +CREATE OR REPLACE FUNCTION parse_bytecode_pattern_redeem(bytecode bytea) RETURNS bytea + LANGUAGE plpgsql IMMUTABLE +AS $$ +DECLARE + maybe_redeem bytea; + selected_byte integer; + length_value integer; + scratch bytea; + i integer := 0; + bytecode_length integer := octet_length(bytecode); +BEGIN + WHILE i < bytecode_length LOOP + selected_byte := get_byte(bytecode, i); + IF selected_byte > 78 OR selected_byte = 0 THEN + -- OP_0 (0) and all opcodes after OP_PUSHDATA_4 (78) are single-byte instructions + i := i + 1; + maybe_redeem := NULL; + ELSIF selected_byte > 0 AND selected_byte <= 75 THEN + -- OP_PUSHBYTES_1 (1) through OP_PUSHBYTES_75 (75) directly indicate the length of pushed data + IF bytecode_length - i < 1 + selected_byte THEN + -- malformed, no redeem bytecode can be safely identified + RETURN NULL; + END IF; + maybe_redeem := substring(bytecode from (i + 2) for selected_byte); + i := i + 1 + selected_byte; + ELSIF selected_byte = 76 THEN + IF bytecode_length - i < 2 THEN + -- malformed, no redeem bytecode can be safely identified + RETURN NULL; + END IF; + -- OP_PUSHDATA_1 reads one length-byte + length_value := get_byte(bytecode, (i + 1)); + IF bytecode_length - i < 2 + length_value THEN + -- malformed, no redeem bytecode can be safely identified + RETURN NULL; + END IF; + maybe_redeem := substring(bytecode from (i + 3) for length_value); + i := i + 2 + length_value; + ELSIF selected_byte = 77 THEN + IF bytecode_length - i < 3 THEN + -- malformed, no redeem bytecode can be safely identified + RETURN NULL; + END IF; + -- OP_PUSHDATA_2 reads two length-bytes + scratch := substring(bytecode from (i + 2) for 2); + -- parse scratch as unsigned, two byte, little-endian number: + length_value := ((get_byte(scratch, 1) << 8) | get_byte(scratch, 0)); + IF bytecode_length - i < 3 + length_value THEN + -- malformed, no redeem bytecode can be safely identified + RETURN NULL; + END IF; + maybe_redeem := substring(bytecode from (i + 4) for length_value); + i := i + 3 + length_value; + ELSIF selected_byte = 78 THEN + IF bytecode_length - i < 5 THEN + -- malformed, no redeem bytecode can be safely identified + RETURN NULL; + END IF; + -- OP_PUSHDATA_4 reads four length-bytes + scratch := substring(bytecode from (i + 2) for 4); + IF get_byte(scratch, 3) << 24 < 0 THEN + -- push length exceeds maximum signed int (>2GB) + RETURN NULL; + END IF; + -- parse scratch as unsigned, four byte, little-endian number: + length_value := ((get_byte(scratch, 3) << 24) | (get_byte(scratch, 2) << 16) | (get_byte(scratch, 1) << 8) | get_byte(scratch, 0)); + IF bytecode_length - i - 5 < length_value THEN + -- malformed, no redeem bytecode can be safely identified + RETURN NULL; + END IF; + 
maybe_redeem := substring(bytecode from (i + 6) for length_value); + i := i + 5 + length_value; + END IF; + END LOOP; + IF maybe_redeem IS NULL THEN + RETURN maybe_redeem; + ELSE + RETURN parse_bytecode_pattern(maybe_redeem); + END IF; +END; +$$; diff --git a/src/agent.ts b/src/agent.ts index 3241571..2dbe80e 100644 --- a/src/agent.ts +++ b/src/agent.ts @@ -31,14 +31,21 @@ import { chaingraphLogFirehose, chaingraphUserAgent, genesisBlocks, + incompleteBlockRepairBatchSize, + mempoolTransactionExpirationMs, + mempoolTransactionExpirationScanIntervalMs, postgresMaxConnections, trustedNodes, } from './config.js'; import { acceptBlocksViaHeaders, + archiveMempoolTransaction, + archiveMempoolTransactionsAcceptedByBlocks, createIndexes, getAllKnownBlockHashes, + getIncompleteBlocks, getIndexCreationProgress, + getMempoolTransactionsExpiringBefore, listExistingIndexes, optionallyDisableSynchronousCommit, optionallyEnableSynchronousCommit, @@ -50,6 +57,7 @@ import { saveBlock, saveTransactionForNodes, } from './db.js'; +import type { ExpiringMempoolTransaction, IncompleteBlock } from './db.js'; import type { ChaingraphBlock } from './types/chaingraph.js'; // eslint-disable-next-line @typescript-eslint/naming-convention @@ -108,6 +116,27 @@ const renderSyncPercentage = (value: number) => { }; const msPerSecond = 1000; +const durationDecimalPlaces = 1; +const transactionRateDecimalPlaces = 2; + +const formatDurationSeconds = (durationMs: number) => + `${(durationMs / msPerSecond).toFixed(durationDecimalPlaces)}s`; + +const formatTransactionRate = (transactionCount: number, durationMs: number) => + durationMs === 0 + ? 'n/a tx/s' + : `${((transactionCount * msPerSecond) / durationMs).toFixed( + transactionRateDecimalPlaces + )}tx/s`; + +const formatTransactionDuration = ( + transactionCount: number, + durationMs: number +) => + transactionCount === 0 + ? 'n/a/tx' + : `${formatDurationSeconds(durationMs / transactionCount)}/tx`; + /** * Convert a bitcoin block header timestamp (UTC in seconds) to a `Date`. * @param timestamp - the block header timestamp @@ -288,6 +317,32 @@ export class Agent { scheduledBlockBufferFill = false; + scheduledIncompleteBlockRepair = false; + + incompleteBlockRepairInProgress = false; + + currentIncompleteBlockRepair: + | { + hash: string; + resolve: () => void; + } + | undefined; + + incompleteBlockRepairTimeout: ReturnType<typeof setTimeout> | undefined; + + mempoolTransactionExpirationScanTimeout: + | ReturnType<typeof setTimeout> + | undefined; + + mempoolTransactionExpirationTimers = new Map< + string, + ReturnType<typeof setTimeout> + >(); + + incompleteBlockRepairNextHeight = 0; + + completedIncompleteBlockRepairScan = false; + /** * The next second after which to log another warning that one or more nodes * are unresponsive. @@ -326,6 +381,7 @@ 'block_inclusions_index', 'output_search_index', 'spent_by_index', + 'token_category_index', ]; /** @@ -825,9 +881,16 @@ this.logger.info( `Agent: all managed indexes have been created.` ); - return reenableMempoolCleaning().then(() => { + return reenableMempoolCleaning().then((schemaIsCurrent) => { + if (!schemaIsCurrent) { + this.logger.warn( + 'Agent: WARNING! Database schema is old and missing multiple bug fixes. Update the Hasura image to apply migrations and then restart this agent.'
+ ); + } this.logger.info('Agent: enabled mempool tracking.'); this.saveInboundTransactions = true; + this.scheduleIncompleteBlockRepair(); + this.scheduleMempoolTransactionExpirationScan(); }); }) .catch((err) => { @@ -906,6 +969,334 @@ export class Agent { return indexCreationCompletion; } + canScanForMempoolTransactionExpirations() { + return ![ + !this.completedInitialSync, + !this.saveInboundTransactions, + this.waitingForIncompleteBlockRepairScan(), + this.mempoolTransactionExpirationScanTimeout !== undefined, + this.willShutdown, + ].includes(true); + } + + waitingForIncompleteBlockRepairScan() { + return ( + incompleteBlockRepairBatchSize !== 0 && + !this.completedIncompleteBlockRepairScan + ); + } + + scheduleMempoolTransactionExpirationScan(delayMs = 0, forceSchedule = false) { + if ( + (!forceSchedule && !this.canScanForMempoolTransactionExpirations()) || + this.willShutdown + ) { + return; + } + this.mempoolTransactionExpirationScanTimeout = setTimeout(() => { + this.mempoolTransactionExpirationScanTimeout = undefined; + this.scanForMempoolTransactionExpirations() + .catch((err) => { + this.logger.error( + err, + 'Agent: failed to scan for expiring mempool transactions.' + ); + }) + .finally(() => { + this.scheduleMempoolTransactionExpirationScan( + mempoolTransactionExpirationScanIntervalMs, + true + ); + }); + }, delayMs); + } + + scheduleMempoolTransactionExpiration( + transaction: ExpiringMempoolTransaction + ) { + const key = `${transaction.nodeInternalId}:${transaction.transactionInternalId}`; + if (this.mempoolTransactionExpirationTimers.has(key)) { + return; + } + const delayMs = Math.max(transaction.expiresAt.getTime() - Date.now(), 0); + const timeout = setTimeout(() => { + this.mempoolTransactionExpirationTimers.delete(key); + this.expireMempoolTransaction(transaction).catch((err) => { + this.logger.error( + err, + `Agent: failed to expire mempool transaction ${transaction.hash} for node ${transaction.nodeName}.` + ); + }); + }, delayMs); + this.mempoolTransactionExpirationTimers.set(key, timeout); + } + + async scanForMempoolTransactionExpirations() { + if (!this.saveInboundTransactions || this.willShutdown) { + return; + } + await this.archiveAcceptedMempoolTransactions(); + const expiresBefore = new Date( + Date.now() + mempoolTransactionExpirationScanIntervalMs + ); + const expiringTransactions = await getMempoolTransactionsExpiringBefore({ + expirationMs: mempoolTransactionExpirationMs, + expiresBefore, + }); + if (expiringTransactions.length === 0) { + this.logger.debug( + `Agent: no mempool transactions expiring before ${expiresBefore.toISOString()}; next scan in ${mempoolTransactionExpirationScanIntervalMs.toLocaleString()}ms.` + ); + return; + } + this.logger.info( + `Agent: found ${ + expiringTransactions.length + } mempool transaction(s) expiring before ${expiresBefore.toISOString()}; scheduling exact expiry.` + ); + expiringTransactions.forEach((transaction) => { + this.scheduleMempoolTransactionExpiration(transaction); + }); + } + + async expireMempoolTransaction(transaction: ExpiringMempoolTransaction) { + const archivedCount = await archiveMempoolTransaction({ + nodeInternalId: transaction.nodeInternalId, + replacedAt: transaction.expiresAt, + transactionInternalId: transaction.transactionInternalId, + }); + if (archivedCount === 0) { + this.logger.debug( + `Agent: skipped expiry of mempool transaction ${transaction.hash} for node ${transaction.nodeName}; it has already left node_transaction.` + ); + return; + } + this.logger.warn( + `Agent: expired 
mempool transaction ${transaction.hash} for node ${ + transaction.nodeName + }; archived from node_transaction to node_transaction_history with replaced_at ${transaction.expiresAt.toISOString()}.` + ); + } + + async archiveAcceptedMempoolTransactions() { + const archivedTransactions = + await archiveMempoolTransactionsAcceptedByBlocks(); + if (archivedTransactions.length === 0) { + return; + } + this.logger.warn( + `Agent: archived ${archivedTransactions.length.toLocaleString()} stale mempool transaction(s) already accepted or replaced by blocks before ordinary expiry.` + ); + archivedTransactions.forEach((transaction) => { + if (transaction.replacedAt === null) { + this.logger.info( + `Agent: archived stale confirmed mempool transaction ${transaction.hash} for node ${transaction.nodeName}; transaction is already accepted by a block, archived with replaced_at NULL.` + ); + return; + } + this.logger.info( + `Agent: archived stale replaced mempool transaction ${ + transaction.hash + } for node ${ + transaction.nodeName + }; an accepted block already spends the same outpoint, archived with replaced_at ${transaction.replacedAt.toISOString()}.` + ); + }); + } + + canScheduleIncompleteBlockRepair() { + return ![ + incompleteBlockRepairBatchSize === 0, + !this.completedInitialSync, + this.scheduledIncompleteBlockRepair, + this.incompleteBlockRepairInProgress, + this.completedIncompleteBlockRepairScan, + this.willShutdown, + ].includes(true); + } + + scheduleIncompleteBlockRepair() { + if (!this.canScheduleIncompleteBlockRepair()) { + return; + } + this.incompleteBlockRepairTimeout = setTimeout(() => { + this.scheduledIncompleteBlockRepair = false; + this.repairIncompleteBlocks().catch((err) => { + this.logger.fatal(err); + this.shutdown().catch((shutdownErr) => { + this.logger.error(shutdownErr); + }); + }); + }); + this.scheduledIncompleteBlockRepair = true; + } + + getIncompleteBlockRepairRange() { + const bestHeight = Math.max( + ...Object.values(this.blockTree.getBestHeights()) + ); + const finalHeight = bestHeight + 1; + const heightLowerBound = + this.incompleteBlockRepairNextHeight > bestHeight + ? 
0 + : this.incompleteBlockRepairNextHeight; + const heightUpperBound = Math.min( + heightLowerBound + incompleteBlockRepairBatchSize, + finalHeight + ); + return { finalHeight, heightLowerBound, heightUpperBound }; + } + + updateIncompleteBlockRepairProgress({ + finalHeight, + heightLowerBound, + heightUpperBound, + incompleteBlockCount, + limit, + }: { + finalHeight: number; + heightLowerBound: number; + heightUpperBound: number; + incompleteBlockCount: number; + limit: number; + }) { + if (incompleteBlockCount === limit) { + this.incompleteBlockRepairNextHeight = heightLowerBound; + return; + } + if (heightUpperBound === finalHeight) { + this.incompleteBlockRepairNextHeight = 0; + this.completedIncompleteBlockRepairScan = true; + this.logger.info('Agent: completed incomplete block repair scan.'); + this.scheduleMempoolTransactionExpirationScan(); + return; + } + this.incompleteBlockRepairNextHeight = heightUpperBound; + } + + async repairIncompleteBlock(block: IncompleteBlock) { + if (this.currentIncompleteBlockRepair !== undefined) { + // eslint-disable-next-line functional/no-throw-statement + throw new Error( + `Agent: attempted to repair incomplete block ${block.height} (${block.hash}) while already repairing ${this.currentIncompleteBlockRepair.hash}.` + ); + } + const sourceNodes = this.blockTree.getNodesWithBlock( + block.hash, + block.height + ); + if (sourceNodes.length === 0) { + // eslint-disable-next-line functional/no-throw-statement + throw new Error( + `Agent: incomplete block ${block.height} (${block.hash}) is not currently accepted by any connected node.` + ); + } + this.logger.info( + `Agent: self-healing incomplete block ${block.height} (${block.hash}); linked size ${block.linkedSizeBytes}/${block.sizeBytes} bytes across ${block.transactionCount} saved transaction(s).` + ); + const alreadyDownloading = this.blockDownloads.some( + (download) => download.hash === block.hash + ); + return new Promise<void>((resolve) => { + this.currentIncompleteBlockRepair = { + hash: block.hash, + resolve, + }; + if (!alreadyDownloading) { + this.blockBuffer.reserveBlock(); + this.requestBlock(block.hash, block.height); + } + }); + } + + canRepairIncompleteBlocks() { + return ( + incompleteBlockRepairBatchSize !== 0 && + this.completedInitialSync && + !this.willShutdown + ); + } + + getRegisteredNodeInternalIds() { + return Object.values(this.nodes) + .map((node) => node.internalId) + .filter((id): id is number => id !== undefined); + } + + async repairIncompleteBlocksSequentially(blocks: IncompleteBlock[]) { + await blocks.reduce<Promise<void>>(async (previousRepair, block) => { + await previousRepair; + await this.repairIncompleteBlock(block); + }, Promise.resolve()); + } + + async repairIncompleteBlocksOnce() { + const { finalHeight, heightLowerBound, heightUpperBound } = + this.getIncompleteBlockRepairRange(); + const scanStartTime = Date.now(); + const { incompleteBlocks, scannedBlockCount } = await getIncompleteBlocks({ + excludedBlockHashes: [], + heightLowerBound, + heightUpperBound, + limit: incompleteBlockRepairBatchSize, + nodeInternalIds: this.getRegisteredNodeInternalIds(), + }); + const scanDurationMs = Date.now() - scanStartTime; + const scanRate = + scanDurationMs === 0 + ? 
scannedBlockCount * msPerSecond + : Math.round((scannedBlockCount / scanDurationMs) * msPerSecond); + const scanPerformanceLog = `scanned ${scannedBlockCount.toLocaleString()} block(s) in ${scanDurationMs.toLocaleString()}ms (${scanRate.toLocaleString()} blocks/s)`; + if (incompleteBlocks.length === 0) { + this.updateIncompleteBlockRepairProgress({ + finalHeight, + heightLowerBound, + heightUpperBound, + incompleteBlockCount: incompleteBlocks.length, + limit: incompleteBlockRepairBatchSize, + }); + this.logger.info( + `Agent: no incomplete blocks found from height ${heightLowerBound} to ${ + heightUpperBound - 1 + }; ${scanPerformanceLog}.` + ); + return; + } + this.logger.warn( + `Agent: found ${ + incompleteBlocks.length + } incomplete block(s) from height ${heightLowerBound} to ${ + heightUpperBound - 1 + }; ${scanPerformanceLog}; requesting full block contents for repair.` + ); + await this.repairIncompleteBlocksSequentially(incompleteBlocks); + } + + /** + * Audit a bounded range of blocks accepted by currently-connected nodes. If + * the saved transactions don't sum to the saved block size, re-request the + * full block and let the normal block-saving path repair missing rows. + */ + async repairIncompleteBlocks() { + if (!this.canRepairIncompleteBlocks()) { + return; + } + if (this.incompleteBlockRepairInProgress) { + return; + } + this.incompleteBlockRepairInProgress = true; + await this.repairIncompleteBlocksOnce() + .then(() => { + this.incompleteBlockRepairInProgress = false; + this.scheduleIncompleteBlockRepair(); + }) + .catch((err) => { + this.incompleteBlockRepairInProgress = false; + // eslint-disable-next-line functional/no-throw-statement + throw err; + }); + } + /** * Internal method used to identify sync status of all nodes. Returns a list * of nodes in order from least-synced to most-synced. (Nodes which have not @@ -1366,6 +1757,7 @@ export class Agent { const durationMs = completionTime - startTime; const transactions = attemptedSavedTransactions.length; + const savedTransactionCount = transactions - transactionCacheMisses; const inputs = attemptedSavedTransactions.reduce( (total, tx) => total + tx.inputs.length, 0 @@ -1399,15 +1791,29 @@ export class Agent { .toString() .padStart(heightMinWidth, ' ')} | timestamp: ${blockTimestampToDate( block.timestamp - ).toISOString()} | hash: ${block.hash} | new txs: ${ - transactions - transactionCacheMisses - }/${block.transactions.length.toString()} (${transactionCacheMisses} cache misses) | nodes: ${nodeAcceptances + ).toISOString()} | hash: ${ + block.hash + } | new txs: ${savedTransactionCount}/${block.transactions.length.toString()} (${transactionCacheMisses} cache misses) | nodes: ${nodeAcceptances .map((acceptance) => acceptance.nodeName) .join(', ')}`; + const blockInsertLog = `Inserting block ${ + block.height + } with ${savedTransactionCount} new transaction${ + savedTransactionCount === 1 ? 
'' : 's' + } for ${nodeAcceptances + .map((acceptance) => acceptance.nodeName) + .join(', ')} took ${formatDurationSeconds( + durationMs + )} (${formatTransactionDuration( + savedTransactionCount, + durationMs + )}, ${formatTransactionRate(savedTransactionCount, durationMs)})`; if (isHistoricalSync) { this.logger.debug(blockSyncLog); + this.logger.trace(blockInsertLog); } else { this.logger.info(blockSyncLog); + this.logger.debug(blockInsertLog); } nodeAcceptances.forEach((acceptance) => { @@ -1417,6 +1823,11 @@ export class Agent { blockTimestampToDate(block.timestamp) ); }); + const currentRepair = this.currentIncompleteBlockRepair; + if (currentRepair?.hash === block.hash) { + currentRepair.resolve(); + this.currentIncompleteBlockRepair = undefined; + } this.blockBuffer.removeBlock(block); } @@ -1500,7 +1911,7 @@ export class Agent { txCacheItem.db = true; this.transactionCache.set(transactionHash, txCacheItem); this.logger.trace( - `Marked transaction saved to DB - hash: ${transactionHash}` + `Marked transaction as saved to DB - hash: ${transactionHash}` ); } @@ -1542,7 +1953,14 @@ export class Agent { .join(', ')} - hash: ${tx.hash}` ); // TODO: collect statistics on save speed + const startTime = Date.now(); await saveTransactionForNodes(tx, validations); + const durationMs = Date.now() - startTime; + this.logger.debug( + `Inserting mempool tx ${ + tx.hash + } for node ${nodeName} took ${formatDurationSeconds(durationMs)}` + ); this.markTransactionSavedToDb(tx.hash); } } @@ -1754,6 +2172,16 @@ export class Agent { this.willShutdown = true; clearInterval(eventLoopDurationInterval); clearInterval(this.heartbeatInterval); + if (this.incompleteBlockRepairTimeout !== undefined) { + clearTimeout(this.incompleteBlockRepairTimeout); + } + if (this.mempoolTransactionExpirationScanTimeout !== undefined) { + clearTimeout(this.mempoolTransactionExpirationScanTimeout); + } + this.mempoolTransactionExpirationTimers.forEach((timeout) => { + clearTimeout(timeout); + }); + this.mempoolTransactionExpirationTimers.clear(); Object.values(this.nodes).forEach((connection) => { connection.disconnect(); }); diff --git a/src/components/db-utils.ts b/src/components/db-utils.ts index dfdacd0..e1f03e5 100644 --- a/src/components/db-utils.ts +++ b/src/components/db-utils.ts @@ -4,6 +4,7 @@ export const indexDefinitions = { block_inclusions_index: /* sql */ `CREATE INDEX block_inclusions_index ON block_transaction USING btree (transaction_internal_id);`, output_search_index: /* sql */ `CREATE INDEX output_search_index ON output USING btree (substring(locking_bytecode, 0, 26));`, spent_by_index: /* sql */ `CREATE INDEX spent_by_index ON input USING btree (outpoint_transaction_hash, outpoint_index);`, + token_category_index: /* sql */ `CREATE INDEX token_category_index ON output USING btree (token_category);`, }; /* eslint-enable camelcase, @typescript-eslint/naming-convention */ diff --git a/src/components/sync-state.spec.ts b/src/components/sync-state.spec.ts index b21a1f5..4f5fb3c 100644 --- a/src/components/sync-state.spec.ts +++ b/src/components/sync-state.spec.ts @@ -58,3 +58,31 @@ test('SyncState', (t) => { t.deepEqual(state.additionalSyncedHeights, []); t.deepEqual(state.latestSyncedBlockTime, new Date(5)); }); + +test('SyncState scales when marking many contiguous heights as pending', (t) => { + const state = new SyncState({ + additionalSyncedHeights: [], + fullySyncedUpToHeight: 0, + pendingSyncOfHeights: [], + }); + const pendingThroughHeight = 4_000; + const maximumDurationMs = 1_000; + + const 
startedAt = Date.now(); + // eslint-disable-next-line functional/no-loop-statement + for ( + // eslint-disable-next-line functional/no-let + let nextHeight = state.getPendingSyncHeight() + 1; + nextHeight <= pendingThroughHeight; + nextHeight = state.getPendingSyncHeight() + 1 + ) { + state.markHeightAsPendingSync(nextHeight); + } + const durationMs = Date.now() - startedAt; + + t.deepEqual(state.getPendingSyncHeight(), pendingThroughHeight); + t.true( + durationMs < maximumDurationMs, + `Expected marking contiguous pending heights to complete in less than ${maximumDurationMs}ms; took ${durationMs}ms.` + ); +}); diff --git a/src/components/sync-state.ts b/src/components/sync-state.ts index ed15b60..58f6e5c 100644 --- a/src/components/sync-state.ts +++ b/src/components/sync-state.ts @@ -48,10 +48,21 @@ export class SyncState { */ latestSyncedBlockTime: Date | 'caught-up' | undefined; + private pendingSyncHeight: number; + + private pendingSyncHeights: Set<number>; + + private additionalSyncedHeightSet: Set<number>; + constructor(initialState: InitialSyncState) { this.fullySyncedUpToHeight = initialState.fullySyncedUpToHeight; this.pendingSyncOfHeights = initialState.pendingSyncOfHeights.slice(); this.additionalSyncedHeights = initialState.additionalSyncedHeights.slice(); + this.pendingSyncHeights = new Set(this.pendingSyncOfHeights); + this.additionalSyncedHeightSet = new Set(this.additionalSyncedHeights); + this.pendingSyncHeight = this.computePendingSyncHeightFrom( + this.fullySyncedUpToHeight + ); this.latestSyncedBlockTime = initialState.fullySyncedUpToHeight > 0 ? 'caught-up' : undefined; } @@ -69,28 +80,35 @@ if ( this.fullySyncedUpToHeight < height && - !this.additionalSyncedHeights.includes(height) + !this.additionalSyncedHeightSet.has(height) ) { this.additionalSyncedHeights.push(height); - removeValueIfPresent(this.pendingSyncOfHeights, height); + this.additionalSyncedHeightSet.add(height); + if (this.pendingSyncHeights.delete(height)) { + removeValueIfPresent(this.pendingSyncOfHeights, height); + } } // eslint-disable-next-line functional/no-let let nextHeight = this.fullySyncedUpToHeight + 1; // eslint-disable-next-line functional/no-loop-statement - while (removeValueIfPresent(this.additionalSyncedHeights, nextHeight)) { + while (this.additionalSyncedHeightSet.delete(nextHeight)) { + removeValueIfPresent(this.additionalSyncedHeights, nextHeight); this.fullySyncedUpToHeight = nextHeight; nextHeight += 1; } + this.updatePendingSyncHeight(); } markHeightAsPendingSync(height: number) { if ( this.fullySyncedUpToHeight < height && - !this.pendingSyncOfHeights.includes(height) && - !this.additionalSyncedHeights.includes(height) + !this.pendingSyncHeights.has(height) && + !this.additionalSyncedHeightSet.has(height) ) { this.pendingSyncOfHeights.push(height); + this.pendingSyncHeights.add(height); + this.updatePendingSyncHeight(); } } @@ -113,6 +131,9 @@ this.additionalSyncedHeights = this.additionalSyncedHeights.filter( (completed) => completed < height ); + this.pendingSyncHeights = new Set(this.pendingSyncOfHeights); + this.additionalSyncedHeightSet = new Set(this.additionalSyncedHeights); + this.resetPendingSyncHeight(); } /** * The first... * useful for prioritizing syncing.
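+ * (Now an O(1) read: the height is maintained incrementally via + * updatePendingSyncHeight rather than recomputed by a linear scan.)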
*/ getPendingSyncHeight() { - const handledHeights = [ - ...this.pendingSyncOfHeights, - ...this.additionalSyncedHeights, - ]; + return this.pendingSyncHeight; + } + + private isHeightHandled(height: number) { + return ( + this.pendingSyncHeights.has(height) || + this.additionalSyncedHeightSet.has(height) + ); + } + + private computePendingSyncHeightFrom(height: number) { // eslint-disable-next-line functional/no-let - let firstUnhandledHeight = this.fullySyncedUpToHeight + 1; + let pendingSyncHeight = height; // eslint-disable-next-line functional/no-loop-statement - while (handledHeights.includes(firstUnhandledHeight)) { - firstUnhandledHeight += 1; + while (this.isHeightHandled(pendingSyncHeight + 1)) { + pendingSyncHeight += 1; } - return firstUnhandledHeight - 1; + return pendingSyncHeight; + } + + private updatePendingSyncHeight() { + this.pendingSyncHeight = this.computePendingSyncHeightFrom( + Math.max(this.pendingSyncHeight, this.fullySyncedUpToHeight) + ); + } + + private resetPendingSyncHeight() { + this.pendingSyncHeight = this.computePendingSyncHeightFrom( + this.fullySyncedUpToHeight + ); } } diff --git a/src/config.ts b/src/config.ts index 46f51ff..00c37a5 100644 --- a/src/config.ts +++ b/src/config.ts @@ -36,11 +36,14 @@ const configuration = { const expectedOptions = [ 'CHAINGRAPH_BLOCK_BUFFER_TARGET_SIZE_MB', 'CHAINGRAPH_GENESIS_BLOCKS', + 'CHAINGRAPH_INCOMPLETE_BLOCK_REPAIR_BATCH_SIZE', 'CHAINGRAPH_INTERNAL_API_PORT', 'CHAINGRAPH_LOG_FIREHOSE', 'CHAINGRAPH_LOG_LEVEL_STDOUT', 'CHAINGRAPH_LOG_LEVEL_PATH', 'CHAINGRAPH_LOG_PATH', + 'CHAINGRAPH_MEMPOOL_TRANSACTION_EXPIRATION_MS', + 'CHAINGRAPH_MEMPOOL_TRANSACTION_EXPIRATION_SCAN_INTERVAL_MS', 'CHAINGRAPH_POSTGRES_CONNECTION_STRING', 'CHAINGRAPH_POSTGRES_MAX_CONNECTIONS', 'CHAINGRAPH_POSTGRES_SYNCHRONOUS_COMMIT', @@ -112,6 +115,45 @@ if (isNaN(blockBufferTargetSizeMb) || blockBufferTargetSizeMb <= 0) { ); } +const incompleteBlockRepairBatchSize = Number( + configuration.CHAINGRAPH_INCOMPLETE_BLOCK_REPAIR_BATCH_SIZE +); +if ( + !Number.isInteger(incompleteBlockRepairBatchSize) || + incompleteBlockRepairBatchSize < 0 +) { + // eslint-disable-next-line functional/no-throw-statement + throw new Error( + 'The CHAINGRAPH_INCOMPLETE_BLOCK_REPAIR_BATCH_SIZE environment variable must be an integer greater than or equal to 0.' + ); +} + +const mempoolTransactionExpirationMs = Number( + configuration.CHAINGRAPH_MEMPOOL_TRANSACTION_EXPIRATION_MS +); +if ( + !Number.isInteger(mempoolTransactionExpirationMs) || + mempoolTransactionExpirationMs <= 0 +) { + // eslint-disable-next-line functional/no-throw-statement + throw new Error( + 'The CHAINGRAPH_MEMPOOL_TRANSACTION_EXPIRATION_MS environment variable must be an integer greater than 0.' + ); +} + +const mempoolTransactionExpirationScanIntervalMs = Number( + configuration.CHAINGRAPH_MEMPOOL_TRANSACTION_EXPIRATION_SCAN_INTERVAL_MS +); +if ( + !Number.isInteger(mempoolTransactionExpirationScanIntervalMs) || + mempoolTransactionExpirationScanIntervalMs <= 0 +) { + // eslint-disable-next-line functional/no-throw-statement + throw new Error( + 'The CHAINGRAPH_MEMPOOL_TRANSACTION_EXPIRATION_SCAN_INTERVAL_MS environment variable must be an integer greater than 0.' + ); +} + const extendTildeAndResolvePath = (path: string) => path.startsWith('~') ? 
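The three configuration checks above share one parse-and-validate shape; a generic helper in the same spirit (hypothetical, not part of this diff) could read:

```ts
// Sketch: parse an environment variable as an integer with a lower bound,
// throwing a descriptive error otherwise. Number(undefined) is NaN, so a
// missing variable fails Number.isInteger and is rejected as well.
const requireIntegerOption = (
  name: string,
  value: string | undefined,
  minimum: number
) => {
  const parsed = Number(value);
  if (!Number.isInteger(parsed) || parsed < minimum) {
    throw new Error(
      `The ${name} environment variable must be an integer greater than or equal to ${minimum}.`
    );
  }
  return parsed;
};
```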
resolve(join(homedir(), path.slice(1))) @@ -376,6 +418,9 @@ export { chaingraphLogLevelPath, chaingraphUserAgent, genesisBlocks, + incompleteBlockRepairBatchSize, + mempoolTransactionExpirationMs, + mempoolTransactionExpirationScanIntervalMs, postgresMaxConnections, postgresConnectionString, postgresSynchronousCommit, diff --git a/src/db.ts b/src/db.ts index 8c62d18..fa4465e 100644 --- a/src/db.ts +++ b/src/db.ts @@ -1,3 +1,4 @@ +/* eslint-disable max-lines */ import pg from 'pg'; import type { Agent } from './agent.js'; @@ -93,6 +94,365 @@ export const getAllKnownBlockHashes = async () => { return hashes; }; +export interface IncompleteBlock { + hash: string; + height: number; + linkedSizeBytes: number; + sizeBytes: number; + transactionCount: number; +} + +export interface IncompleteBlockScan { + incompleteBlocks: IncompleteBlock[]; + scannedBlockCount: number; +} + +export interface ExpiringMempoolTransaction { + expiresAt: Date; + hash: string; + nodeInternalId: number; + nodeName: string; + transactionInternalId: number; + validatedAt: Date; +} + +export interface ArchivedMempoolTransaction { + hash: string; + nodeName: string; + replacedAt: Date | null; +} + +/** + * Find blocks for which the locally saved block_transaction rows don't sum to + * the block's saved byte size. This avoids the SQL block encoder so it can + * detect incomplete blocks even if encoder functions have bugs (e.g. #75). + */ +export const getIncompleteBlocks = async ({ + heightLowerBound, + heightUpperBound, + limit, + nodeInternalIds, + excludedBlockHashes, +}: { + excludedBlockHashes: string[]; + heightLowerBound: number; + heightUpperBound: number; + limit: number; + nodeInternalIds: number[]; +}): Promise<IncompleteBlockScan> => { + if (nodeInternalIds.length === 0) { + return { incompleteBlocks: [], scannedBlockCount: 0 }; + } + const client = await pool.connect(); + // eslint-disable-next-line functional/no-try-statement + try { + const incompleteBlockScan = await client.query<{ + incompleteBlocks: { + hash: string; + height: number | string; + linkedSizeBytes: number | string; + sizeBytes: number | string; + transactionCount: number | string; + }[]; + scannedBlockCount: string; + }>( + /* sql */ ` +WITH linked_transactions AS ( + SELECT + block.internal_id, + block.height, + block.hash, + block.size_bytes, + COUNT(block_transaction.transaction_internal_id)::bigint + AS transaction_count, + COALESCE(SUM(transaction.size_bytes), 0)::bigint + AS transaction_size_bytes + FROM block + LEFT JOIN block_transaction + ON block_transaction.block_internal_id = block.internal_id + LEFT JOIN transaction + ON transaction.internal_id = block_transaction.transaction_internal_id + WHERE block.height >= $2 + AND block.height < $3 + AND NOT (encode(block.hash, 'hex') = ANY($5::text[])) + AND EXISTS ( + SELECT 1 FROM node_block + WHERE node_block.block_internal_id = block.internal_id + AND node_block.node_internal_id = ANY($1::integer[]) + ) + GROUP BY block.internal_id +), +linked_block_sizes AS ( + SELECT + hash, + height, + size_bytes, + transaction_count, + 80 + + CASE + WHEN transaction_count <= 252 THEN 1 + WHEN transaction_count <= 65535 THEN 3 + WHEN transaction_count <= 4294967295 THEN 5 + ELSE 9 + END + + transaction_size_bytes AS linked_size_bytes + FROM linked_transactions +), +incomplete_blocks AS ( + SELECT + encode(hash, 'hex') AS hash, + height, + linked_size_bytes, + size_bytes, + transaction_count + FROM linked_block_sizes + WHERE linked_size_bytes != size_bytes + ORDER BY height ASC, hash ASC + LIMIT $4 +) +SELECT + (SELECT
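The `linked_block_sizes` arithmetic mirrors the block wire format: an 80-byte header, a CompactSize-encoded transaction count (1, 3, 5, or 9 bytes), then the concatenated transactions. The same expected-size computation in TypeScript (a sketch for reference, not code from this diff):

```ts
// Sketch: expected serialized size of a block, matching the SQL CASE above.
// CompactSize uses 1 byte for values up to 252, 3 bytes up to 0xffff,
// 5 bytes up to 0xffffffff, and 9 bytes beyond that.
const compactSizeByteLength = (value: number) =>
  value <= 252 ? 1 : value <= 65535 ? 3 : value <= 4294967295 ? 5 : 9;

const expectedBlockSizeBytes = (transactionSizesBytes: number[]) =>
  80 +
  compactSizeByteLength(transactionSizesBytes.length) +
  transactionSizesBytes.reduce((sum, size) => sum + size, 0);

// A block whose saved size_bytes differs from this value is "incomplete":
// some of its transactions were never linked (or never saved).
```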
COUNT(*)::bigint FROM linked_block_sizes) AS "scannedBlockCount", + COALESCE( + ( + SELECT jsonb_agg( + jsonb_build_object( + 'hash', hash, + 'height', height, + 'linkedSizeBytes', linked_size_bytes, + 'sizeBytes', size_bytes, + 'transactionCount', transaction_count + ) + ORDER BY height ASC, hash ASC + ) + FROM incomplete_blocks + ), + '[]'::jsonb + ) AS "incompleteBlocks"; +`, + [ + nodeInternalIds, + heightLowerBound, + heightUpperBound, + limit, + excludedBlockHashes, + ] + ); + const scan = incompleteBlockScan.rows[0]!; + return { + incompleteBlocks: scan.incompleteBlocks.map((block) => ({ + hash: block.hash, + height: Number(block.height), + linkedSizeBytes: Number(block.linkedSizeBytes), + sizeBytes: Number(block.sizeBytes), + transactionCount: Number(block.transactionCount), + })), + scannedBlockCount: Number(scan.scannedBlockCount), + }; + } finally { + client.release(); + } +}; + +/** + * Find node_transaction rows which will expire before the provided timestamp. + */ +export const getMempoolTransactionsExpiringBefore = async ({ + expiresBefore, + expirationMs, +}: { + expirationMs: number; + expiresBefore: Date; +}): Promise<ExpiringMempoolTransaction[]> => { + const expirationInterval = `${expirationMs}::double precision * interval '1 millisecond'`; + const client = await pool.connect(); + // eslint-disable-next-line functional/no-try-statement + try { + const transactions = await client.query<{ + expiresAt: string; + hash: string; + nodeInternalId: string; + nodeName: string; + transactionInternalId: string; + validatedAt: string; + }>(/* sql */ ` +SELECT encode(transaction.hash, 'hex') AS "hash", + node.name AS "nodeName", + node_transaction.node_internal_id AS "nodeInternalId", + node_transaction.transaction_internal_id AS "transactionInternalId", + node_transaction.validated_at::text AS "validatedAt", + (node_transaction.validated_at + (${expirationInterval}))::text AS "expiresAt" + FROM node_transaction + JOIN node + ON node.internal_id = node_transaction.node_internal_id + JOIN transaction + ON transaction.internal_id = node_transaction.transaction_internal_id + WHERE node_transaction.validated_at + (${expirationInterval}) <= ${dateToTimestampWithoutTimezone( + expiresBefore + )} + ORDER BY "expiresAt", "nodeName", "hash"; +`); + return transactions.rows.map((transaction) => ({ + expiresAt: timestampWithoutTimezoneToDate(transaction.expiresAt), + hash: transaction.hash, + nodeInternalId: Number(transaction.nodeInternalId), + nodeName: transaction.nodeName, + transactionInternalId: Number(transaction.transactionInternalId), + validatedAt: timestampWithoutTimezoneToDate(transaction.validatedAt), + })); + } finally { + client.release(); + } +}; + +/** + * Archive node_transaction rows for transactions that are already accepted + * into blocks, or replaced by transactions in accepted blocks, for the same + * node. This repairs historical rows missed when block inclusions are added + * after the node_block trigger has already fired.
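Rows returned by the expiration query are intended to be archived at their exact expiry times. A minimal sketch of that scheduling loop (the `archive` callback and the keying scheme are illustrative assumptions, not the agent's exact implementation):

```ts
// Sketch: schedule one timer per expiring mempool row, keyed so that repeated
// scans don't double-schedule. Negative delays (already expired) fire at once.
const timers = new Map<string, NodeJS.Timeout>();

const scheduleExpirations = (
  rows: { expiresAt: Date; nodeInternalId: number; transactionInternalId: number }[],
  archive: (row: {
    nodeInternalId: number;
    transactionInternalId: number;
    replacedAt: Date;
  }) => void
) => {
  rows.forEach((row) => {
    const key = `${row.nodeInternalId}:${row.transactionInternalId}`;
    if (timers.has(key)) return; // already scheduled by an earlier scan
    const delayMs = Math.max(0, row.expiresAt.getTime() - Date.now());
    timers.set(
      key,
      setTimeout(() => {
        timers.delete(key);
        archive({
          nodeInternalId: row.nodeInternalId,
          transactionInternalId: row.transactionInternalId,
          replacedAt: row.expiresAt, // archive at the exact expiry time
        });
      }, delayMs)
    );
  });
};
```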
+ */ +export const archiveMempoolTransactionsAcceptedByBlocks = async (): Promise< + ArchivedMempoolTransaction[] +> => { + const client = await pool.connect(); + // eslint-disable-next-line functional/no-try-statement + try { + const result = await client.query<{ + hash: string; + nodeName: string; + replacedAt: string | null; + }>(/* sql */ ` +WITH directly_accepted AS ( + SELECT node_transaction.node_internal_id, + node_transaction.transaction_internal_id, + NULL::timestamp without time zone AS replaced_at + FROM node_transaction + JOIN block_transaction + ON block_transaction.transaction_internal_id = node_transaction.transaction_internal_id + JOIN node_block + ON node_block.node_internal_id = node_transaction.node_internal_id + AND node_block.block_internal_id = block_transaction.block_internal_id +), +replaced_by_accepted AS ( + SELECT node_transaction.node_internal_id, + node_transaction.transaction_internal_id, + MIN(node_block.accepted_at) AS replaced_at + FROM node_transaction + JOIN input mempool_input + ON mempool_input.transaction_internal_id = node_transaction.transaction_internal_id + JOIN input accepted_input + ON accepted_input.outpoint_transaction_hash = mempool_input.outpoint_transaction_hash + AND accepted_input.outpoint_index = mempool_input.outpoint_index + AND accepted_input.transaction_internal_id != node_transaction.transaction_internal_id + JOIN block_transaction + ON block_transaction.transaction_internal_id = accepted_input.transaction_internal_id + JOIN node_block + ON node_block.node_internal_id = node_transaction.node_internal_id + AND node_block.block_internal_id = block_transaction.block_internal_id + WHERE mempool_input.outpoint_transaction_hash != '\\x0000000000000000000000000000000000000000000000000000000000000000'::bytea + GROUP BY node_transaction.node_internal_id, + node_transaction.transaction_internal_id +), +archive_candidates AS ( + SELECT node_internal_id, transaction_internal_id, replaced_at + FROM directly_accepted + UNION ALL + SELECT node_internal_id, transaction_internal_id, replaced_at + FROM replaced_by_accepted +), +archive_rows AS ( + SELECT node_internal_id, + transaction_internal_id, + CASE + WHEN bool_or(replaced_at IS NULL) THEN NULL::timestamp without time zone + ELSE MIN(replaced_at) + END AS replaced_at + FROM archive_candidates + GROUP BY node_internal_id, transaction_internal_id +), +deleted_rows AS ( + DELETE FROM node_transaction + USING archive_rows + WHERE node_transaction.node_internal_id = archive_rows.node_internal_id + AND node_transaction.transaction_internal_id = archive_rows.transaction_internal_id + RETURNING node_transaction.node_internal_id, + node_transaction.transaction_internal_id, + node_transaction.validated_at, + archive_rows.replaced_at +), +inserted_history AS ( + INSERT INTO node_transaction_history (node_internal_id, transaction_internal_id, validated_at, replaced_at) + SELECT node_internal_id, transaction_internal_id, validated_at, replaced_at + FROM deleted_rows + RETURNING node_internal_id, transaction_internal_id, replaced_at +) +SELECT encode(transaction.hash, 'hex') AS "hash", + node.name AS "nodeName", + inserted_history.replaced_at::text AS "replacedAt" + FROM inserted_history + JOIN node + ON node.internal_id = inserted_history.node_internal_id + JOIN transaction + ON transaction.internal_id = inserted_history.transaction_internal_id + ORDER BY "nodeName", "hash"; +`); + return result.rows.map((row) => ({ + hash: row.hash, + nodeName: row.nodeName, + replacedAt: + row.replacedAt === null + ? 
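A subtlety in the `archive_rows` CTE above: a (node, transaction) pair may appear in both candidate sets, and direct acceptance (a NULL `replaced_at`) must win over any replacement timestamp. The equivalent fold in TypeScript (illustrative only; assumes at least one candidate per pair):

```ts
// Sketch: merging archive candidates for one (node, transaction) pair. A NULL
// replaced_at (directly accepted) takes precedence; otherwise keep the
// earliest replacement time, matching bool_or/MIN in the SQL above.
const mergeReplacedAt = (candidates: (Date | null)[]): Date | null => {
  if (candidates.some((replacedAt) => replacedAt === null)) return null;
  return (candidates as Date[]).reduce((earliest, replacedAt) =>
    replacedAt < earliest ? replacedAt : earliest
  );
};
```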
null + : timestampWithoutTimezoneToDate(row.replacedAt), + })); + } finally { + client.release(); + } +}; + +/** + * Archive a single node_transaction row. Existing history triggers handle any + * same-node descendants with the same replaced_at timestamp. + */ +export const archiveMempoolTransaction = async ({ + nodeInternalId, + replacedAt, + transactionInternalId, +}: { + nodeInternalId: number; + replacedAt: Date; + transactionInternalId: number; +}) => { + const client = await pool.connect(); + // eslint-disable-next-line functional/no-try-statement + try { + const result = await client.query<{ + archivedCount: number; + }>( + /* sql */ ` +WITH deleted_row AS ( + DELETE FROM node_transaction + WHERE node_internal_id = $1 + AND transaction_internal_id = $2 + RETURNING node_internal_id, + transaction_internal_id, + validated_at, + ${dateToTimestampWithoutTimezone(replacedAt)} AS replaced_at +), +inserted_history AS ( + INSERT INTO node_transaction_history (node_internal_id, transaction_internal_id, validated_at, replaced_at) + SELECT node_internal_id, transaction_internal_id, validated_at, replaced_at + FROM deleted_row + RETURNING transaction_internal_id +) +SELECT COUNT(*)::integer AS "archivedCount" FROM inserted_history; +`, + [nodeInternalId, transactionInternalId] + ); + return result.rows[0]!.archivedCount; + } finally { + client.release(); + } +}; + /** * Create or update one or more trusted node in the Chaingraph database, * returning it's internal ID. @@ -200,15 +560,6 @@ WITH transaction_values (hash, version, locktime, size_bytes, is_coinbase) AS ( )}'::bytea, '${hexToByteaString(input.unlockingBytecode)}'::bytea)` ) .join(',')} -), node_transaction_values (node_internal_id, validated_at) AS ( - VALUES ${nodeValidations - .map( - (validation) => - `(${ - validation.nodeInternalId - }::bigint, ${dateToTimestampWithoutTimezone(validation.validatedAt)})` - ) - .join(',')} ), new_transaction (transaction_hash, transaction_internal_id) AS ( INSERT INTO transaction (hash, version, locktime, size_bytes, is_coinbase) SELECT hash, version, locktime, size_bytes, is_coinbase FROM transaction_values @@ -220,27 +571,57 @@ WITH transaction_values (hash, version, locktime, size_bytes, is_coinbase) AS ( ), insert_inputs AS ( INSERT INTO input (transaction_internal_id, input_index, outpoint_index, sequence_number, outpoint_transaction_hash, unlocking_bytecode) SELECT transaction_internal_id, input_index, outpoint_index, sequence_number, outpoint_transaction_hash, unlocking_bytecode FROM input_values CROSS JOIN new_transaction -), new_or_existing_transaction (transaction_internal_id) AS ( - SELECT COALESCE ( - (SELECT transaction_internal_id FROM new_transaction), - (SELECT internal_id AS transaction_internal_id FROM transaction WHERE transaction.hash = '${hexToByteaString( - transaction.hash - )}'::bytea) - ) +) +SELECT COUNT(*) FROM new_transaction; +`; + const saveNodeValidations = /* sql */ ` +WITH node_transaction_values (node_internal_id, validated_at) AS ( + VALUES ${nodeValidations + .map( + (validation) => + `(${ + validation.nodeInternalId + }::bigint, ${dateToTimestampWithoutTimezone(validation.validatedAt)})` + ) + .join(',')} ) INSERT INTO node_transaction (node_internal_id, transaction_internal_id, validated_at) - SELECT node_internal_id, transaction_internal_id, validated_at FROM node_transaction_values CROSS JOIN new_or_existing_transaction; + SELECT node_internal_id, $1::bigint, validated_at FROM node_transaction_values + ON CONFLICT ON CONSTRAINT "node_transaction_pkey" DO 
NOTHING; `; const client = await pool.connect(); - await client.query(saveTransaction); - client.release(); + // eslint-disable-next-line functional/no-try-statement + try { + await client.query('BEGIN;'); + await client.query(saveTransaction); + const transactionInternalIdResult = await client.query<{ + internalId: string; + }>( + /* sql */ `SELECT internal_id AS "internalId" FROM transaction WHERE hash = $1;`, + [Buffer.from(transaction.hash, 'hex')] + ); + const transactionInternalId = + transactionInternalIdResult.rows[0]?.internalId; + if (transactionInternalId === undefined) { + // eslint-disable-next-line functional/no-throw-statement + throw new Error( + `Failed to save or find transaction while recording node validation: ${transaction.hash}` + ); + } + await client.query(saveNodeValidations, [transactionInternalId]); + await client.query('COMMIT;'); + } catch (err) { + await client.query('ROLLBACK;'); + // eslint-disable-next-line functional/no-throw-statement + throw err; + } finally { + client.release(); + } }; /** * Immediately mark a node as having validated a transaction already known to * exist in the database. - * - * TODO: test */ export const recordNodeValidation = async ( transactionHash: string, @@ -253,16 +634,28 @@ export const recordNodeValidation = async ( /* * The transaction is already saved, just insert `node_transaction`s. */ - await client.query(/* sql */ ` - INSERT INTO node_transaction (node_internal_id, transaction_internal_id, validated_at) - SELECT node_internal_id, validated_at FROM (VALUES ( + // eslint-disable-next-line functional/no-try-statement + try { + await client.query(/* sql */ ` + WITH node_transaction_values (node_internal_id, validated_at) AS ( + VALUES ( ${validation.nodeInternalId}::bigint, - ${dateToTimestampWithoutTimezone(validation.validatedAt)}) - INNER JOIN (SELECT internal_id as transaction_internal_id from transaction WHERE hash = '${hexToByteaString( - transactionHash - )}'::bytea); - `); - client.release(); + ${dateToTimestampWithoutTimezone(validation.validatedAt)} + ) + ), known_transaction (transaction_internal_id) AS ( + SELECT internal_id + FROM transaction + WHERE hash = '${hexToByteaString(transactionHash)}'::bytea + ) + INSERT INTO node_transaction (node_internal_id, transaction_internal_id, validated_at) + SELECT node_internal_id, transaction_internal_id, validated_at + FROM node_transaction_values + CROSS JOIN known_transaction + ON CONFLICT ON CONSTRAINT "node_transaction_pkey" DO NOTHING; + `); + } finally { + client.release(); + } }; /** @@ -300,7 +693,7 @@ export const saveBlock = async ({ }>( (transactions, transaction) => { // eslint-disable-next-line @typescript-eslint/no-unused-expressions - transactionCache.has(transaction.hash) + transactionCache.get(transaction.hash)?.db === true ? 
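The changed cache predicate above only short-circuits a block save when the cached transaction is known to be flushed to the database (`db: true`), not merely known in memory. As a standalone sketch (the types here are assumptions, not Chaingraph's exact shapes):

```ts
// Sketch: partition a block's transactions by whether they are already
// persisted. Only entries flagged db: true are skipped; everything else is
// re-attempted so an interrupted save cannot leave a block incomplete.
type TxCacheItem = { db: boolean };

const partitionBlockTransactions = <T extends { hash: string }>(
  transactions: T[],
  cache: Map<string, TxCacheItem>
) =>
  transactions.reduce<{ alreadySaved: T[]; unknown: T[] }>(
    (groups, tx) => {
      (cache.get(tx.hash)?.db === true
        ? groups.alreadySaved
        : groups.unknown
      ).push(tx);
      return groups;
    },
    { alreadySaved: [], unknown: [] }
  );
```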
transactions.alreadySaved.push(transaction) : transactions.unknown.push(transaction); return transactions; @@ -453,11 +846,6 @@ inserted_block (internal_id) AS ( ON CONFLICT ON CONSTRAINT "block_hash_key" DO NOTHING RETURNING internal_id ), -inserted_block_transactions AS ( - INSERT INTO block_transaction (block_internal_id, transaction_internal_id, transaction_index) - SELECT blk.internal_id, tx.internal_id, tx.transaction_index - FROM inserted_block blk CROSS JOIN joined_transactions tx -), new_or_existing_block (internal_id) AS ( SELECT COALESCE ( (SELECT internal_id FROM inserted_block), @@ -465,27 +853,78 @@ new_or_existing_block (internal_id) AS ( block.hash )}'::bytea) ) -) -INSERT INTO node_block (node_internal_id, block_internal_id, accepted_at) +), +inserted_block_transactions AS ( + INSERT INTO block_transaction (block_internal_id, transaction_internal_id, transaction_index) + SELECT blk.internal_id, tx.internal_id, tx.transaction_index + FROM new_or_existing_block blk CROSS JOIN joined_transactions tx + ON CONFLICT ON CONSTRAINT "block_transaction_pkey" DO NOTHING + RETURNING transaction_internal_id +), +inserted_node_blocks AS ( + INSERT INTO node_block (node_internal_id, block_internal_id, accepted_at) SELECT node.node_internal_id, blk.internal_id, node.accepted_at FROM new_or_existing_block blk CROSS JOIN accepting_nodes node - ON CONFLICT ON CONSTRAINT "node_block_pkey" DO NOTHING`; + ON CONFLICT ON CONSTRAINT "node_block_pkey" DO NOTHING + RETURNING block_internal_id +) +SELECT + (SELECT COUNT(*)::bigint FROM joined_transactions) AS "joinedTransactionCount", + (SELECT COUNT(*)::bigint FROM inserted_block_transactions) AS "insertedBlockTransactionCount", + (SELECT COUNT(*)::bigint FROM inserted_node_blocks) AS "insertedNodeBlockCount";`; const client = await pool.connect(); - await client.query('BEGIN;'); - const saveTransactionsResult = await client.query<{ count: string }>( - addAllTransactions - ); - const attemptedSavedTransactions = blockTransactions.unknown; - const savedTransactionCount = Number(saveTransactionsResult.rows[0]!.count); - const transactionCacheMisses = - attemptedSavedTransactions.length - savedTransactionCount; - await client.query(addBlockQuery); - await client.query('COMMIT;'); - client.release(); - return { - attemptedSavedTransactions, - transactionCacheMisses, - }; + // eslint-disable-next-line functional/no-try-statement + try { + await client.query('BEGIN;'); + const saveTransactionsResult = await client.query<{ count: string }>( + addAllTransactions + ); + const attemptedSavedTransactions = blockTransactions.unknown; + const savedTransactionCount = Number(saveTransactionsResult.rows[0]!.count); + const transactionCacheMisses = + attemptedSavedTransactions.length - savedTransactionCount; + const addBlockResult = await client.query<{ + insertedBlockTransactionCount: string; + insertedNodeBlockCount: string; + joinedTransactionCount: string; + }>(addBlockQuery); + const joinedTransactionCount = Number( + addBlockResult.rows[0]!.joinedTransactionCount + ); + const linkedBlockTransactionCount = Number( + ( + await client.query<{ count: string }>( + /* sql */ ` + SELECT COUNT(*)::bigint AS count + FROM block_transaction + INNER JOIN block ON block.internal_id = block_transaction.block_internal_id + WHERE block.hash = $1; + `, + [Buffer.from(block.hash, 'hex')] + ) + ).rows[0]!.count + ); + if ( + joinedTransactionCount !== block.transactions.length || + linkedBlockTransactionCount !== block.transactions.length + ) { + // eslint-disable-next-line 
functional/no-throw-statement + throw new Error( + `Failed to save all transactions for block ${block.height} (${block.hash}): joined ${joinedTransactionCount}/${block.transactions.length}, linked ${linkedBlockTransactionCount}/${block.transactions.length}.` + ); + } + await client.query('COMMIT;'); + return { + attemptedSavedTransactions, + transactionCacheMisses, + }; + } catch (err) { + await client.query('ROLLBACK;'); + // eslint-disable-next-line functional/no-throw-statement + throw err; + } finally { + client.release(); + } }; /** @@ -564,17 +1003,33 @@ DELETE FROM node_block WHERE /** * After initial sync, Chaingraph begins tracking each node's mempool. * - * To maintain consistency, a trigger which is disabled before initial sync must + * To maintain consistency, triggers which are disabled before initial sync must * be reenabled to clear any confirmed or conflicting transactions when a block * is accepted. */ export const reenableMempoolCleaning = async () => { const client = await pool.connect(); - const res = await client.query( + await client.query( `ALTER TABLE node_block ENABLE TRIGGER trigger_public_node_block_insert;` ); + const triggerExists = + // cspell:ignore tgrelid tgname + ( + await client.query<{ triggerExists: boolean }>(/* sql */ ` +SELECT EXISTS ( + SELECT 1 FROM pg_trigger + WHERE tgrelid = 'node_transaction_history'::regclass + AND tgname = 'trigger_public_node_transaction_history_insert' +) AS "triggerExists"; +`) + ).rows[0]?.triggerExists === true; + if (triggerExists) { + await client.query( + `ALTER TABLE node_transaction_history ENABLE TRIGGER trigger_public_node_transaction_history_insert;` + ); + } client.release(); - return res.rowCount; + return triggerExists; }; /** diff --git a/src/e2e/e2e.spec.ts b/src/e2e/e2e.spec.ts index 8d62a42..8c8034b 100644 --- a/src/e2e/e2e.spec.ts +++ b/src/e2e/e2e.spec.ts @@ -27,6 +27,8 @@ import { execa } from 'execa'; import got from 'got'; import pg from 'pg'; +import type { ChaingraphTransaction } from '../types/chaingraph.js'; + import { chaingraphE2eLogPath, logger } from './e2e.spec.logging.helper.js'; import { chipnetCashTokensTx, @@ -59,10 +61,27 @@ const recreateDbOnStartup = true as boolean; const dir = dirname(fileURLToPath(import.meta.url)); const migration = (path: string) => resolve(dir, '../../images/hasura/hasura-data/migrations/', path); +const backfillOrphanMempoolDescendantsMigrationPath = migration( + 'default/1778158619747_backfill_orphan_mempool_descendants/up.sql' +); const dbUpMigrationPaths = [ migration('default/1616195337538_init/up.sql'), migration('default/1673124945608_tokens/up.sql'), migration('default/1676794104752_parse_bytecode_pattern/up.sql'), + migration( + 'default/1778151011521_cascade_invalidate_mempool_descendants/up.sql' + ), + backfillOrphanMempoolDescendantsMigrationPath, + migration('default/1778351597200_fix_block_encoded_transaction_count/up.sql'), + migration( + 'default/1778415174939_fix_data_carrier_outputs_empty_bytecode/up.sql' + ), + migration('default/1778429124205_fix_coinbase_only_value_aggregates/up.sql'), + migration( + 'default/1778435997270_add_node_transaction_history_primary_key/up.sql' + ), + migration('default/1778437612917_fix_zero_length_pushdata_patterns/up.sql'), + migration('default/1778438318512_fix_redeem_bytecode_parser/up.sql'), ]; const chaingraphInternalApiPort = '3201'; @@ -138,9 +157,11 @@ const postgresE2eConnectionStringTestDb = `${postgresE2eConnectionStringBase}/${ const e2eEnvVariables = { /* eslint-disable 
@typescript-eslint/naming-convention */ CHAINGRAPH_GENESIS_BLOCKS: `${e2eTestNetworkMagicHex}:${genesisBlockRaw},e3e1f3e8:${genesisBlockRaw},dab5bffa:${testnetGenesisBlockRaw}`, + CHAINGRAPH_INCOMPLETE_BLOCK_REPAIR_BATCH_SIZE: '10000', CHAINGRAPH_INTERNAL_API_PORT: chaingraphInternalApiPort, CHAINGRAPH_LOG_FIREHOSE: logP2pMessage.toString(), CHAINGRAPH_LOG_PATH: chaingraphE2eLogPath, + CHAINGRAPH_MEMPOOL_TRANSACTION_EXPIRATION_SCAN_INTERVAL_MS: '100', CHAINGRAPH_POSTGRES_CONNECTION_STRING: postgresE2eConnectionStringTestDb, CHAINGRAPH_TRUSTED_NODES: e2eTrustedNodesSet1, NODE_ENV: 'production', @@ -558,6 +579,186 @@ const sleep = async (ms: number) => new Promise<void>((res) => { setTimeout(res, ms); }); +const repeatedHashByteLength = 32; +const transactionSaveConflictPollingAttempts = 50; +const transactionSaveConflictPollingIntervalMs = 20; +const waitForTransactionSaveConflict = async ( + transactionHash: string, + remainingAttempts = transactionSaveConflictPollingAttempts +): Promise<void> => { + const result = await client.query<{ waiting: boolean }>( + /* sql */ ` + SELECT EXISTS ( + SELECT 1 + FROM pg_stat_activity + -- cspell:disable-next-line + WHERE datname = $1 + AND query LIKE '%INSERT INTO transaction%' + AND query LIKE $2 + AND wait_event_type IS NOT NULL + ) AS waiting; + `, + [e2eTestDbName, `%${transactionHash}%`] + ); + if (result.rows[0]!.waiting) { + return; + } + if (remainingAttempts === 0) { + // eslint-disable-next-line functional/no-throw-statement + throw new Error( + `Timed out waiting for saveTransactionForNodes conflict on ${transactionHash}.` + ); + } + await sleep(transactionSaveConflictPollingIntervalMs); + await waitForTransactionSaveConflict(transactionHash, remainingAttempts - 1); +}; + +const blockRepairPollingAttempts = 40; +const blockRepairPollingIntervalMs = 250; +const getBlockTransactionCount = async (blockHash: string) => + Number( + ( + await client.query<{ count: string }>( + /* sql */ ` + SELECT COUNT(*) FROM block_transaction + INNER JOIN block ON block.internal_id = block_transaction.block_internal_id + WHERE block.hash = $1; + `, + [hexToBin(blockHash)] + ) + ).rows[0]!.count + ); +const waitForBlockTransactionCount = async ( + blockHash: string, + expectedCount: number, + remainingAttempts = blockRepairPollingAttempts +): Promise<number> => { + const count = await getBlockTransactionCount(blockHash); + if (count === expectedCount || remainingAttempts === 0) { + return count; + } + await sleep(blockRepairPollingIntervalMs); + return waitForBlockTransactionCount( + blockHash, + expectedCount, + remainingAttempts - 1 + ); +}; + +const mempoolExpirationPollingAttempts = 50; +const mempoolExpirationPollingIntervalMs = 100; +const getExpiredMempoolArchiveState = async () => + ( + await client.query<{ + historyRowCount: number; + inMempool: boolean; + replacedAt: string | null; + transactionName: string; + }>(/* sql */ ` +WITH transaction_values (name, hash) AS ( + VALUES + ('expiry_parent_a', decode(repeat('d1', 32), 'hex')), + ('expiry_child_b', decode(repeat('d2', 32), 'hex')), + ('expiry_child_c', decode(repeat('d3', 32), 'hex')) +), +selected_node AS ( + SELECT internal_id + FROM node + WHERE name = 'node1' +), +named_transactions AS ( + SELECT transaction_values.name, transaction.internal_id + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +SELECT named_transactions.name AS "transactionName", + (node_transaction.transaction_internal_id IS NOT NULL) AS "inMempool", + COUNT(node_transaction_history.transaction_internal_id)::integer AS
"historyRowCount", + MIN(node_transaction_history.replaced_at)::text AS "replacedAt" + FROM named_transactions + CROSS JOIN selected_node + LEFT JOIN node_transaction + ON node_transaction.node_internal_id = selected_node.internal_id + AND node_transaction.transaction_internal_id = named_transactions.internal_id + LEFT JOIN node_transaction_history + ON node_transaction_history.node_internal_id = selected_node.internal_id + AND node_transaction_history.transaction_internal_id = named_transactions.internal_id + GROUP BY named_transactions.name, node_transaction.transaction_internal_id + ORDER BY named_transactions.name; +`) + ).rows; +const waitForExpiredMempoolArchive = async ( + remainingAttempts = mempoolExpirationPollingAttempts +): Promise>> => { + const rows = await getExpiredMempoolArchiveState(); + const expectedReplacedAt = '2026-01-15 00:00:00'; + if ( + rows.every( + (row) => + !row.inMempool && + row.historyRowCount === 1 && + row.replacedAt === expectedReplacedAt + ) || + remainingAttempts === 0 + ) { + return rows; + } + await sleep(mempoolExpirationPollingIntervalMs); + return waitForExpiredMempoolArchive(remainingAttempts - 1); +}; + +const getConfirmedMempoolArchiveState = async () => + ( + await client.query<{ + historyRowCount: number; + inMempool: boolean; + replacedAt: string | null; + }>(/* sql */ ` +WITH selected_node AS ( + SELECT internal_id + FROM node + WHERE name = 'node1' +), +selected_transaction AS ( + SELECT internal_id + FROM transaction + WHERE hash = decode(repeat('d5', 32), 'hex') +) +SELECT (node_transaction.transaction_internal_id IS NOT NULL) AS "inMempool", + COUNT(node_transaction_history.transaction_internal_id)::integer AS "historyRowCount", + MIN(node_transaction_history.replaced_at)::text AS "replacedAt" + FROM selected_transaction + CROSS JOIN selected_node + LEFT JOIN node_transaction + ON node_transaction.node_internal_id = selected_node.internal_id + AND node_transaction.transaction_internal_id = selected_transaction.internal_id + LEFT JOIN node_transaction_history + ON node_transaction_history.node_internal_id = selected_node.internal_id + AND node_transaction_history.transaction_internal_id = selected_transaction.internal_id + GROUP BY node_transaction.transaction_internal_id; +`) + ).rows[0]; + +const confirmedMempoolArchiveCompleted = ( + row: Awaited> +) => + row !== undefined && + !row.inMempool && + row.historyRowCount === 1 && + row.replacedAt === null; + +const waitForConfirmedMempoolArchive = async ( + remainingAttempts = mempoolExpirationPollingAttempts +): Promise>> => { + const row = await getConfirmedMempoolArchiveState(); + if (confirmedMempoolArchiveCompleted(row) || remainingAttempts === 0) { + return row; + } + await sleep(mempoolExpirationPollingIntervalMs); + return waitForConfirmedMempoolArchive(remainingAttempts - 1); +}; + test.serial( '[e2e] ignores inbound transactions before initial sync is complete', async (t) => { @@ -609,17 +810,890 @@ test.serial('[e2e] creates expected indexes after initial sync', async (t) => { 'node_internal_id_key', 'node_name_key', 'node_pkey', + 'node_transaction_history_pkey', 'node_transaction_pkey', 'output_pkey', 'output_search_index', 'spent_by_index', + 'token_category_index', 'transaction_hash_key', 'transaction_pkey', ]); + // cspell:ignore tgenabled tgname + const triggers = ( + await client.query<{ + tgenabled: string; + tgname: string; + }>(/* sql */ ` + SELECT tgname, tgenabled FROM pg_trigger + WHERE tgname IN ( + 'trigger_public_node_block_insert', + 
'trigger_public_node_transaction_history_insert' + ) + ORDER BY tgname; + `) + ).rows; + t.deepEqual(triggers, [ + { tgenabled: 'O', tgname: 'trigger_public_node_block_insert' }, + { + tgenabled: 'O', + tgname: 'trigger_public_node_transaction_history_insert', + }, + ]); clearStdoutBuffer(); t.pass(); }); +test.serial( + '[e2e] records node validation after concurrent transaction insert conflict', + async (t) => { + const transactionHash = 'c1'.repeat(repeatedHashByteLength); + const validatedAt = new Date('2026-01-01T00:00:00.000Z'); + const transaction: ChaingraphTransaction = { + hash: transactionHash, + inputs: [ + { + outpointIndex: 0, + outpointTransactionHash: 'c2'.repeat(repeatedHashByteLength), + sequenceNumber: 0, + unlockingBytecode: '51', + }, + ], + isCoinbase: false, + locktime: 0, + outputs: [ + { + lockingBytecode: '51', + valueSatoshis: 1000n, + }, + ], + sizeBytes: 100, + version: 1, + }; + const nodeInternalId = Number( + ( + await client.query<{ internalId: number }>( + /* sql */ `SELECT internal_id AS "internalId" FROM node WHERE name = 'node1';` + ) + ).rows[0]!.internalId + ); + await client.query( + /* sql */ ` + DELETE FROM node_transaction + USING transaction + WHERE node_transaction.transaction_internal_id = transaction.internal_id + AND transaction.hash = $1; + `, + [Buffer.from(transactionHash, 'hex')] + ); + await client.query( + /* sql */ ` + DELETE FROM input + USING transaction + WHERE input.transaction_internal_id = transaction.internal_id + AND transaction.hash = $1; + `, + [Buffer.from(transactionHash, 'hex')] + ); + await client.query( + /* sql */ `DELETE FROM output WHERE transaction_hash = $1;`, + [Buffer.from(transactionHash, 'hex')] + ); + await client.query(/* sql */ `DELETE FROM transaction WHERE hash = $1;`, [ + Buffer.from(transactionHash, 'hex'), + ]); + const originalPostgresConnectionString = + process.env.CHAINGRAPH_POSTGRES_CONNECTION_STRING; + process.env.CHAINGRAPH_POSTGRES_CONNECTION_STRING = + postgresE2eConnectionStringTestDb; + const { pool: dbPool, saveTransactionForNodes } = await import('../db.js'); + const competingClient = new pg.Client({ + connectionString: postgresE2eConnectionStringTestDb, + }); + await competingClient.connect(); + // eslint-disable-next-line functional/no-let + let competingTransactionOpen = false; + // eslint-disable-next-line functional/no-try-statement + try { + await competingClient.query(/* sql */ `BEGIN;`); + competingTransactionOpen = true; + await competingClient.query( + /* sql */ ` + INSERT INTO transaction (hash, version, locktime, size_bytes, is_coinbase) + VALUES ($1, 1, 0, 100, false); + `, + [Buffer.from(transactionHash, 'hex')] + ); + const savePromise = saveTransactionForNodes(transaction, [ + { nodeInternalId, validatedAt }, + ]); + await waitForTransactionSaveConflict(transactionHash); + await competingClient.query(/* sql */ `COMMIT;`); + competingTransactionOpen = false; + await t.notThrowsAsync(savePromise); + const savedValidationCount = Number( + ( + await client.query<{ count: string }>( + /* sql */ ` + SELECT COUNT(*)::bigint AS count + FROM node_transaction + JOIN transaction + ON transaction.internal_id = node_transaction.transaction_internal_id + WHERE transaction.hash = $1 + AND node_transaction.node_internal_id = $2 + AND node_transaction.validated_at = $3; + `, + [Buffer.from(transactionHash, 'hex'), nodeInternalId, validatedAt] + ) + ).rows[0]!.count + ); + t.deepEqual(savedValidationCount, 1); + } finally { + if (competingTransactionOpen) { + await competingClient.query(/* sql 
*/ `ROLLBACK;`).catch((err) => { + logger.debug(err); + }); + } + await client.query( + /* sql */ ` + DELETE FROM node_transaction + USING transaction + WHERE node_transaction.transaction_internal_id = transaction.internal_id + AND transaction.hash = $1; + `, + [Buffer.from(transactionHash, 'hex')] + ); + await client.query( + /* sql */ ` + DELETE FROM input + USING transaction + WHERE input.transaction_internal_id = transaction.internal_id + AND transaction.hash = $1; + `, + [Buffer.from(transactionHash, 'hex')] + ); + await client.query( + /* sql */ `DELETE FROM output WHERE transaction_hash = $1;`, + [Buffer.from(transactionHash, 'hex')] + ); + await client.query(/* sql */ `DELETE FROM transaction WHERE hash = $1;`, [ + Buffer.from(transactionHash, 'hex'), + ]); + await competingClient.end(); + await dbPool.end(); + if (originalPostgresConnectionString === undefined) { + delete process.env.CHAINGRAPH_POSTGRES_CONNECTION_STRING; + } else { + process.env.CHAINGRAPH_POSTGRES_CONNECTION_STRING = + originalPostgresConnectionString; + } + } + } +); + +test.serial( + '[e2e] cascades replaced mempool transaction history to same-node descendants', + async (t) => { + await client.query(/* sql */ `BEGIN;`); + // eslint-disable-next-line functional/no-try-statement + try { + await client.query(/* sql */ ` +WITH transaction_values (name, hash) AS ( + VALUES + ('parent_a', decode(repeat('f1', 32), 'hex')), + ('child_b', decode(repeat('f3', 32), 'hex')), + ('child_c', decode(repeat('f4', 32), 'hex')) +) +INSERT INTO transaction (hash, version, locktime, size_bytes, is_coinbase) + SELECT hash, 1, 0, 100, false + FROM transaction_values; +`); + await client.query(/* sql */ ` +WITH transaction_values (name, hash) AS ( + VALUES + ('parent_a', decode(repeat('f1', 32), 'hex')), + ('child_b', decode(repeat('f3', 32), 'hex')) +) +INSERT INTO output (transaction_hash, output_index, value_satoshis, locking_bytecode) + SELECT hash, 0, 1000, '\\x51'::bytea + FROM transaction_values; +`); + await client.query(/* sql */ ` +WITH input_values (child_name, parent_name, input_index) AS ( + VALUES + ('child_b', 'parent_a', 0), + ('child_c', 'child_b', 0) +), +transaction_values (name, hash) AS ( + VALUES + ('parent_a', decode(repeat('f1', 32), 'hex')), + ('child_b', decode(repeat('f3', 32), 'hex')), + ('child_c', decode(repeat('f4', 32), 'hex')) +), +named_transactions AS ( + SELECT transaction_values.name, transaction.internal_id, transaction.hash + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +INSERT INTO input (transaction_internal_id, input_index, outpoint_index, sequence_number, outpoint_transaction_hash, unlocking_bytecode) + SELECT child.internal_id, input_values.input_index, 0, 0, parent.hash, '\\x51'::bytea + FROM input_values + JOIN named_transactions child + ON child.name = input_values.child_name + JOIN named_transactions parent + ON parent.name = input_values.parent_name; +`); + await client.query(/* sql */ ` +WITH selected_nodes AS ( + SELECT name, internal_id + FROM node + WHERE name IN ('node1', 'node2') +), +transaction_values (name, hash) AS ( + VALUES + ('child_b', decode(repeat('f3', 32), 'hex')), + ('child_c', decode(repeat('f4', 32), 'hex')) +), +named_transactions AS ( + SELECT transaction_values.name, transaction.internal_id + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +INSERT INTO node_transaction (node_internal_id, transaction_internal_id, validated_at) + SELECT selected_nodes.internal_id, 
named_transactions.internal_id, timestamp '2026-01-01 00:00:00' + FROM selected_nodes + CROSS JOIN named_transactions; +`); + await client.query(/* sql */ ` +WITH selected_nodes AS ( + SELECT name, internal_id + FROM node + WHERE name = 'node1' +), +transaction_values (name, hash) AS ( + VALUES + ('parent_a', decode(repeat('f1', 32), 'hex')) +), +named_transactions AS ( + SELECT transaction_values.name, transaction.internal_id + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +INSERT INTO node_transaction_history (node_internal_id, transaction_internal_id, validated_at, replaced_at) + SELECT selected_nodes.internal_id, + named_transactions.internal_id, + timestamp '2026-01-01 00:00:00', + timestamp '2026-01-01 00:10:00' + FROM selected_nodes + CROSS JOIN named_transactions; +`); + const remainingMempool = ( + await client.query<{ + nodeName: string; + transactionName: string; + }>(/* sql */ ` +WITH transaction_values (name, hash) AS ( + VALUES + ('child_b', decode(repeat('f3', 32), 'hex')), + ('child_c', decode(repeat('f4', 32), 'hex')) +) +SELECT node.name AS "nodeName", transaction_values.name AS "transactionName" + FROM node_transaction + JOIN node + ON node.internal_id = node_transaction.node_internal_id + JOIN transaction + ON transaction.internal_id = node_transaction.transaction_internal_id + JOIN transaction_values + ON transaction_values.hash = transaction.hash + ORDER BY "nodeName", "transactionName"; +`) + ).rows; + t.deepEqual(remainingMempool, [ + { nodeName: 'node2', transactionName: 'child_b' }, + { nodeName: 'node2', transactionName: 'child_c' }, + ]); + const archivedDescendants = ( + await client.query<{ + replacedAt: string; + transactionName: string; + }>(/* sql */ ` +WITH transaction_values (name, hash) AS ( + VALUES + ('child_b', decode(repeat('f3', 32), 'hex')), + ('child_c', decode(repeat('f4', 32), 'hex')) +) +SELECT transaction_values.name AS "transactionName", + node_transaction_history.replaced_at::text AS "replacedAt" + FROM node_transaction_history + JOIN node + ON node.internal_id = node_transaction_history.node_internal_id + JOIN transaction + ON transaction.internal_id = node_transaction_history.transaction_internal_id + JOIN transaction_values + ON transaction_values.hash = transaction.hash + WHERE node.name = 'node1' + ORDER BY "transactionName"; +`) + ).rows; + t.deepEqual(archivedDescendants, [ + { replacedAt: '2026-01-01 00:10:00', transactionName: 'child_b' }, + { replacedAt: '2026-01-01 00:10:00', transactionName: 'child_c' }, + ]); + } finally { + await client.query(/* sql */ `ROLLBACK;`); + } + } +); + +test.serial( + '[e2e] archives expired mempool transactions and descendants', + async (t) => { + await client.query(/* sql */ ` +WITH transaction_values (name, hash) AS ( + VALUES + ('expiry_parent_a', decode(repeat('d1', 32), 'hex')), + ('expiry_child_b', decode(repeat('d2', 32), 'hex')), + ('expiry_child_c', decode(repeat('d3', 32), 'hex')) +) +INSERT INTO transaction (hash, version, locktime, size_bytes, is_coinbase) + SELECT hash, 1, 0, 100, false + FROM transaction_values; +`); + // eslint-disable-next-line functional/no-try-statement + try { + await client.query(/* sql */ ` +WITH transaction_values (name, hash) AS ( + VALUES + ('expiry_parent_a', decode(repeat('d1', 32), 'hex')), + ('expiry_child_b', decode(repeat('d2', 32), 'hex')) +) +INSERT INTO output (transaction_hash, output_index, value_satoshis, locking_bytecode) + SELECT hash, 0, 1000, '\\x51'::bytea + FROM transaction_values; +`); + await 
client.query(/* sql */ ` +WITH input_values (child_name, parent_name, input_index) AS ( + VALUES + ('expiry_child_b', 'expiry_parent_a', 0), + ('expiry_child_c', 'expiry_child_b', 0) +), +transaction_values (name, hash) AS ( + VALUES + ('expiry_parent_a', decode(repeat('d1', 32), 'hex')), + ('expiry_child_b', decode(repeat('d2', 32), 'hex')), + ('expiry_child_c', decode(repeat('d3', 32), 'hex')) +), +named_transactions AS ( + SELECT transaction_values.name, transaction.internal_id, transaction.hash + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +INSERT INTO input (transaction_internal_id, input_index, outpoint_index, sequence_number, outpoint_transaction_hash, unlocking_bytecode) + SELECT child.internal_id, input_values.input_index, 0, 0, parent.hash, '\\x51'::bytea + FROM input_values + JOIN named_transactions child + ON child.name = input_values.child_name + JOIN named_transactions parent + ON parent.name = input_values.parent_name; +`); + await client.query(/* sql */ ` +WITH selected_node AS ( + SELECT internal_id + FROM node + WHERE name = 'node1' +), +transaction_values (name, hash) AS ( + VALUES + ('expiry_parent_a', decode(repeat('d1', 32), 'hex')), + ('expiry_child_b', decode(repeat('d2', 32), 'hex')), + ('expiry_child_c', decode(repeat('d3', 32), 'hex')) +), +named_transactions AS ( + SELECT transaction_values.name, transaction.internal_id + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +INSERT INTO node_transaction (node_internal_id, transaction_internal_id, validated_at) + SELECT selected_node.internal_id, + named_transactions.internal_id, + CASE + WHEN named_transactions.name = 'expiry_parent_a' + THEN timestamp '2026-01-01 00:00:00' + ELSE CURRENT_TIMESTAMP + interval '1 day' + END + FROM selected_node + CROSS JOIN named_transactions; +`); + const archivedTransactions = await waitForExpiredMempoolArchive(); + t.deepEqual(archivedTransactions, [ + { + historyRowCount: 1, + inMempool: false, + replacedAt: '2026-01-15 00:00:00', + transactionName: 'expiry_child_b', + }, + { + historyRowCount: 1, + inMempool: false, + replacedAt: '2026-01-15 00:00:00', + transactionName: 'expiry_child_c', + }, + { + historyRowCount: 1, + inMempool: false, + replacedAt: '2026-01-15 00:00:00', + transactionName: 'expiry_parent_a', + }, + ]); + } finally { + await client.query(/* sql */ ` +WITH transaction_values (hash) AS ( + VALUES + (decode(repeat('d1', 32), 'hex')), + (decode(repeat('d2', 32), 'hex')), + (decode(repeat('d3', 32), 'hex')) +), +named_transactions AS ( + SELECT transaction.internal_id + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +DELETE FROM node_transaction + USING named_transactions + WHERE node_transaction.transaction_internal_id = named_transactions.internal_id; +`); + await client.query(/* sql */ ` +WITH transaction_values (hash) AS ( + VALUES + (decode(repeat('d1', 32), 'hex')), + (decode(repeat('d2', 32), 'hex')), + (decode(repeat('d3', 32), 'hex')) +), +named_transactions AS ( + SELECT transaction.internal_id + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +DELETE FROM node_transaction_history + USING named_transactions + WHERE node_transaction_history.transaction_internal_id = named_transactions.internal_id; +`); + await client.query(/* sql */ ` +WITH transaction_values (hash) AS ( + VALUES + (decode(repeat('d1', 32), 'hex')), + (decode(repeat('d2', 32), 'hex')), + 
(decode(repeat('d3', 32), 'hex')) +), +named_transactions AS ( + SELECT transaction.internal_id + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +DELETE FROM input + USING named_transactions + WHERE input.transaction_internal_id = named_transactions.internal_id; +`); + await client.query(/* sql */ ` +WITH transaction_values (hash) AS ( + VALUES + (decode(repeat('d1', 32), 'hex')), + (decode(repeat('d2', 32), 'hex')), + (decode(repeat('d3', 32), 'hex')) +) +DELETE FROM output + USING transaction_values + WHERE output.transaction_hash = transaction_values.hash; +`); + await client.query(/* sql */ ` +WITH transaction_values (hash) AS ( + VALUES + (decode(repeat('d1', 32), 'hex')), + (decode(repeat('d2', 32), 'hex')), + (decode(repeat('d3', 32), 'hex')) +) +DELETE FROM transaction + USING transaction_values + WHERE transaction.hash = transaction_values.hash; +`); + } + } +); + +test.serial( + '[e2e] archives stale mempool transactions already accepted by blocks', + async (t) => { + await client.query(/* sql */ ` +WITH transaction_values (name, hash) AS ( + VALUES + ('confirmed_parent_a', decode(repeat('d4', 32), 'hex')), + ('confirmed_child_b', decode(repeat('d5', 32), 'hex')) +) +INSERT INTO transaction (hash, version, locktime, size_bytes, is_coinbase) + SELECT hash, 1, 0, 100, false + FROM transaction_values; +`); + // eslint-disable-next-line functional/no-try-statement + try { + await client.query(/* sql */ ` +INSERT INTO output (transaction_hash, output_index, value_satoshis, locking_bytecode) + VALUES (decode(repeat('d4', 32), 'hex'), 0, 1000, '\\x51'::bytea); +`); + await client.query(/* sql */ ` +WITH selected_transaction AS ( + SELECT internal_id + FROM transaction + WHERE hash = decode(repeat('d5', 32), 'hex') +) +INSERT INTO input (transaction_internal_id, input_index, outpoint_index, sequence_number, outpoint_transaction_hash, unlocking_bytecode) + SELECT selected_transaction.internal_id, + 0, + 0, + 0, + decode(repeat('d4', 32), 'hex'), + '\\x51'::bytea + FROM selected_transaction; +`); + await client.query(/* sql */ ` +WITH selected_transaction AS ( + SELECT internal_id + FROM transaction + WHERE hash = decode(repeat('d5', 32), 'hex') +), +inserted_block AS ( + INSERT INTO block (height, version, "timestamp", hash, previous_block_hash, merkle_root, bits, nonce, size_bytes) + VALUES (4001, 1, 0, decode(repeat('d6', 32), 'hex'), decode(repeat('d7', 32), 'hex'), decode(repeat('d8', 32), 'hex'), 0, 0, 181) + RETURNING internal_id +) +INSERT INTO block_transaction (block_internal_id, transaction_internal_id, transaction_index) + SELECT inserted_block.internal_id, selected_transaction.internal_id, 1 + FROM inserted_block + CROSS JOIN selected_transaction; +`); + await client.query(/* sql */ ` +WITH selected_node AS ( + SELECT internal_id + FROM node + WHERE name = 'node1' +), +selected_block AS ( + SELECT internal_id + FROM block + WHERE hash = decode(repeat('d6', 32), 'hex') +) +INSERT INTO node_block (node_internal_id, block_internal_id, accepted_at) + SELECT selected_node.internal_id, + selected_block.internal_id, + timestamp '2026-01-01 00:10:00' + FROM selected_node + CROSS JOIN selected_block; +`); + await client.query(/* sql */ ` +WITH selected_node AS ( + SELECT internal_id + FROM node + WHERE name = 'node1' +), +selected_transaction AS ( + SELECT internal_id + FROM transaction + WHERE hash = decode(repeat('d5', 32), 'hex') +) +INSERT INTO node_transaction (node_internal_id, transaction_internal_id, validated_at) + SELECT 
selected_node.internal_id, + selected_transaction.internal_id, + timestamp '2026-01-01 00:00:00' + FROM selected_node + CROSS JOIN selected_transaction; +`); + const archivedTransaction = await waitForConfirmedMempoolArchive(); + t.deepEqual(archivedTransaction, { + historyRowCount: 1, + inMempool: false, + replacedAt: null, + }); + } finally { + await client.query(/* sql */ ` +WITH transaction_values (hash) AS ( + VALUES + (decode(repeat('d4', 32), 'hex')), + (decode(repeat('d5', 32), 'hex')) +), +named_transactions AS ( + SELECT transaction.internal_id + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +DELETE FROM node_transaction + USING named_transactions + WHERE node_transaction.transaction_internal_id = named_transactions.internal_id; +`); + await client.query(/* sql */ ` +WITH transaction_values (hash) AS ( + VALUES + (decode(repeat('d4', 32), 'hex')), + (decode(repeat('d5', 32), 'hex')) +), +named_transactions AS ( + SELECT transaction.internal_id + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +DELETE FROM node_transaction_history + USING named_transactions + WHERE node_transaction_history.transaction_internal_id = named_transactions.internal_id; +`); + await client.query(/* sql */ ` +WITH selected_block AS ( + SELECT internal_id + FROM block + WHERE hash = decode(repeat('d6', 32), 'hex') +) +DELETE FROM node_block + USING selected_block + WHERE node_block.block_internal_id = selected_block.internal_id; +`); + await client.query(/* sql */ ` +WITH selected_block AS ( + SELECT internal_id + FROM block + WHERE hash = decode(repeat('d6', 32), 'hex') +) +DELETE FROM node_block_history + USING selected_block + WHERE node_block_history.block_internal_id = selected_block.internal_id; +`); + await client.query(/* sql */ ` +WITH selected_block AS ( + SELECT internal_id + FROM block + WHERE hash = decode(repeat('d6', 32), 'hex') +) +DELETE FROM block_transaction + USING selected_block + WHERE block_transaction.block_internal_id = selected_block.internal_id; +`); + await client.query(/* sql */ ` +DELETE FROM block + WHERE hash = decode(repeat('d6', 32), 'hex'); +`); + await client.query(/* sql */ ` +WITH selected_transaction AS ( + SELECT internal_id + FROM transaction + WHERE hash = decode(repeat('d5', 32), 'hex') +) +DELETE FROM input + USING selected_transaction + WHERE input.transaction_internal_id = selected_transaction.internal_id; +`); + await client.query(/* sql */ ` +DELETE FROM output + WHERE transaction_hash = decode(repeat('d4', 32), 'hex'); +`); + await client.query(/* sql */ ` +WITH transaction_values (hash) AS ( + VALUES + (decode(repeat('d4', 32), 'hex')), + (decode(repeat('d5', 32), 'hex')) +) +DELETE FROM transaction + USING transaction_values + WHERE transaction.hash = transaction_values.hash; +`); + } + } +); + +test.serial( + '[e2e] backfills existing orphan mempool descendants with idempotence', + async (t) => { + await client.query(/* sql */ `BEGIN;`); + // eslint-disable-next-line functional/no-try-statement + try { + const backfillMigration = readFileSync( + backfillOrphanMempoolDescendantsMigrationPath, + 'utf8' + ); + await client.query(/* sql */ ` +WITH transaction_values (name, hash) AS ( + VALUES + ('backfill_parent_a', decode(repeat('e1', 32), 'hex')), + ('backfill_child_b', decode(repeat('e2', 32), 'hex')), + ('backfill_child_c', decode(repeat('e3', 32), 'hex')) +) +INSERT INTO transaction (hash, version, locktime, size_bytes, is_coinbase) + SELECT hash, 1, 0, 100, 
false + FROM transaction_values; +`); + await client.query(/* sql */ ` +WITH transaction_values (name, hash) AS ( + VALUES + ('backfill_parent_a', decode(repeat('e1', 32), 'hex')), + ('backfill_child_b', decode(repeat('e2', 32), 'hex')) +) +INSERT INTO output (transaction_hash, output_index, value_satoshis, locking_bytecode) + SELECT hash, 0, 1000, '\\x51'::bytea + FROM transaction_values; +`); + await client.query(/* sql */ ` +WITH input_values (child_name, parent_name, input_index) AS ( + VALUES + ('backfill_child_b', 'backfill_parent_a', 0), + ('backfill_child_c', 'backfill_child_b', 0) +), +transaction_values (name, hash) AS ( + VALUES + ('backfill_parent_a', decode(repeat('e1', 32), 'hex')), + ('backfill_child_b', decode(repeat('e2', 32), 'hex')), + ('backfill_child_c', decode(repeat('e3', 32), 'hex')) +), +named_transactions AS ( + SELECT transaction_values.name, transaction.internal_id, transaction.hash + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +INSERT INTO input (transaction_internal_id, input_index, outpoint_index, sequence_number, outpoint_transaction_hash, unlocking_bytecode) + SELECT child.internal_id, input_values.input_index, 0, 0, parent.hash, '\\x51'::bytea + FROM input_values + JOIN named_transactions child + ON child.name = input_values.child_name + JOIN named_transactions parent + ON parent.name = input_values.parent_name; +`); + await client.query(/* sql */ ` +WITH selected_nodes AS ( + SELECT name, internal_id + FROM node + WHERE name = 'node1' +), +transaction_values (name, hash) AS ( + VALUES + ('backfill_parent_a', decode(repeat('e1', 32), 'hex')) +), +named_transactions AS ( + SELECT transaction_values.name, transaction.internal_id + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +INSERT INTO node_transaction_history (node_internal_id, transaction_internal_id, validated_at, replaced_at) + SELECT selected_nodes.internal_id, + named_transactions.internal_id, + timestamp '2026-01-01 00:00:00', + timestamp '2026-01-01 00:10:00' + FROM selected_nodes + CROSS JOIN named_transactions; +`); + await client.query(/* sql */ ` +WITH selected_nodes AS ( + SELECT name, internal_id + FROM node + WHERE name = 'node1' +), +transaction_values (name, hash) AS ( + VALUES + ('backfill_child_b', decode(repeat('e2', 32), 'hex')), + ('backfill_child_c', decode(repeat('e3', 32), 'hex')) +), +named_transactions AS ( + SELECT transaction_values.name, transaction.internal_id + FROM transaction + JOIN transaction_values + ON transaction_values.hash = transaction.hash +) +INSERT INTO node_transaction (node_internal_id, transaction_internal_id, validated_at) + SELECT selected_nodes.internal_id, named_transactions.internal_id, timestamp '2026-01-01 00:00:00' + FROM selected_nodes + CROSS JOIN named_transactions; +`); + await client.query(backfillMigration); + await client.query(backfillMigration); + + const remainingMempool = ( + await client.query<{ + transactionName: string; + }>(/* sql */ ` +WITH transaction_values (name, hash) AS ( + VALUES + ('backfill_child_b', decode(repeat('e2', 32), 'hex')), + ('backfill_child_c', decode(repeat('e3', 32), 'hex')) +) +SELECT transaction_values.name AS "transactionName" + FROM node_transaction + JOIN node + ON node.internal_id = node_transaction.node_internal_id + JOIN transaction + ON transaction.internal_id = node_transaction.transaction_internal_id + JOIN transaction_values + ON transaction_values.hash = transaction.hash + WHERE node.name = 'node1' + 
ORDER BY "transactionName"; +`) + ).rows; + t.deepEqual(remainingMempool, []); + + const archivedTransactions = ( + await client.query<{ + historyRowCount: number; + replacedAt: string; + transactionName: string; + }>(/* sql */ ` +WITH transaction_values (name, hash) AS ( + VALUES + ('backfill_parent_a', decode(repeat('e1', 32), 'hex')), + ('backfill_child_b', decode(repeat('e2', 32), 'hex')), + ('backfill_child_c', decode(repeat('e3', 32), 'hex')) +) +SELECT transaction_values.name AS "transactionName", + COUNT(*)::integer AS "historyRowCount", + MIN(node_transaction_history.replaced_at)::text AS "replacedAt" + FROM node_transaction_history + JOIN node + ON node.internal_id = node_transaction_history.node_internal_id + JOIN transaction + ON transaction.internal_id = node_transaction_history.transaction_internal_id + JOIN transaction_values + ON transaction_values.hash = transaction.hash + WHERE node.name = 'node1' + GROUP BY transaction_values.name + ORDER BY "transactionName"; +`) + ).rows; + t.deepEqual(archivedTransactions, [ + { + historyRowCount: 1, + replacedAt: '2026-01-01 00:10:00', + transactionName: 'backfill_child_b', + }, + { + historyRowCount: 1, + replacedAt: '2026-01-01 00:10:00', + transactionName: 'backfill_child_c', + }, + { + historyRowCount: 1, + replacedAt: '2026-01-01 00:10:00', + transactionName: 'backfill_parent_a', + }, + ]); + } finally { + await client.query(/* sql */ `ROLLBACK;`); + } + } +); + test.serial( '[e2e] after initial sync is complete, requests transactions as they are announced', async (t) => { @@ -654,6 +1728,45 @@ test.serial( } ); +test.serial( + '[e2e] records validation when another node announces a known transaction', + async (t) => { + peers.node2.sendMessage( + new peers.node2.messages.Transaction(new Transaction(halTxRaw)) + ); + const delay = 1000; + await sleep(delay); + const validations = await client.query<{ name: string }>( + /* sql */ ` + SELECT node.name + FROM node_transaction + INNER JOIN node + ON node.internal_id = node_transaction.node_internal_id + INNER JOIN transaction + ON transaction.internal_id = node_transaction.transaction_internal_id + WHERE transaction.hash = $1 + ORDER BY node.name ASC; + `, + [hexToBin(halTxHash)] + ); + t.deepEqual( + validations.rows.map(({ name }) => name), + ['node1', 'node2'] + ); + await client.query( + /* sql */ ` + DELETE FROM node_transaction + USING node, transaction + WHERE node_transaction.node_internal_id = node.internal_id + AND node_transaction.transaction_internal_id = transaction.internal_id + AND node.name = 'node2' + AND transaction.hash = $1; + `, + [hexToBin(halTxHash)] + ); + } +); + test.serial('[e2e] handles first chipnet CashTokens transaction', async (t) => { peers.node1.sendMessage( new peers.node1.messages.Transaction(new Transaction(chipnetCashTokensTx)) @@ -733,6 +1846,82 @@ test.serial( } ); +test.serial( + '[e2e] [postgres] value aggregates handle coinbase-only blocks', + async (t) => { + const aggregates = ( + await client.query<{ + feeSatoshis: string; + generatedValueSatoshis: string; + inputValueSatoshis: string; + outputValueSatoshis: string; + }>(/* sql */ ` + SELECT + block_fee_satoshis(block)::text AS "feeSatoshis", + block_generated_value_satoshis(block)::text AS "generatedValueSatoshis", + block_input_value_satoshis(block)::text AS "inputValueSatoshis", + block_output_value_satoshis(block)::text AS "outputValueSatoshis" + FROM block WHERE height = 0; + `) + ).rows[0]!; + t.deepEqual(aggregates, { + feeSatoshis: '0', + generatedValueSatoshis: '5000000000', + 
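+// Note on the coinbase-only aggregates above: height 0 is the genesis block,
+// which contains only its coinbase transaction. No inputs are spent, so input
+// value and fees are zero, and the generated (subsidy) value equals the total
+// output value: 5000000000 satoshis (50 BCH).
+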
+test.serial(
+  '[e2e] gets hex-encoded block with multiple transactions',
+  async (t) => {
+    const blockWithMultipleTransactions = mockchainBeforeFork[1]!;
+    t.true(blockWithMultipleTransactions.transactions.length > 1);
+    /* eslint-disable @typescript-eslint/naming-convention */
+    const encodedHex = (
+      await client.query<{ block_encoded_hex: string }>(
+        /* sql */ `SELECT block_encoded_hex(block) FROM block WHERE hash = $1::bytea;`,
+        [hexToBin(blockWithMultipleTransactions.header.hash)]
+      )
+    ).rows[0]!.block_encoded_hex;
+    /* eslint-enable @typescript-eslint/naming-convention */
+    t.deepEqual(encodedHex, binToHex(blockWithMultipleTransactions.toBuffer()));
+  }
+);
+
+test.serial(
+  '[e2e] [postgres] transaction_data_carrier_outputs ignores empty locking bytecode',
+  async (t) => {
+    const txHash =
+      '0000000000000000000000000000000000000000000000000000000000000075';
+    await client.query(
+      /* sql */ `
+      INSERT INTO transaction (hash, version, locktime, size_bytes, is_coinbase)
+        VALUES ($1::bytea, 1, 0, 10, false);
+      `,
+      [hexToBin(txHash)]
+    );
+    await client.query(
+      /* sql */ `
+      INSERT INTO output (transaction_hash, output_index, value_satoshis, locking_bytecode)
+        VALUES ($1::bytea, 0, 1, $2::bytea);
+      `,
+      [hexToBin(txHash), hexToBin('')]
+    );
+    const outputs = await client.query<{ outputIndex: string }>(
+      /* sql */ `
+      SELECT output_index AS "outputIndex"
+        FROM transaction_data_carrier_outputs(
+          (SELECT transaction FROM transaction WHERE hash = $1::bytea)
+        );
+      `,
+      [hexToBin(txHash)]
+    );
+    t.deepEqual(outputs.rows, []);
+  }
+);
+
 const newBlocks = (
   node: 'node1' | 'node2' | 'node3',
   blocks: BitcoreBlock[]
@@ -1029,6 +2218,51 @@ test.serial('[e2e] shuts down with SIGINT', async (t) => {
   t.pass();
 });
 
+const historicalRepairTipIndex = 161;
+const historicalRepairTransactionIndex = 1;
+const historicalRepairBlock = tipA[historicalRepairTipIndex]!;
+const historicalRepairBlockHash = historicalRepairBlock.header.hash;
+
+test.serial(
+  '[e2e] prepares incomplete historical block transaction before restart',
+  async (t) => {
+    const transactionHash =
+      historicalRepairBlock.transactions[historicalRepairTransactionIndex]!
+        .hash;
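+    // This test only stages the failure case: it deletes one block_transaction
+    // link for the selected transaction, leaving the saved block incomplete.
+    // The '[e2e] self-heals incomplete historical block transactions on
+    // startup' test below asserts that the link is restored after restart.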
+    const selectedTransaction = (
+      await client.query<{ hash: string }>(
+        /* sql */ `
+        SELECT encode(transaction.hash, 'hex') AS hash
+          FROM block_transaction
+          INNER JOIN block
+            ON block.internal_id = block_transaction.block_internal_id
+          INNER JOIN transaction
+            ON transaction.internal_id =
+              block_transaction.transaction_internal_id
+          WHERE block.hash = $1
+            AND block_transaction.transaction_index = $2;
+        `,
+        [hexToBin(historicalRepairBlockHash), historicalRepairTransactionIndex]
+      )
+    ).rows;
+    t.deepEqual(selectedTransaction, [{ hash: transactionHash }]);
+    await client.query(
+      /* sql */ `
+      DELETE FROM block_transaction
+        USING block
+        WHERE block.internal_id = block_transaction.block_internal_id
+          AND block.hash = $1
+          AND block_transaction.transaction_index = $2;
+      `,
+      [hexToBin(historicalRepairBlockHash), historicalRepairTransactionIndex]
+    );
+    t.deepEqual(
+      await getBlockTransactionCount(historicalRepairBlockHash),
+      historicalRepairBlock.transactions.length - 1
+    );
+  }
+);
+
 test.serial(
   '[e2e] restores sync-state from database on restart (after initial sync)',
   async (t) => {
@@ -1078,6 +2312,20 @@ test.serial('[e2e] catches up a new node via headers', async (t) => {
   t.pass();
 });
 
+test.serial(
+  '[e2e] self-heals incomplete historical block transactions on startup',
+  async (t) => {
+    t.timeout(oneMinute);
+    t.deepEqual(
+      await waitForBlockTransactionCount(
+        historicalRepairBlockHash,
+        historicalRepairBlock.transactions.length
+      ),
+      historicalRepairBlock.transactions.length
+    );
+  }
+);
+
 test.serial(
   '[e2e] handles empty headers messages (fully-synced)',
   async (t) => {
@@ -1089,8 +2337,54 @@ test.serial(
   }
 );
 
+test.serial(
+  '[e2e] saves block transactions if a previously announced tx is seen but not yet saved',
+  async (t) => {
+    const tipStartIndex = 162;
+    const [, tx1] = tipA[tipStartIndex]!.transactions;
+    mempool[swapEndianness(tx1!.hash)] = false;
+    const node1RequestedTx = new Promise((res) => {
+      node1.once('peergetdata', (_, message) => {
+        res(message.inventory);
+      });
+    });
+    peers.node1.sendMessage(
+      peers.node1.messages.Inventory.forTransaction(
+        Buffer.from(tx1!.hash, 'hex')
+      )
+    );
+    await node1RequestedTx;
+    logger.debug(
+      `node1: announced tipA[${tipStartIndex}] transaction 1 without providing the transaction: ${
+        tx1!.hash
+      }`
+    );
+    newBlocks('node1', [tipA[tipStartIndex]!]);
+    newBlocks('node2', [tipB[tipStartIndex]!]);
+    newBlocks('node3', [tipA[tipStartIndex]!]);
+    await waitForStdout(/Saved new block – height:\s+3163[^\n]+nodes: node2/u);
+    await waitForStdout(
+      /Saved new block – height:\s+3163[^\n]+nodes: node1, node4/u
+    );
+    const blockTransactionCount = (
+      await client.query<{ count: string }>(
+        /* sql */ `
+        SELECT COUNT(*) FROM block_transaction
+        INNER JOIN block ON block.internal_id = block_transaction.block_internal_id
+        WHERE block.hash = $1;
+        `,
+        [hexToBin(tipA[tipStartIndex]!.header.hash)]
+      )
+    ).rows[0]!.count;
+    t.deepEqual(
+      blockTransactionCount,
+      tipA[tipStartIndex]!.transactions.length.toString()
+    );
+  }
+);
+
 test.serial('[e2e] syncs remaining blocks one-by-one', async (t) => {
-  const tipStartIndex = 162;
+  const tipStartIndex = 163;
   slowFeedBlocks('node1', tipA.slice(tipStartIndex), 1);
   slowFeedBlocks('node2', tipB.slice(tipStartIndex), 1);
   /**
@@ -1360,6 +2654,18 @@ const bytecodeFunction = test.macro<[string, string, string]>({
     `[e2e] [postgres] ${functionName} – ${patternHex}: ${providedTitle ??
     ''}`,
 });
+
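+// Counterpart to bytecodeFunction above: instead of comparing against an
+// expected hex pattern, this macro asserts that the named Postgres function
+// returns SQL NULL for the given (malformed or unmatchable) bytecode.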
+const bytecodeFunctionReturnsNull = test.macro<[string, string]>({
+  exec: async (t, functionName, bytecodeHex) => {
+    const result = await client.query<{ isNull: boolean }>(
+      /* sql */ `SELECT ${functionName} ($1) IS NULL AS "isNull";`,
+      [hexToBin(bytecodeHex)]
+    );
+    t.true(result.rows[0]!.isNull);
+  },
+  title: (providedTitle, functionName, bytecodeHex) =>
+    `[e2e] [postgres] ${functionName} – ${bytecodeHex}: ${providedTitle ?? ''}`,
+});
+
 test(
   'P2PKH',
   bytecodeFunction,
@@ -1470,6 +2776,28 @@ test(
   '6a02094c5c'
 );
 
+test(
+  'zero-length OP_PUSHDATA1',
+  bytecodeFunction,
+  'parse_bytecode_pattern_with_pushdata_lengths',
+  '4c00',
+  '4c00'
+);
+test(
+  'zero-length OP_PUSHDATA2',
+  bytecodeFunction,
+  'parse_bytecode_pattern_with_pushdata_lengths',
+  '4d0000',
+  '4d0000'
+);
+test(
+  'zero-length OP_PUSHDATA4',
+  bytecodeFunction,
+  'parse_bytecode_pattern_with_pushdata_lengths',
+  '4e00000000',
+  '4e00000000'
+);
+
 test(
   'OP_RETURN with OP_PUSHDATA2',
   bytecodeFunction,
@@ -1521,10 +2849,9 @@
 
 test(
   'no redeem',
-  bytecodeFunction,
+  bytecodeFunctionReturnsNull,
   'parse_bytecode_pattern_redeem',
-  `0002000051`,
-  ''
+  `0002000051`
 );
 
 test(
@@ -1584,6 +2911,31 @@ test(
   '004e515253'
 );
 
+test(
+  'malformed OP_PUSHDATA1 redeem',
+  bytecodeFunctionReturnsNull,
+  'parse_bytecode_pattern_redeem',
+  '4c'
+);
+test(
+  'malformed OP_PUSHDATA2 redeem',
+  bytecodeFunctionReturnsNull,
+  'parse_bytecode_pattern_redeem',
+  '4d11'
+);
+test(
+  'malformed OP_PUSHDATA4 redeem',
+  bytecodeFunctionReturnsNull,
+  'parse_bytecode_pattern_redeem',
+  '4e112233'
+);
+test(
+  'oversized OP_PUSHDATA4 redeem',
+  bytecodeFunctionReturnsNull,
+  'parse_bytecode_pattern_redeem',
+  '4effffffff'
+);
+
 test('[e2e] [postgres] encode_uint16le', async (t) => {
   const query = async (encoded: number) => (