-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtimescale.go
More file actions
164 lines (126 loc) · 3.22 KB
/
timescale.go
File metadata and controls
164 lines (126 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package logpush
import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log/slog"
	"sort"
	"strconv"
	"strings"
	"time"

	_ "github.com/lib/pq"
)
// sqltx is the statement-execution surface accepted by the insert helpers in
// this file. WriteEntry passes a *sql.DB and WriteBatch passes a *sql.Tx, so the
// same insert code runs either directly on the pool or inside a transaction.
type sqltx interface {
// QueryContext runs a query that returns rows. It is part of the interface
// but is not called through it in the code shown in this file.
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
// ExecContext runs a statement that returns no rows; sqlInsertContext uses
// it to perform the insert.
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}
func NewTimescaleWriter(dbUrl string) (*timescaleWriter, error) {
const version = "v2"
db, err := sql.Open("postgres", dbUrl)
if err != nil {
return nil, err
}
tableName := "logpush_entries_" + version
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
var tableExists = func() (bool, error) {
query := fmt.Sprintf("select exists (select 1 from %s)", tableName)
_, err := db.QueryContext(ctx, query)
if err == nil || strings.Contains(err.Error(), "does not exist") {
return err == nil, nil
}
return false, err
}
var tableInit = func() error {
query := fmt.Sprintf(`create table %s (
time timestamp with time zone not null,
tag text not null,
level text not null,
message text not null,
meta jsonb null
)`, tableName)
_, err := db.ExecContext(ctx, query)
return err
}
if exists, _ := tableExists(); !exists {
slog.Info("TIMESCALE: Setting up",
slog.String("table", tableName))
if err := tableInit(); err != nil {
db.Close()
return nil, err
}
}
return ×caleWriter{
db: db,
table: tableName,
version: version,
}, nil
}
// timescaleWriter writes log entries into a single Postgres table through
// database/sql. Instances are built by NewTimescaleWriter.
type timescaleWriter struct {
// db is the connection pool used for pings, inserts and transactions.
db *sql.DB
// table is the name of the table that rows are inserted into.
table string
// version is the schema version string returned by Version.
version string
}
// Type reports the backend identifier of this writer, which is always
// "timescale".
func (w *timescaleWriter) Type() string {
	const kind = "timescale"
	return kind
}
// Version returns the schema version string stored on the writer.
func (w *timescaleWriter) Version() string {
	version := w.version
	return version
}
// Ping delegates to the underlying pool's Ping method and returns its result
// unchanged.
func (w *timescaleWriter) Ping() error {
	err := w.db.Ping()
	return err
}
// Close closes the underlying connection pool and returns whatever error the
// pool reports.
func (w *timescaleWriter) Close() error {
	err := w.db.Close()
	return err
}
// WriteEntry inserts a single entry by handing the connection pool itself to
// writeEntryTx, i.e. without opening an explicit transaction.
func (w *timescaleWriter) WriteEntry(ctx context.Context, entry LogEntry) error {
	err := w.writeEntryTx(ctx, w.db, entry)
	return err
}
// WriteBatch inserts every entry of batch inside one transaction. The first
// failing insert aborts the loop and its error is returned; the transaction is
// committed only after all entries were written.
func (w *timescaleWriter) WriteBatch(ctx context.Context, batch []LogEntry) error {
	txn, beginErr := w.db.BeginTx(ctx, nil)
	if beginErr != nil {
		return beginErr
	}
	// Undo the transaction on any early return. After a successful Commit
	// the deferred Rollback reports sql.ErrTxDone, which is deliberately
	// ignored.
	defer txn.Rollback()

	for i := range batch {
		if writeErr := w.writeEntryTx(ctx, txn, batch[i]); writeErr != nil {
			return writeErr
		}
	}

	return txn.Commit()
}
// writeEntryTx maps entry onto the table's columns (time, tag, level, message,
// meta) and inserts it through tx. Non-nil metadata is stored as its JSON
// text; nil metadata is forwarded to the insert unchanged.
func (w *timescaleWriter) writeEntryTx(ctx context.Context, tx sqltx, entry LogEntry) error {
	level := entry.LogLevel.String()

	// Start from the raw metadata value and swap in the JSON encoding only
	// when there is something to encode.
	var meta any = entry.Metadata
	if entry.Metadata != nil {
		encoded, err := json.Marshal(entry.Metadata)
		if err != nil {
			return err
		}
		meta = string(encoded)
	}

	return sqlInsertContext(ctx, tx, w.table, map[string]any{
		"time":    entry.Timestamp,
		"tag":     entry.StreamTag,
		"level":   level,
		"message": entry.Message,
		"meta":    meta,
	})
}
// sqlInsertContext inserts one row into table through tx. The keys of row are
// the column names and the values are sent as positional parameters ($1, $2,
// ...), so values are never interpolated into the SQL text.
//
// Columns are emitted in sorted order so the generated statement is identical
// for identical column sets, independent of Go's randomised map iteration.
// The table and column names are written into the statement verbatim and must
// therefore come from trusted code, not from user input.
//
// An empty row is rejected with an error before the database is contacted,
// because "insert into t () values ()" is not valid SQL.
func sqlInsertContext(ctx context.Context, tx sqltx, table string, row map[string]any) error {
	if len(row) == 0 {
		return fmt.Errorf("insert into %s: row has no columns", table)
	}

	columns := make([]string, 0, len(row))
	for col := range row {
		columns = append(columns, col)
	}
	sort.Strings(columns)

	args := make([]any, 0, len(columns))
	bindvars := make([]string, 0, len(columns))
	for idx, col := range columns {
		args = append(args, row[col])
		bindvars = append(bindvars, "$"+strconv.Itoa(idx+1))
	}

	query := fmt.Sprintf("insert into %s (%s) values (%s)",
		table,
		strings.Join(columns, ", "),
		strings.Join(bindvars, ", "))

	_, err := tx.ExecContext(ctx, query, args...)
	return err
}