Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ publishing {
maven(MavenPublication) {
groupId = 'com.joom.spark'
artifactId = "spark-platform_$scalaVersion"
version = '0.4.9'
version = '0.4.10'

from components.java

Expand Down
2 changes: 2 additions & 0 deletions lib/src/main/scala/com/joom/spark/monitoring/Parts.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ case class StageSummary(
memorySpillGB: Double,
diskSpillGB: Double,
inputGB: Double,
outputGB: Double,
shuffleWriteGB: Double,
peakExecutionMemoryGB: Double,
properties: Map[String, String],
endTs: Long,
details: Option[String],
)

case class ExecutorMetric(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
var sent: Boolean = false,
var startedTaskCount: Int = 0,
var failureReason: Option[String] = None,
var properties: Map[String, String] = Map())
var properties: Map[String, String] = Map(),
var details: Option[String] = None)
private val tasksPerStage = mutable.Map[StageFullId, mutable.ArrayBuffer[(TaskMetrics, TaskEndReason)]]()
private val stageState = mutable.Map[StageFullId, StageState]()
private val appStart: Instant = Instant.now()
Expand Down Expand Up @@ -155,7 +156,9 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
}.toMap

val startTime = stageSubmitted.stageInfo.submissionTime.map(Instant.ofEpochMilli).getOrElse(Instant.now())
stageState.getOrElseUpdate(stageFullId, StageState(startTime)).properties = properties
val state = stageState.getOrElseUpdate(stageFullId, StageState(startTime))
state.properties = properties
state.details = Some(stageSubmitted.stageInfo.details)
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
Expand Down Expand Up @@ -236,7 +239,7 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
val failureReason = state.failureReason
val startTime = state.startTime
val summary = summarizeStage(appId, stageFullId.stageId, stageFullId.attemptNumber, success, failureReason,
startTime, tasks.toSeq, state.properties, endTs)
startTime, tasks.toSeq, state.properties, endTs, state.details)

implicit val codec: JsonValueCodec[StageSummary] = JsonCodecMaker.make
send("stages", summary.get)
Expand Down Expand Up @@ -342,7 +345,9 @@ object StatsReportingSparkListener {
failureReason: Option[String], startTime: Instant,
rawTaskMetrics: Seq[(TaskMetrics, TaskEndReason)],
properties: Map[String, String],
endTime: Option[Long]): Option[StageSummary] = {
endTime: Option[Long],
details: Option[String],
): Option[StageSummary] = {
val taskMetrics = rawTaskMetrics.map(_._1)
.filter(_ != null) // For failed tasks, there will be 'null' TaskMetrics instances.
val runTimes = taskMetrics.map(_.executorRunTime.toDouble / 1000.0)
Expand Down Expand Up @@ -375,10 +380,12 @@ object StatsReportingSparkListener {
memorySpillGB = taskMetrics.map(_.memoryBytesSpilled.toDouble).sum / GiB,
diskSpillGB = taskMetrics.map(_.diskBytesSpilled).sum / GiB,
inputGB = taskMetrics.map(_.inputMetrics.bytesRead).sum / GiB,
outputGB = taskMetrics.map(_.outputMetrics.bytesWritten).sum / GiB,
shuffleWriteGB = taskMetrics.map(_.shuffleWriteMetrics.bytesWritten).sum / GiB,
peakExecutionMemoryGB = taskMetrics.map(_.peakExecutionMemory).sum / GiB,
properties = properties,
endTs = endTs,
details = details,
))
}
}