Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ import scala.collection.mutable

class StatisticsManager {
// DataProcessor
// Plain maps (no withDefaultValue) so they survive Kryo round-trip.
Comment thread
Ma77Ball marked this conversation as resolved.
private val inputStatistics: mutable.Map[PortIdentity, (Long, Long)] =
mutable.Map.empty.withDefaultValue((0L, 0L))
mutable.Map.empty
Comment thread
Ma77Ball marked this conversation as resolved.
private val outputStatistics: mutable.Map[PortIdentity, (Long, Long)] =
mutable.Map.empty.withDefaultValue((0L, 0L))
mutable.Map.empty
private var dataProcessingTime: Long = 0L
private var totalExecutionTime: Long = 0L
private var workerStartTime: Long = 0L
Expand Down Expand Up @@ -82,8 +83,10 @@ class StatisticsManager {
*/
def increaseInputStatistics(portId: PortIdentity, size: Long): Unit = {
require(size >= 0, "Tuple size must be non-negative")
val (count, totalSize) = inputStatistics(portId)
inputStatistics.update(portId, (count + 1, totalSize + size))
inputStatistics.updateWith(portId) {
case Some((count, totalSize)) => Some((count + 1, totalSize + size))
case None => Some((1L, size))
}
}

/**
Expand All @@ -93,8 +96,10 @@ class StatisticsManager {
*/
def increaseOutputStatistics(portId: PortIdentity, size: Long): Unit = {
require(size >= 0, "Tuple size must be non-negative")
val (count, totalSize) = outputStatistics(portId)
outputStatistics.update(portId, (count + 1, totalSize + size))
outputStatistics.updateWith(portId) {
case Some((count, totalSize)) => Some((count + 1, totalSize + size))
case None => Some((1L, size))
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,15 @@ class WorkerManagersSpec extends AnyFlatSpec {
val sm = new StatisticsManager()
sm.increaseOutputStatistics(PortIdentity(0), 30)
sm.increaseOutputStatistics(PortIdentity(0), 70)
assert(sm.getOutputTupleCount == 2L)
val out = sm.getStatistics(nullExec).outputTupleMetrics
assert(out.size == 1)
assert(out.head.tupleMetrics.count == 2L)
assert(out.head.tupleMetrics.size == 100L)
sm.increaseOutputStatistics(PortIdentity(1), 25)
assert(sm.getOutputTupleCount == 3L)
val byPort = sm
.getStatistics(nullExec)
.outputTupleMetrics
.map(m => m.portId -> (m.tupleMetrics.count, m.tupleMetrics.size))
.toMap
assert(byPort(PortIdentity(0)) == (2L, 100L))
assert(byPort(PortIdentity(1)) == (1L, 25L))
}

it should "reject negative tuple sizes" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll {
system.actorOf(Props[SingleNodeListener](), "cluster-info")
}

"Default controller state" should "be serializable" in {
"Default controller state" should "round-trip through CheckpointState" in {
val cp =
new ControllerProcessor(
workflow.context,
Expand All @@ -73,16 +73,20 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll {
)
val chkpt = new CheckpointState()
chkpt.save(CP_STATE_KEY, cp)
val restored: ControllerProcessor = chkpt.load(CP_STATE_KEY)
assert(restored.actorId == cp.actorId)
}

"Default worker state" should "be serializable" in {
"Default worker state" should "round-trip through CheckpointState" in {
val dp = new DataProcessor(
SELF,
msg => {},
inputMessageQueue = new LinkedBlockingQueue[DPInputQueueElement]()
)
val chkpt = new CheckpointState()
chkpt.save(DP_STATE_KEY, dp)
val restored: DataProcessor = chkpt.load(DP_STATE_KEY)
assert(restored.actorId == dp.actorId)
}

"CheckpointState" should "fail loudly on an unknown key" in {
Expand Down
Loading