Skip to content

feat: Implement a vertex based task store for the 1.0 branch#791

Draft
gaborfeher wants to merge 2 commits intoa2aproject:1.0-devfrom
gaborfeher:1.0-dev
Draft

feat: Implement a vertex based task store for the 1.0 branch#791
gaborfeher wants to merge 2 commits intoa2aproject:1.0-devfrom
gaborfeher:1.0-dev

Conversation

@gaborfeher
Copy link
Collaborator

For #751

@gaborfeher gaborfeher requested a review from a team as a code owner March 9, 2026 13:23
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request integrates the system with Google Cloud's Vertex AI Agent Engine by introducing a new task storage implementation. It provides the necessary components to save, retrieve, and update tasks directly within Vertex AI, enhancing the platform's capabilities for managing AI-driven workflows. The changes include core logic for data conversion and a robust testing suite to ensure reliability.

Highlights

  • New Vertex AI Task Store: Implemented a VertexTaskStore to enable persistence and management of tasks using the Google Cloud Vertex AI Agent Engine.
  • Type Conversion Utilities: Introduced vertex_task_converter.py to facilitate seamless conversion between internal SDK task types and Vertex AI's specific data structures.
  • Dependency Management: Added google-cloud-aiplatform as a new optional dependency under the vertex extra, and updated the all extra to include it.
  • Comprehensive Testing: Included new test files and a dedicated shell script to ensure the correct functionality and integration of the Vertex AI task store and its conversion logic.
Changelog
  • pyproject.toml
    • Added google-cloud-aiplatform as a vertex optional dependency.
    • Updated the all optional dependency to include vertex.
    • Adjusted Python version resolution markers.
  • src/a2a/contrib/tasks/vertex_task_converter.py
    • Added a new module for converting between SDK Task and Artifact types and Vertex AI A2aTask and TaskArtifact types.
  • src/a2a/contrib/tasks/vertex_task_store.py
    • Added a new VertexTaskStore class that implements the TaskStore interface using the Vertex AI Agent Engine.
    • Provided methods for saving, retrieving, and updating tasks, including handling state, metadata, and artifact changes.
  • tck/sut_agent_with_vertex_task_store.py
    • Added a new test agent script to demonstrate serving with the VertexTaskStore, requiring Vertex AI environment variables.
  • tests/contrib/tasks/fake_vertex_client.py
    • Added a mock implementation of the Vertex AI client for unit testing the VertexTaskStore without actual cloud calls.
  • tests/contrib/tasks/run_vertex_tests.sh
    • Added a shell script to execute Vertex AI related tests, checking for necessary environment variables.
  • tests/contrib/tasks/test_vertex_task_converter.py
    • Added comprehensive unit tests for the vertex_task_converter module, covering state, part, artifact, and task conversions.
  • tests/contrib/tasks/test_vertex_task_store.py
    • Added unit and integration tests for the VertexTaskStore, including saving, retrieving, updating tasks, and handling metadata and artifacts.
  • uv.lock
    • Updated the lock file to reflect new dependencies and adjusted Python version constraints.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new TaskStore implementation using Google Cloud Vertex AI, along with the necessary data converters and tests. The implementation is well-structured and includes comprehensive tests for both fake and real backends. My feedback includes suggestions for improving code readability and maintainability, such as refactoring a complex method to use list comprehensions, using f-strings for path construction, and improving test setup practices. I've also noted a few minor points regarding docstrings and exception handling, and added a style guide reference for import placement.

Note: Security Review did not run due to the size of the PR.

Comment on lines +126 to +145
for artifact in previous_task_artifacts.values():
if artifact.artifact_id not in task_artifacts:
if not task_artifact_change.deleted_artifact_ids:
task_artifact_change.deleted_artifact_ids = []
task_artifact_change.deleted_artifact_ids.append(
artifact.artifact_id
)
for artifact in task_artifacts.values():
if artifact.artifact_id not in previous_task_artifacts:
if not task_artifact_change.added_artifacts:
task_artifact_change.added_artifacts = []
task_artifact_change.added_artifacts.append(
vertex_task_converter.to_stored_artifact(artifact)
)
elif artifact != previous_task_artifacts[artifact.artifact_id]:
if not task_artifact_change.updated_artifacts:
task_artifact_change.updated_artifacts = []
task_artifact_change.updated_artifacts.append(
vertex_task_converter.to_stored_artifact(artifact)
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for detecting artifact changes can be made more efficient and readable by using list comprehensions. This avoids repeated checks and list initializations inside loops.

            deleted_ids = [
                artifact.artifact_id
                for artifact in previous_task_artifacts.values()
                if artifact.artifact_id not in task_artifacts
            ]
            if deleted_ids:
                task_artifact_change.deleted_artifact_ids = deleted_ids

            added_artifacts = [
                vertex_task_converter.to_stored_artifact(artifact)
                for artifact in task_artifacts.values()
                if artifact.artifact_id not in previous_task_artifacts
            ]
            if added_artifacts:
                task_artifact_change.added_artifacts = added_artifacts

            updated_artifacts = [
                vertex_task_converter.to_stored_artifact(artifact)
                for artifact_id, artifact in task_artifacts.items()
                if artifact_id in previous_task_artifacts
                and artifact != previous_task_artifacts[artifact_id]
            ]
            if updated_artifacts:
                task_artifact_change.updated_artifacts = updated_artifacts

if not events:
return
await self._client.aio.agent_engines.a2a_tasks.events.append(
name=self._agent_engine_resource_id + '/a2aTasks/' + task.id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

Using an f-string for constructing the resource name is more readable and idiomatic than string concatenation. This also applies on line 190.

Suggested change
name=self._agent_engine_resource_id + '/a2aTasks/' + task.id,
name=f'{self._agent_engine_resource_id}/a2aTasks/{task.id}',

params: ListTasksRequest,
context: ServerCallContext | None = None,
) -> ListTasksResponse:
"""Retrieves a list of tasks from the store."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The list method raises NotImplementedError. It would be helpful to update the docstring to explain why, similar to how it's done for the delete method. This clarifies whether the feature is unsupported by the backend or planned for future implementation.

Suggested change
"""Retrieves a list of tasks from the store."""
"""The backend doesn't support listing tasks, so this is not implemented."""

try:
del task.output.artifacts[:]
task.output.artifacts.extend(current_artifacts)
except Exception:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

Catching a broad Exception can hide unexpected errors during testing. It's better to catch more specific exceptions. If the goal is to handle cases where task.output.artifacts might not be a list that supports item deletion, consider catching AttributeError or TypeError instead.

'VERTEX_API_VERSION',
]
)
import sys
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

According to PEP 8, imports should be at the top of the file. Please move import sys to the top with the other imports.

References
  1. PEP 8 recommends that imports are always placed at the top of the file, just after any module comments and docstrings, and before module globals and constants. (link)

Comment on lines +117 to +118
sys.path.append(os.path.dirname(__file__))
from fake_vertex_client import FakeVertexClient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

Modifying sys.path at runtime can make tests less predictable and harder to maintain. Consider making the tests/contrib/tasks directory a Python package by adding an empty __init__.py file. This would allow you to use a relative import like from .fake_vertex_client import FakeVertexClient without manipulating the path.

@gaborfeher gaborfeher force-pushed the 1.0-dev branch 3 times, most recently from 1d6e5c0 to 2f2ffd5 Compare March 9, 2026 13:41
@gaborfeher gaborfeher marked this pull request as draft March 9, 2026 14:15
@gaborfeher gaborfeher force-pushed the 1.0-dev branch 13 times, most recently from 5baa501 to 7c2456b Compare March 10, 2026 10:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant