Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/xtcp2/xtcp2.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type mainFlags struct {
capturePath *string
modulus *uint64
marshal *string
columns *string
envelopeFlushBytes *uint
envelopeFlushRows *uint
kafkaCompression *string
Expand Down Expand Up @@ -230,7 +231,8 @@ func defineFlags() *mainFlags {
f.writeFiles = flag.Uint("writeFiles", WriteFilesCst, "Write netlink packets to writeFiles number of files ( to generate test data ) per netlinker")
f.capturePath = flag.String("capturePath", capturePathCst, "Write files path")
f.modulus = flag.Uint64("modulus", modulusCst, "modulus. Report every X inetd messages to output")
f.marshal = flag.String("marshal", marshalCst, "Marshaling of the exported data (protobufList, protoJson, protoText, msgpack)")
f.marshal = flag.String("marshal", marshalCst, "Marshaling of the exported data (protobufList, protoJson, protoText, msgpack, jsonl, csv, tsv)")
f.columns = flag.String("columns", "", "csv/tsv only: comma-separated subset of XtcpFlatRecord json field names (e.g. hostname,inetDiagMsgSocketSourcePort,inetDiagMsgState,tcpInfoRtt); empty = all")
f.envelopeFlushBytes = flag.Uint("envelopeFlushBytes", envelopeFlushBytesCst, "Safety-net cap on the in-flight protobufList Envelope's UNCOMPRESSED proto size in bytes (franz-go compresses post-flush, so wire size is typically 3-8x smaller). 0 = use daemon default (768 KiB). Whichever cap (bytes/rows) trips first wins.")
f.envelopeFlushRows = flag.Uint("envelopeFlushRows", envelopeFlushRowsCst, "Primary cap on the in-flight protobufList Envelope's row count. 0 = use daemon default (10000). Cheap, predictable; pairs with -envelopeFlushBytes as a safety net.")
f.kafkaCompression = flag.String("kafkaCompression", kafkaCompressionCst, "Kafka producer compression codec. '' or 'auto' = preference list [zstd,lz4,snappy,none] negotiated with broker; or pin one of: zstd, lz4, snappy, gzip, none. All codecs are decodable by Redpanda + ClickHouse's Kafka engine.")
Expand All @@ -241,7 +243,7 @@ func defineFlags() *mainFlags {
f.s3SecretKey = flag.String("s3SecretKey", s3SecretKeyCst, "s3parquet: S3 secret key. Falls back to S3_SECRET_KEY env. Never logged.")
f.s3Region = flag.String("s3Region", s3RegionCst, "s3parquet: S3 region. Defaults to 'us-east-1' when empty; required by AWS, ignored by most MinIO setups.")
f.s3ParquetFlushBytes = flag.Uint("s3ParquetFlushBytes", s3ParquetFlushThresholdBytesCst, "s3parquet: soft cap on the in-memory Parquet builder's uncompressed row bytes before finalize+upload. 0 = daemon default (63 MiB).")
f.dest = flag.String("dest", destCst, "kafka:127.0.0.1:9092, udp:127.0.0.1:13000, nsq:127.0.0.1:4150, null, or stdout (pair stdout with -marshal protoJson)")
f.dest = flag.String("dest", destCst, "scheme:addr — kafka:host:9092, nats:..., nsq:..., valkey:..., udp:host:13000, tcp:host:9000, unix:/path, unixgram:/path, file:/path, http(s)://host/ingest, s3parquet:..., stdout, stderr, null (pair stdout/file/tcp with -marshal jsonl|csv|tsv)")
f.destWriteFiles = flag.Uint("destWriteFiles", DestWriteFilesCst, "Write out the marshaled data to destWriteFiles number of files ( for debugging only )")
f.topic = flag.String("topic", topicCst, "Kafka or NSQ topic")
f.xtcpProtoFile = flag.String("xtcpProtoFile", xtcpProtoFileCst, "xtcpProtoFile for registering with the schema registry")
Expand Down Expand Up @@ -292,6 +294,7 @@ func printFlags(f *mainFlags) {
fmt.Println("*capturePath:", *f.capturePath)
fmt.Println("*modulus:", *f.modulus)
fmt.Println("*marshal:", *f.marshal)
fmt.Println("*columns:", *f.columns)
fmt.Println("*envelopeFlushBytes:", *f.envelopeFlushBytes)
fmt.Println("*envelopeFlushRows:", *f.envelopeFlushRows)
fmt.Println("*kafkaCompression:", *f.kafkaCompression)
Expand Down Expand Up @@ -333,6 +336,7 @@ func buildConfig(f *mainFlags, des *xtcp_config.EnabledDeserializers) *xtcp_conf
CapturePath: *f.capturePath,
Modulus: *f.modulus,
MarshalTo: *f.marshal,
CsvColumns: *f.columns,
EnvelopeFlushThresholdBytes: uint32(*f.envelopeFlushBytes),
EnvelopeFlushThresholdRows: uint32(*f.envelopeFlushRows),
KafkaCompression: *f.kafkaCompression,
Expand Down
4 changes: 4 additions & 0 deletions cmd/xtcp2/xtcp2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ func TestPrintFlags(t *testing.T) {
f.capturePath = &s
f.modulus = &n64
f.marshal = &s
f.columns = &s
f.envelopeFlushBytes = &n
f.envelopeFlushRows = &n
f.kafkaCompression = &s
Expand Down Expand Up @@ -685,6 +686,7 @@ func TestBuildConfig(t *testing.T) {
cp := "/tmp/cap/"
mod := uint64(13)
mar := "protoText"
cols := "hostname,tcpInfoRtt"
dst := "udp:127.0.0.1:13000"
dwf := uint(4)
topic := "topic1"
Expand All @@ -709,6 +711,7 @@ func TestBuildConfig(t *testing.T) {
nltimeout: &nl, pollFrequency: &pf, pollTimeout: &pt, maxLoops: &ml,
netlinkers: &nlk, nlmsgSeq: &seq, packetSize: &psz, packetSizeMply: &psm,
writeFiles: &wf, capturePath: &cp, modulus: &mod, marshal: &mar,
columns: &cols,
envelopeFlushBytes: &wf, envelopeFlushRows: &wf,
kafkaCompression: &mar,
s3Endpoint: &mar,
Expand Down Expand Up @@ -746,6 +749,7 @@ func TestBuildConfig(t *testing.T) {
{"CapturePath", c.CapturePath, "/tmp/cap/"},
{"Modulus", c.Modulus, uint64(13)},
{"MarshalTo", c.MarshalTo, "protoText"},
{"CsvColumns", c.CsvColumns, "hostname,tcpInfoRtt"},
{"Dest", c.Dest, "udp:127.0.0.1:13000"},
{"DestWriteFiles", c.DestWriteFiles, uint32(4)},
{"Topic", c.Topic, "topic1"},
Expand Down
18 changes: 18 additions & 0 deletions dart/xtcp_config/v1/xtcp_config.pb.dart

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions dart/xtcp_config/v1/xtcp_config.pbjson.dart

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading