Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/workflow/activate/activate.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package activate

import (
"context"
"fmt"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -47,7 +48,7 @@ func New(runtimeContext *runtime.Context) *cobra.Command {
if err := handler.ValidateInputs(); err != nil {
return err
}
return handler.Execute()
return handler.Execute(cmd.Context())
},
}

Expand All @@ -66,6 +67,7 @@ type handler struct {
runtimeContext *runtime.Context

validated bool
execCtx context.Context
}

func newHandler(ctx *runtime.Context) *handler {
Expand Down Expand Up @@ -110,7 +112,9 @@ func (h *handler) ValidateInputs() error {
return nil
}

func (h *handler) Execute() error {
func (h *handler) Execute(ctx context.Context) error {
h.execCtx = ctx

if !h.validated {
return fmt.Errorf("handler inputs not validated")
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/workflow/activate/activate_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package activate

import (
"context"
"errors"
"testing"

Expand Down Expand Up @@ -35,7 +36,7 @@ func TestNonInteractive_WithoutYes_ReturnsError(t *testing.T) {
}
handler.validated = true

err := handler.Execute()
err := handler.Execute(context.Background())
require.Error(t, err)
require.Contains(t, err.Error(), "missing required flags for --non-interactive mode")
}
Expand All @@ -62,7 +63,7 @@ func TestNonInteractive_WithYes_PassesGuard(t *testing.T) {
}
handler.validated = true

err := handler.Execute()
err := handler.Execute(context.Background())
// Guard passes; error comes from WRC (no matching workflow), not the guard
require.Error(t, err)
require.NotContains(t, err.Error(), "missing required flags for --non-interactive mode")
Expand Down
4 changes: 2 additions & 2 deletions cmd/workflow/activate/registry_activate_strategy_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (a *privateRegistryActivateStrategy) Activate() error {

ui.Dim(fmt.Sprintf("Fetching workflow to activate... Name=%s", workflowName))

workflow, err := a.prc.GetWorkflowByName(workflowName)
workflow, err := a.prc.GetWorkflowByName(a.h.execCtx, workflowName)
if err != nil {
return fmt.Errorf("failed to get workflow: %w", err)
}
Expand All @@ -45,7 +45,7 @@ func (a *privateRegistryActivateStrategy) Activate() error {

ui.Dim(fmt.Sprintf("Processing activation for workflow ID %s...", workflow.WorkflowID))

result, err := a.prc.ActivateWorkflowInRegistry(workflow.WorkflowID)
result, err := a.prc.ActivateWorkflowInRegistry(a.h.execCtx, workflow.WorkflowID)
if err != nil {
return fmt.Errorf("failed to activate workflow in private registry: %w", err)
}
Expand Down
8 changes: 6 additions & 2 deletions cmd/workflow/delete/delete.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package delete

import (
"context"
"fmt"
"io"

Expand Down Expand Up @@ -44,7 +45,7 @@ func New(runtimeContext *runtime.Context) *cobra.Command {
if err != nil {
return err
}
return handler.Execute()
return handler.Execute(cmd.Context())
},
}

Expand All @@ -66,6 +67,7 @@ type handler struct {
runtimeContext *runtime.Context

validated bool
execCtx context.Context
}

func newHandler(ctx *runtime.Context, stdin io.Reader) *handler {
Expand Down Expand Up @@ -128,11 +130,13 @@ func (h *handler) ValidateInputs() error {
return nil
}

func (h *handler) Execute() error {
func (h *handler) Execute(ctx context.Context) error {
if !h.validated {
return fmt.Errorf("handler inputs not validated")
}

h.execCtx = ctx

adapter, err := newRegistryDeleteStrategy(h.runtimeContext.ResolvedRegistry, h)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/workflow/delete/registry_delete_strategy_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (a *privateRegistryDeleteStrategy) FetchWorkflows() ([]WorkflowToDelete, er

ui.Dim(fmt.Sprintf("Fetching workflow to delete... Name=%s", workflowName))

workflow, err := a.prc.GetWorkflowByName(workflowName)
workflow, err := a.prc.GetWorkflowByName(a.h.execCtx, workflowName)
if err != nil {
return nil, fmt.Errorf("failed to get workflow: %w", err)
}
Expand All @@ -55,7 +55,7 @@ func (a *privateRegistryDeleteStrategy) DeleteWorkflows(workflows []WorkflowToDe

for _, wf := range workflows {
workflowID := wf.RawID.(string)
deletedID, err := a.prc.DeleteWorkflowInRegistry(workflowID)
deletedID, err := a.prc.DeleteWorkflowInRegistry(a.h.execCtx, workflowID)
if err != nil {
h.log.Error().
Err(err).
Expand Down
4 changes: 4 additions & 0 deletions cmd/workflow/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ type handler struct {
// existingWorkflowStatus stores the status of an existing workflow when updating.
// nil means this is a new workflow, otherwise it contains the current status (0=active, 1=paused).
existingWorkflowStatus *uint8

execCtx context.Context
}

var defaultOutputPath = "./binary.wasm.br.b64"
Expand Down Expand Up @@ -209,6 +211,8 @@ func (h *handler) Execute(ctx context.Context) error {
return fmt.Errorf("handler inputs not validated")
}

h.execCtx = ctx

deployAccess, err := h.credentials.GetDeploymentAccessStatus()
if err != nil {
return fmt.Errorf("failed to check deployment access: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions cmd/workflow/deploy/private_registry_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package deploy

import (
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
Expand Down Expand Up @@ -312,6 +313,7 @@ func TestCheckWorkflowExists_PrivateRegistry(t *testing.T) {
defer gqlServer.Close()

h.environmentSet.GraphQLURL = gqlServer.URL
h.execCtx = context.Background()
strategy := newPrivateRegistryDeployStrategy(h)

exists, status, err := strategy.CheckWorkflowExists("", "jnowak-workflow-test-v5", "", tt.workflowID)
Expand Down
4 changes: 2 additions & 2 deletions cmd/workflow/deploy/registry_deploy_strategy_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (a *privateRegistryDeployStrategy) RunPreDeployChecks() error {
func (a *privateRegistryDeployStrategy) CheckWorkflowExists(_, workflowName, _, workflowID string) (bool, *uint8, error) {
a.ensureClient()

workflow, err := a.prc.GetWorkflowByName(workflowName)
workflow, err := a.prc.GetWorkflowByName(a.h.execCtx, workflowName)
if err == nil {
if workflow.WorkflowID == workflowID {
return true, offchainStatusToUint8(workflow.Status), fmt.Errorf("workflow with id %s is already registered and unchanged; re-deployment skipped: %w", workflowID, errWorkflowUnchanged)
Expand All @@ -57,7 +57,7 @@ func (a *privateRegistryDeployStrategy) Upsert() error {
ui.Line()
ui.Dim(fmt.Sprintf("Registering workflow in private registry (workflowID: %s)...", input.WorkflowID))

result, err := a.prc.UpsertWorkflowInRegistry(input)
result, err := a.prc.UpsertWorkflowInRegistry(a.h.execCtx, input)
if err != nil {
return fmt.Errorf("failed to register workflow in private registry: %w", err)
}
Expand Down
8 changes: 6 additions & 2 deletions cmd/workflow/pause/pause.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pause

import (
"context"
"fmt"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -46,7 +47,7 @@ func New(runtimeContext *runtime.Context) *cobra.Command {
if err := handler.ValidateInputs(); err != nil {
return err
}
return handler.Execute()
return handler.Execute(cmd.Context())
},
}

Expand All @@ -64,6 +65,7 @@ type handler struct {
runtimeContext *runtime.Context

validated bool
execCtx context.Context
}

func newHandler(ctx *runtime.Context) *handler {
Expand Down Expand Up @@ -107,7 +109,9 @@ func (h *handler) ValidateInputs() error {
return nil
}

func (h *handler) Execute() error {
func (h *handler) Execute(ctx context.Context) error {
h.execCtx = ctx

if !h.validated {
return fmt.Errorf("handler inputs not validated")
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/workflow/pause/pause_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pause

import (
"context"
"errors"
"testing"

Expand Down Expand Up @@ -34,7 +35,7 @@ func TestNonInteractive_WithoutYes_ReturnsError(t *testing.T) {
}
h.validated = true

err := h.Execute()
err := h.Execute(context.Background())
require.Error(t, err)
require.Contains(t, err.Error(), "missing required flags for --non-interactive mode")
}
Expand All @@ -60,7 +61,7 @@ func TestNonInteractive_WithYes_PassesGuard(t *testing.T) {
}
h.validated = true

err := h.Execute()
err := h.Execute(context.Background())
// Guard passes; error comes from WRC (no matching workflow), not the guard
require.Error(t, err)
require.NotContains(t, err.Error(), "missing required flags for --non-interactive mode")
Expand Down
4 changes: 2 additions & 2 deletions cmd/workflow/pause/registry_pause_strategy_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (a *privateRegistryPauseStrategy) Pause() error {

ui.Dim(fmt.Sprintf("Fetching workflow to pause... Name=%s", workflowName))

workflow, err := a.prc.GetWorkflowByName(workflowName)
workflow, err := a.prc.GetWorkflowByName(a.h.execCtx, workflowName)
if err != nil {
return fmt.Errorf("failed to get workflow: %w", err)
}
Expand All @@ -45,7 +45,7 @@ func (a *privateRegistryPauseStrategy) Pause() error {

ui.Dim(fmt.Sprintf("Processing pause for workflow ID %s...", workflow.WorkflowID))

result, err := a.prc.PauseWorkflowInRegistry(workflow.WorkflowID)
result, err := a.prc.PauseWorkflowInRegistry(a.h.execCtx, workflow.WorkflowID)
if err != nil {
return fmt.Errorf("failed to pause workflow in private registry: %w", err)
}
Expand Down
34 changes: 17 additions & 17 deletions internal/client/privateregistryclient/privateregistryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func (c *Client) SetServiceTimeout(timeout time.Duration) {
c.serviceTimeout = timeout
}

func (c *Client) CreateServiceContextWithTimeout() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), c.serviceTimeout) //nolint:gosec // G118 -- cancel is deferred by callers
func (c *Client) CreateServiceContextWithTimeout(parent context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(parent, c.serviceTimeout) //nolint:gosec // G118 -- cancel is deferred by callers
}

type OffchainWorkflow struct {
Expand Down Expand Up @@ -131,7 +131,7 @@ type GetOffchainWorkflowByNameResponse struct {
Workflow OffchainWorkflow `json:"workflow"`
}

func (c *Client) GetWorkflowByName(workflowName string) (OffchainWorkflow, error) {
func (c *Client) GetWorkflowByName(ctx context.Context, workflowName string) (OffchainWorkflow, error) {
if workflowName == "" {
return OffchainWorkflow{}, fmt.Errorf("workflowName is required")
}
Expand Down Expand Up @@ -165,10 +165,10 @@ query GetOffchainWorkflowByName($request: GetOffchainWorkflowByNameRequest!) {
GetOffchainWorkflowByName GetOffchainWorkflowByNameResponse `json:"getOffchainWorkflowByName"`
}

ctx, cancel := c.CreateServiceContextWithTimeout()
callCtx, cancel := c.CreateServiceContextWithTimeout(ctx)
defer cancel()

if err := c.graphql.Execute(ctx, req, &container); err != nil {
if err := c.graphql.Execute(callCtx, req, &container); err != nil {
return OffchainWorkflow{}, fmt.Errorf("get workflow by name in registry: %w", err)
}

Expand All @@ -178,7 +178,7 @@ query GetOffchainWorkflowByName($request: GetOffchainWorkflowByNameRequest!) {
return container.GetOffchainWorkflowByName.Workflow, nil
}

func (c *Client) UpsertWorkflowInRegistry(workflow OffchainWorkflowInput) (OffchainWorkflow, error) {
func (c *Client) UpsertWorkflowInRegistry(ctx context.Context, workflow OffchainWorkflowInput) (OffchainWorkflow, error) {
if err := validateUpsertWorkflowInput(workflow); err != nil {
return OffchainWorkflow{}, err
}
Expand Down Expand Up @@ -209,10 +209,10 @@ mutation UpsertOffchainWorkflow($request: UpsertOffchainWorkflowRequest!) {
UpsertOffchainWorkflow UpsertOffchainWorkflowResponse `json:"upsertOffchainWorkflow"`
}

ctx, cancel := c.CreateServiceContextWithTimeout()
callCtx, cancel := c.CreateServiceContextWithTimeout(ctx)
defer cancel()

if err := c.graphql.Execute(ctx, req, &container); err != nil {
if err := c.graphql.Execute(callCtx, req, &container); err != nil {
return OffchainWorkflow{}, fmt.Errorf("upsert workflow in registry: %w", err)
}

Expand All @@ -222,7 +222,7 @@ mutation UpsertOffchainWorkflow($request: UpsertOffchainWorkflowRequest!) {
return container.UpsertOffchainWorkflow.Workflow, nil
}

func (c *Client) PauseWorkflowInRegistry(workflowID string) (OffchainWorkflow, error) {
func (c *Client) PauseWorkflowInRegistry(ctx context.Context, workflowID string) (OffchainWorkflow, error) {
if workflowID == "" {
return OffchainWorkflow{}, fmt.Errorf("workflowId is required")
}
Expand Down Expand Up @@ -253,10 +253,10 @@ mutation PauseOffchainWorkflow($request: PauseOffchainWorkflowRequest!) {
PauseOffchainWorkflow PauseOffchainWorkflowResponse `json:"pauseOffchainWorkflow"`
}

ctx, cancel := c.CreateServiceContextWithTimeout()
callCtx, cancel := c.CreateServiceContextWithTimeout(ctx)
defer cancel()

if err := c.graphql.Execute(ctx, req, &container); err != nil {
if err := c.graphql.Execute(callCtx, req, &container); err != nil {
return OffchainWorkflow{}, fmt.Errorf("pause workflow in registry: %w", err)
}

Expand All @@ -266,7 +266,7 @@ mutation PauseOffchainWorkflow($request: PauseOffchainWorkflowRequest!) {
return container.PauseOffchainWorkflow.Workflow, nil
}

func (c *Client) ActivateWorkflowInRegistry(workflowID string) (OffchainWorkflow, error) {
func (c *Client) ActivateWorkflowInRegistry(ctx context.Context, workflowID string) (OffchainWorkflow, error) {
if workflowID == "" {
return OffchainWorkflow{}, fmt.Errorf("workflowId is required")
}
Expand Down Expand Up @@ -297,10 +297,10 @@ mutation ActivateOffchainWorkflow($request: ActivateOffchainWorkflowRequest!) {
ActivateOffchainWorkflow ActivateOffchainWorkflowResponse `json:"activateOffchainWorkflow"`
}

ctx, cancel := c.CreateServiceContextWithTimeout()
callCtx, cancel := c.CreateServiceContextWithTimeout(ctx)
defer cancel()

if err := c.graphql.Execute(ctx, req, &container); err != nil {
if err := c.graphql.Execute(callCtx, req, &container); err != nil {
return OffchainWorkflow{}, fmt.Errorf("activate workflow in registry: %w", err)
}

Expand All @@ -310,7 +310,7 @@ mutation ActivateOffchainWorkflow($request: ActivateOffchainWorkflowRequest!) {
return container.ActivateOffchainWorkflow.Workflow, nil
}

func (c *Client) DeleteWorkflowInRegistry(workflowID string) (string, error) {
func (c *Client) DeleteWorkflowInRegistry(ctx context.Context, workflowID string) (string, error) {
if workflowID == "" {
return "", fmt.Errorf("workflowId is required")
}
Expand All @@ -329,10 +329,10 @@ mutation DeleteOffchainWorkflow($request: DeleteOffchainWorkflowRequest!) {
DeleteOffchainWorkflow DeleteOffchainWorkflowResponse `json:"deleteOffchainWorkflow"`
}

ctx, cancel := c.CreateServiceContextWithTimeout()
callCtx, cancel := c.CreateServiceContextWithTimeout(ctx)
defer cancel()

if err := c.graphql.Execute(ctx, req, &container); err != nil {
if err := c.graphql.Execute(callCtx, req, &container); err != nil {
return "", fmt.Errorf("delete workflow in registry: %w", err)
}

Expand Down
Loading
Loading