Skip to content

Commit e86c8e6

Browse files
committed
add tracking warn
Signed-off-by: Pavel Okhlopkov <pavel.okhlopkov@flant.com>
1 parent e52cd24 commit e86c8e6

1 file changed

Lines changed: 120 additions & 2 deletions

File tree

pkg/task/queue/task_queue.go

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"log/slog"
77
"os"
88
"reflect"
9+
"runtime"
10+
"strconv"
911
"strings"
1012
"sync"
1113
"sync/atomic"
@@ -130,6 +132,21 @@ type TaskQueue struct {
130132
contextBuffer []bindingcontext.BindingContext
131133
monitorIDBuffer []string
132134
groupBuffer map[string]*compactionGroup
135+
136+
// Lock ownership tracking for recursive lock detection
137+
writeLockOwner int64 // goroutine ID of write lock owner
138+
readLockOwners sync.Map // map[goroutineID]bool for read lock owners
139+
140+
// Method call tracking for debugging
141+
addFirstCalled atomic.Bool
142+
removeFirstCalled atomic.Bool
143+
addLastCalled atomic.Bool
144+
removeLastCalled atomic.Bool
145+
addAfterCalled atomic.Bool
146+
addBeforeCalled atomic.Bool
147+
removeCalled atomic.Bool
148+
deleteFuncCalled atomic.Bool
149+
startCalled atomic.Bool
133150
}
134151

135152
// TaskQueueOption defines a functional option for TaskQueue configuration
@@ -314,6 +331,9 @@ func (q *TaskQueue) AddFirst(tasks ...task.Task) {
314331

315332
// addFirst adds new head element.
316333
func (q *TaskQueue) addFirst(tasks ...task.Task) {
334+
q.addFirstCalled.Store(true)
335+
defer q.addFirstCalled.Store(false)
336+
317337
// Also, add tasks in reverse order
318338
// at the start of the queue. The first task in HeadTasks
319339
// become the new first task in the queue.
@@ -339,6 +359,9 @@ func (q *TaskQueue) RemoveFirst() task.Task {
339359

340360
// removeFirst deletes a head element, so head is moved.
341361
func (q *TaskQueue) removeFirst() task.Task {
362+
q.removeFirstCalled.Store(true)
363+
defer q.removeFirstCalled.Store(false)
364+
342365
if q.isEmpty() {
343366
return nil
344367
}
@@ -377,6 +400,9 @@ func (q *TaskQueue) AddLast(tasks ...task.Task) {
377400
// addLast adds a new tail element.
378401
// It implements the merging logic for HookRun tasks by scanning the whole queue.
379402
func (q *TaskQueue) addLast(tasks ...task.Task) {
403+
q.addLastCalled.Store(true)
404+
defer q.addLastCalled.Store(false)
405+
380406
for _, t := range tasks {
381407
q.lazydebug("adding task to queue", func() []any {
382408
return []any{
@@ -683,6 +709,9 @@ func (q *TaskQueue) RemoveLast() task.Task {
683709

684710
// removeLast deletes a tail element, so tail is moved.
685711
func (q *TaskQueue) removeLast() task.Task {
712+
q.removeLastCalled.Store(true)
713+
defer q.removeLastCalled.Store(false)
714+
686715
element := q.items.Back()
687716
t := q.items.Remove(element)
688717
delete(q.idIndex, t.GetId())
@@ -745,6 +774,9 @@ func (q *TaskQueue) AddAfter(id string, tasks ...task.Task) {
745774

746775
// addAfter inserts a task after the task with specified id.
747776
func (q *TaskQueue) addAfter(id string, tasks ...task.Task) {
777+
q.addAfterCalled.Store(true)
778+
defer q.addAfterCalled.Store(false)
779+
748780
if element, ok := q.idIndex[id]; ok {
749781
// Insert new tasks right after the id task in reverse order.
750782
for i := len(tasks) - 1; i >= 0; i-- {
@@ -767,6 +799,9 @@ func (q *TaskQueue) AddBefore(id string, newTask task.Task) {
767799

768800
// addBefore inserts a task before the task with specified id.
769801
func (q *TaskQueue) addBefore(id string, newTask task.Task) {
802+
q.addBeforeCalled.Store(true)
803+
defer q.addBeforeCalled.Store(false)
804+
770805
if element, ok := q.idIndex[id]; ok {
771806
newElement := q.items.InsertBefore(newTask, element)
772807
q.idIndex[newTask.GetId()] = newElement
@@ -788,6 +823,9 @@ func (q *TaskQueue) Remove(id string) task.Task {
788823
}
789824

790825
func (q *TaskQueue) remove(id string) task.Task {
826+
q.removeCalled.Store(true)
827+
defer q.removeCalled.Store(false)
828+
791829
if element, ok := q.idIndex[id]; ok {
792830
t := q.items.Remove(element)
793831
delete(q.idIndex, id)
@@ -838,6 +876,9 @@ func (q *TaskQueue) Start(ctx context.Context) {
838876
}
839877

840878
q.withLock(func() {
879+
q.startCalled.Store(true)
880+
defer q.startCalled.Store(false)
881+
841882
if q.queueTasksCounter.IsAnyCapReached() {
842883
q.lazydebug("triggering compaction before task processing", func() []any {
843884
return []any{slog.String("queue", q.Name), slog.String("task_id", t.GetId()), slog.String("task_type", string(t.GetType())), slog.Int("queue_length", q.items.Len())}
@@ -892,6 +933,9 @@ func (q *TaskQueue) Start(ctx context.Context) {
892933
case Success, Keep:
893934
// Insert new tasks right after the current task in reverse order.
894935
q.withLock(func() {
936+
q.startCalled.Store(true)
937+
defer q.startCalled.Store(false)
938+
895939
q.addAfter(t.GetId(), taskRes.GetAfterTasks()...)
896940

897941
if taskRes.Status == Success {
@@ -1070,8 +1114,22 @@ func (q *TaskQueue) IterateSnapshot(doFn func(task.Task)) {
10701114

10711115
defer q.MeasureActionTime("IterateSnapshot")()
10721116

1117+
// Log all method call tracking bools in one warn log
1118+
q.logger.Warn("GetSnapshot method call tracking",
1119+
slog.Bool("addFirstCalled", q.addFirstCalled.Load()),
1120+
slog.Bool("removeFirstCalled", q.removeFirstCalled.Load()),
1121+
slog.Bool("addLastCalled", q.addLastCalled.Load()),
1122+
slog.Bool("removeLastCalled", q.removeLastCalled.Load()),
1123+
slog.Bool("addAfterCalled", q.addAfterCalled.Load()),
1124+
slog.Bool("addBeforeCalled", q.addBeforeCalled.Load()),
1125+
slog.Bool("removeCalled", q.removeCalled.Load()),
1126+
slog.Bool("deleteFuncCalled", q.deleteFuncCalled.Load()),
1127+
slog.Bool("startCalled", q.startCalled.Load()),
1128+
)
1129+
10731130
// Create snapshot under lock
10741131
snapshot := q.GetSnapshot()
1132+
q.logger.Warn("GetSnapshot done")
10751133

10761134
defer func() {
10771135
if r := recover(); r != nil {
@@ -1112,6 +1170,9 @@ func (q *TaskQueue) DeleteFunc(fn func(task.Task) bool) {
11121170
defer q.MeasureActionTime("Filter")()
11131171

11141172
q.withLock(func() {
1173+
q.deleteFuncCalled.Store(true)
1174+
defer q.deleteFuncCalled.Store(false)
1175+
11151176
for e := q.items.Front(); e != nil; {
11161177
current := e
11171178
e = e.Next()
@@ -1156,11 +1217,49 @@ func (q *TaskQueue) String() string {
11561217
return buf.String()
11571218
}
11581219

1220+
// getGoroutineID returns the current goroutine ID as an int64
1221+
func getGoroutineID() int64 {
1222+
var buf [64]byte
1223+
n := runtime.Stack(buf[:], false)
1224+
// Parse the goroutine ID from the stack trace
1225+
// Format: "goroutine 123 [running]:"
1226+
stack := string(buf[:n])
1227+
if idx := strings.Index(stack, "goroutine "); idx >= 0 {
1228+
start := idx + len("goroutine ")
1229+
end := strings.Index(stack[start:], " ")
1230+
if end > 0 {
1231+
if id, err := strconv.ParseInt(stack[start:start+end], 10, 64); err == nil {
1232+
return id
1233+
}
1234+
}
1235+
}
1236+
return 0 // fallback
1237+
}
1238+
11591239
func (q *TaskQueue) withLock(fn func()) {
1240+
currentGID := getGoroutineID()
1241+
1242+
// Check for recursive write lock
1243+
if atomic.LoadInt64(&q.writeLockOwner) == currentGID {
1244+
q.logger.Warn("recursive write lock detected",
1245+
slog.Int64("goroutine_id", currentGID),
1246+
slog.String("queue", q.Name))
1247+
}
1248+
1249+
// Check if this goroutine already holds a read lock
1250+
if _, hasReadLock := q.readLockOwners.Load(currentGID); hasReadLock {
1251+
q.logger.Warn("write lock acquired while holding read lock (lock promotion not allowed)",
1252+
slog.Int64("goroutine_id", currentGID),
1253+
slog.String("queue", q.Name))
1254+
}
1255+
11601256
q.m.Lock()
1161-
defer q.m.Unlock()
1257+
atomic.StoreInt64(&q.writeLockOwner, currentGID)
11621258

11631259
defer func() {
1260+
atomic.StoreInt64(&q.writeLockOwner, 0)
1261+
q.m.Unlock()
1262+
11641263
if r := recover(); r != nil {
11651264
q.logger.Warn("panic recovered in withLock", slog.Any("error", r))
11661265
}
@@ -1170,10 +1269,29 @@ func (q *TaskQueue) withLock(fn func()) {
11701269
}
11711270

11721271
func (q *TaskQueue) withRLock(fn func()) {
1272+
currentGID := getGoroutineID()
1273+
1274+
// Check for recursive read lock
1275+
if _, hasReadLock := q.readLockOwners.Load(currentGID); hasReadLock {
1276+
q.logger.Warn("recursive read lock detected",
1277+
slog.Int64("goroutine_id", currentGID),
1278+
slog.String("queue", q.Name))
1279+
}
1280+
1281+
// Check if this goroutine already holds the write lock
1282+
if atomic.LoadInt64(&q.writeLockOwner) == currentGID {
1283+
q.logger.Warn("read lock acquired while holding write lock",
1284+
slog.Int64("goroutine_id", currentGID),
1285+
slog.String("queue", q.Name))
1286+
}
1287+
11731288
q.m.RLock()
1174-
defer q.m.RUnlock()
1289+
q.readLockOwners.Store(currentGID, true)
11751290

11761291
defer func() {
1292+
q.readLockOwners.Delete(currentGID)
1293+
q.m.RUnlock()
1294+
11771295
if r := recover(); r != nil {
11781296
q.logger.Warn("panic recovered in withRLock", slog.Any("error", r))
11791297
}

0 commit comments

Comments
 (0)