From b76375a915c3ea9f864c9b99b00410dec7bafb4f Mon Sep 17 00:00:00 2001 From: Sonny00 Date: Tue, 24 Mar 2026 01:32:37 +0100 Subject: [PATCH] fix: sanitize ES job IDs for K8s label compliance Base64url-encoded Elasticsearch IDs can end with '_', which is invalid for K8s labels. This causes intermittent job creation/deletion failures. Fix uses conditional hex-encoding to ensure valid labels. --- .../thp/cortex/services/K8sJobRunnerSrv.scala | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/app/org/thp/cortex/services/K8sJobRunnerSrv.scala b/app/org/thp/cortex/services/K8sJobRunnerSrv.scala index 588d82331..1e443b00d 100644 --- a/app/org/thp/cortex/services/K8sJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/K8sJobRunnerSrv.scala @@ -34,6 +34,17 @@ class K8sJobRunnerSrv( lazy val logger: Logger = Logger(getClass) + private val k8sLabelRegex = "^[A-Za-z0-9]([A-Za-z0-9._-]*[A-Za-z0-9])?$".r + + def sanitizeForK8sLabel(value: String): String = { + value match { + case v if v.nonEmpty && v.length <= 63 && k8sLabelRegex.matches(v) => v + case _ => + val hex = value.getBytes("UTF-8").map("%02x".format(_)).mkString + if (hex.length > 63) hex.take(63) else hex + } + } + lazy val isAvailable: Boolean = Try { val ver = client.getVersion @@ -55,9 +66,10 @@ class K8sJobRunnerSrv( // make the default longer than likely values, but still not infinite val timeout_or_default = timeout.getOrElse(8.hours) // https://kubernetes.io/docs/concepts/overview/working-with-objects/names/ - // FIXME: this collapses case, jeopardizing the uniqueness of the identifier. - // LDH: lowercase, digits, hyphens. - val ldh_jobid = job.id.toLowerCase().replace('_', '-') + // LDH: lowercase, digits, hyphens. Must start/end with alphanumeric. + val sanitizedJobId = sanitizeForK8sLabel(job.id) + val sanitizedWorkerId = sanitizeForK8sLabel(job.workerId()) + val ldh_jobid = sanitizedJobId.toLowerCase().replace('_', '-').replaceAll("[^a-z0-9-]", "").replaceAll("^-+|-+$", "") val kjobName = "neuron-job-" + ldh_jobid val pvcvs = new PersistentVolumeClaimVolumeSourceBuilder() .withClaimName(persistentVolumeClaimName.get) @@ -69,8 +81,8 @@ class K8sJobRunnerSrv( .withNewMetadata() .withName(kjobName) .withLabels(Map( - "cortex-job-id" -> job.id, - "cortex-worker-id" -> job.workerId(), + "cortex-job-id" -> sanitizedJobId, + "cortex-worker-id" -> sanitizedWorkerId, "cortex-neuron-job" -> "true").asJava) .endMetadata() .withNewSpec() @@ -124,7 +136,7 @@ class K8sJobRunnerSrv( s" image : $dockerImage\n" + s" mount : pvc $persistentVolumeClaimName subdir $relativeJobDirectory as /job" + created_env.map(ev => s"\n env : ${ev.getName} = ${ev.getValue}").mkString) - val ended_kjob = client.batch().v1().jobs().withLabel("cortex-job-id", job.id) + val ended_kjob = client.batch().v1().jobs().withLabel("cortex-job-id", sanitizedJobId) .waitUntilCondition(x => Option(x).flatMap(j => Option(j.getStatus).flatMap(s => Some(s.getConditions.asScala.map(_.getType).exists(t => @@ -140,7 +152,7 @@ class K8sJobRunnerSrv( } // let's find the job by the attribute we know is fundamentally // unique, rather than one constructed from it - val deleted: util.List[StatusDetails] = client.batch().v1().jobs().withLabel("cortex-job-id", job.id).delete() + val deleted: util.List[StatusDetails] = client.batch().v1().jobs().withLabel("cortex-job-id", sanitizedJobId).delete() if(!deleted.isEmpty) { logger.info(s"Deleted Kubernetes Job for job ${job.id}") } else {