Skip to content

refactor(go-forwarder): add HTTP middleware and consolidate forwarding#1135

Merged
ndakkoune merged 6 commits into
nabil.dakkoune/go-forwarderfrom
nabil.dakkoune/go-forwarder-middleware
May 28, 2026
Merged

refactor(go-forwarder): add HTTP middleware and consolidate forwarding#1135
ndakkoune merged 6 commits into
nabil.dakkoune/go-forwarderfrom
nabil.dakkoune/go-forwarder-middleware

Conversation

@ndakkoune
Copy link
Copy Markdown
Contributor

@ndakkoune ndakkoune commented May 25, 2026

What does this PR do?

Refactors the forwarding package to use HTTP middleware and enhance errors/logging/retry on intake failures

Motivation

  • Not fail the whole pipeline on the first intake error
  • Retry batch sending on retriable errors
  • Context cancellation testing during request to DD intake

Testing Guidelines

Additional Notes

  • Wanted to add WithHeaders middleware but they vary too much per request (e.g. storage tag)

Types of changes

  • Bug fix
  • New feature
  • Breaking change
  • Misc (docs, refactoring, dependency upgrade, etc.)

Check all that apply

  • This PR's description is comprehensive
  • This PR contains breaking changes that are documented in the description
  • This PR introduces new APIs or parameters that are documented and unlikely to change in the foreseeable future
  • This PR impacts documentation, and it has been updated (or a ticket has been logged)
  • This PR's changes are covered by the automated tests
  • This PR collects user input/sensitive content into Datadog
  • This PR passes the integration tests (ask a Datadog member to run the tests)
  • This PR passes the unit tests
  • This PR passes the installation tests (ask a Datadog member to run the tests)

@github-actions github-actions Bot added the aws label May 25, 2026
@ndakkoune ndakkoune marked this pull request as ready for review May 25, 2026 14:18
@ndakkoune ndakkoune requested a review from a team as a code owner May 25, 2026 14:18
@ndakkoune ndakkoune requested review from ViBiOh and ge0Aja May 25, 2026 14:19
@ViBiOh ViBiOh self-assigned this May 26, 2026
Comment thread aws/logs_monitoring_go/internal/forwarding/forwarding.go
Comment thread aws/logs_monitoring_go/internal/forwarding/forwarding.go
Comment thread aws/logs_monitoring_go/internal/forwarding/forwarding.go Outdated
Comment thread aws/logs_monitoring_go/internal/forwarding/forwarding.go Outdated
Comment thread aws/logs_monitoring_go/internal/forwarding/middleware.go

if err != nil {
slog.LogAttrs(req.Context(), slog.LevelWarn, "request failed", append(attrs, slog.String("error", err.Error()))...)
continue
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of error here, we continue (retry) without waiting some time. Is it what we want?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error check is for requests that failed to be created (or something went wrong in the middleware stack). We don't want to retry those since they haven't reach the intake endpoint, hence we instantly retry them without waiting.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(e.g. DNS lookup fail should just retry after some time bc it could be transient)

@datadog-prod-us1-5

This comment has been minimized.

@ndakkoune ndakkoune force-pushed the nabil.dakkoune/go-forwarder-middleware branch from 8b93f8d to f846620 Compare May 27, 2026 10:33
Comment thread aws/logs_monitoring_go/internal/forwarding/forwarding.go Outdated
func WithCompression(next http.RoundTripper) RoundTripperFunc {
return func(req *http.Request) (*http.Response, error) {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

syncPool @ViBiOh

Comment thread aws/logs_monitoring_go/internal/forwarding/middleware.go Outdated
Comment thread aws/logs_monitoring_go/internal/forwarding/middleware.go Outdated
Comment thread aws/logs_monitoring_go/internal/forwarding/middleware.go
return func(req *http.Request) (*http.Response, error) {
buf := getBuffer()
gz := getGzipWriter()
defer bufPool.Put(buf)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ViBiOh If a middleware layer call WithCompression, it may access the buffer/body of another goroutine. Should I make something about that or I can assume that I will always have the WithCompression as the first layer ?

@ndakkoune ndakkoune merged commit 5c0530d into nabil.dakkoune/go-forwarder May 28, 2026
10 checks passed
@ndakkoune ndakkoune deleted the nabil.dakkoune/go-forwarder-middleware branch May 28, 2026 16:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants