Skip to content

Commit 6034fcd

Browse files
committed
chanevents: implement store
1 parent 9432748 commit 6034fcd

9 files changed

Lines changed: 619 additions & 3 deletions

File tree

chanevents/chanevents.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Package chanevents contains functions for monitoring and storing channel
2+
// events such as online/offline and balance updates.
3+
package chanevents
4+
5+
import (
6+
"time"
7+
8+
"github.com/btcsuite/btcd/btcutil"
9+
"github.com/lightningnetwork/lnd/fn/v2"
10+
)
11+
12+
// EventType is an enum for the different types of channel events.
13+
type EventType int16
14+
15+
const (
16+
// EventTypeUnknown is the unknown event type.
17+
EventTypeUnknown = 0
18+
19+
// EventTypeOnline is the online event type.
20+
EventTypeOnline = 1
21+
22+
// EventTypeOffline is the offline event type.
23+
EventTypeOffline = 2
24+
25+
// EventTypeUpdate is the balance update event type.
26+
EventTypeUpdate = 3
27+
)
28+
29+
// String returns the string representation of the event type.
30+
func (e EventType) String() string {
31+
switch e {
32+
case EventTypeOnline:
33+
return "online"
34+
35+
case EventTypeOffline:
36+
return "offline"
37+
38+
case EventTypeUpdate:
39+
return "update"
40+
41+
default:
42+
return "unknown"
43+
}
44+
}
45+
46+
// EventTypeFromString returns the event type from a string.
47+
func EventTypeFromString(s string) EventType {
48+
switch s {
49+
case "online":
50+
return EventTypeOnline
51+
52+
case "offline":
53+
return EventTypeOffline
54+
55+
case "update":
56+
return EventTypeUpdate
57+
58+
default:
59+
return EventTypeUnknown
60+
}
61+
}
62+
63+
// Peer is the application-level representation of a peer.
64+
type Peer struct {
65+
// ID is the database ID of the peer.
66+
ID int64
67+
68+
// PubKey is the public key of the peer.
69+
PubKey string
70+
}
71+
72+
// Channel is the application-level representation of a channel.
73+
type Channel struct {
74+
// ID is the database ID of the channel.
75+
ID int64
76+
77+
// ChannelPoint is the channel point of the channel.
78+
ChannelPoint string
79+
80+
// ShortChannelID is the short channel ID of the channel.
81+
ShortChannelID uint64
82+
83+
// PeerID is the database ID of the peer that this channel is with.
84+
PeerID int64
85+
}
86+
87+
// ChannelEvent is the application-level representation of a channel event.
88+
type ChannelEvent struct {
89+
// ID is the database ID of the event.
90+
ID int64
91+
92+
// ChannelID is the database ID of the channel that this event is
93+
// associated with.
94+
ChannelID int64
95+
96+
// EventType is the type of the event.
97+
EventType EventType
98+
99+
// Timestamp is the time that the event occurred.
100+
Timestamp time.Time
101+
102+
// LocalBalance is the local balance of the channel at the time of the
103+
// event. This is only populated for balance update events.
104+
LocalBalance fn.Option[btcutil.Amount]
105+
106+
// RemoteBalance is the remote balance of the channel at the time of the
107+
// event. This is only populated for balance update events.
108+
RemoteBalance fn.Option[btcutil.Amount]
109+
}

chanevents/store.go

Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
package chanevents
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"errors"
7+
"fmt"
8+
"time"
9+
10+
"github.com/btcsuite/btcd/btcutil"
11+
"github.com/lightninglabs/faraday/db/sqlc"
12+
"github.com/lightningnetwork/lnd/clock"
13+
"github.com/lightningnetwork/lnd/fn/v2"
14+
"github.com/lightningnetwork/lnd/sqldb/v2"
15+
)
16+
17+
var (
18+
errUnknownPeer = errors.New("unknown peer")
19+
errUnknownChannel = errors.New("unknown channel")
20+
)
21+
22+
// Queries is a subset of the sqlc.Queries interface that can be used to
23+
// interact with the peers, channels and channel_events tables.
24+
type Queries interface {
25+
InsertPeer(ctx context.Context, pubkey string) (int64, error)
26+
27+
GetPeerByPubKey(ctx context.Context, pubkey string) (sqlc.Peer, error)
28+
29+
InsertChannel(ctx context.Context,
30+
arg sqlc.InsertChannelParams) (int64, error)
31+
32+
GetChannelByChanPoint(ctx context.Context,
33+
channelPoint string) (sqlc.Channel, error)
34+
35+
GetChannelByShortChanID(ctx context.Context,
36+
shortChannelID int64) (sqlc.Channel, error)
37+
38+
InsertChannelEvent(ctx context.Context,
39+
arg sqlc.InsertChannelEventParams) error
40+
41+
GetChannelEvents(ctx context.Context,
42+
arg sqlc.GetChannelEventsParams) ([]sqlc.ChannelEvent, error)
43+
}
44+
45+
// Store provides access to the db for channel events.
46+
type Store struct {
47+
// db is all the higher level queries that the SQLStore has access to in
48+
// order to implement all its CRUD logic.
49+
db BatchedSQLQueries
50+
51+
// BaseDB represents the underlying database connection.
52+
*sqldb.BaseDB
53+
54+
clock clock.Clock
55+
}
56+
57+
// BatchedSQLQueries combines the SQLQueries interface with the BatchedTx
58+
// interface, allowing for multiple queries to be executed in single SQL
59+
// transaction.
60+
type BatchedSQLQueries interface {
61+
SQLQueries
62+
63+
sqldb.BatchedTx[SQLQueries]
64+
}
65+
66+
// SQLQueries is a subset of the sqlc.Queries interface that can be used to
67+
// interact with various chanevents tables.
68+
type SQLQueries interface {
69+
sqldb.BaseQuerier
70+
71+
Queries
72+
}
73+
74+
type SQLQueriesExecutor[T sqldb.BaseQuerier] struct {
75+
*sqldb.TransactionExecutor[T]
76+
77+
SQLQueries
78+
}
79+
80+
// NewStore creates a new SQLStore instance given an open SQLQueries storage
81+
// backend.
82+
func NewStore(sqlDB *sqldb.BaseDB, queries *sqlc.Queries,
83+
clock clock.Clock) *Store {
84+
85+
txExecutor := sqldb.NewTransactionExecutor(
86+
sqlDB,
87+
func(tx *sql.Tx) SQLQueries {
88+
return queries.WithTx(tx)
89+
},
90+
)
91+
92+
executor := &SQLQueriesExecutor[SQLQueries]{
93+
TransactionExecutor: txExecutor,
94+
SQLQueries: queries,
95+
}
96+
97+
return &Store{
98+
db: executor,
99+
BaseDB: sqlDB,
100+
clock: clock,
101+
}
102+
}
103+
104+
// AddPeer adds a new peer to the database.
105+
func (s *Store) AddPeer(ctx context.Context, pubkey string) (int64, error) {
106+
id, err := s.db.InsertPeer(ctx, pubkey)
107+
if err != nil {
108+
return 0, fmt.Errorf("failed to insert peer: %w", err)
109+
}
110+
111+
return id, nil
112+
}
113+
114+
// GetPeer retrieves a peer by their public key.
115+
func (s *Store) GetPeer(ctx context.Context, pubkey string) (*Peer, error) {
116+
dbPeer, err := s.db.GetPeerByPubKey(ctx, pubkey)
117+
if err != nil {
118+
if errors.Is(err, sql.ErrNoRows) {
119+
return nil, errUnknownPeer
120+
}
121+
122+
return nil, fmt.Errorf("failed to get peer: %w", err)
123+
}
124+
125+
return &Peer{
126+
ID: dbPeer.ID,
127+
PubKey: dbPeer.Pubkey,
128+
}, nil
129+
}
130+
131+
// int64ToSCID converts an int64 to a uint64 ShortChannelID. The BOLT spec
132+
// encodes SCIDs as uint64, but SQL only supports signed int64. We preserve the
133+
// bits, which means SCIDs with the high bit set will appear negative in the
134+
// database. Direct SQL queries (e.g. ORDER BY short_channel_id) will not sort
135+
// these correctly, but round-tripping through Go preserves the value.
136+
func int64ToSCID(i int64) uint64 {
137+
return uint64(i)
138+
}
139+
140+
// scidToInt64 converts a uint64 ShortChannelID to an int64 for SQL storage.
141+
func scidToInt64(u uint64) int64 {
142+
return int64(u)
143+
}
144+
145+
// AddChannel adds a new channel for a peer.
146+
func (s *Store) AddChannel(ctx context.Context, channelPoint string,
147+
shortChannelID uint64, peerID int64) (int64, error) {
148+
149+
id, err := s.db.InsertChannel(
150+
ctx, sqlc.InsertChannelParams{
151+
ChannelPoint: channelPoint,
152+
ShortChannelID: scidToInt64(shortChannelID),
153+
PeerID: peerID,
154+
},
155+
)
156+
if err != nil {
157+
return 0, fmt.Errorf("failed to insert channel: %w", err)
158+
}
159+
160+
return id, nil
161+
}
162+
163+
// GetChannel retrieves a channel by its channel point.
164+
func (s *Store) GetChannel(ctx context.Context, channelPoint string) (*Channel,
165+
error) {
166+
167+
dbChannel, err := s.db.GetChannelByChanPoint(ctx, channelPoint)
168+
if err != nil {
169+
if errors.Is(err, sql.ErrNoRows) {
170+
return nil, errUnknownChannel
171+
}
172+
173+
return nil, fmt.Errorf("failed to get channel: %w", err)
174+
}
175+
176+
return &Channel{
177+
ID: dbChannel.ID,
178+
ChannelPoint: dbChannel.ChannelPoint,
179+
ShortChannelID: int64ToSCID(dbChannel.ShortChannelID),
180+
PeerID: dbChannel.PeerID,
181+
}, nil
182+
}
183+
184+
// AddChannelEvent adds a new channel event.
185+
func (s *Store) AddChannelEvent(ctx context.Context,
186+
event *ChannelEvent) error {
187+
188+
var localBalance sql.NullInt64
189+
event.LocalBalance.WhenSome(
190+
func(b btcutil.Amount) {
191+
localBalance.Int64 = int64(b)
192+
localBalance.Valid = true
193+
},
194+
)
195+
196+
var remoteBalance sql.NullInt64
197+
event.RemoteBalance.WhenSome(
198+
func(b btcutil.Amount) {
199+
remoteBalance.Int64 = int64(b)
200+
remoteBalance.Valid = true
201+
},
202+
)
203+
204+
timestamp := event.Timestamp.UTC()
205+
if timestamp.IsZero() {
206+
timestamp = s.clock.Now().UTC()
207+
}
208+
209+
err := s.db.InsertChannelEvent(
210+
ctx, sqlc.InsertChannelEventParams{
211+
ChannelID: event.ChannelID,
212+
EventType: int16(event.EventType),
213+
Timestamp: timestamp,
214+
LocalBalanceSat: localBalance,
215+
RemoteBalanceSat: remoteBalance,
216+
},
217+
)
218+
if err != nil {
219+
return fmt.Errorf("failed to insert channel event: %w", err)
220+
}
221+
222+
return nil
223+
}
224+
225+
// GetChannelEvents retrieves all events for a channel within a given time
226+
// range.
227+
// TODO: Add pagination support (LIMIT/OFFSET) to prevent OOM on high-traffic
228+
// channels.
229+
func (s *Store) GetChannelEvents(ctx context.Context, channelID int64,
230+
startTime, endTime time.Time) ([]*ChannelEvent, error) {
231+
232+
dbEvents, err := s.db.GetChannelEvents(
233+
ctx, sqlc.GetChannelEventsParams{
234+
ChannelID: channelID,
235+
Timestamp: startTime.UTC(),
236+
Timestamp_2: endTime.UTC(),
237+
},
238+
)
239+
if err != nil {
240+
return nil, fmt.Errorf("failed to get channel events: %w", err)
241+
}
242+
243+
events := make([]*ChannelEvent, len(dbEvents))
244+
for i, dbEvent := range dbEvents {
245+
var localBalance fn.Option[btcutil.Amount]
246+
if dbEvent.LocalBalanceSat.Valid {
247+
amt := btcutil.Amount(dbEvent.LocalBalanceSat.Int64)
248+
localBalance = fn.Some(amt)
249+
}
250+
251+
var remoteBalance fn.Option[btcutil.Amount]
252+
if dbEvent.RemoteBalanceSat.Valid {
253+
amt := btcutil.Amount(dbEvent.RemoteBalanceSat.Int64)
254+
remoteBalance = fn.Some(amt)
255+
}
256+
257+
events[i] = &ChannelEvent{
258+
ID: dbEvent.ID,
259+
ChannelID: dbEvent.ChannelID,
260+
EventType: EventType(dbEvent.EventType),
261+
Timestamp: dbEvent.Timestamp.UTC(),
262+
LocalBalance: localBalance,
263+
RemoteBalance: remoteBalance,
264+
}
265+
}
266+
267+
return events, nil
268+
}

0 commit comments

Comments
 (0)