Skip to content

Commit 5584227

Browse files
NONEVM-3038: logpoller global filter cache (#585)
* feat: global filter cache * chore: clean up * chore: clean up Co-authored-by: Joe Huang <joe.huang@smartcontract.com> --------- Co-authored-by: Joe Huang <joe.huang@smartcontract.com>
1 parent 5bc9247 commit 5584227

7 files changed

Lines changed: 646 additions & 38 deletions

File tree

pkg/logpoller/filter.go

Lines changed: 100 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,59 +2,132 @@ package logpoller
22

33
import (
44
"context"
5-
"fmt"
65

76
"github.com/xssnick/tonutils-go/address"
87

98
"github.com/smartcontractkit/chainlink-ton/pkg/logpoller/models"
109
)
1110

12-
// buildFilterIndex creates a filter index for efficient lookup during processing.
13-
// Returns FilterIndex mapping filter keys to Filter objects, enabling direct property access.
14-
func (lp *service) buildFilterIndex(ctx context.Context, addresses []*address.Address) (models.FilterIndex, error) {
15-
filterIndex := make(models.FilterIndex)
11+
// buildFilterIndex builds FilterIndex from the in-memory filtersByName cache.
12+
// Returns FilterIndex mapping filter keys to Filter objects for direct property access.
13+
// Note: returned Filter pointers reference cached data - callers must not mutate them.
14+
func (lp *service) buildFilterIndex(ctx context.Context) (models.FilterIndex, error) {
15+
if err := lp.loadFilters(ctx); err != nil {
16+
return nil, err
17+
}
1618

17-
for _, addr := range addresses {
18-
filters, err := lp.filterStore.GetFiltersByAddress(ctx, addr)
19-
if err != nil {
20-
return nil, fmt.Errorf("failed to get filters for %s: %w", addr.String(), err)
21-
}
19+
lp.filtersMu.RLock()
20+
defer lp.filtersMu.RUnlock()
2221

23-
for _, filter := range filters {
24-
key := models.FilterKey{
25-
Address: addr,
26-
MsgType: filter.MsgType,
27-
EventSig: filter.EventSig,
28-
}
29-
keyStr := key.String()
30-
filterIndex[keyStr] = append(filterIndex[keyStr], &filter)
22+
filterIndex := make(models.FilterIndex)
23+
for _, filter := range lp.filtersByName {
24+
key := models.FilterKey{
25+
Address: filter.Address,
26+
MsgType: filter.MsgType,
27+
EventSig: filter.EventSig,
3128
}
29+
keyStr := key.String()
30+
filterIndex[keyStr] = append(filterIndex[keyStr], filter)
3231
}
3332

3433
return filterIndex, nil
3534
}
3635

37-
// RegisterFilter adds a new filter to monitor specific address/event signature combinations.
38-
// Note: Filter changes take effect on the next LogPoller loop tick (up to pollPeriod delay)
36+
// getDistinctAddresses returns unique addresses from the in-memory filtersByName cache.
37+
func (lp *service) getDistinctAddresses(ctx context.Context) ([]*address.Address, error) {
38+
if err := lp.loadFilters(ctx); err != nil {
39+
return nil, err
40+
}
41+
42+
lp.filtersMu.RLock()
43+
defer lp.filtersMu.RUnlock()
44+
45+
addressSet := make(map[string]*address.Address)
46+
for _, filter := range lp.filtersByName {
47+
addrStr := filter.Address.String()
48+
if _, ok := addressSet[addrStr]; !ok {
49+
addressSet[addrStr] = filter.Address
50+
}
51+
}
52+
53+
addresses := make([]*address.Address, 0, len(addressSet))
54+
for _, addr := range addressSet {
55+
addresses = append(addresses, addr)
56+
}
57+
58+
return addresses, nil
59+
}
60+
61+
// RegisterFilter registers a filter for log polling.
62+
// Checks cache first - skips DB if filter already exists with same config.
63+
//
64+
// Note: Filter changes take effect on the next LogPoller loop tick (up to pollPeriod delay).
3965
// If registration occurs before run() reads addresses, the change applies immediately.
4066
// Otherwise, it waits until the next tick.
41-
func (lp *service) RegisterFilter(ctx context.Context, flt models.Filter) (int64, error) {
42-
id, err := lp.filterStore.RegisterFilter(ctx, flt)
67+
func (lp *service) RegisterFilter(ctx context.Context, filter models.Filter) (int64, error) {
68+
// Ensure cache is loaded
69+
if err := lp.loadFilters(ctx); err != nil {
70+
return 0, err
71+
}
72+
73+
// Check cache first (read lock)
74+
lp.filtersMu.RLock()
75+
if cached, ok := lp.filtersByName[filter.Name]; ok {
76+
// Filter exists - check if config matches
77+
if cached.Address.String() == filter.Address.String() &&
78+
cached.MsgType == filter.MsgType &&
79+
cached.EventSig == filter.EventSig {
80+
// Same config - return cached ID, skip DB
81+
id := cached.ID
82+
lp.filtersMu.RUnlock()
83+
return id, nil
84+
}
85+
}
86+
lp.filtersMu.RUnlock()
87+
88+
// Cache miss or config changed - hit DB
89+
id, err := lp.filterStore.RegisterFilter(ctx, filter)
4390
if err != nil {
4491
return 0, err
4592
}
4693

94+
// Update cache (write lock)
95+
lp.filtersMu.Lock()
96+
filter.ID = id
97+
lp.filtersByName[filter.Name] = &filter
98+
lp.filtersMu.Unlock()
99+
47100
return id, nil
48101
}
49102

50-
// UnregisterFilter removes a filter by name.
51-
// Note: Filter removal takes effect on the next LogPoller loop tick (up to pollPeriod delay)
103+
// UnregisterFilter marks a filter as deleted.
104+
//
105+
// Note: Filter removal takes effect on the next LogPoller loop tick (up to pollPeriod delay).
52106
// If unregistration occurs during an active tick, the old filter continues processing for that tick.
53107
func (lp *service) UnregisterFilter(ctx context.Context, name string) error {
54-
return lp.filterStore.UnregisterFilter(ctx, name)
108+
// Hit DB first (marks is_deleted = true)
109+
if err := lp.filterStore.UnregisterFilter(ctx, name); err != nil {
110+
return err
111+
}
112+
113+
// Update cache
114+
lp.filtersMu.Lock()
115+
delete(lp.filtersByName, name)
116+
lp.filtersMu.Unlock()
117+
118+
return nil
55119
}
56120

57-
// HasFilter checks if a filter with the given name exists
121+
// HasFilter checks if a filter with the given name exists.
122+
// Uses in-memory cache - no database query.
58123
func (lp *service) HasFilter(ctx context.Context, name string) (bool, error) {
59-
return lp.filterStore.HasFilter(ctx, name)
124+
if err := lp.loadFilters(ctx); err != nil {
125+
return false, err
126+
}
127+
128+
lp.filtersMu.RLock()
129+
_, exists := lp.filtersByName[name]
130+
lp.filtersMu.RUnlock()
131+
132+
return exists, nil
60133
}

0 commit comments

Comments
 (0)