Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 21 additions & 49 deletions internal/temporalcli/commands.activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ func (c *TemporalActivityUpdateOptionsCommand) run(cctx *CommandContext, args []
}

if exec != nil {
if c.ActivityId == "" {
return fmt.Errorf("either --activity-id and --workflow-id, or --query must be set")
}
result, err := cl.WorkflowService().UpdateActivityOptions(cctx, &workflowservice.UpdateActivityOptionsRequest{
Namespace: c.Parent.Namespace,
Execution: &common.WorkflowExecution{
Expand Down Expand Up @@ -208,21 +211,13 @@ func (c *TemporalActivityUpdateOptionsCommand) run(cctx *CommandContext, args []
} else {
updateActivitiesOperation := &batch.BatchOperationUpdateActivityOptions{
Identity: c.Parent.Identity,
Activity: &batch.BatchOperationUpdateActivityOptions_Type{Type: c.ActivityType},
Activity: &batch.BatchOperationUpdateActivityOptions_MatchAll{MatchAll: true},
UpdateMask: &fieldmaskpb.FieldMask{
Paths: updatePath,
},
RestoreOriginal: c.RestoreOriginalOptions,
}

if c.ActivityType != "" {
updateActivitiesOperation.Activity = &batch.BatchOperationUpdateActivityOptions_Type{Type: c.ActivityType}
} else if c.MatchAll {
updateActivitiesOperation.Activity = &batch.BatchOperationUpdateActivityOptions_MatchAll{MatchAll: true}
} else {
return fmt.Errorf("either Activity Type must be provided or MatchAll must be set to true")
}

batchReq.Operation = &workflowservice.StartBatchOperationRequest_UpdateActivityOptionsOperation{
UpdateActivityOptionsOperation: updateActivitiesOperation,
}
Expand All @@ -235,6 +230,10 @@ func (c *TemporalActivityUpdateOptionsCommand) run(cctx *CommandContext, args []
}

func (c *TemporalActivityPauseCommand) run(cctx *CommandContext, args []string) error {
if c.ActivityId == "" {
return fmt.Errorf("Activity Id must be specified")
}

cl, err := dialClient(cctx, &c.Parent.ClientOptions)
if err != nil {
return err
Expand All @@ -249,19 +248,12 @@ func (c *TemporalActivityPauseCommand) run(cctx *CommandContext, args []string)
},
Identity: c.Identity,
Reason: c.Reason,
Activity: &workflowservice.PauseActivityRequest_Id{Id: c.ActivityId},
}
if request.Identity == "" {
request.Identity = c.Parent.Identity
}

if c.ActivityId != "" && c.ActivityType != "" {
return fmt.Errorf("either Activity Type or Activity Id, but not both")
} else if c.ActivityType != "" {
request.Activity = &workflowservice.PauseActivityRequest_Type{Type: c.ActivityType}
} else if c.ActivityId != "" {
request.Activity = &workflowservice.PauseActivityRequest_Id{Id: c.ActivityId}
}

_, err = cl.WorkflowService().PauseActivity(cctx, request)
if err != nil {
return fmt.Errorf("unable to pause Activity: %w", err)
Expand Down Expand Up @@ -294,6 +286,10 @@ func (c *TemporalActivityUnpauseCommand) run(cctx *CommandContext, args []string
}

if exec != nil { // single workflow operation
if c.ActivityId == "" {
return fmt.Errorf("either --activity-id and --workflow-id, or --query must be set")
}

request := &workflowservice.UnpauseActivityRequest{
Namespace: c.Parent.Namespace,
Execution: &common.WorkflowExecution{
Expand All @@ -304,14 +300,7 @@ func (c *TemporalActivityUnpauseCommand) run(cctx *CommandContext, args []string
ResetHeartbeat: c.ResetHeartbeats,
Jitter: durationpb.New(c.Jitter.Duration()),
Identity: c.Parent.Identity,
}

if c.ActivityId != "" && c.ActivityType != "" {
return fmt.Errorf("either Activity Type or Activity Id, but not both")
} else if c.ActivityType != "" {
request.Activity = &workflowservice.UnpauseActivityRequest_Type{Type: c.ActivityType}
} else if c.ActivityId != "" {
request.Activity = &workflowservice.UnpauseActivityRequest_Id{Id: c.ActivityId}
Activity: &workflowservice.UnpauseActivityRequest_Id{Id: c.ActivityId},
}

_, err = cl.WorkflowService().UnpauseActivity(cctx, request)
Expand All @@ -324,13 +313,7 @@ func (c *TemporalActivityUnpauseCommand) run(cctx *CommandContext, args []string
ResetAttempts: c.ResetAttempts,
ResetHeartbeat: c.ResetHeartbeats,
Jitter: durationpb.New(c.Jitter.Duration()),
}
if c.ActivityType != "" {
unpauseActivitiesOperation.Activity = &batch.BatchOperationUnpauseActivities_Type{Type: c.ActivityType}
} else if c.MatchAll {
unpauseActivitiesOperation.Activity = &batch.BatchOperationUnpauseActivities_MatchAll{MatchAll: true}
} else {
return fmt.Errorf("either Activity Type must be provided or MatchAll must be set to true")
Activity: &batch.BatchOperationUnpauseActivities_MatchAll{MatchAll: true},
}

batchReq.Operation = &workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation{
Expand Down Expand Up @@ -369,7 +352,12 @@ func (c *TemporalActivityResetCommand) run(cctx *CommandContext, args []string)
}

if exec != nil { // single workflow operation
if c.ActivityId == "" {
return fmt.Errorf("either --activity-id and --workflow-id, or --query must be set")
}

request := &workflowservice.ResetActivityRequest{
Activity: &workflowservice.ResetActivityRequest_Id{Id: c.ActivityId},
Namespace: c.Parent.Namespace,
Execution: &common.WorkflowExecution{
WorkflowId: c.WorkflowId,
Expand All @@ -380,16 +368,6 @@ func (c *TemporalActivityResetCommand) run(cctx *CommandContext, args []string)
ResetHeartbeat: c.ResetHeartbeats,
}

if c.ActivityId != "" && c.ActivityType != "" {
return fmt.Errorf("either Activity Type or Activity Id, but not both")
} else if c.ActivityType != "" {
request.Activity = &workflowservice.ResetActivityRequest_Type{Type: c.ActivityType}
} else if c.ActivityId != "" {
request.Activity = &workflowservice.ResetActivityRequest_Id{Id: c.ActivityId}
} else {
return fmt.Errorf("either Activity Type or Activity Id must be provided")
}

resp, err := cl.WorkflowService().ResetActivity(cctx, request)
if err != nil {
return fmt.Errorf("unable to reset an Activity: %w", err)
Expand All @@ -414,13 +392,7 @@ func (c *TemporalActivityResetCommand) run(cctx *CommandContext, args []string)
KeepPaused: c.KeepPaused,
Jitter: durationpb.New(c.Jitter.Duration()),
RestoreOriginalOptions: c.RestoreOriginalOptions,
}
if c.ActivityType != "" {
resetActivitiesOperation.Activity = &batch.BatchOperationResetActivities_Type{Type: c.ActivityType}
} else if c.MatchAll {
resetActivitiesOperation.Activity = &batch.BatchOperationResetActivities_MatchAll{MatchAll: true}
} else {
return fmt.Errorf("either Activity Type must be provided or MatchAll must be set to true")
Activity: &batch.BatchOperationResetActivities_MatchAll{MatchAll: true},
}

batchReq.Operation = &workflowservice.StartBatchOperationRequest_ResetActivitiesOperation{
Expand Down
47 changes: 24 additions & 23 deletions internal/temporalcli/commands.activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ import (
)

const (
activityId string = "dev-activity-id"
activityType string = "DevActivity"
identity string = "MyIdentity"
activityId string = "dev-activity-id"
identity string = "MyIdentity"
)

func (s *SharedServerSuite) TestActivity_Complete() {
Expand Down Expand Up @@ -227,33 +226,35 @@ func (s *SharedServerSuite) TestActivityPauseUnpause() {
}, 5*time.Second, 100*time.Millisecond)
}

func (s *SharedServerSuite) TestActivityPauseUnpauseByType() {
func (s *SharedServerSuite) TestActivityCommandFailed_NoActivityId() {
run := s.waitActivityStarted()
res := sendActivityCommand("pause", run, s, "--activity-type", activityType)
s.NoError(res.Err)

res = sendActivityCommand("unpause", run, s, "--activity-type", activityType, "--reset-attempts")
s.NoError(res.Err)
}

func (s *SharedServerSuite) TestActivityCommandFailed_NoActivityTpeOrId() {
run := s.waitActivityStarted()
// pause is single-workflow only
res := sendActivityCommand("pause", run, s)
s.ErrorContains(res.Err, "Activity Id must be specified")

commands := []string{"pause", "unpause", "reset"}
for _, command := range commands {
// should fail because both activity-id and activity-type are not provided
res := sendActivityCommand(command, run, s)
s.Error(res.Err)
// unpause and reset support both single-workflow and batch modes
for _, command := range []string{"unpause", "reset"} {
res = sendActivityCommand(command, run, s)
s.ErrorContains(res.Err, "either --activity-id and --workflow-id, or --query must be set")
}
}

func (s *SharedServerSuite) TestActivityCommandFailed_BothActivityTpeOrId() {
func (s *SharedServerSuite) TestActivityCommandFailed_BothWorkflowIdAndQuery() {
run := s.waitActivityStarted()

commands := []string{"pause", "unpause", "reset"}
// unpause and reset support both single-workflow (--workflow-id) and batch
// (--query) modes; providing both at once should fail.
commands := []string{"unpause", "reset"}
for _, command := range commands {
res := sendActivityCommand(command, run, s, "--activity-id", activityId, "--activity-type", activityType)
s.Error(res.Err)
res := s.Execute(
"activity", command,
"--workflow-id", run.GetID(),
"--query", "WorkflowType='DevWorkflow'",
"--activity-id", activityId,
"--address", s.Address(),
)
s.ErrorContains(res.Err, "cannot set query when workflow ID is set")
}
}

Expand Down Expand Up @@ -423,7 +424,7 @@ func (s *SharedServerSuite) TestUnpauseActivity_BatchSuccess() {
"--address", s.Address(),
"--query", query,
"--reason", "unpause-test",
"--yes", "--match-all",
"--yes",
)
s.NoError(cmdRes.Err)
s.NotEmpty(startBatchRequest.JobId)
Expand Down Expand Up @@ -507,7 +508,7 @@ func (s *SharedServerSuite) TestResetActivity_BatchSuccess() {
"--address", s.Address(),
"--query", query,
"--reason", "unpause-test",
"--yes", "--match-all",
"--yes",
)
s.NoError(cmdRes.Err)
s.NotEmpty(startBatchRequest.JobId)
Expand Down
Loading
Loading