-
Notifications
You must be signed in to change notification settings - Fork 29
[APIE-1040] Add generic --wait framework; refactor Flink statement create #3361
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,7 @@ | |
| "github.com/confluentinc/cli/v4/pkg/errors" | ||
| "github.com/confluentinc/cli/v4/pkg/flink/types" | ||
| "github.com/confluentinc/cli/v4/pkg/output" | ||
| "github.com/confluentinc/cli/v4/pkg/retry" | ||
| "github.com/confluentinc/cli/v4/pkg/wait" | ||
| ) | ||
|
|
||
| func (c *command) newStatementCreateCommandOnPrem() *cobra.Command { | ||
|
|
@@ -36,7 +36,8 @@ | |
| cmd.Flags().String("catalog", "", "The name of the default catalog.") | ||
| cmd.Flags().String("database", "", "The name of the default database.") | ||
| cmd.Flags().String("flink-configuration", "", "The file path to hold the Flink configuration for the statement.") | ||
| cmd.Flags().Bool("wait", false, "Boolean flag to block until the statement is running or has failed.") | ||
| pcmd.AddWaitFlag(cmd) | ||
| pcmd.AddWaitTimeoutFlag(cmd, time.Minute) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TF doesn't have the user configurable wait timeout, as it's purely handled inside an internal function with hardcoded value, since we want to have unified UX for CLI and TF users, when designing please take this into consideration, we need to either alter the existing TF behavior or let CLI follow the CLI behavior. |
||
| addCmfFlagSet(cmd) | ||
| pcmd.AddOutputFlag(cmd) | ||
|
|
||
|
|
@@ -47,7 +48,7 @@ | |
| return cmd | ||
| } | ||
|
|
||
| func (c *command) statementCreateOnPrem(cmd *cobra.Command, args []string) error { | ||
|
Check failure on line 51 in internal/flink/command_statement_create_onprem.go
|
||
| // Flink statement name can be automatically generated or provided by the user | ||
| name := types.GenerateStatementNameForOnPrem() | ||
| if len(args) == 1 { | ||
|
|
@@ -114,7 +115,7 @@ | |
| Stopped: cmfsdk.PtrBool(false), | ||
| }, | ||
| } | ||
| wait, err := cmd.Flags().GetBool("wait") | ||
| shouldWait, err := cmd.Flags().GetBool("wait") | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
@@ -124,22 +125,24 @@ | |
| return err | ||
| } | ||
|
|
||
| if wait { | ||
| err := retry.Retry(time.Second*2, time.Minute, func() error { | ||
| polledStatement, err := client.GetStatement(c.createContext(), environment, name) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if polledStatement.GetStatus().Phase == "PENDING" { | ||
| return fmt.Errorf(`statement phase is "%s"`, polledStatement.GetStatus().Phase) | ||
| } | ||
| // Update the finalStatement with the completed state | ||
| finalStatement = polledStatement | ||
| return nil | ||
| }) | ||
| if shouldWait { | ||
| timeout, err := cmd.Flags().GetDuration("wait-timeout") | ||
| if err != nil { | ||
| return err | ||
| } | ||
| finalStatement, err = wait.Poll(cmd.Context(), wait.Options[cmfsdk.Statement]{ | ||
|
Comment on lines
+128
to
+133
|
||
| Fetch: func() (cmfsdk.Statement, error) { | ||
| return client.GetStatement(c.createContext(), environment, name) | ||
| }, | ||
| IsTerminal: func(s cmfsdk.Statement) bool { | ||
| return s.GetStatus().Phase != "PENDING" | ||
| }, | ||
| Tick: 2 * time.Second, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Also, please investigate on how TF handles the polling interval and timeout, is it purely manual setting, is it semi-manual setting with default, is it something we can derive from OpenAPI spec, is it something we can write to registry.yaml as override value, once you have a proposal, please sync with Kostya Linou (@linouk23) for the unified implementation. |
||
| Timeout: timeout, | ||
| }) | ||
| if err != nil { | ||
| return errors.NewErrorWithSuggestions(err.Error(), "Increase `--wait-timeout` or omit `--wait`.") | ||
|
Comment on lines
+143
to
+144
|
||
| } | ||
| } | ||
|
|
||
| if output.GetFormat(cmd) == output.Human { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| package wait | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "time" | ||
| ) | ||
|
|
||
| // Options describes a single Poll invocation. T is the polled resource type. | ||
| type Options[T any] struct { | ||
| Fetch func() (T, error) | ||
| IsTerminal func(T) bool | ||
| IsFailed func(T) bool | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| Tick time.Duration | ||
| Timeout time.Duration | ||
| } | ||
|
|
||
| var ( | ||
| ErrTimeout = errors.New("wait timed out") | ||
| ErrFailed = errors.New("resource entered failed state") | ||
| ) | ||
|
|
||
| // Poll calls opts.Fetch immediately, then every opts.Tick until IsFailed | ||
| // returns true (ErrFailed), IsTerminal returns true (success), opts.Timeout | ||
| // elapses (ErrTimeout), ctx is cancelled (ctx.Err()), or Fetch returns a | ||
| // non-nil error. Always returns the most recent successfully-fetched T. | ||
| func Poll[T any](ctx context.Context, opts Options[T]) (T, error) { | ||
| var last T | ||
|
|
||
| check := func() (T, bool, error) { | ||
| v, err := opts.Fetch() | ||
| if err != nil { | ||
| return last, true, err | ||
|
Comment on lines
+31
to
+33
|
||
| } | ||
| last = v | ||
| if opts.IsFailed != nil && opts.IsFailed(v) { | ||
| return last, true, ErrFailed | ||
| } | ||
| if opts.IsTerminal(v) { | ||
| return last, true, nil | ||
| } | ||
| return last, false, nil | ||
| } | ||
|
|
||
| if v, done, err := check(); done { | ||
| return v, err | ||
| } | ||
|
|
||
| ticker := time.NewTicker(opts.Tick) | ||
| defer ticker.Stop() | ||
| deadline := time.After(opts.Timeout) | ||
|
|
||
| for { | ||
| select { | ||
| case <-ticker.C: | ||
| if v, done, err := check(); done { | ||
| return v, err | ||
| } | ||
| case <-deadline: | ||
| return last, ErrTimeout | ||
| case <-ctx.Done(): | ||
| return last, ctx.Err() | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| package wait | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| type fakeResource struct { | ||
| phase string | ||
| } | ||
|
|
||
| func TestPoll_ImmediateReady(t *testing.T) { | ||
|
Check warning on line 17 in pkg/wait/wait_test.go
|
||
| calls := 0 | ||
| v, err := Poll(context.Background(), Options[fakeResource]{ | ||
| Fetch: func() (fakeResource, error) { | ||
| calls++ | ||
| return fakeResource{phase: "READY"}, nil | ||
| }, | ||
| IsTerminal: func(r fakeResource) bool { return r.phase == "READY" }, | ||
| Tick: time.Millisecond, | ||
| Timeout: time.Second, | ||
| }) | ||
| require.NoError(t, err) | ||
| require.Equal(t, "READY", v.phase) | ||
| require.Equal(t, 1, calls) | ||
| } | ||
|
|
||
| func TestPoll_EventuallyReady(t *testing.T) { | ||
|
Check warning on line 33 in pkg/wait/wait_test.go
|
||
| calls := 0 | ||
| v, err := Poll(context.Background(), Options[fakeResource]{ | ||
| Fetch: func() (fakeResource, error) { | ||
| calls++ | ||
| if calls < 3 { | ||
| return fakeResource{phase: "PENDING"}, nil | ||
| } | ||
| return fakeResource{phase: "READY"}, nil | ||
| }, | ||
| IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" }, | ||
| Tick: time.Nanosecond, | ||
| Timeout: time.Second, | ||
| }) | ||
| require.NoError(t, err) | ||
| require.Equal(t, "READY", v.phase) | ||
| require.Equal(t, 3, calls) | ||
| } | ||
|
|
||
| func TestPoll_Failed(t *testing.T) { | ||
|
Check warning on line 52 in pkg/wait/wait_test.go
|
||
| calls := 0 | ||
| v, err := Poll(context.Background(), Options[fakeResource]{ | ||
| Fetch: func() (fakeResource, error) { | ||
| calls++ | ||
| if calls == 1 { | ||
| return fakeResource{phase: "PENDING"}, nil | ||
| } | ||
| return fakeResource{phase: "FAILED"}, nil | ||
| }, | ||
| IsTerminal: func(r fakeResource) bool { return r.phase == "READY" || r.phase == "FAILED" }, | ||
| IsFailed: func(r fakeResource) bool { return r.phase == "FAILED" }, | ||
| Tick: time.Nanosecond, | ||
| Timeout: time.Second, | ||
| }) | ||
| require.ErrorIs(t, err, ErrFailed) | ||
| require.Equal(t, "FAILED", v.phase) | ||
| } | ||
|
|
||
| func TestPoll_Timeout(t *testing.T) { | ||
|
Check warning on line 71 in pkg/wait/wait_test.go
|
||
| v, err := Poll(context.Background(), Options[fakeResource]{ | ||
| Fetch: func() (fakeResource, error) { | ||
| return fakeResource{phase: "PENDING"}, nil | ||
| }, | ||
| IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" }, | ||
| Tick: time.Millisecond, | ||
| Timeout: 5 * time.Millisecond, | ||
| }) | ||
| require.ErrorIs(t, err, ErrTimeout) | ||
| require.Equal(t, "PENDING", v.phase) | ||
| } | ||
|
|
||
| func TestPoll_FetchError(t *testing.T) { | ||
|
Check warning on line 84 in pkg/wait/wait_test.go
|
||
| calls := 0 | ||
| fetchErr := fmt.Errorf("transient fetch failure") | ||
| v, err := Poll(context.Background(), Options[fakeResource]{ | ||
| Fetch: func() (fakeResource, error) { | ||
| calls++ | ||
| if calls == 1 { | ||
| return fakeResource{phase: "PENDING"}, nil | ||
| } | ||
| return fakeResource{}, fetchErr | ||
| }, | ||
| IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" }, | ||
| Tick: time.Nanosecond, | ||
| Timeout: time.Second, | ||
| }) | ||
| require.ErrorIs(t, err, fetchErr) | ||
| require.Equal(t, "PENDING", v.phase) | ||
| } | ||
|
|
||
| func TestPoll_FetchErrorOnFirstCall(t *testing.T) { | ||
|
Check warning on line 103 in pkg/wait/wait_test.go
|
||
| fetchErr := fmt.Errorf("initial fetch failure") | ||
| v, err := Poll(context.Background(), Options[fakeResource]{ | ||
| Fetch: func() (fakeResource, error) { | ||
| return fakeResource{}, fetchErr | ||
| }, | ||
| IsTerminal: func(r fakeResource) bool { return true }, | ||
| Tick: time.Nanosecond, | ||
| Timeout: time.Second, | ||
| }) | ||
| require.ErrorIs(t, err, fetchErr) | ||
| require.Equal(t, fakeResource{}, v) | ||
| } | ||
|
|
||
| func TestPoll_ContextCancelled(t *testing.T) { | ||
|
Check warning on line 117 in pkg/wait/wait_test.go
|
||
| ctx, cancel := context.WithCancel(context.Background()) | ||
|
Check warning on line 118 in pkg/wait/wait_test.go
|
||
| go func() { | ||
| time.Sleep(5 * time.Millisecond) | ||
| cancel() | ||
| }() | ||
| _, err := Poll(ctx, Options[fakeResource]{ | ||
| Fetch: func() (fakeResource, error) { | ||
| return fakeResource{phase: "PENDING"}, nil | ||
| }, | ||
| IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" }, | ||
| Tick: time.Millisecond, | ||
| Timeout: time.Second, | ||
| }) | ||
| require.True(t, errors.Is(err, context.Canceled)) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"PENDING"may not be the only intermediate status, there are"FAILING"and"STOPPING"for Flink statement:https://docs.confluent.io/cloud/current/ccloud/get-sqlv-1-statement/?_highlight=statement
If we think even one level higher, is there a way we can retrieve information from OpenAPI spec to determine which states are pending states, and which states are terminal states, this is the question we need to give answer to ourselves.
For example, a naive idea would be, if the status ends with
"ING", we need to wait, otherwise we should terminate.https://github.com/confluentinc/api/blob/master/flink-gateway/v1/openapi.yaml#L3313
Can you check if such idea can be applied to other async resources?