Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions proxy/nested_payload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package proxy

import (
"reflect"

"go.temporal.io/api/workflowservice/v1/workflowservicenexus"
"google.golang.org/protobuf/proto"
)

const (
// TemporalSystemNexusEndpoint is the well-known endpoint name for system Nexus operations.
TemporalSystemNexusEndpoint = "__temporal_system"
)

// SystemNexusOpKey identifies a system Nexus operation by its (endpoint, operation) pair.
type SystemNexusOpKey struct {
Endpoint string
Operation string
}

// SystemNexusOpTypes maps a system Nexus operation to the proto request and response
// types whose bytes are serialized in NexusOperationScheduled.Input and
// NexusOperationCompleted.Result.
type SystemNexusOpTypes struct {
// NewRequest returns a fresh, zero-valued instance of the request proto.
NewRequest func() proto.Message
// NewResponse returns a fresh, zero-valued instance of the response proto.
NewResponse func() proto.Message
}

// SystemNexusOperations is the canonical registry of known system Nexus operations.
// It is built dynamically from the generated workflowservicenexus package so that
// new operations added to the proto definitions are picked up automatically without
// requiring changes to this file or downstream consumers.
var SystemNexusOperations = buildNexusServiceRegistry(
TemporalSystemNexusEndpoint,
workflowservicenexus.WorkflowService,

Check failure on line 37 in proxy/nested_payload.go

View workflow job for this annotation

GitHub Actions / build-and-test

undefined: workflowservicenexus.WorkflowService
)

// buildNexusServiceRegistry uses reflection to iterate the fields of a generated
// Nexus service struct (e.g. workflowservicenexus.WorkflowService) and builds a
// registry entry for each field that implements the nexus.OperationReference
// interface (i.e. has Name(), InputType(), and OutputType() methods).
func buildNexusServiceRegistry(endpoint string, service any) map[SystemNexusOpKey]SystemNexusOpTypes {
registry := make(map[SystemNexusOpKey]SystemNexusOpTypes)
v := reflect.ValueOf(service)
for i := 0; i < v.NumField(); i++ {
field := v.Field(i)
nameMethod := field.MethodByName("Name")
if !nameMethod.IsValid() {
continue
}
inputTypeMethod := field.MethodByName("InputType")
outputTypeMethod := field.MethodByName("OutputType")
if !inputTypeMethod.IsValid() || !outputTypeMethod.IsValid() {
continue
}

name := nameMethod.Call(nil)[0].String()
inputType := inputTypeMethod.Call(nil)[0].Interface().(reflect.Type)
outputType := outputTypeMethod.Call(nil)[0].Interface().(reflect.Type)

registry[SystemNexusOpKey{Endpoint: endpoint, Operation: name}] = SystemNexusOpTypes{
NewRequest: newProtoFactory(inputType),
NewResponse: newProtoFactory(outputType),
}
}
return registry
}

// newProtoFactory returns a function that creates a new zero-valued proto.Message
// of the given type. The type should be a struct type (not a pointer); the returned
// function allocates a new instance and returns a pointer to it as proto.Message.
func newProtoFactory(t reflect.Type) func() proto.Message {
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
return func() proto.Message {
return reflect.New(t).Interface().(proto.Message)
}
}
25 changes: 25 additions & 0 deletions proxy/nested_payload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package proxy

import (
"testing"

"github.com/stretchr/testify/require"
"go.temporal.io/api/workflowservice/v1"
)

func TestSystemNexusOperations_SignalWithStartRegistered(t *testing.T) {
key := SystemNexusOpKey{
Endpoint: TemporalSystemNexusEndpoint,
Operation: "SignalWithStartWorkflowExecution",
}
types, ok := SystemNexusOperations[key]
require.True(t, ok, "SignalWithStartWorkflowExecution must be registered")

req := types.NewRequest()
_, ok = req.(*workflowservice.SignalWithStartWorkflowExecutionRequest)
require.True(t, ok, "request type must be SignalWithStartWorkflowExecutionRequest")

resp := types.NewResponse()
_, ok = resp.(*workflowservice.SignalWithStartWorkflowExecutionResponse)
require.True(t, ok, "response type must be SignalWithStartWorkflowExecutionResponse")
}
Loading