Go-SDK: Support TaskFlow#68311

Open
jason810496 wants to merge 1 commit into
apache:mainfrom
jason810496:refactor/go-sdk/support-taskflow-syntax

@jason810496

Member

Why

Without this PR, a Go task could only read upstream XComs by calling the client explicitly. This PR adds TaskFlow-style input binding, matching the Java SDK's TaskFlow shape (#66332).

How

  • A task parameter that is a struct (or pointer to one) whose exported fields carry an xcom:"<task_id>[,key=<key>]" tag is treated as an XCom-input struct: at execution time, the runtime pulls each field from the named upstream task in the current Dag run and decodes it into the field's type. Binding lives on the function, not the registry, so AddTask is unchanged.
  • Decoding is strict by type: a dedicated struct field rejects unknown/renamed keys (no silent zero-fill), while a map[string]any/any field decodes loosely; a null value is only accepted by a nilable field. Signature/tag mistakes fail at registration.
  • The reflection/resolution logic lives in a new leaf package pkg/binding (shared by the coordinator and Edge-worker paths), keeping bundlev1/task.go thin.

What

  • pkg/binding: new Plan / Analyze / Resolve API for parameter analysis, XCom pull, and strict decode
  • pkg/execution coordinator fixes uncovered while wiring the example end-to-end:
    • GetXCom maps a null supervisor response to sdk.XComNotFound (mirrors the HTTP 404 mapping and xcom_pull returning None).
    • Request frames encode user values with json struct tags so a typed XCom round-trips.
    • Log records set the reserved timestamp/event/level fields last so a user attribute can't corrupt the frame the supervisor parses (caught by the main.go example when using timestamp as an XCom field name).
  • Example bundle rewritten to TaskFlow style
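
The log-record fix boils down to write order. A minimal sketch of the idea, assuming a map-based record; buildLogRecord is a hypothetical helper, not the function in pkg/execution:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildLogRecord copies user attributes first and writes the reserved
// fields last, so a user attribute named "timestamp", "level", or "event"
// cannot replace the value the supervisor expects to parse.
func buildLogRecord(timestamp, level, event string, attrs map[string]any) map[string]any {
	rec := make(map[string]any, len(attrs)+3)
	for k, v := range attrs {
		rec[k] = v
	}
	rec["timestamp"] = timestamp
	rec["level"] = level
	rec["event"] = event
	return rec
}

func main() {
	rec := buildLogRecord("2026-06-10T00:00:00Z", "info", "pulled xcom",
		map[string]any{"timestamp": "user value", "task_id": "extract"})
	out, err := json.Marshal(rec)
	fmt.Println(string(out), err)
}
```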

Was generative AI tooling used to co-author this PR?

@jason810496 jason810496 added this to the Airflow 3.3.0 milestone Jun 10, 2026
@jason810496 jason810496 self-assigned this Jun 10, 2026
@jason810496 jason810496 changed the title Go-SDK: Support TaskFlow syntax Go-SDK: Support TaskFlow Jun 10, 2026

@jason810496 jason810496 left a comment

Member Author

This is the final user-interface change before the Go-SDK beta release.
I would appreciate a review and welcome any suggestions, thanks.

Comment on lines +70 to +77
// ExtractInput declares extract's XCom inputs. The `xcom:"python_task_1"` tag
// binds this field to the return_value of the upstream Python task
// `python_task_1`, so the Go task receives a Python task's output without
// calling client.GetXCom itself. The runtime pulls and decodes it before
// extract runs.
type ExtractInput struct {
	FromPython string `xcom:"python_task_1"`
}

Member Author

No matter how many upstream tasks there are, all upstream XCom results are injected as a single struct.

Users should declare every upstream task_id in an xcom:"<task_id>" tag (or xcom:"<task_id>,key=<key>" if they are not using the default return_value key).

Other directions I had considered:

  • Define the injection at the AddTask level. IMO this would be ambiguous for users, because at this stage the Dag structure still has to be defined on the Python side. With an interface like AddTask(<task_id>, <other task function handle> ... <or task id>), users would be confused about whether the edges are being defined on the Go side instead of the Python side.

Comment on lines +88 to +92
// TransformResult is transform's return value.
type TransformResult struct {
	Variable  string        `json:"variable"`
	Extracted ExtractResult `json:"extracted"`
}

Member Author

For the return type, users should define all fields with a json tag.
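
A small sketch of why the tags matter: they fix the key names on the wire, so a downstream task (Go or Python) sees stable keys and a typed value round-trips. The Rows field on ExtractResult is made up for illustration, since its definition is not shown in this thread.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ExtractResult is a placeholder; the Rows field is hypothetical.
type ExtractResult struct {
	Rows int `json:"rows"`
}

// TransformResult mirrors the struct in the diff above.
type TransformResult struct {
	Variable  string        `json:"variable"`
	Extracted ExtractResult `json:"extracted"`
}

// roundTrip encodes a result the way a JSON-based XCom push would and
// decodes it back into the same type.
func roundTrip(in TransformResult) (string, TransformResult, error) {
	raw, err := json.Marshal(in)
	if err != nil {
		return "", TransformResult{}, err
	}
	var out TransformResult
	if err := json.Unmarshal(raw, &out); err != nil {
		return "", TransformResult{}, err
	}
	return string(raw), out, nil
}

func main() {
	wire, out, err := roundTrip(TransformResult{
		Variable:  "v",
		Extracted: ExtractResult{Rows: 2},
	})
	fmt.Println(wire, out, err)
}
```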

"github.com/apache/airflow/go-sdk/pkg/sdkcontext"
"github.com/apache/airflow/go-sdk/sdk"
)

Member Author

Extracted most of the reflection and parameter-binding logic into a new binding package so it does not overwhelm the task module.

// xcom_pull yields None when the key is missing), so surface it as
// not-found, matching how the HTTP client maps a 404 and how
// GetVariable handles the same condition above.
return nil, fmt.Errorf("%w: %q", sdk.XComNotFound, key)

Member Author

This intentionally returns a not-found error; otherwise a missing XCom would surface as a decoding error.

@uranusjr

Member

This is probably not tied to 3.3.0 since there’s no change needed to Airflow? Go SDK is released separately.

@jason810496

jason810496 commented Jun 10, 2026

Member Author

This is probably not tied to 3.3.0 since there’s no change needed to Airflow? Go SDK is released separately.

We use the 3.3.0 milestone to keep track of AIP-108 progress (we don't have a specific milestone for the Go and Java SDKs). I added this PR to the milestone because it would be better to support TaskFlow in the initial release, in line with the Java SDK.

Development

Successfully merging this pull request may close these issues.

Support Taskflow-like syntax in the Go SDK

2 participants