forked from a2aproject/a2a-python
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy patha2a_db_cli.py
More file actions
164 lines (139 loc) · 5.28 KB
/
a2a_db_cli.py
File metadata and controls
164 lines (139 loc) · 5.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import argparse
import logging
import os
from importlib.resources import files
try:
from alembic import command
from alembic.config import Config
except ImportError as e:
raise ImportError(
"CLI requires Alembic. Install with: 'pip install a2a-sdk[db-cli]'."
) from e
def _add_shared_args(
parser: argparse.ArgumentParser, is_sub: bool = False
) -> None:
"""Add common arguments to the given parser."""
prefix = 'sub_' if is_sub else ''
parser.add_argument(
'--database-url',
dest=f'{prefix}database_url',
help='Database URL to use for the migrations. If not set, the DATABASE_URL environment variable will be used.',
)
parser.add_argument(
'--tasks-table',
dest=f'{prefix}tasks_table',
help='Custom tasks table to update. If not set, the default is "tasks".',
)
parser.add_argument(
'--push-notification-configs-table',
dest=f'{prefix}push_notification_configs_table',
help='Custom push notification configs table to update. If not set, the default is "push_notification_configs".',
)
parser.add_argument(
'-v',
'--verbose',
dest=f'{prefix}verbose',
help='Enable verbose output (sets sqlalchemy.engine logging to INFO)',
action='store_true',
)
parser.add_argument(
'--sql',
dest=f'{prefix}sql',
help='Run migrations in sql mode (generate SQL instead of executing)',
action='store_true',
)
def create_parser() -> argparse.ArgumentParser:
"""Create the argument parser for the migration tool."""
parser = argparse.ArgumentParser(description='A2A Database Migration Tool')
# Global options
parser.add_argument(
'--add_columns_owner_last_updated-default-owner',
dest='owner',
help="Value for the 'owner' column (used in specific migrations). If not set defaults to 'legacy_v03_no_user_info'",
)
_add_shared_args(parser)
subparsers = parser.add_subparsers(dest='cmd', help='Migration command')
# Upgrade command
up_parser = subparsers.add_parser(
'upgrade', help='Upgrade to a later version'
)
up_parser.add_argument(
'revision',
nargs='?',
default='head',
help='Revision target (default: head)',
)
up_parser.add_argument(
'--add_columns_owner_last_updated-default-owner',
dest='sub_owner',
help="Value for the 'owner' column (used in specific migrations). If not set defaults to 'legacy_v03_no_user_info'",
)
_add_shared_args(up_parser, is_sub=True)
# Downgrade command
down_parser = subparsers.add_parser(
'downgrade', help='Revert to a previous version'
)
down_parser.add_argument(
'revision',
nargs='?',
default='base',
help='Revision target (e.g., -1, base or a specific ID)',
)
_add_shared_args(down_parser, is_sub=True)
# Current command
current_parser = subparsers.add_parser(
'current', help='Display the current revision for a database'
)
_add_shared_args(current_parser, is_sub=True)
return parser
def run_migrations() -> None:
"""CLI tool to manage database migrations."""
# Configure logging to show INFO messages
logging.basicConfig(level=logging.INFO, format='%(levelname)s %(message)s')
parser = create_parser()
args = parser.parse_args()
# Default to upgrade head if no command is provided
if not args.cmd:
args.cmd = 'upgrade'
args.revision = 'head'
# Locate the bundled alembic.ini
ini_path = files('a2a').joinpath('alembic.ini')
cfg = Config(str(ini_path))
# Dynamically set the script location
migrations_path = files('a2a').joinpath('migrations')
cfg.set_main_option('script_location', str(migrations_path))
# Consolidate owner, db_url, tables, verbose and sql values
owner = args.owner or getattr(args, 'sub_owner', None)
db_url = args.database_url or getattr(args, 'sub_database_url', None)
task_table = args.tasks_table or getattr(args, 'sub_tasks_table', None)
push_notification_configs_table = (
args.push_notification_configs_table
or getattr(args, 'sub_push_notification_configs_table', None)
)
verbose = args.verbose or getattr(args, 'sub_verbose', False)
sql = args.sql or getattr(args, 'sub_sql', False)
# Pass custom arguments to the migration context
if owner:
cfg.set_main_option(
'add_columns_owner_last_updated_default_owner', owner
)
if db_url:
os.environ['DATABASE_URL'] = db_url
if task_table:
cfg.set_main_option('tasks_table', task_table)
if push_notification_configs_table:
cfg.set_main_option(
'push_notification_configs_table', push_notification_configs_table
)
if verbose:
cfg.set_main_option('verbose', 'true')
# Execute the requested command
if args.cmd == 'upgrade':
logging.info('Upgrading database to %s', args.revision)
command.upgrade(cfg, args.revision, sql=sql)
elif args.cmd == 'downgrade':
logging.info('Downgrading database to %s', args.revision)
command.downgrade(cfg, args.revision, sql=sql)
elif args.cmd == 'current':
command.current(cfg, verbose=verbose)
logging.info('Done.')