-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.go
More file actions
223 lines (180 loc) · 6.08 KB
/
main.go
File metadata and controls
223 lines (180 loc) · 6.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"time"
"github.com/willibrandon/mtlog"
"github.com/willibrandon/mtlog/core"
"github.com/willibrandon/mtlog/sinks"
)
// main walks through four durable-buffering scenarios in sequence: a durable
// wrapper around a Seq sink (or a mock if Seq cannot be created), the
// WithDurableSeq convenience option, a fully configured durable sink wrapped
// around a sink that panics until it is told to recover, and a logger that
// combines a console sink with a durable wrapper around an intermittently
// panicking sink. Metrics are printed after each scenario that owns a
// *sinks.DurableSink.
func main() {
	// Create a temporary directory for buffer files. A failure here is fatal:
	// every durable sink below places its buffer underneath this directory.
	tempDir := filepath.Join(os.TempDir(), "mtlog-durable-example")
	if err := os.MkdirAll(tempDir, 0755); err != nil {
		log.Fatalf("Failed to create buffer directory: %v", err)
	}
	// The directory is removed when main returns normally. log.Fatalf exits
	// the process without running deferred calls, so the directory is left
	// behind if one of the setup steps below fails.
	defer os.RemoveAll(tempDir)

	fmt.Printf("Durable buffer directory: %s\n", tempDir)

	// Example 1: Basic durable buffering with Seq
	fmt.Println("\n=== Example 1: Durable Seq Sink ===")

	// Create a Seq sink that might fail (we'll use a mock if it fails)
	var seqSinkInterface core.LogEventSink
	seqSink, err := sinks.NewSeqSink("http://localhost:5341")
	if err != nil {
		fmt.Printf("Note: Seq sink failed to connect (expected): %v\n", err)
		// Use a mock sink instead for demonstration
		seqSinkInterface = &mockSink{name: "Seq"}
	} else {
		seqSinkInterface = seqSink
	}

	// Wrap with durable buffering
	durableSeq, err := sinks.NewDurableSink(seqSinkInterface, sinks.DurableOptions{
		BufferPath:    filepath.Join(tempDir, "seq-buffer"),
		MaxBufferSize: 1024 * 1024, // 1MB
		RetryInterval: 5 * time.Second,
		FlushInterval: 1 * time.Second,
		BatchSize:     10,
		OnError: func(err error) {
			fmt.Printf("Durable sink error: %v\n", err)
		},
	})
	if err != nil {
		log.Fatalf("Failed to create durable sink: %v", err)
	}

	logger := mtlog.New(
		mtlog.WithSink(durableSeq),
		mtlog.WithProperty("Service", "DurableExample"),
	)

	// Log some events
	logger.Information("Application started")
	logger.Warning("This message will be buffered if Seq is unavailable")
	logger.Error("Critical error occurred")

	// Show metrics
	showMetrics("Durable Seq", durableSeq)
	durableSeq.Close()

	// Example 2: Using convenience methods
	fmt.Println("\n=== Example 2: Convenience Methods ===")

	logger2 := mtlog.New(
		mtlog.WithDurableSeq("http://localhost:5341", filepath.Join(tempDir, "convenient-buffer")),
		mtlog.WithProperty("Example", "Convenience"),
	)

	logger2.Information("Using convenient durable Seq configuration")
	logger2.Debug("Debug message with convenience method")

	// Example 3: Configuration-based setup
	fmt.Println("\n=== Example 3: Advanced Configuration ===")

	// Create a failing sink for demonstration. The variable is deliberately
	// not named after its type so the type stays usable in this scope.
	unreliable := &failingSink{}

	durableAdvanced, err := sinks.NewDurableSink(unreliable, sinks.DurableOptions{
		BufferPath:      filepath.Join(tempDir, "advanced-buffer"),
		MaxBufferSize:   512 * 1024, // 512KB
		MaxBufferFiles:  5,
		RetryInterval:   2 * time.Second,
		FlushInterval:   500 * time.Millisecond,
		BatchSize:       5,
		ShutdownTimeout: 10 * time.Second,
		OnError: func(err error) {
			fmt.Printf("Advanced sink error: %v\n", err)
		},
	})
	if err != nil {
		log.Fatalf("Failed to create advanced durable sink: %v", err)
	}

	logger3 := mtlog.New(
		mtlog.WithSink(durableAdvanced),
		mtlog.WithProperty("Mode", "Advanced"),
	)

	// Generate events that will be buffered
	for i := 0; i < 10; i++ {
		logger3.Information("Buffered message {Index}", i)
		time.Sleep(100 * time.Millisecond)
	}

	showMetrics("Advanced Durable", durableAdvanced)

	// Simulate sink recovery
	fmt.Println("\nSimulating sink recovery...")
	unreliable.Recover()

	// Wait for retry: the pause is longer than the 2s RetryInterval above.
	time.Sleep(3 * time.Second)
	showMetrics("Advanced Durable (after recovery)", durableAdvanced)
	durableAdvanced.Close()

	// Example 4: Multiple sinks with different reliability
	fmt.Println("\n=== Example 4: Mixed Reliability ===")

	// Reliable console sink
	consoleSink := sinks.NewConsoleSink()

	// Unreliable remote sink with durable buffering
	remoteSink := &intermittentSink{failureRate: 0.3}
	durableRemote, err := sinks.NewDurableSink(remoteSink, sinks.DurableOptions{
		BufferPath:    filepath.Join(tempDir, "remote-buffer"),
		RetryInterval: 1 * time.Second,
		FlushInterval: 500 * time.Millisecond,
	})
	if err != nil {
		log.Fatalf("Failed to create durable remote sink: %v", err)
	}

	logger4 := mtlog.New(
		mtlog.WithSink(consoleSink),   // Always works
		mtlog.WithSink(durableRemote), // Sometimes fails, but buffered
		mtlog.WithProperty("Reliability", "Mixed"),
	)

	// Generate mixed traffic
	for i := 0; i < 5; i++ {
		logger4.Information("Mixed reliability message {Index}", i)
		time.Sleep(200 * time.Millisecond)
	}

	showMetrics("Mixed Remote", durableRemote)
	durableRemote.Close()

	fmt.Println("\n=== Durable Buffer Example Complete ===")
	fmt.Printf("Buffer files created in: %s\n", tempDir)
}
// showMetrics prints a short report for a durable sink to stdout: a heading
// built from name, the "delivered", "buffered", "dropped" and "retries"
// entries returned by sink.GetMetrics, the result of sink.IsHealthy, and a
// trailing blank line.
func showMetrics(name string, sink *sinks.DurableSink) {
	stats := sink.GetMetrics()

	// Pull the counters out up front so the printing below reads as a table.
	delivered, buffered := stats["delivered"], stats["buffered"]
	dropped, retries := stats["dropped"], stats["retries"]

	fmt.Printf("%s Metrics:\n", name)
	fmt.Printf(" Delivered: %d\n", delivered)
	fmt.Printf(" Buffered: %d\n", buffered)
	fmt.Printf(" Dropped: %d\n", dropped)
	fmt.Printf(" Retries: %d\n", retries)
	fmt.Printf(" Healthy: %t\n", sink.IsHealthy())
	fmt.Println()
}
// mockSink is a demonstration sink that writes every event it receives to
// stdout, tagged with its name.
type mockSink struct {
	name string
}

// Emit prints the event as "[name] level: template" on its own line.
func (m *mockSink) Emit(event *core.LogEvent) {
	level, template := event.Level, event.MessageTemplate
	fmt.Printf("[%s] %v: %s\n", m.name, level, template)
}

// Close is a no-op and always returns nil.
func (*mockSink) Close() error { return nil }
// failingSink is a demonstration sink whose Emit panics until Recover has
// been called on it.
//
// NOTE(review): recovered is read and written without any synchronization;
// confirm that Emit and Recover are never invoked from different goroutines.
type failingSink struct {
	recovered bool
}

// Emit prints the event with a "[RECOVERED]" prefix once the sink has been
// recovered; before that it panics to simulate a broken destination.
func (s *failingSink) Emit(event *core.LogEvent) {
	if s.recovered {
		fmt.Printf("[RECOVERED] %v: %s\n", event.Level, event.MessageTemplate)
		return
	}
	panic("sink is failing")
}

// Close is a no-op and always returns nil.
func (*failingSink) Close() error { return nil }

// Recover flips the sink into its working state and announces it on stdout.
func (s *failingSink) Recover() {
	s.recovered = true
	fmt.Println("Sink has recovered!")
}
// intermittentSink is a demonstration sink that panics on a fixed, repeating
// subset of its Emit calls and prints the event on the rest.
type intermittentSink struct {
	failureRate float64 // fraction used to size the failing part of each 10-call cycle
	counter     int     // number of Emit calls seen so far
}

// Emit counts the call and then either panics or prints the event with a
// "[REMOTE]" prefix. The call fails when the running count modulo 10 is
// below failureRate*10, so the pattern is deterministic rather than random.
func (s *intermittentSink) Emit(event *core.LogEvent) {
	s.counter++

	slot := float64(s.counter % 10)
	if threshold := s.failureRate * 10; slot < threshold {
		panic("intermittent failure")
	}

	fmt.Printf("[REMOTE] %v: %s\n", event.Level, event.MessageTemplate)
}

// Close is a no-op and always returns nil.
func (*intermittentSink) Close() error { return nil }