Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/xtcp2/xtcp2.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func defineFlags() *mainFlags {
f.s3SecretKey = flag.String("s3SecretKey", s3SecretKeyCst, "s3parquet: S3 secret key. Falls back to S3_SECRET_KEY env. Never logged.")
f.s3Region = flag.String("s3Region", s3RegionCst, "s3parquet: S3 region. Defaults to 'us-east-1' when empty; required by AWS, ignored by most MinIO setups.")
f.s3ParquetFlushBytes = flag.Uint("s3ParquetFlushBytes", s3ParquetFlushThresholdBytesCst, "s3parquet: soft cap on the in-memory Parquet builder's uncompressed row bytes before finalize+upload. 0 = daemon default (63 MiB).")
f.dest = flag.String("dest", destCst, "kafka:127.0.0.1:9092, udp:127.0.0.1:13000, or nsq:127.0.0.1:4150")
f.dest = flag.String("dest", destCst, "kafka:127.0.0.1:9092, udp:127.0.0.1:13000, nsq:127.0.0.1:4150, null, or stdout (pair stdout with -marshal protoJson)")
f.destWriteFiles = flag.Uint("destWriteFiles", DestWriteFilesCst, "Write out the marshaled data to destWriteFiles number of files ( for debugging only )")
f.topic = flag.String("topic", topicCst, "Kafka or NSQ topic")
f.xtcpProtoFile = flag.String("xtcpProtoFile", xtcpProtoFileCst, "xtcpProtoFile for registering with the schema registry")
Expand Down
3 changes: 2 additions & 1 deletion pkg/xtcp/destinations_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type DestinationFactory func(ctx context.Context, x *XTCP) (Destination, error)
// package network name accepted by net.Dial / net.Listen.
const (
schemeNull = "null"
schemeStdout = "stdout"
schemeUDP = "udp"
schemeUnix = "unix"
schemeUnixgram = "unixgram"
Expand All @@ -50,7 +51,7 @@ const (
// distinguish "unknown scheme" from "exists but not compiled into this
// binary" so the operator gets the right hint.
var knownSchemes = []string{
schemeNull, schemeUDP, schemeUnix, schemeUnixgram,
schemeNull, schemeStdout, schemeUDP, schemeUnix, schemeUnixgram,
schemeKafka, schemeNats, schemeNsq, schemeValkey,
schemeS3Parquet,
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/xtcp/destinations_stdout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package xtcp

import (
"context"
"fmt"
"io"
"os"
)

// writerDest sends each marshaled record to an arbitrary io.Writer,
// newline-terminated (one frame per Send). It is the reusable core behind
// any stream sink: the "stdout" scheme wires it to os.Stdout, and a future
// stderr/file sink is a one-line factory over the same type rather than a
// copy of the Send/Close boilerplate.
//
// The io.Writer seam is also the test seam — unit tests inject a
// *bytes.Buffer and assert on the framing without touching the real
// os.Stdout (see destinations_stdout_test.go).
//
// Pair with `-marshal protoJson` (or protoText) to stream records as NDJSON
// for local development, debugging, or piping to jq. The daemon's logs go to
// stderr, so stdout carries only records.
//
// Send is invoked serially (see the Destination contract), so the writer is
// used without an internal mutex.
type writerDest struct {
x *XTCP
w io.Writer
label string // metric label, e.g. "destStdout"
}

// streamFrameSep terminates each record written by writerDest. Kept as a
// package-level slice so Send never appends to (and thus never reallocates
// or corrupts) the caller's pooled payload buffer.
var streamFrameSep = []byte{'\n'}

func (d *writerDest) Send(_ context.Context, b *[]byte) (int, error) {
d.x.pC.WithLabelValues(d.label, "start", "count").Inc()
n, err := d.w.Write(*b)
if err != nil {
return n, fmt.Errorf("%s write: %w", d.label, err)
}
if _, err := d.w.Write(streamFrameSep); err != nil {
return n, fmt.Errorf("%s newline: %w", d.label, err)
}
return n, nil
}

func (d *writerDest) Close() error { return nil }

// newStdoutDest wires writerDest to os.Stdout.
func newStdoutDest(_ context.Context, x *XTCP) (Destination, error) {
return &writerDest{x: x, w: os.Stdout, label: "destStdout"}, nil
}

func init() {
RegisterDestination(schemeStdout, newStdoutDest)
}
102 changes: 102 additions & 0 deletions pkg/xtcp/destinations_stdout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package xtcp

import (
"bytes"
"context"
"errors"
"os"
"testing"
)

// TestWriterDestFraming verifies the reusable writerDest writes each payload
// followed by a single newline, returns the payload byte count, and can be
// driven entirely through an injected io.Writer — no os.Stdout needed.
func TestWriterDestFraming(t *testing.T) {
x := newTestXTCP(t, schemeStdout)
var buf bytes.Buffer
d := &writerDest{x: x, w: &buf, label: "destStdout"}
ctx := context.Background()

payloads := [][]byte{[]byte(`{"a":1}`), []byte(`{"b":2}`)}
for _, p := range payloads {
b := append([]byte(nil), p...) // copy: Send must not mutate the caller's buffer
n, err := d.Send(ctx, &b)
if err != nil {
t.Fatalf("Send: %v", err)
}
if n != len(p) {
t.Errorf("n = %d, want %d", n, len(p))
}
if !bytes.Equal(b, p) {
t.Errorf("Send mutated the payload buffer: got %q want %q", b, p)
}
}

want := "{\"a\":1}\n{\"b\":2}\n"
if got := buf.String(); got != want {
t.Errorf("output = %q, want %q", got, want)
}
if err := d.Close(); err != nil {
t.Errorf("Close: %v", err)
}
}

var errBoom = errors.New("boom")

// failingWriter fails on the Nth Write (1-indexed); earlier writes succeed
// into buf. Used to exercise both error branches of writerDest.Send.
type failingWriter struct {
failOn int
calls int
buf bytes.Buffer
}

func (w *failingWriter) Write(p []byte) (int, error) {
w.calls++
if w.calls == w.failOn {
return 0, errBoom
}
return w.buf.Write(p)
}

func TestWriterDestPayloadWriteError(t *testing.T) {
x := newTestXTCP(t, schemeStdout)
d := &writerDest{x: x, w: &failingWriter{failOn: 1}, label: "destStdout"}
b := []byte("x")
if _, err := d.Send(context.Background(), &b); err == nil {
t.Fatal("expected error when the payload write fails")
}
}

func TestWriterDestNewlineWriteError(t *testing.T) {
x := newTestXTCP(t, schemeStdout)
d := &writerDest{x: x, w: &failingWriter{failOn: 2}, label: "destStdout"}
b := []byte("x")
if _, err := d.Send(context.Background(), &b); err == nil {
t.Fatal("expected error when the newline write fails")
}
}

// TestStdoutDestFactory confirms the "stdout" scheme is registered and that
// its factory defaults the writer to os.Stdout with the expected label.
func TestStdoutDestFactory(t *testing.T) {
if _, status := lookupDestinationFactory(schemeStdout); status != destLookupFound {
t.Fatalf("stdout scheme not registered: status %v", status)
}

x := newTestXTCP(t, schemeStdout)
dest, err := newStdoutDest(context.Background(), x)
if err != nil {
t.Fatalf("newStdoutDest: %v", err)
}
wd, ok := dest.(*writerDest)
if !ok {
t.Fatalf("newStdoutDest returned %T, want *writerDest", dest)
}
if wd.w != os.Stdout {
t.Error("default writer should be os.Stdout")
}
if wd.label != "destStdout" {
t.Errorf("label = %q, want destStdout", wd.label)
}
}
4 changes: 2 additions & 2 deletions pkg/xtcp/input_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (x *XTCP) validateInput() error {
}
}

if x.config.Dest != schemeNull {
if x.config.Dest != schemeNull && x.config.Dest != schemeStdout {

scheme, _, found := strings.Cut(x.config.Dest, ":")
if !found {
Expand All @@ -45,7 +45,7 @@ func (x *XTCP) validateInput() error {
// schemeNullPrefix colon) failed validation as "must contain x2
// colons" while the registry happily had a "null" factory.
switch scheme {
case schemeUnix, schemeUnixgram, schemeNull, schemeS3Parquet:
case schemeUnix, schemeUnixgram, schemeNull, schemeStdout, schemeS3Parquet:
// only the leading `<scheme>:` separator is required; the
// per-destination factory validates the rest further. s3parquet
// accepts a URL (http://host:port) which has its own colons.
Expand Down
18 changes: 18 additions & 0 deletions pkg/xtcp/input_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ func TestValidateInput_happyPaths(t *testing.T) {
Topic: "xtcp",
},
},
{
// stdout takes no payload, like null: both the bare form
// and the `:`-suffixed form must validate.
name: "stdout dest bare",
cfg: &xtcp_config.XtcpConfig{
MarshalTo: MarshallerProtoJSON,
Dest: schemeStdout,
Topic: "xtcp",
},
},
{
name: "stdout dest with trailing colon",
cfg: &xtcp_config.XtcpConfig{
MarshalTo: MarshallerProtoJSON,
Dest: "stdout:",
Topic: "xtcp",
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
49 changes: 49 additions & 0 deletions pkg/xtcp/marshallers.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ func (x *XTCP) InitEnvelopeMarshallers(wg *sync.WaitGroup) {
return x.protobufListMarshal(e)
})

// The human-readable formats are also offered at the Envelope level so
// they work with the destination pipeline (which is envelope-based):
// `-marshal protoJson -dest stdout` prints one JSON Envelope per flush.
// Without these, EnvelopeMarshaller would be nil for any non-protobufList
// format and flushEnvelope would nil-deref.
x.EnvelopeMarshallers.Store(MarshallerProtoJSON, func(e *xtcp_flat_record.Envelope) (buf *[]byte) {
return x.envelopeProtoJSONMarshal(e)
})
x.EnvelopeMarshallers.Store(MarshallerProtoText, func(e *xtcp_flat_record.Envelope) (buf *[]byte) {
return x.envelopeProtoTextMarshal(e)
})
x.EnvelopeMarshallers.Store(MarshallerMsgPack, func(e *xtcp_flat_record.Envelope) (buf *[]byte) {
return x.envelopeMsgPackMarshal(e)
})

if f, ok := x.EnvelopeMarshallers.Load(x.config.MarshalTo); ok {
if m, ok2 := f.(func(e *xtcp_flat_record.Envelope) (buf *[]byte)); ok2 {
x.EnvelopeMarshaller = m
Expand Down Expand Up @@ -147,6 +162,40 @@ func (w *ByteSliceWriter) Write(b []byte) (n int, err error) {
return len(b), nil
}

// envelopeProtoJSONMarshal marshals a whole Envelope (batch of rows) to
// compact single-line JSON — one JSON object per flush, i.e. NDJSON when the
// stdout destination appends a newline. Pairs with `-dest stdout` for
// jq-able local output. (protojson.Marshal is compact; protojson.Format is
// multi-line pretty-print and would break the one-object-per-line contract.)
func (x *XTCP) envelopeProtoJSONMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) {
b, err := protojson.Marshal(e)
if err != nil {
x.pC.WithLabelValues("envelopeProtoJSONMarshal", "Marshal", "error").Inc()
if x.debugLevel > 10 {
log.Println("protojson.Marshal(envelope) err: ", err)
}
}
return &b
}

// envelopeProtoTextMarshal marshals a whole Envelope to protobuf text.
func (x *XTCP) envelopeProtoTextMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) {
b := []byte(prototext.Format(e))
return &b
}

// envelopeMsgPackMarshal marshals a whole Envelope to MsgPack via reflection.
func (x *XTCP) envelopeMsgPackMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) {
b, err := msgpack.Marshal(e)
if err != nil {
x.pC.WithLabelValues("envelopeMsgPackMarshal", "Marshal", "error").Inc()
if x.debugLevel > 1000 {
log.Println("envelopeMsgPackMarshal err: ", err)
}
}
return &b
}

// protoJsonMarshal marshals to JSON.
// https://pkg.go.dev/google.golang.org/protobuf/encoding/protojson
func (x *XTCP) protoJsonMarshal(r *xtcp_flat_record.XtcpFlatRecord) (buf *[]byte) {
Expand Down
55 changes: 55 additions & 0 deletions pkg/xtcp/marshallers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package xtcp

import (
"bytes"
"encoding/json"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -241,3 +242,57 @@ func TestInitEnvelopeMarshallers_anyDest(t *testing.T) {
})
}
}

// TestInitEnvelopeMarshallers_humanFormats verifies the non-protobufList
// formats are also resolvable at the Envelope level, so they work with the
// destination pipeline (e.g. `-marshal protoJson -dest stdout`). Before this
// they were per-record only, leaving EnvelopeMarshaller nil and panicking at
// flush.
func TestInitEnvelopeMarshallers_humanFormats(t *testing.T) {
for _, name := range []string{MarshallerProtoJSON, MarshallerProtoText, MarshallerMsgPack} {
t.Run(name, func(t *testing.T) {
x, _ := newMarshalFixture(t)
x.config.MarshalTo = name
x.config.Dest = schemeStdout
var wg sync.WaitGroup
wg.Add(1)
x.InitEnvelopeMarshallers(&wg)
wg.Wait()
if x.EnvelopeMarshaller == nil {
t.Fatalf("EnvelopeMarshaller nil for %q", name)
}
buf := x.EnvelopeMarshaller(&xtcp_flat_record.Envelope{
Row: []*xtcp_flat_record.XtcpFlatRecord{
{Hostname: "host-a", Netns: "/run/netns/ns-1", SocketFd: 11},
},
})
if buf == nil || len(*buf) == 0 {
t.Fatalf("%q envelope marshal returned empty buf", name)
}
})
}
}

// TestEnvelopeProtoJSONMarshal_validJSON confirms the Envelope JSON output is
// valid JSON and carries the rows, so `-dest stdout -marshal protoJson` emits
// parseable records.
func TestEnvelopeProtoJSONMarshal_validJSON(t *testing.T) {
x, _ := newMarshalFixture(t)
env := &xtcp_flat_record.Envelope{
Row: []*xtcp_flat_record.XtcpFlatRecord{
{Hostname: "host-a", Netns: "/run/netns/ns-1", SocketFd: 11},
{Hostname: "host-b", Netns: "/run/netns/ns-2", SocketFd: 22},
},
}
buf := x.envelopeProtoJSONMarshal(env)
if buf == nil || len(*buf) == 0 {
t.Fatal("envelopeProtoJSONMarshal returned empty buf")
}
var decoded map[string]any
if err := json.Unmarshal(*buf, &decoded); err != nil {
t.Fatalf("output is not valid JSON: %v\n%s", err, *buf)
}
if _, ok := decoded["row"]; !ok {
t.Errorf("JSON envelope missing 'row' field: %s", *buf)
}
}