Skip to content

Adding a MaxConcurrentTasks flag to the self hosted worker #27

Open
ianhodge wants to merge 6 commits intomainfrom
03-02-ian_adding_max_concurrent_limit
Open

Adding a MaxConcurrentTasks flag to the self hosted worker #27
ianhodge wants to merge 6 commits intomainfrom
03-02-ian_adding_max_concurrent_limit

Conversation

@ianhodge
Copy link
Member

@ianhodge ianhodge commented Mar 3, 2026

WISOTT

Default to 0 for existing behavior

Copy link
Member Author

ianhodge commented Mar 3, 2026

This stack of pull requests is managed by Graphite. Learn more about stacking.

@ianhodge ianhodge changed the title ian/adding_max_concurrent_limit Adding a MaxConcurrentTasks flag to the self hosted worker Mar 3, 2026
@ianhodge ianhodge requested review from acarl005 and bnavetta March 3, 2026 02:00
@ianhodge ianhodge marked this pull request as ready for review March 3, 2026 02:00
sendChan chan []byte
activeTasks map[string]context.CancelFunc
tasksMutex sync.Mutex
taskSemaphore chan struct{} // nil when unlimited; buffered channel used as counting semaphore
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably use https://pkg.go.dev/golang.org/x/sync/semaphore since it handles context cancellation for free.

That'd also let us use TryAcquire rather than Acquire, so that if a self-hosted worker is at capacity, the task can more quickly get routed to another worker or requeued server-side

NoCleanup: CLI.NoCleanup,
Volumes: CLI.Volumes,
Env: envMap,
APIKey: CLI.APIKey,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're probably gonna want a config file format pretty soon

@ianhodge ianhodge requested a review from bnavetta March 3, 2026 18:00
if w.taskSemaphore != nil {
if !w.taskSemaphore.TryAcquire(1) {
log.Warnf(w.ctx, "At max concurrency (%d), rejecting task: taskID=%s", w.config.MaxConcurrentTasks, assignment.TaskID)
if err := w.sendTaskFailed(assignment.TaskID, "worker at maximum concurrency"); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will mark the task as failed and unretryable, right? I think we need a new message type that signals "I will not claim this task"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants