Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/ksql/command_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func newClusterCommand(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Comm
cmd.AddCommand(c.newDeleteCommand())
cmd.AddCommand(c.newDescribeCommand())
cmd.AddCommand(c.newListCommand())
cmd.AddCommand(c.newUpdateCommand())
} else {
c := &ksqlCommand{pcmd.NewAuthenticatedWithMDSCLICommand(cmd, prerunner)}
cmd.AddCommand(c.newListCommandOnPrem())
Expand Down
144 changes: 144 additions & 0 deletions internal/ksql/command_cluster_update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package ksql

import (
"fmt"
"sort"

"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/examples"
"github.com/confluentinc/cli/v4/pkg/output"
)

// Valid CSU sizes that customers may target via self-serve cluster update.
// Mirrors the server-side authoritative list in cc-control-plane-ksql:
// internal/service/update_ksql_cluster_resize.go::validCSUSizes.
// Values 1, 2 are legacy and not user-selectable. Values above 28 still
// require a support ticket.
//
//nolint:gochecknoglobals
var validCsuSizes = []int32{4, 8, 12, 16, 20, 24, 28}

const csuSupportTicketMessage = "CSU values above 28 require a support ticket. " +
"Please contact Confluent Support to request a larger cluster size."

func (c *ksqlCommand) newUpdateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "update <id>",
Short: "Update a ksqlDB cluster.",
Long: buildUpdateLongDescription(),
Args: cobra.ExactArgs(1),
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validArgs),
RunE: c.update,
Example: examples.BuildExampleString(
examples.Example{
Text: `Resize ksqlDB cluster "lksqlc-12345" to 8 CSUs.`,
Code: "confluent ksql cluster update lksqlc-12345 --csu 8",
},
),
}

cmd.Flags().Int32("csu", 0, fmt.Sprintf(
"Target number of CSUs for the cluster. Valid values: %s.",
formatCsuList(validCsuSizes)))
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("csu"))

return cmd
}

func buildUpdateLongDescription() string {
return fmt.Sprintf(
`Update an existing ksqlDB cluster. Currently only the CSU count may be
modified, and only to larger sizes (shrink is not supported).

Valid CSU values are %s. Larger sizes require a support ticket.
The cluster will undergo a rolling restart to apply the new size; the
command returns once the resize has been accepted by the control plane.`,
formatCsuList(validCsuSizes))
Comment on lines +55 to +63
}

func (c *ksqlCommand) update(cmd *cobra.Command, args []string) error {
csu, err := cmd.Flags().GetInt32("csu")
if err != nil {
return err
}
if err := validateCsuForUpdate(csu); err != nil {
return err
}

environmentId, err := c.Context.EnvironmentId()
if err != nil {
return err
}

clusterId := args[0]

// Pre-check current CSU so we can short-circuit a no-op locally before
// issuing the PATCH. The server-side validator also rejects no-op resizes
// with 400 ("new CSU size is the same as old CSU size, no-op"), but a
// client-side check produces a clearer message and avoids a wasted API
// round trip. Note: shrink is not supported server-side either.
current, err := c.V2Client.DescribeKsqlCluster(clusterId, environmentId)
if err != nil {
return errors.CatchKSQLNotFoundError(err, clusterId)
}
currentCsu := current.Spec.GetCsu()
if currentCsu == csu {
return fmt.Errorf("ksqlDB cluster %q is already at %d CSUs; no change requested",
clusterId, csu)
}
if csu < currentCsu {
return fmt.Errorf("ksqlDB cluster %q is currently %d CSUs; shrinking is not supported "+
"(target %d < current %d)", clusterId, currentCsu, csu, currentCsu)
}

output.ErrPrintf(c.Config.EnableColor,
"Resizing ksqlDB cluster %q from %d to %d CSUs. A rolling restart will be "+
"performed asynchronously; the cluster will continue serving queries during the resize.\n",
clusterId, currentCsu, csu)

cluster, err := c.V2Client.UpdateKsqlCluster(clusterId, environmentId, csu)
if err != nil {
return err
}

Comment on lines +101 to +110
table := output.NewTable(cmd)
table.Add(c.formatClusterForDisplayAndList(&cluster))
return table.Print()
}

// validateCsuForUpdate returns nil if csu is in validCsuSizes, and a
// customer-safe error otherwise. The server-side check in
// cc-control-plane-ksql is authoritative; this client-side validation exists
// to fail fast with a clearer message before issuing the API call.
func validateCsuForUpdate(csu int32) error {
if csu > 28 {
return fmt.Errorf("%d CSUs: %s", csu, csuSupportTicketMessage)
}
Comment on lines +120 to +123
for _, valid := range validCsuSizes {
if csu == valid {
return nil
}
}
return fmt.Errorf("%d is not a valid CSU size for cluster update. Valid sizes are %s",
csu, formatCsuList(validCsuSizes))
}

func formatCsuList(sizes []int32) string {
sorted := append([]int32(nil), sizes...)
sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
out := ""
for i, s := range sorted {
if i > 0 {
out += ", "
}
out += fmt.Sprintf("%d", s)
}
return out
}
78 changes: 78 additions & 0 deletions internal/ksql/command_cluster_update_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package ksql

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestValidateCsuForUpdate(t *testing.T) {
tests := []struct {
name string
csu int32
expectErr bool
errContains string
}{
{name: "valid 4", csu: 4},
{name: "valid 8", csu: 8},
{name: "valid 12", csu: 12},
{name: "valid 16", csu: 16},
{name: "valid 20", csu: 20},
{name: "valid 24", csu: 24},
{name: "valid 28", csu: 28},
{
name: "legacy size 1 rejected",
csu: 1,
expectErr: true,
errContains: "not a valid CSU size",
},
{
name: "legacy size 2 rejected",
csu: 2,
expectErr: true,
errContains: "not a valid CSU size",
},
{
name: "in-range but non-canonical (5) rejected",
csu: 5,
expectErr: true,
errContains: "not a valid CSU size",
},
{
name: "in-range but non-canonical (10) rejected",
csu: 10,
expectErr: true,
errContains: "not a valid CSU size",
},
{
name: "above 28 routes to support-ticket message",
csu: 32,
expectErr: true,
errContains: "support ticket",
},
{
name: "well above ceiling routes to support-ticket message",
csu: 128,
expectErr: true,
errContains: "support ticket",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := validateCsuForUpdate(tc.csu)
if tc.expectErr {
require.Error(t, err)
require.Contains(t, err.Error(), tc.errContains)
} else {
require.NoError(t, err)
}
})
}
}

func TestFormatCsuList(t *testing.T) {
require.Equal(t, "4, 8, 12, 16, 20, 24, 28", formatCsuList(validCsuSizes))
// Input order should not matter; output is sorted ascending.
require.Equal(t, "4, 8, 16", formatCsuList([]int32{16, 4, 8}))
}
27 changes: 27 additions & 0 deletions pkg/ccloudv2/ksql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ccloudv2

import (
"context"
"fmt"
"net/http"

ksqlv2 "github.com/confluentinc/ccloud-sdk-go-v2/ksql/v2"
Expand Down Expand Up @@ -73,3 +74,29 @@ func (c *Client) CreateKsqlCluster(displayName, environmentId, kafkaClusterId, c
res, httpResp, err := c.KsqlClient.ClustersKsqldbcmV2Api.CreateKsqldbcmV2Cluster(c.ksqlApiContext()).KsqldbcmV2Cluster(cluster).Execute()
return res, errors.CatchCCloudV2Error(err, httpResp)
}

// UpdateKsqlCluster issues PATCH /ksqldbcm/v2/clusters/{id} with {"spec":{"csu": N}}
// to trigger a self-serve cluster resize.
//
// The PATCH operation is not yet available in ccloud-sdk-go-v2/ksql/v2 — it is
// being added in cc-api PR #2507 (KSQL-14844), after which the SDK needs to be
// regenerated and the ksql module dependency in go.mod bumped. Until that
// lands, calling this method returns a clear, customer-safe error rather than
// an HTTP failure. See KSQL-14849 for the work item.
//
// Wiring instructions once the SDK is regenerated:
//
// cluster := ksqlv2.KsqldbcmV2Cluster{Spec: &ksqlv2.KsqldbcmV2ClusterSpec{Csu: &csu}}
// res, httpResp, err := c.KsqlClient.ClustersKsqldbcmV2Api.
// UpdateKsqldbcmV2Cluster(c.ksqlApiContext(), id).
// KsqldbcmV2ClusterUpdate(cluster).Environment(environmentId).Execute()
// return res, errors.CatchCCloudV2Error(err, httpResp)
func (c *Client) UpdateKsqlCluster(id, environmentId string, csu int32) (ksqlv2.KsqldbcmV2Cluster, error) {
_ = id
_ = environmentId
_ = csu
return ksqlv2.KsqldbcmV2Cluster{}, fmt.Errorf(
"ksqlDB cluster update is not yet available in this CLI build; " +
"this command is pending a ccloud-sdk-go-v2/ksql regeneration " +
"after cc-api PR #2507 (KSQL-14844) merges. Track KSQL-14849 for status.")
Comment on lines +99 to +101
}