forked from google/adk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbase_plugin.py
More file actions
394 lines (322 loc) · 13.2 KB
/
base_plugin.py
File metadata and controls
394 lines (322 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from abc import ABC
from typing import Any
from typing import Optional
from typing import TYPE_CHECKING
from typing import TypeVar
from google.genai import types
from ..agents.base_agent import BaseAgent
from ..agents.callback_context import CallbackContext
from ..events.event import Event
from ..models.llm_request import LlmRequest
from ..models.llm_response import LlmResponse
from ..tools.base_tool import BaseTool
if TYPE_CHECKING:
from ..agents.invocation_context import InvocationContext
from ..tools.tool_context import ToolContext
# Type alias: The value may or may not be awaitable, and value is optional.
# NOTE(review): only a plain, unconstrained TypeVar is defined here; no
# awaitable/optional alias is visible in this file — confirm the comment above
# is still accurate.
T = TypeVar("T")
class BasePlugin(ABC):
  """Base class for creating plugins.

  Plugins provide a structured way to intercept and modify agent, tool, and
  LLM behaviors at critical execution points in a callback manner. While agent
  callbacks apply to a particular agent, plugins apply globally to all agents
  added in the runner. Plugins are best used for adding custom behaviors like
  logging, monitoring, caching, or modifying requests and responses at key
  stages.

  A plugin can implement one or more of the callback methods, but should not
  implement the same callback method more than once.

  Relation with [Agent callbacks](https://google.github.io/adk-docs/callbacks/):

  **Execution Order**
  Similar to Agent callbacks, Plugins are executed in the order they are
  registered. However, Plugin and Agent Callbacks are executed sequentially,
  with plugins taking precedence over agent callbacks. When the callback in a
  plugin returns a value, it will short circuit all remaining plugins and
  agent callbacks, causing all remaining plugins and agent callbacks
  to be skipped.

  **Change Propagation**
  Plugins and agent callbacks can both modify the value of the input
  parameters, including agent input, tool input, and LLM request/response,
  etc. They work in exactly the same way. The modifications will be visible
  and passed to the next callback in the chain. For example, if a plugin
  modifies the tool input with before_tool_callback, the modified tool input
  will be passed to the before_tool_callback of the next plugin, and further
  passed to the agent callbacks if not short-circuited.

  To use a plugin, implement the desired callback methods and pass an instance
  of your custom plugin class to the ADK Runner.

  Examples:
    A simple plugin that logs every tool call.

    >>> class ToolLoggerPlugin(BasePlugin):
    ...   def __init__(self):
    ...     super().__init__(name="tool_logger")
    ...
    ...   async def before_tool_callback(
    ...       self, *, tool: BaseTool, tool_args: dict[str, Any],
    ...       tool_context: ToolContext
    ...   ):
    ...     print(
    ...         f"[{self.name}] Calling tool '{tool.name}' with args:"
    ...         f" {tool_args}"
    ...     )
    ...
    ...   async def after_tool_callback(
    ...       self, *, tool: BaseTool, tool_args: dict,
    ...       tool_context: ToolContext, result: dict
    ...   ):
    ...     print(
    ...         f"[{self.name}] Tool '{tool.name}' finished with result:"
    ...         f" {result}"
    ...     )
    ...
    >>> # Add the plugin to ADK Runner
    >>> # runner = Runner(
    >>> #   ...
    >>> #   plugins=[ToolLoggerPlugin(), AgentPolicyPlugin()],
    >>> # )
  """

  def __init__(self, name: str):
    """Initializes the plugin.

    Args:
      name: A unique identifier for this plugin instance.
    """
    super().__init__()
    self.name = name

  async def on_user_message_callback(
      self,
      *,
      invocation_context: InvocationContext,
      user_message: types.Content,
  ) -> Optional[types.Content]:
    """Callback executed when a user message is received.

    This runs before the runner starts the invocation and helps with logging
    or modifying the user message.

    Args:
      invocation_context: The context for the entire invocation.
      user_message: The message content input by user.

    Returns:
      An optional `types.Content` to be returned to the ADK. Returning a
      value replaces the user message; returning `None` proceeds normally.
    """

  async def before_run_callback(
      self, *, invocation_context: InvocationContext
  ) -> Optional[types.Content]:
    """Callback executed before the ADK runner runs.

    This is the first callback to be called in the lifecycle, ideal for global
    setup or initialization tasks.

    Args:
      invocation_context: The context for the entire invocation, containing
        session information, the root agent, etc.

    Returns:
      An optional `types.Content` to be returned to the ADK. Returning a value
      halts execution of the runner and ends the run with that content.
      Return `None` to proceed normally.
    """

  async def on_event_callback(
      self, *, invocation_context: InvocationContext, event: Event
  ) -> Optional[Event]:
    """Callback executed after an event is yielded from runner.

    This is the ideal place to make modification to the event before the event
    is handled by the underlying agent app.

    Args:
      invocation_context: The context for the entire invocation.
      event: The event raised by the runner.

    Returns:
      An optional `Event`. A non-`None` return may be used by the framework to
      modify or replace the event. Returning `None` allows the original event
      to be used.
    """

  async def after_run_callback(
      self, *, invocation_context: InvocationContext
  ) -> None:
    """Callback executed after an ADK runner run has completed.

    This is the final callback in the ADK lifecycle, suitable for cleanup,
    final logging, or reporting tasks.

    Args:
      invocation_context: The context for the entire invocation.

    Returns:
      None
    """

  async def close(self) -> None:
    """Method executed when the runner is closed.

    This method is used for cleanup tasks such as closing network connections
    or releasing resources.
    """

  async def before_agent_callback(
      self, *, agent: BaseAgent, callback_context: CallbackContext
  ) -> Optional[types.Content]:
    """Callback executed before an agent's primary logic is invoked.

    This callback can be used for logging, setup, or to short-circuit the
    agent's execution by returning a value.

    Args:
      agent: The agent that is about to run.
      callback_context: The context for the agent invocation.

    Returns:
      An optional `types.Content` object. If a value is returned, it will
      bypass the agent's callbacks and its execution, and return this value
      directly. Returning `None` allows the agent to proceed normally.
    """

  async def after_agent_callback(
      self, *, agent: BaseAgent, callback_context: CallbackContext
  ) -> Optional[types.Content]:
    """Callback executed after an agent's primary logic has completed.

    Args:
      agent: The agent that has just run.
      callback_context: The context for the agent invocation.

    Returns:
      An optional `types.Content` object. The content to return to the user.
      When the content is present, the provided content will be used as agent
      response and appended to event history as agent response.
    """

  async def before_model_callback(
      self, *, callback_context: CallbackContext, llm_request: LlmRequest
  ) -> Optional[LlmResponse]:
    """Callback executed before a request is sent to the model.

    This provides an opportunity to inspect, log, or modify the `LlmRequest`
    object. It can also be used to implement caching by returning a cached
    `LlmResponse`, which would skip the actual model call.

    Args:
      callback_context: The context for the current agent call.
      llm_request: The prepared request object to be sent to the model.

    Returns:
      An optional `LlmResponse`. A non-`None` value triggers an early exit
      and is returned as the response immediately. Returning `None` allows
      the LLM request to proceed normally.
    """

  async def after_model_callback(
      self, *, callback_context: CallbackContext, llm_response: LlmResponse
  ) -> Optional[LlmResponse]:
    """Callback executed after a response is received from the model.

    This is the ideal place to log model responses, collect metrics on token
    usage, or perform post-processing on the raw `LlmResponse`.

    Args:
      callback_context: The context for the current agent call.
      llm_response: The response object received from the model.

    Returns:
      An optional `LlmResponse`. A non-`None` return may be used by the
      framework to modify or replace the response. Returning `None` allows
      the original response to be used.
    """

  async def on_model_error_callback(
      self,
      *,
      callback_context: CallbackContext,
      llm_request: LlmRequest,
      error: Exception,
  ) -> Optional[LlmResponse]:
    """Callback executed when a model call encounters an error.

    This callback provides an opportunity to handle model errors gracefully,
    potentially providing alternative responses or recovery mechanisms.

    Args:
      callback_context: The context for the current agent call.
      llm_request: The request that was sent to the model when the error
        occurred.
      error: The exception that was raised during model execution.

    Returns:
      An optional LlmResponse. If an LlmResponse is returned, it will be used
      instead of propagating the error. Returning `None` allows the original
      error to be raised.
    """

  async def before_tool_callback(
      self,
      *,
      tool: BaseTool,
      tool_args: dict[str, Any],
      tool_context: ToolContext,
  ) -> Optional[dict]:
    """Callback executed before a tool is called.

    This callback is useful for logging tool usage, input validation, or
    modifying the arguments before they are passed to the tool.

    Args:
      tool: The tool instance that is about to be executed.
      tool_args: The dictionary of arguments to be used for invoking the tool.
      tool_context: The context specific to the tool execution.

    Returns:
      An optional dictionary. If a dictionary is returned, it will stop the
      tool execution and return this response immediately. Returning `None`
      lets the tool execution proceed.
    """

  async def after_tool_callback(
      self,
      *,
      tool: BaseTool,
      tool_args: dict[str, Any],
      tool_context: ToolContext,
      result: dict,
  ) -> Optional[dict]:
    """Callback executed after a tool has been called.

    This callback allows for inspecting, logging, or modifying the result
    returned by a tool.

    Args:
      tool: The tool instance that has just been executed.
      tool_args: The original arguments that were passed to the tool.
      tool_context: The context specific to the tool execution.
      result: The dictionary returned by the tool invocation.

    Returns:
      An optional dictionary. If a dictionary is returned, it will **replace**
      the original result from the tool. This allows for post-processing or
      altering tool outputs. Returning `None` uses the original, unmodified
      result.
    """

  async def on_tool_error_callback(
      self,
      *,
      tool: BaseTool,
      tool_args: dict[str, Any],
      tool_context: ToolContext,
      error: Exception,
  ) -> Optional[dict]:
    """Callback executed when a tool call encounters an error.

    This callback provides an opportunity to handle tool errors gracefully,
    potentially providing alternative responses or recovery mechanisms.

    Args:
      tool: The tool instance that encountered an error.
      tool_args: The arguments that were passed to the tool.
      tool_context: The context specific to the tool execution.
      error: The exception that was raised during tool execution.

    Returns:
      An optional dictionary. If a dictionary is returned, it will be used as
      the tool response instead of propagating the error. Returning `None`
      allows the original error to be raised.
    """

  async def on_state_change_callback(
      self,
      *,
      callback_context: CallbackContext,
      state_delta: dict[str, Any],
  ) -> None:
    """Callback executed when an event carries state changes.

    This callback is invoked after an event with a non-empty ``state_delta``
    is yielded from the runner. It is observational, so implementations are
    expected to return `None`; note that returning a non-`None` value will
    short-circuit subsequent plugins.

    Args:
      callback_context: The context for the current invocation.
      state_delta: A copy of the state changes carried by the event.
        Mutating this dict does not affect the original state.

    Returns:
      None
    """