Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -322,20 +322,15 @@ object Utils extends Logging {
}
}

def redactCommandLineArgs(conf: KyuubiConf, commands: Iterable[String]): Iterable[String] = {
conf.get(SERVER_SECRET_REDACTION_PATTERN) match {
case Some(redactionPattern) =>
var nextKV = false
def redactCommandLineArgs(
redactionPattern: Option[Regex],
commands: Iterable[String]): Iterable[String] = {
redactionPattern match {
case Some(pattern) =>
commands.map {
case PATTERN_FOR_KEY_VALUE_ARG(key, value) if nextKV =>
val (_, newValue) = redact(redactionPattern, Seq((key, value))).head
nextKV = false
case PATTERN_FOR_KEY_VALUE_ARG(key, value) =>
val (_, newValue) = redact(pattern, Seq((key, value))).head
genKeyValuePair(key, newValue)

case cmd if cmd == CONF =>
nextKV = true
cmd

case cmd =>
cmd
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ class UtilsSuite extends KyuubiFunSuite {
val commands = buffer

// Redact sensitive information
val redactedCmdArgs = Utils.redactCommandLineArgs(conf, commands)
val redactedCmdArgs =
Utils.redactCommandLineArgs(conf.get(SERVER_SECRET_REDACTION_PATTERN), commands)

val expectBuffer = new mutable.ListBuffer[String]()
expectBuffer += "main"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.concurrent.{Semaphore, TimeUnit}

import scala.collection.JavaConverters._
import scala.util.Random
import scala.util.matching.Regex

import com.codahale.metrics.MetricRegistry
import com.google.common.annotations.VisibleForTesting
Expand Down Expand Up @@ -62,6 +63,7 @@ private[kyuubi] class EngineRef(
groupProvider: GroupProvider,
engineRefId: String,
engineManager: KyuubiApplicationManager,
redactionPattern: Option[Regex],
startupProcessSemaphore: Option[Semaphore] = None)
extends Logging {
// The corresponding ServerSpace where the engine belongs to
Expand Down Expand Up @@ -234,19 +236,38 @@ private[kyuubi] class EngineRef(
builder = engineType match {
case SPARK_SQL =>
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
new SparkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
new SparkProcessBuilder(
appUser,
doAsEnabled,
conf,
engineRefId,
redactionPattern,
extraEngineLog)
case FLINK_SQL =>
conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
new FlinkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
new FlinkProcessBuilder(
appUser,
doAsEnabled,
conf,
engineRefId,
redactionPattern,
extraEngineLog)
case TRINO =>
new TrinoProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
new TrinoProcessBuilder(
appUser,
doAsEnabled,
conf,
engineRefId,
redactionPattern,
extraEngineLog)
case HIVE_SQL =>
conf.setIfMissing(HiveProcessBuilder.HIVE_ENGINE_NAME, defaultEngineName)
HiveProcessBuilder(
appUser,
doAsEnabled,
conf,
engineRefId,
redactionPattern,
extraEngineLog,
defaultEngineName)
case JDBC =>
Expand All @@ -256,6 +277,7 @@ private[kyuubi] class EngineRef(
doAsEnabled,
conf,
engineRefId,
redactionPattern,
extraEngineLog,
defaultEngineName)
case DATA_AGENT =>
Expand All @@ -268,7 +290,13 @@ private[kyuubi] class EngineRef(
info(s"Data Agent JDBC URL not configured, using Kyuubi server via ZK: $jdbcUrl")
}
}
new DataAgentProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
new DataAgentProcessBuilder(
appUser,
doAsEnabled,
conf,
engineRefId,
redactionPattern,
extraEngineLog)
}

MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}

import scala.collection.JavaConverters._
import scala.util.matching.Regex

import com.google.common.collect.EvictingQueue
import org.apache.commons.lang3.StringUtils
Expand Down Expand Up @@ -107,6 +108,8 @@ trait ProcBuilder {

def env: Map[String, String] = conf.getEnvs

def redactionPattern: Option[Regex]

protected val extraEngineLog: Option[OperationLog]

/**
Expand Down Expand Up @@ -295,7 +298,7 @@ trait ProcBuilder {
if (commands == null) {
super.toString
} else {
Utils.redactCommandLineArgs(conf, commands).map {
Utils.redactCommandLineArgs(redactionPattern, commands).map {
case arg if arg.startsWith("-") => s"\\\n\t$arg"
case arg => arg
}.mkString(" ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.File
import java.nio.file.{Files, Paths}

import scala.collection.mutable
import scala.util.matching.Regex

import com.google.common.annotations.VisibleForTesting
import org.apache.commons.lang3.StringUtils
Expand All @@ -38,12 +39,13 @@ class DataAgentProcessBuilder(
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
override val redactionPattern: Option[Regex],
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) = {
this(proxyUser, doAsEnabled, conf, "")
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "", None)
}

override def shortName: String = "data-agent"
Expand Down Expand Up @@ -101,7 +103,7 @@ class DataAgentProcessBuilder(
super.toString
} else {
redactConfValues(
Utils.redactCommandLineArgs(conf, commands),
Utils.redactCommandLineArgs(redactionPattern, commands),
Set(ENGINE_DATA_AGENT_OPENAI_API_KEY.key)).map {
case arg if arg.startsWith("-") || arg == mainClass => s"\\\n\t$arg"
case arg => arg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.{File, FilenameFilter}
import java.nio.file.{Files, Paths}

import scala.collection.mutable
import scala.util.matching.Regex

import com.google.common.annotations.VisibleForTesting
import org.apache.commons.lang3.StringUtils
Expand All @@ -46,12 +47,13 @@ class FlinkProcessBuilder(
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
override val redactionPattern: Option[Regex],
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "")
this(proxyUser, doAsEnabled, conf, "", None)
}

val flinkHome: String = getEngineHome(shortName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.File
import java.nio.file.{Files, Paths}

import scala.collection.mutable
import scala.util.matching.Regex

import com.google.common.annotations.VisibleForTesting
import org.apache.commons.lang3.StringUtils
Expand All @@ -42,12 +43,13 @@ class HiveProcessBuilder(
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
override val redactionPattern: Option[Regex],
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "")
this(proxyUser, doAsEnabled, conf, "", None)
}

protected val hiveHome: String = getEngineHome(shortName)
Expand Down Expand Up @@ -127,16 +129,28 @@ object HiveProcessBuilder extends Logging {
doAsEnabled: Boolean,
conf: KyuubiConf,
engineRefId: String,
redactPattern: Option[Regex],
extraEngineLog: Option[OperationLog],
defaultEngineName: String): HiveProcessBuilder = {
checkKeytab(proxyUser, conf)
DeployMode.withName(conf.get(ENGINE_HIVE_DEPLOY_MODE)) match {
case LOCAL =>
new HiveProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog)
case LOCAL => new HiveProcessBuilder(
proxyUser,
doAsEnabled,
conf,
engineRefId,
redactPattern,
extraEngineLog)
case YARN =>
warn(s"Hive on YARN model is experimental.")
conf.setIfMissing(ENGINE_DEPLOY_YARN_MODE_APP_NAME, Some(defaultEngineName))
new HiveYarnModeProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog)
new HiveYarnModeProcessBuilder(
proxyUser,
doAsEnabled,
conf,
engineRefId,
redactPattern,
extraEngineLog)
case other => throw new KyuubiException(s"Unsupported deploy mode: $other")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex

import org.apache.kyuubi.{KyuubiException, Logging, SCALA_COMPILE_VERSION}
import org.apache.kyuubi.config.KyuubiConf
Expand All @@ -43,8 +44,15 @@ class HiveYarnModeProcessBuilder(
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
override val engineRefId: String,
override val redactionPattern: Option[Regex],
override val extraEngineLog: Option[OperationLog] = None)
extends HiveProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog)
extends HiveProcessBuilder(
proxyUser,
doAsEnabled,
conf,
engineRefId,
redactionPattern,
extraEngineLog)
with Logging {

override protected def mainClass: String =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.File
import java.nio.file.Paths

import scala.collection.mutable
import scala.util.matching.Regex

import com.google.common.annotations.VisibleForTesting
import org.apache.commons.lang3.StringUtils
Expand All @@ -41,12 +42,13 @@ class JdbcProcessBuilder(
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
override val redactionPattern: Option[Regex],
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "")
this(proxyUser, doAsEnabled, conf, "", None)
}

/**
Expand Down Expand Up @@ -110,7 +112,7 @@ class JdbcProcessBuilder(
super.toString
} else {
redactConfValues(
Utils.redactCommandLineArgs(conf, commands),
Utils.redactCommandLineArgs(redactionPattern, commands),
Set(ENGINE_JDBC_CONNECTION_PASSWORD.key)).map {
case arg if arg.startsWith("-") => s"\\\n\t$arg"
case arg => arg
Expand All @@ -128,16 +130,29 @@ object JdbcProcessBuilder extends Logging {
doAsEnabled: Boolean,
conf: KyuubiConf,
engineRefId: String,
redactPattern: Option[Regex],
extraEngineLog: Option[OperationLog],
defaultEngineName: String): JdbcProcessBuilder = {
checkKeytab(proxyUser, conf)
DeployMode.withName(conf.get(ENGINE_JDBC_DEPLOY_MODE)) match {
case LOCAL =>
new JdbcProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog)
new JdbcProcessBuilder(
proxyUser,
doAsEnabled,
conf,
engineRefId,
redactPattern,
extraEngineLog)
case YARN =>
warn(s"JDBC on YARN model is experimental.")
conf.setIfMissing(ENGINE_DEPLOY_YARN_MODE_APP_NAME, Some(defaultEngineName))
new JdbcYarnModeProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog)
new JdbcYarnModeProcessBuilder(
proxyUser,
doAsEnabled,
conf,
engineRefId,
redactPattern,
extraEngineLog)
case other => throw new KyuubiException(s"Unsupported deploy mode: $other")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex

import org.apache.commons.lang3.StringUtils

Expand All @@ -44,8 +45,15 @@ class JdbcYarnModeProcessBuilder(
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
override val engineRefId: String,
override val redactionPattern: Option[Regex],
override val extraEngineLog: Option[OperationLog] = None)
extends JdbcProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog)
extends JdbcProcessBuilder(
proxyUser,
doAsEnabled,
conf,
engineRefId,
redactionPattern,
extraEngineLog)
with Logging {

override protected def mainClass: String =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kyuubi.engine.spark

import scala.collection.mutable
import scala.util.matching.Regex

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.KyuubiApplicationManager
Expand All @@ -33,9 +34,10 @@ class SparkBatchProcessBuilder(
override val mainClass: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
override val redactionPattern: Option[Regex],
override val extraEngineLog: Option[OperationLog])
// TODO respect doAsEnabled
extends SparkProcessBuilder(proxyUser, true, conf, batchId, extraEngineLog) {
extends SparkProcessBuilder(proxyUser, true, conf, batchId, redactionPattern, extraEngineLog) {
import SparkProcessBuilder._

override protected[kyuubi] lazy val commands: Iterable[String] = {
Expand Down
Loading
Loading