Skip to content

Commit ae77028

Browse files
authored
feat(pftools): enhance function failure handling with retry logic (#79)
1 parent 3466c9c commit ae77028

5 files changed

Lines changed: 326 additions & 18 deletions

File tree

pkg/mcp/pftools/errors.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ var (
2828
ErrFunctionNotFound = errors.New("function not found")
2929
// ErrNotOurMessage indicates a message that should be ignored.
3030
ErrNotOurMessage = errors.New("not our message")
31+
// ErrFunctionNoInputTopics indicates the function has no input topics.
32+
ErrFunctionNoInputTopics = errors.New("function has no input topics")
33+
// ErrSchemaConversionFailed indicates the schema conversion failed.
34+
ErrSchemaConversionFailed = errors.New("schema conversion failed")
3135
)
3236

3337
// IsClusterUnhealthy checks if an error indicates cluster health issues
@@ -128,3 +132,17 @@ func isNotFoundText(text string) bool {
128132
}
129133
return false
130134
}
135+
136+
// classifyConvertError reports whether a conversion failure is retryable.
137+
func classifyConvertError(err error) failureCategory {
138+
if err == nil {
139+
return failureUnknown
140+
}
141+
if errors.Is(err, ErrFunctionNoInputTopics) || errors.Is(err, ErrSchemaConversionFailed) {
142+
return failurePermanent
143+
}
144+
if IsClusterUnhealthy(err) || IsAuthError(err) || IsNetworkError(err) {
145+
return failureRetryable
146+
}
147+
return failureUnknown
148+
}

pkg/mcp/pftools/errors_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,46 @@ func TestIsNotFoundError(t *testing.T) {
6161
}
6262
})
6363
}
64+
65+
func TestClassifyConvertError(t *testing.T) {
66+
t.Run("no input topics is permanent", func(t *testing.T) {
67+
if classifyConvertError(ErrFunctionNoInputTopics) != failurePermanent {
68+
t.Fatalf("expected permanent for no input topics")
69+
}
70+
})
71+
72+
t.Run("schema conversion is permanent", func(t *testing.T) {
73+
err := errors.Join(ErrSchemaConversionFailed, errors.New("boom"))
74+
if classifyConvertError(err) != failurePermanent {
75+
t.Fatalf("expected permanent for schema conversion failure")
76+
}
77+
})
78+
79+
t.Run("network error is retryable", func(t *testing.T) {
80+
err := errors.New("connection refused")
81+
if classifyConvertError(err) != failureRetryable {
82+
t.Fatalf("expected retryable for network error")
83+
}
84+
})
85+
86+
t.Run("auth error is retryable", func(t *testing.T) {
87+
err := errors.New("token expired")
88+
if classifyConvertError(err) != failureRetryable {
89+
t.Fatalf("expected retryable for auth error")
90+
}
91+
})
92+
93+
t.Run("cluster error is retryable", func(t *testing.T) {
94+
err := rest.Error{Code: 503, Reason: "no healthy upstream"}
95+
if classifyConvertError(err) != failureRetryable {
96+
t.Fatalf("expected retryable for cluster error")
97+
}
98+
})
99+
100+
t.Run("unknown error is unknown", func(t *testing.T) {
101+
err := errors.New("something else")
102+
if classifyConvertError(err) != failureUnknown {
103+
t.Fatalf("expected unknown for generic error")
104+
}
105+
})
106+
}

pkg/mcp/pftools/manager.go

Lines changed: 136 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ package pftools
1616

1717
import (
1818
"context"
19+
"crypto/sha256"
1920
"encoding/json"
21+
"errors"
2022
"fmt"
2123
"log"
2224
"strings"
@@ -93,6 +95,7 @@ func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerO
9395
v2adminClient: v2adminClient,
9496
pulsarClient: pulsarClient,
9597
fnToToolMap: make(map[string]*FunctionTool),
98+
failedFunctions: make(map[string]*functionFailureState),
9699
mutex: sync.RWMutex{},
97100
producerCache: make(map[string]pulsarclient.Producer),
98101
producerMutex: sync.RWMutex{},
@@ -171,39 +174,93 @@ func (m *PulsarFunctionManager) updateFunctions() {
171174
fullName := getFunctionFullName(fn.Tenant, fn.Namespace, fn.Name)
172175
seenFunctions[fullName] = true
173176

177+
configHash, hashErr := computeFunctionConfigHash(fn)
178+
if hashErr != nil {
179+
log.Printf("Failed to compute config hash for function %s: %v", fullName, hashErr)
180+
}
181+
174182
// Check if we already have this function
175183
m.mutex.RLock()
176-
_, exists := m.fnToToolMap[fullName]
184+
existingFn, exists := m.fnToToolMap[fullName]
185+
failureState, hasFailure := m.failedFunctions[fullName]
177186
m.mutex.RUnlock()
178187

188+
if hasFailure && configHash != "" && failureState.configHash != configHash {
189+
m.mutex.Lock()
190+
delete(m.failedFunctions, fullName)
191+
m.mutex.Unlock()
192+
hasFailure = false
193+
failureState = nil
194+
}
195+
179196
changed := false
180197
if exists {
181198
// Check if the function has changed
182-
existingFn, exists := m.fnToToolMap[fullName]
183-
if exists {
184-
if !cmp.Equal(*existingFn.Function, *fn) {
185-
changed = true
186-
}
187-
if !existingFn.SchemaFetchSuccess {
188-
changed = true
189-
}
199+
if !cmp.Equal(*existingFn.Function, *fn) {
200+
changed = true
201+
}
202+
if !existingFn.SchemaFetchSuccess {
203+
changed = true
190204
}
191205
if !changed {
192206
continue
193207
}
194208
}
195209

210+
if hasFailure && configHash != "" && failureState.configHash == configHash {
211+
if shouldSkipFailure(failureState, m.pollInterval, time.Now()) {
212+
continue
213+
}
214+
}
215+
196216
// Convert function to tool
217+
attemptAt := time.Now()
197218
fnTool, err := m.convertFunctionToTool(fn)
198-
if err != nil || !fnTool.SchemaFetchSuccess {
199-
if err != nil {
200-
log.Printf("Failed to convert function %s to tool: %v", fullName, err)
201-
} else {
202-
log.Printf("Failed to fetch schema for function %s, retry later...", fullName)
219+
if err != nil || (fnTool != nil && !fnTool.SchemaFetchSuccess) {
220+
failureErr := err
221+
if failureErr == nil && fnTool != nil && fnTool.SchemaFetchError != nil {
222+
failureErr = fnTool.SchemaFetchError
223+
}
224+
if failureErr == nil {
225+
failureErr = errors.New("schema fetch failed")
226+
}
227+
228+
category := classifyConvertError(failureErr)
229+
errorMsg := failureErr.Error()
230+
logNow := shouldLogFailure(failureState, configHash, category, errorMsg)
231+
232+
if configHash != "" {
233+
newState := &functionFailureState{
234+
configHash: configHash,
235+
category: category,
236+
lastError: errorMsg,
237+
lastAttemptAt: attemptAt,
238+
}
239+
if logNow {
240+
newState.lastLoggedAt = time.Now()
241+
} else if failureState != nil {
242+
newState.lastLoggedAt = failureState.lastLoggedAt
243+
}
244+
m.mutex.Lock()
245+
m.failedFunctions[fullName] = newState
246+
m.mutex.Unlock()
247+
}
248+
if logNow {
249+
if err != nil {
250+
log.Printf("Failed to convert function %s to tool: %v (category=%s)", fullName, failureErr, category)
251+
} else {
252+
log.Printf("Failed to fetch schema for function %s, retry later: %v (category=%s)", fullName, failureErr, category)
253+
}
203254
}
204255
continue
205256
}
206257

258+
if hasFailure {
259+
m.mutex.Lock()
260+
delete(m.failedFunctions, fullName)
261+
m.mutex.Unlock()
262+
}
263+
207264
if changed {
208265
if m.sessionID != "" {
209266
err := m.mcpServer.DeleteSessionTools(m.sessionID, fnTool.Tool.Name)
@@ -248,12 +305,61 @@ func (m *PulsarFunctionManager) updateFunctions() {
248305
m.mcpServer.DeleteTools(fnTool.Tool.Name)
249306
}
250307
delete(m.fnToToolMap, fullName)
308+
delete(m.failedFunctions, fullName)
251309
log.Printf("Removed function %s from MCP tools [%s]", fullName, fnTool.Tool.Name)
252310
}
253311
}
254312
m.mutex.Unlock()
255313
}
256314

315+
func computeFunctionConfigHash(fn *utils.FunctionConfig) (string, error) {
316+
if fn == nil {
317+
return "", errors.New("function config is nil")
318+
}
319+
data, err := json.Marshal(fn)
320+
if err != nil {
321+
return "", err
322+
}
323+
sum := sha256.Sum256(data)
324+
return fmt.Sprintf("%x", sum[:]), nil
325+
}
326+
327+
func shouldSkipFailure(state *functionFailureState, pollInterval time.Duration, now time.Time) bool {
328+
if state == nil {
329+
return false
330+
}
331+
switch state.category {
332+
case failurePermanent:
333+
return true
334+
case failureRetryable:
335+
if state.lastAttemptAt.IsZero() {
336+
return false
337+
}
338+
return now.Sub(state.lastAttemptAt) < pollInterval
339+
default:
340+
return true
341+
}
342+
}
343+
344+
func shouldLogFailure(prev *functionFailureState, configHash string, category failureCategory, errMsg string) bool {
345+
if prev == nil {
346+
return true
347+
}
348+
if configHash == "" {
349+
return true
350+
}
351+
if prev.configHash != configHash {
352+
return true
353+
}
354+
if prev.category != category {
355+
return true
356+
}
357+
if prev.lastError != errMsg {
358+
return true
359+
}
360+
return false
361+
}
362+
257363
// getFunctionsList retrieves all functions from the specified tenants/namespaces
258364
func (m *PulsarFunctionManager) getFunctionsList() ([]*utils.FunctionConfig, error) {
259365
var allFunctions []*utils.FunctionConfig
@@ -354,9 +460,10 @@ func (m *PulsarFunctionManager) getFunctionsInNamespace(tenant, namespace string
354460
// convertFunctionToTool converts a Pulsar Function to an MCP Tool
355461
func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig) (*FunctionTool, error) {
356462
schemaFetchSuccess := true
463+
var schemaFetchErr error
357464
// Determine input and output topics
358465
if len(fn.InputSpecs) == 0 {
359-
return nil, fmt.Errorf("function has no input topics")
466+
return nil, ErrFunctionNoInputTopics
360467
}
361468

362469
var inputTopic string
@@ -366,7 +473,7 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)
366473
break
367474
}
368475
if inputTopic == "" {
369-
return nil, fmt.Errorf("function has no input topics")
476+
return nil, ErrFunctionNoInputTopics
370477
}
371478

372479
// Get schema for input topic
@@ -378,7 +485,12 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)
378485
if restError.Code != 404 {
379486
log.Printf("Failed to get schema for input topic %s: %v", inputTopic, err)
380487
schemaFetchSuccess = false
488+
schemaFetchErr = errors.Join(schemaFetchErr, err)
381489
}
490+
} else {
491+
log.Printf("Failed to get schema for input topic %s: %v", inputTopic, err)
492+
schemaFetchSuccess = false
493+
schemaFetchErr = errors.Join(schemaFetchErr, err)
382494
}
383495
}
384496

@@ -394,7 +506,12 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)
394506
if restError.Code != 404 {
395507
log.Printf("Failed to get schema for output topic %s: %v", outputTopic, err)
396508
schemaFetchSuccess = false
509+
schemaFetchErr = errors.Join(schemaFetchErr, err)
397510
}
511+
} else {
512+
log.Printf("Failed to get schema for output topic %s: %v", outputTopic, err)
513+
schemaFetchSuccess = false
514+
schemaFetchErr = errors.Join(schemaFetchErr, err)
398515
}
399516
}
400517
}
@@ -409,12 +526,12 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)
409526

410527
schemaConverter, err := schema.ConverterFactory(inputSchema.Type)
411528
if err != nil {
412-
return nil, fmt.Errorf("failed to create schema converter: %w", err)
529+
return nil, errors.Join(ErrSchemaConversionFailed, err)
413530
}
414531

415532
toolInputSchemaProperties, err := schemaConverter.ToMCPToolInputSchemaProperties(inputSchema.PulsarSchemaInfo)
416533
if err != nil {
417-
return nil, fmt.Errorf("failed to convert input schema to MCP tool input schema properties: %w", err)
534+
return nil, errors.Join(ErrSchemaConversionFailed, err)
418535
}
419536

420537
toolInputSchemaProperties = append(toolInputSchemaProperties, mcp.WithDescription(description))
@@ -441,6 +558,7 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)
441558
OutputTopic: outputTopic,
442559
Tool: tool,
443560
SchemaFetchSuccess: schemaFetchSuccess,
561+
SchemaFetchError: schemaFetchErr,
444562
}, nil
445563
}
446564

0 commit comments

Comments
 (0)