From 0d37b7a4d77797f39f2e62e2a61abeab04bf8c3f Mon Sep 17 00:00:00 2001 From: "randomizedcoder dave.seddon.ca@gmail.com" Date: Tue, 16 Jun 2026 22:37:22 -0700 Subject: [PATCH] feat(xtcp): add stdout destination + envelope JSON/text marshallers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a `stdout` destination so the daemon can print socket data to its own stdout — e.g. `xtcp2 -dest stdout -marshal protoJson` streams one JSON Envelope per poll as NDJSON, ideal for local dev, debugging, and piping to jq. The destination is backed by a reusable, io.Writer-based `writerDest` (default os.Stdout) rather than copying the null/udp boilerplate; the io.Writer seam also makes it unit-testable with a bytes.Buffer. Fix a latent bug this exposed: the destination pipeline is envelope-based, but only protobufList was registered as an EnvelopeMarshaller, so `-marshal protoJson|protoText|msgpack -dest ` nil-deref panicked at flushEnvelope. Register envelope-level marshallers for all three; protoJson uses protojson.Marshal (compact, one object per line). - pkg/xtcp/destinations_stdout.go: writerDest (io.Writer seam) + stdout factory - pkg/xtcp/destinations_core.go: schemeStdout const + knownSchemes - pkg/xtcp/input_validation.go: treat stdout like null (bare + ":"-suffixed) - pkg/xtcp/marshallers.go: envelope protoJson/protoText/msgpack marshallers - cmd/xtcp2: mention stdout in -dest help - tests for the writer seam, error paths, factory, validation, and envelope JSON output (valid one-line JSON with a row array) Verified end-to-end in a container: `-dest stdout -marshal protoJson -d 1` prints one-line JSON envelopes of real host sockets to stdout (logs stay on stderr; config dump only at -d >10). Co-Authored-By: Claude Opus 4.8 --- cmd/xtcp2/xtcp2.go | 2 +- pkg/xtcp/destinations_core.go | 3 +- pkg/xtcp/destinations_stdout.go | 58 +++++++++++++++ pkg/xtcp/destinations_stdout_test.go | 102 +++++++++++++++++++++++++++ pkg/xtcp/input_validation.go | 4 +- pkg/xtcp/input_validation_test.go | 18 +++++ pkg/xtcp/marshallers.go | 49 +++++++++++++ pkg/xtcp/marshallers_test.go | 55 +++++++++++++++ 8 files changed, 287 insertions(+), 4 deletions(-) create mode 100644 pkg/xtcp/destinations_stdout.go create mode 100644 pkg/xtcp/destinations_stdout_test.go diff --git a/cmd/xtcp2/xtcp2.go b/cmd/xtcp2/xtcp2.go index 3212b87..76f8ff0 100644 --- a/cmd/xtcp2/xtcp2.go +++ b/cmd/xtcp2/xtcp2.go @@ -241,7 +241,7 @@ func defineFlags() *mainFlags { f.s3SecretKey = flag.String("s3SecretKey", s3SecretKeyCst, "s3parquet: S3 secret key. Falls back to S3_SECRET_KEY env. Never logged.") f.s3Region = flag.String("s3Region", s3RegionCst, "s3parquet: S3 region. Defaults to 'us-east-1' when empty; required by AWS, ignored by most MinIO setups.") f.s3ParquetFlushBytes = flag.Uint("s3ParquetFlushBytes", s3ParquetFlushThresholdBytesCst, "s3parquet: soft cap on the in-memory Parquet builder's uncompressed row bytes before finalize+upload. 0 = daemon default (63 MiB).") - f.dest = flag.String("dest", destCst, "kafka:127.0.0.1:9092, udp:127.0.0.1:13000, or nsq:127.0.0.1:4150") + f.dest = flag.String("dest", destCst, "kafka:127.0.0.1:9092, udp:127.0.0.1:13000, nsq:127.0.0.1:4150, null, or stdout (pair stdout with -marshal protoJson)") f.destWriteFiles = flag.Uint("destWriteFiles", DestWriteFilesCst, "Write out the marshaled data to destWriteFiles number of files ( for debugging only )") f.topic = flag.String("topic", topicCst, "Kafka or NSQ topic") f.xtcpProtoFile = flag.String("xtcpProtoFile", xtcpProtoFileCst, "xtcpProtoFile for registering with the schema registry") diff --git a/pkg/xtcp/destinations_core.go b/pkg/xtcp/destinations_core.go index 7daf237..5c4cc46 100644 --- a/pkg/xtcp/destinations_core.go +++ b/pkg/xtcp/destinations_core.go @@ -30,6 +30,7 @@ type DestinationFactory func(ctx context.Context, x *XTCP) (Destination, error) // package network name accepted by net.Dial / net.Listen. const ( schemeNull = "null" + schemeStdout = "stdout" schemeUDP = "udp" schemeUnix = "unix" schemeUnixgram = "unixgram" @@ -50,7 +51,7 @@ const ( // distinguish "unknown scheme" from "exists but not compiled into this // binary" so the operator gets the right hint. var knownSchemes = []string{ - schemeNull, schemeUDP, schemeUnix, schemeUnixgram, + schemeNull, schemeStdout, schemeUDP, schemeUnix, schemeUnixgram, schemeKafka, schemeNats, schemeNsq, schemeValkey, schemeS3Parquet, } diff --git a/pkg/xtcp/destinations_stdout.go b/pkg/xtcp/destinations_stdout.go new file mode 100644 index 0000000..fb49d92 --- /dev/null +++ b/pkg/xtcp/destinations_stdout.go @@ -0,0 +1,58 @@ +package xtcp + +import ( + "context" + "fmt" + "io" + "os" +) + +// writerDest sends each marshaled record to an arbitrary io.Writer, +// newline-terminated (one frame per Send). It is the reusable core behind +// any stream sink: the "stdout" scheme wires it to os.Stdout, and a future +// stderr/file sink is a one-line factory over the same type rather than a +// copy of the Send/Close boilerplate. +// +// The io.Writer seam is also the test seam — unit tests inject a +// *bytes.Buffer and assert on the framing without touching the real +// os.Stdout (see destinations_stdout_test.go). +// +// Pair with `-marshal protoJson` (or protoText) to stream records as NDJSON +// for local development, debugging, or piping to jq. The daemon's logs go to +// stderr, so stdout carries only records. +// +// Send is invoked serially (see the Destination contract), so the writer is +// used without an internal mutex. +type writerDest struct { + x *XTCP + w io.Writer + label string // metric label, e.g. "destStdout" +} + +// streamFrameSep terminates each record written by writerDest. Kept as a +// package-level slice so Send never appends to (and thus never reallocates +// or corrupts) the caller's pooled payload buffer. +var streamFrameSep = []byte{'\n'} + +func (d *writerDest) Send(_ context.Context, b *[]byte) (int, error) { + d.x.pC.WithLabelValues(d.label, "start", "count").Inc() + n, err := d.w.Write(*b) + if err != nil { + return n, fmt.Errorf("%s write: %w", d.label, err) + } + if _, err := d.w.Write(streamFrameSep); err != nil { + return n, fmt.Errorf("%s newline: %w", d.label, err) + } + return n, nil +} + +func (d *writerDest) Close() error { return nil } + +// newStdoutDest wires writerDest to os.Stdout. +func newStdoutDest(_ context.Context, x *XTCP) (Destination, error) { + return &writerDest{x: x, w: os.Stdout, label: "destStdout"}, nil +} + +func init() { + RegisterDestination(schemeStdout, newStdoutDest) +} diff --git a/pkg/xtcp/destinations_stdout_test.go b/pkg/xtcp/destinations_stdout_test.go new file mode 100644 index 0000000..707bbd5 --- /dev/null +++ b/pkg/xtcp/destinations_stdout_test.go @@ -0,0 +1,102 @@ +package xtcp + +import ( + "bytes" + "context" + "errors" + "os" + "testing" +) + +// TestWriterDestFraming verifies the reusable writerDest writes each payload +// followed by a single newline, returns the payload byte count, and can be +// driven entirely through an injected io.Writer — no os.Stdout needed. +func TestWriterDestFraming(t *testing.T) { + x := newTestXTCP(t, schemeStdout) + var buf bytes.Buffer + d := &writerDest{x: x, w: &buf, label: "destStdout"} + ctx := context.Background() + + payloads := [][]byte{[]byte(`{"a":1}`), []byte(`{"b":2}`)} + for _, p := range payloads { + b := append([]byte(nil), p...) // copy: Send must not mutate the caller's buffer + n, err := d.Send(ctx, &b) + if err != nil { + t.Fatalf("Send: %v", err) + } + if n != len(p) { + t.Errorf("n = %d, want %d", n, len(p)) + } + if !bytes.Equal(b, p) { + t.Errorf("Send mutated the payload buffer: got %q want %q", b, p) + } + } + + want := "{\"a\":1}\n{\"b\":2}\n" + if got := buf.String(); got != want { + t.Errorf("output = %q, want %q", got, want) + } + if err := d.Close(); err != nil { + t.Errorf("Close: %v", err) + } +} + +var errBoom = errors.New("boom") + +// failingWriter fails on the Nth Write (1-indexed); earlier writes succeed +// into buf. Used to exercise both error branches of writerDest.Send. +type failingWriter struct { + failOn int + calls int + buf bytes.Buffer +} + +func (w *failingWriter) Write(p []byte) (int, error) { + w.calls++ + if w.calls == w.failOn { + return 0, errBoom + } + return w.buf.Write(p) +} + +func TestWriterDestPayloadWriteError(t *testing.T) { + x := newTestXTCP(t, schemeStdout) + d := &writerDest{x: x, w: &failingWriter{failOn: 1}, label: "destStdout"} + b := []byte("x") + if _, err := d.Send(context.Background(), &b); err == nil { + t.Fatal("expected error when the payload write fails") + } +} + +func TestWriterDestNewlineWriteError(t *testing.T) { + x := newTestXTCP(t, schemeStdout) + d := &writerDest{x: x, w: &failingWriter{failOn: 2}, label: "destStdout"} + b := []byte("x") + if _, err := d.Send(context.Background(), &b); err == nil { + t.Fatal("expected error when the newline write fails") + } +} + +// TestStdoutDestFactory confirms the "stdout" scheme is registered and that +// its factory defaults the writer to os.Stdout with the expected label. +func TestStdoutDestFactory(t *testing.T) { + if _, status := lookupDestinationFactory(schemeStdout); status != destLookupFound { + t.Fatalf("stdout scheme not registered: status %v", status) + } + + x := newTestXTCP(t, schemeStdout) + dest, err := newStdoutDest(context.Background(), x) + if err != nil { + t.Fatalf("newStdoutDest: %v", err) + } + wd, ok := dest.(*writerDest) + if !ok { + t.Fatalf("newStdoutDest returned %T, want *writerDest", dest) + } + if wd.w != os.Stdout { + t.Error("default writer should be os.Stdout") + } + if wd.label != "destStdout" { + t.Errorf("label = %q, want destStdout", wd.label) + } +} diff --git a/pkg/xtcp/input_validation.go b/pkg/xtcp/input_validation.go index ef4438a..b8126a7 100644 --- a/pkg/xtcp/input_validation.go +++ b/pkg/xtcp/input_validation.go @@ -26,7 +26,7 @@ func (x *XTCP) validateInput() error { } } - if x.config.Dest != schemeNull { + if x.config.Dest != schemeNull && x.config.Dest != schemeStdout { scheme, _, found := strings.Cut(x.config.Dest, ":") if !found { @@ -45,7 +45,7 @@ func (x *XTCP) validateInput() error { // schemeNullPrefix colon) failed validation as "must contain x2 // colons" while the registry happily had a "null" factory. switch scheme { - case schemeUnix, schemeUnixgram, schemeNull, schemeS3Parquet: + case schemeUnix, schemeUnixgram, schemeNull, schemeStdout, schemeS3Parquet: // only the leading `:` separator is required; the // per-destination factory validates the rest further. s3parquet // accepts a URL (http://host:port) which has its own colons. diff --git a/pkg/xtcp/input_validation_test.go b/pkg/xtcp/input_validation_test.go index 1330778..e834457 100644 --- a/pkg/xtcp/input_validation_test.go +++ b/pkg/xtcp/input_validation_test.go @@ -87,6 +87,24 @@ func TestValidateInput_happyPaths(t *testing.T) { Topic: "xtcp", }, }, + { + // stdout takes no payload, like null: both the bare form + // and the `:`-suffixed form must validate. + name: "stdout dest bare", + cfg: &xtcp_config.XtcpConfig{ + MarshalTo: MarshallerProtoJSON, + Dest: schemeStdout, + Topic: "xtcp", + }, + }, + { + name: "stdout dest with trailing colon", + cfg: &xtcp_config.XtcpConfig{ + MarshalTo: MarshallerProtoJSON, + Dest: "stdout:", + Topic: "xtcp", + }, + }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { diff --git a/pkg/xtcp/marshallers.go b/pkg/xtcp/marshallers.go index 981dca9..8149baa 100644 --- a/pkg/xtcp/marshallers.go +++ b/pkg/xtcp/marshallers.go @@ -108,6 +108,21 @@ func (x *XTCP) InitEnvelopeMarshallers(wg *sync.WaitGroup) { return x.protobufListMarshal(e) }) + // The human-readable formats are also offered at the Envelope level so + // they work with the destination pipeline (which is envelope-based): + // `-marshal protoJson -dest stdout` prints one JSON Envelope per flush. + // Without these, EnvelopeMarshaller would be nil for any non-protobufList + // format and flushEnvelope would nil-deref. + x.EnvelopeMarshallers.Store(MarshallerProtoJSON, func(e *xtcp_flat_record.Envelope) (buf *[]byte) { + return x.envelopeProtoJSONMarshal(e) + }) + x.EnvelopeMarshallers.Store(MarshallerProtoText, func(e *xtcp_flat_record.Envelope) (buf *[]byte) { + return x.envelopeProtoTextMarshal(e) + }) + x.EnvelopeMarshallers.Store(MarshallerMsgPack, func(e *xtcp_flat_record.Envelope) (buf *[]byte) { + return x.envelopeMsgPackMarshal(e) + }) + if f, ok := x.EnvelopeMarshallers.Load(x.config.MarshalTo); ok { if m, ok2 := f.(func(e *xtcp_flat_record.Envelope) (buf *[]byte)); ok2 { x.EnvelopeMarshaller = m @@ -147,6 +162,40 @@ func (w *ByteSliceWriter) Write(b []byte) (n int, err error) { return len(b), nil } +// envelopeProtoJSONMarshal marshals a whole Envelope (batch of rows) to +// compact single-line JSON — one JSON object per flush, i.e. NDJSON when the +// stdout destination appends a newline. Pairs with `-dest stdout` for +// jq-able local output. (protojson.Marshal is compact; protojson.Format is +// multi-line pretty-print and would break the one-object-per-line contract.) +func (x *XTCP) envelopeProtoJSONMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) { + b, err := protojson.Marshal(e) + if err != nil { + x.pC.WithLabelValues("envelopeProtoJSONMarshal", "Marshal", "error").Inc() + if x.debugLevel > 10 { + log.Println("protojson.Marshal(envelope) err: ", err) + } + } + return &b +} + +// envelopeProtoTextMarshal marshals a whole Envelope to protobuf text. +func (x *XTCP) envelopeProtoTextMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) { + b := []byte(prototext.Format(e)) + return &b +} + +// envelopeMsgPackMarshal marshals a whole Envelope to MsgPack via reflection. +func (x *XTCP) envelopeMsgPackMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) { + b, err := msgpack.Marshal(e) + if err != nil { + x.pC.WithLabelValues("envelopeMsgPackMarshal", "Marshal", "error").Inc() + if x.debugLevel > 1000 { + log.Println("envelopeMsgPackMarshal err: ", err) + } + } + return &b +} + // protoJsonMarshal marshals to JSON. // https://pkg.go.dev/google.golang.org/protobuf/encoding/protojson func (x *XTCP) protoJsonMarshal(r *xtcp_flat_record.XtcpFlatRecord) (buf *[]byte) { diff --git a/pkg/xtcp/marshallers_test.go b/pkg/xtcp/marshallers_test.go index 9063ad4..b222d66 100644 --- a/pkg/xtcp/marshallers_test.go +++ b/pkg/xtcp/marshallers_test.go @@ -2,6 +2,7 @@ package xtcp import ( "bytes" + "encoding/json" "strings" "sync" "testing" @@ -241,3 +242,57 @@ func TestInitEnvelopeMarshallers_anyDest(t *testing.T) { }) } } + +// TestInitEnvelopeMarshallers_humanFormats verifies the non-protobufList +// formats are also resolvable at the Envelope level, so they work with the +// destination pipeline (e.g. `-marshal protoJson -dest stdout`). Before this +// they were per-record only, leaving EnvelopeMarshaller nil and panicking at +// flush. +func TestInitEnvelopeMarshallers_humanFormats(t *testing.T) { + for _, name := range []string{MarshallerProtoJSON, MarshallerProtoText, MarshallerMsgPack} { + t.Run(name, func(t *testing.T) { + x, _ := newMarshalFixture(t) + x.config.MarshalTo = name + x.config.Dest = schemeStdout + var wg sync.WaitGroup + wg.Add(1) + x.InitEnvelopeMarshallers(&wg) + wg.Wait() + if x.EnvelopeMarshaller == nil { + t.Fatalf("EnvelopeMarshaller nil for %q", name) + } + buf := x.EnvelopeMarshaller(&xtcp_flat_record.Envelope{ + Row: []*xtcp_flat_record.XtcpFlatRecord{ + {Hostname: "host-a", Netns: "/run/netns/ns-1", SocketFd: 11}, + }, + }) + if buf == nil || len(*buf) == 0 { + t.Fatalf("%q envelope marshal returned empty buf", name) + } + }) + } +} + +// TestEnvelopeProtoJSONMarshal_validJSON confirms the Envelope JSON output is +// valid JSON and carries the rows, so `-dest stdout -marshal protoJson` emits +// parseable records. +func TestEnvelopeProtoJSONMarshal_validJSON(t *testing.T) { + x, _ := newMarshalFixture(t) + env := &xtcp_flat_record.Envelope{ + Row: []*xtcp_flat_record.XtcpFlatRecord{ + {Hostname: "host-a", Netns: "/run/netns/ns-1", SocketFd: 11}, + {Hostname: "host-b", Netns: "/run/netns/ns-2", SocketFd: 22}, + }, + } + buf := x.envelopeProtoJSONMarshal(env) + if buf == nil || len(*buf) == 0 { + t.Fatal("envelopeProtoJSONMarshal returned empty buf") + } + var decoded map[string]any + if err := json.Unmarshal(*buf, &decoded); err != nil { + t.Fatalf("output is not valid JSON: %v\n%s", err, *buf) + } + if _, ok := decoded["row"]; !ok { + t.Errorf("JSON envelope missing 'row' field: %s", *buf) + } +}