-
Notifications
You must be signed in to change notification settings - Fork 46
fix(pdp): detect contract reverts and exclude terminated datasets from scheduling #869
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
-- PDP Termination Handling.
-- Adds per-dataset bookkeeping so the scheduler can skip terminated
-- datasets and apply exponential backoff after contract reverts.

-- Termination and failure-tracking columns on pdp_data_sets.
ALTER TABLE pdp_data_sets ADD COLUMN IF NOT EXISTS terminated_at_epoch BIGINT;
ALTER TABLE pdp_data_sets ADD COLUMN IF NOT EXISTS consecutive_prove_failures INT NOT NULL DEFAULT 0;
ALTER TABLE pdp_data_sets ADD COLUMN IF NOT EXISTS next_prove_attempt_at BIGINT;

COMMENT ON COLUMN pdp_data_sets.terminated_at_epoch IS 'Block height at which dataset termination was detected; NULL if active';
COMMENT ON COLUMN pdp_data_sets.consecutive_prove_failures IS 'Number of consecutive proving failures (resets on success)';
COMMENT ON COLUMN pdp_data_sets.next_prove_attempt_at IS 'Block height before which proving should not be attempted (backoff)';
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| package pdp | ||
|
|
||
| import ( | ||
| "strings" | ||
| ) | ||
|
|
||
// Known error selectors indicating permanent dataset termination.
// These are the first 4 bytes of keccak256(error signature) from PDPListener
// callbacks. When PDPVerifier calls a listener and it reverts with one of
// these, the dataset should never be retried.
//
// Unrecognized reverts fall through to Tier 2 (backoff), so this list only
// needs selectors where we want immediate termination rather than retry.
// Additional PDPListener implementations can add their selectors here.
const (
	// DataSetPaymentBeyondEndEpoch(uint256,uint256,uint256)
	// The dataset's payment period has ended.
	ErrSelectorDataSetPaymentBeyondEndEpoch = "d7c45de5"

	// DataSetPaymentAlreadyTerminated(uint256)
	// The dataset was explicitly terminated.
	ErrSelectorDataSetPaymentAlreadyTerminated = "e3f8fa35"
)

// IsTerminationError reports whether err carries one of the known termination
// selectors above. Such an error means the dataset is permanently terminated
// on-chain and proving must stop immediately.
func IsTerminationError(err error) bool {
	if err == nil {
		return false
	}
	msg := strings.ToLower(err.Error())
	for _, sel := range []string{
		ErrSelectorDataSetPaymentBeyondEndEpoch,
		ErrSelectorDataSetPaymentAlreadyTerminated,
	} {
		if strings.Contains(msg, sel) {
			return true
		}
	}
	return false
}
|
|
||
// IsContractRevert reports whether err looks like a contract revert.
// A revert means the on-chain state is rejecting the call, so retrying
// immediately is pointless. Gas estimation failures caused by reverts —
// the way most failures actually surface — are covered as well.
func IsContractRevert(err error) bool {
	if err == nil {
		return false
	}
	msg := strings.ToLower(err.Error())

	// Substrings the various RPC/VM layers use when reporting a revert.
	markers := []string{
		"execution reverted",
		"vm execution error",
		"revert reason",
		"retcode=33", // EVM revert exit code
		"(exit=[33]", // Filecoin EVM revert format
		"contract reverted",
	}
	for _, marker := range markers {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,124 @@ | ||
| package pdp | ||
|
|
||
| import ( | ||
| "errors" | ||
| "testing" | ||
| ) | ||
|
|
||
| func TestIsTerminationError(t *testing.T) { | ||
| tests := []struct { | ||
| name string | ||
| err error | ||
| expected bool | ||
| }{ | ||
| { | ||
| name: "nil error", | ||
| err: nil, | ||
| expected: false, | ||
| }, | ||
| { | ||
| name: "DataSetPaymentBeyondEndEpoch by selector", | ||
| err: errors.New("failed to estimate gas: execution reverted: 0xd7c45de5000000000000"), | ||
| expected: true, | ||
| }, | ||
| { | ||
| name: "DataSetPaymentAlreadyTerminated by selector", | ||
| err: errors.New("execution reverted: 0xe3f8fa35"), | ||
| expected: true, | ||
| }, | ||
| { | ||
| name: "unrelated error", | ||
| err: errors.New("network timeout"), | ||
| expected: false, | ||
| }, | ||
| { | ||
| name: "contract revert is not termination", | ||
| err: errors.New("execution reverted: 0x96ed3e73"), | ||
| expected: false, | ||
| }, | ||
| } | ||
|
|
||
| for _, tc := range tests { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| result := IsTerminationError(tc.err) | ||
| if result != tc.expected { | ||
| t.Errorf("IsTerminationError(%v) = %v, want %v", tc.err, result, tc.expected) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestIsContractRevert(t *testing.T) { | ||
| tests := []struct { | ||
| name string | ||
| err error | ||
| expected bool | ||
| }{ | ||
| { | ||
| name: "nil error", | ||
| err: nil, | ||
| expected: false, | ||
| }, | ||
| { | ||
| name: "execution reverted", | ||
| err: errors.New("failed to estimate gas: execution reverted: 0x96ed3e73"), | ||
| expected: true, | ||
| }, | ||
| { | ||
| name: "vm execution error", | ||
| err: errors.New("vm execution error: something went wrong"), | ||
| expected: true, | ||
| }, | ||
| { | ||
| name: "filecoin evm exit code 33", | ||
| err: errors.New("message failed (exit=[33], revert reason=[...])"), | ||
| expected: true, | ||
| }, | ||
| { | ||
| name: "retcode 33", | ||
| err: errors.New("call failed with RetCode=33"), | ||
| expected: true, | ||
| }, | ||
| { | ||
| name: "network timeout is not revert", | ||
| err: errors.New("connection timeout"), | ||
| expected: false, | ||
| }, | ||
| { | ||
| name: "rpc error is not revert", | ||
| err: errors.New("rpc error: server unavailable"), | ||
| expected: false, | ||
| }, | ||
| } | ||
|
|
||
| for _, tc := range tests { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| result := IsContractRevert(tc.err) | ||
| if result != tc.expected { | ||
| t.Errorf("IsContractRevert(%v) = %v, want %v", tc.err, result, tc.expected) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestCalculateBackoffBlocks(t *testing.T) { | ||
| tests := []struct { | ||
| failures int | ||
| expected int | ||
| }{ | ||
| {0, 0}, | ||
| {1, 100}, | ||
| {2, 200}, | ||
| {3, 400}, | ||
| {4, 800}, | ||
| {5, 1600}, | ||
| {10, MaxBackoffBlocks}, // capped | ||
| } | ||
|
|
||
| for _, tc := range tests { | ||
| result := CalculateBackoffBlocks(tc.failures) | ||
| if result != tc.expected { | ||
| t.Errorf("CalculateBackoffBlocks(%d) = %d, want %d", tc.failures, result, tc.expected) | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,156 @@ | ||
| package pdp | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| "golang.org/x/xerrors" | ||
|
|
||
| "github.com/filecoin-project/curio/harmony/harmonydb" | ||
| ) | ||
|
|
||
const (
	// MaxConsecutiveFailures is the threshold for giving up on a dataset.
	// Used by ApplyProvingBackoff: after this many consecutive contract
	// reverts without a successful prove, the dataset is marked as
	// terminated. This gives time for external resolution (e.g., client
	// adds funds).
	MaxConsecutiveFailures = 5

	// BaseBackoffBlocks is the initial delay after the first contract
	// revert. Used by CalculateBackoffBlocks: each further failure doubles
	// the delay.
	BaseBackoffBlocks = 100

	// MaxBackoffBlocks caps the exponential growth. In practice
	// MaxConsecutiveFailures is reached before this cap applies.
	MaxBackoffBlocks = 28800
)

// CalculateBackoffBlocks computes exponential backoff:
// BaseBackoffBlocks * 2^(failures-1), clamped to MaxBackoffBlocks.
// Non-positive failure counts yield no backoff.
func CalculateBackoffBlocks(failures int) int {
	if failures <= 0 {
		return 0
	}
	// Double up from the base; clamping inside the loop also rules out
	// integer overflow for very large failure counts.
	backoff := BaseBackoffBlocks
	for i := 1; i < failures; i++ {
		backoff *= 2
		if backoff >= MaxBackoffBlocks {
			return MaxBackoffBlocks
		}
	}
	return backoff
}
|
|
||
| // MarkDatasetTerminated marks a dataset as terminated, stopping all future proving attempts. | ||
| // This is called when a termination error (like DataSetPaymentBeyondEndEpoch) is detected. | ||
| func MarkDatasetTerminated(ctx context.Context, db *harmonydb.DB, dataSetId int64, currentHeight int64) error { | ||
| _, err := db.Exec(ctx, ` | ||
| UPDATE pdp_data_sets | ||
| SET terminated_at_epoch = $2, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One thing I want to fix is the problematic naming collision between this and service termination (which #871 is all about hooking this function up to) Service termination today begins when payer is in default. It involves terminating rails and entering the grace period but NOT halting proving operations until the moment we delete the dataset in order to not be penalized on the final period of fee collection by not submitting proofs. The concept here which I'm calling UnrecoverableProvingFailure should terminate service as well but has the additional property that we are fully done with all proving too. It's possible we'll want to update datasets to unrecoverableProvingFailure after dataset deletion but I think we'll probably just clean up all of the datasets state at that point anyway. |
||
| consecutive_prove_failures = consecutive_prove_failures + 1, | ||
| next_prove_attempt_at = NULL, | ||
| init_ready = FALSE, | ||
| prove_at_epoch = NULL, | ||
| challenge_request_msg_hash = NULL | ||
| WHERE id = $1 AND terminated_at_epoch IS NULL | ||
| `, dataSetId, currentHeight) | ||
| return err | ||
| } | ||
|
|
||
| // ApplyProvingBackoff increments the failure count and sets a backoff period. | ||
| // If too many failures occur, marks the dataset as terminated. | ||
| // Returns true if the dataset was marked as terminated. | ||
| func ApplyProvingBackoff(ctx context.Context, db *harmonydb.DB, dataSetId int64, currentHeight int64) (terminated bool, err error) { | ||
| // Get current failure count | ||
| var currentFailures int | ||
| err = db.QueryRow(ctx, ` | ||
| SELECT consecutive_prove_failures FROM pdp_data_sets WHERE id = $1 | ||
| `, dataSetId).Scan(¤tFailures) | ||
| if err != nil { | ||
| return false, xerrors.Errorf("failed to get failure count: %w", err) | ||
| } | ||
|
|
||
| newFailures := currentFailures + 1 | ||
|
|
||
| if newFailures >= MaxConsecutiveFailures { | ||
| // Too many failures, mark as terminated | ||
| _, err = db.Exec(ctx, ` | ||
| UPDATE pdp_data_sets | ||
| SET terminated_at_epoch = $2, | ||
| consecutive_prove_failures = $3, | ||
| next_prove_attempt_at = NULL, | ||
| init_ready = FALSE, | ||
| prove_at_epoch = NULL, | ||
| challenge_request_msg_hash = NULL | ||
| WHERE id = $1 AND terminated_at_epoch IS NULL | ||
| `, dataSetId, currentHeight, newFailures) | ||
| if err != nil { | ||
| return false, xerrors.Errorf("failed to mark as terminated: %w", err) | ||
| } | ||
| log.Warnw("Dataset marked as terminated due to repeated failures", | ||
| "dataSetId", dataSetId, "failures", newFailures) | ||
| return true, nil | ||
| } | ||
|
|
||
| // Apply exponential backoff | ||
| backoffBlocks := CalculateBackoffBlocks(newFailures) | ||
| nextAttempt := currentHeight + int64(backoffBlocks) | ||
|
|
||
| _, err = db.Exec(ctx, ` | ||
| UPDATE pdp_data_sets | ||
| SET consecutive_prove_failures = $2, | ||
| next_prove_attempt_at = $3 | ||
| WHERE id = $1 | ||
| `, dataSetId, newFailures, nextAttempt) | ||
| if err != nil { | ||
| return false, xerrors.Errorf("failed to apply backoff: %w", err) | ||
| } | ||
|
|
||
| log.Infow("Backoff applied for proving failure", | ||
| "dataSetId", dataSetId, "failures", newFailures, | ||
| "backoffBlocks", backoffBlocks, "nextAttemptAt", nextAttempt) | ||
| return false, nil | ||
| } | ||
|
|
||
| // ResetProvingFailures resets the failure count after a successful prove. | ||
| func ResetProvingFailures(ctx context.Context, db *harmonydb.DB, dataSetId int64) error { | ||
| _, err := db.Exec(ctx, ` | ||
| UPDATE pdp_data_sets | ||
| SET consecutive_prove_failures = 0, | ||
| next_prove_attempt_at = NULL | ||
| WHERE id = $1 | ||
| `, dataSetId) | ||
| return err | ||
| } | ||
|
|
||
| // HandleProvingSendError processes errors from sender.Send() calls in proving tasks. | ||
| // It implements three-tier error handling: | ||
| // - Tier 1: Known termination errors, mark terminated immediately | ||
| // - Tier 2: Other contract reverts, apply backoff, may terminate after repeated failures | ||
| // - Tier 3: Transient errors, return error for harmony retry | ||
| // | ||
| // Returns (done, err) where done=true means the task should complete (not retry), | ||
| // and err!=nil means harmony should retry the task. | ||
| func HandleProvingSendError(ctx context.Context, db *harmonydb.DB, dataSetId int64, currentHeight int64, sendErr error) (done bool, err error) { | ||
| // Tier 1: Known termination errors, mark terminated immediately | ||
| if IsTerminationError(sendErr) { | ||
| if markErr := MarkDatasetTerminated(ctx, db, dataSetId, currentHeight); markErr != nil { | ||
| log.Errorw("Failed to mark dataset as terminated", "error", markErr, "dataSetId", dataSetId) | ||
| } | ||
| log.Warnw("Dataset terminated, stopping proving attempts", | ||
| "dataSetId", dataSetId, "error", sendErr) | ||
| return true, nil | ||
| } | ||
|
|
||
| // Tier 2: Other contract reverts, apply backoff, may terminate after repeated failures | ||
| if IsContractRevert(sendErr) { | ||
| terminated, backoffErr := ApplyProvingBackoff(ctx, db, dataSetId, currentHeight) | ||
| if backoffErr != nil { | ||
| log.Errorw("Failed to apply backoff", "error", backoffErr, "dataSetId", dataSetId) | ||
| } | ||
| if terminated { | ||
| log.Warnw("Dataset terminated after repeated contract reverts", | ||
| "dataSetId", dataSetId, "error", sendErr) | ||
| } | ||
| return true, nil // Backoff applied; scheduler query prevents immediate re-scheduling | ||
| } | ||
|
|
||
| // Tier 3: Transient errors, let harmony retry | ||
| return false, sendErr | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If are terminating these dataSets, shouldn't we also clean them up? We can plug this into termination flow easily. It will try to terminate FWSS (if not already), terminate in PDPVerifier and then clean up the piece itself.