Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ private[spark] class Client(
kubernetesClient: KubernetesClient,
watcher: LoggingPodStatusWatcher) extends Logging {

// Generate the config map name once per Client instance
private val configMapName = KubernetesClientUtils.configMapNameDriver

def run(): Unit = {
val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
val configMapName = KubernetesClientUtils.configMapNameDriver
val confFilesMap = KubernetesClientUtils.buildSparkConfDirFilesMap(configMapName,
conf.sparkConf, resolvedDriverSpec.systemProperties)
val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ object KubernetesClientUtils extends Logging {
}

@Since("3.1.0")
val configMapNameExecutor: String = configMapName(s"spark-exec-${KubernetesUtils.uniqueID()}")
def configMapNameExecutor: String = configMapName(s"spark-exec-${KubernetesUtils.uniqueID()}")

@Since("3.1.0")
val configMapNameDriver: String = configMapName(s"spark-drv-${KubernetesUtils.uniqueID()}")
def configMapNameDriver: String = configMapName(s"spark-drv-${KubernetesUtils.uniqueID()}")

private def buildStringFromPropertiesMap(configMapName: String,
propertiesMap: Map[String, String]): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.apiextensions.v1.{CustomResourceDefinition, CustomResourceDefinitionBuilder}
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
import io.fabric8.kubernetes.client.dsl.PodResource
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mock, MockitoAnnotations}
import org.mockito.Mockito.{verify, when}
import org.scalatest.BeforeAndAfter
import org.scalatestplus.mockito.MockitoSugar._
Expand Down Expand Up @@ -93,22 +93,26 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
private val KEY_TO_PATH =
new KeyToPath(SPARK_CONF_FILE_NAME, 420, SPARK_CONF_FILE_NAME)

private def fullExpectedPod(keyToPaths: List[KeyToPath] = List(KEY_TO_PATH)) =
private def fullExpectedPod(
configMapName: String,
keyToPaths: List[KeyToPath] = List(KEY_TO_PATH)) =
new PodBuilder(BUILT_DRIVER_POD)
.editSpec()
.addToContainers(FULL_EXPECTED_CONTAINER)
.addNewVolume()
.withName(SPARK_CONF_VOLUME_DRIVER)
.withNewConfigMap()
.withItems(keyToPaths.asJava)
.withName(KubernetesClientUtils.configMapNameDriver)
.withName(configMapName)
.endConfigMap()
.endVolume()
.endSpec()
.build()

private def podWithOwnerReference(keyToPaths: List[KeyToPath] = List(KEY_TO_PATH)) =
new PodBuilder(fullExpectedPod(keyToPaths))
private def podWithOwnerReference(
configMapName: String,
keyToPaths: List[KeyToPath] = List(KEY_TO_PATH)) =
new PodBuilder(fullExpectedPod(configMapName, keyToPaths))
.editMetadata()
.withUid(DRIVER_POD_UID)
.endMetadata()
Expand Down Expand Up @@ -168,9 +172,12 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
private var kconf: KubernetesDriverConf = _
private var createdPodArgumentCaptor: ArgumentCaptor[Pod] = _
private var createdResourcesArgumentCaptor: ArgumentCaptor[Array[HasMetadata]] = _
private var testConfigMapName: String = _

before {
MockitoAnnotations.openMocks(this).close()
// Generate the config map name once per test to simulate consistent behavior
testConfigMapName = KubernetesClientUtils.configMapNameDriver
kconf = KubernetesTestConf.createDriverConf(
resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))
when(driverBuilder.buildFromFeatures(kconf, kubernetesClient)).thenReturn(BUILT_KUBERNETES_SPEC)
Expand All @@ -180,10 +187,12 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {

createdPodArgumentCaptor = ArgumentCaptor.forClass(classOf[Pod])
createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[Array[HasMetadata]])
when(podsWithNamespace.resource(fullExpectedPod())).thenReturn(namedPods)
when(podsWithNamespace.resource(ArgumentMatchers.argThat[Pod](pod =>
pod != null && pod.getMetadata.getName == POD_NAME
))).thenReturn(namedPods)
when(resourceList.forceConflicts()).thenReturn(resourceList)
when(namedPods.serverSideApply()).thenReturn(podWithOwnerReference())
when(namedPods.create()).thenReturn(podWithOwnerReference())
when(namedPods.serverSideApply()).thenReturn(podWithOwnerReference(testConfigMapName))
when(namedPods.create()).thenReturn(podWithOwnerReference(testConfigMapName))
when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
val sId = submissionId(kconf.namespace, POD_NAME)
when(loggingPodStatusWatcher.watchOrStop(sId)).thenReturn(true)
Expand All @@ -199,7 +208,9 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
kubernetesClient,
loggingPodStatusWatcher)
submissionClient.run()
verify(podsWithNamespace).resource(fullExpectedPod())
verify(podsWithNamespace).resource(ArgumentMatchers.argThat[Pod](pod =>
pod != null && pod.getMetadata.getName == POD_NAME
))
verify(namedPods).create()
}

Expand All @@ -219,8 +230,9 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
assert(secrets.nonEmpty)
assert(configMaps.nonEmpty)
val configMap = configMaps.head
assert(configMap.getMetadata.getName ===
KubernetesClientUtils.configMapNameDriver)
// Verify the config map name starts with expected prefix and ends with expected suffix
assert(configMap.getMetadata.getName.startsWith("spark-drv-"))
assert(configMap.getMetadata.getName.endsWith("-conf-map"))
assert(configMap.getImmutable())
assert(configMap.getData.containsKey(SPARK_CONF_FILE_NAME))
assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf1key=conf1value"))
Expand Down Expand Up @@ -265,8 +277,9 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
assert(secrets.nonEmpty)
assert(configMaps.nonEmpty)
val configMap = configMaps.head
assert(configMap.getMetadata.getName ===
KubernetesClientUtils.configMapNameDriver)
// Verify the config map name starts with expected prefix and ends with expected suffix
assert(configMap.getMetadata.getName.startsWith("spark-drv-"))
assert(configMap.getMetadata.getName.endsWith("-conf-map"))
assert(configMap.getImmutable())
assert(configMap.getData.containsKey(SPARK_CONF_FILE_NAME))
assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf1key=conf1value"))
Expand Down Expand Up @@ -309,10 +322,11 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
val expectedKeyToPaths = (expectedConfFiles.map(x => new KeyToPath(x, 420, x)).toList ++
List(KEY_TO_PATH)).sortBy(x => x.getKey)

when(podsWithNamespace.resource(fullExpectedPod(expectedKeyToPaths)))
when(podsWithNamespace.resource(fullExpectedPod(testConfigMapName, expectedKeyToPaths)))
.thenReturn(namedPods)
when(namedPods.forceConflicts()).thenReturn(namedPods)
when(namedPods.serverSideApply()).thenReturn(podWithOwnerReference(expectedKeyToPaths))
when(namedPods.serverSideApply())
.thenReturn(podWithOwnerReference(testConfigMapName, expectedKeyToPaths))

kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf,
resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))
Expand All @@ -331,9 +345,10 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
val configMaps = otherCreatedResources.toArray
.filter(_.isInstanceOf[ConfigMap]).map(_.asInstanceOf[ConfigMap])
assert(configMaps.nonEmpty)
val configMapName = KubernetesClientUtils.configMapNameDriver
val configMap: ConfigMap = configMaps.head
assert(configMap.getMetadata.getName == configMapName)
// Verify the config map name has expected format
assert(configMap.getMetadata.getName.startsWith("spark-drv-"))
assert(configMap.getMetadata.getName.endsWith("-conf-map"))
val configMapLoadedFiles = configMap.getData.keySet().asScala.toSet -
Config.KUBERNETES_NAMESPACE.key
assert(configMapLoadedFiles === expectedConfFiles.toSet ++ Set(SPARK_CONF_FILE_NAME))
Expand Down