Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3283,6 +3283,32 @@ object KyuubiConf {
.toSet()
.createWithDefault(Set.empty)

val SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST: ConfigEntry[Set[String]] =
buildConf("kyuubi.server.limit.connections.ip.allowlist")
.doc("When this list is not empty, only the client ip in the allow list will be" +
" permitted to connect to kyuubi server, all other ips will be denied." +
" If this list is empty (default), no ip allowlist restriction is applied." +
" Note: if a client ip is in both ip.allowlist and ip.deny.list," +
" the deny list takes higher priority.")
.version("1.10.0")
.serverOnly
.stringConf
.toSet()
.createWithDefault(Set.empty)

val SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST: ConfigEntry[Set[String]] =
buildConf("kyuubi.server.limit.connections.user.allowlist")
.doc("When this list is not empty, only the user in the allow list will be" +
" permitted to connect to kyuubi server, all other users will be denied." +
" If this list is empty (default), no user allowlist restriction is applied." +
" Note: if a user is in both user.allowlist and user.deny.list," +
" the deny list takes higher priority.")
.version("1.10.0")
.serverOnly
.stringConf
.toSet()
.createWithDefault(Set.empty)

val SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
buildConf("kyuubi.server.limit.batch.connections.per.user")
.doc("Maximum kyuubi server batch connections per user." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,22 @@ object KyuubiServer extends Logging {
info(s"Refreshed deny client ips from $existingDenyIps to $refreshedDenyIps")
}

private[kyuubi] def refreshIpAllowlist(): Unit = synchronized {
val sessionMgr = kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
val existingIpAllowlist = sessionMgr.getIpAllowlist
sessionMgr.refreshIpAllowlist(createKyuubiConf())
val refreshedIpAllowlist = sessionMgr.getIpAllowlist
info(s"Refreshed ip allowlist from $existingIpAllowlist to $refreshedIpAllowlist")
}

private[kyuubi] def refreshUserAllowlist(): Unit = synchronized {
val sessionMgr = kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
val existingUserAllowlist = sessionMgr.getUserAllowlist
sessionMgr.refreshUserAllowlist(createKyuubiConf())
val refreshedUserAllowlist = sessionMgr.getUserAllowlist
info(s"Refreshed user allowlist from $existingUserAllowlist to $refreshedUserAllowlist")
}

private def createKyuubiConf(): KyuubiConf = {
KyuubiConf().loadFileDefaults().loadFromArgs(commandArgs)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,44 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
Response.ok(s"Refresh the deny ips successfully.").build()
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)),
description = "refresh the ip allowlist")
@POST
@Path("refresh/ip_allowlist")
def refreshIpAllowlist(): Response = {
val userName = fe.getSessionUser(Map.empty[String, String])
val ipAddress = fe.getIpAddress
info(s"Receive refresh ip allowlist request from $userName/$ipAddress")
if (!fe.isAdministrator(userName)) {
throw new ForbiddenException(
s"$userName is not allowed to refresh the ip allowlist")
}
info(s"Reloading ip allowlist")
KyuubiServer.refreshIpAllowlist()
Response.ok(s"Refresh the ip allowlist successfully.").build()
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)),
description = "refresh the user allowlist")
@POST
@Path("refresh/user_allowlist")
def refreshUserAllowlist(): Response = {
val userName = fe.getSessionUser(Map.empty[String, String])
val ipAddress = fe.getIpAddress
info(s"Receive refresh user allowlist request from $userName/$ipAddress")
if (!fe.isAdministrator(userName)) {
throw new ForbiddenException(
s"$userName is not allowed to refresh the user allowlist")
}
info(s"Reloading user allowlist")
KyuubiServer.refreshUserAllowlist()
Response.ok(s"Refresh the user allowlist successfully.").build()
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,17 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
conf.get(SERVER_LIMIT_CONNECTIONS_USER_UNLIMITED_LIST).filter(_.nonEmpty)
val userDenyList = conf.get(SERVER_LIMIT_CONNECTIONS_USER_DENY_LIST).filter(_.nonEmpty)
val ipDenyList = conf.get(SERVER_LIMIT_CONNECTIONS_IP_DENY_LIST).filter(_.nonEmpty)
val ipAllowlist = conf.get(SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST).filter(_.nonEmpty)
val userAllowlist = conf.get(SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST).filter(_.nonEmpty)
limiter = applySessionLimiter(
userLimit,
ipAddressLimit,
userIpAddressLimit,
userUnlimitedList,
userDenyList,
ipDenyList)
ipDenyList,
ipAllowlist,
userAllowlist)

val userBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER).getOrElse(0)
val ipAddressBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_IPADDRESS).getOrElse(0)
Expand All @@ -414,7 +418,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
userIpAddressBatchLimit,
userUnlimitedList,
userDenyList,
ipDenyList)
ipDenyList,
ipAllowlist,
userAllowlist)
}

private[kyuubi] def getUnlimitedUsers: Set[String] = {
Expand Down Expand Up @@ -448,22 +454,49 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
batchLimiter.foreach(SessionLimiter.resetDenyIps(_, denyIps))
}

private[kyuubi] def getIpAllowlist: Set[String] = {
limiter.orElse(batchLimiter).map(SessionLimiter.getIpAllowlist).getOrElse(Set.empty)
}

private[kyuubi] def refreshIpAllowlist(conf: KyuubiConf): Unit = {
val ipAllowlist =
conf.get(SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST).filter(_.nonEmpty)
limiter.foreach(SessionLimiter.resetIpAllowlist(_, ipAllowlist))
batchLimiter.foreach(SessionLimiter.resetIpAllowlist(_, ipAllowlist))
}

private[kyuubi] def getUserAllowlist: Set[String] = {
limiter.orElse(batchLimiter).map(SessionLimiter.getUserAllowlist).getOrElse(Set.empty)
}

private[kyuubi] def refreshUserAllowlist(conf: KyuubiConf): Unit = {
val userAllowlist =
conf.get(SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST).filter(_.nonEmpty)
limiter.foreach(SessionLimiter.resetUserAllowlist(_, userAllowlist))
batchLimiter.foreach(SessionLimiter.resetUserAllowlist(_, userAllowlist))
}

private def applySessionLimiter(
userLimit: Int,
ipAddressLimit: Int,
userIpAddressLimit: Int,
userUnlimitedList: Set[String],
userDenyList: Set[String],
ipDenyList: Set[String]): Option[SessionLimiter] = {
ipDenyList: Set[String],
ipAllowlist: Set[String] = Set.empty,
userAllowlist: Set[String] = Set.empty): Option[SessionLimiter] = {
if (Seq(userLimit, ipAddressLimit, userIpAddressLimit).exists(_ > 0) ||
userDenyList.nonEmpty || ipDenyList.nonEmpty) {
userDenyList.nonEmpty || ipDenyList.nonEmpty ||
ipAllowlist.nonEmpty || userAllowlist.nonEmpty) {
Some(SessionLimiter(
userLimit,
ipAddressLimit,
userIpAddressLimit,
userUnlimitedList,
userDenyList,
ipDenyList))
ipDenyList,
ipAllowlist,
userAllowlist))
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ class SessionLimiterWithAccessControlListImpl(
userIpAddressLimit: Int,
var unlimitedUsers: Set[String],
var denyUsers: Set[String],
var denyIps: Set[String])
var denyIps: Set[String],
var ipAllowlist: Set[String] = Set.empty,
var userAllowlist: Set[String] = Set.empty)
extends SessionLimiterImpl(userLimit, ipAddressLimit, userIpAddressLimit) {
override def increment(userIpAddress: UserIpAddress): Unit = {
val user = userIpAddress.user
Expand All @@ -123,6 +125,19 @@ class SessionLimiterWithAccessControlListImpl(
s"Connection denied because the client ip is in the deny ip list. (ipAddress: $ip)"
throw KyuubiSQLException(errorMsg)
}
// user allowlist check: when allowlist is not empty, only allowed users can connect
if (userAllowlist.nonEmpty && StringUtils.isNotBlank(user) &&
!userAllowlist.contains(user)) {
val errorMsg =
s"Connection denied because the user is not in the user allowlist. (user: $user)"
throw KyuubiSQLException(errorMsg)
}
// ip allowlist check: when allowlist is not empty, only allowed ips can connect
if (ipAllowlist.nonEmpty && StringUtils.isNotBlank(ip) && !ipAllowlist.contains(ip)) {
val errorMsg =
s"Connection denied because the client ip is not in the ip allowlist. (ipAddress: $ip)"
throw KyuubiSQLException(errorMsg)
}

if (!unlimitedUsers.contains(user)) {
super.increment(userIpAddress)
Expand All @@ -140,6 +155,14 @@ class SessionLimiterWithAccessControlListImpl(
private[kyuubi] def setDenyIps(denyIps: Set[String]): Unit = {
this.denyIps = denyIps
}

private[kyuubi] def setIpAllowlist(ipAllowlist: Set[String]): Unit = {
this.ipAllowlist = ipAllowlist
}

private[kyuubi] def setUserAllowlist(userAllowlist: Set[String]): Unit = {
this.userAllowlist = userAllowlist
}
}

object SessionLimiter {
Expand All @@ -150,14 +173,18 @@ object SessionLimiter {
userIpAddressLimit: Int,
unlimitedUsers: Set[String] = Set.empty,
denyUsers: Set[String] = Set.empty,
denyIps: Set[String] = Set.empty): SessionLimiter = {
denyIps: Set[String] = Set.empty,
ipAllowlist: Set[String] = Set.empty,
userAllowlist: Set[String] = Set.empty): SessionLimiter = {
new SessionLimiterWithAccessControlListImpl(
userLimit,
ipAddressLimit,
userIpAddressLimit,
unlimitedUsers,
denyUsers,
denyIps)
denyIps,
ipAllowlist,
userAllowlist)
}

def resetUnlimitedUsers(limiter: SessionLimiter, unlimitedUsers: Set[String]): Unit =
Expand Down Expand Up @@ -192,4 +219,26 @@ object SessionLimiter {
case l: SessionLimiterWithAccessControlListImpl => l.denyIps
case _ => Set.empty
}

def resetIpAllowlist(limiter: SessionLimiter, ipAllowlist: Set[String]): Unit =
limiter match {
case l: SessionLimiterWithAccessControlListImpl => l.setIpAllowlist(ipAllowlist)
case _ =>
}

def getIpAllowlist(limiter: SessionLimiter): Set[String] = limiter match {
case l: SessionLimiterWithAccessControlListImpl => l.ipAllowlist
case _ => Set.empty
}

def resetUserAllowlist(limiter: SessionLimiter, userAllowlist: Set[String]): Unit =
limiter match {
case l: SessionLimiterWithAccessControlListImpl => l.setUserAllowlist(userAllowlist)
case _ =>
}

def getUserAllowlist(limiter: SessionLimiter): Set[String] = limiter match {
case l: SessionLimiterWithAccessControlListImpl => l.userAllowlist
case _ => Set.empty
}
}
Loading
Loading