Skip to content

Commit 8d297c6

Browse files
authored
reuse existing task definitions (#97)
1 parent 4843514 commit 8d297c6

1 file changed

Lines changed: 94 additions & 29 deletions

File tree

  • internal/pkg/object/command/ecs

internal/pkg/object/command/ecs/ecs.go

Lines changed: 94 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ import (
2727
type commandContext struct {
2828
TaskDefinitionTemplate string `yaml:"task_definition_template,omitempty" json:"task_definition_template,omitempty"`
2929
TaskCount int `yaml:"task_count,omitempty" json:"task_count,omitempty"`
30+
TaskFamily *string `yaml:"task_family,omitempty" json:"task_family,omitempty"`
31+
TaskRevision *int `yaml:"task_revision,omitempty" json:"task_revision,omitempty"`
32+
UseLatestRevision bool `yaml:"use_latest_revision,omitempty" json:"use_latest_revision,omitempty"`
3033
CPU int `yaml:"cpu,omitempty" json:"cpu,omitempty"`
3134
Memory int `yaml:"memory,omitempty" json:"memory,omitempty"`
3235
ContainerOverrides []types.ContainerOverride `yaml:"container_overrides,omitempty" json:"container_overrides,omitempty"`
@@ -83,6 +86,9 @@ type FailureReason string
8386
// executionContext holds the final resolved configuration for job execution.
8487
type executionContext struct {
8588
TaskCount int `json:"task_count"`
89+
TaskFamily *string `json:"task_family,omitempty"`
90+
TaskRevision *int `json:"task_revision,omitempty"`
91+
UseLatestRevision bool `json:"use_latest_revision,omitempty"`
8692
CPU int `json:"cpu"`
8793
Memory int `json:"memory"`
8894
TaskDefinitionWrapper *taskDefinitionWrapper `json:"task_definition_wrapper"`
@@ -152,8 +158,8 @@ func (e *commandContext) Execute(ctx context.Context, r *plugin.Runtime, job *jo
152158
return err
153159
}
154160

155-
// register task definition
156-
if err := execCtx.registerTaskDefinition(ctx); err != nil {
161+
// select or register task definition
162+
if err := execCtx.prepareTaskDefinition(ctx); err != nil {
157163
return err
158164
}
159165

@@ -182,8 +188,39 @@ func (e *commandContext) Execute(ctx context.Context, r *plugin.Runtime, job *jo
182188

183189
}
184190

185-
// prepare and register task definition with ECS
186-
func (execCtx *executionContext) registerTaskDefinition(ctx context.Context) error {
191+
// select or register task definition
192+
func (execCtx *executionContext) prepareTaskDefinition(ctx context.Context) error {
193+
194+
// Optionally override task family from config
195+
if execCtx.TaskFamily != nil {
196+
execCtx.TaskDefinitionWrapper.TaskDefinition.Family = aws.String(strings.TrimSpace(*execCtx.TaskFamily))
197+
}
198+
199+
family := aws.ToString(execCtx.TaskDefinitionWrapper.TaskDefinition.Family)
200+
201+
// If configured to reuse an existing task definition, do that first.
202+
if execCtx.UseLatestRevision {
203+
taskDefListOutput, err := execCtx.ecsClient.ListTaskDefinitions(ctx, &ecs.ListTaskDefinitionsInput{
204+
FamilyPrefix: aws.String(family),
205+
Status: types.TaskDefinitionStatusActive,
206+
Sort: types.SortOrderDesc,
207+
MaxResults: aws.Int32(1),
208+
})
209+
210+
if err != nil {
211+
return err
212+
}
213+
214+
if len(taskDefListOutput.TaskDefinitionArns) > 0 {
215+
return execCtx.adoptTaskDefinition(ctx, taskDefListOutput.TaskDefinitionArns[0])
216+
}
217+
}
218+
219+
if execCtx.TaskRevision != nil {
220+
return execCtx.adoptTaskDefinition(ctx, fmt.Sprintf("%s:%d", family, *execCtx.TaskRevision))
221+
}
222+
223+
// if you are here, you need to register a new task definition
187224
registerInput := &ecs.RegisterTaskDefinitionInput{
188225
Family: aws.String(aws.ToString(execCtx.TaskDefinitionWrapper.TaskDefinition.Family)),
189226
RequiresCompatibilities: []types.Compatibility{types.CompatibilityFargate},
@@ -206,6 +243,52 @@ func (execCtx *executionContext) registerTaskDefinition(ctx context.Context) err
206243

207244
}
208245

246+
func (execCtx *executionContext) adoptTaskDefinition(ctx context.Context, taskDefinitionArn string) error {
247+
248+
taskDefOutput, err := execCtx.ecsClient.DescribeTaskDefinition(ctx, &ecs.DescribeTaskDefinitionInput{
249+
TaskDefinition: aws.String(taskDefinitionArn),
250+
})
251+
if err != nil || taskDefOutput.TaskDefinition == nil {
252+
return fmt.Errorf("failed to describe task definition: %w", err)
253+
}
254+
255+
execCtx.taskDefARN = taskDefOutput.TaskDefinition.TaskDefinitionArn
256+
execCtx.TaskDefinitionWrapper = newTaskDefinitionWrapper(taskDefOutput.TaskDefinition)
257+
258+
return nil
259+
260+
}
261+
262+
func newTaskDefinitionWrapper(taskDef *types.TaskDefinition) *taskDefinitionWrapper {
263+
264+
// Pre-compute essential containers map
265+
essentialContainers := make(map[string]bool)
266+
for _, containerDef := range taskDef.ContainerDefinitions {
267+
if containerDef.Essential != nil && *containerDef.Essential {
268+
essentialContainers[aws.ToString(containerDef.Name)] = true
269+
}
270+
}
271+
272+
// Pre-compute containers with log configurations
273+
var logGroups []containerLogInfo
274+
for _, containerDef := range taskDef.ContainerDefinitions {
275+
if containerDef.LogConfiguration != nil {
276+
logGroups = append(logGroups, containerLogInfo{
277+
containerName: aws.ToString(containerDef.Name),
278+
logDriver: containerDef.LogConfiguration.LogDriver,
279+
options: containerDef.LogConfiguration.Options,
280+
})
281+
}
282+
}
283+
284+
return &taskDefinitionWrapper{
285+
TaskDefinition: taskDef,
286+
EssentialContainers: essentialContainers,
287+
LogGroups: logGroups,
288+
}
289+
290+
}
291+
209292
// startTasks launches all tasks and returns a map of task trackers
210293
func (execCtx *executionContext) startTasks(ctx context.Context, jobID string) error {
211294

@@ -224,6 +307,7 @@ func (execCtx *executionContext) startTasks(ctx context.Context, jobID string) e
224307
}
225308

226309
return nil
310+
227311
}
228312

229313
// monitor tasks until completion, faliure, or timeout
@@ -424,6 +508,11 @@ func validateExecutionContext(ctx *executionContext) error {
424508
return fmt.Errorf("memory (%d) needs to be greater than 0 and less than or equal to cluster max memory (%d)", ctx.Memory, ctx.ClusterConfig.MaxMemory)
425509
}
426510

511+
// Task definition selection business logic.
512+
if ctx.UseLatestRevision && ctx.TaskRevision != nil {
513+
return fmt.Errorf("use_latest_revision and task_revision cannot both be set")
514+
}
515+
427516
return nil
428517

429518
}
@@ -513,31 +602,7 @@ func loadTaskDefinitionTemplate(templatePath string) (*taskDefinitionWrapper, er
513602
return nil, err
514603
}
515604

516-
// Pre-compute essential containers map
517-
essentialContainers := make(map[string]bool)
518-
for _, containerDef := range taskDef.ContainerDefinitions {
519-
if containerDef.Essential != nil && *containerDef.Essential {
520-
essentialContainers[aws.ToString(containerDef.Name)] = true
521-
}
522-
}
523-
524-
// Pre-compute containers with log configurations
525-
var logGroups []containerLogInfo
526-
for _, containerDef := range taskDef.ContainerDefinitions {
527-
if containerDef.LogConfiguration != nil {
528-
logGroups = append(logGroups, containerLogInfo{
529-
containerName: aws.ToString(containerDef.Name),
530-
logDriver: containerDef.LogConfiguration.LogDriver,
531-
options: containerDef.LogConfiguration.Options,
532-
})
533-
}
534-
}
535-
536-
return &taskDefinitionWrapper{
537-
TaskDefinition: &taskDef,
538-
EssentialContainers: essentialContainers,
539-
LogGroups: logGroups,
540-
}, nil
605+
return newTaskDefinitionWrapper(&taskDef), nil
541606

542607
}
543608

0 commit comments

Comments
 (0)