diff --git a/architecture/evm/cache_poller_pressure_test.go b/architecture/evm/cache_poller_pressure_test.go new file mode 100644 index 000000000..69a385fff --- /dev/null +++ b/architecture/evm/cache_poller_pressure_test.go @@ -0,0 +1,139 @@ +package evm + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/erpc/erpc/common" + "github.com/erpc/erpc/data" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLargeFinalizedGetLogsCacheGuards(t *testing.T) { + // 1 MiB avoids extra envelope copies; 4 MiB skips the heaviest finalized Postgres blob writes. + assert.True(t, isLargeFinalizedGetLogsPayload("eth_getLogs", common.DataFinalityStateFinalized, largeFinalizedGetLogsEnvelopeBypassBytes)) + assert.False(t, isLargeFinalizedGetLogsPayload("eth_call", common.DataFinalityStateFinalized, largeFinalizedGetLogsEnvelopeBypassBytes)) + assert.False(t, isLargeFinalizedGetLogsPayload("eth_getLogs", common.DataFinalityStateUnfinalized, largeFinalizedGetLogsEnvelopeBypassBytes)) + + assert.True(t, shouldSkipOversizedFinalizedGetLogsPostgresWrite( + "eth_getLogs", + common.DataFinalityStateFinalized, + largeFinalizedGetLogsPostgresSkipBytes, + &data.PostgreSQLConnector{}, + )) + assert.False(t, shouldSkipOversizedFinalizedGetLogsPostgresWrite( + "eth_getLogs", + common.DataFinalityStateFinalized, + largeFinalizedGetLogsPostgresSkipBytes-1, + &data.PostgreSQLConnector{}, + )) + assert.False(t, shouldSkipOversizedFinalizedGetLogsPostgresWrite( + "eth_getLogs", + common.DataFinalityStateFinalized, + largeFinalizedGetLogsPostgresSkipBytes, + &data.RedisConnector{}, + )) +} + +func TestEvmStatePoller_TriggerLatestPollAsync_DedupesInFlightWork(t *testing.T) { + var calls atomic.Int32 + started := make(chan int32, 2) + firstRelease := make(chan struct{}) + secondRelease := make(chan struct{}) + + poller := &EvmStatePoller{ + appCtx: context.Background(), + latestPollAsyncFn: func(ctx context.Context) (int64, error) { + call := calls.Add(1) + started <- call + if call == 1 { + <-firstRelease + } else { + <-secondRelease + } + return 12, nil + }, + } + + require.True(t, poller.TriggerLatestPollAsync(time.Second)) + require.False(t, poller.TriggerLatestPollAsync(time.Second)) + + select { + case call := <-started: + require.Equal(t, int32(1), call) + case <-time.After(time.Second): + t.Fatal("latest async poll did not start") + } + + close(firstRelease) + require.Eventually(t, func() bool { + return !poller.latestPollTriggerInFlight.Load() + }, time.Second, 10*time.Millisecond) + + require.True(t, poller.TriggerLatestPollAsync(time.Second)) + select { + case call := <-started: + require.Equal(t, int32(2), call) + case <-time.After(time.Second): + t.Fatal("second latest async poll did not start") + } + require.False(t, poller.TriggerLatestPollAsync(time.Second)) + assert.True(t, poller.latestPollTriggerInFlight.Load()) + close(secondRelease) + require.Eventually(t, func() bool { + return calls.Load() == 2 + }, time.Second, 10*time.Millisecond) +} + +func TestEvmStatePoller_TriggerFinalizedPollAsync_DedupesInFlightWork(t *testing.T) { + var calls atomic.Int32 + started := make(chan int32, 2) + firstRelease := make(chan struct{}) + secondRelease := make(chan struct{}) + + poller := &EvmStatePoller{ + appCtx: context.Background(), + finalizedPollAsyncFn: func(ctx context.Context) (int64, error) { + call := calls.Add(1) + started <- call + if call == 1 { + <-firstRelease + } else { + <-secondRelease + } + return 9, nil + }, + } + + require.True(t, poller.TriggerFinalizedPollAsync(time.Second)) + require.False(t, poller.TriggerFinalizedPollAsync(time.Second)) + + select { + case call := <-started: + require.Equal(t, int32(1), call) + case <-time.After(time.Second): + t.Fatal("finalized async poll did not start") + } + + close(firstRelease) + require.Eventually(t, func() bool { + return !poller.finalizedPollTriggerInFlight.Load() + }, time.Second, 10*time.Millisecond) + + require.True(t, poller.TriggerFinalizedPollAsync(time.Second)) + select { + case call := <-started: + require.Equal(t, int32(2), call) + case <-time.After(time.Second): + t.Fatal("second finalized async poll did not start") + } + require.False(t, poller.TriggerFinalizedPollAsync(time.Second)) + assert.True(t, poller.finalizedPollTriggerInFlight.Load()) + close(secondRelease) + require.Eventually(t, func() bool { + return calls.Load() == 2 + }, time.Second, 10*time.Millisecond) +} diff --git a/architecture/evm/eth_getLogs.go b/architecture/evm/eth_getLogs.go index 1763a7fef..866c81550 100644 --- a/architecture/evm/eth_getLogs.go +++ b/architecture/evm/eth_getLogs.go @@ -1,6 +1,7 @@ package evm import ( + "bytes" "context" "errors" "fmt" @@ -13,10 +14,122 @@ import ( "github.com/erpc/erpc/common" "github.com/erpc/erpc/telemetry" "github.com/erpc/erpc/util" + "github.com/rs/zerolog" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/singleflight" ) +var ( + getLogsSubRequestSemaphores sync.Map + getLogsSubRequestFlights singleflight.Group +) + +type getLogsSubRequestExecution struct { + jrr *common.JsonRpcResponse + holder *common.NormalizedResponse + fromCache bool + cacheAt int64 + serialized []byte +} + +func getLogsSubRequestRegistryKey(n common.Network) string { + if n == nil { + return "nil-network" + } + return fmt.Sprintf("%s/%s", n.ProjectId(), n.Id()) +} + +func getLogsSharedSubRequestSemaphore(n common.Network, fallbackConcurrency int) chan struct{} { + key := getLogsSubRequestRegistryKey(n) + if sem, ok := getLogsSubRequestSemaphores.Load(key); ok { + return sem.(chan struct{}) + } + + limit := fallbackConcurrency + if limit <= 0 { + limit = 10 + } + if cfg := n.Config(); cfg != nil && cfg.Evm != nil { + if cfg.Evm.GetLogsSplitConcurrency > limit { + limit = cfg.Evm.GetLogsSplitConcurrency + } + if cfg.Evm.GetLogsCacheChunkConcurrency > limit { + limit = cfg.Evm.GetLogsCacheChunkConcurrency + } + } + + sem := make(chan struct{}, limit) + actual, _ := getLogsSubRequestSemaphores.LoadOrStore(key, sem) + return actual.(chan struct{}) +} + +func shouldCoalesceGetLogsSubRequest(ctx context.Context, r *common.NormalizedRequest) bool { + if r == nil { + return false + } + if dr := r.Directives(); dr != nil && dr.UseUpstream != "" { + return false + } + return r.Finality(ctx) == common.DataFinalityStateFinalized +} + +func buildGetLogsSubRequestFlightKey( + ctx context.Context, + n common.Network, + parent *common.NormalizedRequest, + sub *common.NormalizedRequest, + sr ethGetLogsSubRequest, + skipCacheRead bool, + payloadLimit int64, +) (string, error) { + if sub == nil { + return "", fmt.Errorf("sub-request is nil") + } + hash, err := sub.CacheHash() + if err != nil { + return "", err + } + + return fmt.Sprintf( + "%s|%s|skipCacheRead=%t|payloadLimit=%d|range=%d-%d|hash=%s", + getLogsSubRequestRegistryKey(n), + parent.Finality(ctx).String(), + skipCacheRead, + payloadLimit, + sr.fromBlock, + sr.toBlock, + hash, + ), nil +} + +func serializeGetLogsSubRequestExecution(exec *getLogsSubRequestExecution) error { + if exec == nil || exec.jrr == nil || len(exec.serialized) > 0 { + return nil + } + + var buf bytes.Buffer + if _, err := exec.jrr.WriteTo(&buf); err != nil { + return err + } + exec.serialized = bytes.Clone(buf.Bytes()) + return nil +} + +func deserializeGetLogsSubRequestExecution(ctx context.Context, serialized []byte, fromCache bool, cacheAt int64) (*getLogsSubRequestExecution, error) { + serialized = bytes.Clone(serialized) + jrr := &common.JsonRpcResponse{} + if err := jrr.ParseFromBytes(ctx, serialized); err != nil { + return nil, err + } + return &getLogsSubRequestExecution{ + jrr: jrr, + holder: nil, + fromCache: fromCache, + cacheAt: cacheAt, + }, nil +} + // resolveBlockTagForGetLogs attempts to resolve a block tag (like "latest", "finalized") // to a hex block number string and its int64 value. If the value is already a hex number, // it parses and returns it. If the tag cannot be resolved (e.g., "safe", "pending", or @@ -81,7 +194,8 @@ func BuildGetLogsRequest(fromBlock, toBlock int64, address interface{}, topics i // projectPreForward_eth_getLogs records requested block-range size distribution // at the project level before cache and upstream selection. -// It does not modify the request or short-circuit; always returns (false, nil, nil). +// It returns (true, nil, err) only when maxDataBytes validation fails early; +// otherwise it does not modify the request and returns (false, nil, nil). func projectPreForward_eth_getLogs(ctx context.Context, n common.Network, nq *common.NormalizedRequest) (handled bool, resp *common.NormalizedResponse, err error) { if nq == nil || n == nil { return false, nil, nil @@ -239,8 +353,8 @@ func networkPreForward_eth_getLogs(ctx context.Context, n common.Network, ups [] } if requestRange > 0 && chunkSize > 0 && requestRange > chunkSize { subRequests := make([]ethGetLogsSubRequest, 0) - // Align first chunk start to chunkSize boundary for deterministic cache keys - // e.g. with chunkSize=1000, fromBlock=1500 -> first chunk starts at 1500, ends at 1999 + // Align chunk ends to chunkSize boundaries for deterministic cache keys. + // e.g. with chunkSize=1000, fromBlock=1500 -> first chunk is 1500..1999. sb := fromBlock for sb <= toBlock { // Compute aligned chunk end: next boundary - 1, clamped to toBlock @@ -918,6 +1032,216 @@ func executeGetLogsSubRequests(ctx context.Context, n common.Network, r *common. return executeGetLogsSubRequestsInternal(ctx, n, r, subRequests, skipCacheRead, concurrency, 0, nil) } +func executeSingleGetLogsSubRequest( + ctx context.Context, + n common.Network, + r *common.NormalizedRequest, + req ethGetLogsSubRequest, + skipCacheRead bool, + concurrency int, + depth int, + semaphore chan struct{}, + payloadLimit int64, + logger zerolog.Logger, + allowCoalesce bool, + releaseToken func(), +) (*getLogsSubRequestExecution, error) { + const maxSplitDepth = 16 + + srq, err := BuildGetLogsRequest(req.fromBlock, req.toBlock, req.address, req.topics) + if err != nil { + return nil, err + } + logger.Debug(). + Object("request", srq). + Msg("executing eth_getLogs sub-request") + + sbnrq := common.NewNormalizedRequestFromJsonRpcRequest(srq) + dr := r.Directives().Clone() + dr.SkipCacheRead = skipCacheRead + sbnrq.SetDirectives(dr) + sbnrq.SetNetwork(n) + sbnrq.SetParentRequestId(r.ID()) + sbnrq.CopyHttpContextFrom(r) + + if allowCoalesce && shouldCoalesceGetLogsSubRequest(ctx, r) { + flightKey, ferr := buildGetLogsSubRequestFlightKey(ctx, n, r, sbnrq, req, skipCacheRead, payloadLimit) + if ferr == nil { + resultCh := getLogsSubRequestFlights.DoChan(flightKey, func() (interface{}, error) { + // A singleflight leader must not impose its own cancellation deadline on + // other concurrent waiters for the same finalized sub-range. + flightCtx := context.WithoutCancel(ctx) + exec, execErr := executeSingleGetLogsSubRequest(flightCtx, n, r, req, skipCacheRead, concurrency, depth, semaphore, payloadLimit, logger, false, releaseToken) + if execErr != nil { + return nil, execErr + } + if serr := serializeGetLogsSubRequestExecution(exec); serr != nil { + if exec.holder != nil { + exec.holder.Release() + } + if exec.jrr != nil { + exec.jrr.Free() + } + return nil, serr + } + if exec.holder != nil { + exec.holder.Release() + } + if exec.jrr != nil { + exec.jrr.Free() + } + return &getLogsSubRequestExecution{ + serialized: exec.serialized, + fromCache: exec.fromCache, + cacheAt: exec.cacheAt, + }, nil + }) + select { + case <-ctx.Done(): + cause := context.Cause(ctx) + if cause != nil { + return nil, cause + } + return nil, ctx.Err() + case flightResult := <-resultCh: + if flightResult.Err != nil { + return nil, flightResult.Err + } + shared := flightResult.Val.(*getLogsSubRequestExecution) + return deserializeGetLogsSubRequestExecution(ctx, shared.serialized, shared.fromCache, shared.cacheAt) + } + } + logger.Debug().Err(ferr).Msg("failed to build eth_getLogs coalescing key; continuing without coalescing") + } + + // Use most of the remaining parent budget when available, but keep a bounded + // floor/ceiling so slow sub-ranges can still split without hanging the whole request. + subTimeout := 10 * time.Second + if dl, ok := ctx.Deadline(); ok { + rem := time.Until(dl) + if rem > 0 { + target := time.Duration(float64(rem) * 0.75) + if target > subTimeout { + subTimeout = target + } + } + } + if subTimeout < 3*time.Second { + subTimeout = 3 * time.Second + } + if subTimeout > 25*time.Second { + subTimeout = 25 * time.Second + } + + subCtx, cancel := context.WithTimeout(ctx, subTimeout) + rs, re := n.Forward(subCtx, sbnrq) + cancel() + if re != nil { + if depth < maxSplitDepth && shouldSplitEthGetLogsOnError(re) { + subSubs := splitEthGetLogsSubRequest(req) + if len(subSubs) > 0 { + if releaseToken != nil { + releaseToken() + } + subJrr, subMeta, subErr := executeGetLogsSubRequestsInternal(ctx, n, r, subSubs, skipCacheRead, concurrency, depth+1, semaphore) + if subErr == nil { + exec := &getLogsSubRequestExecution{jrr: subJrr} + if subMeta != nil { + exec.fromCache = subMeta.allFromCache + exec.cacheAt = subMeta.oldestCacheAt + } + return exec, nil + } + re = subErr + } else if req.fromBlock == req.toBlock && shouldFallbackEthGetLogsToBlockReceipts(re) { + if releaseToken != nil { + releaseToken() + } + fjrr, ferr := fallbackEthGetLogsSingleBlockViaBlockReceipts(ctx, n, r, req.fromBlock, req.address, req.topics, payloadLimit, skipCacheRead, util.RandomID()) + if ferr == nil && fjrr != nil { + return &getLogsSubRequestExecution{jrr: fjrr}, nil + } + logger.Warn().Err(ferr).AnErr("originalError", re).Int64("block", req.fromBlock). + Msg("eth_getLogs blockReceipts fallback failed after forward error") + re = ferr + } + } + return nil, fmt.Errorf("sub-request [%d-%d] forward failed: %w", req.fromBlock, req.toBlock, re) + } + + jrr, err := rs.JsonRpcResponse(ctx) + if err != nil { + rs.Release() + return nil, err + } + if jrr == nil { + rs.Release() + return nil, fmt.Errorf("unexpected empty json-rpc response %v", rs) + } + + if jrr.Error != nil { + if depth < maxSplitDepth && shouldSplitEthGetLogsOnError(jrr.Error) { + subSubs := splitEthGetLogsSubRequest(req) + if len(subSubs) > 0 { + rs.Release() + if releaseToken != nil { + releaseToken() + } + subJrr, subMeta, subErr := executeGetLogsSubRequestsInternal(ctx, n, r, subSubs, skipCacheRead, concurrency, depth+1, semaphore) + if subErr == nil { + exec := &getLogsSubRequestExecution{jrr: subJrr} + if subMeta != nil { + exec.fromCache = subMeta.allFromCache + exec.cacheAt = subMeta.oldestCacheAt + } + return exec, nil + } + return nil, subErr + } + if req.fromBlock == req.toBlock && shouldFallbackEthGetLogsToBlockReceipts(jrr.Error) { + rs.Release() + if releaseToken != nil { + releaseToken() + } + fjrr, ferr := fallbackEthGetLogsSingleBlockViaBlockReceipts(ctx, n, r, req.fromBlock, req.address, req.topics, payloadLimit, skipCacheRead, util.RandomID()) + if ferr == nil && fjrr != nil { + return &getLogsSubRequestExecution{jrr: fjrr}, nil + } + logger.Warn().Err(ferr).Int64("block", req.fromBlock). + Str("originalError", jrr.Error.Error()). + Msg("eth_getLogs blockReceipts fallback failed after JSON-RPC error") + return nil, ferr + } + } + rs.Release() + return nil, jrr.Error + } + + if payloadLimit > 0 { + filter := newGetLogsFilter(req.address, req.topics, payloadLimit) + dropped, ferr := filterGetLogsResponseByDataLimit(jrr, filter) + if ferr != nil { + rs.Release() + return nil, ferr + } + if dropped > 0 { + logger.Debug(). + Int("droppedLogs", dropped). + Int64("maxSize", payloadLimit). + Int64("fromBlock", req.fromBlock). + Int64("toBlock", req.toBlock). + Msg("filtered oversized eth_getLogs payloads from sub-request") + } + } + + return &getLogsSubRequestExecution{ + jrr: jrr, + holder: rs, + fromCache: rs.FromCache(), + cacheAt: rs.CacheStoredAtUnix(), + }, nil +} + func executeGetLogsSubRequestsInternal(ctx context.Context, n common.Network, r *common.NormalizedRequest, subRequests []ethGetLogsSubRequest, skipCacheRead bool, concurrency int, depth int, semaphore chan struct{}) (*common.JsonRpcResponse, *getLogsMergeMeta, error) { logger := n.Logger().With().Str("method", "eth_getLogs").Interface("id", r.ID()).Logger() payloadLimit, err := extractGetLogsPayloadLimitFromRequest(ctx, r) @@ -937,15 +1261,9 @@ func executeGetLogsSubRequestsInternal(ctx context.Context, n common.Network, r // Concurrency is passed by caller (cache chunking vs split-on-error differ). // Use a shared semaphore across recursive splits to bound total in-flight sub-requests. if semaphore == nil { - if concurrency <= 0 { - concurrency = 10 - } - semaphore = make(chan struct{}, concurrency) + semaphore = getLogsSharedSubRequestSemaphore(n, concurrency) } loopCtxErr := error(nil) - // maxSplitDepth bounds recursive binary splitting to prevent runaway recursion. - // Depth 16 allows up to 2^16 sub-requests, generous for real-world block ranges. - const maxSplitDepth = 16 loop: for idx, sr := range subRequests { wg.Add(1) @@ -980,202 +1298,20 @@ loop: n.ProjectId(), n.Label(), r.UserId(), r.AgentName(), ).Inc() } - applySubMeta := func(meta *getLogsMergeMeta) { - if meta != nil { - fromCacheFlags[i] = meta.allFromCache - cacheAts[i] = meta.oldestCacheAt - } else { - fromCacheFlags[i] = false - cacheAts[i] = 0 - } - } - - srq, err := BuildGetLogsRequest(req.fromBlock, req.toBlock, req.address, req.topics) - logger.Debug(). - Object("request", srq). - Msg("executing eth_getLogs sub-request") - - if err != nil { - mu.Lock() - incSplitFailure() - errs = append(errs, err) - mu.Unlock() - return - } - - sbnrq := common.NewNormalizedRequestFromJsonRpcRequest(srq) - dr := r.Directives().Clone() - dr.SkipCacheRead = skipCacheRead - // TODO dr.UseUpstream = u.Config().Id should we force this (or opposite of it)? - sbnrq.SetDirectives(dr) - sbnrq.SetNetwork(n) - sbnrq.SetParentRequestId(r.ID()) - - // Copy HTTP context (headers, query parameters, user) for proper metrics tracking - sbnrq.CopyHttpContextFrom(r) - - // Bound per-sub-request wall time so slow upstream getLogs calls trigger split-and-retry - // rather than letting the top-level client timeout. - subTimeout := 10 * time.Second - if dl, ok := ctx.Deadline(); ok { - rem := time.Until(dl) - // Prefer to use most of the remaining request budget (bounded), so slow but - // legitimate getLogs calls can still succeed under load. - if rem > 0 { - target := time.Duration(float64(rem) * 0.75) - if target > subTimeout { - subTimeout = target - } - } - } - if subTimeout < 3*time.Second { - subTimeout = 3 * time.Second - } - if subTimeout > 25*time.Second { - subTimeout = 25 * time.Second - } - subCtx, cancel := context.WithTimeout(ctx, subTimeout) - rs, re := n.Forward(subCtx, sbnrq) - cancel() - if re != nil { - // If a sub-request timed out or was too large, try splitting it further. - // This prevents a single slow/huge chunk from failing the entire merge. - if depth < maxSplitDepth && shouldSplitEthGetLogsOnError(re) { - subSubs := splitEthGetLogsSubRequest(req) - if len(subSubs) > 0 { - // Release token before recursing to avoid deadlocks with shared semaphore. - releaseToken() - subJrr, subMeta, subErr := executeGetLogsSubRequestsInternal(ctx, n, r, subSubs, skipCacheRead, concurrency, depth+1, semaphore) - if subErr == nil { - responses[i] = subJrr - holders[i] = nil - applySubMeta(subMeta) - return - } - re = subErr - } else if req.fromBlock == req.toBlock && shouldFallbackEthGetLogsToBlockReceipts(re) { - releaseToken() - fjrr, ferr := fallbackEthGetLogsSingleBlockViaBlockReceipts(ctx, n, r, req.fromBlock, req.address, req.topics, payloadLimit, skipCacheRead, util.RandomID()) - if ferr == nil && fjrr != nil { - responses[i] = fjrr - holders[i] = nil - fromCacheFlags[i] = false - cacheAts[i] = 0 - incSplitSuccess() - return - } - logger.Warn().Err(ferr).AnErr("originalError", re).Int64("block", req.fromBlock). - Msg("eth_getLogs blockReceipts fallback failed after forward error") - re = ferr - } - } + exec, execErr := executeSingleGetLogsSubRequest(ctx, n, r, req, skipCacheRead, concurrency, depth, semaphore, payloadLimit, logger, true, releaseToken) + if execErr != nil { mu.Lock() incSplitFailure() - errs = append(errs, fmt.Errorf("sub-request [%d-%d] forward failed: %w", req.fromBlock, req.toBlock, re)) + errs = append(errs, execErr) mu.Unlock() return } - jrr, err := rs.JsonRpcResponse(ctx) - if err != nil { - mu.Lock() - incSplitFailure() - errs = append(errs, err) - mu.Unlock() - rs.Release() - return - } - - if jrr == nil { - mu.Lock() - incSplitFailure() - errs = append(errs, fmt.Errorf("unexpected empty json-rpc response %v", rs)) - mu.Unlock() - rs.Release() - return - } - - if jrr.Error != nil { - // If the upstream returned a split-worthy JSON-RPC error, try splitting and retrying. - if depth < maxSplitDepth && shouldSplitEthGetLogsOnError(jrr.Error) { - subSubs := splitEthGetLogsSubRequest(req) - if len(subSubs) > 0 { - rs.Release() // no longer needed; we'll return a merged sub-response - releaseToken() - subJrr, subMeta, subErr := executeGetLogsSubRequestsInternal(ctx, n, r, subSubs, skipCacheRead, concurrency, depth+1, semaphore) - if subErr == nil { - responses[i] = subJrr - holders[i] = nil - applySubMeta(subMeta) - return - } - // If split execution failed, record the split failure and stop. - mu.Lock() - incSplitFailure() - errs = append(errs, subErr) - mu.Unlock() - return - } - if req.fromBlock == req.toBlock && shouldFallbackEthGetLogsToBlockReceipts(jrr.Error) { - // Release the failing response; we will compute logs via receipts. - rs.Release() - releaseToken() - fjrr, ferr := fallbackEthGetLogsSingleBlockViaBlockReceipts(ctx, n, r, req.fromBlock, req.address, req.topics, payloadLimit, skipCacheRead, util.RandomID()) - if ferr == nil && fjrr != nil { - responses[i] = fjrr - holders[i] = nil - fromCacheFlags[i] = false - cacheAts[i] = 0 - incSplitSuccess() - return - } - logger.Warn().Err(ferr).Int64("block", req.fromBlock). - Str("originalError", jrr.Error.Error()). - Msg("eth_getLogs blockReceipts fallback failed after JSON-RPC error") - mu.Lock() - incSplitFailure() - errs = append(errs, ferr) - mu.Unlock() - return - } - } - mu.Lock() - incSplitFailure() - errs = append(errs, jrr.Error) - mu.Unlock() - rs.Release() - return - } - - if payloadLimit > 0 { - filter := newGetLogsFilter(req.address, req.topics, payloadLimit) - dropped, err := filterGetLogsResponseByDataLimit(jrr, filter) - if err != nil { - mu.Lock() - incSplitFailure() - errs = append(errs, err) - mu.Unlock() - rs.Release() - return - } - if dropped > 0 { - logger.Debug(). - Int("droppedLogs", dropped). - Int64("maxSize", payloadLimit). - Int64("fromBlock", req.fromBlock). - Int64("toBlock", req.toBlock). - Msg("filtered oversized eth_getLogs payloads from sub-request") - } - } - incSplitSuccess() - // Avoid deep clone amplification: JsonRpcResponse already copies parsed fields - // out of the pooled parse buffer. Keep the sub-response alive until the merged - // response is released, then free it via GetLogsMultiResponseWriter.Release. - responses[i] = jrr - holders[i] = rs - fromCacheFlags[i] = rs.FromCache() - cacheAts[i] = rs.CacheStoredAtUnix() + responses[i] = exec.jrr + holders[i] = exec.holder + fromCacheFlags[i] = exec.fromCache + cacheAts[i] = exec.cacheAt }(sr, idx) } wg.Wait() diff --git a/architecture/evm/eth_getLogs_test.go b/architecture/evm/eth_getLogs_test.go index 90a278d85..e9cc59cb2 100644 --- a/architecture/evm/eth_getLogs_test.go +++ b/architecture/evm/eth_getLogs_test.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "sync" "sync/atomic" "testing" "time" @@ -18,6 +19,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "golang.org/x/sync/singleflight" ) func init() { @@ -52,8 +54,13 @@ func (m *mockNetwork) Label() string { } func (m *mockNetwork) Id() string { - args := m.Called() - return args.Get(0).(string) + for _, c := range m.ExpectedCalls { + if c.Method == "Id" { + args := m.Called() + return args.Get(0).(string) + } + } + return "evm:test" } func (m *mockNetwork) Forward( @@ -61,19 +68,25 @@ func (m *mockNetwork) Forward( req *common.NormalizedRequest, ) (*common.NormalizedResponse, error) { args := m.Called(ctx, req) - // Retrieve arguments from the mock and return them - respFn, eFn := args.Get(0).(func(ctx context.Context, r *common.NormalizedRequest) (*common.NormalizedResponse, error)) - if respFn != nil { - resp, err := respFn(ctx, req) + switch fn := args.Get(0).(type) { + case func(context.Context, *common.NormalizedRequest) (*common.NormalizedResponse, error): + resp, err := fn(ctx, req) if resp == nil && err == nil { - panic("Forward() returned nil for both resp and error eFn: " + fmt.Sprintf("%T", eFn)) + panic("Forward() returned nil for both resp and error eFn: " + fmt.Sprintf("%T", args.Get(1))) + } + return resp, err + case func(context.Context, *common.NormalizedRequest) *common.NormalizedResponse: + resp := fn(ctx, req) + err, _ := args.Get(1).(error) + if resp == nil && err == nil { + panic("Forward() returned nil for both resp and error eFn: " + fmt.Sprintf("%T", args.Get(1))) } return resp, err } respRaw, eRs := args.Get(0).(*common.NormalizedResponse) err, eEr := args.Get(1).(error) if respRaw == nil && err == nil { - panic("Forward() returned nil for both resp and error eFn: " + fmt.Sprintf("%T", eFn) + " eRs: " + fmt.Sprintf("%T", eRs) + " eEr: " + fmt.Sprintf("%T", eEr)) + panic("Forward() returned nil for both resp and error eFn: " + fmt.Sprintf("%T", args.Get(0)) + " eRs: " + fmt.Sprintf("%T", eRs) + " eEr: " + fmt.Sprintf("%T", eEr)) } return respRaw, err } @@ -92,6 +105,18 @@ func (m *mockNetwork) Logger() *zerolog.Logger { return &log.Logger } +func (m *mockNetwork) GetFinality(ctx context.Context, req *common.NormalizedRequest, resp *common.NormalizedResponse) common.DataFinalityState { + for _, c := range m.ExpectedCalls { + if c.Method == "GetFinality" { + args := m.Called(ctx, req, resp) + if finality, ok := args.Get(0).(common.DataFinalityState); ok { + return finality + } + } + } + return common.DataFinalityStateUnknown +} + func (m *mockNetwork) EvmStatePoller() common.EvmStatePoller { args := m.Called() if args.Get(0) == nil { @@ -1037,6 +1062,238 @@ func TestGetLogsFromBlockReceiptsWriter_IsResultEmptyish_InvalidResultIsNotEmpty assert.False(t, w.IsResultEmptyish()) } +func resetGetLogsSharedExecutionState() { + getLogsSubRequestSemaphores = sync.Map{} + getLogsSubRequestFlights = singleflight.Group{} +} + +func TestExecuteGetLogsSubRequests_GlobalSemaphoreSharedAcrossCalls(t *testing.T) { + resetGetLogsSharedExecutionState() + + n := new(mockNetwork) + n.On("Id").Return("evm:1").Maybe() + n.On("ProjectId").Return("prj").Maybe() + n.On("Config").Return(&common.NetworkConfig{Evm: &common.EvmNetworkConfig{GetLogsSplitConcurrency: 1}}).Maybe() + + var inFlight atomic.Int32 + var maxInFlight atomic.Int32 + var callCount atomic.Int32 + entered := make(chan struct{}, 2) + release := make(chan struct{}) + + n.On("Forward", mock.Anything, mock.Anything).Return( + func(ctx context.Context, r *common.NormalizedRequest) *common.NormalizedResponse { + callCount.Add(1) + current := inFlight.Add(1) + for { + prev := maxInFlight.Load() + if current <= prev || maxInFlight.CompareAndSwap(prev, current) { + break + } + } + entered <- struct{}{} + <-release + inFlight.Add(-1) + return common.NewNormalizedResponse(). + WithRequest(r). + WithJsonRpcResponse(common.MustNewJsonRpcResponseFromBytes([]byte(`1`), []byte(`[]`), nil)) + }, + nil, + ).Twice() + + req1 := createTestRequest(map[string]interface{}{"fromBlock": "0x1", "toBlock": "0x1"}) + req1.SetNetwork(n) + req2 := createTestRequest(map[string]interface{}{"fromBlock": "0x1", "toBlock": "0x1"}) + req2.SetNetwork(n) + + errCh1 := make(chan error, 1) + errCh2 := make(chan error, 1) + + go func() { + _, _, err := executeGetLogsSubRequests(context.Background(), n, req1, []ethGetLogsSubRequest{{fromBlock: 1, toBlock: 1}}, false, 1) + errCh1 <- err + }() + + select { + case <-entered: + case <-time.After(time.Second): + t.Fatal("first eth_getLogs sub-request did not start") + } + + go func() { + _, _, err := executeGetLogsSubRequests(context.Background(), n, req2, []ethGetLogsSubRequest{{fromBlock: 1, toBlock: 1}}, false, 1) + errCh2 <- err + }() + + select { + case <-entered: + t.Fatal("second eth_getLogs sub-request bypassed shared semaphore") + case <-time.After(150 * time.Millisecond): + } + + release <- struct{}{} + + select { + case <-entered: + case <-time.After(time.Second): + t.Fatal("second eth_getLogs sub-request never acquired shared semaphore") + } + + release <- struct{}{} + + require.NoError(t, <-errCh1) + require.NoError(t, <-errCh2) + assert.Equal(t, int32(1), maxInFlight.Load()) + assert.Equal(t, int32(2), callCount.Load()) + n.AssertExpectations(t) +} + +func TestExecuteGetLogsSubRequests_CoalescesFinalizedIdenticalSubRangesAcrossCalls(t *testing.T) { + resetGetLogsSharedExecutionState() + + n := new(mockNetwork) + n.On("Id").Return("evm:1").Maybe() + n.On("ProjectId").Return("prj").Maybe() + n.On("Config").Return(&common.NetworkConfig{Evm: &common.EvmNetworkConfig{GetLogsSplitConcurrency: 4}}).Maybe() + n.On("GetFinality", mock.Anything, mock.Anything, mock.Anything).Return(common.DataFinalityStateFinalized).Maybe() + + var callCount atomic.Int32 + entered := make(chan struct{}, 1) + release := make(chan struct{}) + + n.On("Forward", mock.Anything, mock.Anything).Return( + func(ctx context.Context, r *common.NormalizedRequest) *common.NormalizedResponse { + callCount.Add(1) + entered <- struct{}{} + <-release + return common.NewNormalizedResponse(). + WithRequest(r). + WithJsonRpcResponse(common.MustNewJsonRpcResponseFromBytes([]byte(`1`), []byte(`[{"blockNumber":"0x1"}]`), nil)) + }, + nil, + ).Once() + + req1 := createTestRequest(map[string]interface{}{"fromBlock": "0x1", "toBlock": "0x1"}) + req1.SetNetwork(n) + req2 := createTestRequest(map[string]interface{}{"fromBlock": "0x1", "toBlock": "0x1"}) + req2.SetNetwork(n) + + type result struct { + resp *common.JsonRpcResponse + err error + } + results := make(chan result, 2) + + run := func(req *common.NormalizedRequest) { + resp, _, err := executeGetLogsSubRequests(context.Background(), n, req, []ethGetLogsSubRequest{{fromBlock: 1, toBlock: 1}}, false, 4) + results <- result{resp: resp, err: err} + } + + go run(req1) + select { + case <-entered: + case <-time.After(time.Second): + t.Fatal("leader eth_getLogs sub-request did not start") + } + + go run(req2) + time.Sleep(100 * time.Millisecond) + assert.Equal(t, int32(1), callCount.Load(), "duplicate sub-range should coalesce while first request is in flight") + + close(release) + + for range 2 { + res := <-results + require.NoError(t, res.err) + require.NotNil(t, res.resp) + } + assert.Equal(t, int32(1), callCount.Load()) + n.AssertExpectations(t) +} + +func TestExecuteGetLogsSubRequests_CoalescedWaitersRespectOwnContexts(t *testing.T) { + resetGetLogsSharedExecutionState() + + n := new(mockNetwork) + n.On("Id").Return("evm:1").Maybe() + n.On("ProjectId").Return("prj").Maybe() + n.On("Config").Return(&common.NetworkConfig{Evm: &common.EvmNetworkConfig{GetLogsSplitConcurrency: 4}}).Maybe() + n.On("GetFinality", mock.Anything, mock.Anything, mock.Anything).Return(common.DataFinalityStateFinalized).Maybe() + + started := make(chan struct{}, 1) + release := make(chan struct{}) + n.On("Forward", mock.Anything, mock.Anything).Return( + func(ctx context.Context, r *common.NormalizedRequest) *common.NormalizedResponse { + started <- struct{}{} + <-release + return common.NewNormalizedResponse(). + WithRequest(r). + WithJsonRpcResponse(common.MustNewJsonRpcResponseFromBytes([]byte(`1`), []byte(`[{"blockNumber":"0x1"}]`), nil)) + }, + nil, + ).Once() + + req1 := createTestRequest(map[string]interface{}{"fromBlock": "0x1", "toBlock": "0x1"}) + req1.SetNetwork(n) + req2 := createTestRequest(map[string]interface{}{"fromBlock": "0x1", "toBlock": "0x1"}) + req2.SetNetwork(n) + + type result struct { + resp *common.JsonRpcResponse + err error + } + shortCtx, cancel := context.WithTimeout(context.Background(), 25*time.Millisecond) + defer cancel() + resCh := make(chan result, 2) + + go func() { + resp, _, err := executeGetLogsSubRequests(shortCtx, n, req1, []ethGetLogsSubRequest{{fromBlock: 1, toBlock: 1}}, false, 4) + resCh <- result{resp: resp, err: err} + }() + select { + case <-started: + case <-time.After(time.Second): + t.Fatal("leader eth_getLogs sub-request did not start") + } + + go func() { + resp, _, err := executeGetLogsSubRequests(context.Background(), n, req2, []ethGetLogsSubRequest{{fromBlock: 1, toBlock: 1}}, false, 4) + resCh <- result{resp: resp, err: err} + }() + + time.Sleep(75 * time.Millisecond) + close(release) + + first := <-resCh + second := <-resCh + if first.err == nil { + first, second = second, first + } + require.Error(t, first.err) + assert.ErrorIs(t, first.err, context.DeadlineExceeded) + require.NoError(t, second.err) + require.NotNil(t, second.resp) + n.AssertExpectations(t) +} + +func TestShouldCoalesceGetLogsSubRequest(t *testing.T) { + n := new(mockNetwork) + n.On("GetFinality", mock.Anything, mock.Anything, mock.Anything).Return(common.DataFinalityStateFinalized).Maybe() + req := createTestRequest(map[string]interface{}{"fromBlock": "0x1", "toBlock": "0x1"}) + req.SetNetwork(n) + + assert.True(t, shouldCoalesceGetLogsSubRequest(context.Background(), req)) + + req.SetDirectives(&common.RequestDirectives{UseUpstream: "rpc1"}) + assert.False(t, shouldCoalesceGetLogsSubRequest(context.Background(), req)) + + n2 := new(mockNetwork) + n2.On("GetFinality", mock.Anything, mock.Anything, mock.Anything).Return(common.DataFinalityStateUnfinalized).Maybe() + req2 := createTestRequest(map[string]interface{}{"fromBlock": "0x1", "toBlock": "0x1"}) + req2.SetNetwork(n2) + assert.False(t, shouldCoalesceGetLogsSubRequest(context.Background(), req2)) +} + func TestGetLogsFromBlockReceiptsWriter_FiltersOversizedPayloads(t *testing.T) { jrr := common.MustNewJsonRpcResponseFromBytes( []byte(`"0x1"`), diff --git a/architecture/evm/evm_state_poller.go b/architecture/evm/evm_state_poller.go index b586f53e3..f812f6912 100644 --- a/architecture/evm/evm_state_poller.go +++ b/architecture/evm/evm_state_poller.go @@ -6,6 +6,7 @@ import ( "fmt" "runtime/debug" "sync" + "sync/atomic" "time" "github.com/erpc/erpc/common" @@ -14,6 +15,7 @@ import ( "github.com/erpc/erpc/telemetry" "github.com/erpc/erpc/util" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -85,7 +87,11 @@ type EvmStatePoller struct { stateMu sync.RWMutex // Track if updates are in progress to avoid goroutine pile-up - finalizedUpdateInProgress sync.Mutex + finalizedUpdateInProgress sync.Mutex + latestPollTriggerInFlight atomic.Bool + finalizedPollTriggerInFlight atomic.Bool + latestPollAsyncFn func(context.Context) (int64, error) + finalizedPollAsyncFn func(context.Context) (int64, error) // Earliest per probe tracking earliestByProbe map[common.EvmAvailabilityProbeType]data.CounterInt64SharedVariable @@ -475,6 +481,116 @@ func (e *EvmStatePoller) LatestBlock() int64 { return e.latestBlockShared.GetValue() } +func (e *EvmStatePoller) pollLatestForAsyncTrigger(ctx context.Context) (int64, error) { + if e.latestPollAsyncFn != nil { + return e.latestPollAsyncFn(ctx) + } + return e.PollLatestBlockNumber(ctx) +} + +func (e *EvmStatePoller) pollFinalizedForAsyncTrigger(ctx context.Context) (int64, error) { + if e.finalizedPollAsyncFn != nil { + return e.finalizedPollAsyncFn(ctx) + } + return e.PollFinalizedBlockNumber(ctx) +} + +func (e *EvmStatePoller) TriggerLatestPollAsync(timeout time.Duration) bool { + if e == nil { + return false + } + if timeout <= 0 { + timeout = 10 * time.Second + } + if !e.latestPollTriggerInFlight.CompareAndSwap(false, true) { + return false + } + go func() { + defer e.latestPollTriggerInFlight.Store(false) + defer func() { + if rec := recover(); rec != nil { + e.logger.Error().Interface("panic", rec).Str("stack", string(debug.Stack())).Msg("panic in latest async poll trigger") + } + }() + ctx, cancel := context.WithTimeout(e.appCtx, timeout) + defer cancel() + if _, err := e.pollLatestForAsyncTrigger(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + e.logger.Debug().Err(err).Msg("latest async poll trigger failed") + } + }() + return true +} + +func (e *EvmStatePoller) TriggerFinalizedPollAsync(timeout time.Duration) bool { + if e == nil { + return false + } + if timeout <= 0 { + timeout = 10 * time.Second + } + if !e.finalizedPollTriggerInFlight.CompareAndSwap(false, true) { + return false + } + go func() { + defer e.finalizedPollTriggerInFlight.Store(false) + defer func() { + if rec := recover(); rec != nil { + e.logger.Error().Interface("panic", rec).Str("stack", string(debug.Stack())).Msg("panic in finalized async poll trigger") + } + }() + ctx, cancel := context.WithTimeout(e.appCtx, timeout) + defer cancel() + if _, err := e.pollFinalizedForAsyncTrigger(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + e.logger.Debug().Err(err).Msg("finalized async poll trigger failed") + } + }() + return true +} + +func TriggerLatestPollAsync(sp common.EvmStatePoller, timeout time.Duration) bool { + if sp == nil || sp.IsObjectNull() { + return false + } + if triggerable, ok := sp.(interface{ TriggerLatestPollAsync(time.Duration) bool }); ok { + return triggerable.TriggerLatestPollAsync(timeout) + } + go func() { + defer func() { + if rec := recover(); rec != nil { + log.Logger.Error().Interface("panic", rec).Str("stack", string(debug.Stack())).Msg("panic in latest async poll fallback") + } + }() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if _, err := sp.PollLatestBlockNumber(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + log.Logger.Debug().Err(err).Msg("latest async poll fallback failed") + } + }() + return true +} + +func TriggerFinalizedPollAsync(sp common.EvmStatePoller, timeout time.Duration) bool { + if sp == nil || sp.IsObjectNull() { + return false + } + if triggerable, ok := sp.(interface{ TriggerFinalizedPollAsync(time.Duration) bool }); ok { + return triggerable.TriggerFinalizedPollAsync(timeout) + } + go func() { + defer func() { + if rec := recover(); rec != nil { + log.Logger.Error().Interface("panic", rec).Str("stack", string(debug.Stack())).Msg("panic in finalized async poll fallback") + } + }() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if _, err := sp.PollFinalizedBlockNumber(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + log.Logger.Debug().Err(err).Msg("finalized async poll fallback failed") + } + }() + return true +} + func (e *EvmStatePoller) PollFinalizedBlockNumber(ctx context.Context) (int64, error) { if e.shouldSkipFinalizedCheck() { return 0, nil diff --git a/architecture/evm/json_rpc_cache.go b/architecture/evm/json_rpc_cache.go index ae37c1a01..df7bede19 100644 --- a/architecture/evm/json_rpc_cache.go +++ b/architecture/evm/json_rpc_cache.go @@ -53,8 +53,27 @@ const ( cacheEnvelopeMagic = "ERPC" cacheEnvelopeVersion = byte(1) cacheEnvelopeHeader = 4 + 1 + 8 + + // Bypass the extra envelope copy once finalized eth_getLogs payloads are already large. + largeFinalizedGetLogsEnvelopeBypassBytes = 1 << 20 + // Skip the heaviest finalized Postgres blob writes once payloads are large enough to dominate cost. + largeFinalizedGetLogsPostgresSkipBytes = 4 << 20 ) +func isLargeFinalizedGetLogsPayload(method string, finality common.DataFinalityState, resultSize int) bool { + return method == "eth_getLogs" && + finality == common.DataFinalityStateFinalized && + resultSize >= largeFinalizedGetLogsEnvelopeBypassBytes +} + +func shouldSkipOversizedFinalizedGetLogsPostgresWrite(method string, finality common.DataFinalityState, resultSize int, connector data.Connector) bool { + if !isLargeFinalizedGetLogsPayload(method, finality, resultSize) || resultSize < largeFinalizedGetLogsPostgresSkipBytes { + return false + } + _, ok := connector.(*data.PostgreSQLConnector) + return ok +} + func NewEvmJsonRpcCache(ctx context.Context, logger *zerolog.Logger, cfg *common.CacheConfig) (*EvmJsonRpcCache, error) { logger.Info().Msg("initializing evm json rpc cache...") @@ -619,6 +638,23 @@ func (c *EvmJsonRpcCache) Set(ctx context.Context, req *common.NormalizedRequest // Compress the value before storing if compression is enabled valueToStore := rpcResp.GetResultBytes() cachedCategory := rpcReq.Method + largeFinalizedGetLogs := isLargeFinalizedGetLogsPayload(rpcReq.Method, finState, len(valueToStore)) + if shouldSkipOversizedFinalizedGetLogsPostgresWrite(rpcReq.Method, finState, len(valueToStore), connector) { + lg.Debug(). + Str("connector", connector.Id()). + Int("resultBytes", len(valueToStore)). + Msg("skipping oversized finalized eth_getLogs Postgres cache write") + telemetry.MetricCacheSetSkippedTotal.WithLabelValues( + c.projectId, + req.NetworkLabel(), + rpcReq.Method, + connector.Id(), + policy.String(), + ttl.String(), + req.UserId(), + ).Inc() + return + } telemetry.MetricCacheSetOriginalBytes.WithLabelValues( c.projectId, req.NetworkLabel(), @@ -630,7 +666,7 @@ func (c *EvmJsonRpcCache) Set(ctx context.Context, req *common.NormalizedRequest ).Add(float64(len(valueToStore))) storedValue := valueToStore - if c.envelopeEnabled { + if c.envelopeEnabled && !largeFinalizedGetLogs { var wrapped bool storedValue, wrapped = wrapCacheEnvelope(valueToStore) if !wrapped { @@ -650,6 +686,10 @@ func (c *EvmJsonRpcCache) Set(ctx context.Context, req *common.NormalizedRequest req.UserId(), ).Inc() } + } else if c.envelopeEnabled && largeFinalizedGetLogs { + lg.Debug(). + Int("resultBytes", len(valueToStore)). + Msg("bypassing cache envelope for oversized finalized eth_getLogs payload") } if c.compressionEnabled && len(storedValue) >= c.compressionThreshold { compressedValue, isCompressed := c.compressValueBytes(storedValue) diff --git a/erpc/networks.go b/erpc/networks.go index 6c8efc417..05ddc63e7 100644 --- a/erpc/networks.go +++ b/erpc/networks.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math" + "runtime" "runtime/debug" "slices" "strings" @@ -34,6 +35,14 @@ const ( networkFailsafeTimeoutSlack = 30 * time.Millisecond ) +type networkMethodClass string + +const ( + networkMethodClassDefault networkMethodClass = "default" + networkMethodClassEthCall networkMethodClass = "eth_call" + networkMethodClassGetLogs networkMethodClass = "eth_getLogs" +) + type FailsafeExecutor struct { method string finalities []common.DataFinalityState @@ -79,6 +88,100 @@ func (n *Network) observeCacheWriteQueueDepth(sem chan struct{}) { ).Set(float64(len(sem))) } +func classifyNetworkMethodClass(method string) networkMethodClass { + switch method { + case "eth_getLogs": + return networkMethodClassGetLogs + case "eth_call": + return networkMethodClassEthCall + default: + return networkMethodClassDefault + } +} + +func deriveGetLogsNetworkBudget(cfg *common.NetworkConfig) int { + budget := 4 + if cfg != nil && cfg.Evm != nil { + budget = max(budget, cfg.Evm.GetLogsSplitConcurrency) + budget = max(budget, cfg.Evm.GetLogsCacheChunkConcurrency) + } + return max(1, budget) +} + +func deriveMethodClassBudgets(cfg *common.NetworkConfig) map[networkMethodClass]chan struct{} { + cpuBudget := max(1, runtime.GOMAXPROCS(0)) + getLogsBudget := max( + max(16, cpuBudget*4), + deriveGetLogsNetworkBudget(cfg)*4, + ) + ethCallBudget := max(8, cpuBudget*4) + defaultBudget := max(16, cpuBudget*8) + + return map[networkMethodClass]chan struct{}{ + networkMethodClassGetLogs: make(chan struct{}, getLogsBudget), + networkMethodClassEthCall: make(chan struct{}, ethCallBudget), + networkMethodClassDefault: make(chan struct{}, defaultBudget), + } +} + +func deriveGetLogsCacheWriteBudget(cfg *common.NetworkConfig) int { + return min(maxConcurrentNetworkCacheWrites, max(2, deriveGetLogsNetworkBudget(cfg))) +} + +func (n *Network) getMethodClassSem(method string) chan struct{} { + if n == nil || n.methodClassSems == nil { + return nil + } + return n.methodClassSems[classifyNetworkMethodClass(method)] +} + +func (n *Network) getCacheReadSem(method string) chan struct{} { + if n == nil || method != "eth_getLogs" { + return nil + } + return n.getLogsCacheReadSem +} + +func (n *Network) getAsyncCacheWriteSem(method string) chan struct{} { + if n == nil { + return nil + } + if method == "eth_getLogs" && n.getLogsCacheWriteSem != nil { + return n.getLogsCacheWriteSem + } + return n.getCacheWriteSem() +} + +func acquireBoundedPermit(ctx context.Context, sem chan struct{}) error { + if sem == nil { + return nil + } + select { + case sem <- struct{}{}: + return nil + case <-ctx.Done(): + cause := context.Cause(ctx) + if cause != nil { + return cause + } + return ctx.Err() + } +} + +func releaseBoundedPermit(sem chan struct{}) { + if sem == nil { + return + } + select { + case <-sem: + default: + } +} + +func shouldForceCacheMaterialization(method string) bool { + return method != "eth_getLogs" +} + func classifyAttemptReason(consensusEnabled bool, retries, hedges int) string { switch { case consensusEnabled: @@ -118,6 +221,9 @@ type Network struct { getSortedUpstreamsFn getSortedUpstreamsForNetworkFn cacheWriteSem chan struct{} cacheWriteSemInit sync.Once + methodClassSems map[networkMethodClass]chan struct{} + getLogsCacheReadSem chan struct{} + getLogsCacheWriteSem chan struct{} negativeResultCache *sync.Map postCompletionResults *sync.Map } @@ -474,9 +580,34 @@ func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (* defer n.cleanupMultiplexer(mlx) } + methodClassSem := n.getMethodClassSem(method) + if err := acquireBoundedPermit(ctx, methodClassSem); err != nil { + if mlx != nil { + mlx.Close(ctx, nil, err) + } + return nil, err + } + releaseMethodClassPermitWithReturn := true + releaseMethodClassPermit := func() { + releaseBoundedPermit(methodClassSem) + } + defer func() { + if releaseMethodClassPermitWithReturn { + releaseMethodClassPermit() + } + }() + if n.cacheDal != nil && !req.SkipCacheRead() { + cacheReadSem := n.getCacheReadSem(method) + if err := acquireBoundedPermit(ctx, cacheReadSem); err != nil { + if mlx != nil { + mlx.Close(ctx, nil, err) + } + return nil, err + } lg.Debug().Msgf("checking cache for request") resp, err := n.cacheDal.Get(ctx, req) + releaseBoundedPermit(cacheReadSem) if err != nil { lg.Debug().Err(err).Msgf("could not find response in cache") } else if resp != nil && !resp.IsObjectNull(ctx) { @@ -1011,11 +1142,13 @@ func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (* case <-ctx.Done(): // Forward is still running in background; drain its outcome to avoid // leaking a completed response when cancellation wins this select race. + releaseMethodClassPermitWithReturn = false go func() { out := <-execDone if out.resp != nil { out.resp.Release() } + releaseMethodClassPermit() }() cause := context.Cause(ctx) if cause == nil { @@ -1071,13 +1204,17 @@ func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (* if resp != nil { if n.cacheDal != nil { - sem := n.getCacheWriteSem() + sem := n.getAsyncCacheWriteSem(method) select { case sem <- struct{}{}: n.observeCacheWriteQueueDepth(sem) - // Force-materialize jrr so the goroutine reads only via atomic pointer (no locks needed). - // TODO For other architectures we might need a different approach - _, _ = resp.JsonRpcResponse(ctx) + if shouldForceCacheMaterialization(method) { + // Force-materialize non-getLogs responses now so the async cache writer + // reads stable parsed data; skip getLogs to avoid blocking the response path. + if _, err := resp.JsonRpcResponse(ctx); err != nil { + lg.Warn().Err(err).Msg("could not materialize response before async cache-set") + } + } resp.AddRef() go (func(resp *common.NormalizedResponse, forwardSpan trace.Span) { @@ -1449,11 +1586,7 @@ func (n *Network) handleBlockSkip( if isRetryable { if eu, ok := u.(common.EvmUpstream); ok { if sp := eu.EvmStatePoller(); sp != nil && !sp.IsObjectNull() { - go func() { - pollCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - _, _ = sp.PollLatestBlockNumber(pollCtx) - }() + evm.TriggerLatestPollAsync(sp, 10*time.Second) } } } diff --git a/erpc/networks_budget_test.go b/erpc/networks_budget_test.go new file mode 100644 index 000000000..5ce5922a8 --- /dev/null +++ b/erpc/networks_budget_test.go @@ -0,0 +1,91 @@ +package erpc + +import ( + "context" + "testing" + "time" + + "github.com/erpc/erpc/common" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestClassifyNetworkMethodClass(t *testing.T) { + assert.Equal(t, networkMethodClassGetLogs, classifyNetworkMethodClass("eth_getLogs")) + assert.Equal(t, networkMethodClassEthCall, classifyNetworkMethodClass("eth_call")) + assert.Equal(t, networkMethodClassDefault, classifyNetworkMethodClass("eth_blockNumber")) +} + +func TestDeriveMethodClassBudgetsUsesDedicatedGetLogsCapacity(t *testing.T) { + cfg := &common.NetworkConfig{ + Evm: &common.EvmNetworkConfig{ + GetLogsSplitConcurrency: 7, + GetLogsCacheChunkConcurrency: 11, + }, + } + + budgets := deriveMethodClassBudgets(cfg) + require.NotNil(t, budgets[networkMethodClassGetLogs]) + require.NotNil(t, budgets[networkMethodClassEthCall]) + require.NotNil(t, budgets[networkMethodClassDefault]) + assert.GreaterOrEqual(t, cap(budgets[networkMethodClassGetLogs]), 44) + assert.GreaterOrEqual(t, cap(budgets[networkMethodClassEthCall]), 8) + assert.GreaterOrEqual(t, cap(budgets[networkMethodClassDefault]), 16) +} + +func TestNetworkMethodClassPermitsIsolateGetLogsFromEthCall(t *testing.T) { + n := &Network{ + methodClassSems: map[networkMethodClass]chan struct{}{ + networkMethodClassGetLogs: make(chan struct{}, 1), + networkMethodClassEthCall: make(chan struct{}, 1), + networkMethodClassDefault: make(chan struct{}, 1), + }, + } + + require.NoError(t, acquireBoundedPermit(context.Background(), n.getMethodClassSem("eth_getLogs"))) + defer releaseBoundedPermit(n.getMethodClassSem("eth_getLogs")) + + otherClassCtx, otherClassCancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer otherClassCancel() + require.NoError(t, acquireBoundedPermit(otherClassCtx, n.getMethodClassSem("eth_call"))) + releaseBoundedPermit(n.getMethodClassSem("eth_call")) + + sameClassCtx, sameClassCancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer sameClassCancel() + err := acquireBoundedPermit(sameClassCtx, n.getMethodClassSem("eth_getLogs")) + require.Error(t, err) + assert.ErrorIs(t, err, context.DeadlineExceeded) +} + +func TestNetworkGetLogsCacheSemaphoresUseDedicatedPools(t *testing.T) { + n := &Network{ + cacheWriteSem: make(chan struct{}, 5), + getLogsCacheReadSem: make(chan struct{}, 2), + getLogsCacheWriteSem: make(chan struct{}, 3), + } + + assert.Equal(t, n.getLogsCacheReadSem, n.getCacheReadSem("eth_getLogs")) + assert.Nil(t, n.getCacheReadSem("eth_call")) + assert.Equal(t, n.getLogsCacheWriteSem, n.getAsyncCacheWriteSem("eth_getLogs")) + assert.Equal(t, n.cacheWriteSem, n.getAsyncCacheWriteSem("eth_call")) +} + +func TestDeriveGetLogsCacheWriteBudget(t *testing.T) { + cfg := &common.NetworkConfig{ + Evm: &common.EvmNetworkConfig{ + GetLogsSplitConcurrency: 7, + GetLogsCacheChunkConcurrency: 11, + }, + } + assert.Equal(t, 11, deriveGetLogsCacheWriteBudget(cfg)) + + cfg.Evm.GetLogsSplitConcurrency = 200 + cfg.Evm.GetLogsCacheChunkConcurrency = 300 + assert.Equal(t, maxConcurrentNetworkCacheWrites, deriveGetLogsCacheWriteBudget(cfg)) +} + +func TestShouldForceCacheMaterialization(t *testing.T) { + assert.False(t, shouldForceCacheMaterialization("eth_getLogs")) + assert.True(t, shouldForceCacheMaterialization("eth_call")) + assert.True(t, shouldForceCacheMaterialization("eth_blockNumber")) +} diff --git a/erpc/networks_registry.go b/erpc/networks_registry.go index 3fadd3492..d4b16e9ce 100644 --- a/erpc/networks_registry.go +++ b/erpc/networks_registry.go @@ -160,6 +160,9 @@ func NewNetwork( bootstrapOnce: sync.Once{}, inFlightRequests: &sync.Map{}, + methodClassSems: deriveMethodClassBudgets(nwCfg), + getLogsCacheReadSem: make(chan struct{}, deriveGetLogsNetworkBudget(nwCfg)), + getLogsCacheWriteSem: make(chan struct{}, deriveGetLogsCacheWriteBudget(nwCfg)), negativeResultCache: &sync.Map{}, postCompletionResults: &sync.Map{}, failsafeExecutors: failsafeExecutors,