26 changes: 21 additions & 5 deletions pkg/epp/handlers/request.go
@@ -17,11 +17,14 @@ limitations under the License.
package handlers

import (
"context"
"strconv"
"time"

configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"google.golang.org/protobuf/types/known/structpb"

"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
@@ -37,7 +40,7 @@ const (
defaultFairnessID = "default-flow"
)

func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest_RequestHeaders) error {
func (s *StreamingServer) HandleRequestHeaders(ctx context.Context, reqCtx *RequestContext, req *extProcPb.ProcessingRequest_RequestHeaders) error {
reqCtx.RequestReceivedTimestamp = time.Now()

// an EoS in the request headers means this request has no body or trailers.
@@ -52,7 +55,7 @@ func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extP
}
reqCtx.TargetEndpoint = pod.GetIPAddress() + ":" + pod.GetPort()
reqCtx.RequestSize = 0
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(reqCtx)
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(ctx, reqCtx)
return nil
}

@@ -91,7 +94,7 @@ func (s *StreamingServer) generateRequestBodyResponses(requestBodyBytes []byte)
return responses
}

func (s *StreamingServer) generateRequestHeaderResponse(reqCtx *RequestContext) *extProcPb.ProcessingResponse {
func (s *StreamingServer) generateRequestHeaderResponse(ctx context.Context, reqCtx *RequestContext) *extProcPb.ProcessingResponse {
// The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header
// and as an unstructure ext-proc response metadata key/value pair. This enables different integration
// options for gateway providers.
@@ -101,7 +104,7 @@ func (s *StreamingServer) generateRequestHeaderResponse(reqCtx *RequestContext)
Response: &extProcPb.CommonResponse{
ClearRouteCache: true,
HeaderMutation: &extProcPb.HeaderMutation{
SetHeaders: s.generateHeaders(reqCtx),
SetHeaders: s.generateHeaders(ctx, reqCtx),
},
},
},
@@ -110,7 +113,7 @@ func (s *StreamingServer) generateRequestHeaderResponse(reqCtx *RequestContext)
}
}

func (s *StreamingServer) generateHeaders(reqCtx *RequestContext) []*configPb.HeaderValueOption {
func (s *StreamingServer) generateHeaders(ctx context.Context, reqCtx *RequestContext) []*configPb.HeaderValueOption {
// can likely refactor these two bespoke headers to be updated in PostDispatch, to centralize logic.
headers := []*configPb.HeaderValueOption{
{
@@ -131,6 +134,19 @@ func (s *StreamingServer) generateHeaders(reqCtx *RequestContext) []*configPb.He
})
}

// Inject trace context headers for propagation to downstream services
traceHeaders := make(map[string]string)
propagator := otel.GetTextMapPropagator()
propagator.Inject(ctx, propagation.MapCarrier(traceHeaders))
for key, value := range traceHeaders {
headers = append(headers, &configPb.HeaderValueOption{
Header: &configPb.HeaderValue{
Key: key,
RawValue: []byte(value),
},
})
}

Comment on lines +137 to +149
Contributor:
I think this should only be done if the user requested tracing. I think we need to add either a command line argument to enable tracing or to add something in the EPP Configuration.

Member:
you shouldn't need to manually propagate context like this; as long as the Go context.Context is correctly passed around, the OTel SDK will handle propagation for you
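
For illustration, a minimal sketch of what that usually relies on (the server wiring below is an assumption, not the EPP's actual setup): registering the otelgrpc stats handler extracts an incoming traceparent from gRPC metadata into the stream's context automatically.

```go
import (
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"google.golang.org/grpc"
)

// Assumed server construction, for illustration only: with the otelgrpc stats
// handler installed, srv.Context() inside Process() already carries the remote
// span context from an incoming traceparent header.
func newExtProcServer() *grpc.Server {
	return grpc.NewServer(grpc.StatsHandler(otelgrpc.NewServerHandler()))
}
```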

Contributor Author:
thanks, @damemi! I wasn't sure about this, I will remove this and retest to be sure. TY again!

Contributor Author:
I'll remove the manual propagation, then verify with llm-d:

  1. Does vllm:llm_request span show up as a child of gateway.request?
  2. Does the trace ID remain consistent end-to-end?
  3. If there's an upstream traceparent, is it continued correctly?

Contributor:
The entry point of request handling is: https://github.com/kubernetes-sigs/gateway-api-inference-extension/blob/main/pkg/epp/handlers/server.go#L128C49-L128C80

There, the Go context is wrapped in the srv extProcPb.ExternalProcessor_ProcessServer. Does OTel need the context to be explicitly passed in the function signature?

ref - https://pkg.go.dev/google.golang.org/grpc#ServerStream
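
In Go, OTel carries trace context only through context.Context, so handlers do need it passed explicitly. As a rough sketch (not this PR's code; the helper below is hypothetical), the remote span context could also be recovered by hand from the stream's gRPC metadata when no instrumentation middleware is installed:

```go
import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	"google.golang.org/grpc/metadata"
)

// extractTraceContext is a hypothetical helper, not part of this PR.
func extractTraceContext(ctx context.Context) context.Context {
	md, ok := metadata.FromIncomingContext(ctx) // ctx here would be srv.Context()
	if !ok {
		return ctx
	}
	// gRPC metadata keys are lowercase, which matches what the W3C
	// propagator looks up ("traceparent", "tracestate").
	carrier := propagation.MapCarrier{}
	for k, vals := range md {
		if len(vals) > 0 {
			carrier[k] = vals[0]
		}
	}
	return otel.GetTextMapPropagator().Extract(ctx, carrier)
}
```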

Contributor Author:
I did some testing with the context propagation. It seems that with GAIE's architecture we need to manually propagate the trace headers: because GAIE runs as an Envoy External Processor, it doesn't make HTTP requests directly, so without manual propagation the trace context never reaches downstream services. I confirmed this with testing; without the manual trace propagation we see separate spans for the gateway-api-inference-extension and vllm services, rather than a vllm child span with the propagated context headers. I'll leave the manual propagation in.
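
To illustrate the other end of that propagation (a sketch only; vLLM does the equivalent on the Python side, and the handler name here is hypothetical): the injected traceparent header follows the W3C Trace Context format, and the downstream service extracts it to start a child span.

```go
import (
	"net/http"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"
)

// handleCompletion is a stand-in for the model server's request handler.
func handleCompletion(w http.ResponseWriter, r *http.Request) {
	// Continue the trace from the headers Envoy forwarded, e.g.
	// "traceparent: 00-<trace-id>-<parent-span-id>-01".
	ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
	ctx, span := otel.Tracer("model-server").Start(ctx, "llm_request",
		trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()

	_ = ctx // handle the request with the propagated context
	w.WriteHeader(http.StatusOK)
}
```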

Member:
@sallyom ah, that's interesting. I didn't think about how this was working with Envoy, so there could be some work you need to do there. Not something I've worked with before, but testing tells the truth.

// Include any non-system-owned headers.
for key, value := range reqCtx.Request.Headers {
if request.IsSystemOwnedHeader(key) {
5 changes: 3 additions & 2 deletions pkg/epp/handlers/request_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package handlers

import (
"context"
"testing"

configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -64,7 +65,7 @@ func TestHandleRequestHeaders(t *testing.T) {
},
}

err := server.HandleRequestHeaders(reqCtx, req)
err := server.HandleRequestHeaders(context.Background(), reqCtx, req)
assert.NoError(t, err, "HandleRequestHeaders should not return an error")

assert.Equal(t, tc.wantFairnessID, reqCtx.FairnessID, "FairnessID should match expected value")
@@ -93,7 +94,7 @@ func TestGenerateHeaders_Sanitization(t *testing.T) {
},
}

results := server.generateHeaders(reqCtx)
results := server.generateHeaders(context.Background(), reqCtx)

gotHeaders := make(map[string]string)
for _, h := range results {
12 changes: 10 additions & 2 deletions pkg/epp/handlers/server.go
@@ -27,6 +27,8 @@ import (
envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/go-logr/logr"
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

@@ -126,6 +128,12 @@ const (

func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
ctx := srv.Context()

// Start tracing span for the request
tracer := otel.Tracer("gateway-api-inference-extension")
ctx, span := tracer.Start(ctx, "gateway.request", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

Comment on lines +131 to +136
Contributor:
I think this should only be done if the user requested tracing. I think we need to add either a command line argument to enable tracing or to add something in the EPP Configuration.

Member:
these calls are a zero-overhead no-op unless a TracerProvider is configured, so all you should need to gate on the user enabling tracing is the creation of the TracerProvider itself.

For reference, this is the same way that Kubernetes components implement tracing: they explicitly set up a no-op TracerProvider, but having no TracerProvider configured should be effectively the same.

Either way, it's not about feature-gating the tracer.Start() calls; it's about the TracerProvider.
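
A minimal sketch of that pattern (the function name, flag, and exporter choice are assumptions, not this repo's actual configuration surface): gate the provider setup, and leave the tracer.Start() calls ungated.

```go
import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/propagation"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// initTracing is hypothetical; it only installs a real TracerProvider when
// tracing is enabled, so otel.Tracer(...).Start(...) stays a no-op otherwise.
func initTracing(ctx context.Context, enabled bool) (func(context.Context) error, error) {
	if !enabled {
		return func(context.Context) error { return nil }, nil
	}
	exporter, err := otlptracegrpc.New(ctx) // endpoint taken from OTEL_* env vars
	if err != nil {
		return nil, err
	}
	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.TraceContext{})
	return tp.Shutdown, nil
}
```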

Contributor Author:
thanks, @damemi! I'll leave it as-is but am still open to other opinions

Contributor:
Currently the trace initialization is only invoked if tracing is enabled:

If InitTracing is not invoked, a default no-op provider will be used (correct me if I'm wrong here). So it should be fine to keep it the way the PR implements it.

logger := log.FromContext(ctx)
loggerTrace := logger.V(logutil.TRACE)
loggerTrace.Info("Processing")
@@ -204,7 +212,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
loggerTrace = logger.V(logutil.TRACE)
ctx = log.IntoContext(ctx, logger)

err = s.HandleRequestHeaders(reqCtx, v)
err = s.HandleRequestHeaders(ctx, reqCtx, v)
case *extProcPb.ProcessingRequest_RequestBody:
loggerTrace.Info("Incoming body chunk", "EoS", v.RequestBody.EndOfStream)
// In the stream case, we can receive multiple request bodies.
@@ -240,7 +248,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
break
}
reqCtx.RequestSize = len(requestBodyBytes)
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(reqCtx)
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(ctx, reqCtx)
reqCtx.reqBodyResp = s.generateRequestBodyResponses(requestBodyBytes)

metrics.RecordRequestCounter(reqCtx.IncomingModelName, reqCtx.TargetModelName)