From d87a4417bf7e1744a0ca71e1e58b4975e6c6baf1 Mon Sep 17 00:00:00 2001 From: Divyanshu Tiwari Date: Wed, 11 Feb 2026 23:08:27 +0530 Subject: [PATCH 1/4] add buffer pools, reduce allocations, remove references for GC - record marshalling in SendData allocates a lot of space, using a buffer pool to reuse buffers can reduce this overhead - avoiding []byte -> string -> []byte allocations - sending record clone to branch tasks to avoid GC references to original record - explicitly clear references in task buffers after use to help GC --- internal/pkg/pipeline/pipeline.go | 11 +++++- internal/pkg/pipeline/record/record.go | 28 ++++++++++++++ internal/pkg/pipeline/task/converter/sst.go | 17 +++++---- internal/pkg/pipeline/task/join/join.go | 14 ++++--- internal/pkg/pipeline/task/replace/replace.go | 4 +- internal/pkg/pipeline/task/split/split.go | 13 +++++-- internal/pkg/pipeline/task/task.go | 37 ++++++++++++++++--- 7 files changed, 99 insertions(+), 25 deletions(-) diff --git a/internal/pkg/pipeline/pipeline.go b/internal/pkg/pipeline/pipeline.go index 4665c06..5bce901 100644 --- a/internal/pkg/pipeline/pipeline.go +++ b/internal/pkg/pipeline/pipeline.go @@ -196,9 +196,16 @@ func (p *Pipeline) distributeToChannels(input <-chan *record.Record, outputs []c }() for rec := range input { - for _, ch := range outputs { + for i, ch := range outputs { if ch != nil { - ch <- rec + if i == 0 { + // First branch gets the original record + ch <- rec + } else { + // Other branches get cloned records to prevent shared references + // and allow independent GC of records in each branch + ch <- rec.Clone() + } } } } diff --git a/internal/pkg/pipeline/record/record.go b/internal/pkg/pipeline/record/record.go index 0f406bb..21241af 100644 --- a/internal/pkg/pipeline/record/record.go +++ b/internal/pkg/pipeline/record/record.go @@ -2,6 +2,7 @@ package record import ( "encoding/json" + "maps" ) type Record struct { @@ -28,3 +29,30 @@ func (r *Record) Bytes() []byte { return data } + +// Clone creates a deep copy of the record to prevent shared references +// across parallel pipeline branches. Data is copied, Meta is cloned. +func (r *Record) Clone() *Record { + if r == nil { + return nil + } + + newRec := &Record{ + ID: r.ID, + Origin: r.Origin, + } + + // Deep copy Data + if r.Data != nil { + newRec.Data = make([]byte, len(r.Data)) + copy(newRec.Data, r.Data) + } + + // Deep copy Meta + if r.Meta != nil { + newRec.Meta = make(map[string]string, len(r.Meta)) + maps.Copy(newRec.Meta, r.Meta) + } + + return newRec +} diff --git a/internal/pkg/pipeline/task/converter/sst.go b/internal/pkg/pipeline/task/converter/sst.go index 5cc2af4..c24ddaf 100644 --- a/internal/pkg/pipeline/task/converter/sst.go +++ b/internal/pkg/pipeline/task/converter/sst.go @@ -1,10 +1,10 @@ package converter import ( + "bytes" "errors" "os" "sort" - "strings" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/objstorage/objstorageprovider" @@ -19,17 +19,20 @@ const ( ) func (c *sst) convert(data []byte, d string) ([]converterOutput, error) { - lines := strings.Split(string(data), "\n") + delimBytes := []byte(d) + newline := []byte("\n") + lines := bytes.Split(data, newline) values := map[string]string{} for _, line := range lines { - if line == "" { + if len(line) == 0 { continue } - kv := strings.SplitN(line, d, 2) + kv := bytes.SplitN(line, delimBytes, 2) if len(kv) != 2 { - return nil, errors.New("invalid input for sst converter: expected 'key" + d + "value' format (got: " + line + ")") + return nil, errors.New("invalid input for sst converter: expected 'key" + d + "value' format (got: " + string(line) + ")") } - values[kv[0]] = kv[1] + + values[string(kv[0])] = string(kv[1]) } fileName, err := c.createSST(values) @@ -37,7 +40,7 @@ func (c *sst) convert(data []byte, d string) ([]converterOutput, error) { return nil, err } defer os.Remove(fileName) - + sstData, err := os.ReadFile(fileName) if err != nil { return nil, err diff --git a/internal/pkg/pipeline/task/join/join.go b/internal/pkg/pipeline/task/join/join.go index 994f2e2..a55469b 100644 --- a/internal/pkg/pipeline/task/join/join.go +++ b/internal/pkg/pipeline/task/join/join.go @@ -1,9 +1,9 @@ package join import ( + "bytes" "context" "fmt" - "strings" "time" "github.com/patterninc/caterpillar/internal/pkg/duration" @@ -88,21 +88,25 @@ func (j *join) Run(ctx context.Context, input <-chan *record.Record, output chan func (j *join) flushBuffer(output chan<- *record.Record) { if len(j.buffer) > 0 { j.sendJoinedRecords(output) + // clear the buffer, explicitly nil out the elements to help GC + for i := range j.buffer { + j.buffer[i] = nil + } j.buffer = j.buffer[:0] } } func (j *join) sendJoinedRecords(output chan<- *record.Record) { - // Join all data with the specified delimiter - var joinedData strings.Builder + var joinedData bytes.Buffer + delimBytes := []byte(j.Delimiter) for i, r := range j.buffer { if i > 0 { - joinedData.WriteString(j.Delimiter) + joinedData.Write(delimBytes) } joinedData.Write(r.Data) } - j.SendData(nil, []byte(joinedData.String()), output) + j.SendData(nil, joinedData.Bytes(), output) } diff --git a/internal/pkg/pipeline/task/replace/replace.go b/internal/pkg/pipeline/task/replace/replace.go index bd2dabb..5587a57 100644 --- a/internal/pkg/pipeline/task/replace/replace.go +++ b/internal/pkg/pipeline/task/replace/replace.go @@ -25,13 +25,15 @@ func (r *replace) Run(ctx context.Context, input <-chan *record.Record, output c return err } + replacementBytes := []byte(r.Replacement) + if output != nil { for { record, ok := r.GetRecord(input) if !ok { break } - r.SendData(record.Meta, []byte(rx.ReplaceAllString(string(record.Data), r.Replacement)), output) + r.SendData(record.Meta, rx.ReplaceAll(record.Data, replacementBytes), output) } } diff --git a/internal/pkg/pipeline/task/split/split.go b/internal/pkg/pipeline/task/split/split.go index d29dd87..bd69d71 100644 --- a/internal/pkg/pipeline/task/split/split.go +++ b/internal/pkg/pipeline/task/split/split.go @@ -1,8 +1,8 @@ package split import ( + "bytes" "context" - "strings" "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" @@ -22,16 +22,21 @@ func New() (task.Task, error) { Delimiter: defaultDelimiter, }, nil } + func (s *split) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) error { + delimBytes := []byte(s.Delimiter) + for { r, ok := s.GetRecord(input) if !ok { break } - lines := strings.Split(strings.TrimSuffix(string(r.Data), s.Delimiter), s.Delimiter) - for _, line := range lines { - s.SendData(r.Meta, []byte(line), output) + + data := bytes.TrimSuffix(r.Data, delimBytes) + lines := bytes.SplitSeq(data, delimBytes) + for line := range lines { + s.SendData(r.Meta, line, output) } } diff --git a/internal/pkg/pipeline/task/task.go b/internal/pkg/pipeline/task/task.go index 781ef07..d6e5f12 100644 --- a/internal/pkg/pipeline/task/task.go +++ b/internal/pkg/pipeline/task/task.go @@ -1,6 +1,7 @@ package task import ( + "bytes" "context" "encoding/json" "fmt" @@ -25,6 +26,12 @@ var ( ErrPresentInputOutput = fmt.Errorf(`either input or output must be set, not both`) ) +var byteBufferPool = sync.Pool{ + New: func() any { + return new(bytes.Buffer) + }, +} + type Task interface { Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) error GetName() string @@ -119,13 +126,26 @@ func (b *Base) SendRecord(r *record.Record, output chan<- *record.Record) /* we output <- r }() + // get a buffer from the pool + buf := byteBufferPool.Get().(*bytes.Buffer) + defer byteBufferPool.Put(buf) + buf.Reset() + // before we set context, let's serialize the whole record - data, err := json.Marshal(r) - if err != nil { - // TODO: do prom metrics / log event to syslog + if err := json.NewEncoder(buf).Encode(r); err != nil { fmt.Println(`ERROR (marshal):`, err) return } + + data := buf.Bytes() + + // get another buffer for the query results + ctxBuf := byteBufferPool.Get().(*bytes.Buffer) + defer byteBufferPool.Put(ctxBuf) + ctxBuf.Reset() + + ctxEncoder := json.NewEncoder(ctxBuf) + // Set the context values for the record for name, query := range b.Context { queryResult, err := query.Execute(data) @@ -135,13 +155,18 @@ func (b *Base) SendRecord(r *record.Record, output chan<- *record.Record) /* we return } // now, let's marshal it to json and set in the context... - contextValueJson, err := json.Marshal(queryResult) - if err != nil { + if err := ctxEncoder.Encode(queryResult); err != nil { // TODO: do prom metrics / log event to syslog fmt.Println(`ERROR (result):`, err) return } - r.SetMetaValue(name, string(contextValueJson)) + // Remove trailing newline added by json.Encoder.Encode() + value := ctxBuf.String() + if len(value) > 0 && value[len(value)-1] == '\n' { + value = value[:len(value)-1] + } + r.SetMetaValue(name, value) + ctxBuf.Reset() } } From 1f417eb79058ed17227afcf59d4b05d3742d7915 Mon Sep 17 00:00:00 2001 From: Divyanshu Tiwari Date: Wed, 11 Feb 2026 23:17:43 +0530 Subject: [PATCH 2/4] better comment --- internal/pkg/pipeline/record/record.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/internal/pkg/pipeline/record/record.go b/internal/pkg/pipeline/record/record.go index 21241af..525aa7d 100644 --- a/internal/pkg/pipeline/record/record.go +++ b/internal/pkg/pipeline/record/record.go @@ -30,8 +30,7 @@ func (r *Record) Bytes() []byte { } -// Clone creates a deep copy of the record to prevent shared references -// across parallel pipeline branches. Data is copied, Meta is cloned. +// Clone creates a deep copy of the record to prevent shared references across parallel pipeline branches. func (r *Record) Clone() *Record { if r == nil { return nil @@ -42,13 +41,11 @@ func (r *Record) Clone() *Record { Origin: r.Origin, } - // Deep copy Data if r.Data != nil { newRec.Data = make([]byte, len(r.Data)) copy(newRec.Data, r.Data) } - // Deep copy Meta if r.Meta != nil { newRec.Meta = make(map[string]string, len(r.Meta)) maps.Copy(newRec.Meta, r.Meta) From 60e5828598cb952ec139c2ad2fa509ca2f34efe9 Mon Sep 17 00:00:00 2001 From: Divyanshu Tiwari <33171967+divyanshu-tiwari@users.noreply.github.com> Date: Thu, 12 Feb 2026 10:39:41 +0530 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/pkg/pipeline/task/join/join.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/pkg/pipeline/task/join/join.go b/internal/pkg/pipeline/task/join/join.go index a55469b..5146f04 100644 --- a/internal/pkg/pipeline/task/join/join.go +++ b/internal/pkg/pipeline/task/join/join.go @@ -107,6 +107,6 @@ func (j *join) sendJoinedRecords(output chan<- *record.Record) { joinedData.Write(r.Data) } - j.SendData(nil, joinedData.Bytes(), output) - + data := bytes.Clone(joinedData.Bytes()) + j.SendData(nil, data, output) } From 834ba55baf95d1dfee17a11749628fa495b38806 Mon Sep 17 00:00:00 2001 From: Divyanshu Tiwari Date: Thu, 12 Feb 2026 11:17:23 +0530 Subject: [PATCH 4/4] address comments --- internal/pkg/pipeline/pipeline.go | 13 ++++--------- internal/pkg/pipeline/task/split/split.go | 4 ++-- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/internal/pkg/pipeline/pipeline.go b/internal/pkg/pipeline/pipeline.go index 5bce901..8fba732 100644 --- a/internal/pkg/pipeline/pipeline.go +++ b/internal/pkg/pipeline/pipeline.go @@ -196,16 +196,11 @@ func (p *Pipeline) distributeToChannels(input <-chan *record.Record, outputs []c }() for rec := range input { - for i, ch := range outputs { + // Send clones to all channels to avoid data races + // and ensure GC can clean up records after processing + for _, ch := range outputs { if ch != nil { - if i == 0 { - // First branch gets the original record - ch <- rec - } else { - // Other branches get cloned records to prevent shared references - // and allow independent GC of records in each branch - ch <- rec.Clone() - } + ch <- rec.Clone() } } } diff --git a/internal/pkg/pipeline/task/split/split.go b/internal/pkg/pipeline/task/split/split.go index bd69d71..d53863d 100644 --- a/internal/pkg/pipeline/task/split/split.go +++ b/internal/pkg/pipeline/task/split/split.go @@ -34,8 +34,8 @@ func (s *split) Run(ctx context.Context, input <-chan *record.Record, output cha } data := bytes.TrimSuffix(r.Data, delimBytes) - lines := bytes.SplitSeq(data, delimBytes) - for line := range lines { + lines := bytes.Split(data, delimBytes) + for _, line := range lines { s.SendData(r.Meta, line, output) } }