Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions app/org/thp/cortex/services/K8sJobRunnerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ class K8sJobRunnerSrv(

lazy val logger: Logger = Logger(getClass)

private val k8sLabelRegex = "^[A-Za-z0-9]([A-Za-z0-9._-]*[A-Za-z0-9])?$".r

def sanitizeForK8sLabel(value: String): String = {
value match {
case v if v.nonEmpty && v.length <= 63 && k8sLabelRegex.matches(v) => v
case _ =>
val hex = value.getBytes("UTF-8").map("%02x".format(_)).mkString
if (hex.length > 63) hex.take(63) else hex
}
}

lazy val isAvailable: Boolean =
Try {
val ver = client.getVersion
Expand All @@ -55,9 +66,10 @@ class K8sJobRunnerSrv(
// make the default longer than likely values, but still not infinite
val timeout_or_default = timeout.getOrElse(8.hours)
// https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
// FIXME: this collapses case, jeopardizing the uniqueness of the identifier.
// LDH: lowercase, digits, hyphens.
val ldh_jobid = job.id.toLowerCase().replace('_', '-')
// LDH: lowercase, digits, hyphens. Must start/end with alphanumeric.
val sanitizedJobId = sanitizeForK8sLabel(job.id)
val sanitizedWorkerId = sanitizeForK8sLabel(job.workerId())
val ldh_jobid = sanitizedJobId.toLowerCase().replace('_', '-').replaceAll("[^a-z0-9-]", "").replaceAll("^-+|-+$", "")
val kjobName = "neuron-job-" + ldh_jobid
val pvcvs = new PersistentVolumeClaimVolumeSourceBuilder()
.withClaimName(persistentVolumeClaimName.get)
Expand All @@ -69,8 +81,8 @@ class K8sJobRunnerSrv(
.withNewMetadata()
.withName(kjobName)
.withLabels(Map(
"cortex-job-id" -> job.id,
"cortex-worker-id" -> job.workerId(),
"cortex-job-id" -> sanitizedJobId,
"cortex-worker-id" -> sanitizedWorkerId,
"cortex-neuron-job" -> "true").asJava)
.endMetadata()
.withNewSpec()
Expand Down Expand Up @@ -124,7 +136,7 @@ class K8sJobRunnerSrv(
s" image : $dockerImage\n" +
s" mount : pvc $persistentVolumeClaimName subdir $relativeJobDirectory as /job" +
created_env.map(ev => s"\n env : ${ev.getName} = ${ev.getValue}").mkString)
val ended_kjob = client.batch().v1().jobs().withLabel("cortex-job-id", job.id)
val ended_kjob = client.batch().v1().jobs().withLabel("cortex-job-id", sanitizedJobId)
.waitUntilCondition(x => Option(x).flatMap(j =>
Option(j.getStatus).flatMap(s =>
Some(s.getConditions.asScala.map(_.getType).exists(t =>
Expand All @@ -140,7 +152,7 @@ class K8sJobRunnerSrv(
}
// let's find the job by the attribute we know is fundamentally
// unique, rather than one constructed from it
val deleted: util.List[StatusDetails] = client.batch().v1().jobs().withLabel("cortex-job-id", job.id).delete()
val deleted: util.List[StatusDetails] = client.batch().v1().jobs().withLabel("cortex-job-id", sanitizedJobId).delete()
if(!deleted.isEmpty) {
logger.info(s"Deleted Kubernetes Job for job ${job.id}")
} else {
Expand Down