From 14c218a96f8f5cf935e741b3cf3f946f1fe604f6 Mon Sep 17 00:00:00 2001 From: "randomizedcoder dave.seddon.ca@gmail.com" Date: Wed, 17 Jun 2026 09:20:42 -0700 Subject: [PATCH] =?UTF-8?q?feat(xtcp):=20wide-scale=20output=20=E2=80=94?= =?UTF-8?q?=20jsonl/csv/tsv=20formats;=20tcp/file/stderr/http=20sinks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make socket data easy to analyze with common tools. Adds, all reusing the existing envelope-marshaller + destination patterns: Formats (envelope marshallers, marshallers_text.go): - jsonl — one raw JSON record per line (NDJSON / ClickHouse JSONEachRow) - csv/tsv — reflection-generated columns (all 122 XtcpFlatRecord fields by default, or a -columns subset), humanized: IPs as dotted-quad/v6, TCP state and congestion as names, timestamp as RFC3339 (humanize.go, flat_record_row.go). Header written once per stream. Destinations (stdlib, no build tag): - tcp:host:port — reliable streaming to Vector/Logstash/Fluentd/nc (fills the UDP-only gap) - file:/path — append to a file (reuses writerDest, 0600) - stderr — one-liner over writerDest - http(s)://... — POST each batch; Content-Type derived from the marshaller Framing: marshallers now own their trailing newline and writerDest/tcp/http write bytes verbatim, so text formats are newline-delimited on every sink while protobufList stays a clean length-delimited stream. Config: new -columns flag → csv_columns proto field; marshal_to min_len 4→3 so "csv"/"tsv" validate (regen-protos updated all language bindings). Tests cover the humanizer (incl. the IPv4-in-16-byte-slot regression), reflection columns + selection, jsonl/csv/tsv output, and tcp/file/stderr/http sinks (net.Listen / httptest). Verified end-to-end in a container: csv/jsonl of real host sockets to stdout with humanized addresses/states. Co-Authored-By: Claude Opus 4.8 --- cmd/xtcp2/xtcp2.go | 8 +- cmd/xtcp2/xtcp2_test.go | 4 + dart/xtcp_config/v1/xtcp_config.pb.dart | 18 ++ dart/xtcp_config/v1/xtcp_config.pbjson.dart | 10 +- gen/xtcp_config/v1/xtcp_config.pb.cc | 99 ++++++---- gen/xtcp_config/v1/xtcp_config.pb.h | 70 ++++++- gen/xtcp_config/v1/xtcp_config.pb.validate.cc | 19 +- pkg/xtcp/destinations_core.go | 9 +- pkg/xtcp/destinations_file.go | 38 ++++ pkg/xtcp/destinations_http.go | 111 +++++++++++ pkg/xtcp/destinations_stdout.go | 50 ++--- pkg/xtcp/destinations_stdout_test.go | 44 ++--- pkg/xtcp/destinations_stream_test.go | 184 ++++++++++++++++++ pkg/xtcp/destinations_tcp.go | 58 ++++++ pkg/xtcp/flat_record_row.go | 141 ++++++++++++++ pkg/xtcp/flat_record_row_test.go | 101 ++++++++++ pkg/xtcp/humanize.go | 93 +++++++++ pkg/xtcp/humanize_test.go | 70 +++++++ pkg/xtcp/input_validation.go | 5 +- pkg/xtcp/input_validation_test.go | 40 ++++ pkg/xtcp/marshallers.go | 27 ++- pkg/xtcp/marshallers_text.go | 105 ++++++++++ pkg/xtcp/marshallers_text_test.go | 116 +++++++++++ pkg/xtcp_config/xtcp_config.pb.go | 24 ++- proto/xtcp_config/v1/xtcp_config.proto | 10 +- python/xtcp_config/v1/xtcp_config_pb2.py | 20 +- python/xtcp_config/v1/xtcp_config_pb2.pyi | 6 +- xtcp_config/v1/xtcp_config.swagger.json | 4 + 28 files changed, 1361 insertions(+), 123 deletions(-) create mode 100644 pkg/xtcp/destinations_file.go create mode 100644 pkg/xtcp/destinations_http.go create mode 100644 pkg/xtcp/destinations_stream_test.go create mode 100644 pkg/xtcp/destinations_tcp.go create mode 100644 pkg/xtcp/flat_record_row.go create mode 100644 pkg/xtcp/flat_record_row_test.go create mode 100644 pkg/xtcp/humanize.go create mode 100644 pkg/xtcp/humanize_test.go create mode 100644 pkg/xtcp/marshallers_text.go create mode 100644 pkg/xtcp/marshallers_text_test.go diff --git a/cmd/xtcp2/xtcp2.go b/cmd/xtcp2/xtcp2.go index 76f8ff0..b8acb4f 100644 --- a/cmd/xtcp2/xtcp2.go +++ b/cmd/xtcp2/xtcp2.go @@ -179,6 +179,7 @@ type mainFlags struct { capturePath *string modulus *uint64 marshal *string + columns *string envelopeFlushBytes *uint envelopeFlushRows *uint kafkaCompression *string @@ -230,7 +231,8 @@ func defineFlags() *mainFlags { f.writeFiles = flag.Uint("writeFiles", WriteFilesCst, "Write netlink packets to writeFiles number of files ( to generate test data ) per netlinker") f.capturePath = flag.String("capturePath", capturePathCst, "Write files path") f.modulus = flag.Uint64("modulus", modulusCst, "modulus. Report every X inetd messages to output") - f.marshal = flag.String("marshal", marshalCst, "Marshaling of the exported data (protobufList, protoJson, protoText, msgpack)") + f.marshal = flag.String("marshal", marshalCst, "Marshaling of the exported data (protobufList, protoJson, protoText, msgpack, jsonl, csv, tsv)") + f.columns = flag.String("columns", "", "csv/tsv only: comma-separated subset of XtcpFlatRecord json field names (e.g. hostname,inetDiagMsgSocketSourcePort,inetDiagMsgState,tcpInfoRtt); empty = all") f.envelopeFlushBytes = flag.Uint("envelopeFlushBytes", envelopeFlushBytesCst, "Safety-net cap on the in-flight protobufList Envelope's UNCOMPRESSED proto size in bytes (franz-go compresses post-flush, so wire size is typically 3-8x smaller). 0 = use daemon default (768 KiB). Whichever cap (bytes/rows) trips first wins.") f.envelopeFlushRows = flag.Uint("envelopeFlushRows", envelopeFlushRowsCst, "Primary cap on the in-flight protobufList Envelope's row count. 0 = use daemon default (10000). Cheap, predictable; pairs with -envelopeFlushBytes as a safety net.") f.kafkaCompression = flag.String("kafkaCompression", kafkaCompressionCst, "Kafka producer compression codec. '' or 'auto' = preference list [zstd,lz4,snappy,none] negotiated with broker; or pin one of: zstd, lz4, snappy, gzip, none. All codecs are decodable by Redpanda + ClickHouse's Kafka engine.") @@ -241,7 +243,7 @@ func defineFlags() *mainFlags { f.s3SecretKey = flag.String("s3SecretKey", s3SecretKeyCst, "s3parquet: S3 secret key. Falls back to S3_SECRET_KEY env. Never logged.") f.s3Region = flag.String("s3Region", s3RegionCst, "s3parquet: S3 region. Defaults to 'us-east-1' when empty; required by AWS, ignored by most MinIO setups.") f.s3ParquetFlushBytes = flag.Uint("s3ParquetFlushBytes", s3ParquetFlushThresholdBytesCst, "s3parquet: soft cap on the in-memory Parquet builder's uncompressed row bytes before finalize+upload. 0 = daemon default (63 MiB).") - f.dest = flag.String("dest", destCst, "kafka:127.0.0.1:9092, udp:127.0.0.1:13000, nsq:127.0.0.1:4150, null, or stdout (pair stdout with -marshal protoJson)") + f.dest = flag.String("dest", destCst, "scheme:addr — kafka:host:9092, nats:..., nsq:..., valkey:..., udp:host:13000, tcp:host:9000, unix:/path, unixgram:/path, file:/path, http(s)://host/ingest, s3parquet:..., stdout, stderr, null (pair stdout/file/tcp with -marshal jsonl|csv|tsv)") f.destWriteFiles = flag.Uint("destWriteFiles", DestWriteFilesCst, "Write out the marshaled data to destWriteFiles number of files ( for debugging only )") f.topic = flag.String("topic", topicCst, "Kafka or NSQ topic") f.xtcpProtoFile = flag.String("xtcpProtoFile", xtcpProtoFileCst, "xtcpProtoFile for registering with the schema registry") @@ -292,6 +294,7 @@ func printFlags(f *mainFlags) { fmt.Println("*capturePath:", *f.capturePath) fmt.Println("*modulus:", *f.modulus) fmt.Println("*marshal:", *f.marshal) + fmt.Println("*columns:", *f.columns) fmt.Println("*envelopeFlushBytes:", *f.envelopeFlushBytes) fmt.Println("*envelopeFlushRows:", *f.envelopeFlushRows) fmt.Println("*kafkaCompression:", *f.kafkaCompression) @@ -333,6 +336,7 @@ func buildConfig(f *mainFlags, des *xtcp_config.EnabledDeserializers) *xtcp_conf CapturePath: *f.capturePath, Modulus: *f.modulus, MarshalTo: *f.marshal, + CsvColumns: *f.columns, EnvelopeFlushThresholdBytes: uint32(*f.envelopeFlushBytes), EnvelopeFlushThresholdRows: uint32(*f.envelopeFlushRows), KafkaCompression: *f.kafkaCompression, diff --git a/cmd/xtcp2/xtcp2_test.go b/cmd/xtcp2/xtcp2_test.go index 5ba8662..63520d0 100644 --- a/cmd/xtcp2/xtcp2_test.go +++ b/cmd/xtcp2/xtcp2_test.go @@ -614,6 +614,7 @@ func TestPrintFlags(t *testing.T) { f.capturePath = &s f.modulus = &n64 f.marshal = &s + f.columns = &s f.envelopeFlushBytes = &n f.envelopeFlushRows = &n f.kafkaCompression = &s @@ -685,6 +686,7 @@ func TestBuildConfig(t *testing.T) { cp := "/tmp/cap/" mod := uint64(13) mar := "protoText" + cols := "hostname,tcpInfoRtt" dst := "udp:127.0.0.1:13000" dwf := uint(4) topic := "topic1" @@ -709,6 +711,7 @@ func TestBuildConfig(t *testing.T) { nltimeout: &nl, pollFrequency: &pf, pollTimeout: &pt, maxLoops: &ml, netlinkers: &nlk, nlmsgSeq: &seq, packetSize: &psz, packetSizeMply: &psm, writeFiles: &wf, capturePath: &cp, modulus: &mod, marshal: &mar, + columns: &cols, envelopeFlushBytes: &wf, envelopeFlushRows: &wf, kafkaCompression: &mar, s3Endpoint: &mar, @@ -746,6 +749,7 @@ func TestBuildConfig(t *testing.T) { {"CapturePath", c.CapturePath, "/tmp/cap/"}, {"Modulus", c.Modulus, uint64(13)}, {"MarshalTo", c.MarshalTo, "protoText"}, + {"CsvColumns", c.CsvColumns, "hostname,tcpInfoRtt"}, {"Dest", c.Dest, "udp:127.0.0.1:13000"}, {"DestWriteFiles", c.DestWriteFiles, uint32(4)}, {"Topic", c.Topic, "topic1"}, diff --git a/dart/xtcp_config/v1/xtcp_config.pb.dart b/dart/xtcp_config/v1/xtcp_config.pb.dart index 96d416f..88d5fd0 100644 --- a/dart/xtcp_config/v1/xtcp_config.pb.dart +++ b/dart/xtcp_config/v1/xtcp_config.pb.dart @@ -374,6 +374,7 @@ class XtcpConfig extends $pb.GeneratedMessage { $core.bool? ioUring, $core.int? ioUringRecvBatchSize, $core.int? ioUringCqeBatchSize, + $core.String? csvColumns, }) { final $result = create(); if (nlTimeoutMilliseconds != null) { @@ -499,6 +500,9 @@ class XtcpConfig extends $pb.GeneratedMessage { if (ioUringCqeBatchSize != null) { $result.ioUringCqeBatchSize = ioUringCqeBatchSize; } + if (csvColumns != null) { + $result.csvColumns = csvColumns; + } return $result; } XtcpConfig._() : super(); @@ -547,6 +551,7 @@ class XtcpConfig extends $pb.GeneratedMessage { ..aOB(210, _omitFieldNames ? '' : 'ioUring') ..a<$core.int>(211, _omitFieldNames ? '' : 'ioUringRecvBatchSize', $pb.PbFieldType.OU3) ..a<$core.int>(212, _omitFieldNames ? '' : 'ioUringCqeBatchSize', $pb.PbFieldType.OU3) + ..aOS(220, _omitFieldNames ? '' : 'csvColumns') ..hasRequiredFields = false ; @@ -1077,6 +1082,19 @@ class XtcpConfig extends $pb.GeneratedMessage { $core.bool hasIoUringCqeBatchSize() => $_has(40); @$pb.TagNumber(212) void clearIoUringCqeBatchSize() => clearField(212); + + /// Comma-separated subset of XtcpFlatRecord json field names selecting + /// which columns the csv/tsv marshallers emit (e.g. + /// "hostname,inetDiagMsgSocketSourcePort,inetDiagMsgState,tcpInfoRtt"). + /// Empty = all fields. Ignored by non-tabular marshallers. + @$pb.TagNumber(220) + $core.String get csvColumns => $_getSZ(41); + @$pb.TagNumber(220) + set csvColumns($core.String v) { $_setString(41, v); } + @$pb.TagNumber(220) + $core.bool hasCsvColumns() => $_has(41); + @$pb.TagNumber(220) + void clearCsvColumns() => clearField(220); } class EnabledDeserializers extends $pb.GeneratedMessage { diff --git a/dart/xtcp_config/v1/xtcp_config.pbjson.dart b/dart/xtcp_config/v1/xtcp_config.pbjson.dart index 396bcc2..652e9f0 100644 --- a/dart/xtcp_config/v1/xtcp_config.pbjson.dart +++ b/dart/xtcp_config/v1/xtcp_config.pbjson.dart @@ -138,6 +138,7 @@ const XtcpConfig$json = { {'1': 'io_uring', '3': 210, '4': 1, '5': 8, '8': {}, '10': 'ioUring'}, {'1': 'io_uring_recv_batch_size', '3': 211, '4': 1, '5': 13, '8': {}, '10': 'ioUringRecvBatchSize'}, {'1': 'io_uring_cqe_batch_size', '3': 212, '4': 1, '5': 13, '8': {}, '10': 'ioUringCqeBatchSize'}, + {'1': 'csv_columns', '3': 220, '4': 1, '5': 9, '8': {}, '10': 'csvColumns'}, ], '7': {}, }; @@ -157,7 +158,7 @@ final $typed_data.Uint8List xtcpConfigDescriptor = $convert.base64Decode( 'dFNpemVNcGx5Ei4KC3dyaXRlX2ZpbGVzGFogASgNQg26SArIAQAqBRjoBygAUgp3cml0ZUZpbG' 'VzEi8KDGNhcHR1cmVfcGF0aBhkIAEoCUIMukgJyAEAcgQQARhQUgtjYXB0dXJlUGF0aBIoCgdt' 'b2R1bHVzGG4gASgEQg66SAvIAQEyBhjAhD0oAVIHbW9kdWx1cxIrCgptYXJzaGFsX3RvGHggAS' - 'gJQgy6SAnIAQFyBBAEGChSCW1hcnNoYWxUbxJLCh5lbnZlbG9wZV9mbHVzaF90aHJlc2hvbGRf' + 'gJQgy6SAnIAQFyBBADGChSCW1hcnNoYWxUbxJLCh5lbnZlbG9wZV9mbHVzaF90aHJlc2hvbGRf' 'Ynl0ZXMYeiABKA1CBrpIA8gBAFIbZW52ZWxvcGVGbHVzaFRocmVzaG9sZEJ5dGVzEkkKHWVudm' 'Vsb3BlX2ZsdXNoX3RocmVzaG9sZF9yb3dzGHsgASgNQga6SAPIAQBSGmVudmVsb3BlRmx1c2hU' 'aHJlc2hvbGRSb3dzEjMKEWthZmthX2NvbXByZXNzaW9uGHwgASgJQga6SAPIAQBSEGthZmthQ2' @@ -185,9 +186,10 @@ final $typed_data.Uint8List xtcpConfigDescriptor = $convert.base64Decode( 'emVycxIiCghpb191cmluZxjSASABKAhCBrpIA8gBAFIHaW9VcmluZxJGChhpb191cmluZ19yZW' 'N2X2JhdGNoX3NpemUY0wEgASgNQg26SArIAQAqBRiAICgBUhRpb1VyaW5nUmVjdkJhdGNoU2l6' 'ZRJEChdpb191cmluZ19jcWVfYmF0Y2hfc2l6ZRjUASABKA1CDbpICsgBACoFGIAgKAFSE2lvVX' - 'JpbmdDcWVCYXRjaFNpemU6c7pIcBpuCg9YdGNwQ29uZmlnLnBvbGwSMlBvbGwgdGltZW91dCBt' - 'dXN0IGJlIGxlc3MgdGhhbiBwb2xsIHBvbGxfZnJlcXVlbmN5Gid0aGlzLnBvbGxfZnJlcXVlbm' - 'N5ID4gdGhpcy5wb2xsX3RpbWVvdXQ='); + 'JpbmdDcWVCYXRjaFNpemUSKAoLY3N2X2NvbHVtbnMY3AEgASgJQga6SAPIAQBSCmNzdkNvbHVt' + 'bnM6c7pIcBpuCg9YdGNwQ29uZmlnLnBvbGwSMlBvbGwgdGltZW91dCBtdXN0IGJlIGxlc3MgdG' + 'hhbiBwb2xsIHBvbGxfZnJlcXVlbmN5Gid0aGlzLnBvbGxfZnJlcXVlbmN5ID4gdGhpcy5wb2xs' + 'X3RpbWVvdXQ='); @$core.Deprecated('Use enabledDeserializersDescriptor instead') const EnabledDeserializers$json = { diff --git a/gen/xtcp_config/v1/xtcp_config.pb.cc b/gen/xtcp_config/v1/xtcp_config.pb.cc index 4543937..93f690b 100644 --- a/gen/xtcp_config/v1/xtcp_config.pb.cc +++ b/gen/xtcp_config/v1/xtcp_config.pb.cc @@ -168,6 +168,9 @@ inline constexpr XtcpConfig::Impl_::Impl_( tag_( &::google::protobuf::internal::fixed_address_empty_string, ::_pbi::ConstantInitialized()), + csv_columns_( + &::google::protobuf::internal::fixed_address_empty_string, + ::_pbi::ConstantInitialized()), poll_frequency_{nullptr}, poll_timeout_{nullptr}, kafka_produce_timeout_{nullptr}, @@ -430,6 +433,7 @@ const ::uint32_t PROTOBUF_FIELD_OFFSET(::xtcp_config::v1::XtcpConfig, _impl_.io_uring_), PROTOBUF_FIELD_OFFSET(::xtcp_config::v1::XtcpConfig, _impl_.io_uring_recv_batch_size_), PROTOBUF_FIELD_OFFSET(::xtcp_config::v1::XtcpConfig, _impl_.io_uring_cqe_batch_size_), + PROTOBUF_FIELD_OFFSET(::xtcp_config::v1::XtcpConfig, _impl_.csv_columns_), ~0u, 0, 1, @@ -471,6 +475,7 @@ const ::uint32_t ~0u, ~0u, ~0u, + ~0u, PROTOBUF_FIELD_OFFSET(::xtcp_config::v1::EnabledDeserializers_EnabledEntry_DoNotUse, _impl_._has_bits_), PROTOBUF_FIELD_OFFSET(::xtcp_config::v1::EnabledDeserializers_EnabledEntry_DoNotUse, _internal_metadata_), ~0u, // no _extensions_ @@ -502,9 +507,9 @@ static const ::_pbi::MigrationSchema {28, 37, -1, sizeof(::xtcp_config::v1::SetResponse)}, {38, 48, -1, sizeof(::xtcp_config::v1::SetPollFrequencyRequest)}, {50, 59, -1, sizeof(::xtcp_config::v1::SetPollFrequencyResponse)}, - {60, 109, -1, sizeof(::xtcp_config::v1::XtcpConfig)}, - {150, 160, -1, sizeof(::xtcp_config::v1::EnabledDeserializers_EnabledEntry_DoNotUse)}, - {162, -1, -1, sizeof(::xtcp_config::v1::EnabledDeserializers)}, + {60, 110, -1, sizeof(::xtcp_config::v1::XtcpConfig)}, + {152, 162, -1, sizeof(::xtcp_config::v1::EnabledDeserializers_EnabledEntry_DoNotUse)}, + {164, -1, -1, sizeof(::xtcp_config::v1::EnabledDeserializers)}, }; static const ::_pb::Message* const file_default_instances[] = { &::xtcp_config::v1::_GetRequest_default_instance_._instance, @@ -537,7 +542,7 @@ const char descriptor_table_protodef_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2eproto[ " than poll poll_frequency\032\'this.poll_tim" "eout < this.poll_frequency\"N\n\030SetPollFre" "quencyResponse\0222\n\006config\030\001 \001(\0132\032.xtcp_co" - "nfig.v1.XtcpConfigR\006config\"\350\022\n\nXtcpConfi" + "nfig.v1.XtcpConfigR\006config\"\222\023\n\nXtcpConfi" "g\022F\n\027nl_timeout_milliseconds\030\n \001(\004B\016\272H\0132" "\006\030\240\215\006(\000\310\001\001R\025nlTimeoutMilliseconds\022S\n\016pol" "l_frequency\030\024 \001(\0132\031.google.protobuf.Dura" @@ -556,7 +561,7 @@ const char descriptor_table_protodef_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2eproto[ "s\022/\n\014capture_path\030d \001(\tB\014\272H\tr\004\020\001\030P\310\001\000R\013c" "apturePath\022(\n\007modulus\030n \001(\004B\016\272H\0132\006\030\300\204=(\001" "\310\001\001R\007modulus\022+\n\nmarshal_to\030x \001(\tB\014\272H\tr\004\020" - "\004\030(\310\001\001R\tmarshalTo\022K\n\036envelope_flush_thre" + "\003\030(\310\001\001R\tmarshalTo\022K\n\036envelope_flush_thre" "shold_bytes\030z \001(\rB\006\272H\003\310\001\000R\033envelopeFlush" "ThresholdBytes\022I\n\035envelope_flush_thresho" "ld_rows\030{ \001(\rB\006\272H\003\310\001\000R\032envelopeFlushThre" @@ -595,26 +600,27 @@ const char descriptor_table_protodef_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2eproto[ "h_size\030\323\001 \001(\rB\r\272H\n*\005\030\200 (\001\310\001\000R\024ioUringRec" "vBatchSize\022D\n\027io_uring_cqe_batch_size\030\324\001" " \001(\rB\r\272H\n*\005\030\200 (\001\310\001\000R\023ioUringCqeBatchSize" - ":s\272Hp\032n\n\017XtcpConfig.poll\0222Poll timeout m" - "ust be less than poll poll_frequency\032\'th" - "is.poll_frequency > this.poll_timeout\"\237\001" - "\n\024EnabledDeserializers\022K\n\007enabled\030\001 \003(\0132" - "1.xtcp_config.v1.EnabledDeserializers.En" - "abledEntryR\007enabled\032:\n\014EnabledEntry\022\020\n\003k" - "ey\030\001 \001(\tR\003key\022\024\n\005value\030\002 \001(\010R\005value:\0028\0012" - "\341\002\n\rConfigService\022]\n\003Get\022\032.xtcp_config.v" - "1.GetRequest\032\033.xtcp_config.v1.GetRespons" - "e\"\035\202\323\344\223\002\027\032\022/ConfigService/Get:\001*\022]\n\003Set\022" - "\032.xtcp_config.v1.SetRequest\032\033.xtcp_confi" - "g.v1.SetResponse\"\035\202\323\344\223\002\027\032\022/ConfigService" - "/Set:\001*\022\221\001\n\020SetPollFrequency\022\'.xtcp_conf" - "ig.v1.SetPollFrequencyRequest\032(.xtcp_con" - "fig.v1.SetPollFrequencyResponse\"*\202\323\344\223\002$\032" - "\037/ConfigService/SetPollFrequency:\001*B\215\001\n\022" - "com.xtcp_config.v1B\017XtcpConfigProtoP\001Z\021." - "/pkg/xtcp_config\242\002\003XXX\252\002\rXtcpConfig.V1\312\002" - "\rXtcpConfig\\V1\342\002\031XtcpConfig\\V1\\GPBMetada" - "ta\352\002\016XtcpConfig::V1b\006proto3" + "\022(\n\013csv_columns\030\334\001 \001(\tB\006\272H\003\310\001\000R\ncsvColum" + "ns:s\272Hp\032n\n\017XtcpConfig.poll\0222Poll timeout" + " must be less than poll poll_frequency\032\'" + "this.poll_frequency > this.poll_timeout\"" + "\237\001\n\024EnabledDeserializers\022K\n\007enabled\030\001 \003(" + "\01321.xtcp_config.v1.EnabledDeserializers." + "EnabledEntryR\007enabled\032:\n\014EnabledEntry\022\020\n" + "\003key\030\001 \001(\tR\003key\022\024\n\005value\030\002 \001(\010R\005value:\0028" + "\0012\341\002\n\rConfigService\022]\n\003Get\022\032.xtcp_config" + ".v1.GetRequest\032\033.xtcp_config.v1.GetRespo" + "nse\"\035\202\323\344\223\002\027\032\022/ConfigService/Get:\001*\022]\n\003Se" + "t\022\032.xtcp_config.v1.SetRequest\032\033.xtcp_con" + "fig.v1.SetResponse\"\035\202\323\344\223\002\027\032\022/ConfigServi" + "ce/Set:\001*\022\221\001\n\020SetPollFrequency\022\'.xtcp_co" + "nfig.v1.SetPollFrequencyRequest\032(.xtcp_c" + "onfig.v1.SetPollFrequencyResponse\"*\202\323\344\223\002" + "$\032\037/ConfigService/SetPollFrequency:\001*B\215\001" + "\n\022com.xtcp_config.v1B\017XtcpConfigProtoP\001Z" + "\021./pkg/xtcp_config\242\002\003XXX\252\002\rXtcpConfig.V1" + "\312\002\rXtcpConfig\\V1\342\002\031XtcpConfig\\V1\\GPBMeta" + "data\352\002\016XtcpConfig::V1b\006proto3" }; static const ::_pbi::DescriptorTable* const descriptor_table_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2eproto_deps[3] = { @@ -626,7 +632,7 @@ static ::absl::once_flag descriptor_table_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2ep PROTOBUF_CONSTINIT const ::_pbi::DescriptorTable descriptor_table_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2eproto = { false, false, - 3827, + 3869, descriptor_table_protodef_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2eproto, "xtcp_config/v1/xtcp_config.proto", &descriptor_table_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2eproto_once, @@ -2112,7 +2118,8 @@ inline PROTOBUF_NDEBUG_INLINE XtcpConfig::Impl_::Impl_( xtcp_proto_file_(arena, from.xtcp_proto_file_), kafka_schema_url_(arena, from.kafka_schema_url_), label_(arena, from.label_), - tag_(arena, from.tag_) {} + tag_(arena, from.tag_), + csv_columns_(arena, from.csv_columns_) {} XtcpConfig::XtcpConfig( ::google::protobuf::Arena* arena, @@ -2170,7 +2177,8 @@ inline PROTOBUF_NDEBUG_INLINE XtcpConfig::Impl_::Impl_( xtcp_proto_file_(arena), kafka_schema_url_(arena), label_(arena), - tag_(arena) {} + tag_(arena), + csv_columns_(arena) {} inline void XtcpConfig::SharedCtor(::_pb::Arena* arena) { new (&_impl_) Impl_(internal_visibility(), arena); @@ -2206,6 +2214,7 @@ inline void XtcpConfig::SharedDtor(MessageLite& self) { this_._impl_.kafka_schema_url_.Destroy(); this_._impl_.label_.Destroy(); this_._impl_.tag_.Destroy(); + this_._impl_.csv_columns_.Destroy(); delete this_._impl_.poll_frequency_; delete this_._impl_.poll_timeout_; delete this_._impl_.kafka_produce_timeout_; @@ -2249,15 +2258,15 @@ const ::google::protobuf::internal::ClassData* XtcpConfig::GetClassData() const return _class_data_.base(); } PROTOBUF_CONSTINIT PROTOBUF_ATTRIBUTE_INIT_PRIORITY1 -const ::_pbi::TcParseTable<5, 41, 4, 256, 27> XtcpConfig::_table_ = { +const ::_pbi::TcParseTable<5, 42, 4, 267, 29> XtcpConfig::_table_ = { { PROTOBUF_FIELD_OFFSET(XtcpConfig, _impl_._has_bits_), 0, // no _extensions_ - 212, 248, // max_field_number, fast_idx_mask + 220, 248, // max_field_number, fast_idx_mask offsetof(decltype(_table_), field_lookup_table), 3757571583, // skipmap offsetof(decltype(_table_), field_entries), - 41, // num_field_entries + 42, // num_field_entries 4, // num_aux_entries offsetof(decltype(_table_), aux_entries), _class_data_.base(), @@ -2334,9 +2343,9 @@ const ::_pbi::TcParseTable<5, 41, 4, 256, 27> XtcpConfig::_table_ = { {::_pbi::TcParser::FastUS2, {2042, 63, 0, PROTOBUF_FIELD_OFFSET(XtcpConfig, _impl_.s3_prefix_)}}, }}, {{ - 40, 0, 11, + 40, 0, 12, 62462, 3, 49135, 6, 65279, 8, 61435, 9, 65471, 11, 18434, 12, - 48480, 25, 65279, 33, 61435, 34, 65471, 36, 58366, 37, + 48480, 25, 65279, 33, 61435, 34, 65471, 36, 58366, 37, 65519, 41, 65535, 65535 }}, {{ // uint64 nl_timeout_milliseconds = 10 [json_name = "nlTimeoutMilliseconds", (.buf.validate.field) = { @@ -2462,13 +2471,16 @@ const ::_pbi::TcParseTable<5, 41, 4, 256, 27> XtcpConfig::_table_ = { // uint32 io_uring_cqe_batch_size = 212 [json_name = "ioUringCqeBatchSize", (.buf.validate.field) = { {PROTOBUF_FIELD_OFFSET(XtcpConfig, _impl_.io_uring_cqe_batch_size_), -1, 0, (0 | ::_fl::kFcSingular | ::_fl::kUInt32)}, + // string csv_columns = 220 [json_name = "csvColumns", (.buf.validate.field) = { + {PROTOBUF_FIELD_OFFSET(XtcpConfig, _impl_.csv_columns_), -1, 0, + (0 | ::_fl::kFcSingular | ::_fl::kUtf8String | ::_fl::kRepAString)}, }}, {{ {::_pbi::TcParser::GetTable<::google::protobuf::Duration>()}, {::_pbi::TcParser::GetTable<::google::protobuf::Duration>()}, {::_pbi::TcParser::GetTable<::google::protobuf::Duration>()}, {::_pbi::TcParser::GetTable<::xtcp_config::v1::EnabledDeserializers>()}, }}, {{ - "\31\0\0\0\0\0\0\0\0\0\0\14\0\12\0\0\21\13\11\11\15\15\4\0\11\0\15\22\0\0\5\17\20\0\0\5\3\0\0\0\0\0\0\0\0\0\0\0" + "\31\0\0\0\0\0\0\0\0\0\0\14\0\12\0\0\21\13\11\11\15\15\4\0\11\0\15\22\0\0\5\17\20\0\0\5\3\0\0\0\0\0\13\0\0\0\0\0" "xtcp_config.v1.XtcpConfig" "capture_path" "marshal_to" @@ -2487,6 +2499,7 @@ const ::_pbi::TcParseTable<5, 41, 4, 256, 27> XtcpConfig::_table_ = { "kafka_schema_url" "label" "tag" + "csv_columns" }}, }; @@ -2514,6 +2527,7 @@ PROTOBUF_NOINLINE void XtcpConfig::Clear() { _impl_.kafka_schema_url_.ClearToEmpty(); _impl_.label_.ClearToEmpty(); _impl_.tag_.ClearToEmpty(); + _impl_.csv_columns_.ClearToEmpty(); cached_has_bits = _impl_._has_bits_[0]; if (cached_has_bits & 0x0000000fu) { if (cached_has_bits & 0x00000001u) { @@ -2860,6 +2874,14 @@ PROTOBUF_NOINLINE void XtcpConfig::Clear() { 212, this_._internal_io_uring_cqe_batch_size(), target); } + // string csv_columns = 220 [json_name = "csvColumns", (.buf.validate.field) = { + if (!this_._internal_csv_columns().empty()) { + const std::string& _s = this_._internal_csv_columns(); + ::google::protobuf::internal::WireFormatLite::VerifyUtf8String( + _s.data(), static_cast(_s.length()), ::google::protobuf::internal::WireFormatLite::SERIALIZE, "xtcp_config.v1.XtcpConfig.csv_columns"); + target = stream->WriteStringMaybeAliased(220, _s, target); + } + if (PROTOBUF_PREDICT_FALSE(this_._internal_metadata_.have_unknown_fields())) { target = ::_pbi::WireFormat::InternalSerializeUnknownFieldsToArray( @@ -2970,6 +2992,11 @@ PROTOBUF_NOINLINE void XtcpConfig::Clear() { total_size += 2 + ::google::protobuf::internal::WireFormatLite::StringSize( this_._internal_tag()); } + // string csv_columns = 220 [json_name = "csvColumns", (.buf.validate.field) = { + if (!this_._internal_csv_columns().empty()) { + total_size += 2 + ::google::protobuf::internal::WireFormatLite::StringSize( + this_._internal_csv_columns()); + } } cached_has_bits = this_._impl_._has_bits_[0]; if (cached_has_bits & 0x0000000fu) { @@ -3159,6 +3186,9 @@ void XtcpConfig::MergeImpl(::google::protobuf::MessageLite& to_msg, const ::goog if (!from._internal_tag().empty()) { _this->_internal_set_tag(from._internal_tag()); } + if (!from._internal_csv_columns().empty()) { + _this->_internal_set_csv_columns(from._internal_csv_columns()); + } cached_has_bits = from._impl_._has_bits_[0]; if (cached_has_bits & 0x0000000fu) { if (cached_has_bits & 0x00000001u) { @@ -3293,6 +3323,7 @@ void XtcpConfig::InternalSwap(XtcpConfig* PROTOBUF_RESTRICT other) { ::_pbi::ArenaStringPtr::InternalSwap(&_impl_.kafka_schema_url_, &other->_impl_.kafka_schema_url_, arena); ::_pbi::ArenaStringPtr::InternalSwap(&_impl_.label_, &other->_impl_.label_, arena); ::_pbi::ArenaStringPtr::InternalSwap(&_impl_.tag_, &other->_impl_.tag_, arena); + ::_pbi::ArenaStringPtr::InternalSwap(&_impl_.csv_columns_, &other->_impl_.csv_columns_, arena); ::google::protobuf::internal::memswap< PROTOBUF_FIELD_OFFSET(XtcpConfig, _impl_.io_uring_cqe_batch_size_) + sizeof(XtcpConfig::_impl_.io_uring_cqe_batch_size_) diff --git a/gen/xtcp_config/v1/xtcp_config.pb.h b/gen/xtcp_config/v1/xtcp_config.pb.h index c618252..37a77f1 100644 --- a/gen/xtcp_config/v1/xtcp_config.pb.h +++ b/gen/xtcp_config/v1/xtcp_config.pb.h @@ -860,6 +860,7 @@ class XtcpConfig final : public ::google::protobuf::Message kKafkaSchemaUrlFieldNumber = 145, kLabelFieldNumber = 170, kTagFieldNumber = 180, + kCsvColumnsFieldNumber = 220, kPollFrequencyFieldNumber = 20, kPollTimeoutFieldNumber = 30, kKafkaProduceTimeoutFieldNumber = 150, @@ -1156,6 +1157,22 @@ class XtcpConfig final : public ::google::protobuf::Message const std::string& value); std::string* _internal_mutable_tag(); + public: + // string csv_columns = 220 [json_name = "csvColumns", (.buf.validate.field) = { + void clear_csv_columns() ; + const std::string& csv_columns() const; + template + void set_csv_columns(Arg_&& arg, Args_... args); + std::string* mutable_csv_columns(); + PROTOBUF_NODISCARD std::string* release_csv_columns(); + void set_allocated_csv_columns(std::string* value); + + private: + const std::string& _internal_csv_columns() const; + inline PROTOBUF_ALWAYS_INLINE void _internal_set_csv_columns( + const std::string& value); + std::string* _internal_mutable_csv_columns(); + public: // .google.protobuf.Duration poll_frequency = 20 [json_name = "pollFrequency", (.buf.validate.field) = { bool has_poll_frequency() const; @@ -1422,8 +1439,8 @@ class XtcpConfig final : public ::google::protobuf::Message class _Internal; friend class ::google::protobuf::internal::TcParser; static const ::google::protobuf::internal::TcParseTable< - 5, 41, 4, - 256, 27> + 5, 42, 4, + 267, 29> _table_; friend class ::google::protobuf::MessageLite; @@ -1459,6 +1476,7 @@ class XtcpConfig final : public ::google::protobuf::Message ::google::protobuf::internal::ArenaStringPtr kafka_schema_url_; ::google::protobuf::internal::ArenaStringPtr label_; ::google::protobuf::internal::ArenaStringPtr tag_; + ::google::protobuf::internal::ArenaStringPtr csv_columns_; ::google::protobuf::Duration* poll_frequency_; ::google::protobuf::Duration* poll_timeout_; ::google::protobuf::Duration* kafka_produce_timeout_; @@ -4504,6 +4522,54 @@ inline void XtcpConfig::_internal_set_io_uring_cqe_batch_size(::uint32_t value) _impl_.io_uring_cqe_batch_size_ = value; } +// string csv_columns = 220 [json_name = "csvColumns", (.buf.validate.field) = { +inline void XtcpConfig::clear_csv_columns() { + ::google::protobuf::internal::TSanWrite(&_impl_); + _impl_.csv_columns_.ClearToEmpty(); +} +inline const std::string& XtcpConfig::csv_columns() const + ABSL_ATTRIBUTE_LIFETIME_BOUND { + // @@protoc_insertion_point(field_get:xtcp_config.v1.XtcpConfig.csv_columns) + return _internal_csv_columns(); +} +template +inline PROTOBUF_ALWAYS_INLINE void XtcpConfig::set_csv_columns(Arg_&& arg, + Args_... args) { + ::google::protobuf::internal::TSanWrite(&_impl_); + _impl_.csv_columns_.Set(static_cast(arg), args..., GetArena()); + // @@protoc_insertion_point(field_set:xtcp_config.v1.XtcpConfig.csv_columns) +} +inline std::string* XtcpConfig::mutable_csv_columns() ABSL_ATTRIBUTE_LIFETIME_BOUND { + std::string* _s = _internal_mutable_csv_columns(); + // @@protoc_insertion_point(field_mutable:xtcp_config.v1.XtcpConfig.csv_columns) + return _s; +} +inline const std::string& XtcpConfig::_internal_csv_columns() const { + ::google::protobuf::internal::TSanRead(&_impl_); + return _impl_.csv_columns_.Get(); +} +inline void XtcpConfig::_internal_set_csv_columns(const std::string& value) { + ::google::protobuf::internal::TSanWrite(&_impl_); + _impl_.csv_columns_.Set(value, GetArena()); +} +inline std::string* XtcpConfig::_internal_mutable_csv_columns() { + ::google::protobuf::internal::TSanWrite(&_impl_); + return _impl_.csv_columns_.Mutable( GetArena()); +} +inline std::string* XtcpConfig::release_csv_columns() { + ::google::protobuf::internal::TSanWrite(&_impl_); + // @@protoc_insertion_point(field_release:xtcp_config.v1.XtcpConfig.csv_columns) + return _impl_.csv_columns_.Release(); +} +inline void XtcpConfig::set_allocated_csv_columns(std::string* value) { + ::google::protobuf::internal::TSanWrite(&_impl_); + _impl_.csv_columns_.SetAllocated(value, GetArena()); + if (::google::protobuf::internal::DebugHardenForceCopyDefaultString() && _impl_.csv_columns_.IsDefault()) { + _impl_.csv_columns_.Set("", GetArena()); + } + // @@protoc_insertion_point(field_set_allocated:xtcp_config.v1.XtcpConfig.csv_columns) +} + // ------------------------------------------------------------------- // ------------------------------------------------------------------- diff --git a/gen/xtcp_config/v1/xtcp_config.pb.validate.cc b/gen/xtcp_config/v1/xtcp_config.pb.validate.cc index cfa3bfe..3d0b28b 100644 --- a/gen/xtcp_config/v1/xtcp_config.pb.validate.cc +++ b/gen/xtcp_config/v1/xtcp_config.pb.validate.cc @@ -1021,6 +1021,23 @@ return false; + + + + + + + + + + + + + + + + + @@ -1122,7 +1139,7 @@ return false; } } -// no validation rules for io_uring// no validation rules for io_uring_recv_batch_size// no validation rules for io_uring_cqe_batch_size +// no validation rules for io_uring// no validation rules for io_uring_recv_batch_size// no validation rules for io_uring_cqe_batch_size// no validation rules for csv_columns return true; } diff --git a/pkg/xtcp/destinations_core.go b/pkg/xtcp/destinations_core.go index 5c4cc46..54b449e 100644 --- a/pkg/xtcp/destinations_core.go +++ b/pkg/xtcp/destinations_core.go @@ -31,6 +31,11 @@ type DestinationFactory func(ctx context.Context, x *XTCP) (Destination, error) const ( schemeNull = "null" schemeStdout = "stdout" + schemeStderr = "stderr" + schemeFile = "file" + schemeTCP = "tcp" + schemeHTTP = "http" + schemeHTTPS = "https" schemeUDP = "udp" schemeUnix = "unix" schemeUnixgram = "unixgram" @@ -51,7 +56,9 @@ const ( // distinguish "unknown scheme" from "exists but not compiled into this // binary" so the operator gets the right hint. var knownSchemes = []string{ - schemeNull, schemeStdout, schemeUDP, schemeUnix, schemeUnixgram, + schemeNull, schemeStdout, schemeStderr, schemeFile, + schemeTCP, schemeHTTP, schemeHTTPS, + schemeUDP, schemeUnix, schemeUnixgram, schemeKafka, schemeNats, schemeNsq, schemeValkey, schemeS3Parquet, } diff --git a/pkg/xtcp/destinations_file.go b/pkg/xtcp/destinations_file.go new file mode 100644 index 0000000..5c2bbc0 --- /dev/null +++ b/pkg/xtcp/destinations_file.go @@ -0,0 +1,38 @@ +package xtcp + +import ( + "context" + "fmt" + "os" + "strings" +) + +// stderr and file destinations: thin factories over the reusable writerDest. +// Both are stdlib (no build tag). Pair with a line/tabular marshaller +// (jsonl/csv/tsv/protoJson) — the marshaller frames each flush. + +// newStderrDest writes records to the process's stderr. Handy when stdout is +// reserved for something else, or for `2>records.log`. +func newStderrDest(_ context.Context, x *XTCP) (Destination, error) { + return &writerDest{x: x, w: os.Stderr, label: "destStderr"}, nil +} + +// newFileDest appends records to a file: `-dest file:/var/log/xtcp.jsonl`. +// The file is created if missing and opened for append; the *os.File is +// closed via writerDest.Close. +func newFileDest(_ context.Context, x *XTCP) (Destination, error) { + path := strings.TrimPrefix(x.config.Dest, schemeFile+":") + if path == "" { + return nil, fmt.Errorf("InitDestFile: empty path (use -dest file:/path/to/file)") + } + f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o600) + if err != nil { + return nil, fmt.Errorf("InitDestFile OpenFile(%q): %w", path, err) + } + return &writerDest{x: x, w: f, label: "destFile", closer: f}, nil +} + +func init() { + RegisterDestination(schemeStderr, newStderrDest) + RegisterDestination(schemeFile, newFileDest) +} diff --git a/pkg/xtcp/destinations_http.go b/pkg/xtcp/destinations_http.go new file mode 100644 index 0000000..5c37c5e --- /dev/null +++ b/pkg/xtcp/destinations_http.go @@ -0,0 +1,111 @@ +package xtcp + +import ( + "bytes" + "context" + "fmt" + "io" + "log" + "net/http" + "time" +) + +// httpDest POSTs each flushed batch to an HTTP(S) endpoint: +// `-dest http://host:port/path` or `-dest https://...`. Works with generic +// ingest endpoints and log/metric shippers (Vector, Loki push, Elasticsearch +// bulk, Splunk HEC, …). The Content-Type is derived from the marshaller so a +// receiver can route by type. Framing is the marshaller's responsibility. +// +// One POST per flush (per poll cycle / size-cap). Non-2xx responses are +// errors. A keep-alive client is reused across sends. +type httpDest struct { + x *XTCP + url string + contentType string + client *http.Client + timeout time.Duration +} + +const httpDestDefaultTimeout = 10 * time.Second + +// contentTypeForMarshaller maps a marshaller name to the MIME type a receiver +// would expect, so HTTP consumers can route/parse by Content-Type. +func contentTypeForMarshaller(marshalTo string) string { + switch marshalTo { + case MarshallerJSONL: + return "application/x-ndjson" + case MarshallerProtoJSON: + return "application/json" + case MarshallerCSV: + return "text/csv" + case MarshallerTSV: + return "text/tab-separated-values" + case MarshallerProtoText: + return "text/plain; charset=utf-8" + default: // protobufList, msgpack + return "application/octet-stream" + } +} + +func newHTTPDest(_ context.Context, x *XTCP) (Destination, error) { + // x.config.Dest is the full URL (scheme included), e.g. + // "http://127.0.0.1:8080/ingest" — used verbatim. + url := x.config.Dest + timeout := x.config.GetKafkaProduceTimeout().AsDuration() + if timeout <= 0 { + timeout = httpDestDefaultTimeout + } + return &httpDest{ + x: x, + url: url, + contentType: contentTypeForMarshaller(x.config.MarshalTo), + client: &http.Client{Timeout: timeout}, + timeout: timeout, + }, nil +} + +func (d *httpDest) Send(ctx context.Context, b *[]byte) (int, error) { + reqCtx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + + req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, d.url, bytes.NewReader(*b)) + if err != nil { + d.x.pC.WithLabelValues("destHTTP", "newRequest", "error").Inc() + return 0, fmt.Errorf("destHTTP new request: %w", err) + } + req.Header.Set("Content-Type", d.contentType) + + resp, err := d.client.Do(req) + if err != nil { + d.x.pC.WithLabelValues("destHTTP", "do", "error").Inc() + if d.x.debugLevel > 100 { + log.Printf("destHTTP POST %q err:%v", d.url, err) + } + return 0, fmt.Errorf("destHTTP POST %q: %w", d.url, err) + } + // Drain and close so the keep-alive connection can be reused. + if _, derr := io.Copy(io.Discard, resp.Body); derr != nil && d.x.debugLevel > 100 { + log.Printf("destHTTP drain body err:%v", derr) + } + if cerr := resp.Body.Close(); cerr != nil && d.x.debugLevel > 100 { + log.Printf("destHTTP body close err:%v", cerr) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + d.x.pC.WithLabelValues("destHTTP", "status", "error").Inc() + return 0, fmt.Errorf("destHTTP POST %q: status %d", d.url, resp.StatusCode) + } + d.x.pC.WithLabelValues("destHTTP", "posts", "count").Inc() + d.x.pC.WithLabelValues("destHTTP", "postBytes", "count").Add(float64(len(*b))) + return 1, nil +} + +func (d *httpDest) Close() error { + d.client.CloseIdleConnections() + return nil +} + +func init() { + RegisterDestination(schemeHTTP, newHTTPDest) + RegisterDestination(schemeHTTPS, newHTTPDest) +} diff --git a/pkg/xtcp/destinations_stdout.go b/pkg/xtcp/destinations_stdout.go index fb49d92..e25287a 100644 --- a/pkg/xtcp/destinations_stdout.go +++ b/pkg/xtcp/destinations_stdout.go @@ -7,48 +7,50 @@ import ( "os" ) -// writerDest sends each marshaled record to an arbitrary io.Writer, -// newline-terminated (one frame per Send). It is the reusable core behind -// any stream sink: the "stdout" scheme wires it to os.Stdout, and a future -// stderr/file sink is a one-line factory over the same type rather than a -// copy of the Send/Close boilerplate. +// writerDest sends each marshaled payload to an arbitrary io.Writer, +// verbatim. It is the reusable core behind any stream sink: the "stdout" +// scheme wires it to os.Stdout, "stderr" to os.Stderr, and "file" to an +// *os.File — each a one-line factory over this type rather than a copy of +// the Send/Close boilerplate. // -// The io.Writer seam is also the test seam — unit tests inject a -// *bytes.Buffer and assert on the framing without touching the real -// os.Stdout (see destinations_stdout_test.go). +// Framing is the marshaller's responsibility, not the destination's: the +// text/line marshallers (protoJson envelope, jsonl, csv, tsv) terminate their +// output with a trailing newline, while the binary marshallers (protobufList, +// msgpack) do not. Keeping writerDest a verbatim byte writer means every +// format frames correctly on every sink — including the raw tcp/http sinks +// that must not have stray bytes injected into a length-delimited stream. // -// Pair with `-marshal protoJson` (or protoText) to stream records as NDJSON -// for local development, debugging, or piping to jq. The daemon's logs go to -// stderr, so stdout carries only records. +// The io.Writer seam is also the test seam — unit tests inject a +// *bytes.Buffer and assert on the bytes without touching the real os.Stdout. // // Send is invoked serially (see the Destination contract), so the writer is // used without an internal mutex. type writerDest struct { - x *XTCP - w io.Writer - label string // metric label, e.g. "destStdout" + x *XTCP + w io.Writer + label string // metric label, e.g. "destStdout" + closer io.Closer // optional; closed by Close. nil for os.Stdout/os.Stderr. } -// streamFrameSep terminates each record written by writerDest. Kept as a -// package-level slice so Send never appends to (and thus never reallocates -// or corrupts) the caller's pooled payload buffer. -var streamFrameSep = []byte{'\n'} - func (d *writerDest) Send(_ context.Context, b *[]byte) (int, error) { d.x.pC.WithLabelValues(d.label, "start", "count").Inc() n, err := d.w.Write(*b) if err != nil { return n, fmt.Errorf("%s write: %w", d.label, err) } - if _, err := d.w.Write(streamFrameSep); err != nil { - return n, fmt.Errorf("%s newline: %w", d.label, err) - } return n, nil } -func (d *writerDest) Close() error { return nil } +func (d *writerDest) Close() error { + if d.closer != nil { + return d.closer.Close() + } + return nil +} -// newStdoutDest wires writerDest to os.Stdout. +// newStdoutDest wires writerDest to os.Stdout. Pair with a line-oriented +// marshaller (`-marshal jsonl|csv|tsv|protoJson`) for human/jq-able output; +// the daemon's logs go to stderr, so stdout carries only records. func newStdoutDest(_ context.Context, x *XTCP) (Destination, error) { return &writerDest{x: x, w: os.Stdout, label: "destStdout"}, nil } diff --git a/pkg/xtcp/destinations_stdout_test.go b/pkg/xtcp/destinations_stdout_test.go index 707bbd5..05f5448 100644 --- a/pkg/xtcp/destinations_stdout_test.go +++ b/pkg/xtcp/destinations_stdout_test.go @@ -8,16 +8,18 @@ import ( "testing" ) -// TestWriterDestFraming verifies the reusable writerDest writes each payload -// followed by a single newline, returns the payload byte count, and can be -// driven entirely through an injected io.Writer — no os.Stdout needed. -func TestWriterDestFraming(t *testing.T) { +// TestWriterDestVerbatim verifies the reusable writerDest writes each payload +// verbatim (framing is the marshaller's job, not the destination's), returns +// the payload byte count, and can be driven entirely through an injected +// io.Writer — no os.Stdout needed. +func TestWriterDestVerbatim(t *testing.T) { x := newTestXTCP(t, schemeStdout) var buf bytes.Buffer d := &writerDest{x: x, w: &buf, label: "destStdout"} ctx := context.Background() - payloads := [][]byte{[]byte(`{"a":1}`), []byte(`{"b":2}`)} + // Marshallers own the newline, so payloads arrive already framed. + payloads := [][]byte{[]byte("{\"a\":1}\n"), []byte("{\"b\":2}\n")} for _, p := range payloads { b := append([]byte(nil), p...) // copy: Send must not mutate the caller's buffer n, err := d.Send(ctx, &b) @@ -43,37 +45,17 @@ func TestWriterDestFraming(t *testing.T) { var errBoom = errors.New("boom") -// failingWriter fails on the Nth Write (1-indexed); earlier writes succeed -// into buf. Used to exercise both error branches of writerDest.Send. -type failingWriter struct { - failOn int - calls int - buf bytes.Buffer -} +// errWriter always fails, exercising writerDest.Send's error branch. +type errWriter struct{} -func (w *failingWriter) Write(p []byte) (int, error) { - w.calls++ - if w.calls == w.failOn { - return 0, errBoom - } - return w.buf.Write(p) -} - -func TestWriterDestPayloadWriteError(t *testing.T) { - x := newTestXTCP(t, schemeStdout) - d := &writerDest{x: x, w: &failingWriter{failOn: 1}, label: "destStdout"} - b := []byte("x") - if _, err := d.Send(context.Background(), &b); err == nil { - t.Fatal("expected error when the payload write fails") - } -} +func (errWriter) Write([]byte) (int, error) { return 0, errBoom } -func TestWriterDestNewlineWriteError(t *testing.T) { +func TestWriterDestWriteError(t *testing.T) { x := newTestXTCP(t, schemeStdout) - d := &writerDest{x: x, w: &failingWriter{failOn: 2}, label: "destStdout"} + d := &writerDest{x: x, w: errWriter{}, label: "destStdout"} b := []byte("x") if _, err := d.Send(context.Background(), &b); err == nil { - t.Fatal("expected error when the newline write fails") + t.Fatal("expected error when the write fails") } } diff --git a/pkg/xtcp/destinations_stream_test.go b/pkg/xtcp/destinations_stream_test.go new file mode 100644 index 0000000..bc2d7e5 --- /dev/null +++ b/pkg/xtcp/destinations_stream_test.go @@ -0,0 +1,184 @@ +package xtcp + +import ( + "bufio" + "context" + "io" + "net" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "sync" + "testing" +) + +// TestStderrDestFactory: the stderr scheme is registered and writes to stderr. +func TestStderrDestFactory(t *testing.T) { + if _, status := lookupDestinationFactory(schemeStderr); status != destLookupFound { + t.Fatalf("stderr not registered: %v", status) + } + x := newTestXTCP(t, schemeStderr) + dest, err := newStderrDest(context.Background(), x) + if err != nil { + t.Fatalf("newStderrDest: %v", err) + } + wd, ok := dest.(*writerDest) + if !ok || wd.w != os.Stderr { + t.Fatalf("stderr dest not wired to os.Stderr: %T", dest) + } + if err := dest.Close(); err != nil { + t.Errorf("Close: %v", err) + } +} + +// TestFileDest writes payloads and reads them back from the file. +func TestFileDest(t *testing.T) { + path := filepath.Join(t.TempDir(), "out.jsonl") + x := newTestXTCP(t, "file:"+path) + dest, err := newFileDest(context.Background(), x) + if err != nil { + t.Fatalf("newFileDest: %v", err) + } + payload := []byte("hello\n") + if _, err := dest.Send(context.Background(), &payload); err != nil { + t.Fatalf("Send: %v", err) + } + if err := dest.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + got, err := os.ReadFile(path) + if err != nil { + t.Fatal(err) + } + if string(got) != "hello\n" { + t.Errorf("file content = %q, want %q", got, "hello\n") + } +} + +func TestFileDest_emptyPath(t *testing.T) { + x := newTestXTCP(t, "file:") + if _, err := newFileDest(context.Background(), x); err == nil { + t.Fatal("expected error for empty file path") + } +} + +// TestTCPDest dials a listener, sends, and reads the bytes back verbatim. +func TestTCPDest(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + got := make(chan string, 1) + go func() { + conn, aerr := ln.Accept() + if aerr != nil { + got <- "" + return + } + defer conn.Close() + line, _ := bufio.NewReader(conn).ReadString('\n') + got <- line + }() + + x := newTestXTCP(t, "tcp:"+ln.Addr().String()) + dest, err := newTCPDest(context.Background(), x) + if err != nil { + t.Fatalf("newTCPDest: %v", err) + } + payload := []byte("tcp-line\n") + if _, err := dest.Send(context.Background(), &payload); err != nil { + t.Fatalf("Send: %v", err) + } + if line := <-got; line != "tcp-line\n" { + t.Errorf("received %q, want %q", line, "tcp-line\n") + } + if err := dest.Close(); err != nil { + t.Errorf("Close: %v", err) + } +} + +func TestTCPDest_dialError(t *testing.T) { + // Port 1 on localhost should refuse; dial must fail. + x := newTestXTCP(t, "tcp:127.0.0.1:1") + if _, err := newTCPDest(context.Background(), x); err == nil { + t.Fatal("expected dial error") + } +} + +// TestHTTPDest posts to a test server and checks body + Content-Type. +func TestHTTPDest(t *testing.T) { + type capture struct { + body string + contentType string + } + capCh := make(chan capture, 1) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + buf, _ := io.ReadAll(r.Body) + capCh <- capture{body: string(buf), contentType: r.Header.Get("Content-Type")} + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + x := newTestXTCP(t, srv.URL) + x.config.MarshalTo = MarshallerJSONL + dest, err := newHTTPDest(context.Background(), x) + if err != nil { + t.Fatalf("newHTTPDest: %v", err) + } + payload := []byte("{\"a\":1}\n") + if _, err := dest.Send(context.Background(), &payload); err != nil { + t.Fatalf("Send: %v", err) + } + c := <-capCh + if c.body != "{\"a\":1}\n" { + t.Errorf("posted body = %q", c.body) + } + if c.contentType != "application/x-ndjson" { + t.Errorf("content-type = %q, want application/x-ndjson", c.contentType) + } + if err := dest.Close(); err != nil { + t.Errorf("Close: %v", err) + } +} + +func TestHTTPDest_non2xx(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + x := newTestXTCP(t, srv.URL) + dest, err := newHTTPDest(context.Background(), x) + if err != nil { + t.Fatal(err) + } + payload := []byte("x") + if _, err := dest.Send(context.Background(), &payload); err == nil { + t.Fatal("expected error on 500 response") + } +} + +// TestNewStreamSchemesRegistered confirms all new schemes resolve. +func TestNewStreamSchemesRegistered(t *testing.T) { + for _, s := range []string{schemeStderr, schemeFile, schemeTCP, schemeHTTP, schemeHTTPS} { + if _, status := lookupDestinationFactory(s); status != destLookupFound { + t.Errorf("scheme %q not registered (status %v)", s, status) + } + } +} + +// Ensure the package's InitEnvelopeMarshallers + new dests don't deadlock when +// initialized together (smoke). +func TestInitWithNewFormats(t *testing.T) { + x, _ := newMarshalFixture(t) + x.config.MarshalTo = MarshallerCSV + var wg sync.WaitGroup + wg.Add(1) + x.InitEnvelopeMarshallers(&wg) + wg.Wait() + if x.EnvelopeMarshaller == nil { + t.Fatal("csv EnvelopeMarshaller nil") + } +} diff --git a/pkg/xtcp/destinations_tcp.go b/pkg/xtcp/destinations_tcp.go new file mode 100644 index 0000000..d141f95 --- /dev/null +++ b/pkg/xtcp/destinations_tcp.go @@ -0,0 +1,58 @@ +package xtcp + +import ( + "context" + "fmt" + "log" + "net" + "strings" +) + +// tcpDest writes each marshaled payload to a connected TCP socket: +// `-dest tcp:host:port`. TCP is the reliable, ordered transport most log/ +// metric shippers (Vector, Logstash, Fluentd, `nc`) expect — pair it with a +// line-delimited marshaller (jsonl/csv/tsv). Framing is the marshaller's job; +// tcpDest writes bytes verbatim so it never corrupts a length-delimited +// stream (e.g. protobufList). +// +// Unlike udp, this uses the syscall write path only (no io_uring variant yet; +// it can follow the udp pattern later). Send is invoked serially. +type tcpDest struct { + x *XTCP + conn net.Conn +} + +func newTCPDest(ctx context.Context, x *XTCP) (Destination, error) { + addr := strings.TrimPrefix(x.config.Dest, schemeTCP+":") + var dialer net.Dialer + conn, err := dialer.DialContext(ctx, "tcp", addr) + if err != nil { + return nil, fmt.Errorf("InitDestTCP net.Dial(tcp, %q): %w", addr, err) + } + return &tcpDest{x: x, conn: conn}, nil +} + +func (d *tcpDest) Send(_ context.Context, b *[]byte) (int, error) { + n, err := d.conn.Write(*b) + if err != nil { + d.x.pC.WithLabelValues("destTCP", "Write", "error").Inc() + if d.x.debugLevel > 100 { + log.Printf("destTCP conn.Write err:%v", err) + } + return 0, err + } + d.x.pC.WithLabelValues("destTCP", "Writes", "count").Inc() + d.x.pC.WithLabelValues("destTCP", "WriteBytes", "count").Add(float64(n)) + return 1, nil +} + +func (d *tcpDest) Close() error { + if d.conn != nil { + return d.conn.Close() + } + return nil +} + +func init() { + RegisterDestination(schemeTCP, newTCPDest) +} diff --git a/pkg/xtcp/flat_record_row.go b/pkg/xtcp/flat_record_row.go new file mode 100644 index 0000000..8b5e75b --- /dev/null +++ b/pkg/xtcp/flat_record_row.go @@ -0,0 +1,141 @@ +package xtcp + +import ( + "encoding/base64" + "fmt" + "strconv" + "strings" + "sync" + + "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" + "google.golang.org/protobuf/reflect/protoreflect" +) + +// Tabular (CSV/TSV) row encoding for XtcpFlatRecord. The column set is derived +// once from the protobuf descriptor via reflection, so it never drifts from +// the schema — adding a field to the .proto adds a column automatically. The +// few fields that are machine values in the wire format (IP-address bytes, the +// congestion enum, TCP-state integers, the nanosecond timestamp) are rendered +// human-readably when humanize is set; see humanize.go. + +type flatCol struct { + name string // protojson camelCase name — the CSV header cell + fd protoreflect.FieldDescriptor +} + +var ( + flatColsOnce sync.Once + flatColsAll []flatCol + flatColsIndex map[string]flatCol +) + +// flatColumns returns the full ordered column list (proto declaration order), +// computed once from the XtcpFlatRecord descriptor. +func flatColumns() []flatCol { + flatColsOnce.Do(func() { + fields := (&xtcp_flat_record.XtcpFlatRecord{}).ProtoReflect().Descriptor().Fields() + flatColsAll = make([]flatCol, 0, fields.Len()) + flatColsIndex = make(map[string]flatCol, fields.Len()) + for i := 0; i < fields.Len(); i++ { + fd := fields.Get(i) + c := flatCol{name: fd.JSONName(), fd: fd} + flatColsAll = append(flatColsAll, c) + flatColsIndex[c.name] = c + } + }) + return flatColsAll +} + +// selectColumns resolves a comma-separated `-columns` spec to an ordered +// column list. Empty (or whitespace) selects all columns. Unknown names are +// an error so a typo fails fast rather than silently dropping a column. +func selectColumns(spec string) ([]flatCol, error) { + all := flatColumns() + spec = strings.TrimSpace(spec) + if spec == "" { + return all, nil + } + parts := strings.Split(spec, ",") + out := make([]flatCol, 0, len(parts)) + for _, p := range parts { + name := strings.TrimSpace(p) + if name == "" { + continue + } + c, ok := flatColsIndex[name] + if !ok { + return nil, fmt.Errorf("unknown -columns field %q (expect an XtcpFlatRecord json name, e.g. hostname, inetDiagMsgSocketSourcePort, tcpInfoRtt)", name) + } + out = append(out, c) + } + if len(out) == 0 { + return all, nil + } + return out, nil +} + +func flatRecordHeader(cols []flatCol) []string { + h := make([]string, len(cols)) + for i, c := range cols { + h[i] = c.name + } + return h +} + +func flatRecordValues(r *xtcp_flat_record.XtcpFlatRecord, cols []flatCol, humanize bool) []string { + m := r.ProtoReflect() + out := make([]string, len(cols)) + for i, c := range cols { + out[i] = formatField(r, m, c, humanize) + } + return out +} + +// formatField renders one column. When humanize is set, the handful of +// machine-valued fields are formatted via the humanize.go helpers; everything +// else (and everything when humanize is false) goes through formatScalar. +func formatField(r *xtcp_flat_record.XtcpFlatRecord, m protoreflect.Message, c flatCol, humanize bool) string { + if humanize { + switch c.name { + case "inetDiagMsgSocketSource": + return ipString(r.GetInetDiagMsgFamily(), r.GetInetDiagMsgSocketSource()) + case "inetDiagMsgSocketDestination": + return ipString(r.GetInetDiagMsgFamily(), r.GetInetDiagMsgSocketDestination()) + case "inetDiagMsgState": + return tcpStateName(r.GetInetDiagMsgState()) + case "tcpInfoState": + return tcpStateName(r.GetTcpInfoState()) + case "congestionAlgorithmEnum": + return congestionAlgorithmName(r.GetCongestionAlgorithmEnum()) + case "timestampNs": + return timestampRFC3339(r.GetTimestampNs()) + } + } + return formatScalar(c.fd, m.Get(c.fd)) +} + +// formatScalar renders a protoreflect scalar value as a string. XtcpFlatRecord +// has no nested-message or repeated fields, so scalar coverage is sufficient; +// the two bytes fields are IP addresses (base64 here, humanized elsewhere). +func formatScalar(fd protoreflect.FieldDescriptor, v protoreflect.Value) string { + switch fd.Kind() { + case protoreflect.BoolKind: + return strconv.FormatBool(v.Bool()) + case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind, + protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind: + return strconv.FormatInt(v.Int(), 10) + case protoreflect.Uint32Kind, protoreflect.Fixed32Kind, + protoreflect.Uint64Kind, protoreflect.Fixed64Kind: + return strconv.FormatUint(v.Uint(), 10) + case protoreflect.FloatKind, protoreflect.DoubleKind: + return strconv.FormatFloat(v.Float(), 'f', -1, 64) + case protoreflect.StringKind: + return v.String() + case protoreflect.BytesKind: + return base64.StdEncoding.EncodeToString(v.Bytes()) + case protoreflect.EnumKind: + return strconv.FormatInt(int64(v.Enum()), 10) + default: + return v.String() + } +} diff --git a/pkg/xtcp/flat_record_row_test.go b/pkg/xtcp/flat_record_row_test.go new file mode 100644 index 0000000..dae5c96 --- /dev/null +++ b/pkg/xtcp/flat_record_row_test.go @@ -0,0 +1,101 @@ +package xtcp + +import ( + "net" + "strings" + "testing" + + "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" +) + +func TestFlatColumns_allFields(t *testing.T) { + cols := flatColumns() + // XtcpFlatRecord field count (from the proto descriptor). Guard it so an + // accidental schema change is noticed; bump deliberately when fields are + // added to the .proto. + if len(cols) != 122 { + t.Errorf("flatColumns len = %d, want 122", len(cols)) + } + // Header names are the protojson camelCase names. + hdr := flatRecordHeader(cols) + if hdr[0] == "" { + t.Error("empty header cell") + } + want := map[string]bool{"hostname": false, "timestampNs": false, "congestionAlgorithmEnum": false} + for _, h := range hdr { + if _, ok := want[h]; ok { + want[h] = true + } + } + for name, seen := range want { + if !seen { + t.Errorf("expected column %q in header", name) + } + } +} + +func TestSelectColumns(t *testing.T) { + t.Run("empty selects all", func(t *testing.T) { + got, err := selectColumns("") + if err != nil { + t.Fatal(err) + } + if len(got) != len(flatColumns()) { + t.Errorf("empty spec selected %d cols, want all %d", len(got), len(flatColumns())) + } + }) + t.Run("subset preserves order", func(t *testing.T) { + got, err := selectColumns("hostname, tcpInfoRtt ,inetDiagMsgState") + if err != nil { + t.Fatal(err) + } + names := flatRecordHeader(got) + wantOrder := []string{"hostname", "tcpInfoRtt", "inetDiagMsgState"} + if strings.Join(names, ",") != strings.Join(wantOrder, ",") { + t.Errorf("subset = %v, want %v", names, wantOrder) + } + }) + t.Run("unknown column errors", func(t *testing.T) { + if _, err := selectColumns("hostname,not_a_field"); err == nil { + t.Fatal("expected error for unknown column") + } + }) +} + +func TestFlatRecordValues_humanize(t *testing.T) { + r := &xtcp_flat_record.XtcpFlatRecord{ + Hostname: "host-a", + InetDiagMsgFamily: afInet, + InetDiagMsgSocketSource: []byte(net.ParseIP("10.0.0.5").To4()), + InetDiagMsgSocketSourcePort: 443, + InetDiagMsgState: 10, // LISTEN + TcpInfoState: 10, + CongestionAlgorithmEnum: xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_CUBIC, + TimestampNs: 1_700_000_000_000_000_000, + } + cols, err := selectColumns("hostname,inetDiagMsgSocketSource,inetDiagMsgSocketSourcePort,inetDiagMsgState,congestionAlgorithmEnum") + if err != nil { + t.Fatal(err) + } + + // Humanized: address dotted-quad, state name, congestion name. + h := flatRecordValues(r, cols, true) + wantH := []string{"host-a", "10.0.0.5", "443", "LISTEN", "CUBIC"} + for i := range wantH { + if h[i] != wantH[i] { + t.Errorf("humanized[%d] = %q, want %q", i, h[i], wantH[i]) + } + } + + // Raw: address base64, state/enum numeric. + raw := flatRecordValues(r, cols, false) + if raw[1] == "10.0.0.5" { + t.Errorf("raw address should not be dotted-quad: %q", raw[1]) + } + if raw[3] != "10" { + t.Errorf("raw state = %q, want \"10\"", raw[3]) + } + if raw[4] != "1" { + t.Errorf("raw congestion enum = %q, want \"1\"", raw[4]) + } +} diff --git a/pkg/xtcp/humanize.go b/pkg/xtcp/humanize.go new file mode 100644 index 0000000..257faab --- /dev/null +++ b/pkg/xtcp/humanize.go @@ -0,0 +1,93 @@ +package xtcp + +import ( + "net" + "strconv" + "strings" + "time" + + "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" +) + +// Human-readable formatting for the machine values in an XtcpFlatRecord. +// Used by the CSV/TSV marshallers so a column of IP addresses, TCP states, +// and congestion algorithms is actually readable (the raw record stores +// addresses as bytes, state as a kernel integer, and the algorithm as an +// enum). The JSON/protobuf formats keep the raw values. + +const ( + afInet = 2 // AF_INET + afInet6 = 10 // AF_INET6 +) + +// ipString renders an inet_diag address as a dotted-quad or RFC-5952 IPv6 +// string. The kernel returns the address in a 16-byte __be32[4] slot +// regardless of family, with only the first 4 bytes meaningful for IPv4 — so +// family is authoritative and must be consulted before length, otherwise an +// IPv4 address (e.g. 192.168.122.1) in a 16-byte buffer is misread as IPv6 +// ("c0a8:7a01::"). Empty input → "". +func ipString(family uint32, b []byte) string { + if len(b) == 0 { + return "" + } + switch family { + case afInet: + if len(b) >= net.IPv4len { + return net.IP(b[:net.IPv4len]).String() + } + case afInet6: + if len(b) >= net.IPv6len { + return net.IP(b[:net.IPv6len]).String() + } + } + // Unknown/unset family: fall back to the byte length. + return net.IP(b).String() +} + +// tcpStateNames maps the kernel TCP state integers (include/net/tcp_states.h) +// to their conventional names — the same names `ss` prints. xtcp2 carries the +// state as a bare uint8 (no protobuf enum), so the mapping lives here. +var tcpStateNames = map[uint32]string{ + 1: "ESTABLISHED", + 2: "SYN_SENT", + 3: "SYN_RECV", + 4: "FIN_WAIT1", + 5: "FIN_WAIT2", + 6: "TIME_WAIT", + 7: "CLOSE", + 8: "CLOSE_WAIT", + 9: "LAST_ACK", + 10: "LISTEN", + 11: "CLOSING", + 12: "NEW_SYN_RECV", +} + +// tcpStateName returns the conventional name for a TCP state integer, or the +// decimal value as a string for anything outside the known range. +func tcpStateName(state uint32) string { + if name, ok := tcpStateNames[state]; ok { + return name + } + return strconv.FormatUint(uint64(state), 10) +} + +// congestionAlgorithmName returns the short congestion-control name (e.g. +// "CUBIC", "BBR3") by trimming the generated enum's CONGESTION_ALGORITHM_ +// prefix. UNSPECIFIED renders as "". +func congestionAlgorithmName(e xtcp_flat_record.XtcpFlatRecord_CongestionAlgorithm) string { + if e == xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_UNSPECIFIED { + return "" + } + return strings.TrimPrefix(e.String(), "CONGESTION_ALGORITHM_") +} + +// timestampRFC3339 formats the record's timestamp_ns (Unix nanoseconds, held +// as a double) as RFC3339 with nanosecond precision in UTC. Zero → "". +func timestampRFC3339(ns float64) string { + if ns <= 0 { + return "" + } + sec := int64(ns) / 1e9 + nsec := int64(ns) % 1e9 + return time.Unix(sec, nsec).UTC().Format(time.RFC3339Nano) +} diff --git a/pkg/xtcp/humanize_test.go b/pkg/xtcp/humanize_test.go new file mode 100644 index 0000000..55c305a --- /dev/null +++ b/pkg/xtcp/humanize_test.go @@ -0,0 +1,70 @@ +package xtcp + +import ( + "net" + "strings" + "testing" + + "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" +) + +func TestIPString(t *testing.T) { + cases := []struct { + name string + family uint32 + in []byte + want string + }{ + {"empty", afInet, nil, ""}, + {"ipv4", afInet, []byte{192, 168, 0, 1}, "192.168.0.1"}, + {"ipv4 loopback", afInet, []byte{127, 0, 0, 1}, "127.0.0.1"}, + // IPv4 in the kernel's 16-byte slot: family must win over length so + // it isn't misread as IPv6 (regression guard for "c0a8:7a01::"). + {"ipv4 in 16-byte slot", afInet, append([]byte{192, 168, 122, 1}, make([]byte, 12)...), "192.168.122.1"}, + {"ipv6 loopback", afInet6, net.IPv6loopback, "::1"}, + {"ipv6 full", afInet6, net.ParseIP("2001:db8::1").To16(), "2001:db8::1"}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if got := ipString(c.family, c.in); got != c.want { + t.Errorf("ipString(%d, %v) = %q, want %q", c.family, c.in, got, c.want) + } + }) + } +} + +func TestTCPStateName(t *testing.T) { + if got := tcpStateName(10); got != "LISTEN" { + t.Errorf("state 10 = %q, want LISTEN", got) + } + if got := tcpStateName(1); got != "ESTABLISHED" { + t.Errorf("state 1 = %q, want ESTABLISHED", got) + } + // Unknown state falls back to the decimal value. + if got := tcpStateName(99); got != "99" { + t.Errorf("state 99 = %q, want \"99\"", got) + } +} + +func TestCongestionAlgorithmName(t *testing.T) { + if got := congestionAlgorithmName(xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_CUBIC); got != "CUBIC" { + t.Errorf("CUBIC name = %q", got) + } + if got := congestionAlgorithmName(xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR3); got != "BBR3" { + t.Errorf("BBR3 name = %q", got) + } + if got := congestionAlgorithmName(xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_UNSPECIFIED); got != "" { + t.Errorf("UNSPECIFIED name = %q, want empty", got) + } +} + +func TestTimestampRFC3339(t *testing.T) { + if got := timestampRFC3339(0); got != "" { + t.Errorf("zero ts = %q, want empty", got) + } + // 1_700_000_000.5s expressed in ns. + got := timestampRFC3339(1_700_000_000_500_000_000) + if !strings.HasPrefix(got, "2023-11-14T") || !strings.HasSuffix(got, "Z") { + t.Errorf("ts = %q, want a 2023-11-14 UTC RFC3339 value", got) + } +} diff --git a/pkg/xtcp/input_validation.go b/pkg/xtcp/input_validation.go index b8126a7..4fb5bbe 100644 --- a/pkg/xtcp/input_validation.go +++ b/pkg/xtcp/input_validation.go @@ -26,7 +26,7 @@ func (x *XTCP) validateInput() error { } } - if x.config.Dest != schemeNull && x.config.Dest != schemeStdout { + if x.config.Dest != schemeNull && x.config.Dest != schemeStdout && x.config.Dest != schemeStderr { scheme, _, found := strings.Cut(x.config.Dest, ":") if !found { @@ -45,7 +45,8 @@ func (x *XTCP) validateInput() error { // schemeNullPrefix colon) failed validation as "must contain x2 // colons" while the registry happily had a "null" factory. switch scheme { - case schemeUnix, schemeUnixgram, schemeNull, schemeStdout, schemeS3Parquet: + case schemeUnix, schemeUnixgram, schemeNull, schemeStdout, schemeStderr, + schemeFile, schemeHTTP, schemeHTTPS, schemeS3Parquet: // only the leading `:` separator is required; the // per-destination factory validates the rest further. s3parquet // accepts a URL (http://host:port) which has its own colons. diff --git a/pkg/xtcp/input_validation_test.go b/pkg/xtcp/input_validation_test.go index e834457..48afb34 100644 --- a/pkg/xtcp/input_validation_test.go +++ b/pkg/xtcp/input_validation_test.go @@ -105,6 +105,46 @@ func TestValidateInput_happyPaths(t *testing.T) { Topic: "xtcp", }, }, + { + name: "stderr dest bare", + cfg: &xtcp_config.XtcpConfig{ + MarshalTo: MarshallerProtoJSON, + Dest: schemeStderr, + Topic: "xtcp", + }, + }, + { + name: "file dest with path", + cfg: &xtcp_config.XtcpConfig{ + MarshalTo: MarshallerProtoJSON, + Dest: "file:/var/log/xtcp.csv", + Topic: "xtcp", + }, + }, + { + name: "tcp dest host:port", + cfg: &xtcp_config.XtcpConfig{ + MarshalTo: MarshallerProtoJSON, + Dest: "tcp:127.0.0.1:9999", + Topic: "xtcp", + }, + }, + { + name: "http dest url", + cfg: &xtcp_config.XtcpConfig{ + MarshalTo: MarshallerProtoJSON, + Dest: "http://127.0.0.1:8080/ingest", + Topic: "xtcp", + }, + }, + { + name: "https dest url with port", + cfg: &xtcp_config.XtcpConfig{ + MarshalTo: MarshallerProtoJSON, + Dest: "https://collector.example.com:443/v1/ingest", + Topic: "xtcp", + }, + }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { diff --git a/pkg/xtcp/marshallers.go b/pkg/xtcp/marshallers.go index 8149baa..09a4db1 100644 --- a/pkg/xtcp/marshallers.go +++ b/pkg/xtcp/marshallers.go @@ -19,6 +19,9 @@ const ( MarshallerProtoJSON = "protoJson" MarshallerProtoText = "protoText" MarshallerMsgPack = "msgpack" + MarshallerJSONL = "jsonl" // one JSON record per line (NDJSON / ClickHouse JSONEachRow) + MarshallerCSV = "csv" // comma-separated, humanized, header once + MarshallerTSV = "tsv" // tab-separated, humanized, header once ) // Envelope-size safety valves. Two independent thresholds — the @@ -51,6 +54,9 @@ var ( MarshallerProtoJSON: true, MarshallerProtoText: true, MarshallerMsgPack: true, + MarshallerJSONL: true, + MarshallerCSV: true, + MarshallerTSV: true, } ) @@ -123,6 +129,12 @@ func (x *XTCP) InitEnvelopeMarshallers(wg *sync.WaitGroup) { return x.envelopeMsgPackMarshal(e) }) + // Tabular + per-record-line formats for easy ad-hoc analysis. csv/tsv + // share the reflection row encoder and humanize machine values; jsonl + // emits one raw JSON record per line. Registered here (see + // marshallers_text.go) so they flow through the envelope pipeline. + x.registerTextEnvelopeMarshallers() + if f, ok := x.EnvelopeMarshallers.Load(x.config.MarshalTo); ok { if m, ok2 := f.(func(e *xtcp_flat_record.Envelope) (buf *[]byte)); ok2 { x.EnvelopeMarshaller = m @@ -163,10 +175,12 @@ func (w *ByteSliceWriter) Write(b []byte) (n int, err error) { } // envelopeProtoJSONMarshal marshals a whole Envelope (batch of rows) to -// compact single-line JSON — one JSON object per flush, i.e. NDJSON when the -// stdout destination appends a newline. Pairs with `-dest stdout` for -// jq-able local output. (protojson.Marshal is compact; protojson.Format is -// multi-line pretty-print and would break the one-object-per-line contract.) +// compact single-line JSON, newline-terminated — one JSON object per flush, +// i.e. NDJSON. Pairs with `-dest stdout` for jq-able local output. +// (protojson.Marshal is compact; protojson.Format is multi-line pretty-print +// and would break the one-object-per-line contract.) The trailing newline is +// the marshaller's framing responsibility — writerDest/tcp/http write bytes +// verbatim. func (x *XTCP) envelopeProtoJSONMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) { b, err := protojson.Marshal(e) if err != nil { @@ -175,12 +189,15 @@ func (x *XTCP) envelopeProtoJSONMarshal(e *xtcp_flat_record.Envelope) (buf *[]by log.Println("protojson.Marshal(envelope) err: ", err) } } + b = append(b, '\n') return &b } -// envelopeProtoTextMarshal marshals a whole Envelope to protobuf text. +// envelopeProtoTextMarshal marshals a whole Envelope to protobuf text, +// newline-terminated so consecutive flushes stay separated on a stream. func (x *XTCP) envelopeProtoTextMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) { b := []byte(prototext.Format(e)) + b = append(b, '\n') return &b } diff --git a/pkg/xtcp/marshallers_text.go b/pkg/xtcp/marshallers_text.go new file mode 100644 index 0000000..9a3d500 --- /dev/null +++ b/pkg/xtcp/marshallers_text.go @@ -0,0 +1,105 @@ +package xtcp + +import ( + "bytes" + "encoding/csv" + "log" + "sync/atomic" + + "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" + "google.golang.org/protobuf/encoding/protojson" +) + +// registerTextEnvelopeMarshallers wires the line/tabular envelope marshallers +// (jsonl, csv, tsv) into x.EnvelopeMarshallers. Each is an envelope marshaller +// that iterates Envelope.Row and owns its trailing-newline framing (writerDest +// and the tcp/http sinks write bytes verbatim). +// +// csv/tsv resolve their column set from -columns once here; an invalid spec +// fatals at init only when one of them is the selected format, so a stray +// -columns alongside protoJson is harmless. +func (x *XTCP) registerTextEnvelopeMarshallers() { + x.EnvelopeMarshallers.Store(MarshallerJSONL, func(e *xtcp_flat_record.Envelope) (buf *[]byte) { + return x.envelopeJSONLMarshal(e) + }) + + cols := flatColumns() + if x.config.MarshalTo == MarshallerCSV || x.config.MarshalTo == MarshallerTSV { + c, err := selectColumns(x.config.CsvColumns) + if err != nil { + x.callFatalf("InitEnvelopeMarshallers -columns: %v", err) + return + } + cols = c + } + + // Separate header-written guards so csv and tsv each emit their header + // exactly once per process on whichever stream they feed. + var csvHeader, tsvHeader atomic.Bool + x.EnvelopeMarshallers.Store(MarshallerCSV, func(e *xtcp_flat_record.Envelope) (buf *[]byte) { + return x.envelopeDelimitedMarshal(e, cols, ',', &csvHeader) + }) + x.EnvelopeMarshallers.Store(MarshallerTSV, func(e *xtcp_flat_record.Envelope) (buf *[]byte) { + return x.envelopeDelimitedMarshal(e, cols, '\t', &tsvHeader) + }) +} + +// envelopeJSONLMarshal emits one compact JSON object per row, each on its own +// line (NDJSON / ClickHouse JSONEachRow). Values are raw (machine) — use +// csv/tsv for humanized addresses/states. +func (x *XTCP) envelopeJSONLMarshal(e *xtcp_flat_record.Envelope) (buf *[]byte) { + var b bytes.Buffer + for _, r := range e.Row { + line, err := protojson.Marshal(r) + if err != nil { + x.pC.WithLabelValues("envelopeJSONLMarshal", "Marshal", "error").Inc() + if x.debugLevel > 10 { + log.Println("envelopeJSONLMarshal protojson.Marshal err: ", err) + } + continue + } + b.Write(line) + b.WriteByte('\n') + } + out := b.Bytes() + return &out +} + +// envelopeDelimitedMarshal renders the envelope's rows as delimited text +// (CSV or TSV depending on comma), humanized, with the header written once. +// encoding/csv already terminates every record with '\n', so the block is +// self-framing. +func (x *XTCP) envelopeDelimitedMarshal(e *xtcp_flat_record.Envelope, cols []flatCol, comma rune, headerWritten *atomic.Bool) (buf *[]byte) { + var b bytes.Buffer + w := csv.NewWriter(&b) + w.Comma = comma + + if headerWritten.CompareAndSwap(false, true) { + if err := w.Write(flatRecordHeader(cols)); err != nil { + x.pC.WithLabelValues("envelopeDelimitedMarshal", "header", "error").Inc() + if x.debugLevel > 10 { + log.Println("envelopeDelimitedMarshal header err: ", err) + } + } + } + + for _, r := range e.Row { + if err := w.Write(flatRecordValues(r, cols, true)); err != nil { + x.pC.WithLabelValues("envelopeDelimitedMarshal", "row", "error").Inc() + if x.debugLevel > 10 { + log.Println("envelopeDelimitedMarshal row err: ", err) + } + } + } + + w.Flush() + if err := w.Error(); err != nil { + x.pC.WithLabelValues("envelopeDelimitedMarshal", "flush", "error").Inc() + if x.debugLevel > 10 { + log.Println("envelopeDelimitedMarshal flush err: ", err) + } + } + + out := b.Bytes() + return &out +} diff --git a/pkg/xtcp/marshallers_text_test.go b/pkg/xtcp/marshallers_text_test.go new file mode 100644 index 0000000..bbd529c --- /dev/null +++ b/pkg/xtcp/marshallers_text_test.go @@ -0,0 +1,116 @@ +package xtcp + +import ( + "bytes" + "encoding/csv" + "encoding/json" + "strings" + "sync" + "sync/atomic" + "testing" + + "github.com/randomizedcoder/xtcp2/pkg/xtcp_flat_record" +) + +func sampleEnvelope() *xtcp_flat_record.Envelope { + return &xtcp_flat_record.Envelope{ + Row: []*xtcp_flat_record.XtcpFlatRecord{ + {Hostname: "host-a", InetDiagMsgFamily: afInet, InetDiagMsgState: 10}, + {Hostname: "host-b", InetDiagMsgFamily: afInet, InetDiagMsgState: 1}, + }, + } +} + +func TestEnvelopeJSONLMarshal(t *testing.T) { + x, _ := newMarshalFixture(t) + buf := x.envelopeJSONLMarshal(sampleEnvelope()) + lines := strings.Split(strings.TrimRight(string(*buf), "\n"), "\n") + if len(lines) != 2 { + t.Fatalf("got %d lines, want 2: %q", len(lines), string(*buf)) + } + for i, ln := range lines { + var m map[string]any + if err := json.Unmarshal([]byte(ln), &m); err != nil { + t.Errorf("line %d not valid JSON: %v (%q)", i, err, ln) + } + } + // Trailing newline is part of the framing contract. + if !bytes.HasSuffix(*buf, []byte("\n")) { + t.Error("jsonl output must end with a newline") + } +} + +func TestEnvelopeDelimitedMarshal_csv(t *testing.T) { + x, _ := newMarshalFixture(t) + cols, err := selectColumns("hostname,inetDiagMsgState") + if err != nil { + t.Fatal(err) + } + var header atomic.Bool + + // First flush includes the header. + buf := x.envelopeDelimitedMarshal(sampleEnvelope(), cols, ',', &header) + rows, err := csv.NewReader(bytes.NewReader(*buf)).ReadAll() + if err != nil { + t.Fatalf("csv parse: %v", err) + } + if len(rows) != 3 { // header + 2 records + t.Fatalf("got %d csv rows, want 3 (header+2): %v", len(rows), rows) + } + if rows[0][0] != "hostname" || rows[0][1] != "inetDiagMsgState" { + t.Errorf("header = %v", rows[0]) + } + // Humanized state: 10 → LISTEN, 1 → ESTABLISHED. + if rows[1][1] != "LISTEN" || rows[2][1] != "ESTABLISHED" { + t.Errorf("humanized state cells = %q, %q", rows[1][1], rows[2][1]) + } + + // Second flush omits the header (header-once). + buf2 := x.envelopeDelimitedMarshal(sampleEnvelope(), cols, ',', &header) + rows2, err := csv.NewReader(bytes.NewReader(*buf2)).ReadAll() + if err != nil { + t.Fatalf("csv parse 2: %v", err) + } + if len(rows2) != 2 { + t.Errorf("second flush rows = %d, want 2 (no header)", len(rows2)) + } +} + +func TestEnvelopeDelimitedMarshal_tsv(t *testing.T) { + x, _ := newMarshalFixture(t) + cols, _ := selectColumns("hostname,inetDiagMsgState") + var header atomic.Bool + buf := x.envelopeDelimitedMarshal(sampleEnvelope(), cols, '\t', &header) + if !strings.Contains(string(*buf), "\t") { + t.Errorf("tsv output should contain tabs: %q", string(*buf)) + } + r := csv.NewReader(bytes.NewReader(*buf)) + r.Comma = '\t' + rows, err := r.ReadAll() + if err != nil { + t.Fatalf("tsv parse: %v", err) + } + if len(rows) != 3 { + t.Errorf("tsv rows = %d, want 3", len(rows)) + } +} + +func TestRegisterTextEnvelopeMarshallers_selected(t *testing.T) { + for _, name := range []string{MarshallerJSONL, MarshallerCSV, MarshallerTSV} { + t.Run(name, func(t *testing.T) { + x, _ := newMarshalFixture(t) + x.config.MarshalTo = name + var wg sync.WaitGroup + wg.Add(1) + x.InitEnvelopeMarshallers(&wg) + wg.Wait() + if x.EnvelopeMarshaller == nil { + t.Fatalf("EnvelopeMarshaller nil for %q", name) + } + buf := x.EnvelopeMarshaller(sampleEnvelope()) + if buf == nil || len(*buf) == 0 { + t.Fatalf("%q produced empty output", name) + } + }) + } +} diff --git a/pkg/xtcp_config/xtcp_config.pb.go b/pkg/xtcp_config/xtcp_config.pb.go index 2a5b954..aaa94c8 100644 --- a/pkg/xtcp_config/xtcp_config.pb.go +++ b/pkg/xtcp_config/xtcp_config.pb.go @@ -479,8 +479,13 @@ type XtcpConfig struct { // userland loop overhead but increase scheduling latency for the // netlinker goroutine. Ignored unless io_uring=true. Default 128. IoUringCqeBatchSize uint32 `protobuf:"varint,212,opt,name=io_uring_cqe_batch_size,json=ioUringCqeBatchSize,proto3" json:"io_uring_cqe_batch_size,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // Comma-separated subset of XtcpFlatRecord json field names selecting + // which columns the csv/tsv marshallers emit (e.g. + // "hostname,inetDiagMsgSocketSourcePort,inetDiagMsgState,tcpInfoRtt"). + // Empty = all fields. Ignored by non-tabular marshallers. + CsvColumns string `protobuf:"bytes,220,opt,name=csv_columns,json=csvColumns,proto3" json:"csv_columns,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *XtcpConfig) Reset() { @@ -800,6 +805,13 @@ func (x *XtcpConfig) GetIoUringCqeBatchSize() uint32 { return 0 } +func (x *XtcpConfig) GetCsvColumns() string { + if x != nil { + return x.CsvColumns + } + return "" +} + type EnabledDeserializers struct { state protoimpl.MessageState `protogen:"open.v1"` Enabled map[string]bool `protobuf:"bytes,1,rep,name=enabled,proto3" json:"enabled,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` @@ -863,7 +875,7 @@ const file_xtcp_config_v1_xtcp_config_proto_rawDesc = "" + "\fpoll_timeout\x18\x1e \x01(\v2\x19.google.protobuf.DurationB\x11\xbaH\x0e\xc8\x01\x01\xaa\x01\b\"\x04\b\x80\xf5$2\x00R\vpollTimeout:s\xbaHp\x1an\n" + "\x0fXtcpConfig.poll\x122Poll timeout must be less than poll poll_frequency\x1a'this.poll_timeout < this.poll_frequency\"N\n" + "\x18SetPollFrequencyResponse\x122\n" + - "\x06config\x18\x01 \x01(\v2\x1a.xtcp_config.v1.XtcpConfigR\x06config\"\xe8\x12\n" + + "\x06config\x18\x01 \x01(\v2\x1a.xtcp_config.v1.XtcpConfigR\x06config\"\x92\x13\n" + "\n" + "XtcpConfig\x12F\n" + "\x17nl_timeout_milliseconds\x18\n" + @@ -887,7 +899,7 @@ const file_xtcp_config_v1_xtcp_config_proto_rawDesc = "" + "\fcapture_path\x18d \x01(\tB\f\xbaH\t\xc8\x01\x00r\x04\x10\x01\x18PR\vcapturePath\x12(\n" + "\amodulus\x18n \x01(\x04B\x0e\xbaH\v\xc8\x01\x012\x06\x18\xc0\x84=(\x01R\amodulus\x12+\n" + "\n" + - "marshal_to\x18x \x01(\tB\f\xbaH\t\xc8\x01\x01r\x04\x10\x04\x18(R\tmarshalTo\x12K\n" + + "marshal_to\x18x \x01(\tB\f\xbaH\t\xc8\x01\x01r\x04\x10\x03\x18(R\tmarshalTo\x12K\n" + "\x1eenvelope_flush_threshold_bytes\x18z \x01(\rB\x06\xbaH\x03\xc8\x01\x00R\x1benvelopeFlushThresholdBytes\x12I\n" + "\x1denvelope_flush_threshold_rows\x18{ \x01(\rB\x06\xbaH\x03\xc8\x01\x00R\x1aenvelopeFlushThresholdRows\x123\n" + "\x11kafka_compression\x18| \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x10kafkaCompression\x12'\n" + @@ -924,7 +936,9 @@ const file_xtcp_config_v1_xtcp_config_proto_rawDesc = "" + "\x18io_uring_recv_batch_size\x18\xd3\x01 \x01(\rB\r\xbaH\n" + "\xc8\x01\x00*\x05\x18\x80 (\x01R\x14ioUringRecvBatchSize\x12D\n" + "\x17io_uring_cqe_batch_size\x18\xd4\x01 \x01(\rB\r\xbaH\n" + - "\xc8\x01\x00*\x05\x18\x80 (\x01R\x13ioUringCqeBatchSize:s\xbaHp\x1an\n" + + "\xc8\x01\x00*\x05\x18\x80 (\x01R\x13ioUringCqeBatchSize\x12(\n" + + "\vcsv_columns\x18\xdc\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\n" + + "csvColumns:s\xbaHp\x1an\n" + "\x0fXtcpConfig.poll\x122Poll timeout must be less than poll poll_frequency\x1a'this.poll_frequency > this.poll_timeout\"\x9f\x01\n" + "\x14EnabledDeserializers\x12K\n" + "\aenabled\x18\x01 \x03(\v21.xtcp_config.v1.EnabledDeserializers.EnabledEntryR\aenabled\x1a:\n" + diff --git a/proto/xtcp_config/v1/xtcp_config.proto b/proto/xtcp_config/v1/xtcp_config.proto index a037c10..01a9799 100644 --- a/proto/xtcp_config/v1/xtcp_config.proto +++ b/proto/xtcp_config/v1/xtcp_config.proto @@ -245,7 +245,7 @@ message XtcpConfig { string marshal_to = 120 [ (buf.validate.field).required = true, (buf.validate.field).string = { - min_len: 4, + min_len: 3, max_len: 40, }]; @@ -512,6 +512,14 @@ message XtcpConfig { gte: 1, lte: 4096 }]; + + // Comma-separated subset of XtcpFlatRecord json field names selecting + // which columns the csv/tsv marshallers emit (e.g. + // "hostname,inetDiagMsgSocketSourcePort,inetDiagMsgState,tcpInfoRtt"). + // Empty = all fields. Ignored by non-tabular marshallers. + string csv_columns = 220 [ + (buf.validate.field).required = false + ]; }; message EnabledDeserializers { diff --git a/python/xtcp_config/v1/xtcp_config_pb2.py b/python/xtcp_config/v1/xtcp_config_pb2.py index 5019481..5b465c2 100644 --- a/python/xtcp_config/v1/xtcp_config_pb2.py +++ b/python/xtcp_config/v1/xtcp_config_pb2.py @@ -27,7 +27,7 @@ from buf.validate import validate_pb2 as buf_dot_validate_dot_validate__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n xtcp_config/v1/xtcp_config.proto\x12\x0extcp_config.v1\x1a\x1egoogle/protobuf/duration.proto\x1a\x1cgoogle/api/annotations.proto\x1a\x1b\x62uf/validate/validate.proto\"\x0c\n\nGetRequest\"A\n\x0bGetResponse\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"@\n\nSetRequest\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"A\n\x0bSetResponse\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"\xb4\x02\n\x17SetPollFrequencyRequest\x12S\n\x0epoll_frequency\x18\x14 \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$2\x00\xc8\x01\x01R\rpollFrequency\x12O\n\x0cpoll_timeout\x18\x1e \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$2\x00\xc8\x01\x01R\x0bpollTimeout:s\xbaHp\x1an\n\x0fXtcpConfig.poll\x12\x32Poll timeout must be less than poll poll_frequency\x1a\'this.poll_timeout < this.poll_frequency\"N\n\x18SetPollFrequencyResponse\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"\xe8\x12\n\nXtcpConfig\x12\x46\n\x17nl_timeout_milliseconds\x18\n \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xa0\x8d\x06(\x00\xc8\x01\x01R\x15nlTimeoutMilliseconds\x12S\n\x0epoll_frequency\x18\x14 \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$*\x00\xc8\x01\x01R\rpollFrequency\x12O\n\x0cpoll_timeout\x18\x1e \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$*\x00\xc8\x01\x01R\x0bpollTimeout\x12+\n\tmax_loops\x18( \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xa0\x8d\x06(\x00\xc8\x01\x00R\x08maxLoops\x12,\n\nnetlinkers\x18\x32 \x01(\rB\x0c\xbaH\t*\x04\x18\x64(\x01\xc8\x01\x01R\nnetlinkers\x12H\n\x19netlinkers_done_chan_size\x18\x33 \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x01\xc8\x01\x01R\x16netlinkersDoneChanSize\x12*\n\tnlmsg_seq\x18< \x01(\rB\r\xbaH\n*\x05\x18\x90N(\x00\xc8\x01\x01R\x08nlmsgSeq\x12/\n\x0bpacket_size\x18\x46 \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xc0\x84=(\x00\xc8\x01\x00R\npacketSize\x12\x36\n\x10packet_size_mply\x18P \x01(\rB\x0c\xbaH\t*\x04\x18\x64(\x00\xc8\x01\x00R\x0epacketSizeMply\x12.\n\x0bwrite_files\x18Z \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x00\xc8\x01\x00R\nwriteFiles\x12/\n\x0c\x63\x61pture_path\x18\x64 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18P\xc8\x01\x00R\x0b\x63\x61pturePath\x12(\n\x07modulus\x18n \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xc0\x84=(\x01\xc8\x01\x01R\x07modulus\x12+\n\nmarshal_to\x18x \x01(\tB\x0c\xbaH\tr\x04\x10\x04\x18(\xc8\x01\x01R\tmarshalTo\x12K\n\x1e\x65nvelope_flush_threshold_bytes\x18z \x01(\rB\x06\xbaH\x03\xc8\x01\x00R\x1b\x65nvelopeFlushThresholdBytes\x12I\n\x1d\x65nvelope_flush_threshold_rows\x18{ \x01(\rB\x06\xbaH\x03\xc8\x01\x00R\x1a\x65nvelopeFlushThresholdRows\x12\x33\n\x11kafka_compression\x18| \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x10kafkaCompression\x12\'\n\x0bs3_endpoint\x18} \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\ns3Endpoint\x12#\n\ts3_bucket\x18~ \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x08s3Bucket\x12#\n\ts3_prefix\x18\x7f \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x08s3Prefix\x12+\n\rs3_access_key\x18\x80\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x0bs3AccessKey\x12+\n\rs3_secret_key\x18\x81\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x0bs3SecretKey\x12O\n s3_parquet_flush_threshold_bytes\x18\x84\x01 \x01(\rB\x06\xbaH\x03\xc8\x01\x00R\x1cs3ParquetFlushThresholdBytes\x12$\n\ts3_region\x18\x85\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x08s3Region\x12,\n\rpyroscope_url\x18\x88\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x0cpyroscopeUrl\x12\x35\n\x12pyroscope_app_name\x18\x89\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x10pyroscopeAppName\x12\x37\n\x13pyroscope_sample_hz\x18\x8a\x01 \x01(\rB\x06\xbaH\x03\xc8\x01\x00R\x11pyroscopeSampleHz\x12J\n\x1dpyroscope_upload_interval_sec\x18\x8b\x01 \x01(\rB\x06\xbaH\x03\xc8\x01\x00R\x1apyroscopeUploadIntervalSec\x12\"\n\x04\x64\x65st\x18\x82\x01 \x01(\tB\r\xbaH\nr\x05\x10\x04\x18\x80\x01\xc8\x01\x01R\x04\x64\x65st\x12\x38\n\x10\x64\x65st_write_files\x18\x87\x01 \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x00\xc8\x01\x00R\x0e\x64\x65stWriteFiles\x12#\n\x05topic\x18\x8c\x01 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18(\xc8\x01\x00R\x05topic\x12\x35\n\x0fxtcp_proto_file\x18\x8f\x01 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18P\xc8\x01\x00R\rxtcpProtoFile\x12\x37\n\x10kafka_schema_url\x18\x91\x01 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18<\xc8\x01\x00R\x0ekafkaSchemaUrl\x12`\n\x15kafka_produce_timeout\x18\x96\x01 \x01(\x0b\x32\x19.google.protobuf.DurationB\x10\xbaH\r\xaa\x01\x07\"\x03\x08\xd8\x04\x32\x00\xc8\x01\x00R\x13kafkaProduceTimeout\x12/\n\x0b\x64\x65\x62ug_level\x18\xa0\x01 \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x00\xc8\x01\x01R\ndebugLevel\x12!\n\x05label\x18\xaa\x01 \x01(\tB\n\xbaH\x07r\x02\x18(\xc8\x01\x00R\x05label\x12\x1d\n\x03tag\x18\xb4\x01 \x01(\tB\n\xbaH\x07r\x02\x18(\xc8\x01\x00R\x03tag\x12,\n\tgrpc_port\x18\xbe\x01 \x01(\rB\x0e\xbaH\x0b*\x06\x18\xff\xff\x03(\x01\xc8\x01\x01R\x08grpcPort\x12\x62\n\x15\x65nabled_deserializers\x18\xc8\x01 \x01(\x0b\x32$.xtcp_config.v1.EnabledDeserializersB\x06\xbaH\x03\xc8\x01\x00R\x14\x65nabledDeserializers\x12\"\n\x08io_uring\x18\xd2\x01 \x01(\x08\x42\x06\xbaH\x03\xc8\x01\x00R\x07ioUring\x12\x46\n\x18io_uring_recv_batch_size\x18\xd3\x01 \x01(\rB\r\xbaH\n*\x05\x18\x80 (\x01\xc8\x01\x00R\x14ioUringRecvBatchSize\x12\x44\n\x17io_uring_cqe_batch_size\x18\xd4\x01 \x01(\rB\r\xbaH\n*\x05\x18\x80 (\x01\xc8\x01\x00R\x13ioUringCqeBatchSize:s\xbaHp\x1an\n\x0fXtcpConfig.poll\x12\x32Poll timeout must be less than poll poll_frequency\x1a\'this.poll_frequency > this.poll_timeout\"\x9f\x01\n\x14\x45nabledDeserializers\x12K\n\x07\x65nabled\x18\x01 \x03(\x0b\x32\x31.xtcp_config.v1.EnabledDeserializers.EnabledEntryR\x07\x65nabled\x1a:\n\x0c\x45nabledEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\x08R\x05value:\x02\x38\x01\x32\xe1\x02\n\rConfigService\x12]\n\x03Get\x12\x1a.xtcp_config.v1.GetRequest\x1a\x1b.xtcp_config.v1.GetResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\x1a\x12/ConfigService/Get:\x01*\x12]\n\x03Set\x12\x1a.xtcp_config.v1.SetRequest\x1a\x1b.xtcp_config.v1.SetResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\x1a\x12/ConfigService/Set:\x01*\x12\x91\x01\n\x10SetPollFrequency\x12\'.xtcp_config.v1.SetPollFrequencyRequest\x1a(.xtcp_config.v1.SetPollFrequencyResponse\"*\x82\xd3\xe4\x93\x02$\x1a\x1f/ConfigService/SetPollFrequency:\x01*B\x8d\x01\n\x12\x63om.xtcp_config.v1B\x0fXtcpConfigProtoP\x01Z\x11./pkg/xtcp_config\xa2\x02\x03XXX\xaa\x02\rXtcpConfig.V1\xca\x02\rXtcpConfig\\V1\xe2\x02\x19XtcpConfig\\V1\\GPBMetadata\xea\x02\x0eXtcpConfig::V1b\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n xtcp_config/v1/xtcp_config.proto\x12\x0extcp_config.v1\x1a\x1egoogle/protobuf/duration.proto\x1a\x1cgoogle/api/annotations.proto\x1a\x1b\x62uf/validate/validate.proto\"\x0c\n\nGetRequest\"A\n\x0bGetResponse\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"@\n\nSetRequest\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"A\n\x0bSetResponse\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"\xb4\x02\n\x17SetPollFrequencyRequest\x12S\n\x0epoll_frequency\x18\x14 \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$2\x00\xc8\x01\x01R\rpollFrequency\x12O\n\x0cpoll_timeout\x18\x1e \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$2\x00\xc8\x01\x01R\x0bpollTimeout:s\xbaHp\x1an\n\x0fXtcpConfig.poll\x12\x32Poll timeout must be less than poll poll_frequency\x1a\'this.poll_timeout < this.poll_frequency\"N\n\x18SetPollFrequencyResponse\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"\x92\x13\n\nXtcpConfig\x12\x46\n\x17nl_timeout_milliseconds\x18\n \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xa0\x8d\x06(\x00\xc8\x01\x01R\x15nlTimeoutMilliseconds\x12S\n\x0epoll_frequency\x18\x14 \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$*\x00\xc8\x01\x01R\rpollFrequency\x12O\n\x0cpoll_timeout\x18\x1e \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$*\x00\xc8\x01\x01R\x0bpollTimeout\x12+\n\tmax_loops\x18( \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xa0\x8d\x06(\x00\xc8\x01\x00R\x08maxLoops\x12,\n\nnetlinkers\x18\x32 \x01(\rB\x0c\xbaH\t*\x04\x18\x64(\x01\xc8\x01\x01R\nnetlinkers\x12H\n\x19netlinkers_done_chan_size\x18\x33 \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x01\xc8\x01\x01R\x16netlinkersDoneChanSize\x12*\n\tnlmsg_seq\x18< \x01(\rB\r\xbaH\n*\x05\x18\x90N(\x00\xc8\x01\x01R\x08nlmsgSeq\x12/\n\x0bpacket_size\x18\x46 \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xc0\x84=(\x00\xc8\x01\x00R\npacketSize\x12\x36\n\x10packet_size_mply\x18P \x01(\rB\x0c\xbaH\t*\x04\x18\x64(\x00\xc8\x01\x00R\x0epacketSizeMply\x12.\n\x0bwrite_files\x18Z \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x00\xc8\x01\x00R\nwriteFiles\x12/\n\x0c\x63\x61pture_path\x18\x64 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18P\xc8\x01\x00R\x0b\x63\x61pturePath\x12(\n\x07modulus\x18n \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xc0\x84=(\x01\xc8\x01\x01R\x07modulus\x12+\n\nmarshal_to\x18x \x01(\tB\x0c\xbaH\tr\x04\x10\x03\x18(\xc8\x01\x01R\tmarshalTo\x12K\n\x1e\x65nvelope_flush_threshold_bytes\x18z \x01(\rB\x06\xbaH\x03\xc8\x01\x00R\x1b\x65nvelopeFlushThresholdBytes\x12I\n\x1d\x65nvelope_flush_threshold_rows\x18{ \x01(\rB\x06\xbaH\x03\xc8\x01\x00R\x1a\x65nvelopeFlushThresholdRows\x12\x33\n\x11kafka_compression\x18| \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x10kafkaCompression\x12\'\n\x0bs3_endpoint\x18} \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\ns3Endpoint\x12#\n\ts3_bucket\x18~ \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x08s3Bucket\x12#\n\ts3_prefix\x18\x7f \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x08s3Prefix\x12+\n\rs3_access_key\x18\x80\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x0bs3AccessKey\x12+\n\rs3_secret_key\x18\x81\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x0bs3SecretKey\x12O\n s3_parquet_flush_threshold_bytes\x18\x84\x01 \x01(\rB\x06\xbaH\x03\xc8\x01\x00R\x1cs3ParquetFlushThresholdBytes\x12$\n\ts3_region\x18\x85\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x08s3Region\x12,\n\rpyroscope_url\x18\x88\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x0cpyroscopeUrl\x12\x35\n\x12pyroscope_app_name\x18\x89\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\x10pyroscopeAppName\x12\x37\n\x13pyroscope_sample_hz\x18\x8a\x01 \x01(\rB\x06\xbaH\x03\xc8\x01\x00R\x11pyroscopeSampleHz\x12J\n\x1dpyroscope_upload_interval_sec\x18\x8b\x01 \x01(\rB\x06\xbaH\x03\xc8\x01\x00R\x1apyroscopeUploadIntervalSec\x12\"\n\x04\x64\x65st\x18\x82\x01 \x01(\tB\r\xbaH\nr\x05\x10\x04\x18\x80\x01\xc8\x01\x01R\x04\x64\x65st\x12\x38\n\x10\x64\x65st_write_files\x18\x87\x01 \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x00\xc8\x01\x00R\x0e\x64\x65stWriteFiles\x12#\n\x05topic\x18\x8c\x01 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18(\xc8\x01\x00R\x05topic\x12\x35\n\x0fxtcp_proto_file\x18\x8f\x01 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18P\xc8\x01\x00R\rxtcpProtoFile\x12\x37\n\x10kafka_schema_url\x18\x91\x01 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18<\xc8\x01\x00R\x0ekafkaSchemaUrl\x12`\n\x15kafka_produce_timeout\x18\x96\x01 \x01(\x0b\x32\x19.google.protobuf.DurationB\x10\xbaH\r\xaa\x01\x07\"\x03\x08\xd8\x04\x32\x00\xc8\x01\x00R\x13kafkaProduceTimeout\x12/\n\x0b\x64\x65\x62ug_level\x18\xa0\x01 \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x00\xc8\x01\x01R\ndebugLevel\x12!\n\x05label\x18\xaa\x01 \x01(\tB\n\xbaH\x07r\x02\x18(\xc8\x01\x00R\x05label\x12\x1d\n\x03tag\x18\xb4\x01 \x01(\tB\n\xbaH\x07r\x02\x18(\xc8\x01\x00R\x03tag\x12,\n\tgrpc_port\x18\xbe\x01 \x01(\rB\x0e\xbaH\x0b*\x06\x18\xff\xff\x03(\x01\xc8\x01\x01R\x08grpcPort\x12\x62\n\x15\x65nabled_deserializers\x18\xc8\x01 \x01(\x0b\x32$.xtcp_config.v1.EnabledDeserializersB\x06\xbaH\x03\xc8\x01\x00R\x14\x65nabledDeserializers\x12\"\n\x08io_uring\x18\xd2\x01 \x01(\x08\x42\x06\xbaH\x03\xc8\x01\x00R\x07ioUring\x12\x46\n\x18io_uring_recv_batch_size\x18\xd3\x01 \x01(\rB\r\xbaH\n*\x05\x18\x80 (\x01\xc8\x01\x00R\x14ioUringRecvBatchSize\x12\x44\n\x17io_uring_cqe_batch_size\x18\xd4\x01 \x01(\rB\r\xbaH\n*\x05\x18\x80 (\x01\xc8\x01\x00R\x13ioUringCqeBatchSize\x12(\n\x0b\x63sv_columns\x18\xdc\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x00R\ncsvColumns:s\xbaHp\x1an\n\x0fXtcpConfig.poll\x12\x32Poll timeout must be less than poll poll_frequency\x1a\'this.poll_frequency > this.poll_timeout\"\x9f\x01\n\x14\x45nabledDeserializers\x12K\n\x07\x65nabled\x18\x01 \x03(\x0b\x32\x31.xtcp_config.v1.EnabledDeserializers.EnabledEntryR\x07\x65nabled\x1a:\n\x0c\x45nabledEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\x08R\x05value:\x02\x38\x01\x32\xe1\x02\n\rConfigService\x12]\n\x03Get\x12\x1a.xtcp_config.v1.GetRequest\x1a\x1b.xtcp_config.v1.GetResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\x1a\x12/ConfigService/Get:\x01*\x12]\n\x03Set\x12\x1a.xtcp_config.v1.SetRequest\x1a\x1b.xtcp_config.v1.SetResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\x1a\x12/ConfigService/Set:\x01*\x12\x91\x01\n\x10SetPollFrequency\x12\'.xtcp_config.v1.SetPollFrequencyRequest\x1a(.xtcp_config.v1.SetPollFrequencyResponse\"*\x82\xd3\xe4\x93\x02$\x1a\x1f/ConfigService/SetPollFrequency:\x01*B\x8d\x01\n\x12\x63om.xtcp_config.v1B\x0fXtcpConfigProtoP\x01Z\x11./pkg/xtcp_config\xa2\x02\x03XXX\xaa\x02\rXtcpConfig.V1\xca\x02\rXtcpConfig\\V1\xe2\x02\x19XtcpConfig\\V1\\GPBMetadata\xea\x02\x0eXtcpConfig::V1b\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -66,7 +66,7 @@ _globals['_XTCPCONFIG'].fields_by_name['modulus']._loaded_options = None _globals['_XTCPCONFIG'].fields_by_name['modulus']._serialized_options = b'\272H\0132\006\030\300\204=(\001\310\001\001' _globals['_XTCPCONFIG'].fields_by_name['marshal_to']._loaded_options = None - _globals['_XTCPCONFIG'].fields_by_name['marshal_to']._serialized_options = b'\272H\tr\004\020\004\030(\310\001\001' + _globals['_XTCPCONFIG'].fields_by_name['marshal_to']._serialized_options = b'\272H\tr\004\020\003\030(\310\001\001' _globals['_XTCPCONFIG'].fields_by_name['envelope_flush_threshold_bytes']._loaded_options = None _globals['_XTCPCONFIG'].fields_by_name['envelope_flush_threshold_bytes']._serialized_options = b'\272H\003\310\001\000' _globals['_XTCPCONFIG'].fields_by_name['envelope_flush_threshold_rows']._loaded_options = None @@ -123,6 +123,8 @@ _globals['_XTCPCONFIG'].fields_by_name['io_uring_recv_batch_size']._serialized_options = b'\272H\n*\005\030\200 (\001\310\001\000' _globals['_XTCPCONFIG'].fields_by_name['io_uring_cqe_batch_size']._loaded_options = None _globals['_XTCPCONFIG'].fields_by_name['io_uring_cqe_batch_size']._serialized_options = b'\272H\n*\005\030\200 (\001\310\001\000' + _globals['_XTCPCONFIG'].fields_by_name['csv_columns']._loaded_options = None + _globals['_XTCPCONFIG'].fields_by_name['csv_columns']._serialized_options = b'\272H\003\310\001\000' _globals['_XTCPCONFIG']._loaded_options = None _globals['_XTCPCONFIG']._serialized_options = b'\272Hp\032n\n\017XtcpConfig.poll\0222Poll timeout must be less than poll poll_frequency\032\'this.poll_frequency > this.poll_timeout' _globals['_ENABLEDDESERIALIZERS_ENABLEDENTRY']._loaded_options = None @@ -146,11 +148,11 @@ _globals['_SETPOLLFREQUENCYRESPONSE']._serialized_start=668 _globals['_SETPOLLFREQUENCYRESPONSE']._serialized_end=746 _globals['_XTCPCONFIG']._serialized_start=749 - _globals['_XTCPCONFIG']._serialized_end=3157 - _globals['_ENABLEDDESERIALIZERS']._serialized_start=3160 - _globals['_ENABLEDDESERIALIZERS']._serialized_end=3319 - _globals['_ENABLEDDESERIALIZERS_ENABLEDENTRY']._serialized_start=3261 - _globals['_ENABLEDDESERIALIZERS_ENABLEDENTRY']._serialized_end=3319 - _globals['_CONFIGSERVICE']._serialized_start=3322 - _globals['_CONFIGSERVICE']._serialized_end=3675 + _globals['_XTCPCONFIG']._serialized_end=3199 + _globals['_ENABLEDDESERIALIZERS']._serialized_start=3202 + _globals['_ENABLEDDESERIALIZERS']._serialized_end=3361 + _globals['_ENABLEDDESERIALIZERS_ENABLEDENTRY']._serialized_start=3303 + _globals['_ENABLEDDESERIALIZERS_ENABLEDENTRY']._serialized_end=3361 + _globals['_CONFIGSERVICE']._serialized_start=3364 + _globals['_CONFIGSERVICE']._serialized_end=3717 # @@protoc_insertion_point(module_scope) diff --git a/python/xtcp_config/v1/xtcp_config_pb2.pyi b/python/xtcp_config/v1/xtcp_config_pb2.pyi index bc96261..ef1d9c7 100644 --- a/python/xtcp_config/v1/xtcp_config_pb2.pyi +++ b/python/xtcp_config/v1/xtcp_config_pb2.pyi @@ -46,7 +46,7 @@ class SetPollFrequencyResponse(_message.Message): def __init__(self, config: _Optional[_Union[XtcpConfig, _Mapping]] = ...) -> None: ... class XtcpConfig(_message.Message): - __slots__ = ("nl_timeout_milliseconds", "poll_frequency", "poll_timeout", "max_loops", "netlinkers", "netlinkers_done_chan_size", "nlmsg_seq", "packet_size", "packet_size_mply", "write_files", "capture_path", "modulus", "marshal_to", "envelope_flush_threshold_bytes", "envelope_flush_threshold_rows", "kafka_compression", "s3_endpoint", "s3_bucket", "s3_prefix", "s3_access_key", "s3_secret_key", "s3_parquet_flush_threshold_bytes", "s3_region", "pyroscope_url", "pyroscope_app_name", "pyroscope_sample_hz", "pyroscope_upload_interval_sec", "dest", "dest_write_files", "topic", "xtcp_proto_file", "kafka_schema_url", "kafka_produce_timeout", "debug_level", "label", "tag", "grpc_port", "enabled_deserializers", "io_uring", "io_uring_recv_batch_size", "io_uring_cqe_batch_size") + __slots__ = ("nl_timeout_milliseconds", "poll_frequency", "poll_timeout", "max_loops", "netlinkers", "netlinkers_done_chan_size", "nlmsg_seq", "packet_size", "packet_size_mply", "write_files", "capture_path", "modulus", "marshal_to", "envelope_flush_threshold_bytes", "envelope_flush_threshold_rows", "kafka_compression", "s3_endpoint", "s3_bucket", "s3_prefix", "s3_access_key", "s3_secret_key", "s3_parquet_flush_threshold_bytes", "s3_region", "pyroscope_url", "pyroscope_app_name", "pyroscope_sample_hz", "pyroscope_upload_interval_sec", "dest", "dest_write_files", "topic", "xtcp_proto_file", "kafka_schema_url", "kafka_produce_timeout", "debug_level", "label", "tag", "grpc_port", "enabled_deserializers", "io_uring", "io_uring_recv_batch_size", "io_uring_cqe_batch_size", "csv_columns") NL_TIMEOUT_MILLISECONDS_FIELD_NUMBER: _ClassVar[int] POLL_FREQUENCY_FIELD_NUMBER: _ClassVar[int] POLL_TIMEOUT_FIELD_NUMBER: _ClassVar[int] @@ -88,6 +88,7 @@ class XtcpConfig(_message.Message): IO_URING_FIELD_NUMBER: _ClassVar[int] IO_URING_RECV_BATCH_SIZE_FIELD_NUMBER: _ClassVar[int] IO_URING_CQE_BATCH_SIZE_FIELD_NUMBER: _ClassVar[int] + CSV_COLUMNS_FIELD_NUMBER: _ClassVar[int] nl_timeout_milliseconds: int poll_frequency: _duration_pb2.Duration poll_timeout: _duration_pb2.Duration @@ -129,7 +130,8 @@ class XtcpConfig(_message.Message): io_uring: bool io_uring_recv_batch_size: int io_uring_cqe_batch_size: int - def __init__(self, nl_timeout_milliseconds: _Optional[int] = ..., poll_frequency: _Optional[_Union[_duration_pb2.Duration, _Mapping]] = ..., poll_timeout: _Optional[_Union[_duration_pb2.Duration, _Mapping]] = ..., max_loops: _Optional[int] = ..., netlinkers: _Optional[int] = ..., netlinkers_done_chan_size: _Optional[int] = ..., nlmsg_seq: _Optional[int] = ..., packet_size: _Optional[int] = ..., packet_size_mply: _Optional[int] = ..., write_files: _Optional[int] = ..., capture_path: _Optional[str] = ..., modulus: _Optional[int] = ..., marshal_to: _Optional[str] = ..., envelope_flush_threshold_bytes: _Optional[int] = ..., envelope_flush_threshold_rows: _Optional[int] = ..., kafka_compression: _Optional[str] = ..., s3_endpoint: _Optional[str] = ..., s3_bucket: _Optional[str] = ..., s3_prefix: _Optional[str] = ..., s3_access_key: _Optional[str] = ..., s3_secret_key: _Optional[str] = ..., s3_parquet_flush_threshold_bytes: _Optional[int] = ..., s3_region: _Optional[str] = ..., pyroscope_url: _Optional[str] = ..., pyroscope_app_name: _Optional[str] = ..., pyroscope_sample_hz: _Optional[int] = ..., pyroscope_upload_interval_sec: _Optional[int] = ..., dest: _Optional[str] = ..., dest_write_files: _Optional[int] = ..., topic: _Optional[str] = ..., xtcp_proto_file: _Optional[str] = ..., kafka_schema_url: _Optional[str] = ..., kafka_produce_timeout: _Optional[_Union[_duration_pb2.Duration, _Mapping]] = ..., debug_level: _Optional[int] = ..., label: _Optional[str] = ..., tag: _Optional[str] = ..., grpc_port: _Optional[int] = ..., enabled_deserializers: _Optional[_Union[EnabledDeserializers, _Mapping]] = ..., io_uring: bool = ..., io_uring_recv_batch_size: _Optional[int] = ..., io_uring_cqe_batch_size: _Optional[int] = ...) -> None: ... + csv_columns: str + def __init__(self, nl_timeout_milliseconds: _Optional[int] = ..., poll_frequency: _Optional[_Union[_duration_pb2.Duration, _Mapping]] = ..., poll_timeout: _Optional[_Union[_duration_pb2.Duration, _Mapping]] = ..., max_loops: _Optional[int] = ..., netlinkers: _Optional[int] = ..., netlinkers_done_chan_size: _Optional[int] = ..., nlmsg_seq: _Optional[int] = ..., packet_size: _Optional[int] = ..., packet_size_mply: _Optional[int] = ..., write_files: _Optional[int] = ..., capture_path: _Optional[str] = ..., modulus: _Optional[int] = ..., marshal_to: _Optional[str] = ..., envelope_flush_threshold_bytes: _Optional[int] = ..., envelope_flush_threshold_rows: _Optional[int] = ..., kafka_compression: _Optional[str] = ..., s3_endpoint: _Optional[str] = ..., s3_bucket: _Optional[str] = ..., s3_prefix: _Optional[str] = ..., s3_access_key: _Optional[str] = ..., s3_secret_key: _Optional[str] = ..., s3_parquet_flush_threshold_bytes: _Optional[int] = ..., s3_region: _Optional[str] = ..., pyroscope_url: _Optional[str] = ..., pyroscope_app_name: _Optional[str] = ..., pyroscope_sample_hz: _Optional[int] = ..., pyroscope_upload_interval_sec: _Optional[int] = ..., dest: _Optional[str] = ..., dest_write_files: _Optional[int] = ..., topic: _Optional[str] = ..., xtcp_proto_file: _Optional[str] = ..., kafka_schema_url: _Optional[str] = ..., kafka_produce_timeout: _Optional[_Union[_duration_pb2.Duration, _Mapping]] = ..., debug_level: _Optional[int] = ..., label: _Optional[str] = ..., tag: _Optional[str] = ..., grpc_port: _Optional[int] = ..., enabled_deserializers: _Optional[_Union[EnabledDeserializers, _Mapping]] = ..., io_uring: bool = ..., io_uring_recv_batch_size: _Optional[int] = ..., io_uring_cqe_batch_size: _Optional[int] = ..., csv_columns: _Optional[str] = ...) -> None: ... class EnabledDeserializers(_message.Message): __slots__ = ("enabled",) diff --git a/xtcp_config/v1/xtcp_config.swagger.json b/xtcp_config/v1/xtcp_config.swagger.json index 539f87c..f83cda1 100644 --- a/xtcp_config/v1/xtcp_config.swagger.json +++ b/xtcp_config/v1/xtcp_config.swagger.json @@ -386,6 +386,10 @@ "type": "integer", "format": "int64", "description": "Maximum CQEs reaped per PeekBatchCQE call. Larger batches amortise\nuserland loop overhead but increase scheduling latency for the\nnetlinker goroutine. Ignored unless io_uring=true. Default 128." + }, + "csvColumns": { + "type": "string", + "description": "Comma-separated subset of XtcpFlatRecord json field names selecting\nwhich columns the csv/tsv marshallers emit (e.g.\n\"hostname,inetDiagMsgSocketSourcePort,inetDiagMsgState,tcpInfoRtt\").\nEmpty = all fields. Ignored by non-tabular marshallers." } }, "title": "xtcp configuration"