Skip to content

Commit 550bf51

Browse files
committed
Propagate backlinks on Signal and Signal-with-Start responses
1 parent a6c424b commit 550bf51

11 files changed

Lines changed: 905 additions & 230 deletions

File tree

api/historyservice/v1/request_response.pb.go

Lines changed: 205 additions & 174 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/internal/temporal/server/api/historyservice/v1/request_response.proto

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,9 @@ message SignalWorkflowExecutionRequest {
491491
bool child_workflow_only = 4;
492492
}
493493

494-
message SignalWorkflowExecutionResponse {}
494+
message SignalWorkflowExecutionResponse {
495+
temporal.api.common.v1.Link link = 1;
496+
}
495497

496498
message SignalWithStartWorkflowExecutionRequest {
497499
option (routing).workflow_id = "signal_with_start_request.workflow_id";
@@ -505,6 +507,8 @@ message SignalWithStartWorkflowExecutionRequest {
505507
message SignalWithStartWorkflowExecutionResponse {
506508
string run_id = 1;
507509
bool started = 2;
510+
temporal.api.common.v1.Link signal_link = 3;
511+
temporal.api.common.v1.Link workflow_start_link = 4;
508512
}
509513

510514
message RemoveSignalMutableStateRequest {

service/frontend/workflow_handler.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2246,15 +2246,17 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context, request
22462246
return nil, err
22472247
}
22482248

2249-
_, err = wh.historyClient.SignalWorkflowExecution(ctx, &historyservice.SignalWorkflowExecutionRequest{
2249+
resp, err := wh.historyClient.SignalWorkflowExecution(ctx, &historyservice.SignalWorkflowExecutionRequest{
22502250
NamespaceId: namespaceID.String(),
22512251
SignalRequest: request,
22522252
})
22532253
if err != nil {
22542254
return nil, err
22552255
}
22562256

2257-
return &workflowservice.SignalWorkflowExecutionResponse{}, nil
2257+
return &workflowservice.SignalWorkflowExecutionResponse{
2258+
Link: resp.GetLink(),
2259+
}, nil
22582260
}
22592261

22602262
// SignalWithStartWorkflowExecution is used to ensure sending signal to a workflow.
@@ -2364,8 +2366,10 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
23642366
}
23652367

23662368
return &workflowservice.SignalWithStartWorkflowExecutionResponse{
2367-
RunId: resp.GetRunId(),
2368-
Started: resp.Started,
2369+
RunId: resp.GetRunId(),
2370+
Started: resp.Started,
2371+
SignalLink: resp.GetSignalLink(),
2372+
WorkflowStartLink: resp.GetWorkflowStartLink(),
23692373
}, nil
23702374
}
23712375

service/history/api/link_util.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package api
2+
3+
import (
4+
commonpb "go.temporal.io/api/common/v1"
5+
enumspb "go.temporal.io/api/enums/v1"
6+
"go.temporal.io/server/common"
7+
)
8+
9+
// GenerateStartedEventRefLink builds a Link pointing to the WORKFLOW_EXECUTION_STARTED event.
10+
// Use this for backlinks to workflow start: the started event is always EventId=1 (FirstEventID)
11+
// and is never buffered, so a concrete EventReference is appropriate.
12+
func GenerateStartedEventRefLink(namespace, workflowID, runID string) *commonpb.Link {
13+
return &commonpb.Link{
14+
Variant: &commonpb.Link_WorkflowEvent_{
15+
WorkflowEvent: &commonpb.Link_WorkflowEvent{
16+
Namespace: namespace,
17+
WorkflowId: workflowID,
18+
RunId: runID,
19+
Reference: &commonpb.Link_WorkflowEvent_EventRef{
20+
EventRef: &commonpb.Link_WorkflowEvent_EventReference{
21+
EventId: common.FirstEventID,
22+
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
23+
},
24+
},
25+
},
26+
},
27+
}
28+
}
29+
30+
// GenerateRequestIdRefLink builds a Link with a RequestIdReference.
31+
// Use this for events that are buffered at signal time (e.g. SIGNALED), where the
32+
// concrete EventId is not yet known. The server resolves the RequestId to a real
33+
// EventId once the buffer flushes.
34+
func GenerateRequestIdRefLink(namespace, workflowID, runID, requestID string, eventType enumspb.EventType) *commonpb.Link {
35+
return &commonpb.Link{
36+
Variant: &commonpb.Link_WorkflowEvent_{
37+
WorkflowEvent: &commonpb.Link_WorkflowEvent{
38+
Namespace: namespace,
39+
WorkflowId: workflowID,
40+
RunId: runID,
41+
Reference: &commonpb.Link_WorkflowEvent_RequestIdRef{
42+
RequestIdRef: &commonpb.Link_WorkflowEvent_RequestIdReference{
43+
RequestId: requestID,
44+
EventType: eventType,
45+
},
46+
},
47+
},
48+
},
49+
}
50+
}

service/history/api/multioperation/api.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"go.temporal.io/api/serviceerror"
1111
"go.temporal.io/server/api/historyservice/v1"
1212
"go.temporal.io/server/api/matchingservice/v1"
13-
"go.temporal.io/server/common"
1413
"go.temporal.io/server/common/definition"
1514
"go.temporal.io/server/common/locks"
1615
"go.temporal.io/server/common/namespace"
@@ -338,20 +337,7 @@ func (uws *updateWithStart) updateWorkflow(
338337
RunId: currentWorkflowLease.GetContext().GetWorkflowKey().RunID,
339338
Started: false, // set explicitly for emphasis
340339
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
341-
Link: &commonpb.Link{
342-
Variant: &commonpb.Link_WorkflowEvent_{
343-
WorkflowEvent: &commonpb.Link_WorkflowEvent{
344-
WorkflowId: wfKey.WorkflowID,
345-
RunId: wfKey.RunID,
346-
Reference: &commonpb.Link_WorkflowEvent_EventRef{
347-
EventRef: &commonpb.Link_WorkflowEvent_EventReference{
348-
EventId: common.FirstEventID,
349-
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
350-
},
351-
},
352-
},
353-
},
354-
},
340+
Link: api.GenerateStartedEventRefLink("", wfKey.WorkflowID, wfKey.RunID),
355341
}
356342

357343
return makeResponse(startResp, updateResp), nil

service/history/api/signalwithstartworkflow/api.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,21 @@ func Invoke(
9999
api.ReactivateVersionWorkflowIfPinned(ctx, namespaceEntry, request.GetVersioningOverride(), reactivationSignalCache, reactivationSignaler, shard.GetConfig().EnableVersionReactivationSignals())
100100
}
101101

102+
swr := signalWithStartRequest.SignalWithStartRequest
102103
return &historyservice.SignalWithStartWorkflowExecutionResponse{
103104
RunId: runID,
104105
Started: started,
106+
SignalLink: api.GenerateRequestIdRefLink(
107+
swr.GetNamespace(),
108+
swr.GetWorkflowId(),
109+
runID,
110+
swr.GetRequestId(),
111+
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED,
112+
),
113+
WorkflowStartLink: api.GenerateStartedEventRefLink(
114+
swr.GetNamespace(),
115+
swr.GetWorkflowId(),
116+
runID,
117+
),
105118
}, nil
106119
}

0 commit comments

Comments
 (0)