Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python_Versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"revision": 3
"revision": 7
}
1 change: 1 addition & 0 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def sickbayTests = [
def createPrismValidatesRunnerTask = { name, environmentType ->
Task vrTask = tasks.create(name: name, type: Test, group: "Verification") {
description "PrismRunner Java $environmentType ValidatesRunner suite"
outputs.upToDateWhen { false }
classpath = configurations.validatesRunner

var prismBuildTask = dependsOn(':runners:prism:build')
Expand Down
18 changes: 13 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,10 @@ type ElementManager struct {
}

// addPending adjusts the element manager's pending-element accounting by v.
//
// It updates the livePending counter and the pendingElements wait group, then
// logs the transition for debugging.
func (em *ElementManager) addPending(v int) {
	delta := int64(v)
	// Add returns the post-update value. Deriving prev from it keeps the logged
	// prev/current pair consistent with this call's own delta, even when other
	// goroutines adjust the counter concurrently; separate Load calls before
	// and after the Add could observe unrelated updates.
	current := em.livePending.Add(delta)
	em.pendingElements.Add(v)
	slog.Info("em.addPending", "delta", v, "prev", current-delta, "current", current)
}

// LinkID represents a fully qualified input or output.
Expand Down Expand Up @@ -387,8 +389,11 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
em.pendingElements.Wait()
slog.Debug("no more pending elements: terminating pipeline")
cancelFn(fmt.Errorf("elementManager out of elements, cleaning up"))
// Ensure the watermark evaluation goroutine exits.
// Ensure the watermark evaluation goroutine exits by locking the mutex
// before broadcasting, preventing a lost wake-up signal.
em.refreshCond.L.Lock()
em.refreshCond.Broadcast()
em.refreshCond.L.Unlock()
}()
// Watermark evaluation goroutine.
go func() {
Expand Down Expand Up @@ -530,7 +535,7 @@ func (em *ElementManager) DumpStages() string {
stageState = append(stageState, fmt.Sprintf("TestStreamHandler: completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v \n",
em.testStreamHandler.completed, em.testStreamHandler.nextEventIndex, len(em.testStreamHandler.events), em.testStreamHandler.events, em.testStreamHandler.processingTime, mtime.FromTime(em.testStreamHandler.processingTime), em.processTimeEvents))
} else {
stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n", em.processingTimeNow(), em.processTimeEvents.events, em.injectedBundles))
stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v livePending: %v\n", em.processingTimeNow(), em.processTimeEvents.events, em.injectedBundles, em.livePending.Load()))
}
sort.Strings(ids)
for _, id := range ids {
Expand Down Expand Up @@ -1091,9 +1096,8 @@ func (em *ElementManager) FailBundle(rb RunBundle) {
em.markChangedAndClearBundle(rb.StageID, rb.BundleID, nil)
}

// ReturnResiduals is called after a successful split, so the remaining work
// can be re-assigned to a new bundle.
func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals Residuals) {
slog.Info("ElementManager.ReturnResiduals start", "bundle", rb, "firstRsIndex", firstRsIndex)
stage := em.stages[rb.StageID]

stage.splitBundle(rb, firstRsIndex, em)
Expand All @@ -1103,6 +1107,7 @@ func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputI
count := stage.AddPending(em, unprocessedElements)
em.addPending(count)
}
slog.Info("ElementManager.ReturnResiduals end", "bundle", rb, "unprocessedCount", len(unprocessedElements), "livePending", em.livePending.Load())
em.markStagesAsChanged(singleSet(rb.StageID))
}

Expand Down Expand Up @@ -2187,7 +2192,7 @@ func (ss *stageState) splitBundle(rb RunBundle, firstResidual int, em *ElementMa
defer ss.mu.Unlock()

es := ss.inprogress[rb.BundleID]
slog.Debug("split elements", "bundle", rb, "elem count", len(es.es), "res", firstResidual)
slog.Info("splitBundle start", "bundle", rb, "elem count", len(es.es), "firstResidual", firstResidual, "livePending", em.livePending.Load())

prim := es.es[:firstResidual]
res := es.es[firstResidual:]
Expand All @@ -2207,6 +2212,7 @@ func (ss *stageState) splitBundle(rb RunBundle, firstResidual int, em *ElementMa
// we don't need to increment pending count in em, since it is already pending
ss.kind.addPending(ss, em, res)
ss.inprogress[rb.BundleID] = es
slog.Info("splitBundle completed", "bundle", rb, "primaryCount", len(prim), "residualCount", len(res), "livePending", em.livePending.Load())
}

// minimumPendingTimestamp returns the minimum pending timestamp from all pending elements,
Expand Down Expand Up @@ -2496,7 +2502,9 @@ func (em *ElementManager) wakeUpAt(t mtime.Time) {
// only create this goroutine if we have real-time clock enabled (also implying the pipeline does not have TestStream).
go func(fireAt time.Time) {
time.AfterFunc(time.Until(fireAt), func() {
em.refreshCond.L.Lock()
em.refreshCond.Broadcast()
em.refreshCond.L.Unlock()
})
}(t.ToTime())
}
Expand Down
5 changes: 4 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,12 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo

// Previous context cancelled so we need a new one
// for this request.
pool.StopWorker(bgContext, &fnpb.StopWorkerRequest{
_, err = pool.StopWorker(bgContext, &fnpb.StopWorkerRequest{
WorkerId: wk.ID,
})
if err != nil {
slog.Warn("StopWorker failed", "worker", wk, "error", err)
}
wk.Stop()
}

Expand Down
22 changes: 22 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"io"
"log/slog"
"runtime/debug"
"runtime/pprof"
"sort"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -391,6 +393,19 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
// Log a heartbeat every 60 seconds
case <-ticker.C:
j.Logger.Info("pipeline is running", slog.String("job", j.String()))
j.Logger.Info("pipeline stages state", slog.String("stages", em.DumpStages()))
var buf strings.Builder
goroutineDump(&buf)
j.Logger.Info("goroutines", slog.String("dump", buf.String()))
for envID, wk := range wks {
if wk != nil && wk.Connected() && !wk.Stopped() {
j.Logger.Info("worker status",
slog.String("workerID", wk.ID),
slog.String("envID", envID),
slog.Duration("uptime", wk.Uptime()),
slog.Any("active_bundles", wk.ActiveBundles()))
}
}
}
}
}
Expand Down Expand Up @@ -501,3 +516,10 @@ func buildTrigger(tpb *pipepb.Trigger) engine.Trigger {
return &engine.TriggerDefault{}
}
}

// goroutineDump writes the current "goroutine" pprof profile into statusInfo
// using debug level 1. If the profile cannot be looked up, statusInfo is left
// untouched.
func goroutineDump(statusInfo *strings.Builder) {
	p := pprof.Lookup("goroutine")
	if p == nil {
		return
	}
	p.WriteTo(statusInfo, 1)
}
8 changes: 7 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c
panic(err)
}

bundleStart := time.Now()

// Progress + split loop.
previousIndex := int64(-2)
previousTotalCount := int64(-2) // Total count of all pcollection elements.
Expand Down Expand Up @@ -232,7 +234,11 @@ progress:
md := wk.MonitoringMetadata(ctx, unknownIDs)
j.AddMetricShortIDs(md)
}
slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex)
runningFor := time.Since(bundleStart)
slog.Debug("progress report", "bundle", rb, "runningFor", runningFor, "index", index, "prevIndex", previousIndex)
if runningFor > 5*time.Minute {
slog.Warn("Bundle has been running for a long time", "bundle", rb, "runningFor", runningFor, "worker", wk.ID)
}

// Check if there has been any measurable progress by the input, or all output pcollections since last report.
slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
Expand Down
27 changes: 27 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type W struct {
// These are the ID sources
inst uint64
connected, stopped atomic.Bool
StartTime time.Time
StoppedChan chan struct{} // Channel to Broadcast stopped state.

InstReqs chan *fnpb.InstructionRequest
Expand Down Expand Up @@ -292,11 +293,37 @@ func (wk *W) Stopped() bool {
return wk.stopped.Load()
}

// Uptime reports the time elapsed since the worker's StartTime.
// It returns 0 while StartTime is still the zero value.
func (wk *W) Uptime() time.Duration {
	// Copy the field under the lock; the elapsed time is computed afterwards.
	wk.mu.Lock()
	started := wk.StartTime
	wk.mu.Unlock()

	if started.IsZero() {
		return 0
	}
	return time.Since(started)
}

// ActiveBundles describes the bundles currently registered in the worker's
// active instructions. Each entry is formatted as "<instruction id> (<PBDID>)".
// Active instructions that are not bundles (*B) are skipped, and the result
// follows the iteration order of wk.activeInstructions.
func (wk *W) ActiveBundles() []string {
	wk.mu.Lock()
	defer wk.mu.Unlock()

	var active []string
	for instID, r := range wk.activeInstructions {
		b, isBundle := r.(*B)
		if !isBundle {
			continue
		}
		active = append(active, fmt.Sprintf("%s (%s)", instID, b.PBDID))
	}
	return active
}

// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
//
// Requests come from the runner, and are sent to the client in the SDK.
func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
wk.connected.Store(true)
wk.mu.Lock()
wk.StartTime = time.Now()
wk.mu.Unlock()
done := make(chan error, 1)
go func() {
for {
Expand Down
14 changes: 12 additions & 2 deletions sdks/python/apache_beam/runners/portability/prism_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,18 @@ def filter(self, record):
record.pathname = json_record["source"]["file"]
record.filename = os.path.basename(record.pathname)
record.lineno = json_record["source"]["line"]
record.created = datetime.datetime.fromisoformat(
json_record["time"]).timestamp()
time_str = json_record["time"]
match = re.match(r"^(.*?)(\.\d+)?(Z|[+-]\d{2}:?\d{2})?$", time_str)
if match:
base, frac, tz = match.groups()
if frac:
frac = (frac + "000000")[:7]
else:
frac = ""
if tz == 'Z':
tz = '+00:00'
time_str = base + frac + (tz or "")
record.created = datetime.datetime.fromisoformat(time_str).timestamp()
extras = {
k: v
for k, v in json_record.items()
Expand Down
Loading