Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Throttle keep-alive messages
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.sharding.typed.internal.ShardedDaemonProcessImpl$KeepAlivePinger$StartTick$")
9 changes: 8 additions & 1 deletion cluster-sharding-typed/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@ pekko.cluster.sharded-daemon-process {
# overriding those settings will be ignored.
sharding = ${pekko.cluster.sharding}

# Each entity is pinged at this interval from each node in the
# Each entity is pinged at this interval from a few nodes in the
# cluster to trigger a start if it has stopped, for example during
# rebalancing.
# See also keep-alive-from-number-of-nodes and keep-alive-throttle-interval
# Note: How the set of actors is kept alive may change in the future meaning this setting may go away.
keep-alive-interval = 10s

# Keep alive messages from this number of nodes.
keep-alive-from-number-of-nodes = 3

# Keep alive messages are sent with this delay between each message.
keep-alive-throttle-interval = 100 ms
}

pekko.cluster.configuration-compatibility-check.checkers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ object ShardedDaemonProcessSettings {
*/
def fromConfig(config: Config): ShardedDaemonProcessSettings = {
val keepAliveInterval = config.getDuration("keep-alive-interval").toScala
val keepAliveFromNumberOfNodes = config.getInt("keep-alive-from-number-of-nodes")
val keepAliveThrottleInterval = config.getDuration("keep-alive-throttle-interval").toScala

new ShardedDaemonProcessSettings(keepAliveInterval, None, None)
new ShardedDaemonProcessSettings(keepAliveInterval, None, None, keepAliveFromNumberOfNodes,
keepAliveThrottleInterval)
}

}
Expand All @@ -52,7 +55,9 @@ object ShardedDaemonProcessSettings {
final class ShardedDaemonProcessSettings @InternalApi private[pekko] (
val keepAliveInterval: FiniteDuration,
val shardingSettings: Option[ClusterShardingSettings],
val role: Option[String]) {
val role: Option[String],
val keepAliveFromNumberOfNodes: Int,
val keepAliveThrottleInterval: FiniteDuration) {

/**
* Scala API: The interval each parent of the sharded set is pinged from each node in the cluster.
Expand Down Expand Up @@ -86,10 +91,31 @@ final class ShardedDaemonProcessSettings @InternalApi private[pekko] (
def withRole(role: String): ShardedDaemonProcessSettings =
copy(role = Option(role))

/**
* Keep alive messages from this number of nodes.
*/
def withKeepAliveFromNumberOfNodes(keepAliveFromNumberOfNodes: Int): ShardedDaemonProcessSettings =
copy(keepAliveFromNumberOfNodes = keepAliveFromNumberOfNodes)

/**
* Scala API: Keep alive messages are sent with this delay between each message.
*/
def withKeepAliveThrottleInterval(keepAliveThrottleInterval: FiniteDuration): ShardedDaemonProcessSettings =
copy(keepAliveThrottleInterval = keepAliveThrottleInterval)

/**
* Java API: Keep alive messages are sent with this delay between each message.
*/
def withKeepAliveThrottleInterval(keepAliveThrottleInterval: Duration): ShardedDaemonProcessSettings =
copy(keepAliveThrottleInterval = keepAliveThrottleInterval.toScala)

private def copy(
keepAliveInterval: FiniteDuration = keepAliveInterval,
shardingSettings: Option[ClusterShardingSettings] = shardingSettings,
role: Option[String] = role): ShardedDaemonProcessSettings =
new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings, role)
role: Option[String] = role,
keepAliveFromNumberOfNodes: Int = keepAliveFromNumberOfNodes,
keepAliveThrottleInterval: FiniteDuration = keepAliveThrottleInterval): ShardedDaemonProcessSettings =
new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings, role, keepAliveFromNumberOfNodes,
keepAliveThrottleInterval)

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ import java.util.Optional
import java.util.function.IntFunction

import scala.jdk.OptionConverters._
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorRef
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.Behaviors
import pekko.actor.typed.scaladsl.LoggerOps
import pekko.annotation.InternalApi
import pekko.cluster.MemberStatus
import pekko.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import pekko.cluster.sharding.ShardRegion.EntityId
import pekko.cluster.sharding.typed.ClusterShardingSettings
Expand All @@ -42,7 +46,7 @@ import pekko.cluster.sharding.typed.scaladsl.StartEntity
import pekko.cluster.typed.Cluster
import pekko.cluster.typed.SelfUp
import pekko.cluster.typed.Subscribe
import pekko.util.PrettyDuration
import pekko.stream.scaladsl.Source

/**
* INTERNAL API
Expand All @@ -53,37 +57,67 @@ private[pekko] object ShardedDaemonProcessImpl {
object KeepAlivePinger {
sealed trait Event
private case object Tick extends Event
private case object StartTick extends Event
private case object SendKeepAliveDone extends Event

def apply[T](
settings: ShardedDaemonProcessSettings,
name: String,
identities: Set[EntityId],
shardingRef: ActorRef[ShardingEnvelope[T]]): Behavior[Event] =
Behaviors.setup { context =>
Cluster(context.system).subscriptions ! Subscribe(
context.messageAdapter[SelfUp](_ => StartTick),
classOf[SelfUp])
Behaviors.withTimers { timers =>
def triggerStartAll(): Unit = {
identities.foreach(id => shardingRef ! StartEntity(id))
shardingRef: ActorRef[ShardingEnvelope[T]]): Behavior[Event] = {
val sortedIdentities = identities.toVector.sorted

def sendKeepAliveMessages()(implicit sys: ActorSystem[_]): Future[Done] = {
if (settings.keepAliveThrottleInterval == Duration.Zero) {
sortedIdentities.foreach(id => shardingRef ! StartEntity(id))
Future.successful(Done)
} else {
Source(sortedIdentities).throttle(1, settings.keepAliveThrottleInterval).runForeach { id =>
shardingRef ! StartEntity(id)
}
}
}

Behaviors.setup[Event] { context =>
implicit val system: ActorSystem[_] = context.system
val cluster = Cluster(system)

if (cluster.selfMember.status == MemberStatus.Up)
context.self ! Tick
else
cluster.subscriptions ! Subscribe(context.messageAdapter[SelfUp](_ => Tick), classOf[SelfUp])

def isActive(): Boolean = {
val members = settings.role match {
case None => cluster.state.members
case Some(role) => cluster.state.members.filter(_.roles.contains(role))
}
// members are sorted so this is deterministic (the same) on all nodes
members.take(settings.keepAliveFromNumberOfNodes).contains(cluster.selfMember)
}

Behaviors.withTimers { timers =>
Behaviors.receiveMessage {
case StartTick =>
triggerStartAll()
context.log.debug2(
s"Starting Sharded Daemon Process KeepAlivePinger for [{}], with ping interval [{}]",
name,
PrettyDuration.format(settings.keepAliveInterval))
timers.startTimerWithFixedDelay(Tick, settings.keepAliveInterval)
Behaviors.same
case Tick =>
triggerStartAll()
context.log.debug("Periodic ping sent to [{}] processes", identities.size)
if (isActive()) {
context.log.debug2(
s"Sending periodic keep alive for Sharded Daemon Process [{}] to [{}] processes.",
name,
sortedIdentities.size)
context.pipeToSelf(sendKeepAliveMessages()) { _ =>
SendKeepAliveDone
}
} else {
timers.startSingleTimer(Tick, settings.keepAliveInterval)
}
Behaviors.same
case SendKeepAliveDone =>
timers.startSingleTimer(Tick, settings.keepAliveInterval)
Behaviors.same
}
}
}
}

}

final class MessageExtractor[T] extends ShardingMessageExtractor[ShardingEnvelope[T], T] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.Behaviors
import pekko.cluster.MemberStatus
import pekko.cluster.sharding.typed.ShardedDaemonProcessSettings
import pekko.cluster.sharding.typed.internal.ShardedDaemonProcessImpl.KeepAlivePinger
import pekko.cluster.typed.Cluster
import pekko.cluster.typed.Join

Expand All @@ -43,10 +44,12 @@ object ShardedDaemonProcessSpec {

# ping often/start fast for test
pekko.cluster.sharded-daemon-process.keep-alive-interval = 1s
pekko.cluster.sharded-daemon-process.keep-alive-throttle-interval = 20ms

pekko.coordinated-shutdown.terminate-actor-system = off
pekko.coordinated-shutdown.run-by-actor-system-terminate = off
""")
"""
)

object MyActor {
sealed trait Command
Expand Down Expand Up @@ -74,7 +77,7 @@ class ShardedDaemonProcessSpec

import ShardedDaemonProcessSpec._

"The ShardedDaemonSet" must {
"The ShardedDaemonProcess" must {

"have a single node cluster running first" in {
val probe = createTestProbe()
Expand Down Expand Up @@ -114,6 +117,33 @@ class ShardedDaemonProcessSpec

}

"KeepAlivePinger" must {
"have a single node cluster running first" in {
val probe = createTestProbe()
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
probe.awaitAssert({
Cluster(system).selfMember.status == MemberStatus.Up
}, 3.seconds)
}

"throttle keep alive messages" in {
val shardingProbe = createTestProbe[Any]()
val settings = ShardedDaemonProcessSettings(system).withKeepAliveThrottleInterval(1.second)
val pinger = spawn(KeepAlivePinger(settings, "throttle", Set("1", "2", "3"), shardingProbe.ref))
// note that StartEntity.apply is actually a ShardingEnvelope wrapping the StartEntity message
shardingProbe.expectMessage(StartEntity("1"))
shardingProbe.expectNoMessage(100.millis)
shardingProbe.expectMessage(StartEntity("2"))
shardingProbe.expectNoMessage(100.millis)
shardingProbe.expectMessage(StartEntity("3"))
shardingProbe.expectNoMessage(100.millis)
shardingProbe.expectMessage(StartEntity("1"))

testKit.stop(pinger)
}

}

object TagProcessor {
sealed trait Command
def apply(tag: String): Behavior[Command] = Behaviors.setup { ctx =>
Expand Down
Loading