Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,7 @@ object GSetKey {
}

@SerialVersionUID(1L)
final case class GSetKey[A](_id: String) extends Key[GSet[A]](_id) with ReplicatedDataSerialization
final case class GSetKey[A](_id: String) extends Key[GSet[A]](_id) with ReplicatedDataSerialization {
override def withId(newId: Key.KeyId): GSetKey[A] =
GSetKey(newId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,7 @@ object FlagKey {
}

@SerialVersionUID(1L)
final case class FlagKey(_id: String) extends Key[Flag](_id) with ReplicatedDataSerialization
final case class FlagKey(_id: String) extends Key[Flag](_id) with ReplicatedDataSerialization {
override def withId(newId: Key.KeyId): FlagKey =
FlagKey(newId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,7 @@ object GCounterKey {
}

@SerialVersionUID(1L)
final case class GCounterKey(_id: String) extends Key[GCounter](_id) with ReplicatedDataSerialization
final case class GCounterKey(_id: String) extends Key[GCounter](_id) with ReplicatedDataSerialization {
override def withId(newId: Key.KeyId): GCounterKey =
GCounterKey(newId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package org.apache.pekko.cluster.ddata

import org.apache.pekko.cluster.ddata.Key.UnspecificKey

object Key {

/**
Expand All @@ -24,6 +26,8 @@ object Key {

type KeyId = String

final case class UnspecificKey(_id: KeyId) extends Key[ReplicatedData](_id) with ReplicatedDataSerialization

}

/**
Expand All @@ -36,6 +40,9 @@ object Key {
*/
abstract class Key[+T <: ReplicatedData](val id: Key.KeyId) extends Serializable {

def withId(newId: Key.KeyId): Key[ReplicatedData] =
UnspecificKey(newId)

override final def equals(o: Any): Boolean = o match {
case k: Key[_] => id == k.id
case _ => false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,7 @@ object LWWMapKey {
}

@SerialVersionUID(1L)
final case class LWWMapKey[A, B](_id: String) extends Key[LWWMap[A, B]](_id) with ReplicatedDataSerialization
final case class LWWMapKey[A, B](_id: String) extends Key[LWWMap[A, B]](_id) with ReplicatedDataSerialization {
override def withId(newId: Key.KeyId): LWWMapKey[A, B] =
LWWMapKey(newId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,7 @@ object LWWRegisterKey {
}

@SerialVersionUID(1L)
final case class LWWRegisterKey[A](_id: String) extends Key[LWWRegister[A]](_id) with ReplicatedDataSerialization
final case class LWWRegisterKey[A](_id: String) extends Key[LWWRegister[A]](_id) with ReplicatedDataSerialization {
override def withId(newId: Key.KeyId): LWWRegisterKey[A] =
LWWRegisterKey(newId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -562,4 +562,7 @@ object ORMapKey {
@SerialVersionUID(1L)
final case class ORMapKey[A, B <: ReplicatedData](_id: String)
extends Key[ORMap[A, B]](_id)
with ReplicatedDataSerialization
with ReplicatedDataSerialization {
override def withId(newId: Key.KeyId): ORMapKey[A, B] =
ORMapKey(newId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,4 +328,7 @@ object ORMultiMapKey {
}

@SerialVersionUID(1L)
final case class ORMultiMapKey[A, B](_id: String) extends Key[ORMultiMap[A, B]](_id) with ReplicatedDataSerialization
final case class ORMultiMapKey[A, B](_id: String) extends Key[ORMultiMap[A, B]](_id) with ReplicatedDataSerialization {
override def withId(newId: Key.KeyId): ORMultiMapKey[A, B] =
ORMultiMapKey(newId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -574,4 +574,7 @@ object ORSetKey {
}

@SerialVersionUID(1L)
final case class ORSetKey[A](_id: String) extends Key[ORSet[A]](_id) with ReplicatedDataSerialization
final case class ORSetKey[A](_id: String) extends Key[ORSet[A]](_id) with ReplicatedDataSerialization {
override def withId(newId: Key.KeyId): ORSetKey[A] =
ORSetKey(newId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,7 @@ object PNCounterKey {
}

@SerialVersionUID(1L)
final case class PNCounterKey(_id: String) extends Key[PNCounter](_id) with ReplicatedDataSerialization
final case class PNCounterKey(_id: String) extends Key[PNCounter](_id) with ReplicatedDataSerialization {
override def withId(newId: Key.KeyId): PNCounterKey =
PNCounterKey(newId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,7 @@ object PNCounterMapKey {
}

@SerialVersionUID(1L)
final case class PNCounterMapKey[A](_id: String) extends Key[PNCounterMap[A]](_id) with ReplicatedDataSerialization
final case class PNCounterMapKey[A](_id: String) extends Key[PNCounterMap[A]](_id) with ReplicatedDataSerialization {
override def withId(newId: Key.KeyId): PNCounterMapKey[A] =
PNCounterMapKey(newId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,10 @@ object Replicator {
* when the value of the given `key` is changed. Current value is also
* sent as a [[Changed]] message to a new subscriber.
*
* In addition to subscribing to individual keys it is possible to subscribe to all keys with a given prefix
* by using a `*` at the end of the key `id`. For example `GCounterKey("counter-*")`. Notifications will be
* sent for all matching keys, also new keys added later.
*
* Subscribers will be notified periodically with the configured `notify-subscribers-interval`,
* and it is also possible to send an explicit `FlushChanges` message to
* the `Replicator` to notify the subscribers immediately.
Expand Down Expand Up @@ -1310,14 +1314,14 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val serializer = SerializationExtension(context.system).serializerFor(classOf[DataEnvelope])
val maxPruningDisseminationNanos = maxPruningDissemination.toNanos

val expiryWildcards = settings.expiryKeys.collect { case (k, v) if k.endsWith("*") => k.dropRight(1) -> v }
val expiryWildcards = settings.expiryKeys.collect { case (k, v) if isWildcard(k) => dropWildcard(k) -> v }
val expiryEnabled: Boolean = settings.expiryKeys.nonEmpty
// updated on the gossip tick to avoid too many calls to `currentTimeMillis()`
private var currentUsedTimestamp = if (expiryEnabled) System.currentTimeMillis() else 0L

val hasDurableKeys = settings.durableKeys.nonEmpty
val durable = settings.durableKeys.filterNot(_.endsWith("*"))
val durableWildcards = settings.durableKeys.collect { case k if k.endsWith("*") => k.dropRight(1) }
val durable = settings.durableKeys.filterNot(isWildcard)
val durableWildcards = settings.durableKeys.collect { case k if isWildcard(k) => dropWildcard(k) }
val durableStore: ActorRef =
if (hasDurableKeys) {
val props = settings.durableStoreProps match {
Expand Down Expand Up @@ -1414,6 +1418,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
@nowarn("msg=deprecated")
val newSubscribers = new mutable.HashMap[KeyId, mutable.Set[ActorRef]] with mutable.MultiMap[KeyId, ActorRef]
var subscriptionKeys = Map.empty[KeyId, KeyR]
@nowarn("msg=deprecated")
val wildcardSubscribers = new mutable.HashMap[KeyId, mutable.Set[ActorRef]] with mutable.MultiMap[KeyId, ActorRef]

// To be able to do efficient stashing we use this field instead of sender().
// Using internal buffer instead of Stash to avoid the overhead of the Stash mailbox.
Expand Down Expand Up @@ -1883,7 +1889,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}

val dig =
if (subscribers.contains(key) && !changed.contains(key)) {
if (hasSubscriber(key) && !changed.contains(key)) {
val oldDigest = getDigest(key)
val (dig, payloadSize) = digest(newEnvelope)
payloadSizeAggregator.updatePayloadSize(key, payloadSize)
Expand Down Expand Up @@ -1963,7 +1969,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}

def isExpired(key: KeyId, timestamp: Timestamp, now: Long): Boolean = {
expiryEnabled && timestamp != 0L && timestamp <= now - getExpiryDuration(key).toMillis
if (expiryEnabled && timestamp != 0L) {
val expiryDuration = getExpiryDuration(key)
expiryDuration != Duration.Zero && timestamp <= now - expiryDuration.toMillis
} else {
false
}
}

def updateUsedTimestamp(key: KeyId, timestamp: Timestamp): Unit = {
Expand Down Expand Up @@ -1993,8 +2004,15 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog

@nowarn("msg=deprecated")
def receiveFlushChanges(): Unit = {
def notify(keyId: KeyId, subs: mutable.Set[ActorRef], sendExpiredIfMissing: Boolean): Unit = {
val key = subscriptionKeys(keyId)
def notify(keyId: KeyId, subs: Iterator[ActorRef], sendExpiredIfMissing: Boolean): Unit = {
val key = subscriptionKeys.get(keyId) match {
case Some(r) => r
case None =>
subscriptionKeys
.collectFirst { case (k, r) if isWildcard(k) && keyId.startsWith(dropWildcard(k)) => r.withId(keyId) }
.getOrElse(throw new IllegalStateException(s"Subscription notification of [$keyId], but no matching " +
s"subscription key in [${subscriptionKeys.keysIterator.mkString(", ")}]"))
}
getData(keyId) match {
case Some(envelope) =>
val msg = if (envelope.data == DeletedData) Deleted(key) else Changed(key)(envelope.data)
Expand All @@ -2009,17 +2027,25 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}

if (subscribers.nonEmpty) {
for (key <- changed; if subscribers.contains(key); subs <- subscribers.get(key))
notify(key, subs, sendExpiredIfMissing = true)
if (subscribers.nonEmpty || wildcardSubscribers.nonEmpty) {
changed.foreach { key =>
if (hasSubscriber(key)) notify(key, getSubscribersIterator(key), sendExpiredIfMissing = true)
}
}

// Changed event is sent to new subscribers even though the key has not changed,
// i.e. send current value. Expired is not sent to new subscribers as the first event.
if (newSubscribers.nonEmpty) {
for ((key, subs) <- newSubscribers) {
notify(key, subs, sendExpiredIfMissing = false)
subs.foreach { subscribers.addBinding(key, _) }
if (isWildcard(key)) {
dataEntries.keysIterator.filter(_.startsWith(dropWildcard(key))).foreach { matchingKey =>
notify(matchingKey, subs.iterator, sendExpiredIfMissing = false)
}
subs.foreach { wildcardSubscribers.addBinding(dropWildcard(key), _) }
} else {
notify(key, subs.iterator, sendExpiredIfMissing = false)
subs.foreach { subscribers.addBinding(key, _) }
}
}
newSubscribers.clear()
}
Expand Down Expand Up @@ -2331,18 +2357,39 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}

def receiveUnsubscribe(key: KeyR, subscriber: ActorRef): Unit = {
subscribers.removeBinding(key.id, subscriber)
if (isWildcard(key.id)) wildcardSubscribers.removeBinding(dropWildcard(key.id), subscriber)
else subscribers.removeBinding(key.id, subscriber)
newSubscribers.removeBinding(key.id, subscriber)
if (!hasSubscriber(subscriber))
context.unwatch(subscriber)
if (!subscribers.contains(key.id) && !newSubscribers.contains(key.id))
if (!hasSubscriber(key.id) && !newSubscribers.contains(key.id))
subscriptionKeys -= key.id
}

def hasSubscriber(subscriber: ActorRef): Boolean =
subscribers.exists { case (_, s) => s.contains(subscriber) } ||
wildcardSubscribers.exists { case (_, s) => s.contains(subscriber) } ||
newSubscribers.exists { case (_, s) => s.contains(subscriber) }

private def hasSubscriber(keyId: KeyId): Boolean =
subscribers.contains(keyId) ||
(wildcardSubscribers.nonEmpty && wildcardSubscribers.exists { case (k, _) => keyId.startsWith(k) })

private def getSubscribersIterator(keyId: KeyId): Iterator[ActorRef] = {
val subscribersIter = subscribers.get(keyId).map(_.iterator).getOrElse(Iterator.empty)
if (wildcardSubscribers.isEmpty) subscribersIter
else
subscribersIter ++
wildcardSubscribers
.collectFirst { case (k, v) if keyId.startsWith(k) => v }
.map(_.iterator)
.getOrElse(Iterator.empty)
}

private def isWildcard(keyId: KeyId): Boolean = keyId.endsWith("*")

private def dropWildcard(keyId: KeyId): KeyId = keyId.dropRight(1)

def receiveTerminated(ref: ActorRef): Unit = {
if (ref == durableStore) {
log.error("Stopping distributed-data Replicator because durable store terminated")
Expand All @@ -2352,13 +2399,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
keys1.foreach { key =>
subscribers.removeBinding(key, ref)
}
val keys2 = newSubscribers.collect { case (k, s) if s.contains(ref) => k }
val keys2 = wildcardSubscribers.collect { case (k, s) if s.contains(ref) => k }
keys2.foreach { key =>
wildcardSubscribers.removeBinding(key, ref)
}
val keys3 = newSubscribers.collect { case (k, s) if s.contains(ref) => k }
keys3.foreach { key =>
newSubscribers.removeBinding(key, ref)
}

(keys1 ++ keys2).foreach { key =>
if (!subscribers.contains(key) && !newSubscribers.contains(key))
(keys1 ++ keys3).foreach { key =>
if (!hasSubscriber(key) && !newSubscribers.contains(key))
subscriptionKeys -= key
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
private val ORMultiMapManifest = "K"
private val ORMultiMapKeyManifest = "k"
private val VersionVectorManifest = "L"
private val UnspecificKeyManifest = "m"

private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] => AnyRef](
GSetManifest -> gsetFromBinary,
Expand Down Expand Up @@ -336,7 +337,8 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
ORMapKeyManifest -> (bytes => ORMapKey(keyIdFromBinary(bytes))),
LWWMapKeyManifest -> (bytes => LWWMapKey(keyIdFromBinary(bytes))),
PNCounterMapKeyManifest -> (bytes => PNCounterMapKey(keyIdFromBinary(bytes))),
ORMultiMapKeyManifest -> (bytes => ORMultiMapKey(keyIdFromBinary(bytes))))
ORMultiMapKeyManifest -> (bytes => ORMultiMapKey(keyIdFromBinary(bytes))),
UnspecificKeyManifest -> (bytes => Key.UnspecificKey(keyIdFromBinary(bytes))))

override def manifest(obj: AnyRef): String = obj match {
case _: ORSet[_] => ORSetManifest
Expand Down Expand Up @@ -368,6 +370,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
case _: LWWMapKey[_, _] => LWWMapKeyManifest
case _: PNCounterMapKey[_] => PNCounterMapKeyManifest
case _: ORMultiMapKey[_, _] => ORMultiMapKeyManifest
case _: Key.UnspecificKey => UnspecificKeyManifest

case _: ORSet.DeltaGroup[_] => ORSetDeltaGroupManifest
case _: ORMap.DeltaGroup[_, _] => ORMapDeltaGroupManifest
Expand Down
Loading
Loading