diff --git a/internal/flink/command_statement_create.go b/internal/flink/command_statement_create.go index eda49f8606..31052aa5ee 100644 --- a/internal/flink/command_statement_create.go +++ b/internal/flink/command_statement_create.go @@ -1,7 +1,6 @@ package flink import ( - "fmt" "time" "github.com/spf13/cobra" @@ -16,7 +15,7 @@ import ( "github.com/confluentinc/cli/v4/pkg/flink/types" "github.com/confluentinc/cli/v4/pkg/output" "github.com/confluentinc/cli/v4/pkg/properties" - "github.com/confluentinc/cli/v4/pkg/retry" + "github.com/confluentinc/cli/v4/pkg/wait" ) func (c *command) newStatementCreateCommand() *cobra.Command { @@ -41,7 +40,8 @@ func (c *command) newStatementCreateCommand() *cobra.Command { c.addComputePoolFlag(cmd) pcmd.AddServiceAccountFlag(cmd, c.AuthenticatedCLICommand) c.addDatabaseFlag(cmd) - cmd.Flags().Bool("wait", false, "Block until the statement is running or has failed.") + pcmd.AddWaitFlag(cmd) + pcmd.AddWaitTimeoutFlag(cmd, time.Minute) cmd.Flags().StringSlice("property", []string{}, "A mechanism to pass properties in the form key=value when creating a Flink statement.") pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddContextFlag(cmd, c.CLICommand) @@ -151,26 +151,28 @@ func (c *command) statementCreate(cmd *cobra.Command, args []string) error { if err != nil { return err } - wait, err := cmd.Flags().GetBool("wait") + shouldWait, err := cmd.Flags().GetBool("wait") if err != nil { return err } - if wait { - err := retry.Retry(time.Second, time.Minute, func() error { - statement, err = client.GetStatement(environmentId, name, c.Context.LastOrgId) - if err != nil { - return err - } - - if statement.Status.GetPhase() == "PENDING" { - return fmt.Errorf(`statement phase is "%s"`, statement.Status.GetPhase()) - } - - return nil - }) + if shouldWait { + timeout, err := cmd.Flags().GetDuration("wait-timeout") if err != nil { return err } + statement, err = wait.Poll(cmd.Context(), wait.Options[flinkgatewayv1.SqlV1Statement]{ + Fetch: func() (flinkgatewayv1.SqlV1Statement, error) { + return client.GetStatement(environmentId, name, c.Context.LastOrgId) + }, + IsTerminal: func(s flinkgatewayv1.SqlV1Statement) bool { + return s.Status.GetPhase() != "PENDING" + }, + Tick: time.Second, + Timeout: timeout, + }) + if err != nil { + return errors.NewErrorWithSuggestions(err.Error(), "Increase `--wait-timeout` or omit `--wait`.") + } } table := output.NewTable(cmd) diff --git a/internal/flink/command_statement_create_onprem.go b/internal/flink/command_statement_create_onprem.go index d6405eff7a..b27bb1194f 100644 --- a/internal/flink/command_statement_create_onprem.go +++ b/internal/flink/command_statement_create_onprem.go @@ -16,7 +16,7 @@ import ( "github.com/confluentinc/cli/v4/pkg/errors" "github.com/confluentinc/cli/v4/pkg/flink/types" "github.com/confluentinc/cli/v4/pkg/output" - "github.com/confluentinc/cli/v4/pkg/retry" + "github.com/confluentinc/cli/v4/pkg/wait" ) func (c *command) newStatementCreateCommandOnPrem() *cobra.Command { @@ -36,7 +36,8 @@ func (c *command) newStatementCreateCommandOnPrem() *cobra.Command { cmd.Flags().String("catalog", "", "The name of the default catalog.") cmd.Flags().String("database", "", "The name of the default database.") cmd.Flags().String("flink-configuration", "", "The file path to hold the Flink configuration for the statement.") - cmd.Flags().Bool("wait", false, "Boolean flag to block until the statement is running or has failed.") + pcmd.AddWaitFlag(cmd) + pcmd.AddWaitTimeoutFlag(cmd, time.Minute) addCmfFlagSet(cmd) pcmd.AddOutputFlag(cmd) @@ -114,7 +115,7 @@ func (c *command) statementCreateOnPrem(cmd *cobra.Command, args []string) error Stopped: cmfsdk.PtrBool(false), }, } - wait, err := cmd.Flags().GetBool("wait") + shouldWait, err := cmd.Flags().GetBool("wait") if err != nil { return err } @@ -124,22 +125,24 @@ func (c *command) statementCreateOnPrem(cmd *cobra.Command, args []string) error return err } - if wait { - err := retry.Retry(time.Second*2, time.Minute, func() error { - polledStatement, err := client.GetStatement(c.createContext(), environment, name) - if err != nil { - return err - } - if polledStatement.GetStatus().Phase == "PENDING" { - return fmt.Errorf(`statement phase is "%s"`, polledStatement.GetStatus().Phase) - } - // Update the finalStatement with the completed state - finalStatement = polledStatement - return nil - }) + if shouldWait { + timeout, err := cmd.Flags().GetDuration("wait-timeout") if err != nil { return err } + finalStatement, err = wait.Poll(cmd.Context(), wait.Options[cmfsdk.Statement]{ + Fetch: func() (cmfsdk.Statement, error) { + return client.GetStatement(c.createContext(), environment, name) + }, + IsTerminal: func(s cmfsdk.Statement) bool { + return s.GetStatus().Phase != "PENDING" + }, + Tick: 2 * time.Second, + Timeout: timeout, + }) + if err != nil { + return errors.NewErrorWithSuggestions(err.Error(), "Increase `--wait-timeout` or omit `--wait`.") + } } if output.GetFormat(cmd) == output.Human { diff --git a/pkg/cmd/flags.go b/pkg/cmd/flags.go index 1c10e9bd21..63a5c7a603 100644 --- a/pkg/cmd/flags.go +++ b/pkg/cmd/flags.go @@ -3,6 +3,7 @@ package cmd import ( "fmt" "strings" + "time" "github.com/spf13/cobra" @@ -212,6 +213,14 @@ func AddDryRunFlag(cmd *cobra.Command) { cmd.Flags().Bool("dry-run", false, "Run the command without committing changes.") } +func AddWaitFlag(cmd *cobra.Command) { + cmd.Flags().Bool("wait", false, "Block until the resource reaches a terminal state.") +} + +func AddWaitTimeoutFlag(cmd *cobra.Command, defaultTimeout time.Duration) { + cmd.Flags().Duration("wait-timeout", defaultTimeout, "Maximum time to wait when --wait is set.") +} + func AddKsqlClusterFlag(cmd *cobra.Command, c *AuthenticatedCLICommand) { cmd.Flags().String("ksql-cluster", "", "KSQL cluster for the pipeline.") RegisterFlagCompletionFunc(cmd, "ksql-cluster", func(cmd *cobra.Command, args []string) []string { diff --git a/pkg/wait/wait.go b/pkg/wait/wait.go new file mode 100644 index 0000000000..da200c1f90 --- /dev/null +++ b/pkg/wait/wait.go @@ -0,0 +1,65 @@ +package wait + +import ( + "context" + "errors" + "time" +) + +// Options describes a single Poll invocation. T is the polled resource type. +type Options[T any] struct { + Fetch func() (T, error) + IsTerminal func(T) bool + IsFailed func(T) bool + Tick time.Duration + Timeout time.Duration +} + +var ( + ErrTimeout = errors.New("wait timed out") + ErrFailed = errors.New("resource entered failed state") +) + +// Poll calls opts.Fetch immediately, then every opts.Tick until IsFailed +// returns true (ErrFailed), IsTerminal returns true (success), opts.Timeout +// elapses (ErrTimeout), ctx is cancelled (ctx.Err()), or Fetch returns a +// non-nil error. Always returns the most recent successfully-fetched T. +func Poll[T any](ctx context.Context, opts Options[T]) (T, error) { + var last T + + check := func() (T, bool, error) { + v, err := opts.Fetch() + if err != nil { + return last, true, err + } + last = v + if opts.IsFailed != nil && opts.IsFailed(v) { + return last, true, ErrFailed + } + if opts.IsTerminal(v) { + return last, true, nil + } + return last, false, nil + } + + if v, done, err := check(); done { + return v, err + } + + ticker := time.NewTicker(opts.Tick) + defer ticker.Stop() + deadline := time.After(opts.Timeout) + + for { + select { + case <-ticker.C: + if v, done, err := check(); done { + return v, err + } + case <-deadline: + return last, ErrTimeout + case <-ctx.Done(): + return last, ctx.Err() + } + } +} diff --git a/pkg/wait/wait_test.go b/pkg/wait/wait_test.go new file mode 100644 index 0000000000..66c55cb8e4 --- /dev/null +++ b/pkg/wait/wait_test.go @@ -0,0 +1,132 @@ +package wait + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type fakeResource struct { + phase string +} + +func TestPoll_ImmediateReady(t *testing.T) { + calls := 0 + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + calls++ + return fakeResource{phase: "READY"}, nil + }, + IsTerminal: func(r fakeResource) bool { return r.phase == "READY" }, + Tick: time.Millisecond, + Timeout: time.Second, + }) + require.NoError(t, err) + require.Equal(t, "READY", v.phase) + require.Equal(t, 1, calls) +} + +func TestPoll_EventuallyReady(t *testing.T) { + calls := 0 + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + calls++ + if calls < 3 { + return fakeResource{phase: "PENDING"}, nil + } + return fakeResource{phase: "READY"}, nil + }, + IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" }, + Tick: time.Nanosecond, + Timeout: time.Second, + }) + require.NoError(t, err) + require.Equal(t, "READY", v.phase) + require.Equal(t, 3, calls) +} + +func TestPoll_Failed(t *testing.T) { + calls := 0 + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + calls++ + if calls == 1 { + return fakeResource{phase: "PENDING"}, nil + } + return fakeResource{phase: "FAILED"}, nil + }, + IsTerminal: func(r fakeResource) bool { return r.phase == "READY" || r.phase == "FAILED" }, + IsFailed: func(r fakeResource) bool { return r.phase == "FAILED" }, + Tick: time.Nanosecond, + Timeout: time.Second, + }) + require.ErrorIs(t, err, ErrFailed) + require.Equal(t, "FAILED", v.phase) +} + +func TestPoll_Timeout(t *testing.T) { + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + return fakeResource{phase: "PENDING"}, nil + }, + IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" }, + Tick: time.Millisecond, + Timeout: 5 * time.Millisecond, + }) + require.ErrorIs(t, err, ErrTimeout) + require.Equal(t, "PENDING", v.phase) +} + +func TestPoll_FetchError(t *testing.T) { + calls := 0 + fetchErr := fmt.Errorf("transient fetch failure") + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + calls++ + if calls == 1 { + return fakeResource{phase: "PENDING"}, nil + } + return fakeResource{}, fetchErr + }, + IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" }, + Tick: time.Nanosecond, + Timeout: time.Second, + }) + require.ErrorIs(t, err, fetchErr) + require.Equal(t, "PENDING", v.phase) +} + +func TestPoll_FetchErrorOnFirstCall(t *testing.T) { + fetchErr := fmt.Errorf("initial fetch failure") + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + return fakeResource{}, fetchErr + }, + IsTerminal: func(r fakeResource) bool { return true }, + Tick: time.Nanosecond, + Timeout: time.Second, + }) + require.ErrorIs(t, err, fetchErr) + require.Equal(t, fakeResource{}, v) +} + +func TestPoll_ContextCancelled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(5 * time.Millisecond) + cancel() + }() + _, err := Poll(ctx, Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + return fakeResource{phase: "PENDING"}, nil + }, + IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" }, + Tick: time.Millisecond, + Timeout: time.Second, + }) + require.True(t, errors.Is(err, context.Canceled)) +} diff --git a/test/fixtures/output/flink/statement/create-help-onprem.golden b/test/fixtures/output/flink/statement/create-help-onprem.golden index 8755604c1c..d24beac0b8 100644 --- a/test/fixtures/output/flink/statement/create-help-onprem.golden +++ b/test/fixtures/output/flink/statement/create-help-onprem.golden @@ -11,7 +11,8 @@ Flags: --catalog string The name of the default catalog. --database string The name of the default database. --flink-configuration string The file path to hold the Flink configuration for the statement. - --wait Boolean flag to block until the statement is running or has failed. + --wait Block until the resource reaches a terminal state. + --wait-timeout duration Maximum time to wait when --wait is set. (default 1m0s) --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. diff --git a/test/fixtures/output/flink/statement/create-help.golden b/test/fixtures/output/flink/statement/create-help.golden index da60ffb363..eb0cd9c376 100644 --- a/test/fixtures/output/flink/statement/create-help.golden +++ b/test/fixtures/output/flink/statement/create-help.golden @@ -17,7 +17,8 @@ Flags: --compute-pool string Flink compute pool ID. --service-account string Service account ID. --database string The database which will be used as the default database. When using Kafka, this is the cluster ID. - --wait Block until the statement is running or has failed. + --wait Block until the resource reaches a terminal state. + --wait-timeout duration Maximum time to wait when --wait is set. (default 1m0s) --property strings A mechanism to pass properties in the form key=value when creating a Flink statement. --environment string Environment ID. --context string CLI context name. diff --git a/test/fixtures/output/flink/statement/create-missing-compute-pool-failure.golden b/test/fixtures/output/flink/statement/create-missing-compute-pool-failure.golden index 637ec67841..9622b9bd85 100644 --- a/test/fixtures/output/flink/statement/create-missing-compute-pool-failure.golden +++ b/test/fixtures/output/flink/statement/create-missing-compute-pool-failure.golden @@ -10,7 +10,8 @@ Flags: --catalog string The name of the default catalog. --database string The name of the default database. --flink-configuration string The file path to hold the Flink configuration for the statement. - --wait Boolean flag to block until the statement is running or has failed. + --wait Block until the resource reaches a terminal state. + --wait-timeout duration Maximum time to wait when --wait is set. (default 1m0s) --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. diff --git a/test/fixtures/output/flink/statement/create-missing-sql-failure.golden b/test/fixtures/output/flink/statement/create-missing-sql-failure.golden index 6fa346c882..be9fa62ae0 100644 --- a/test/fixtures/output/flink/statement/create-missing-sql-failure.golden +++ b/test/fixtures/output/flink/statement/create-missing-sql-failure.golden @@ -10,7 +10,8 @@ Flags: --catalog string The name of the default catalog. --database string The name of the default database. --flink-configuration string The file path to hold the Flink configuration for the statement. - --wait Boolean flag to block until the statement is running or has failed. + --wait Block until the resource reaches a terminal state. + --wait-timeout duration Maximum time to wait when --wait is set. (default 1m0s) --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. diff --git a/test/fixtures/output/flink/statement/create-wait-timeout.golden b/test/fixtures/output/flink/statement/create-wait-timeout.golden new file mode 100644 index 0000000000..db7363e8aa --- /dev/null +++ b/test/fixtures/output/flink/statement/create-wait-timeout.golden @@ -0,0 +1,4 @@ +Error: wait timed out + +Suggestions: + Increase `--wait-timeout` or omit `--wait`. diff --git a/test/flink_test.go b/test/flink_test.go index a4f13ca38d..9431e4b787 100644 --- a/test/flink_test.go +++ b/test/flink_test.go @@ -478,6 +478,7 @@ func (s *CLITestSuite) TestFlinkStatementCreate() { {args: `flink statement create my-statement-2 --sql "INSERT * INTO table;" --cloud aws --region eu-west-1 --service-account sa-123456`, fixture: "flink/statement/create-without-compute-pool.golden"}, {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456`, fixture: "flink/statement/create-service-account-warning.golden"}, {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 --wait`, fixture: "flink/statement/create-wait.golden"}, + {args: `flink statement create pending-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 --wait --wait-timeout 100ms`, fixture: "flink/statement/create-wait-timeout.golden", exitCode: 1}, {args: `flink statement create --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 -o yaml`, fixture: "flink/statement/create-no-name-yaml.golden", regex: true}, {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 --property property1=value1,property2=value2`, fixture: "flink/statement/create-with-properties.golden"}, {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 --property invalid-format,property1=value1`, fixture: "flink/statement/create-invalid-property.golden", exitCode: 1}, diff --git a/test/test-server/flink_gateway_router.go b/test/test-server/flink_gateway_router.go index affabe13a4..17f5db54e8 100644 --- a/test/test-server/flink_gateway_router.go +++ b/test/test-server/flink_gateway_router.go @@ -226,6 +226,12 @@ func handleSqlEnvironmentsEnvironmentStatementsStatement(t *testing.T) http.Hand func handleStatementGet(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + phase := "COMPLETED" + detail := "SQL statement is completed" + if mux.Vars(r)["statement"] == "pending-statement" { + phase = "PENDING" + detail = "SQL statement is pending" + } statement := flinkgatewayv1.SqlV1Statement{ Name: flinkgatewayv1.PtrString(mux.Vars(r)["statement"]), Spec: &flinkgatewayv1.SqlV1StatementSpec{ @@ -238,8 +244,8 @@ func handleStatementGet(t *testing.T) http.HandlerFunc { Principal: flinkgatewayv1.PtrString(validFlinkStatementPrincipalId), }, Status: &flinkgatewayv1.SqlV1StatementStatus{ - Phase: "COMPLETED", - Detail: flinkgatewayv1.PtrString("SQL statement is completed"), + Phase: phase, + Detail: flinkgatewayv1.PtrString(detail), LatestOffsets: &map[string]string{ "customers_source": "partition:0,offset:9223372036854775808", },