Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/ci3.sh
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ function main {
fi
check_cache
echo_header "Run ${CI_MODE} CI"
exec ./ci.sh "${CI_MODE}" "$@"
# Temporary: grind p2p tests instead of normal CI
exec ./ci.sh grind-p2p "$@"
}

main "$@"
25 changes: 25 additions & 0 deletions bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,31 @@ case "$cmd" in

grind_test "$full_cmd" "$timeout" "$jobs_pct" "$memsuspend_pct" "$commit"
;;
"ci-grind-p2p")
  # Temporary: grind p2p tests to check for flakes after the libp2p upgrade.
  # Usage: ./bootstrap.sh ci-grind-p2p [integration_timeout] [e2e_timeout]
  # Builds yarn-project, then runs each listed e2e p2p test under grind_test;
  # exits non-zero if any grind run fails.
  export CI=1
  export USE_TEST_CACHE=0   # always re-run: we are hunting flakes, cached results are useless
  prep
  echo_header "build yarn-project for p2p grind"
  make yarn-project

  # $1 (integration timeout, previously defaulting to 30m) is currently unused:
  # only e2e tests are ground below. $2 remains the e2e timeout for compatibility.
  e2e_timeout="${2:-60m}"
  hash="grind"
  failed=0

  # P2P e2e tests (lower parallelism — each test spins up 7 full Aztec nodes)
  e2e_tests=(
    "e2e_p2p/preferred_gossip_network.test.ts"
  )
  for test in "${e2e_tests[@]}"; do
    echo_header "Grinding: $test"
    full_cmd="${hash}:ISOLATE=1:MAKEFILE_TARGET=yarn-project:NAME=${test} LOG_LEVEL=\"verbose; debug:p2p\" yarn-project/end-to-end/scripts/run_test.sh simple src/${test}"
    # NOTE(review): grind_test is invoked with 5 args elsewhere in this file
    # (cmd, timeout, jobs_pct, memsuspend_pct, commit); here only 3 are passed —
    # confirm the trailing parameters are optional.
    grind_test "$full_cmd" "$e2e_timeout" 10 || { echo "FAILED: $test"; failed=1; }
  done

  exit $failed
  ;;

##########################################
# NETWORK DEPLOYMENTS WITH BENCHES/TESTS #
Expand Down
9 changes: 9 additions & 0 deletions ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,15 @@ case "$cmd" in
export CPUS=${CPUS:-192}
bootstrap_ec2 "./bootstrap.sh ci-grind-test $(printf %q "$full_cmd") $timeout $jobs_pct $memsuspend_pct $commit" | DUP=1 cache_log "Grind test CI run" $RUN_ID
;;
grind-p2p)
# Temporary: grind p2p integration and e2e tests on a single EC2 instance.
# NOTE(review): the invoked bootstrap target (ci-grind-p2p) currently grinds
# only e2e tests — confirm whether "integration" above is still accurate.
# The exported vars below are presumably consumed by bootstrap_ec2 / the CI
# dashboard tooling — verify against their definitions:
#   CI_DASHBOARD      - dashboard bucket/category the results are reported under
#   JOB_ID            - job identifier shown on the dashboard
#   INSTANCE_POSTFIX  - suffix for the EC2 instance name, to avoid clashing with normal CI
#   CPUS              - instance size request (overridable by caller)
#   AWS_SHUTDOWN_TIME - instance auto-shutdown deadline (presumably minutes — TODO confirm);
#                       raised here because grinding runs far longer than a normal CI pass
export CI_DASHBOARD="deflake"
export JOB_ID="grind-p2p"
export INSTANCE_POSTFIX="grind-p2p"
export CPUS=${CPUS:-192}
export AWS_SHUTDOWN_TIME=${AWS_SHUTDOWN_TIME:-260}
# Run the grind remotely and mirror its log into the cached CI log stream.
bootstrap_ec2 "./bootstrap.sh ci-grind-p2p" | DUP=1 cache_log "Grind p2p tests" $RUN_ID
;;
##########################################
# NETWORK DEPLOYMENTS WITH BENCHES/TESTS #
##########################################
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ function test_cmds {
elif [[ "$test" =~ rollup_ivc_integration || "$test" =~ avm_integration ]]; then
cmd_env+=" LOG_LEVEL=debug BB_VERBOSE=1 "
elif [[ "$test" =~ e2e_p2p ]]; then
cmd_env+=" LOG_LEVEL='verbose; debug:p2p'"
cmd_env+=" LOG_LEVEL=\"verbose; debug:p2p\""
fi

# Enable real proofs in prover-client integration tests only on CI full.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Signature } from '@aztec/foundation/eth-signature';
import { retryUntil } from '@aztec/foundation/retry';
import { ENR, type P2PClient, type P2PService, type PeerId } from '@aztec/p2p';
import type { SequencerClient } from '@aztec/sequencer-client';
import { CheckpointAttestation, ConsensusPayload } from '@aztec/stdlib/p2p';
import { CheckpointAttestation, ConsensusPayload, TopicType } from '@aztec/stdlib/p2p';

import { jest } from '@jest/globals';
import fs from 'fs';
Expand Down Expand Up @@ -118,6 +118,19 @@ describe('e2e_p2p_preferred_network', () => {
p2pService.processCheckpointAttestationFromPeer = handleGossipedAttestationSpy;
};

/** Emit a one-line summary of a node's gossipsub state (tx topic) to help diagnose message relay issues. */
const logGossipDiagnostics = (node: AztecNodeService, label: string) => {
  // Reach through the (private) p2pService to the diagnostics helper; test-only access.
  const service = (node.getP2P() as any).p2pService as P2PService;
  const d = (service as any).getGossipSubDiagnostics(TopicType.tx);
  // Log peer counts (direct peers logged in full, as there are typically few).
  const summary = {
    meshPeers: d.meshPeers.length,
    topicPeers: d.topicPeers.length,
    directPeers: d.directPeers,
    allPeers: d.allPeers.length,
    backoffPeers: d.backoffPeers.length,
  };
  t.logger.info(`GossipSub diagnostics for ${label} (tx topic)`, summary);
};

const mockFailedAuthHandler = (node: AztecNodeService) => {
const p2pService = (node.getP2P() as any).p2pService as P2PService;
const peerManager = (p2pService as any).peerManager;
Expand Down Expand Up @@ -332,6 +345,14 @@ describe('e2e_p2p_preferred_network', () => {
// We need to `createNodes` before we setup account, because
// those nodes actually form the committee, and so we cannot build
// blocks without them (since targetCommitteeSize is set to the number of nodes)

// Log gossipsub diagnostics to diagnose potential relay issues
t.logger.info('=== GossipSub diagnostics before setupAccount ===');
preferredNodes.forEach((node, i) => logGossipDiagnostics(node, `Preferred Node ${i + 1}`));
validators.forEach((node, i) => logGossipDiagnostics(node, `Validator ${i + 1}`));
noDiscoveryValidators.forEach((node, i) => logGossipDiagnostics(node, `Picky Validator ${i + 1}`));
logGossipDiagnostics(t.ctx.aztecNodeService, 'Default Node');

await t.setupAccount();

// Send the required number of transactions to each node
Expand All @@ -342,12 +363,17 @@ describe('e2e_p2p_preferred_network', () => {
}

t.logger.info('Waiting for transactions to be mined');
// Use a longer timeout (6 slots instead of 3) because the picky validator relies on
// IHAVE/IWANT gossip relay through preferred nodes, adding latency. When the picky
// validator is proposer for consecutive slots, the default 3-slot timeout may not
// be enough for the tx to propagate and get included.
const extendedTxTimeout = WAIT_FOR_TX_TIMEOUT * 2;
// now ensure that all txs were successfully mined
const receipts = await Promise.all(
txsSentViaDifferentNodes.flatMap((txs, i) =>
txs.map((txHash, j) => {
t.logger.info(`Waiting for tx ${i}-${j}: ${txHash.toString()} to be mined`);
return waitForTx(nodes[0], txHash, { timeout: WAIT_FOR_TX_TIMEOUT });
return waitForTx(nodes[0], txHash, { timeout: extendedTxTimeout });
}),
),
);
Expand Down
34 changes: 33 additions & 1 deletion yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,38 @@ export class LibP2PService extends WithTracer implements P2PService {
return this.node.services.pubsub.getMeshPeers(this.topicStrings[topicType]).length;
}

/**
 * Returns a snapshot of internal gossipsub state for the given topic, for
 * debugging mesh/relay issues (mesh membership, topic subscribers, scores,
 * outbound streams, and peers currently in PRUNE backoff).
 *
 * NOTE(review): this reaches into js-libp2p-gossipsub private fields via `any`
 * (`peers`, `score`, `streamsOutbound`, `topics`, `subscriptions`, `direct`,
 * `backoff`) — field names may change across library versions, so keep this
 * diagnostics-only and tolerate missing fields (hence the `?.`/`?? ` fallbacks).
 */
public getGossipSubDiagnostics(topicType: TopicType): {
  meshPeers: string[];
  topicPeers: string[];
  allPeers: string[];
  subscriptions: string[];
  directPeers: string[];
  peerScores: Record<string, number>;
  hasOutboundStream: Record<string, boolean>;
  backoffPeers: string[];
} {
  const gossip = this.node.services.pubsub as any;
  const topic = this.topicStrings[topicType];
  const peerIds = Array.from(gossip.peers?.keys() ?? []) as string[];
  // Per-peer maps built from the same id list so their key sets always agree.
  const peerScores: Record<string, number> = Object.fromEntries(
    peerIds.map(id => [id, gossip.score?.score(id) ?? 0]),
  );
  const hasOutboundStream: Record<string, boolean> = Object.fromEntries(
    peerIds.map(id => [id, gossip.streamsOutbound?.has(id) ?? false]),
  );
  const topicBackoff = gossip.backoff?.get(topic);
  return {
    meshPeers: gossip.getMeshPeers(topic),
    topicPeers: Array.from(gossip.topics?.get(topic) ?? []),
    allPeers: peerIds,
    subscriptions: Array.from(gossip.subscriptions ?? []),
    directPeers: Array.from(gossip.direct ?? []),
    peerScores,
    hasOutboundStream,
    backoffPeers: topicBackoff ? Array.from(topicBackoff.keys()) : [],
  };
}

private handleGossipSubEvent(e: CustomEvent<GossipsubMessage>) {
this.logger.trace(`Received PUBSUB message.`);

Expand Down Expand Up @@ -1052,7 +1084,7 @@ export class LibP2PService extends WithTracer implements P2PService {
// Tx was accepted into pool and will be propagated - just log and record metrics
const txHash = tx.getTxHash();
const txHashString = txHash.toString();
this.logger.verbose(`Received tx ${txHashString} from external peer ${source.toString()} via gossip`, {
this.logger.info(`Received tx ${txHashString} from peer ${source.toString()} via gossip`, {
source: source.toString(),
txHash: txHashString,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,9 +627,9 @@ export class CheckpointProposalJob implements Traceable {
tx => !txHashesAlreadyIncluded.has(tx.txHash.toString()),
);

this.log.debug(
`Building block ${blockNumber} at index ${indexWithinCheckpoint} for slot ${this.targetSlot} with ${availableTxs} available txs`,
{ slot: this.targetSlot, blockNumber, indexWithinCheckpoint },
this.log.info(
`Building block ${blockNumber} at index ${indexWithinCheckpoint} for slot ${this.targetSlot} with ${availableTxs} available txs (min: ${minTxs})`,
{ slot: this.targetSlot, blockNumber, indexWithinCheckpoint, availableTxs, minTxs },
);
this.setStateFn(SequencerState.CREATING_BLOCK, this.targetSlot);

Expand Down
8 changes: 4 additions & 4 deletions yarn-project/sequencer-client/src/sequencer/sequencer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -604,15 +604,15 @@ export class Sequencer extends (EventEmitter as new () => TypedEventEmitter<Sequ
const weAreProposer = validatorAddresses.some(addr => addr.equals(proposer));

if (!weAreProposer) {
this.log.debug(`Cannot propose at target slot ${targetSlot} since we are not a proposer`, {
this.log.info(`Cannot propose at target slot ${targetSlot} since we are not a proposer`, {
targetSlot,
validatorAddresses,
proposer,
validatorAddresses: validatorAddresses.map(a => a.toString()),
proposer: proposer.toString(),
});
return [false, proposer];
}

this.log.debug(`We are the proposer for target slot ${targetSlot}`, { targetSlot, proposer });
this.log.info(`We are the proposer for target slot ${targetSlot}`, { targetSlot, proposer: proposer.toString() });
return [true, proposer];
}

Expand Down
Loading