Skip to content

Commit 02eeb22

Browse files
committed
add get-by-offset cmd
1 parent 84428f7 commit 02eeb22

2 files changed

Lines changed: 52 additions & 5 deletions

File tree

main.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func main() {
1919
rootCmd.AddCommand(paths())
2020
rootCmd.AddCommand(publish())
2121
rootCmd.AddCommand(consume())
22+
rootCmd.AddCommand(getByOffset())
2223
rootCmd.AddCommand(receive())
2324
rootCmd.AddCommand(cleanup())
2425
rootCmd.AddCommand(logsRoot())
@@ -82,20 +83,24 @@ func output(v any, err error) error {
8283
if err != nil {
8384
return outputErr(err)
8485
}
85-
return outputValue(os.Stdout, v)
86+
return outputValue(v)
8687
}
8788

8889
func outputErr(err error) error {
8990
if err := klev.GetError(err); err != nil {
90-
if err := outputValue(os.Stderr, err); err != nil {
91+
if err := outputValueTo(os.Stderr, err); err != nil {
9192
return err
9293
}
9394
os.Exit(1)
9495
}
9596
return err
9697
}
9798

98-
func outputValue(w io.Writer, v any) error {
99+
func outputValue(v any) error {
100+
return outputValueTo(os.Stdout, v)
101+
}
102+
103+
func outputValueTo(w io.Writer, v any) error {
99104
enc := json.NewEncoder(w)
100105
enc.SetIndent("", " ")
101106
return enc.Encode(v)

messages.go

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func consume() *cobra.Command {
131131
for repeat {
132132
next, out, err := klient.Messages.Consume(cmd.Context(), id, opts...)
133133
if err != nil {
134-
return output("", err)
134+
return outputErr(err)
135135
}
136136

137137
var msgs = make([]klev.ConsumeMessageOut, len(out))
@@ -161,6 +161,48 @@ func consume() *cobra.Command {
161161
return cmd
162162
}
163163

164+
func getByOffset() *cobra.Command {
165+
cmd := &cobra.Command{
166+
Use: "get-by-offset <log-id>",
167+
Short: "get message by offset",
168+
Args: cobra.ExactArgs(1),
169+
}
170+
171+
offset := cmd.Flags().Int64("offset", klev.OffsetNewest, "the starting offset (defaults to newest message)")
172+
encoding := cmd.Flags().String("encoding", "string", "how to convert message payload")
173+
174+
cmd.RunE = func(cmd *cobra.Command, args []string) error {
175+
id, err := klev.ParseLogID(args[0])
176+
if err != nil {
177+
return outputErr(err)
178+
}
179+
180+
var coder = klev.MessageEncodingString
181+
if cmd.Flags().Changed("encoding") {
182+
encoding, err := klev.ParseMessageEncoding(*encoding)
183+
if err != nil {
184+
return outputErr(err)
185+
}
186+
coder = encoding
187+
}
188+
189+
msg, err := klient.Messages.GetByOffset(cmd.Context(), id, *offset)
190+
if err != nil {
191+
return outputErr(err)
192+
}
193+
194+
outMessage := klev.ConsumeMessageOut{
195+
Offset: msg.Offset,
196+
Time: coder.EncodeTime(msg.Time),
197+
Key: coder.EncodeData(msg.Key),
198+
Value: coder.EncodeData(msg.Value),
199+
}
200+
201+
return outputValue(outMessage)
202+
}
203+
204+
return cmd
205+
}
164206
func receive() *cobra.Command {
165207
cmd := &cobra.Command{
166208
Use: "receive",
@@ -175,7 +217,7 @@ func receive() *cobra.Command {
175217
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
176218
msg, err := ingress_validate.Message(w, r, time.Now, *secret)
177219
if err != nil {
178-
outputValue(os.Stderr, err)
220+
outputErr(err)
179221
}
180222
fmt.Printf("Offset: %d\n Time: %v\n Key: %s\n Value: %s\n",
181223
msg.Offset, msg.Time, msg.Key, msg.Value)

0 commit comments

Comments
 (0)