Skip to content

Commit deba8b2

Browse files
committed
Add tracing entry span with W3C propagation to EPP handler
Signed-off-by: sallyom <somalley@redhat.com>
1 parent 271932f commit deba8b2

File tree

3 files changed

+34
-9
lines changed

3 files changed

+34
-9
lines changed

pkg/epp/handlers/request.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@ limitations under the License.
1717
package handlers
1818

1919
import (
20+
"context"
2021
"strconv"
2122
"time"
2223

2324
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2425
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
26+
"go.opentelemetry.io/otel"
27+
"go.opentelemetry.io/otel/propagation"
2528
"google.golang.org/protobuf/types/known/structpb"
2629

2730
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
@@ -37,7 +40,7 @@ const (
3740
defaultFairnessID = "default-flow"
3841
)
3942

40-
func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest_RequestHeaders) error {
43+
func (s *StreamingServer) HandleRequestHeaders(ctx context.Context, reqCtx *RequestContext, req *extProcPb.ProcessingRequest_RequestHeaders) error {
4144
reqCtx.RequestReceivedTimestamp = time.Now()
4245

4346
// an EoS in the request headers means this request has no body or trailers.
@@ -52,7 +55,7 @@ func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extP
5255
}
5356
reqCtx.TargetEndpoint = pod.GetIPAddress() + ":" + pod.GetPort()
5457
reqCtx.RequestSize = 0
55-
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(reqCtx)
58+
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(ctx, reqCtx)
5659
return nil
5760
}
5861

@@ -91,7 +94,7 @@ func (s *StreamingServer) generateRequestBodyResponses(requestBodyBytes []byte)
9194
return responses
9295
}
9396

94-
func (s *StreamingServer) generateRequestHeaderResponse(reqCtx *RequestContext) *extProcPb.ProcessingResponse {
97+
func (s *StreamingServer) generateRequestHeaderResponse(ctx context.Context, reqCtx *RequestContext) *extProcPb.ProcessingResponse {
9598
// The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header
9699
// and as an unstructure ext-proc response metadata key/value pair. This enables different integration
97100
// options for gateway providers.
@@ -101,7 +104,7 @@ func (s *StreamingServer) generateRequestHeaderResponse(reqCtx *RequestContext)
101104
Response: &extProcPb.CommonResponse{
102105
ClearRouteCache: true,
103106
HeaderMutation: &extProcPb.HeaderMutation{
104-
SetHeaders: s.generateHeaders(reqCtx),
107+
SetHeaders: s.generateHeaders(ctx, reqCtx),
105108
},
106109
},
107110
},
@@ -110,7 +113,7 @@ func (s *StreamingServer) generateRequestHeaderResponse(reqCtx *RequestContext)
110113
}
111114
}
112115

113-
func (s *StreamingServer) generateHeaders(reqCtx *RequestContext) []*configPb.HeaderValueOption {
116+
func (s *StreamingServer) generateHeaders(ctx context.Context, reqCtx *RequestContext) []*configPb.HeaderValueOption {
114117
// can likely refactor these two bespoke headers to be updated in PostDispatch, to centralize logic.
115118
headers := []*configPb.HeaderValueOption{
116119
{
@@ -131,6 +134,19 @@ func (s *StreamingServer) generateHeaders(reqCtx *RequestContext) []*configPb.He
131134
})
132135
}
133136

137+
// Inject trace context headers for propagation to downstream services
138+
traceHeaders := make(map[string]string)
139+
propagator := otel.GetTextMapPropagator()
140+
propagator.Inject(ctx, propagation.MapCarrier(traceHeaders))
141+
for key, value := range traceHeaders {
142+
headers = append(headers, &configPb.HeaderValueOption{
143+
Header: &configPb.HeaderValue{
144+
Key: key,
145+
RawValue: []byte(value),
146+
},
147+
})
148+
}
149+
134150
// Include any non-system-owned headers.
135151
for key, value := range reqCtx.Request.Headers {
136152
if request.IsSystemOwnedHeader(key) {

pkg/epp/handlers/request_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package handlers
1818

1919
import (
20+
"context"
2021
"testing"
2122

2223
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -64,7 +65,7 @@ func TestHandleRequestHeaders(t *testing.T) {
6465
},
6566
}
6667

67-
err := server.HandleRequestHeaders(reqCtx, req)
68+
err := server.HandleRequestHeaders(context.Background(), reqCtx, req)
6869
assert.NoError(t, err, "HandleRequestHeaders should not return an error")
6970

7071
assert.Equal(t, tc.wantFairnessID, reqCtx.FairnessID, "FairnessID should match expected value")
@@ -93,7 +94,7 @@ func TestGenerateHeaders_Sanitization(t *testing.T) {
9394
},
9495
}
9596

96-
results := server.generateHeaders(reqCtx)
97+
results := server.generateHeaders(context.Background(), reqCtx)
9798

9899
gotHeaders := make(map[string]string)
99100
for _, h := range results {

pkg/epp/handlers/server.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
2828
"github.com/go-logr/logr"
2929
"github.com/google/uuid"
30+
"go.opentelemetry.io/otel"
31+
"go.opentelemetry.io/otel/trace"
3032
"google.golang.org/grpc/codes"
3133
"google.golang.org/grpc/status"
3234

@@ -126,6 +128,12 @@ const (
126128

127129
func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
128130
ctx := srv.Context()
131+
132+
// Start tracing span for the request
133+
tracer := otel.Tracer("gateway-api-inference-extension")
134+
ctx, span := tracer.Start(ctx, "gateway.request", trace.WithSpanKind(trace.SpanKindServer))
135+
defer span.End()
136+
129137
logger := log.FromContext(ctx)
130138
loggerTrace := logger.V(logutil.TRACE)
131139
loggerTrace.Info("Processing")
@@ -204,7 +212,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
204212
loggerTrace = logger.V(logutil.TRACE)
205213
ctx = log.IntoContext(ctx, logger)
206214

207-
err = s.HandleRequestHeaders(reqCtx, v)
215+
err = s.HandleRequestHeaders(ctx, reqCtx, v)
208216
case *extProcPb.ProcessingRequest_RequestBody:
209217
loggerTrace.Info("Incoming body chunk", "EoS", v.RequestBody.EndOfStream)
210218
// In the stream case, we can receive multiple request bodies.
@@ -240,7 +248,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
240248
break
241249
}
242250
reqCtx.RequestSize = len(requestBodyBytes)
243-
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(reqCtx)
251+
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(ctx, reqCtx)
244252
reqCtx.reqBodyResp = s.generateRequestBodyResponses(requestBodyBytes)
245253

246254
metrics.RecordRequestCounter(reqCtx.IncomingModelName, reqCtx.TargetModelName)

0 commit comments

Comments
 (0)