diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala index 8ae0419f0a3..ab46e17654c 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala @@ -31,10 +31,11 @@ import scala.collection.mutable class StatisticsManager { // DataProcessor + // Plain maps (no withDefaultValue) so they survive Kryo round-trip. private val inputStatistics: mutable.Map[PortIdentity, (Long, Long)] = - mutable.Map.empty.withDefaultValue((0L, 0L)) + mutable.Map.empty private val outputStatistics: mutable.Map[PortIdentity, (Long, Long)] = - mutable.Map.empty.withDefaultValue((0L, 0L)) + mutable.Map.empty private var dataProcessingTime: Long = 0L private var totalExecutionTime: Long = 0L private var workerStartTime: Long = 0L @@ -82,8 +83,10 @@ class StatisticsManager { */ def increaseInputStatistics(portId: PortIdentity, size: Long): Unit = { require(size >= 0, "Tuple size must be non-negative") - val (count, totalSize) = inputStatistics(portId) - inputStatistics.update(portId, (count + 1, totalSize + size)) + inputStatistics.updateWith(portId) { + case Some((count, totalSize)) => Some((count + 1, totalSize + size)) + case None => Some((1L, size)) + } } /** @@ -93,8 +96,10 @@ class StatisticsManager { */ def increaseOutputStatistics(portId: PortIdentity, size: Long): Unit = { require(size >= 0, "Tuple size must be non-negative") - val (count, totalSize) = outputStatistics(portId) - outputStatistics.update(portId, (count + 1, totalSize + size)) + outputStatistics.updateWith(portId) { + case Some((count, totalSize)) => Some((count + 1, totalSize + size)) + case None => Some((1L, size)) + } } /** diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala index 3fbff39148c..1932823f5dc 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala @@ -76,11 +76,15 @@ class WorkerManagersSpec extends AnyFlatSpec { val sm = new StatisticsManager() sm.increaseOutputStatistics(PortIdentity(0), 30) sm.increaseOutputStatistics(PortIdentity(0), 70) - assert(sm.getOutputTupleCount == 2L) - val out = sm.getStatistics(nullExec).outputTupleMetrics - assert(out.size == 1) - assert(out.head.tupleMetrics.count == 2L) - assert(out.head.tupleMetrics.size == 100L) + sm.increaseOutputStatistics(PortIdentity(1), 25) + assert(sm.getOutputTupleCount == 3L) + val byPort = sm + .getStatistics(nullExec) + .outputTupleMetrics + .map(m => m.portId -> (m.tupleMetrics.count, m.tupleMetrics.size)) + .toMap + assert(byPort(PortIdentity(0)) == (2L, 100L)) + assert(byPort(PortIdentity(1)) == (1L, 25L)) } it should "reject negative tuple sizes" in { diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala index fbc7e8044df..3d207fd23b3 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala @@ -63,7 +63,7 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll { system.actorOf(Props[SingleNodeListener](), "cluster-info") } - "Default controller state" should "be serializable" in { + "Default controller state" should "round-trip through CheckpointState" in { val cp = new ControllerProcessor( workflow.context, @@ -73,9 +73,11 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll { ) val chkpt = new CheckpointState() chkpt.save(CP_STATE_KEY, cp) + val restored: ControllerProcessor = chkpt.load(CP_STATE_KEY) + assert(restored.actorId == cp.actorId) } - "Default worker state" should "be serializable" in { + "Default worker state" should "round-trip through CheckpointState" in { val dp = new DataProcessor( SELF, msg => {}, @@ -83,6 +85,8 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll { ) val chkpt = new CheckpointState() chkpt.save(DP_STATE_KEY, dp) + val restored: DataProcessor = chkpt.load(DP_STATE_KEY) + assert(restored.actorId == dp.actorId) } "CheckpointState" should "fail loudly on an unknown key" in {