diff --git a/config/config.yaml b/config/config.yaml index b29a88d42..a7c875fb2 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -500,6 +500,16 @@ reconcile: # a.k.a replication lag - calculated as "MAX(absolute_delay) FROM system.replicas" # is within this specified delay (in seconds) delay: 10 + # Optional replicated-host catch-up gate before advancing to the next host. + # Disabled by default to preserve existing reconcile behavior. + sync: + enabled: "false" + mode: "lightweight" + timeout: 0 + onTimeout: "abort" + health: + pollInterval: 10 + successThreshold: 6 probes: # Whether the operator during host launch procedure should wait for startup probe to succeed. # In case probe is unspecified wait is assumed to be completed successfully. diff --git a/deploy/builder/templates-config/config.yaml b/deploy/builder/templates-config/config.yaml index a9eb7797e..51fab2231 100644 --- a/deploy/builder/templates-config/config.yaml +++ b/deploy/builder/templates-config/config.yaml @@ -494,6 +494,16 @@ reconcile: # a.k.a replication lag - calculated as "MAX(absolute_delay) FROM system.replicas" # is within this specified delay (in seconds) delay: 10 + # Optional replicated-host catch-up gate before advancing to the next host. + # Disabled by default to preserve existing reconcile behavior. + sync: + enabled: "false" + mode: "lightweight" + timeout: 0 + onTimeout: "abort" + health: + pollInterval: 10 + successThreshold: 6 probes: # Whether the operator during host launch procedure should wait for startup probe to succeed. # In case probe is unspecified wait is assumed to be completed successfully. 
diff --git a/deploy/helm/clickhouse-operator/values.yaml b/deploy/helm/clickhouse-operator/values.yaml index 9091a4da9..5086d4e1a 100644 --- a/deploy/helm/clickhouse-operator/values.yaml +++ b/deploy/helm/clickhouse-operator/values.yaml @@ -775,6 +775,16 @@ configs: # a.k.a replication lag - calculated as "MAX(absolute_delay) FROM system.replicas" # is within this specified delay (in seconds) delay: 10 + # Optional replicated-host catch-up gate before advancing to the next host. + # Disabled by default to preserve existing reconcile behavior. + sync: + enabled: "false" + mode: "lightweight" + timeout: 0 + onTimeout: "abort" + health: + pollInterval: 10 + successThreshold: 6 probes: # Whether the operator during host launch procedure should wait for startup probe to succeed. # In case probe is unspecified wait is assumed to be completed successfully. diff --git a/deploy/operator/clickhouse-operator-install-ansible.yaml b/deploy/operator/clickhouse-operator-install-ansible.yaml index a0c3954d6..42fa29293 100644 --- a/deploy/operator/clickhouse-operator-install-ansible.yaml +++ b/deploy/operator/clickhouse-operator-install-ansible.yaml @@ -6186,6 +6186,16 @@ data: # a.k.a replication lag - calculated as "MAX(absolute_delay) FROM system.replicas" # is within this specified delay (in seconds) delay: 10 + # Optional replicated-host catch-up gate before advancing to the next host. + # Disabled by default to preserve existing reconcile behavior. + sync: + enabled: "false" + mode: "lightweight" + timeout: 0 + onTimeout: "abort" + health: + pollInterval: 10 + successThreshold: 6 probes: # Whether the operator during host launch procedure should wait for startup probe to succeed. # In case probe is unspecified wait is assumed to be completed successfully. 
diff --git a/deploy/operator/clickhouse-operator-install-bundle-v1beta1.yaml b/deploy/operator/clickhouse-operator-install-bundle-v1beta1.yaml index 0716b6803..3fa59e414 100644 --- a/deploy/operator/clickhouse-operator-install-bundle-v1beta1.yaml +++ b/deploy/operator/clickhouse-operator-install-bundle-v1beta1.yaml @@ -5360,7 +5360,6 @@ metadata: namespace: kube-system labels: clickhouse.altinity.com/chop: 0.27.2 - # Template Parameters: # # NAMESPACE=kube-system @@ -5613,7 +5612,6 @@ subjects: - kind: ServiceAccount name: clickhouse-operator namespace: kube-system - # Template Parameters: # # NAMESPACE=kube-system @@ -6385,6 +6383,16 @@ data: # a.k.a replication lag - calculated as "MAX(absolute_delay) FROM system.replicas" # is within this specified delay (in seconds) delay: 10 + # Optional replicated-host catch-up gate before advancing to the next host. + # Disabled by default to preserve existing reconcile behavior. + sync: + enabled: "false" + mode: "lightweight" + timeout: 0 + onTimeout: "abort" + health: + pollInterval: 10 + successThreshold: 6 probes: # Whether the operator during host launch procedure should wait for startup probe to succeed. # In case probe is unspecified wait is assumed to be completed successfully. diff --git a/deploy/operator/clickhouse-operator-install-bundle.yaml b/deploy/operator/clickhouse-operator-install-bundle.yaml index 62a497785..ce6fc1f67 100644 --- a/deploy/operator/clickhouse-operator-install-bundle.yaml +++ b/deploy/operator/clickhouse-operator-install-bundle.yaml @@ -6445,6 +6445,16 @@ data: # a.k.a replication lag - calculated as "MAX(absolute_delay) FROM system.replicas" # is within this specified delay (in seconds) delay: 10 + # Optional replicated-host catch-up gate before advancing to the next host. + # Disabled by default to preserve existing reconcile behavior. 
+ sync: + enabled: "false" + mode: "lightweight" + timeout: 0 + onTimeout: "abort" + health: + pollInterval: 10 + successThreshold: 6 probes: # Whether the operator during host launch procedure should wait for startup probe to succeed. # In case probe is unspecified wait is assumed to be completed successfully. diff --git a/deploy/operator/clickhouse-operator-install-template-v1beta1.yaml b/deploy/operator/clickhouse-operator-install-template-v1beta1.yaml index 13ce1cecf..3773b124e 100644 --- a/deploy/operator/clickhouse-operator-install-template-v1beta1.yaml +++ b/deploy/operator/clickhouse-operator-install-template-v1beta1.yaml @@ -5360,7 +5360,6 @@ metadata: namespace: ${OPERATOR_NAMESPACE} labels: clickhouse.altinity.com/chop: 0.27.2 - # Template Parameters: # # NAMESPACE=${OPERATOR_NAMESPACE} @@ -6132,6 +6131,16 @@ data: # a.k.a replication lag - calculated as "MAX(absolute_delay) FROM system.replicas" # is within this specified delay (in seconds) delay: 10 + # Optional replicated-host catch-up gate before advancing to the next host. + # Disabled by default to preserve existing reconcile behavior. + sync: + enabled: "false" + mode: "lightweight" + timeout: 0 + onTimeout: "abort" + health: + pollInterval: 10 + successThreshold: 6 probes: # Whether the operator during host launch procedure should wait for startup probe to succeed. # In case probe is unspecified wait is assumed to be completed successfully. diff --git a/deploy/operator/clickhouse-operator-install-template.yaml b/deploy/operator/clickhouse-operator-install-template.yaml index 70d9f948f..afb17b4c2 100644 --- a/deploy/operator/clickhouse-operator-install-template.yaml +++ b/deploy/operator/clickhouse-operator-install-template.yaml @@ -6179,6 +6179,16 @@ data: # a.k.a replication lag - calculated as "MAX(absolute_delay) FROM system.replicas" # is within this specified delay (in seconds) delay: 10 + # Optional replicated-host catch-up gate before advancing to the next host. 
+ # Disabled by default to preserve existing reconcile behavior. + sync: + enabled: "false" + mode: "lightweight" + timeout: 0 + onTimeout: "abort" + health: + pollInterval: 10 + successThreshold: 6 probes: # Whether the operator during host launch procedure should wait for startup probe to succeed. # In case probe is unspecified wait is assumed to be completed successfully. diff --git a/deploy/operator/clickhouse-operator-install-tf.yaml b/deploy/operator/clickhouse-operator-install-tf.yaml index 5bb2526e0..9f20a30de 100644 --- a/deploy/operator/clickhouse-operator-install-tf.yaml +++ b/deploy/operator/clickhouse-operator-install-tf.yaml @@ -6186,6 +6186,16 @@ data: # a.k.a replication lag - calculated as "MAX(absolute_delay) FROM system.replicas" # is within this specified delay (in seconds) delay: 10 + # Optional replicated-host catch-up gate before advancing to the next host. + # Disabled by default to preserve existing reconcile behavior. + sync: + enabled: "false" + mode: "lightweight" + timeout: 0 + onTimeout: "abort" + health: + pollInterval: 10 + successThreshold: 6 probes: # Whether the operator during host launch procedure should wait for startup probe to succeed. # In case probe is unspecified wait is assumed to be completed successfully. diff --git a/docs/operator_configuration.md b/docs/operator_configuration.md index 7e4d58af1..fc11f8bf8 100644 --- a/docs/operator_configuration.md +++ b/docs/operator_configuration.md @@ -222,6 +222,85 @@ spec: See [Keeper Reference](keeper_reference.md) for details on how CHI references CHK resources. +### Replicated Host Sync Gate + +The operator can optionally block a rolling host reconcile until a recreated replicated +ClickHouse host catches up to a bounded replication baseline. This is an operator +rolling gate, not a readiness probe. It is disabled by default. 
+ +This is especially useful for local or direct-attached storage deployments, including +NVMe-backed Local PVs, where a recreated pod may start with an empty or replaced disk +and must rebuild replicated data from peer replicas before the operator rolls the next +host. + +The existing caught-up marker path remains unchanged when this gate is disabled. That +path only polls the local host's `MAX(absolute_delay)` from `system.replicas` before +writing `status.hostsWithReplicaCaughtUp`, which is weak for recreated-host recovery +because the metric is limited to replicated objects already loaded and visible on that +local server. During recreated-host recovery, asynchronous database/table loading may +not have exposed all replicated objects on the local host yet, and a local delay metric +cannot discover replicated objects that exist on peers or issue a ClickHouse sync +barrier for their known parts. The sync gate adds those checks before the operator +advances to the next host. + +```yaml +spec: + reconcile: + host: + wait: + replicas: + sync: + enabled: "false" + mode: "lightweight" + timeout: 0 + onTimeout: "abort" + health: + pollInterval: 10 + successThreshold: 6 +``` + +| Setting | Default | Description | +|---|---|---| +| `enabled` | `"false"` | Enables the replicated-host sync gate. Existing replica-delay behavior is unchanged when disabled. | +| `mode` | `"lightweight"` | Uses `SYSTEM SYNC REPLICA ... LIGHTWEIGHT`. No fallback to legacy `SYSTEM SYNC REPLICA` is performed. | +| `timeout` | `0` | Whole-gate timeout in seconds. `0` means unbounded. | +| `onTimeout` | `"abort"` | `abort` stops reconcile on the gate deadline. `proceed` advances without writing the caught-up marker, so a later reconcile can try again. | +| `health.pollInterval` | `10` | Seconds between post-sync health checks. | +| `health.successThreshold` | `6` | Consecutive healthy checks required after sync before the caught-up marker is written. 
| + +When enabled, the gate waits for asynchronous database loading when ClickHouse exposes +`system.asynchronous_loader`, discovers replicated objects from peer replicas, syncs +`Replicated` databases with `SYSTEM SYNC DATABASE REPLICA`, syncs replicated tables +with `SYSTEM SYNC REPLICA ... LIGHTWEIGHT`, and then requires a stable health window. +Health is based on `system.replicas`: `is_readonly = 0`, `is_session_expired = 0`, and +`absolute_delay <= reconcile.host.wait.replicas.delay`. + +The `LIGHTWEIGHT` baseline is the time when the sync command runs. It waits for the +relevant part-acquisition work known at that point; it does not require +`system.replication_queue` to become empty and does not block forever on unrelated +merges, mutations, or new ingest that arrives after the sync command. ClickHouse +versions below `23.4` do not support `LIGHTWEIGHT`; enabling this gate on those +versions fails explicitly instead of silently falling back. + +Hard failures always abort regardless of `onTimeout`: query or connection failure, +parent reconcile context cancellation, failed/canceled async load jobs, readonly +replicas, and expired Keeper sessions. The caught-up marker is written only after real +success or when peer discovery confirms that there are no replicated objects to sync. + +Manual local-PV/data-loss validation: + +1. Create a CHI with a replicated shard and `sync.enabled: "true"`. +2. Wait for the current hosts to become caught up and confirm + `status.hostsWithReplicaCaughtUp` contains the host FQDNs. +3. Simulate storage loss for one host, for example by removing the local PV/PVC data + in a test environment. +4. Reconcile the CHI and confirm the operator removes the stale caught-up marker for + the recreated host. +5. Confirm the recreated host runs the sync gate and the next host in the shard does + not advance while the recreated host is still behind. +6. 
Allow replication to catch up and confirm the recreated host receives the + caught-up marker again, then the next host proceeds. + ## Security The `security:` block at the chopconf top level (sibling of `clickhouse:`) holds operator-wide hardening defaults across three orthogonal axes: transport hardening (`security.policy`), FIPS cryptographic-module enforcement (`security.fips.enforced`), and workload supply-chain gating (`security.images.policy`). Per-component sub-blocks under it cover ClickHouse-client TLS, ZooKeeper-client TLS, Kubernetes-client TLS, and the operator↔metrics-exporter IPC channel. diff --git a/pkg/apis/clickhouse-keeper.altinity.com/v1/type_status.go b/pkg/apis/clickhouse-keeper.altinity.com/v1/type_status.go index 937128d88..767383b40 100644 --- a/pkg/apis/clickhouse-keeper.altinity.com/v1/type_status.go +++ b/pkg/apis/clickhouse-keeper.altinity.com/v1/type_status.go @@ -184,6 +184,20 @@ func (s *Status) PushHostReplicaCaughtUp(host string) { }) } +// RemoveHostReplicaCaughtUp removes host from the list of hosts with replica caught-up +func (s *Status) RemoveHostReplicaCaughtUp(host string) { + host = util.NormalizeFQDN(host) + doWithWriteLock(s, func(s *Status) { + hosts := s.HostsWithReplicaCaughtUp[:0] + for _, caughtUpHost := range s.HostsWithReplicaCaughtUp { + if caughtUpHost != host { + hosts = append(hosts, caughtUpHost) + } + } + s.HostsWithReplicaCaughtUp = hosts + }) +} + // PushHostTablesCreated pushes host to the list of hosts with created tables func (s *Status) PushHostTablesCreated(host string) { host = util.NormalizeFQDN(host) diff --git a/pkg/apis/clickhouse.altinity.com/v1/interface.go b/pkg/apis/clickhouse.altinity.com/v1/interface.go index 141154928..7ee5f9e39 100644 --- a/pkg/apis/clickhouse.altinity.com/v1/interface.go +++ b/pkg/apis/clickhouse.altinity.com/v1/interface.go @@ -101,6 +101,7 @@ type IStatus interface { GetHostsWithReplicaCaughtUp() []string PushHostTablesCreated(host string) PushHostReplicaCaughtUp(host 
string) + RemoveHostReplicaCaughtUp(host string) HasNormalizedCRCompleted() bool diff --git a/pkg/apis/clickhouse.altinity.com/v1/type_configuration_chop.go b/pkg/apis/clickhouse.altinity.com/v1/type_configuration_chop.go index f9610f1ab..5046c8bcc 100644 --- a/pkg/apis/clickhouse.altinity.com/v1/type_configuration_chop.go +++ b/pkg/apis/clickhouse.altinity.com/v1/type_configuration_chop.go @@ -222,6 +222,14 @@ const ( defaultMaxReplicationDelay = 10 ) +const ( + defaultReconcileHostWaitReplicasSyncMode = "lightweight" + defaultReconcileHostWaitReplicasSyncOnTimeout = "abort" + defaultReconcileHostWaitReplicasSyncTimeoutSeconds = 0 + defaultReconcileHostWaitReplicasSyncHealthPollSeconds = 10 + defaultReconcileHostWaitReplicasSyncHealthSuccessThreshold = 6 +) + // OperatorConfig specifies operator configuration // !!! IMPORTANT !!! // !!! IMPORTANT !!! @@ -716,6 +724,7 @@ func (wait ReconcileHostWait) Normalize() ReconcileHostWait { // Default update timeout in seconds wait.Replicas.Delay = types.NewInt32(defaultMaxReplicationDelay) } + wait.Replicas.Sync = wait.Replicas.Sync.Normalize() if wait.Probes == nil { wait.Probes = &ReconcileHostWaitProbes{} @@ -754,9 +763,130 @@ func (drop ReconcileHostDrop) MergeFrom(from ReconcileHostDrop) ReconcileHostDro } type ReconcileHostWaitReplicas struct { - All *types.StringBool `json:"all,omitempty" yaml:"all,omitempty"` - New *types.StringBool `json:"new,omitempty" yaml:"new,omitempty"` - Delay *types.Int32 `json:"delay,omitempty" yaml:"delay,omitempty"` + All *types.StringBool `json:"all,omitempty" yaml:"all,omitempty"` + New *types.StringBool `json:"new,omitempty" yaml:"new,omitempty"` + Delay *types.Int32 `json:"delay,omitempty" yaml:"delay,omitempty"` + Sync *ReconcileHostWaitReplicasSync `json:"sync,omitempty" yaml:"sync,omitempty"` +} + +// ReconcileHostWaitReplicasSync configures a replicated-host catch-up gate before advancing shard rolling reconcile. 
+type ReconcileHostWaitReplicasSync struct { + Enabled *types.StringBool `json:"enabled,omitempty" yaml:"enabled,omitempty"` + Mode *types.String `json:"mode,omitempty" yaml:"mode,omitempty"` + Timeout *types.Int32 `json:"timeout,omitempty" yaml:"timeout,omitempty"` + OnTimeout *types.String `json:"onTimeout,omitempty" yaml:"onTimeout,omitempty"` + Health *ReconcileHostWaitReplicasSyncHealth `json:"health,omitempty" yaml:"health,omitempty"` +} + +// ReconcileHostWaitReplicasSyncHealth configures the stable-health window after replicated-host sync. +type ReconcileHostWaitReplicasSyncHealth struct { + PollInterval *types.Int32 `json:"pollInterval,omitempty" yaml:"pollInterval,omitempty"` + SuccessThreshold *types.Int32 `json:"successThreshold,omitempty" yaml:"successThreshold,omitempty"` +} + +func isValidReconcileHostWaitReplicasSyncMode(value string) bool { + return value == defaultReconcileHostWaitReplicasSyncMode +} + +func isValidReconcileHostWaitReplicasSyncOnTimeout(value string) bool { + return value == "abort" || value == "proceed" +} + +func (syncConfig *ReconcileHostWaitReplicasSync) Normalize() *ReconcileHostWaitReplicasSync { + if syncConfig == nil { + syncConfig = &ReconcileHostWaitReplicasSync{} + } + syncConfig.Enabled = syncConfig.Enabled.Normalize(false) + if !isValidReconcileHostWaitReplicasSyncMode(syncConfig.Mode.Value()) { + syncConfig.Mode = types.NewString(defaultReconcileHostWaitReplicasSyncMode) + } + if syncConfig.Timeout == nil || syncConfig.Timeout.Value() < 0 { + syncConfig.Timeout = types.NewInt32(defaultReconcileHostWaitReplicasSyncTimeoutSeconds) + } + if !isValidReconcileHostWaitReplicasSyncOnTimeout(syncConfig.OnTimeout.Value()) { + syncConfig.OnTimeout = types.NewString(defaultReconcileHostWaitReplicasSyncOnTimeout) + } + syncConfig.Health = syncConfig.Health.Normalize() + return syncConfig +} + +func (health *ReconcileHostWaitReplicasSyncHealth) Normalize() *ReconcileHostWaitReplicasSyncHealth { + if health == nil { + health = 
&ReconcileHostWaitReplicasSyncHealth{} + } + if health.PollInterval == nil || health.PollInterval.Value() <= 0 { + health.PollInterval = types.NewInt32(defaultReconcileHostWaitReplicasSyncHealthPollSeconds) + } + if health.SuccessThreshold == nil || health.SuccessThreshold.Value() <= 0 { + health.SuccessThreshold = types.NewInt32(defaultReconcileHostWaitReplicasSyncHealthSuccessThreshold) + } + return health +} + +func (syncConfig *ReconcileHostWaitReplicasSync) MergeFrom(from *ReconcileHostWaitReplicasSync) *ReconcileHostWaitReplicasSync { + if from == nil { + return syncConfig + } + if syncConfig == nil { + syncConfig = &ReconcileHostWaitReplicasSync{} + } + syncConfig.Enabled = syncConfig.Enabled.MergeFrom(from.Enabled) + syncConfig.Mode = syncConfig.Mode.MergeFrom(from.Mode) + syncConfig.Timeout = syncConfig.Timeout.MergeFrom(from.Timeout) + syncConfig.OnTimeout = syncConfig.OnTimeout.MergeFrom(from.OnTimeout) + syncConfig.Health = syncConfig.Health.MergeFrom(from.Health) + return syncConfig +} + +func (health *ReconcileHostWaitReplicasSyncHealth) MergeFrom(from *ReconcileHostWaitReplicasSyncHealth) *ReconcileHostWaitReplicasSyncHealth { + if from == nil { + return health + } + if health == nil { + health = &ReconcileHostWaitReplicasSyncHealth{} + } + health.PollInterval = health.PollInterval.MergeFrom(from.PollInterval) + health.SuccessThreshold = health.SuccessThreshold.MergeFrom(from.SuccessThreshold) + return health +} + +func (syncConfig *ReconcileHostWaitReplicasSync) IsEnabled() bool { + return syncConfig != nil && syncConfig.Enabled.Value() +} + +func (syncConfig *ReconcileHostWaitReplicasSync) GetMode() string { + if syncConfig == nil || !isValidReconcileHostWaitReplicasSyncMode(syncConfig.Mode.Value()) { + return defaultReconcileHostWaitReplicasSyncMode + } + return syncConfig.Mode.Value() +} + +func (syncConfig *ReconcileHostWaitReplicasSync) GetTimeout() int { + if syncConfig == nil || syncConfig.Timeout == nil || syncConfig.Timeout.Value() < 0 { + 
return defaultReconcileHostWaitReplicasSyncTimeoutSeconds + } + return syncConfig.Timeout.IntValue() +} + +func (syncConfig *ReconcileHostWaitReplicasSync) GetOnTimeout() string { + if syncConfig == nil || !isValidReconcileHostWaitReplicasSyncOnTimeout(syncConfig.OnTimeout.Value()) { + return defaultReconcileHostWaitReplicasSyncOnTimeout + } + return syncConfig.OnTimeout.Value() +} + +func (syncConfig *ReconcileHostWaitReplicasSync) GetPollInterval() int { + if syncConfig == nil || syncConfig.Health == nil || syncConfig.Health.PollInterval == nil || syncConfig.Health.PollInterval.Value() <= 0 { + return defaultReconcileHostWaitReplicasSyncHealthPollSeconds + } + return syncConfig.Health.PollInterval.IntValue() +} + +func (syncConfig *ReconcileHostWaitReplicasSync) GetSuccessThreshold() int { + if syncConfig == nil || syncConfig.Health == nil || syncConfig.Health.SuccessThreshold == nil || syncConfig.Health.SuccessThreshold.Value() <= 0 { + return defaultReconcileHostWaitReplicasSyncHealthSuccessThreshold + } + return syncConfig.Health.SuccessThreshold.IntValue() } func (r *ReconcileHostWaitReplicas) MergeFrom(from *ReconcileHostWaitReplicas) *ReconcileHostWaitReplicas { @@ -777,6 +907,7 @@ func (r *ReconcileHostWaitReplicas) MergeFrom(from *ReconcileHostWaitReplicas) * r.All = r.All.MergeFrom(from.All) r.New = r.New.MergeFrom(from.New) r.Delay = r.Delay.MergeFrom(from.Delay) + r.Sync = r.Sync.MergeFrom(from.Sync) return r } diff --git a/pkg/apis/clickhouse.altinity.com/v1/type_configuration_chop_sync_test.go b/pkg/apis/clickhouse.altinity.com/v1/type_configuration_chop_sync_test.go new file mode 100644 index 000000000..7fd93ae92 --- /dev/null +++ b/pkg/apis/clickhouse.altinity.com/v1/type_configuration_chop_sync_test.go @@ -0,0 +1,55 @@ +package v1 + +import ( + "testing" + + "github.com/altinity/clickhouse-operator/pkg/apis/common/types" +) + +func TestReconcileHostWaitReplicasSyncNormalizeDefaults(t *testing.T) { + var syncConfig *ReconcileHostWaitReplicasSync + 
syncConfig = syncConfig.Normalize() + if syncConfig.IsEnabled() { + t.Fatalf("enabled must default to false") + } + if syncConfig.GetMode() != "lightweight" { + t.Fatalf("mode default = %q, want lightweight", syncConfig.GetMode()) + } + if syncConfig.GetTimeout() != 0 { + t.Fatalf("timeout default = %d, want 0 (unbounded)", syncConfig.GetTimeout()) + } + if syncConfig.GetOnTimeout() != "abort" { + t.Fatalf("onTimeout default = %q, want abort", syncConfig.GetOnTimeout()) + } + if syncConfig.GetPollInterval() != 10 || syncConfig.GetSuccessThreshold() != 6 { + t.Fatalf("health defaults = %d/%d, want 10/6", syncConfig.GetPollInterval(), syncConfig.GetSuccessThreshold()) + } +} + +func TestReconcileHostWaitReplicasSyncNormalizeRejectsInvalid(t *testing.T) { + syncConfig := &ReconcileHostWaitReplicasSync{ + Mode: types.NewString("bogus"), + Timeout: types.NewInt32(-5), + OnTimeout: types.NewString("explode"), + Health: &ReconcileHostWaitReplicasSyncHealth{ + PollInterval: types.NewInt32(0), + SuccessThreshold: types.NewInt32(-1), + }, + } + syncConfig = syncConfig.Normalize() + if syncConfig.GetMode() != "lightweight" || syncConfig.GetOnTimeout() != "abort" { + t.Fatalf("invalid enums must fall back to defaults") + } + if syncConfig.GetTimeout() != 0 || syncConfig.GetPollInterval() != 10 || syncConfig.GetSuccessThreshold() != 6 { + t.Fatalf("invalid numerics must fall back to defaults") + } +} + +func TestReconcileHostWaitReplicasSyncMergeFromPrefersLocal(t *testing.T) { + localSyncConfig := (&ReconcileHostWaitReplicasSync{Enabled: types.NewStringBool(true)}).Normalize() + parentSyncConfig := (&ReconcileHostWaitReplicasSync{Enabled: types.NewStringBool(false), Timeout: types.NewInt32(30)}).Normalize() + mergedSyncConfig := localSyncConfig.MergeFrom(parentSyncConfig) + if !mergedSyncConfig.IsEnabled() { + t.Fatalf("merge must prefer local enabled=true") + } +} diff --git a/pkg/apis/clickhouse.altinity.com/v1/type_host.go b/pkg/apis/clickhouse.altinity.com/v1/type_host.go 
index 362ac428e..c827ea360 100644 --- a/pkg/apis/clickhouse.altinity.com/v1/type_host.go +++ b/pkg/apis/clickhouse.altinity.com/v1/type_host.go @@ -66,6 +66,7 @@ type HostRuntime struct { reconcileAttributes *types.ReconcileAttributes `json:"-" yaml:"-" testdiff:"ignore"` replicas *types.Int32 `json:"-" yaml:"-"` hasData bool `json:"-" yaml:"-"` + forceReplicaCatchUp bool `json:"-" yaml:"-"` // CurStatefulSet is a current stateful set, fetched from k8s CurStatefulSet *apps.StatefulSet `json:"-" yaml:"-" testdiff:"ignore"` @@ -736,6 +737,20 @@ func (host *Host) SetHasData(hasData bool) { host.Runtime.hasData = hasData } +func (host *Host) IsForceReplicaCatchUp() bool { + if host == nil { + return false + } + return host.Runtime.forceReplicaCatchUp +} + +func (host *Host) SetForceReplicaCatchUp(force bool) { + if host == nil { + return + } + host.Runtime.forceReplicaCatchUp = force +} + func (host *Host) IsZero() bool { return host == nil } diff --git a/pkg/apis/clickhouse.altinity.com/v1/type_host_runtime_test.go b/pkg/apis/clickhouse.altinity.com/v1/type_host_runtime_test.go new file mode 100644 index 000000000..1dd1edf63 --- /dev/null +++ b/pkg/apis/clickhouse.altinity.com/v1/type_host_runtime_test.go @@ -0,0 +1,20 @@ +package v1 + +import "testing" + +func TestHostForceReplicaCatchUpDefaultsFalseAndCanBeSet(t *testing.T) { + host := &Host{} + if host.IsForceReplicaCatchUp() { + t.Fatalf("force replica catch-up must default to false") + } + + host.SetForceReplicaCatchUp(true) + if !host.IsForceReplicaCatchUp() { + t.Fatalf("force replica catch-up must be true after SetForceReplicaCatchUp(true)") + } + + host.SetForceReplicaCatchUp(false) + if host.IsForceReplicaCatchUp() { + t.Fatalf("force replica catch-up must be false after SetForceReplicaCatchUp(false)") + } +} diff --git a/pkg/apis/clickhouse.altinity.com/v1/type_status.go b/pkg/apis/clickhouse.altinity.com/v1/type_status.go index d7c7d2e4f..83413ef27 100644 --- 
a/pkg/apis/clickhouse.altinity.com/v1/type_status.go +++ b/pkg/apis/clickhouse.altinity.com/v1/type_status.go @@ -203,6 +203,20 @@ func (s *Status) PushHostReplicaCaughtUp(host string) { }) } +// RemoveHostReplicaCaughtUp removes host from the list of hosts with replica caught-up +func (s *Status) RemoveHostReplicaCaughtUp(host string) { + host = util.NormalizeFQDN(host) + doWithWriteLock(s, func(s *Status) { + hosts := s.HostsWithReplicaCaughtUp[:0] + for _, caughtUpHost := range s.HostsWithReplicaCaughtUp { + if caughtUpHost != host { + hosts = append(hosts, caughtUpHost) + } + } + s.HostsWithReplicaCaughtUp = hosts + }) +} + // PushHostTablesCreated pushes host to the list of hosts with created tables func (s *Status) PushHostTablesCreated(host string) { host = util.NormalizeFQDN(host) diff --git a/pkg/apis/clickhouse.altinity.com/v1/type_status_test.go b/pkg/apis/clickhouse.altinity.com/v1/type_status_test.go index e30b0653f..0a72ef677 100644 --- a/pkg/apis/clickhouse.altinity.com/v1/type_status_test.go +++ b/pkg/apis/clickhouse.altinity.com/v1/type_status_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/altinity/clickhouse-operator/pkg/apis/common/types" + "github.com/altinity/clickhouse-operator/pkg/util" ) var normalizedChiA = &ClickHouseInstallation{} @@ -109,6 +110,21 @@ func TestCopyFromUsedTemplates(t *testing.T) { }) } +func TestRemoveHostReplicaCaughtUp(t *testing.T) { + const removedHost = "chi-x-default-0-0" + const keptHost = "chi-x-default-0-1" + status := &Status{} + status.PushHostReplicaCaughtUp(removedHost) + status.PushHostReplicaCaughtUp(keptHost) + + status.RemoveHostReplicaCaughtUp(removedHost) + + // Removal must drop only the requested host; asserting the exact remainder + // also fails if unrelated caught-up markers are lost along the way. + remaining := status.GetHostsWithReplicaCaughtUp() + require.Equal(t, []string{util.NormalizeFQDN(keptHost)}, remaining) +} + // NB: These tests mostly exist to exercise synchronization and detect regressions related to them via the // Golang race detector.
See: https://go.dev/blog/race-detector // In short, add -race to the go test flags when running this. diff --git a/pkg/apis/clickhouse.altinity.com/v1/zz_generated.deepcopy.go b/pkg/apis/clickhouse.altinity.com/v1/zz_generated.deepcopy.go index dd8160f73..a9c7f5285 100644 --- a/pkg/apis/clickhouse.altinity.com/v1/zz_generated.deepcopy.go +++ b/pkg/apis/clickhouse.altinity.com/v1/zz_generated.deepcopy.go @@ -2918,6 +2918,11 @@ func (in *ReconcileHostWaitReplicas) DeepCopyInto(out *ReconcileHostWaitReplicas *out = new(types.Int32) **out = **in } + if in.Sync != nil { + in, out := &in.Sync, &out.Sync + *out = new(ReconcileHostWaitReplicasSync) + (*in).DeepCopyInto(*out) + } return } @@ -2931,6 +2936,73 @@ func (in *ReconcileHostWaitReplicas) DeepCopy() *ReconcileHostWaitReplicas { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ReconcileHostWaitReplicasSync) DeepCopyInto(out *ReconcileHostWaitReplicasSync) { + *out = *in + if in.Enabled != nil { + in, out := &in.Enabled, &out.Enabled + *out = new(types.StringBool) + **out = **in + } + if in.Mode != nil { + in, out := &in.Mode, &out.Mode + *out = new(types.String) + **out = **in + } + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + *out = new(types.Int32) + **out = **in + } + if in.OnTimeout != nil { + in, out := &in.OnTimeout, &out.OnTimeout + *out = new(types.String) + **out = **in + } + if in.Health != nil { + in, out := &in.Health, &out.Health + *out = new(ReconcileHostWaitReplicasSyncHealth) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReconcileHostWaitReplicasSync. 
+func (in *ReconcileHostWaitReplicasSync) DeepCopy() *ReconcileHostWaitReplicasSync { + if in == nil { + return nil + } + out := new(ReconcileHostWaitReplicasSync) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ReconcileHostWaitReplicasSyncHealth) DeepCopyInto(out *ReconcileHostWaitReplicasSyncHealth) { + *out = *in + if in.PollInterval != nil { + in, out := &in.PollInterval, &out.PollInterval + *out = new(types.Int32) + **out = **in + } + if in.SuccessThreshold != nil { + in, out := &in.SuccessThreshold, &out.SuccessThreshold + *out = new(types.Int32) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReconcileHostWaitReplicasSyncHealth. +func (in *ReconcileHostWaitReplicasSyncHealth) DeepCopy() *ReconcileHostWaitReplicasSyncHealth { + if in == nil { + return nil + } + out := new(ReconcileHostWaitReplicasSyncHealth) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ReconcileMacros) DeepCopyInto(out *ReconcileMacros) { *out = *in diff --git a/pkg/controller/chi/worker-deleter.go b/pkg/controller/chi/worker-deleter.go index 9b1e38179..b9c0d215a 100644 --- a/pkg/controller/chi/worker-deleter.go +++ b/pkg/controller/chi/worker-deleter.go @@ -528,8 +528,8 @@ func (w *worker) deleteHost(ctx context.Context, chi *api.ClickHouseInstallation return nil } - w.a.V(2).M(host).S().Info(host.Runtime.Address.HostName) - defer w.a.V(2).M(host).E().Info(host.Runtime.Address.HostName) + w.a.V(2).M(host).S().Info("%s", host.Runtime.Address.HostName) + defer w.a.V(2).M(host).E().Info("%s", host.Runtime.Address.HostName) w.a.V(1). WithEvent(host.GetCR(), a.EventActionDelete, a.EventReasonDeleteStarted). 
diff --git a/pkg/controller/chi/worker-reconciler-chi.go b/pkg/controller/chi/worker-reconciler-chi.go index 34123eaa5..e0bd7d033 100644 --- a/pkg/controller/chi/worker-reconciler-chi.go +++ b/pkg/controller/chi/worker-reconciler-chi.go @@ -203,7 +203,7 @@ func (w *worker) buildCR(ctx context.Context, _cr *api.ClickHouseInstallation) * actionPlan := api.MakeActionPlan(cr.GetAncestorT(), cr) cr.EnsureRuntime().ActionPlan = actionPlan cr.EnsureStatus().SetActionPlan(actionPlan) - w.a.V(1).M(cr).Info(actionPlan.Log("buildCR")) + w.a.V(1).M(cr).Info("%s", actionPlan.Log("buildCR")) return cr } @@ -945,6 +945,7 @@ func (w *worker) reconcileHostMain(ctx context.Context, host *api.Host) error { w.a.V(1).M(host).F().Warning("Data loss detected for host: %s. Aborting reconcile as configured (onDataLoss: abort)", host.GetName()) return common.ErrCRUDAbort } + w.forceReplicaCatchUpAfterStorageLoss(host, w.c.namer.Name(interfaces.NameFQDN, host)) stsReconcileOpts, migrateTableOpts = w.hostPVCsDataLossDetectedOptions(host) w.a.V(1). M(host).F(). @@ -955,6 +956,7 @@ func (w *worker) reconcileHostMain(ctx context.Context, host *api.Host) error { return common.ErrCRUDAbort } // stsReconcileOpts, migrateTableOpts = w.hostPVCsDataVolumeMissedDetectedOptions(host) + w.forceReplicaCatchUpAfterStorageLoss(host, w.c.namer.Name(interfaces.NameFQDN, host)) stsReconcileOpts, migrateTableOpts = w.hostPVCsDataLossDetectedOptions(host) w.a.V(1). M(host).F(). 
@@ -1017,6 +1019,14 @@ func (w *worker) prepareStsReconcileOptsWaitSection(host *api.Host, opts *statef return opts } +func (w *worker) forceReplicaCatchUpAfterStorageLoss(host *api.Host, fqdn string) { + if !chop.Config().Reconcile.Host.Wait.Replicas.Sync.IsEnabled() { + return + } + host.SetForceReplicaCatchUp(true) + host.GetCR().IEnsureStatus().RemoveHostReplicaCaughtUp(fqdn) +} + func (w *worker) reconcileHostPVCs(ctx context.Context, host *api.Host) storage.ErrorDataPersistence { return storage.NewStorageReconciler( w.task, diff --git a/pkg/controller/chi/worker-reconciler-chi_test.go b/pkg/controller/chi/worker-reconciler-chi_test.go index 2a9ed7e9f..0cdb80292 100644 --- a/pkg/controller/chi/worker-reconciler-chi_test.go +++ b/pkg/controller/chi/worker-reconciler-chi_test.go @@ -22,6 +22,8 @@ import ( core "k8s.io/api/core/v1" api "github.com/altinity/clickhouse-operator/pkg/apis/clickhouse.altinity.com/v1" + "github.com/altinity/clickhouse-operator/pkg/apis/common/types" + "github.com/altinity/clickhouse-operator/pkg/chop" ) // sts is a small builder for an apps/v1 StatefulSet with a single-container pod template @@ -46,6 +48,50 @@ func hostWith(cur, desired *apps.StatefulSet) *api.Host { return h } +func withReplicaSyncGate(t *testing.T, enabled bool) { + t.Helper() + cfg := chop.Config() + prev := cfg.Reconcile.Host.Wait.Replicas.Sync + t.Cleanup(func() { + cfg.Reconcile.Host.Wait.Replicas.Sync = prev + }) + cfg.Reconcile.Host.Wait.Replicas.Sync = (&api.ReconcileHostWaitReplicasSync{ + Enabled: types.NewStringBool(enabled), + }).Normalize() +} + +func hostWithReplicaCaughtUpMarker(fqdn string) *api.Host { + cr := &api.ClickHouseInstallation{} + host := &api.Host{} + host.SetCR(cr) + host.GetCR().IEnsureStatus().PushHostReplicaCaughtUp(fqdn) + return host +} + +func TestForceReplicaCatchUpAfterStorageLossNoopWhenSyncDisabled(t *testing.T) { + const fqdn = "chi-x-default-0-0" + withReplicaSyncGate(t, false) + host := hostWithReplicaCaughtUpMarker(fqdn) + w 
:= &worker{} + + w.forceReplicaCatchUpAfterStorageLoss(host, fqdn) + + require.False(t, host.IsForceReplicaCatchUp()) + require.True(t, host.HasListedReplicaCaughtUp(fqdn)) +} + +func TestForceReplicaCatchUpAfterStorageLossClearsMarkerWhenSyncEnabled(t *testing.T) { + const fqdn = "chi-x-default-0-0" + withReplicaSyncGate(t, true) + host := hostWithReplicaCaughtUpMarker(fqdn) + w := &worker{} + + w.forceReplicaCatchUpAfterStorageLoss(host, fqdn) + + require.True(t, host.IsForceReplicaCatchUp()) + require.False(t, host.HasListedReplicaCaughtUp(fqdn)) +} + // TestHostRequiresStatefulSetRollout exercises the pure decision function that gates // the pre-rollout software restart in reconcileHostStatefulSet. // diff --git a/pkg/controller/chi/worker-secret.go b/pkg/controller/chi/worker-secret.go index 4b15ad52a..42028bd20 100644 --- a/pkg/controller/chi/worker-secret.go +++ b/pkg/controller/chi/worker-secret.go @@ -25,8 +25,8 @@ import ( // reconcileSecret reconciles core.Secret func (w *worker) reconcileSecret(ctx context.Context, cr api.ICustomResource, secret *core.Secret) error { - w.a.V(2).M(cr).S().Info(secret.Name) - defer w.a.V(2).M(cr).E().Info(secret.Name) + w.a.V(2).M(cr).S().Info("%s", secret.Name) + defer w.a.V(2).M(cr).E().Info("%s", secret.Name) // Check whether this object already exists if _, err := w.c.getSecret(ctx, secret); err == nil { diff --git a/pkg/controller/chi/worker-service.go b/pkg/controller/chi/worker-service.go index 99f4550b3..8a7c13d50 100644 --- a/pkg/controller/chi/worker-service.go +++ b/pkg/controller/chi/worker-service.go @@ -29,8 +29,8 @@ import ( // reconcileService reconciles core.Service func (w *worker) reconcileService(ctx context.Context, cr chi.ICustomResource, service, prevService *core.Service) error { - w.a.V(2).M(cr).S().Info(service.GetName()) - defer w.a.V(2).M(cr).E().Info(service.GetName()) + w.a.V(2).M(cr).S().Info("%s", service.GetName()) + defer w.a.V(2).M(cr).E().Info("%s", service.GetName()) // Check whether 
this object already exists curService, err := w.c.getService(ctx, service) diff --git a/pkg/controller/chi/worker-status-helpers.go b/pkg/controller/chi/worker-status-helpers.go index f04cb3f2d..0ab1e3287 100644 --- a/pkg/controller/chi/worker-status-helpers.go +++ b/pkg/controller/chi/worker-status-helpers.go @@ -16,6 +16,7 @@ package chi import ( "context" + "errors" "time" core "k8s.io/api/core/v1" @@ -180,6 +181,45 @@ func (w *worker) doesHostHaveNoReplicationDelay(ctx context.Context, host *api.H return delay <= chop.Config().Reconcile.Host.Wait.Replicas.Delay.IntValue() } +func (w *worker) syncHealthOK(ctx context.Context, host *api.Host, deadline time.Time) (ok bool, hardFail bool, err error) { + clusterSchemer := w.ensureClusterSchemer(host) + readHealth := func(read func(context.Context, *api.Host) (int, error)) (int, bool, error) { + if contextError := ctx.Err(); contextError != nil { + return 0, false, contextError + } + queryCtx, cancel := context.WithDeadline(ctx, deadline) + defer cancel() + healthValue, queryErr := read(queryCtx, host) + if contextError := ctx.Err(); contextError != nil { + return 0, false, contextError + } + if queryCtx.Err() != nil || errors.Is(queryErr, context.DeadlineExceeded) { + return 0, true, nil + } + if queryErr != nil { + return 0, false, queryErr + } + return healthValue, false, nil + } + + readonly, notReady, err := readHealth(clusterSchemer.HostMaxIsReadonly) + if err != nil || notReady { + return false, false, err + } + sessionExpired, notReady, err := readHealth(clusterSchemer.HostMaxIsSessionExpired) + if err != nil || notReady { + return false, false, err + } + replicaDelay, notReady, err := readHealth(clusterSchemer.HostMaxReplicaDelay) + if err != nil || notReady { + return false, false, err + } + if readonly != 0 || sessionExpired != 0 { + return false, true, nil + } + return replicaDelay <= chop.Config().Reconcile.Host.Wait.Replicas.Delay.IntValue(), false, nil +} + // isCHIProcessedOnTheSameIP checks whether 
it is just a restart of the operator on the same IP func (w *worker) isCHIProcessedOnTheSameIP(chi *api.ClickHouseInstallation) bool { ip, _ := chop.GetRuntimeParam(deployment.OPERATOR_POD_IP) diff --git a/pkg/controller/chi/worker-sync-gate_test.go b/pkg/controller/chi/worker-sync-gate_test.go new file mode 100644 index 000000000..cfa62e4b0 --- /dev/null +++ b/pkg/controller/chi/worker-sync-gate_test.go @@ -0,0 +1,69 @@ +package chi + +import ( + "errors" + "testing" + "time" + + common "github.com/altinity/clickhouse-operator/pkg/controller/common" + a "github.com/altinity/clickhouse-operator/pkg/controller/common/announcer" +) + +func healthWindowStepForTest(counter int, ok bool, threshold int) (int, bool) { + return healthWindowStep(counter, ok, threshold) +} + +func TestHealthWindowConsecutive(t *testing.T) { + counter := 0 + done := false + for i := 0; i < 6; i++ { + counter, done = healthWindowStepForTest(counter, true, 6) + } + if !done || counter != 6 { + t.Fatalf("6 consecutive OK must satisfy threshold; counter=%d done=%v", counter, done) + } +} + +func TestHealthWindowResetsOnFailure(t *testing.T) { + counter, _ := healthWindowStepForTest(0, true, 6) + counter, _ = healthWindowStepForTest(counter, true, 6) + counter, done := healthWindowStepForTest(counter, false, 6) + if counter != 0 || done { + t.Fatalf("not-OK poll must reset counter; counter=%d done=%v", counter, done) + } +} + +func TestOnSoftTimeoutNeverPushesMarker(t *testing.T) { + advance, pushMarker, err := onSoftTimeout("proceed") + if !advance || pushMarker || err != nil { + t.Fatalf("proceed => advance without marker; got advance=%v push=%v err=%v", advance, pushMarker, err) + } + + advance, pushMarker, err = onSoftTimeout("abort") + if advance || pushMarker || !errors.Is(err, common.ErrCRUDAbort) { + t.Fatalf("abort => abort without marker; got advance=%v push=%v err=%v", advance, pushMarker, err) + } +} + +func TestSyncGateHealthStepTreatsHardFailAsNotReadyBeforeDeadline(t *testing.T) { + 
counter, done, hardDeadline := syncGateHealthStep(3, true, true, 6, time.Second) + if counter != 0 || done || hardDeadline { + t.Fatalf("hard health before deadline must reset and keep waiting; counter=%d done=%v hardDeadline=%v", counter, done, hardDeadline) + } +} + +func TestSyncGateHealthStepReturnsHardFailAtDeadline(t *testing.T) { + counter, done, hardDeadline := syncGateHealthStep(3, true, true, 6, 0) + if counter != 0 || done || !hardDeadline { + t.Fatalf("hard health at deadline must hard fail; counter=%d done=%v hardDeadline=%v", counter, done, hardDeadline) + } +} + +func TestReplicaSyncGateEventReasonDistinguishesProceedWithoutMarker(t *testing.T) { + if got := replicaSyncGateEventReason(true); got != a.EventReasonReconcileCompleted { + t.Fatalf("caught-up sync gate must report completed event; got %s", got) + } + if got := replicaSyncGateEventReason(false); got == a.EventReasonReconcileCompleted { + t.Fatalf("proceed without marker must not report completed event") + } +} diff --git a/pkg/controller/chi/worker-wait-exclude-include-restart.go b/pkg/controller/chi/worker-wait-exclude-include-restart.go index 370aa2867..dca7dca8f 100644 --- a/pkg/controller/chi/worker-wait-exclude-include-restart.go +++ b/pkg/controller/chi/worker-wait-exclude-include-restart.go @@ -16,16 +16,20 @@ package chi import ( "context" + "errors" + "fmt" "time" log "github.com/altinity/clickhouse-operator/pkg/announcer" api "github.com/altinity/clickhouse-operator/pkg/apis/clickhouse.altinity.com/v1" "github.com/altinity/clickhouse-operator/pkg/apis/common/types" "github.com/altinity/clickhouse-operator/pkg/chop" + common "github.com/altinity/clickhouse-operator/pkg/controller/common" a "github.com/altinity/clickhouse-operator/pkg/controller/common/announcer" "github.com/altinity/clickhouse-operator/pkg/controller/common/poller" "github.com/altinity/clickhouse-operator/pkg/controller/common/poller/domain" "github.com/altinity/clickhouse-operator/pkg/interfaces" + 
"github.com/altinity/clickhouse-operator/pkg/model/chi/schemer" "github.com/altinity/clickhouse-operator/pkg/util" ) @@ -145,6 +149,13 @@ func (w *worker) shouldWaitReplicationHost(host *api.Host) bool { host.Runtime.Address.ReplicaIndex, host.Runtime.Address.ShardIndex, host.Runtime.Address.ClusterName) return false + case chop.Config().Reconcile.Host.Wait.Replicas.Sync.IsEnabled() && host.IsForceReplicaCatchUp(): + w.a.V(1). + M(host).F(). + Info("Force replica catch-up after data loss. Host/shard/cluster: %d/%d/%s", + host.Runtime.Address.ReplicaIndex, host.Runtime.Address.ShardIndex, host.Runtime.Address.ClusterName) + return true + case host.IsFirstInCluster(): w.a.V(1). M(host).F(). @@ -191,6 +202,21 @@ func (w *worker) shouldWaitReplicationHost(host *api.Host) bool { return false } +func healthWindowStep(counter int, ok bool, threshold int) (int, bool) { + if !ok { + return 0, false + } + counter++ + return counter, counter >= threshold +} + +func onSoftTimeout(onTimeout string) (advance bool, pushMarker bool, err error) { + if onTimeout == "proceed" { + return true, false, nil + } + return false, false, common.ErrCRUDAbort +} + // includeHost includes host back into all activities - such as cluster, service, etc func (w *worker) includeHost(ctx context.Context, host *api.Host) error { w.a.V(1). @@ -200,6 +226,7 @@ func (w *worker) includeHost(ctx context.Context, host *api.Host) error { // w.includeHostIntoClickHouseCluster(ctx, host) w.ascendHostInClickHouseCluster(ctx, host) + syncGateEnabled := chop.Config().Reconcile.Host.Wait.Replicas.Sync.IsEnabled() err := w.catchReplicationLag(ctx, host) if err == nil { w.a.V(1). @@ -212,6 +239,9 @@ func (w *worker) includeHost(ctx context.Context, host *api.Host) error { M(host).F(). Warning("Will NOT include host into cluster due to replication lag. 
Host/shard/cluster: %d/%d/%s", host.Runtime.Address.ReplicaIndex, host.Runtime.Address.ShardIndex, host.Runtime.Address.ClusterName) + if syncGateEnabled { + return err + } } return nil @@ -330,7 +360,34 @@ func (w *worker) catchReplicationLag(ctx context.Context, host *api.Host) error // Host is alive but catching up - add to monitoring so metrics are collected during the wait w.addHostToMonitoring(host) - err := w.waitHostHasNoReplicationDelay(ctx, host) + var err error + if chop.Config().Reconcile.Host.Wait.Replicas.Sync.IsEnabled() { + var caughtUp bool + caughtUp, err = w.runReplicaSyncGate(ctx, host) + if err == nil { + w.a.V(1). + M(host).F(). + WithEvent(host.GetCR(), a.EventActionReconcile, replicaSyncGateEventReason(caughtUp)). + Info("Wait for host to catch replication lag - %s "+ + "Host/shard/cluster: %d/%d/%s", + replicaSyncGateResultLabel(caughtUp), + host.Runtime.Address.ReplicaIndex, host.Runtime.Address.ShardIndex, host.Runtime.Address.ClusterName, + ) + } else { + w.a.V(1). + M(host).F(). + WithEvent(host.GetCR(), a.EventActionReconcile, a.EventReasonReconcileFailed). + Info("Wait for host to catch replication lag - FAILED "+ + "Host/shard/cluster: %d/%d/%s"+ + "err: %v ", + host.Runtime.Address.ReplicaIndex, host.Runtime.Address.ShardIndex, host.Runtime.Address.ClusterName, + err, + ) + } + return err + } + + err = w.waitHostHasNoReplicationDelay(ctx, host) if err == nil { w.a.V(1). M(host).F(). 
@@ -356,6 +413,117 @@ func (w *worker) catchReplicationLag(ctx context.Context, host *api.Host) error return err } +func (w *worker) runReplicaSyncGate(ctx context.Context, host *api.Host) (bool, error) { + syncConfig := chop.Config().Reconcile.Host.Wait.Replicas.Sync + clusterSchemer := w.ensureClusterSchemer(host) + hostFQDN := w.c.namer.Name(interfaces.NameFQDN, host) + deadline := syncGateDeadline(syncConfig.GetTimeout()) + + failSoft := func(reason string) (bool, error) { + advance, _, err := onSoftTimeout(syncConfig.GetOnTimeout()) + if advance { + w.a.M(host).F().Warning("sync gate %s; proceeding without caught-up marker (onTimeout=proceed)", reason) + } + return false, err + } + classifyErr := func(err error) (bool, error) { + if err == nil { + return false, nil + } + if contextError := ctx.Err(); contextError != nil { + return false, contextError + } + if errors.Is(err, schemer.ErrGateDeadline) { + return failSoft("timed out") + } + return false, err + } + + if err := clusterSchemer.HostAsyncLoadBarrier(ctx, host, deadline); err != nil { + return classifyErr(err) + } + replicatedObjects, err := clusterSchemer.PeerReplicatedObjectCount(ctx, host, deadline) + if err != nil { + return classifyErr(err) + } + if replicatedObjects == 0 { + host.GetCR().IEnsureStatus().PushHostReplicaCaughtUp(hostFQDN) + return true, nil + } + if err := clusterSchemer.HostSyncReplicatedObjects(ctx, host, deadline); err != nil { + return classifyErr(err) + } + + healthCounter := 0 + for { + ok, hardFail, healthErr := w.syncHealthOK(ctx, host, deadline) + if healthErr != nil { + return classifyErr(healthErr) + } + + remaining := time.Until(deadline) + var done bool + var hardDeadline bool + healthCounter, done, hardDeadline = syncGateHealthStep(healthCounter, ok, hardFail, syncConfig.GetSuccessThreshold(), remaining) + if hardDeadline { + return false, syncGateHardFailError(host) + } + if done { + host.GetCR().IEnsureStatus().PushHostReplicaCaughtUp(hostFQDN) + return true, nil + } 
+ + if remaining <= 0 { + return failSoft("health window not satisfied") + } + sleepDuration := time.Duration(syncConfig.GetPollInterval()) * time.Second + if sleepDuration > remaining { + sleepDuration = remaining + } + select { + case <-ctx.Done(): + return false, ctx.Err() + case <-time.After(sleepDuration): + if hardFail && !time.Now().Before(deadline) { + return false, syncGateHardFailError(host) + } + } + } +} + +func syncGateHealthStep(counter int, ok bool, hardFail bool, threshold int, remaining time.Duration) (int, bool, bool) { + if hardFail { + return 0, false, remaining <= 0 + } + nextCounter, done := healthWindowStep(counter, ok, threshold) + return nextCounter, done, false +} + +func syncGateHardFailError(host *api.Host) error { + return fmt.Errorf("host %s readonly or session-expired; refusing to advance", host.GetName()) +} + +func replicaSyncGateEventReason(caughtUp bool) string { + if caughtUp { + return a.EventReasonReconcileCompleted + } + return a.EventReasonReconcileProceed +} + +func replicaSyncGateResultLabel(caughtUp bool) string { + if caughtUp { + return "COMPLETED" + } + return "PROCEEDED without caught-up marker" +} + +func syncGateDeadline(timeoutSeconds int) time.Time { + if timeoutSeconds <= 0 { + return time.Now().Add(time.Hour * 24 * 365 * 100) + } + return time.Now().Add(time.Duration(timeoutSeconds) * time.Second) +} + // shouldExcludeHost determines whether host to be excluded from cluster before reconcile func (w *worker) shouldExcludeHost(ctx context.Context, host *api.Host) bool { switch { diff --git a/pkg/controller/common/announcer/event-emitter.go b/pkg/controller/common/announcer/event-emitter.go index e71547ad1..6c0be1e6e 100644 --- a/pkg/controller/common/announcer/event-emitter.go +++ b/pkg/controller/common/announcer/event-emitter.go @@ -47,6 +47,7 @@ const ( EventReasonReconcileInProgress = "ReconcileInProgress" EventReasonReconcileCompleted = "ReconcileCompleted" EventReasonReconcileFailed = "ReconcileFailed" + 
EventReasonReconcileProceed = "ReconcileProceed" EventReasonCreateStarted = "CreateStarted" EventReasonCreateInProgress = "CreateInProgress" EventReasonCreateCompleted = "CreateCompleted" diff --git a/pkg/model/chi/schemer/schemer.go b/pkg/model/chi/schemer/schemer.go index 6b503bedd..487787ca2 100644 --- a/pkg/model/chi/schemer/schemer.go +++ b/pkg/model/chi/schemer/schemer.go @@ -16,6 +16,8 @@ package schemer import ( "context" + "errors" + "fmt" "time" log "github.com/altinity/clickhouse-operator/pkg/announcer" @@ -34,6 +36,14 @@ type ClusterSchemer struct { version *swversion.SoftWareVersion } +type replicatedTable struct { + DatabaseName string + TableName string +} + +// ErrGateDeadline marks the shared sync-gate deadline being reached. +var ErrGateDeadline = errors.New("sync gate deadline exceeded") + // NewClusterSchemer creates new Schemer object func NewClusterSchemer(clusterConnectionParams *clickhouse.ClusterConnectionParams, version *swversion.SoftWareVersion) *ClusterSchemer { return &ClusterSchemer{ @@ -174,7 +184,103 @@ func (s *ClusterSchemer) HostClickHouseVersion(ctx context.Context, host *api.Ho // HostMaxReplicaDelay returns max replica delay on the host func (s *ClusterSchemer) HostMaxReplicaDelay(ctx context.Context, host *api.Host) (int, error) { - return s.QueryHostInt(ctx, host, s.sqlMaxReplicaDelay()) + replicaDelay, err := s.QueryHostInt(ctx, host, s.sqlMaxReplicaDelay()) + if contextError := ctx.Err(); contextError != nil { + return 0, contextError + } + return replicaDelay, err +} + +func (s *ClusterSchemer) HostMaxIsReadonly(ctx context.Context, host *api.Host) (int, error) { + readonly, err := s.QueryHostInt(ctx, host, s.sqlReplicaHealth("is_readonly")) + if contextError := ctx.Err(); contextError != nil { + return 0, contextError + } + return readonly, err +} + +func (s *ClusterSchemer) HostMaxIsSessionExpired(ctx context.Context, host *api.Host) (int, error) { + sessionExpired, err := s.QueryHostInt(ctx, host, 
s.sqlReplicaHealth("is_session_expired")) + if contextError := ctx.Err(); contextError != nil { + return 0, contextError + } + return sessionExpired, err +} + +func (s *ClusterSchemer) PeerReplicatedObjectCount(ctx context.Context, host *api.Host, deadline time.Time) (int, error) { + databaseNames, replicatedTables, err := s.peerReplicatedObjects(ctx, host, deadline) + if err != nil { + return 0, err + } + return len(databaseNames) + len(replicatedTables), nil +} + +func (s *ClusterSchemer) HostAsyncLoadBarrier(ctx context.Context, host *api.Host, deadline time.Time) error { + for { + asyncLoaderExists, err := s.queryHostIntWithDeadline(ctx, host, deadline, s.sqlAsyncLoaderTableExists()) + if err != nil { + return err + } + if asyncLoaderExists == 0 { + return nil + } + + pendingLoadJobs, failedLoadJobs, err := s.queryHostIntPairWithDeadline(ctx, host, deadline, s.sqlAsyncLoaderState()) + if err != nil { + return err + } + if failedLoadJobs > 0 { + failedLoadJob, detailErr := s.queryHostStringWithDeadline(ctx, host, deadline, s.sqlAsyncLoaderFailedDetails()) + if detailErr != nil { + return detailErr + } + return fmt.Errorf("async loader failed or canceled job: %s", failedLoadJob) + } + if pendingLoadJobs == 0 { + return nil + } + if err := waitForNextGatePoll(ctx, deadline); err != nil { + return err + } + } +} + +func (s *ClusterSchemer) HostSyncReplicatedObjects(ctx context.Context, host *api.Host, deadline time.Time) error { + if (s == nil) || (s.version == nil) || !s.version.Matches(">= 23.4") { + return fmt.Errorf("SYSTEM SYNC REPLICA ... 
LIGHTWEIGHT requires ClickHouse >= 23.4, got %s", s.version) + } + + if err := s.HostAsyncLoadBarrier(ctx, host, deadline); err != nil { + return err + } + + databaseNames, _, err := s.peerReplicatedObjects(ctx, host, deadline) + if err != nil { + return err + } + for _, databaseName := range databaseNames { + if err := s.execHostWithDeadline(ctx, host, deadline, s.sqlSyncDatabaseReplica(databaseName)); err != nil { + return err + } + } + + if err := s.HostAsyncLoadBarrier(ctx, host, deadline); err != nil { + return err + } + + _, replicatedTables, err := s.peerReplicatedObjects(ctx, host, deadline) + if err != nil { + return err + } + for _, replicatedTable := range replicatedTables { + if err := s.execHostWithDeadline(ctx, host, deadline, s.sqlWaitLoadingParts(replicatedTable.DatabaseName, replicatedTable.TableName)); err != nil { + return err + } + if err := s.execHostWithDeadline(ctx, host, deadline, s.sqlSyncReplicaLightweight(replicatedTable.DatabaseName, replicatedTable.TableName)); err != nil { + return err + } + } + return nil } // HostShutdown shutdown a host @@ -198,3 +304,210 @@ func debugCreateSQLs(names, sqls []string, err error) ([]string, []string) { } return names, sqls } + +func (s *ClusterSchemer) peerReplicatedObjects(ctx context.Context, host *api.Host, deadline time.Time) ([]string, []replicatedTable, error) { + if _, err := gateRemaining(ctx, deadline); err != nil { + return nil, nil, err + } + + peers := s.Names(interfaces.NameFQDNs, host, api.Cluster{}, true) + if len(peers) == 0 { + return nil, nil, nil + } + + queryCtx, cancel, err := gateQueryContext(ctx, deadline) + if err != nil { + return nil, nil, err + } + defer cancel() + + queryResult, err := s.Cluster.SetHosts(peers).QueryAny(queryCtx, s.sqlReplicatedObjects(host.Runtime.Address.ClusterName)) + if mappedErr := gateQueryError(ctx, queryCtx, err); mappedErr != nil { + return nil, nil, mappedErr + } + if queryResult == nil { + return nil, nil, fmt.Errorf("empty replicated object 
discovery result from peers %v", peers) + } + defer queryResult.Close() + + databaseNames := make([]string, 0) + replicatedTables := make([]replicatedTable, 0) + for queryResult.Rows.Next() { + var objectType string + var databaseName string + var tableName string + if err := queryResult.Rows.Scan(&objectType, &databaseName, &tableName); err != nil { + if mappedErr := gateQueryError(ctx, queryCtx, err); mappedErr != nil { + return nil, nil, mappedErr + } + return nil, nil, err + } + switch objectType { + case "database": + databaseNames = append(databaseNames, databaseName) + case "table": + replicatedTables = append(replicatedTables, replicatedTable{ + DatabaseName: databaseName, + TableName: tableName, + }) + default: + return nil, nil, fmt.Errorf("unknown replicated object type %q", objectType) + } + } + if err := queryResult.Rows.Err(); err != nil { + if mappedErr := gateQueryError(ctx, queryCtx, err); mappedErr != nil { + return nil, nil, mappedErr + } + return nil, nil, err + } + if mappedErr := gateQueryError(ctx, queryCtx, nil); mappedErr != nil { + return nil, nil, mappedErr + } + return databaseNames, replicatedTables, nil +} + +func (s *ClusterSchemer) execHostWithDeadline(ctx context.Context, host *api.Host, deadline time.Time, querySQL string) error { + remaining, err := gateRemaining(ctx, deadline) + if err != nil { + return err + } + + opts := clickhouse.NewQueryOptions() + opts.SetRetry(false) + opts.SetQueryTimeout(remaining) + + err = s.ExecHost(ctx, host, []string{sqlWithReceiveTimeout(querySQL, remaining)}, opts) + if contextError := ctx.Err(); contextError != nil { + return contextError + } + if errors.Is(err, context.DeadlineExceeded) { + return ErrGateDeadline + } + return err +} + +func (s *ClusterSchemer) queryHostIntWithDeadline(ctx context.Context, host *api.Host, deadline time.Time, querySQL string) (int, error) { + queryCtx, cancel, err := gateQueryContext(ctx, deadline) + if err != nil { + return 0, err + } + defer cancel() + + 
queryValue, err := s.QueryHostInt(queryCtx, host, querySQL) + if mappedErr := gateQueryError(ctx, queryCtx, err); mappedErr != nil { + return 0, mappedErr + } + return queryValue, nil +} + +func (s *ClusterSchemer) queryHostStringWithDeadline(ctx context.Context, host *api.Host, deadline time.Time, querySQL string) (string, error) { + queryCtx, cancel, err := gateQueryContext(ctx, deadline) + if err != nil { + return "", err + } + defer cancel() + + queryValue, err := s.QueryHostString(queryCtx, host, querySQL) + if mappedErr := gateQueryError(ctx, queryCtx, err); mappedErr != nil { + return "", mappedErr + } + return queryValue, nil +} + +func (s *ClusterSchemer) queryHostIntPairWithDeadline(ctx context.Context, host *api.Host, deadline time.Time, querySQL string) (int, int, error) { + queryCtx, cancel, err := gateQueryContext(ctx, deadline) + if err != nil { + return 0, 0, err + } + defer cancel() + + queryResult, err := s.QueryHost(queryCtx, host, querySQL) + if mappedErr := gateQueryError(ctx, queryCtx, err); mappedErr != nil { + return 0, 0, mappedErr + } + if queryResult == nil { + return 0, 0, fmt.Errorf("empty query result") + } + defer queryResult.Close() + + if !queryResult.Rows.Next() { + if err := queryResult.Rows.Err(); err != nil { + if mappedErr := gateQueryError(ctx, queryCtx, err); mappedErr != nil { + return 0, 0, mappedErr + } + return 0, 0, err + } + return 0, 0, fmt.Errorf("found no rows") + } + + var firstValue int + var secondValue int + if err := queryResult.Rows.Scan(&firstValue, &secondValue); err != nil { + if mappedErr := gateQueryError(ctx, queryCtx, err); mappedErr != nil { + return 0, 0, mappedErr + } + return 0, 0, err + } + if err := queryResult.Rows.Err(); err != nil { + if mappedErr := gateQueryError(ctx, queryCtx, err); mappedErr != nil { + return 0, 0, mappedErr + } + return 0, 0, err + } + if mappedErr := gateQueryError(ctx, queryCtx, nil); mappedErr != nil { + return 0, 0, mappedErr + } + return firstValue, secondValue, nil +} 
+ +func gateQueryContext(ctx context.Context, deadline time.Time) (context.Context, context.CancelFunc, error) { + remaining, err := gateRemaining(ctx, deadline) + if err != nil { + return nil, nil, err + } + queryCtx, cancel := context.WithTimeout(ctx, remaining) + return queryCtx, cancel, nil +} + +func gateRemaining(ctx context.Context, deadline time.Time) (time.Duration, error) { + if contextError := ctx.Err(); contextError != nil { + return 0, contextError + } + remaining := time.Until(deadline) + if remaining <= 0 { + return 0, ErrGateDeadline + } + return remaining, nil +} + +func gateQueryError(parentCtx, queryCtx context.Context, err error) error { + if contextError := parentCtx.Err(); contextError != nil { + return contextError + } + if errors.Is(err, context.DeadlineExceeded) || errors.Is(queryCtx.Err(), context.DeadlineExceeded) { + return ErrGateDeadline + } + if contextError := queryCtx.Err(); contextError != nil { + return contextError + } + return err +} + +func waitForNextGatePoll(ctx context.Context, deadline time.Time) error { + remaining, err := gateRemaining(ctx, deadline) + if err != nil { + return err + } + sleepDuration := time.Second + if remaining < sleepDuration { + sleepDuration = remaining + } + timer := time.NewTimer(sleepDuration) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} diff --git a/pkg/model/chi/schemer/sql.go b/pkg/model/chi/schemer/sql.go index c6f6fb126..d69ec137c 100644 --- a/pkg/model/chi/schemer/sql.go +++ b/pkg/model/chi/schemer/sql.go @@ -17,6 +17,8 @@ package schemer import ( "context" "fmt" + "strings" + "time" "github.com/MakeNowJust/heredoc" @@ -91,6 +93,113 @@ func (s *ClusterSchemer) sqlSyncTable(ctx context.Context, host *api.Host) ([]st return names, sqlStatements, nil } +func (s *ClusterSchemer) sqlReplicaHealth(column string) string { + return fmt.Sprintf("SELECT coalesce(max(%s),0) FROM system.replicas", column) +} + +func (s *ClusterSchemer) 
sqlSyncReplicaLightweight(databaseName, tableName string) string { + return fmt.Sprintf(`SYSTEM SYNC REPLICA "%s"."%s" LIGHTWEIGHT`, quoteIdent(databaseName), quoteIdent(tableName)) +} + +func (s *ClusterSchemer) sqlSyncDatabaseReplica(databaseName string) string { + return fmt.Sprintf(`SYSTEM SYNC DATABASE REPLICA "%s"`, quoteIdent(databaseName)) +} + +func (s *ClusterSchemer) sqlWaitLoadingParts(databaseName, tableName string) string { + return fmt.Sprintf(`SYSTEM WAIT LOADING PARTS "%s"."%s"`, quoteIdent(databaseName), quoteIdent(tableName)) +} + +func (s *ClusterSchemer) sqlAsyncLoaderTableExists() string { + return "SELECT count() FROM system.tables WHERE database='system' AND name='asynchronous_loader'" +} + +func (s *ClusterSchemer) sqlAsyncLoaderState() string { + return heredoc.Doc(` + SELECT + countIf(status = 'PENDING' OR is_executing = 1 OR is_ready = 1 OR is_blocked = 1), + countIf(status IN ('FAILED', 'CANCELED')) + FROM + system.asynchronous_loader + WHERE + startsWith(job, 'startup ') AND + (position(job, ' database ') > 0 OR position(job, ' table ') > 0) + `) +} + +func (s *ClusterSchemer) sqlAsyncLoaderFailedDetails() string { + return heredoc.Doc(` + SELECT + concat(job, ': ', status, ifNull(concat(': ', exception), '')) + FROM + system.asynchronous_loader + WHERE + startsWith(job, 'startup ') AND + (position(job, ' database ') > 0 OR position(job, ' table ') > 0) AND + status IN ('FAILED', 'CANCELED') + LIMIT 1 + `) +} + +func (s *ClusterSchemer) sqlReplicatedObjects(cluster string) string { + return heredoc.Docf(` + SELECT + 'database' AS object_type, + name AS database, + '' AS table_name + FROM + ( + SELECT * + FROM clusterAllReplicas('%s', system.databases) + SETTINGS skip_unavailable_shards = 1 + ) databases + WHERE + name NOT IN (%s) AND + engine = 'Replicated' + UNION ALL + SELECT + 'table' AS object_type, + database, + name AS table_name + FROM + ( + SELECT * + FROM clusterAllReplicas('%s', system.tables) + SETTINGS 
skip_unavailable_shards = 1 + ) tables + WHERE + database NOT IN (%s) AND + engine LIKE 'Replicated%%' + `, + cluster, + ignoredDBs, + cluster, + ignoredDBs, + ) +} + +func sqlWithReceiveTimeout(sql string, remaining time.Duration) string { + seconds := receiveTimeoutSeconds(remaining) + return fmt.Sprintf("%s SETTINGS receive_timeout=%d", sql, seconds) +} + +func receiveTimeoutSeconds(remaining time.Duration) int64 { + if remaining <= 0 { + return 1 + } + seconds := int64(remaining / time.Second) + if remaining%time.Second != 0 { + seconds++ + } + if seconds < 1 { + return 1 + } + return seconds +} + +func quoteIdent(identifier string) string { + return strings.ReplaceAll(identifier, `"`, `""`) +} + func (s *ClusterSchemer) sqlCreateDatabaseDistributed(cluster string) string { var createDatabaseStmt string switch { diff --git a/pkg/model/chi/schemer/sql_sync_test.go b/pkg/model/chi/schemer/sql_sync_test.go new file mode 100644 index 000000000..59273c97c --- /dev/null +++ b/pkg/model/chi/schemer/sql_sync_test.go @@ -0,0 +1,111 @@ +package schemer + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + api "github.com/altinity/clickhouse-operator/pkg/apis/clickhouse.altinity.com/v1" + "github.com/altinity/clickhouse-operator/pkg/apis/swversion" +) + +func TestQuoteIdentDoublesQuotes(t *testing.T) { + if got := quoteIdent(`my"db`); got != `my""db` { + t.Fatalf("quoteIdent must double embedded quotes; got %q", got) + } +} + +func TestSQLReplicaHealthShape(t *testing.T) { + schemer := &ClusterSchemer{} + sql := schemer.sqlReplicaHealth("is_readonly") + if !strings.Contains(sql, "coalesce(max(is_readonly),0)") || !strings.Contains(sql, "system.replicas") { + t.Fatalf("health SQL wrong: %s", sql) + } +} + +func TestHostMaxReplicaDelayReturnsCanceledContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + delay, err := (&ClusterSchemer{}).HostMaxReplicaDelay(ctx, &api.Host{}) + if delay != 0 || !errors.Is(err, 
context.Canceled) { + t.Fatalf("canceled context must be returned; delay=%d err=%v", delay, err) + } +} + +func TestSQLSyncReplicaLightweight(t *testing.T) { + schemer := &ClusterSchemer{} + sql := schemer.sqlSyncReplicaLightweight(`my"db`, "tbl") + if !strings.HasSuffix(sql, "LIGHTWEIGHT") { + t.Fatalf("table sync must end with LIGHTWEIGHT: %s", sql) + } + if !strings.Contains(sql, `"my""db"."tbl"`) { + t.Fatalf("identifiers must be quoted and escaped: %s", sql) + } +} + +func TestSQLSyncDatabaseReplicaHasNoLightweight(t *testing.T) { + schemer := &ClusterSchemer{} + sql := schemer.sqlSyncDatabaseReplica("db") + if strings.Contains(sql, "LIGHTWEIGHT") { + t.Fatalf("DATABASE REPLICA takes no LIGHTWEIGHT modifier: %s", sql) + } + if !strings.Contains(sql, "SYSTEM SYNC DATABASE REPLICA") || !strings.Contains(sql, `"db"`) { + t.Fatalf("wrong DB-sync stmt: %s", sql) + } +} + +func TestSQLWaitLoadingPartsShape(t *testing.T) { + schemer := &ClusterSchemer{} + sql := schemer.sqlWaitLoadingParts("db", "tbl") + if !strings.Contains(sql, "SYSTEM WAIT LOADING PARTS") || !strings.Contains(sql, `"db"."tbl"`) { + t.Fatalf("wrong wait-loading-parts stmt: %s", sql) + } +} + +func TestSQLWithReceiveTimeoutCeilsRemainingSeconds(t *testing.T) { + sql := sqlWithReceiveTimeout("SYSTEM SYNC REPLICA \"db\".\"tbl\" LIGHTWEIGHT", 1500*time.Millisecond) + if !strings.HasSuffix(sql, "SETTINGS receive_timeout=2") { + t.Fatalf("receive_timeout must ceil seconds: %s", sql) + } +} + +func TestSQLAsyncLoaderStateShape(t *testing.T) { + schemer := &ClusterSchemer{} + sql := schemer.sqlAsyncLoaderState() + if !strings.Contains(sql, "countIf(status = 'PENDING'") || !strings.Contains(sql, "status IN ('FAILED', 'CANCELED')") { + t.Fatalf("async loader state SQL must count pending and failed jobs: %s", sql) + } + if !strings.Contains(sql, "startsWith(job, 'startup ')") || !strings.Contains(sql, " database ") { + t.Fatalf("async loader state SQL must filter relevant startup load jobs: %s", sql) + } +} + 
// TestHostSyncReplicatedObjectsRejectsUnsupportedLightweightVersion builds a
// schemer reporting version 23.3.22 and expects HostSyncReplicatedObjects to
// fail with an error mentioning "requires ClickHouse >= 23.4".
func TestHostSyncReplicatedObjectsRejectsUnsupportedLightweightVersion(t *testing.T) {
	schemer := &ClusterSchemer{version: swversion.NewSoftWareVersion("23.3.22")}
	// The deadline is one minute in the future, so an error here cannot be
	// ErrGateDeadline from an already-expired budget.
	err := schemer.HostSyncReplicatedObjects(context.Background(), &api.Host{}, time.Now().Add(time.Minute))
	if err == nil {
		t.Fatalf("expected unsupported LIGHTWEIGHT error")
	}
	if !strings.Contains(err.Error(), "requires ClickHouse >= 23.4") {
		t.Fatalf("wrong version error: %v", err)
	}
}

// TestHostAsyncLoadBarrierReturnsGateDeadlineWhenExpired passes a deadline one
// second in the past and expects an error matching ErrGateDeadline.
func TestHostAsyncLoadBarrierReturnsGateDeadlineWhenExpired(t *testing.T) {
	schemer := &ClusterSchemer{}
	err := schemer.HostAsyncLoadBarrier(context.Background(), &api.Host{}, time.Now().Add(-time.Second))
	if !errors.Is(err, ErrGateDeadline) {
		t.Fatalf("expected ErrGateDeadline, got %v", err)
	}
}

// TestPeerReplicatedObjectCountReturnsGateDeadlineWhenExpired passes a deadline
// one second in the past and expects an error matching ErrGateDeadline; the
// returned count is ignored.
func TestPeerReplicatedObjectCountReturnsGateDeadlineWhenExpired(t *testing.T) {
	schemer := &ClusterSchemer{}
	_, err := schemer.PeerReplicatedObjectCount(context.Background(), &api.Host{}, time.Now().Add(-time.Second))
	if !errors.Is(err, ErrGateDeadline) {
		t.Fatalf("expected ErrGateDeadline, got %v", err)
	}
}
+metadata: + name: "test-079-sync-gate" +spec: + useTemplates: + - name: clickhouse-version + configuration: + zookeeper: + nodes: + - host: zookeeper + port: 2181 + clusters: + - name: "default" + layout: + shardsCount: 1 + replicasCount: 3 diff --git a/tests/e2e/manifests/chopconf/test-079-sync-gate.yaml b/tests/e2e/manifests/chopconf/test-079-sync-gate.yaml new file mode 100644 index 000000000..30b2541be --- /dev/null +++ b/tests/e2e/manifests/chopconf/test-079-sync-gate.yaml @@ -0,0 +1,17 @@ +apiVersion: "clickhouse.altinity.com/v1" +kind: "ClickHouseOperatorConfiguration" +metadata: + name: "sync-gate" +spec: + reconcile: + host: + wait: + replicas: + sync: + enabled: "true" + mode: "lightweight" + timeout: 120 + onTimeout: "abort" + health: + pollInterval: 5 + successThreshold: 3 diff --git a/tests/e2e/test_operator.py b/tests/e2e/test_operator.py index 5ccc492b3..046e934b3 100644 --- a/tests/e2e/test_operator.py +++ b/tests/e2e/test_operator.py @@ -5702,7 +5702,7 @@ def test_010056(self): assert out != "0" with And("Replica still should be unready after reconcile timeout"): - ready = kubectl.get_field("pod", f"chi-{chi}-{cluster}-0-1-0", ".metadata.labels.clickhouse\.altinity\.com\/ready") + ready = kubectl.get_field("pod", f"chi-{chi}-{cluster}-0-1-0", r".metadata.labels.clickhouse\.altinity\.com\/ready") print(f"ready label={ready}") assert ready != "yes", error("Replica should be unready") @@ -5728,7 +5728,7 @@ def test_010056(self): with Then("Replica should become ready"): kubectl.wait_field("pod", f"chi-{chi}-{cluster}-0-1-0", - ".metadata.labels.clickhouse\.altinity\.com\/ready", value="yes") + r".metadata.labels.clickhouse\.altinity\.com\/ready", value="yes") with And("Replication delay should be zero"): out = clickhouse.query(chi, "select max(absolute_delay) from system.replicas", host=f"chi-{chi}-{cluster}-0-1-0") @@ -7077,6 +7077,164 @@ def test_010072(self): with Finally("I clean up"): delete_test_namespace() + +@TestScenario +@Name("test_010079. 
@TestScenario
@Name("test_010079. Test replicated host sync gate")
def test_010079(self):
    # End-to-end scenario for the replicated-host sync gate:
    #   1. install a single-replica CHI and create a replicated table with one row;
    #   2. stop replicated sends on the source host, then scale to three replicas;
    #   3. while the second replica lags, check that it has no caught-up marker,
    #      is not ready, and that the third replica pod has not been created;
    #   4. re-enable sends, insert more rows, and check that the second replica
    #      gets the marker, becomes ready, the third replica appears, and all
    #      six rows are visible on the second replica.
    create_shell_namespace_clickhouse_template()

    with Given("I enable replicated host sync gate"):
        util.apply_operator_config("manifests/chopconf/test-079-sync-gate.yaml")

    util.require_keeper(keeper_type=self.context.keeper_type)

    manifest = "manifests/chi/test-079-sync-gate-1.yaml"
    chi = yaml_manifest.get_name(util.get_full_path(manifest))
    cluster = "default"
    # Pod names for replicas 0, 1 and 2 of shard 0.
    source_host = f"chi-{chi}-{cluster}-0-0-0"
    delayed_replica_host = f"chi-{chi}-{cluster}-0-1-0"
    next_replica_host = f"chi-{chi}-{cluster}-0-2-0"
    # Value looked up in the CHI status list "hostsWithReplicaCaughtUp".
    delayed_replica_fqdn = f"chi-{chi}-{cluster}-0-1.{current().context.test_namespace}.svc.cluster.local"

    def get_replica_caught_up_hosts():
        # Return status.hostsWithReplicaCaughtUp of the CHI, or [] when the
        # status or the field is missing/empty.
        chi_status = kubectl.get("chi", chi).get("status") or {}
        return chi_status.get("hostsWithReplicaCaughtUp") or []

    def wait_table_exists_on_delayed_replica():
        # Poll (up to 10 attempts) until system.tables on the delayed replica
        # reports exactly one table named test_079, then assert it.
        table_exists = "0"
        for attempt_index in range(1, 11):
            # query_with_error is used so a failing query does not abort the
            # loop - presumably the replica may not accept queries yet; confirm.
            table_exists = clickhouse.query_with_error(
                chi,
                "select count() from system.tables where name='test_079'",
                host=delayed_replica_host,
            )
            if table_exists == "1":
                break
            retry_sleep(attempt_index, 10, "Table is not ready on delayed replica")
        assert table_exists == "1", error("Table was not created on a new replica")

    def wait_replica_caught_up_marker():
        # Poll (up to 24 attempts) until the delayed replica FQDN is listed in
        # hostsWithReplicaCaughtUp, then assert it.
        caught_up_hosts = []
        for attempt_index in range(1, 25):
            caught_up_hosts = get_replica_caught_up_hosts()
            if delayed_replica_fqdn in caught_up_hosts:
                break
            retry_sleep(attempt_index, 5, "Replica caught-up marker is not ready")
        assert delayed_replica_fqdn in caught_up_hosts, error("Replica caught-up marker was not written")

    def wait_delayed_replica_row_count(expected_count):
        # Poll (up to 12 attempts) until count() of test_079 on the delayed
        # replica equals expected_count. Query results are compared as strings.
        row_count = ""
        for attempt_index in range(1, 13):
            row_count = clickhouse.query(chi, "select count() from test_079", host=delayed_replica_host)
            if row_count == expected_count:
                break
            retry_sleep(attempt_index, 5, "Table data is not yet replicated")
        assert row_count == expected_count, error("Table data has not been replicated")

    with Given("CHI is installed"):
        kubectl.create_and_check(
            manifest=manifest,
            check={
                "pod_count": 1,
                "apply_templates": {
                    current().context.clickhouse_template,
                },
                "do_not_delete": 1,
            },
        )

    with Then("Create a replicated table"):
        # Plain (non-f) string: {database}, {table} and {replica} are left
        # literally in the SQL for the server, not interpolated by Python.
        clickhouse.query(
            chi,
            "CREATE TABLE test_079 (a Int64) Engine = ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY a PARTITION BY a",
        )
        clickhouse.query(chi, "INSERT INTO test_079 SELECT 1")

    with And("STOP REPLICATED SENDS"):
        # Keeps the new replica from fetching data, so it stays behind.
        clickhouse.query(chi, "SYSTEM STOP REPLICATED SENDS", host=source_host)

    with When("Scale to three replicas while the new replica is delayed"):
        # Only two pods and an InProgress CHI are expected at this point.
        kubectl.create_and_check(
            manifest="manifests/chi/test-079-sync-gate-2.yaml",
            check={
                "do_not_delete": 1,
                "pod_count": 2,
                "chi_status": "InProgress",
            },
        )

    with Then("Table should be created on the delayed replica"):
        wait_table_exists_on_delayed_replica()

    with And("Table should have no data replicated"):
        query_result = clickhouse.query(chi, "select count() from test_079", host=delayed_replica_host)
        assert query_result == "0", error("Table data has been replicated")

    with And("Replication delay should be non-zero"):
        replica_delay = clickhouse.query(
            chi,
            "select max(absolute_delay) from system.replicas",
            host=delayed_replica_host,
        )
        print(f"max(absolute_delay)={replica_delay}")
        assert replica_delay != "0"

    with And("Wait for the sync gate to observe the delayed replica"):
        # Fixed wait before the negative checks below: they assert that
        # something has NOT happened, so there is no condition to poll for.
        time.sleep(30)

    with And("Delayed replica should not have a caught-up marker"):
        caught_up_hosts = get_replica_caught_up_hosts()
        print(yaml.safe_dump(caught_up_hosts))
        assert delayed_replica_fqdn not in caught_up_hosts

    with And("Next replica should not be created while the gate waits"):
        pod_count = kubectl.get_count("pod", chi=chi)
        assert pod_count == 2, error(f"Expected 2 pods while gate waits, got {pod_count}")
        # ok_to_fail=True: the pod is expected to be absent here.
        next_replica_pod = kubectl.get("pod", next_replica_host, ok_to_fail=True)
        assert next_replica_pod is None, error("Next replica should not be created before sync completes")

    with And("Delayed replica should still be unready"):
        ready_label = kubectl.get_field(
            "pod",
            delayed_replica_host,
            r".metadata.labels.clickhouse\.altinity\.com\/ready",
        )
        print(f"ready label={ready_label}")
        assert ready_label != "yes", error("Delayed replica should be unready")

    with When("START REPLICATED SENDS"):
        clickhouse.query(chi, "SYSTEM START REPLICATED SENDS", host=source_host)

    with And("Live inserts continue after sync starts"):
        # Adds five rows (values 2..6) on top of the initial row, six in total.
        clickhouse.query(chi, "INSERT INTO test_079 SELECT number + 2 FROM numbers(5)", host=source_host)

    with Then("Delayed replica should receive a caught-up marker"):
        wait_replica_caught_up_marker()

    with And("Delayed replica should become ready"):
        kubectl.wait_field(
            "pod",
            delayed_replica_host,
            r".metadata.labels.clickhouse\.altinity\.com\/ready",
            value="yes",
        )

    with And("Next replica should be created after sync completes"):
        kubectl.wait_object("pod", "", label=f"-l clickhouse.altinity.com/chi={chi}", count=3)
        kubectl.wait_field(
            "pod",
            next_replica_host,
            r".metadata.labels.clickhouse\.altinity\.com\/ready",
            value="yes",
        )

    with And("Live inserts should be visible on the synced replica"):
        # One initial row plus the five live inserts.
        wait_delayed_replica_row_count("6")

    with Finally("I clean up"):
        delete_test_namespace()