Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions internal/flink/command_statement_create.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package flink

import (
"fmt"
"time"

"github.com/spf13/cobra"
Expand All @@ -16,7 +15,7 @@
"github.com/confluentinc/cli/v4/pkg/flink/types"
"github.com/confluentinc/cli/v4/pkg/output"
"github.com/confluentinc/cli/v4/pkg/properties"
"github.com/confluentinc/cli/v4/pkg/retry"
"github.com/confluentinc/cli/v4/pkg/wait"
)

func (c *command) newStatementCreateCommand() *cobra.Command {
Expand All @@ -41,7 +40,8 @@
c.addComputePoolFlag(cmd)
pcmd.AddServiceAccountFlag(cmd, c.AuthenticatedCLICommand)
c.addDatabaseFlag(cmd)
cmd.Flags().Bool("wait", false, "Block until the statement is running or has failed.")
pcmd.AddWaitFlag(cmd)
pcmd.AddWaitTimeoutFlag(cmd, time.Minute)
cmd.Flags().StringSlice("property", []string{}, "A mechanism to pass properties in the form key=value when creating a Flink statement.")
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
Expand All @@ -54,7 +54,7 @@
return cmd
}

func (c *command) statementCreate(cmd *cobra.Command, args []string) error {

Check failure on line 57 in internal/flink/command_statement_create.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 33 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3361&issues=99269462-b4d0-4722-bcf2-de05cfa8d0ee&open=99269462-b4d0-4722-bcf2-de05cfa8d0ee
environmentId, err := c.Context.EnvironmentId()
if err != nil {
return err
Expand Down Expand Up @@ -151,26 +151,28 @@
if err != nil {
return err
}
wait, err := cmd.Flags().GetBool("wait")
shouldWait, err := cmd.Flags().GetBool("wait")
if err != nil {
return err
}
if wait {
err := retry.Retry(time.Second, time.Minute, func() error {
statement, err = client.GetStatement(environmentId, name, c.Context.LastOrgId)
if err != nil {
return err
}

if statement.Status.GetPhase() == "PENDING" {
return fmt.Errorf(`statement phase is "%s"`, statement.Status.GetPhase())
}

return nil
})
if shouldWait {
timeout, err := cmd.Flags().GetDuration("wait-timeout")
if err != nil {
return err
}
statement, err = wait.Poll(cmd.Context(), wait.Options[flinkgatewayv1.SqlV1Statement]{
Fetch: func() (flinkgatewayv1.SqlV1Statement, error) {
return client.GetStatement(environmentId, name, c.Context.LastOrgId)
},
IsTerminal: func(s flinkgatewayv1.SqlV1Statement) bool {
return s.Status.GetPhase() != "PENDING"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"PENDING" may not be the only intermediate status, there are "FAILING" and "STOPPING" for Flink statement:
https://docs.confluent.io/cloud/current/ccloud/get-sqlv-1-statement/?_highlight=statement

If we think even one level higher, is there a way we can retrieve information from OpenAPI spec to determine which states are pending states, and which states are terminal states, this is the question we need to give answer to ourselves.

For example, a naive idea would be, if the status ends with "ING", we need to wait, otherwise we should terminate.
https://github.com/confluentinc/api/blob/master/flink-gateway/v1/openapi.yaml#L3313

Can you check if such idea can be applied to other async resources?

},
Tick: time.Second,
Timeout: timeout,
})
if err != nil {
return errors.NewErrorWithSuggestions(err.Error(), "Increase `--wait-timeout` or omit `--wait`.")
Comment on lines +173 to +174
}
}

table := output.NewTable(cmd)
Expand Down
35 changes: 19 additions & 16 deletions internal/flink/command_statement_create_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/flink/types"
"github.com/confluentinc/cli/v4/pkg/output"
"github.com/confluentinc/cli/v4/pkg/retry"
"github.com/confluentinc/cli/v4/pkg/wait"
)

func (c *command) newStatementCreateCommandOnPrem() *cobra.Command {
Expand All @@ -36,7 +36,8 @@
cmd.Flags().String("catalog", "", "The name of the default catalog.")
cmd.Flags().String("database", "", "The name of the default database.")
cmd.Flags().String("flink-configuration", "", "The file path to hold the Flink configuration for the statement.")
cmd.Flags().Bool("wait", false, "Boolean flag to block until the statement is running or has failed.")
pcmd.AddWaitFlag(cmd)
pcmd.AddWaitTimeoutFlag(cmd, time.Minute)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TF doesn't have the user configurable wait timeout, as it's purely handled inside an internal function with hardcoded value, since we want to have unified UX for CLI and TF users, when designing please take this into consideration, we need to either alter the existing TF behavior or let CLI follow the CLI behavior.

addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

Expand All @@ -47,7 +48,7 @@
return cmd
}

func (c *command) statementCreateOnPrem(cmd *cobra.Command, args []string) error {

Check failure on line 51 in internal/flink/command_statement_create_onprem.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 19 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3361&issues=00c9e1a0-374b-4f12-88fb-8976f07a19e1&open=00c9e1a0-374b-4f12-88fb-8976f07a19e1
// Flink statement name can be automatically generated or provided by the user
name := types.GenerateStatementNameForOnPrem()
if len(args) == 1 {
Expand Down Expand Up @@ -114,7 +115,7 @@
Stopped: cmfsdk.PtrBool(false),
},
}
wait, err := cmd.Flags().GetBool("wait")
shouldWait, err := cmd.Flags().GetBool("wait")
if err != nil {
return err
}
Expand All @@ -124,22 +125,24 @@
return err
}

if wait {
err := retry.Retry(time.Second*2, time.Minute, func() error {
polledStatement, err := client.GetStatement(c.createContext(), environment, name)
if err != nil {
return err
}
if polledStatement.GetStatus().Phase == "PENDING" {
return fmt.Errorf(`statement phase is "%s"`, polledStatement.GetStatus().Phase)
}
// Update the finalStatement with the completed state
finalStatement = polledStatement
return nil
})
if shouldWait {
timeout, err := cmd.Flags().GetDuration("wait-timeout")
if err != nil {
return err
}
finalStatement, err = wait.Poll(cmd.Context(), wait.Options[cmfsdk.Statement]{
Comment on lines +128 to +133
Fetch: func() (cmfsdk.Statement, error) {
return client.GetStatement(c.createContext(), environment, name)
},
IsTerminal: func(s cmfsdk.Statement) bool {
return s.GetStatus().Phase != "PENDING"
},
Tick: 2 * time.Second,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 * time.Second may work for Flink statement but not work for others, a short interval increases the chance of being rate limited, a long interval may upset users.

Also, please investigate on how TF handles the polling interval and timeout, is it purely manual setting, is it semi-manual setting with default, is it something we can derive from OpenAPI spec, is it something we can write to registry.yaml as override value, once you have a proposal, please sync with Kostya Linou (@linouk23) for the unified implementation.

Timeout: timeout,
})
if err != nil {
return errors.NewErrorWithSuggestions(err.Error(), "Increase `--wait-timeout` or omit `--wait`.")
Comment on lines +143 to +144
}
}

if output.GetFormat(cmd) == output.Human {
Expand Down
9 changes: 9 additions & 0 deletions pkg/cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"fmt"
"strings"
"time"

"github.com/spf13/cobra"

Expand Down Expand Up @@ -212,6 +213,14 @@ func AddDryRunFlag(cmd *cobra.Command) {
cmd.Flags().Bool("dry-run", false, "Run the command without committing changes.")
}

func AddWaitFlag(cmd *cobra.Command) {
cmd.Flags().Bool("wait", false, "Block until the resource reaches a terminal state.")
}

func AddWaitTimeoutFlag(cmd *cobra.Command, defaultTimeout time.Duration) {
cmd.Flags().Duration("wait-timeout", defaultTimeout, "Maximum time to wait when --wait is set.")
}

func AddKsqlClusterFlag(cmd *cobra.Command, c *AuthenticatedCLICommand) {
cmd.Flags().String("ksql-cluster", "", "KSQL cluster for the pipeline.")
RegisterFlagCompletionFunc(cmd, "ksql-cluster", func(cmd *cobra.Command, args []string) []string {
Expand Down
65 changes: 65 additions & 0 deletions pkg/wait/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package wait

import (
"context"
"errors"
"time"
)

// Options describes a single Poll invocation. T is the polled resource type.
type Options[T any] struct {
Fetch func() (T, error)
IsTerminal func(T) bool
IsFailed func(T) bool
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IsFailed is not wired up to the command source file, so we can't tell if the successful wait status or failed wait status, can we handle this better?

Tick time.Duration
Timeout time.Duration
}

var (
ErrTimeout = errors.New("wait timed out")
ErrFailed = errors.New("resource entered failed state")
)

// Poll calls opts.Fetch immediately, then every opts.Tick until IsFailed
// returns true (ErrFailed), IsTerminal returns true (success), opts.Timeout
// elapses (ErrTimeout), ctx is cancelled (ctx.Err()), or Fetch returns a
// non-nil error. Always returns the most recent successfully-fetched T.
func Poll[T any](ctx context.Context, opts Options[T]) (T, error) {
var last T

check := func() (T, bool, error) {
v, err := opts.Fetch()
if err != nil {
return last, true, err
Comment on lines +31 to +33
}
last = v
if opts.IsFailed != nil && opts.IsFailed(v) {
return last, true, ErrFailed
}
if opts.IsTerminal(v) {
return last, true, nil
}
return last, false, nil
}

if v, done, err := check(); done {
return v, err
}

ticker := time.NewTicker(opts.Tick)
defer ticker.Stop()
deadline := time.After(opts.Timeout)

for {
select {
case <-ticker.C:
if v, done, err := check(); done {
return v, err
}
case <-deadline:
return last, ErrTimeout
case <-ctx.Done():
return last, ctx.Err()
}
}
}
132 changes: 132 additions & 0 deletions pkg/wait/wait_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package wait

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type fakeResource struct {
phase string
}

func TestPoll_ImmediateReady(t *testing.T) {

Check warning on line 17 in pkg/wait/wait_test.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Rename function "TestPoll_ImmediateReady" to match the regular expression ^(_|[a-zA-Z0-9]+)$

[S100] Function names should comply with a naming convention See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3361&issues=53d1e9c6-c24f-4086-9fea-7643c718de9e&open=53d1e9c6-c24f-4086-9fea-7643c718de9e
calls := 0
v, err := Poll(context.Background(), Options[fakeResource]{
Fetch: func() (fakeResource, error) {
calls++
return fakeResource{phase: "READY"}, nil
},
IsTerminal: func(r fakeResource) bool { return r.phase == "READY" },
Tick: time.Millisecond,
Timeout: time.Second,
})
require.NoError(t, err)
require.Equal(t, "READY", v.phase)
require.Equal(t, 1, calls)
}

func TestPoll_EventuallyReady(t *testing.T) {

Check warning on line 33 in pkg/wait/wait_test.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Rename function "TestPoll_EventuallyReady" to match the regular expression ^(_|[a-zA-Z0-9]+)$

[S100] Function names should comply with a naming convention See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3361&issues=6fcd574c-d887-42da-9269-8d393df65c5d&open=6fcd574c-d887-42da-9269-8d393df65c5d
calls := 0
v, err := Poll(context.Background(), Options[fakeResource]{
Fetch: func() (fakeResource, error) {
calls++
if calls < 3 {
return fakeResource{phase: "PENDING"}, nil
}
return fakeResource{phase: "READY"}, nil
},
IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" },
Tick: time.Nanosecond,
Timeout: time.Second,
})
require.NoError(t, err)
require.Equal(t, "READY", v.phase)
require.Equal(t, 3, calls)
}

func TestPoll_Failed(t *testing.T) {

Check warning on line 52 in pkg/wait/wait_test.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Rename function "TestPoll_Failed" to match the regular expression ^(_|[a-zA-Z0-9]+)$

[S100] Function names should comply with a naming convention See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3361&issues=93918849-8e1b-4ad7-990d-df09098e24bd&open=93918849-8e1b-4ad7-990d-df09098e24bd
calls := 0
v, err := Poll(context.Background(), Options[fakeResource]{
Fetch: func() (fakeResource, error) {
calls++
if calls == 1 {
return fakeResource{phase: "PENDING"}, nil
}
return fakeResource{phase: "FAILED"}, nil
},
IsTerminal: func(r fakeResource) bool { return r.phase == "READY" || r.phase == "FAILED" },
IsFailed: func(r fakeResource) bool { return r.phase == "FAILED" },
Tick: time.Nanosecond,
Timeout: time.Second,
})
require.ErrorIs(t, err, ErrFailed)
require.Equal(t, "FAILED", v.phase)
}

func TestPoll_Timeout(t *testing.T) {

Check warning on line 71 in pkg/wait/wait_test.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Rename function "TestPoll_Timeout" to match the regular expression ^(_|[a-zA-Z0-9]+)$

[S100] Function names should comply with a naming convention See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3361&issues=118bed6f-1994-4e80-b280-ef5eaa97116c&open=118bed6f-1994-4e80-b280-ef5eaa97116c
v, err := Poll(context.Background(), Options[fakeResource]{
Fetch: func() (fakeResource, error) {
return fakeResource{phase: "PENDING"}, nil
},
IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" },
Tick: time.Millisecond,
Timeout: 5 * time.Millisecond,
})
require.ErrorIs(t, err, ErrTimeout)
require.Equal(t, "PENDING", v.phase)
}

func TestPoll_FetchError(t *testing.T) {

Check warning on line 84 in pkg/wait/wait_test.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Rename function "TestPoll_FetchError" to match the regular expression ^(_|[a-zA-Z0-9]+)$

[S100] Function names should comply with a naming convention See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3361&issues=dd2e84f2-b816-4b3e-810e-10575846e843&open=dd2e84f2-b816-4b3e-810e-10575846e843
calls := 0
fetchErr := fmt.Errorf("transient fetch failure")
v, err := Poll(context.Background(), Options[fakeResource]{
Fetch: func() (fakeResource, error) {
calls++
if calls == 1 {
return fakeResource{phase: "PENDING"}, nil
}
return fakeResource{}, fetchErr
},
IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" },
Tick: time.Nanosecond,
Timeout: time.Second,
})
require.ErrorIs(t, err, fetchErr)
require.Equal(t, "PENDING", v.phase)
}

func TestPoll_FetchErrorOnFirstCall(t *testing.T) {

Check warning on line 103 in pkg/wait/wait_test.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Rename function "TestPoll_FetchErrorOnFirstCall" to match the regular expression ^(_|[a-zA-Z0-9]+)$

[S100] Function names should comply with a naming convention See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3361&issues=2360e02e-1bcf-44c1-87df-0145142a143a&open=2360e02e-1bcf-44c1-87df-0145142a143a
fetchErr := fmt.Errorf("initial fetch failure")
v, err := Poll(context.Background(), Options[fakeResource]{
Fetch: func() (fakeResource, error) {
return fakeResource{}, fetchErr
},
IsTerminal: func(r fakeResource) bool { return true },
Tick: time.Nanosecond,
Timeout: time.Second,
})
require.ErrorIs(t, err, fetchErr)
require.Equal(t, fakeResource{}, v)
}

func TestPoll_ContextCancelled(t *testing.T) {

Check warning on line 117 in pkg/wait/wait_test.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Rename function "TestPoll_ContextCancelled" to match the regular expression ^(_|[a-zA-Z0-9]+)$

[S100] Function names should comply with a naming convention See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3361&issues=9317ab23-2ad5-4c4f-95e0-142f9c131b59&open=9317ab23-2ad5-4c4f-95e0-142f9c131b59
ctx, cancel := context.WithCancel(context.Background())

Check warning on line 118 in pkg/wait/wait_test.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Defer the cancel function 'cancel' after this context creation to prevent resource leaks.

[S8188] Context cancellation functions should be deferred See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3361&issues=eb593fe3-4185-4a9b-a5cf-0d552ea77e51&open=eb593fe3-4185-4a9b-a5cf-0d552ea77e51
go func() {
time.Sleep(5 * time.Millisecond)
cancel()
}()
_, err := Poll(ctx, Options[fakeResource]{
Fetch: func() (fakeResource, error) {
return fakeResource{phase: "PENDING"}, nil
},
IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" },
Tick: time.Millisecond,
Timeout: time.Second,
})
require.True(t, errors.Is(err, context.Canceled))
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ Flags:
--catalog string The name of the default catalog.
--database string The name of the default database.
--flink-configuration string The file path to hold the Flink configuration for the statement.
--wait Boolean flag to block until the statement is running or has failed.
--wait Block until the resource reaches a terminal state.
--wait-timeout duration Maximum time to wait when --wait is set. (default 1m0s)
--url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag.
--client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag.
--client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.
Expand Down
3 changes: 2 additions & 1 deletion test/fixtures/output/flink/statement/create-help.golden
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ Flags:
--compute-pool string Flink compute pool ID.
--service-account string Service account ID.
--database string The database which will be used as the default database. When using Kafka, this is the cluster ID.
--wait Block until the statement is running or has failed.
--wait Block until the resource reaches a terminal state.
--wait-timeout duration Maximum time to wait when --wait is set. (default 1m0s)
--property strings A mechanism to pass properties in the form key=value when creating a Flink statement.
--environment string Environment ID.
--context string CLI context name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ Flags:
--catalog string The name of the default catalog.
--database string The name of the default database.
--flink-configuration string The file path to hold the Flink configuration for the statement.
--wait Boolean flag to block until the statement is running or has failed.
--wait Block until the resource reaches a terminal state.
--wait-timeout duration Maximum time to wait when --wait is set. (default 1m0s)
--url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag.
--client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag.
--client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ Flags:
--catalog string The name of the default catalog.
--database string The name of the default database.
--flink-configuration string The file path to hold the Flink configuration for the statement.
--wait Boolean flag to block until the statement is running or has failed.
--wait Block until the resource reaches a terminal state.
--wait-timeout duration Maximum time to wait when --wait is set. (default 1m0s)
--url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag.
--client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag.
--client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.
Expand Down
Loading