feat: add background P2P init retries to SyncService #3002
base: main
Changes from all commits
```diff
@@ -61,6 +61,10 @@ type SyncService[H header.Header[H]] struct {
 	getterByHeight   GetterByHeightFunc[H]
 	rangeGetter      RangeGetterFunc[H]
 	storeInitialized atomic.Bool
+
+	// context for background operations
+	bgCtx    context.Context
+	bgCancel context.CancelFunc
 }

 // DataSyncService is the P2P Sync Service for blocks.
```
```diff
@@ -153,6 +157,8 @@ func newSyncService[H header.Header[H]](
 		return nil, fmt.Errorf("failed to initialize the %s store: %w", syncType, err)
 	}

+	bgCtx, bgCancel := context.WithCancel(context.Background())
+
 	svc := &SyncService[H]{
 		conf:    conf,
 		genesis: genesis,
```

Contributor comment: I think this can be init in start instead with the parent context. You can set a noop for …
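A rough, self-contained sketch of that suggestion, using a minimal stand-in type rather than the PR's actual `SyncService`: derive the background context from the parent passed to `Start`, and default the cancel func to a no-op so `Stop` stays safe even if `Start` never ran.

```go
package main

import (
	"context"
	"fmt"
)

// svc is a minimal stand-in for SyncService, used only to illustrate the pattern.
type svc struct {
	bgCtx    context.Context
	bgCancel context.CancelFunc
}

func newSvc() *svc {
	// Default to a no-op cancel so Stop is safe even if Start was never called.
	return &svc{bgCtx: context.Background(), bgCancel: func() {}}
}

// Start derives the background context from the parent, so cancelling the
// parent (e.g. on SIGTERM) also cancels any background init retries.
func (s *svc) Start(ctx context.Context) {
	s.bgCtx, s.bgCancel = context.WithCancel(ctx)
}

func (s *svc) Stop() {
	s.bgCancel() // either the real cancel or the no-op default
}

func main() {
	s := newSvc()
	s.Stop() // no panic: no-op cancel

	s.Start(context.Background())
	s.Stop()
	fmt.Println("background ctx cancelled:", s.bgCtx.Err() != nil) // true after Stop
}
```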
```diff
@@ -164,6 +170,8 @@ func newSyncService[H header.Header[H]](
 		syncType:     syncType,
 		logger:       logger,
 		syncerStatus: new(SyncerStatus),
+		bgCtx:        bgCtx,
+		bgCancel:     bgCancel,
 	}

 	return svc, nil
```
```diff
@@ -389,6 +397,42 @@ func (syncService *SyncService[H]) startSubscriber(ctx context.Context) error {
 	return nil
 }

+// tryInit attempts to initialize the syncer from P2P once.
+// Returns true if successful, false otherwise with an error.
+func (syncService *SyncService[H]) tryInit(ctx context.Context) (bool, error) {
+	var (
+		trusted       H
+		err           error
+		heightToQuery uint64
+	)
+
+	head, headErr := syncService.store.Head(ctx)
+	switch {
+	case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore):
+		heightToQuery = syncService.genesis.InitialHeight
+	case headErr != nil:
+		return false, fmt.Errorf("failed to inspect local store head: %w", headErr)
+	default:
+		heightToQuery = head.Height()
+	}
+
+	if trusted, err = syncService.ex.GetByHeight(ctx, heightToQuery); err != nil {
+		return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err)
+	}
+
+	if syncService.storeInitialized.CompareAndSwap(false, true) {
+		if _, err := syncService.initStore(ctx, trusted); err != nil {
+			syncService.storeInitialized.Store(false)
+			return false, fmt.Errorf("failed to initialize the store: %w", err)
+		}
+	}
+	if err := syncService.startSyncer(ctx); err != nil {
+		return false, err
+	}
+
+	return true, nil
+}
+
 // initFromP2PWithRetry initializes the syncer from P2P with a retry mechanism.
 // It inspects the local store to determine the first height to request:
 //   - when the store already contains items, it reuses the latest height as the starting point;
```
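The `storeInitialized.CompareAndSwap(false, true)` guard above ensures `initStore` runs at most once even if several init attempts race; on failure the flag is reset so a later attempt can retry. A self-contained illustration of that idiom (the helper and names here are illustrative, not taken from the PR):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// initOnce demonstrates the atomic.Bool CompareAndSwap guard: only one caller
// wins the swap and performs initialization; on failure the flag is reset so a
// later attempt can try again.
func initOnce(initialized *atomic.Bool, initFn func() error) error {
	if !initialized.CompareAndSwap(false, true) {
		return nil // someone else already initialized (or is initializing)
	}
	if err := initFn(); err != nil {
		initialized.Store(false) // allow retries after a failed attempt
		return err
	}
	return nil
}

func main() {
	var initialized atomic.Bool
	var calls atomic.Int32

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = initOnce(&initialized, func() error {
				calls.Add(1)
				return nil
			})
		}()
	}
	wg.Wait()
	fmt.Println("init ran", calls.Load(), "time(s)") // prints 1
}
```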
```diff
@@ -398,48 +442,15 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
 		return nil
 	}

-	tryInit := func(ctx context.Context) (bool, error) {
-		var (
-			trusted       H
-			err           error
-			heightToQuery uint64
-		)
-
-		head, headErr := syncService.store.Head(ctx)
-		switch {
-		case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore):
-			heightToQuery = syncService.genesis.InitialHeight
-		case headErr != nil:
-			return false, fmt.Errorf("failed to inspect local store head: %w", headErr)
-		default:
-			heightToQuery = head.Height()
-		}
-
-		if trusted, err = syncService.ex.GetByHeight(ctx, heightToQuery); err != nil {
-			return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err)
-		}
-
-		if syncService.storeInitialized.CompareAndSwap(false, true) {
-			if _, err := syncService.initStore(ctx, trusted); err != nil {
-				syncService.storeInitialized.Store(false)
-				return false, fmt.Errorf("failed to initialize the store: %w", err)
-			}
-		}
-		if err := syncService.startSyncer(ctx); err != nil {
-			return false, err
-		}
-		return true, nil
-	}
-
 	// block with exponential backoff until initialization succeeds or context is canceled.
 	backoff := 1 * time.Second
 	maxBackoff := 10 * time.Second

-	timeoutTimer := time.NewTimer(time.Minute * 10)
+	timeoutTimer := time.NewTimer(time.Minute * 2)
 	defer timeoutTimer.Stop()

 	for {
-		ok, err := tryInit(ctx)
+		ok, err := syncService.tryInit(ctx)
 		if ok {
 			return nil
 		}
```
```diff
@@ -450,7 +461,9 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-timeoutTimer.C:
-			return fmt.Errorf("timeout reached while trying to initialize the store after 10 minutes: %w", err)
+			syncService.logger.Warn().Err(err).Msg("timeout reached while trying to initialize the store, scheduling background retry")
+			go syncService.retryInitInBackground()
+			return nil
 		case <-time.After(backoff):
 		}

```

Contributor comment: Would it make sense to try async init directly? Why wait 2 min?
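One possible shape of that suggestion, sketched against the methods this PR adds (the `initFromP2PAsync` name is hypothetical, not part of the PR):

```go
// Hypothetical variant: attempt one synchronous init and, if it fails, hand off
// to the background retry loop immediately instead of blocking the caller for
// up to two minutes.
func (syncService *SyncService[H]) initFromP2PAsync(ctx context.Context) error {
	ok, err := syncService.tryInit(ctx)
	if ok {
		return nil
	}
	syncService.logger.Warn().Err(err).Msg("initial P2P init attempt failed, retrying in background")
	go syncService.retryInitInBackground()
	return nil
}
```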
```diff
@@ -461,10 +474,40 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
 	}
 }

+// retryInitInBackground continues attempting to initialize the syncer in the background.
+func (syncService *SyncService[H]) retryInitInBackground() {
+	backoff := 15 * time.Second
+	maxBackoff := 5 * time.Minute
+
+	for {
+		select {
+		case <-syncService.bgCtx.Done():
+			syncService.logger.Info().Msg("background retry cancelled")
+			return
+		case <-time.After(backoff):
+		}
+
+		ok, err := syncService.tryInit(syncService.bgCtx)
+		if ok {
+			syncService.logger.Info().Msg("successfully initialized store from P2P in background")
+			return
+		}
+
+		syncService.logger.Info().Err(err).Dur("retry_in", backoff).Msg("background retry: headers not yet available from peers")
+
+		backoff *= 2
+		if backoff > maxBackoff {
+			backoff = maxBackoff
+		}
+	}
+}
+
 // Stop is a part of Service interface.
 //
 // `store` is closed last because it's used by other services.
 func (syncService *SyncService[H]) Stop(ctx context.Context) error {
+	syncService.bgCancel()
+
 	// unsubscribe from topic first so that sub.Stop() does not fail
 	syncService.topicSubscription.Cancel()
 	err := errors.Join(
```

Contributor comment: Here and others: pass the context via a param instead. That would be more consistent with other methods and would allow tracing and parent context cancellation (on SIGTERM, for example).
Review comment: Would be great to get rid of this field.
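A hedged sketch of what those last two comments point toward (not part of the PR): thread the parent context into the retry loop as a parameter, which would make the `bgCtx`/`bgCancel` fields unnecessary and let cancellation follow the caller's lifecycle.

```go
// Sketch only: same logic as the PR's retryInitInBackground, but the context is
// passed in rather than stored on the struct, so bgCtx/bgCancel could be dropped.
func (syncService *SyncService[H]) retryInitInBackground(ctx context.Context) {
	backoff := 15 * time.Second
	maxBackoff := 5 * time.Minute

	for {
		select {
		case <-ctx.Done():
			syncService.logger.Info().Msg("background retry cancelled")
			return
		case <-time.After(backoff):
		}

		ok, err := syncService.tryInit(ctx)
		if ok {
			syncService.logger.Info().Msg("successfully initialized store from P2P in background")
			return
		}
		syncService.logger.Info().Err(err).Dur("retry_in", backoff).Msg("background retry: headers not yet available from peers")

		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}
```

The call site could then become `go syncService.retryInitInBackground(ctx)` with a context the caller already holds, and `Stop` would no longer need a dedicated `bgCancel`.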