-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathblocking.go
More file actions
144 lines (118 loc) · 4.23 KB
/
blocking.go
File metadata and controls
144 lines (118 loc) · 4.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package responses
import (
"context"
"errors"
"net/http"
"time"
"github.com/google/uuid"
"github.com/openai/openai-go/v3/option"
"github.com/openai/openai-go/v3/responses"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/xerrors"
"cdr.dev/slog/v3"
"github.com/coder/aibridge/config"
aibcontext "github.com/coder/aibridge/context"
"github.com/coder/aibridge/intercept"
"github.com/coder/aibridge/mcp"
"github.com/coder/aibridge/recorder"
"github.com/coder/aibridge/tracing"
)
// BlockingResponsesInterceptor is the non-streaming interceptor for the
// Responses API: its Streaming method always reports false. All of its state
// lives in the embedded responsesInterceptionBase.
type BlockingResponsesInterceptor struct {
	responsesInterceptionBase
}
// NewBlockingInterceptor builds a BlockingResponsesInterceptor whose embedded
// base is populated with the supplied values. It performs no other work; the
// logger, recorder and MCP proxy are provided later through Setup.
func NewBlockingInterceptor(
	id uuid.UUID,
	reqPayload ResponsesRequestPayload,
	providerName string,
	cfg config.OpenAI,
	clientHeaders http.Header,
	authHeaderName string,
	tracer trace.Tracer,
	cred intercept.CredentialInfo,
) *BlockingResponsesInterceptor {
	interceptor := new(BlockingResponsesInterceptor)

	// Fill the embedded base in place; fields not listed here keep their
	// zero values.
	base := &interceptor.responsesInterceptionBase
	base.id = id
	base.providerName = providerName
	base.reqPayload = reqPayload
	base.cfg = cfg
	base.clientHeaders = clientHeaders
	base.authHeaderName = authHeaderName
	base.tracer = tracer
	base.credential = cred

	return interceptor
}
// Setup forwards the recorder and MCP proxy to the embedded base, handing it
// a logger scoped under the name "blocking".
func (i *BlockingResponsesInterceptor) Setup(logger slog.Logger, rec recorder.Recorder, mcpProxy mcp.ServerProxier) {
	blockingLogger := logger.Named("blocking")
	// The embedded type is named explicitly: i.Setup would call this method.
	i.responsesInterceptionBase.Setup(blockingLogger, rec, mcpProxy)
}
// Streaming reports whether this interceptor streams its response. The
// blocking interceptor never does, so the result is always false.
func (*BlockingResponsesInterceptor) Streaming() bool {
	const streams = false
	return streams
}
// TraceAttributes returns the base trace attributes for r, requested with the
// streaming flag set to false.
func (i *BlockingResponsesInterceptor) TraceAttributes(r *http.Request) []attribute.KeyValue {
	const streaming = false
	attrs := i.responsesInterceptionBase.baseTraceAttributes(r, streaming)
	return attrs
}
// ProcessRequest handles a non-streaming Responses API request.
//
// It validates the request, injects tools, and then calls upstream in a loop
// for as long as handleInnerAgenticLoop reports that another round is needed.
// Token usage and model thoughts are recorded for every upstream response.
// After the loop, the user prompt (when one was found) is recorded against the
// ID of the first upstream response, and non-injected tool usage is recorded
// for the last response.
//
// After validation, exactly one of the following is invoked on w:
//   - sendCustomErr, when the inner agentic loop fails or when upstream failed
//     without any response having been received;
//   - respCopy.forwardResp, in every other case.
//
// The returned error is the validation error, the wrapped inner-loop error,
// the wrapped upstream connection error, or the join of the upstream error
// and the forwarding error (nil when both are nil). It is also reported to
// the tracing span via the deferred EndSpanErr.
func (i *BlockingResponsesInterceptor) ProcessRequest(w http.ResponseWriter, r *http.Request) (outErr error) {
	ctx, span := i.tracer.Start(r.Context(), "Intercept.ProcessRequest", trace.WithAttributes(tracing.InterceptionAttributesFromContext(r.Context())...))
	defer tracing.EndSpanErr(span, &outErr)

	if err := i.validateRequest(ctx, w); err != nil {
		return err
	}

	i.injectTools()

	var (
		response        *responses.Response
		upstreamErr     error
		loopErr         error
		respCopy        responseCopier
		firstResponseID string
	)

	// A failure to extract the prompt is not fatal: it is logged and the
	// request proceeds.
	prompt, promptFound, err := i.reqPayload.lastUserPrompt(ctx, i.logger)
	if err != nil {
		i.logger.Warn(ctx, "failed to get user prompt", slog.Error(err))
	}

	shouldLoop := true
	for shouldLoop {
		srv := i.newResponsesService()
		// Start every round with a fresh copier so that, after the loop, it
		// reflects only the most recent upstream call.
		respCopy = responseCopier{}

		opts := i.requestOptions(&respCopy)
		opts = append(opts, option.WithRequestTimeout(time.Second*600))

		// TODO(ssncferreira): inject actor headers directly in the client-header
		// middleware instead of using SDK options.
		if actor := aibcontext.ActorFromContext(r.Context()); actor != nil && i.cfg.SendActorHeaders {
			opts = append(opts, intercept.ActorHeadersAsOpenAIOpts(actor)...)
		}

		response, upstreamErr = i.newResponse(ctx, srv, opts)
		if upstreamErr != nil || response == nil {
			break
		}

		if firstResponseID == "" {
			firstResponseID = response.ID
		}

		i.recordTokenUsage(ctx, response)
		i.recordModelThoughts(ctx, response)

		// Check if there are any injected tools to invoke.
		pending := i.getPendingInjectedToolCalls(response)
		shouldLoop, loopErr = i.handleInnerAgenticLoop(ctx, pending, response)
		if loopErr != nil {
			// The error is kept in its own variable so it survives to the
			// return below instead of being overwritten later.
			i.sendCustomErr(ctx, w, http.StatusInternalServerError, loopErr)
			break
		}
	}

	if promptFound {
		i.recordUserPrompt(ctx, firstResponseID, prompt)
	}
	// NOTE(review): response may be nil here when the upstream call failed;
	// presumably recordNonInjectedToolUsage tolerates that — confirm.
	i.recordNonInjectedToolUsage(ctx, response)

	if loopErr != nil {
		// A custom error was already sent for this request. Do not also
		// forward the upstream response to the same writer, and surface the
		// failure to the caller and the span.
		return xerrors.Errorf("inner agentic loop: %w", loopErr)
	}

	if upstreamErr != nil && !respCopy.responseReceived.Load() {
		// no response received from upstream, return custom error
		i.sendCustomErr(ctx, w, http.StatusInternalServerError, upstreamErr)
		return xerrors.Errorf("failed to connect to upstream: %w", upstreamErr)
	}

	err = respCopy.forwardResp(w)
	return errors.Join(upstreamErr, err)
}
// newResponse performs a single upstream call through srv inside its own
// "Intercept.ProcessRequest.Upstream" span. The span is ended with whatever
// error the call returns.
func (i *BlockingResponsesInterceptor) newResponse(ctx context.Context, srv responses.ResponseService, opts []option.RequestOption) (_ *responses.Response, outErr error) {
	spanAttrs := tracing.InterceptionAttributesFromContext(ctx)
	ctx, span := i.tracer.Start(ctx, "Intercept.ProcessRequest.Upstream", trace.WithAttributes(spanAttrs...))
	defer tracing.EndSpanErr(span, &outErr)

	// The params are deliberately left at their zero value: the body is
	// overridden by option.WithRequestBody(reqPayload) in requestOptions.
	var emptyParams responses.ResponseNewParams
	return srv.New(ctx, emptyParams, opts...)
}