Skip to content

Commit 409d90d

Browse files
committed
chanevents: implement store
1 parent c4934a9 commit 409d90d

7 files changed

Lines changed: 544 additions & 2 deletions

File tree

chanevents/chanevents.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Package chanevents contains functions for monitoring and storing channel
2+
// events such as online/offline and balance updates.
3+
package chanevents
4+
5+
import (
6+
"time"
7+
8+
"github.com/btcsuite/btcd/btcutil"
9+
"github.com/lightningnetwork/lnd/fn/v2"
10+
)
11+
12+
// EventType is an enum for the different types of channel events.
13+
type EventType int16
14+
15+
const (
16+
// EventTypeUnknown is the unknown event type.
17+
EventTypeUnknown EventType = iota
18+
19+
// EventTypeOnline is the online event type.
20+
EventTypeOnline
21+
22+
// EventTypeOffline is the offline event type.
23+
EventTypeOffline
24+
25+
// EventTypeUpdate is the balance update event type.
26+
EventTypeUpdate
27+
)
28+
29+
// String returns the string representation of the event type.
30+
func (e EventType) String() string {
31+
switch e {
32+
case EventTypeOnline:
33+
return "online"
34+
case EventTypeOffline:
35+
return "offline"
36+
case EventTypeUpdate:
37+
return "update"
38+
default:
39+
return "unknown"
40+
}
41+
}
42+
43+
// EventTypeFromString returns the event type from a string.
44+
func EventTypeFromString(s string) EventType {
45+
switch s {
46+
case "online":
47+
return EventTypeOnline
48+
case "offline":
49+
return EventTypeOffline
50+
case "update":
51+
return EventTypeUpdate
52+
default:
53+
return EventTypeUnknown
54+
}
55+
}
56+
57+
// Peer is the application-level representation of a peer.
58+
type Peer struct {
59+
// ID is the database ID of the peer.
60+
ID int64
61+
62+
// PubKey is the public key of the peer.
63+
PubKey string
64+
}
65+
66+
// Channel is the application-level representation of a channel.
67+
type Channel struct {
68+
// ID is the database ID of the channel.
69+
ID int64
70+
71+
// ChannelPoint is the channel point of the channel.
72+
ChannelPoint string
73+
74+
// ShortChannelID is the short channel ID of the channel.
75+
ShortChannelID uint64
76+
77+
// PeerID is the database ID of the peer that this channel is with.
78+
PeerID int64
79+
}
80+
81+
// ChannelEvent is the application-level representation of a channel event.
82+
type ChannelEvent struct {
83+
// ID is the database ID of the event.
84+
ID int64
85+
86+
// ChannelID is the database ID of the channel that this event is
87+
// associated with.
88+
ChannelID int64
89+
90+
// EventType is the type of the event.
91+
EventType EventType
92+
93+
// Timestamp is the time that the event occurred.
94+
Timestamp time.Time
95+
96+
// LocalBalance is the local balance of the channel at the time of the
97+
// event. This is only populated for balance update events.
98+
LocalBalance fn.Option[btcutil.Amount]
99+
100+
// RemoteBalance is the remote balance of the channel at the time of the
101+
// event. This is only populated for balance update events.
102+
RemoteBalance fn.Option[btcutil.Amount]
103+
}

chanevents/store.go

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
package chanevents
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"errors"
7+
"time"
8+
9+
"github.com/btcsuite/btcd/btcutil"
10+
"github.com/lightninglabs/faraday/db/sqlc"
11+
"github.com/lightningnetwork/lnd/clock"
12+
"github.com/lightningnetwork/lnd/fn/v2"
13+
"github.com/lightningnetwork/lnd/sqldb/v2"
14+
)
15+
16+
var (
17+
errUnknownPeer = errors.New("unknown peer")
18+
errUnknownChannel = errors.New("unknown channel")
19+
)
20+
21+
// PeerQueries is a subset of the sqlc.Queries interface that can be used
22+
// to interact with the peers, channels and channel_events tables.
23+
type PeerQueries interface {
24+
InsertPeer(ctx context.Context, pubkey string) (int64, error)
25+
GetPeerByPubKey(ctx context.Context, pubkey string) (sqlc.Peer, error)
26+
InsertChannel(ctx context.Context, arg sqlc.InsertChannelParams) (int64, error)
27+
GetChannelByChanPoint(ctx context.Context, channelPoint string) (sqlc.Channel, error)
28+
GetChannelByShortChanID(ctx context.Context, shortChannelID int64) (sqlc.Channel, error)
29+
InsertChannelEvent(ctx context.Context, arg sqlc.InsertChannelEventParams) error
30+
GetChannelEvents(ctx context.Context, arg sqlc.GetChannelEventsParams) ([]sqlc.ChannelEvent, error)
31+
}
32+
33+
// Store provides access to the db for channel events.
34+
type Store struct {
35+
// db is all the higher level queries that the SQLStore has access to
36+
// in order to implement all its CRUD logic.
37+
db BatchedSQLQueries
38+
39+
// BaseDB represents the underlying database connection.
40+
*sqldb.BaseDB
41+
42+
clock clock.Clock
43+
}
44+
45+
// BatchedSQLQueries combines the SQLQueries interface with the BatchedTx
46+
// interface, allowing for multiple queries to be executed in single SQL
47+
// transaction.
48+
type BatchedSQLQueries interface {
49+
SQLQueries
50+
51+
sqldb.BatchedTx[SQLQueries]
52+
}
53+
54+
// SQLQueries is a subset of the sqlc.Queries interface that can be used to
55+
// interact with various chanevents tables.
56+
type SQLQueries interface {
57+
sqldb.BaseQuerier
58+
59+
PeerQueries
60+
}
61+
62+
type SQLQueriesExecutor[T sqldb.BaseQuerier] struct {
63+
*sqldb.TransactionExecutor[T]
64+
65+
SQLQueries
66+
}
67+
68+
// NewStore creates a new SQLStore instance given an open SQLQueries
69+
// storage backend.
70+
func NewStore(sqlDB *sqldb.BaseDB, queries *sqlc.Queries,
71+
clock clock.Clock) *Store {
72+
73+
txExecutor := sqldb.NewTransactionExecutor(
74+
sqlDB, func(tx *sql.Tx) SQLQueries {
75+
return queries.WithTx(tx)
76+
},
77+
)
78+
79+
executor := &SQLQueriesExecutor[SQLQueries]{
80+
TransactionExecutor: txExecutor,
81+
SQLQueries: queries,
82+
}
83+
84+
return &Store{
85+
db: executor,
86+
BaseDB: sqlDB,
87+
clock: clock,
88+
}
89+
}
90+
91+
// AddPeer adds a new peer to the database.
92+
func (s *Store) AddPeer(ctx context.Context, pubkey string) (int64, error) {
93+
id, err := s.db.InsertPeer(ctx, pubkey)
94+
if err != nil {
95+
return 0, err
96+
}
97+
98+
return id, nil
99+
}
100+
101+
// GetPeer retrieves a peer by their public key.
102+
func (s *Store) GetPeer(ctx context.Context, pubkey string) (*Peer, error) {
103+
dbPeer, err := s.db.GetPeerByPubKey(ctx, pubkey)
104+
if err != nil {
105+
if errors.Is(err, sql.ErrNoRows) {
106+
return nil, errUnknownPeer
107+
}
108+
return nil, err
109+
}
110+
111+
return &Peer{
112+
ID: dbPeer.ID,
113+
PubKey: dbPeer.Pubkey,
114+
}, nil
115+
}
116+
117+
// int64ToSCID converts an int64 to a uint64 ShortChannelID.
118+
// The BOLT spec encodes SCIDs as uint64, but SQL only supports signed int64.
119+
// We preserve the bits, which means SCIDs with the high bit set will appear
120+
// negative in the database. Direct SQL queries (e.g. ORDER BY short_channel_id)
121+
// will not sort these correctly, but round-tripping through Go preserves the value.
122+
func int64ToSCID(i int64) uint64 {
123+
return uint64(i)
124+
}
125+
126+
// scidToInt64 converts a uint64 ShortChannelID to an int64 for SQL storage.
127+
func scidToInt64(u uint64) int64 {
128+
return int64(u)
129+
}
130+
131+
// AddChannel adds a new channel for a peer.
132+
func (s *Store) AddChannel(ctx context.Context, channelPoint string,
133+
shortChannelID uint64, peerID int64) (int64, error) {
134+
135+
id, err := s.db.InsertChannel(ctx, sqlc.InsertChannelParams{
136+
ChannelPoint: channelPoint,
137+
ShortChannelID: scidToInt64(shortChannelID),
138+
PeerID: peerID,
139+
})
140+
if err != nil {
141+
return 0, err
142+
}
143+
144+
return id, nil
145+
}
146+
147+
// GetChannel retrieves a channel by its channel point.
148+
func (s *Store) GetChannel(ctx context.Context, channelPoint string) (*Channel, error) {
149+
dbChannel, err := s.db.GetChannelByChanPoint(ctx, channelPoint)
150+
if err != nil {
151+
if errors.Is(err, sql.ErrNoRows) {
152+
return nil, errUnknownChannel
153+
}
154+
return nil, err
155+
}
156+
157+
return &Channel{
158+
ID: dbChannel.ID,
159+
ChannelPoint: dbChannel.ChannelPoint,
160+
ShortChannelID: int64ToSCID(dbChannel.ShortChannelID),
161+
PeerID: dbChannel.PeerID,
162+
}, nil
163+
}
164+
165+
// AddChannelEvent adds a new channel event.
166+
func (s *Store) AddChannelEvent(ctx context.Context, event *ChannelEvent) error {
167+
var localBalance sql.NullInt64
168+
event.LocalBalance.WhenSome(func(b btcutil.Amount) {
169+
localBalance.Int64 = int64(b)
170+
localBalance.Valid = true
171+
})
172+
173+
var remoteBalance sql.NullInt64
174+
event.RemoteBalance.WhenSome(func(b btcutil.Amount) {
175+
remoteBalance.Int64 = int64(b)
176+
remoteBalance.Valid = true
177+
})
178+
179+
timestamp := event.Timestamp
180+
if timestamp.IsZero() {
181+
timestamp = s.clock.Now().UTC()
182+
}
183+
184+
return s.db.InsertChannelEvent(ctx, sqlc.InsertChannelEventParams{
185+
ChannelID: event.ChannelID,
186+
EventType: int16(event.EventType),
187+
Timestamp: timestamp,
188+
LocalBalanceSat: localBalance,
189+
RemoteBalanceSat: remoteBalance,
190+
})
191+
}
192+
193+
// GetChannelEvents retrieves all events for a channel within a given time range.
194+
// TODO(M-2): Add pagination support (LIMIT/OFFSET) to prevent OOM on high-traffic channels.
195+
func (s *Store) GetChannelEvents(ctx context.Context, channelID int64,
196+
startTime, endTime time.Time) ([]*ChannelEvent, error) {
197+
198+
dbEvents, err := s.db.GetChannelEvents(ctx, sqlc.GetChannelEventsParams{
199+
ChannelID: channelID,
200+
Timestamp: startTime.UTC(),
201+
Timestamp_2: endTime.UTC(),
202+
})
203+
if err != nil {
204+
return nil, err
205+
}
206+
207+
events := make([]*ChannelEvent, len(dbEvents))
208+
for i, dbEvent := range dbEvents {
209+
var localBalance fn.Option[btcutil.Amount]
210+
if dbEvent.LocalBalanceSat.Valid {
211+
amt := btcutil.Amount(dbEvent.LocalBalanceSat.Int64)
212+
localBalance = fn.Some(amt)
213+
}
214+
215+
var remoteBalance fn.Option[btcutil.Amount]
216+
if dbEvent.RemoteBalanceSat.Valid {
217+
amt := btcutil.Amount(dbEvent.RemoteBalanceSat.Int64)
218+
remoteBalance = fn.Some(amt)
219+
}
220+
221+
events[i] = &ChannelEvent{
222+
ID: dbEvent.ID,
223+
ChannelID: dbEvent.ChannelID,
224+
EventType: EventType(dbEvent.EventType),
225+
Timestamp: dbEvent.Timestamp.UTC(),
226+
LocalBalance: localBalance,
227+
RemoteBalance: remoteBalance,
228+
}
229+
}
230+
231+
return events, nil
232+
}

0 commit comments

Comments
 (0)