diff --git a/CHANGELOG.md b/CHANGELOG.md index ce3dff33..0ee6a903 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ All notable changes to this project will be documented in this file. - Spark applications now correctly handle the case where both the History Server and the S3 connection use the same TLS secret class ([#655]). Previously, the Spark application pods contained the same TLS volume twice, which could not be applied to the API server. - The spark-submit job now sets the correct `-Djavax.net.ssl.trustStore` properties ([#655]). +- Spark application jobs can now have pod/node affinities. This was an omission as the application driver and executors already had this field for a long time. ([#664]). ### Changed @@ -49,6 +50,7 @@ All notable changes to this project will be documented in this file. [#656]: https://github.com/stackabletech/spark-k8s-operator/pull/656 [#660]: https://github.com/stackabletech/spark-k8s-operator/pull/660 [#663]: https://github.com/stackabletech/spark-k8s-operator/pull/663 +[#664]: https://github.com/stackabletech/spark-k8s-operator/pull/664 ## [25.11.0] - 2025-11-07 diff --git a/Cargo.nix b/Cargo.nix index 1a28d387..c61759a9 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -4815,7 +4815,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "f9b117c8c08557e9774f33145bb009fb74cb2437"; - sha256 = "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17"; + sha256 = "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c"; }; libName = "k8s_version"; authors = [ @@ -9488,7 +9488,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "f9b117c8c08557e9774f33145bb009fb74cb2437"; - sha256 = "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17"; + sha256 = "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c"; }; libName = "stackable_certs"; authors = [ @@ -9591,7 +9591,7 @@ rec { src = pkgs.fetchgit { url = 
"https://github.com/stackabletech/operator-rs.git"; rev = "f9b117c8c08557e9774f33145bb009fb74cb2437"; - sha256 = "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17"; + sha256 = "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c"; }; libName = "stackable_operator"; authors = [ @@ -9764,7 +9764,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "f9b117c8c08557e9774f33145bb009fb74cb2437"; - sha256 = "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17"; + sha256 = "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c"; }; procMacro = true; libName = "stackable_operator_derive"; @@ -9799,7 +9799,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "f9b117c8c08557e9774f33145bb009fb74cb2437"; - sha256 = "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17"; + sha256 = "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c"; }; libName = "stackable_shared"; authors = [ @@ -9990,7 +9990,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "f9b117c8c08557e9774f33145bb009fb74cb2437"; - sha256 = "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17"; + sha256 = "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c"; }; libName = "stackable_telemetry"; authors = [ @@ -10100,7 +10100,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "f9b117c8c08557e9774f33145bb009fb74cb2437"; - sha256 = "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17"; + sha256 = "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c"; }; libName = "stackable_versioned"; authors = [ @@ -10144,7 +10144,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "f9b117c8c08557e9774f33145bb009fb74cb2437"; - sha256 = "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17"; + sha256 = "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c"; }; procMacro = true; libName = 
"stackable_versioned_macros"; @@ -10212,7 +10212,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "f9b117c8c08557e9774f33145bb009fb74cb2437"; - sha256 = "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17"; + sha256 = "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c"; }; libName = "stackable_webhook"; authors = [ diff --git a/crate-hashes.json b/crate-hashes.json index 0ec63dc5..b41e87f3 100644 --- a/crate-hashes.json +++ b/crate-hashes.json @@ -4,14 +4,14 @@ "git+https://github.com/kube-rs/kube-rs?rev=fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5#kube-derive@3.0.1": "1irm4g79crlxjm3iqrgvx0f6wxdcj394ky84q89pk9i36y2mlw3n", "git+https://github.com/kube-rs/kube-rs?rev=fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5#kube-runtime@3.0.1": "1irm4g79crlxjm3iqrgvx0f6wxdcj394ky84q89pk9i36y2mlw3n", "git+https://github.com/kube-rs/kube-rs?rev=fe69cc486ff8e62a7da61d64ec3ebbd9e64c43b5#kube@3.0.1": "1irm4g79crlxjm3iqrgvx0f6wxdcj394ky84q89pk9i36y2mlw3n", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#k8s-version@0.1.3": "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-certs@0.4.0": "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-operator-derive@0.3.1": "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-operator@0.106.2": "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-shared@0.1.0": "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-telemetry@0.6.1": 
"0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-versioned-macros@0.8.3": "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-versioned@0.8.3": "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-webhook@0.9.0": "0yxp9d7x3xzlc7i67mjkizf587hvx8kwjly9p10x320hvp91qf17", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#k8s-version@0.1.3": "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-certs@0.4.0": "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-operator-derive@0.3.1": "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-operator@0.106.2": "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-shared@0.1.0": "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-telemetry@0.6.1": "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-versioned-macros@0.8.3": "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-versioned@0.8.3": "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c", + 
"git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.106.2#stackable-webhook@0.9.0": "1yg7hbpgclp1zvfnhi4qkrwbgsa19v86plh77vqvwxzdxxxvxr4c", "git+https://github.com/stackabletech/product-config.git?tag=0.8.0#product-config@0.8.0": "1dz70kapm2wdqcr7ndyjji0lhsl98bsq95gnb2lw487wf6yr7987" } \ No newline at end of file diff --git a/docs/modules/spark-k8s/pages/usage-guide/operations/pod-placement.adoc b/docs/modules/spark-k8s/pages/usage-guide/operations/pod-placement.adoc index 34de5f2b..ffd247ef 100644 --- a/docs/modules/spark-k8s/pages/usage-guide/operations/pod-placement.adoc +++ b/docs/modules/spark-k8s/pages/usage-guide/operations/pod-placement.adoc @@ -1,11 +1,64 @@ = Pod Placement +:description: Learn how to configure pod placement for Spark applications and Spark History Server on the Stackable Data Platform. == Spark Applications -You can configure pod placement of application drivers and executors by adding an `affinity` property to the corresponding configuration section. +You can configure pod placement of the submit job, application driver and executors by adding an `affinity` property to the corresponding configuration section. + +Refer to the https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/[Kubernetes documentation] for more information about affinity. By default, the operator doesn't configure any affinity. +The following example shows how to use the `spec.job.config.affinity` property to configure the pod placement of the submit job. +In a similar way, you can configure the pod placement of the driver and executors by using the `spec.driver.config.affinity` and `spec.executor.config.affinity` properties respectively. 
+ +[source,yaml] +---- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: examples # <1> +spec: + mode: cluster + mainApplicationFile: app.jar + sparkImage: + productVersion: 4.1.1 + job: + config: + affinity: # <2> + nodeSelector: # <3> + affinity-role: job + nodeAffinity: # <4> + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 11 + preference: + matchExpressions: + - key: topology.kubernetes.io/zone + operator: In + values: + - fictional-zone-job + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: topology.kubernetes.io/zone + operator: In + values: + - antarctica-east1 + - antarctica-west1 + podAffinity: # <5> + # ... + podAntiAffinity: # <6> + # ... +---- +<1> The name of the SparkApplication. +<2> The affinity configuration for the submit job. +<3> A node selector that matches nodes with the label `affinity-role=job`. +<4> A node affinity with both preferred and required rules. +<5> A pod affinity configuration. +<6> A pod anti-affinity configuration. + +Pod placement policies can also be configured in xref:usage-guide/app_templates.adoc[Spark Application Templates]. + == Spark History Server You can configure the Pod placement of Spark History Server pods as described in xref:concepts:operations/pod_placement.adoc[]. diff --git a/extra/crds.yaml b/extra/crds.yaml index 54dafd49..3299fab7 100644 --- a/extra/crds.yaml +++ b/extra/crds.yaml @@ -1556,6 +1556,38 @@ spec: config: default: {} properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: |- + These configuration settings control + [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). 
+ properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object requestedSecretLifetime: description: |- Request secret (currently only autoTls certificates) lifetime from the secret operator, e.g. `7d`, or `30d`. @@ -5975,6 +6007,38 @@ spec: config: default: {} properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: |- + These configuration settings control + [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). 
+ properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object requestedSecretLifetime: description: |- Request secret (currently only autoTls certificates) lifetime from the secret operator, e.g. `7d`, or `30d`. diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 3edd46f3..e4721f5d 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -1288,6 +1288,243 @@ mod tests { ); } + #[test] + fn test_role_affinities() { + let spark_application = serde_yaml::from_str::(indoc! 
{r#" +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: spark-examples +spec: + mode: cluster + mainApplicationFile: test.py + sparkImage: + productVersion: 1.2.3 + job: + config: + affinity: + nodeSelector: + affinity-role: job + nodeAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 11 + preference: + matchExpressions: + - key: topology.kubernetes.io/zone + operator: In + values: + - fictional-zone-job + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 21 + podAffinityTerm: + topologyKey: kubernetes.io/hostname + labelSelector: + matchLabels: + app.kubernetes.io/component: fictional-job + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 31 + podAffinityTerm: + topologyKey: kubernetes.io/hostname + labelSelector: + matchLabels: + app.kubernetes.io/component: fictional-job + driver: + config: + affinity: + nodeSelector: + affinity-role: driver + nodeAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 12 + preference: + matchExpressions: + - key: topology.kubernetes.io/zone + operator: In + values: + - fictional-zone-driver + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 22 + podAffinityTerm: + topologyKey: kubernetes.io/hostname + labelSelector: + matchLabels: + app.kubernetes.io/component: fictional-driver + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 32 + podAffinityTerm: + topologyKey: kubernetes.io/hostname + labelSelector: + matchLabels: + app.kubernetes.io/component: fictional-driver + executor: + replicas: 1 + config: + affinity: + nodeSelector: + affinity-role: executor + nodeAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 13 + preference: + matchExpressions: + - key: topology.kubernetes.io/zone + operator: In + values: + - fictional-zone-executor + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 23 + 
podAffinityTerm: + topologyKey: kubernetes.io/hostname + labelSelector: + matchLabels: + app.kubernetes.io/component: fictional-executor + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 33 + podAffinityTerm: + topologyKey: kubernetes.io/hostname + labelSelector: + matchLabels: + app.kubernetes.io/component: fictional-executor + + "#}) + .unwrap(); + + let job_affinity = spark_application.submit_config().unwrap().affinity; + assert_eq!( + Some("job"), + job_affinity + .node_selector + .as_ref() + .and_then(|selectors| selectors.node_selector.get("affinity-role")) + .map(String::as_str) + ); + assert_eq!( + Some(11), + job_affinity + .node_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + assert_eq!( + Some(21), + job_affinity + .pod_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + assert_eq!( + Some(31), + job_affinity + .pod_anti_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + + let driver_affinity = spark_application.driver_config().unwrap().affinity; + assert_eq!( + Some("driver"), + driver_affinity + .node_selector + .as_ref() + .and_then(|selectors| selectors.node_selector.get("affinity-role")) + .map(String::as_str) + ); + assert_eq!( + Some(12), + driver_affinity + .node_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + assert_eq!( + Some(22), + driver_affinity + .pod_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| 
term.weight) + ); + assert_eq!( + Some(32), + driver_affinity + .pod_anti_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + + let executor_affinity = spark_application.executor_config().unwrap().affinity; + assert_eq!( + Some("executor"), + executor_affinity + .node_selector + .as_ref() + .and_then(|selectors| selectors.node_selector.get("affinity-role")) + .map(String::as_str) + ); + assert_eq!( + Some(13), + executor_affinity + .node_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + assert_eq!( + Some(23), + executor_affinity + .pod_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + assert_eq!( + Some(33), + executor_affinity + .pod_anti_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + } + #[rstest] #[case("1800m", "2")] #[case("100m", "1")] diff --git a/rust/operator-binary/src/crd/roles.rs b/rust/operator-binary/src/crd/roles.rs index 580e0776..52e95af8 100644 --- a/rust/operator-binary/src/crd/roles.rs +++ b/rust/operator-binary/src/crd/roles.rs @@ -214,6 +214,10 @@ impl Configuration for RoleConfigFragment { pub struct SubmitConfig { #[fragment_attrs(serde(default))] pub resources: Resources, + + #[fragment_attrs(serde(default))] + pub affinity: StackableAffinity, + #[fragment_attrs(serde(default, flatten))] pub volume_mounts: Option, @@ -244,6 +248,7 @@ impl SubmitConfig { }, storage: SparkStorageConfigFragment {}, }, + affinity: Default::default(), volume_mounts: Some(VolumeMounts::default()), requested_secret_lifetime: Some(Self::DEFAULT_SECRET_LIFETIME), 
retry_on_failure_count: Some(DEFAULT_SUBMIT_JOB_RETRY_ON_FAILURE_COUNT), diff --git a/rust/operator-binary/src/crd/template_merger.rs b/rust/operator-binary/src/crd/template_merger.rs index bd5d2991..57650619 100644 --- a/rust/operator-binary/src/crd/template_merger.rs +++ b/rust/operator-binary/src/crd/template_merger.rs @@ -509,4 +509,261 @@ mod tests { "deps.requirements should be concatenated from all sources in order" ); } + + #[test] + fn test_merge_template_role_affinities_into_spark_application() { + let template = serde_yaml::from_str::< + crate::crd::template_spec::v1alpha1::SparkApplicationTemplate, + >(indoc! {r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplicationTemplate + metadata: + name: template-with-affinity + spec: + mode: cluster + mainApplicationFile: local:///template.jar + sparkImage: + productVersion: "3.5.8" + job: + config: + affinity: + nodeSelector: + affinity-role: template-job + nodeAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 11 + preference: + matchExpressions: + - key: topology.kubernetes.io/zone + operator: In + values: + - fictional-zone-job + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 21 + podAffinityTerm: + topologyKey: kubernetes.io/hostname + labelSelector: + matchLabels: + app.kubernetes.io/component: fictional-job + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 31 + podAffinityTerm: + topologyKey: kubernetes.io/hostname + labelSelector: + matchLabels: + app.kubernetes.io/component: fictional-job + driver: + config: + affinity: + nodeSelector: + affinity-role: template-driver + nodeAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 12 + preference: + matchExpressions: + - key: topology.kubernetes.io/zone + operator: In + values: + - fictional-zone-driver + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 22 + podAffinityTerm: + topologyKey: 
kubernetes.io/hostname + labelSelector: + matchLabels: + app.kubernetes.io/component: fictional-driver + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 32 + podAffinityTerm: + topologyKey: kubernetes.io/hostname + labelSelector: + matchLabels: + app.kubernetes.io/component: fictional-driver + executor: + replicas: 1 + config: + affinity: + nodeSelector: + affinity-role: template-executor + nodeAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 13 + preference: + matchExpressions: + - key: topology.kubernetes.io/zone + operator: In + values: + - fictional-zone-executor + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 23 + podAffinityTerm: + topologyKey: kubernetes.io/hostname + labelSelector: + matchLabels: + app.kubernetes.io/component: fictional-executor + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 33 + podAffinityTerm: + topologyKey: kubernetes.io/hostname + labelSelector: + matchLabels: + app.kubernetes.io/component: fictional-executor + "#}) + .unwrap(); + + let spark_app = serde_yaml::from_str::(indoc! 
{r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplication + metadata: + name: my-spark-app + namespace: default + spec: + mode: cluster + mainApplicationFile: local:///app.jar + sparkImage: + productVersion: "3.5.8" + "#}) + .unwrap(); + + let app_from_template = crate::crd::v1alpha1::SparkApplication::from(&template); + let merged = deep_merge(&app_from_template, &spark_app); + + let submit_affinity = merged.submit_config().unwrap().affinity; + assert_eq!( + Some("template-job"), + submit_affinity + .node_selector + .as_ref() + .and_then(|selectors| selectors.node_selector.get("affinity-role")) + .map(String::as_str) + ); + assert_eq!( + Some(11), + submit_affinity + .node_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + assert_eq!( + Some(21), + submit_affinity + .pod_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + assert_eq!( + Some(31), + submit_affinity + .pod_anti_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + + let driver_affinity = merged.driver_config().unwrap().affinity; + assert_eq!( + Some("template-driver"), + driver_affinity + .node_selector + .as_ref() + .and_then(|selectors| selectors.node_selector.get("affinity-role")) + .map(String::as_str) + ); + assert_eq!( + Some(12), + driver_affinity + .node_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + assert_eq!( + Some(22), + driver_affinity + .pod_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| 
terms.first()) + .map(|term| term.weight) + ); + assert_eq!( + Some(32), + driver_affinity + .pod_anti_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + + let executor_affinity = merged.executor_config().unwrap().affinity; + assert_eq!( + Some("template-executor"), + executor_affinity + .node_selector + .as_ref() + .and_then(|selectors| selectors.node_selector.get("affinity-role")) + .map(String::as_str) + ); + assert_eq!( + Some(13), + executor_affinity + .node_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + assert_eq!( + Some(23), + executor_affinity + .pod_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + assert_eq!( + Some(33), + executor_affinity + .pod_anti_affinity + .as_ref() + .and_then(|a| a + .preferred_during_scheduling_ignored_during_execution + .as_ref()) + .and_then(|terms| terms.first()) + .map(|term| term.weight) + ); + } } diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index 6a50f86e..6cf4eeaf 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -26,8 +26,8 @@ use stackable_operator::{ api::{ batch::v1::{Job, JobSpec}, core::v1::{ - ConfigMap, Container, EnvVar, PodSecurityContext, PodSpec, PodTemplateSpec, - ServiceAccount, Volume, + Affinity, ConfigMap, Container, EnvVar, PodSecurityContext, PodSpec, + PodTemplateSpec, ServiceAccount, Volume, }, rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject}, }, @@ -991,6 +991,12 @@ fn spark_job( restart_policy: Some("Never".to_string()), service_account_name: serviceaccount.metadata.name.clone(), 
volumes: Some(volumes), + affinity: Some(Affinity { + node_affinity: job_config.affinity.node_affinity.clone(), + pod_affinity: job_config.affinity.pod_affinity.clone(), + pod_anti_affinity: job_config.affinity.pod_anti_affinity.clone(), + }), + node_selector: job_config.affinity.node_selector.clone().map(|ns| ns.node_selector), image_pull_secrets: spark_image.pull_secrets.clone(), security_context: Some(security_context()), ..PodSpec::default() diff --git a/tests/templates/kuttl/spark-examples/10-assert.yaml b/tests/templates/kuttl/spark-examples/10-assert.yaml index 4b3b0dc3..ceebc449 100644 --- a/tests/templates/kuttl/spark-examples/10-assert.yaml +++ b/tests/templates/kuttl/spark-examples/10-assert.yaml @@ -10,3 +10,22 @@ metadata: name: spark-examples status: phase: Succeeded +--- +# Test that job affinities have been correctly added to the Job template +apiVersion: batch/v1 +kind: Job +metadata: + name: spark-examples +spec: + template: + spec: + affinity: + nodeAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - preference: + matchExpressions: + - key: disktype + operator: In + values: + - ssd + weight: 1 diff --git a/tests/templates/kuttl/spark-examples/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-examples/10-deploy-spark-app.yaml.j2 index c8804ab8..6c70fc0d 100644 --- a/tests/templates/kuttl/spark-examples/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-examples/10-deploy-spark-app.yaml.j2 @@ -18,6 +18,18 @@ spec: mode: cluster mainClass: org.apache.spark.examples.SparkALS mainApplicationFile: "local:///stackable/spark/examples/jars/spark-examples.jar" + job: + config: + affinity: + nodeAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + preference: + matchExpressions: + - key: disktype + operator: In + values: + - ssd driver: config: logging: