Skip to content

Commit ef5c48d

Browse files
authored
Merge pull request #19 from BukeLy/feat/user-whitelist-security
feat: add user whitelist security module
2 parents ae87300 + 328de52 commit ef5c48d

5 files changed

Lines changed: 453 additions & 7 deletions

File tree

agent-sdk-client/config.py

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,27 @@ def extract_command(text: Optional[str]) -> Optional[str]:
2828
return command
2929

3030

31-
def _load_config(config_path: Path = DEFAULT_CONFIG_PATH) -> tuple[list[str], dict[str, str]]:
32-
"""Load agent/local commands from TOML config file."""
31+
def _load_config(config_path: Path = DEFAULT_CONFIG_PATH) -> tuple[list[str], dict[str, str], list[int | str]]:
32+
"""Load commands and security config from TOML config file.
33+
34+
Returns:
35+
Tuple of (agent_commands, local_commands, user_whitelist).
36+
"""
3337
if not config_path.exists():
34-
return [], {}
38+
return [], {}, ['all']
3539

3640
try:
3741
with config_path.open('rb') as f:
3842
data = tomllib.load(f)
43+
44+
# Load agent commands
3945
agent_commands = data.get('agent_commands', {}).get('commands', [])
4046
if not isinstance(agent_commands, list):
4147
logger.warning("Agent commands config is not a list; ignoring configuration")
4248
agent_commands = []
4349
agent_commands = [cmd for cmd in agent_commands if isinstance(cmd, str)]
4450

51+
# Load local commands
4552
local_commands_raw = data.get('local_commands', {})
4653
if not isinstance(local_commands_raw, dict):
4754
logger.warning("Local commands config is not a table; ignoring configuration")
@@ -52,10 +59,28 @@ def _load_config(config_path: Path = DEFAULT_CONFIG_PATH) -> tuple[list[str], di
5259
if isinstance(name, str) and isinstance(value, str)
5360
}
5461

55-
return agent_commands, local_commands
62+
# Load security whitelist
63+
security = data.get('security', {})
64+
whitelist = security.get('user_whitelist', ['all'])
65+
if not isinstance(whitelist, list):
66+
logger.warning("user_whitelist is not a list; using default ['all']")
67+
whitelist = ['all']
68+
else:
69+
validated = []
70+
for item in whitelist:
71+
if item == 'all':
72+
validated.append('all')
73+
elif isinstance(item, int):
74+
validated.append(item)
75+
else:
76+
logger.warning(f"Invalid whitelist entry: {item}; skipping")
77+
whitelist = validated if validated else ['all']
78+
79+
return agent_commands, local_commands, whitelist
80+
5681
except (OSError, tomllib.TOMLDecodeError) as exc: # pragma: no cover - defensive logging
57-
logger.warning("Failed to load command configuration: %s", exc)
58-
return [], {}
82+
logger.warning("Failed to load configuration: %s", exc)
83+
return [], {}, ['all']
5984

6085

6186
@dataclass
@@ -68,18 +93,20 @@ class Config:
6893
queue_url: str
6994
agent_commands: list[str]
7095
local_commands: dict[str, str]
96+
user_whitelist: list[int | str]
7197

7298
@classmethod
7399
def from_env(cls, config_path: Optional[Path] = None) -> 'Config':
74100
"""Load configuration from environment variables."""
75-
agent_cmds, local_cmds = _load_config(config_path or DEFAULT_CONFIG_PATH)
101+
agent_cmds, local_cmds, whitelist = _load_config(config_path or DEFAULT_CONFIG_PATH)
76102
return cls(
77103
telegram_token=os.getenv('TELEGRAM_BOT_TOKEN', ''),
78104
agent_server_url=os.getenv('AGENT_SERVER_URL', ''),
79105
auth_token=os.getenv('SDK_CLIENT_AUTH_TOKEN', 'default-token'),
80106
queue_url=os.getenv('QUEUE_URL', ''),
81107
agent_commands=agent_cmds,
82108
local_commands=local_cmds,
109+
user_whitelist=whitelist,
83110
)
84111

85112
def get_command(self, text: Optional[str]) -> Optional[str]:

agent-sdk-client/config.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,9 @@ commands = [
88
[local_commands]
99
# Local-only commands handled by the client
1010
help = "Hello World"
11+
12+
[security]
13+
# User IDs allowed to add bot to groups and send private messages.
14+
# Use ["all"] to allow everyone (default behavior).
15+
# Example with specific users: user_whitelist = [123456789, 987654321]
16+
user_whitelist = ["all"]

agent-sdk-client/handler.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from telegram import Bot, Update
1313

1414
from config import Config
15+
from security import is_user_allowed, should_leave_group
1516

1617
logger = logging.getLogger()
1718
logger.setLevel(logging.INFO)
@@ -172,11 +173,35 @@ def lambda_handler(event: dict, context: Any) -> dict:
172173
logger.debug('Ignoring non-update webhook')
173174
return {'statusCode': 200}
174175

176+
# Handle my_chat_member event (bot added to group)
177+
if update.my_chat_member:
178+
if should_leave_group(update, config.user_whitelist):
179+
chat_id = update.my_chat_member.chat.id
180+
inviter_id = update.my_chat_member.from_user.id
181+
asyncio.run(bot.leave_chat(chat_id))
182+
logger.info(
183+
f"Left unauthorized group",
184+
extra={'chat_id': chat_id, 'inviter_id': inviter_id},
185+
)
186+
_send_metric('SecurityBlock.UnauthorizedGroup')
187+
return {'statusCode': 200}
188+
175189
message = update.message or update.edited_message
176190
if not message or not message.text:
177191
logger.debug('Ignoring webhook without text message')
178192
return {'statusCode': 200}
179193

194+
# Check private message whitelist
195+
if message.chat.type == 'private':
196+
user_id = message.from_user.id if message.from_user else None
197+
if user_id and not is_user_allowed(user_id, config.user_whitelist):
198+
logger.info(
199+
f"Blocked private message from unauthorized user",
200+
extra={'user_id': user_id},
201+
)
202+
_send_metric('SecurityBlock.UnauthorizedPrivate')
203+
return {'statusCode': 200}
204+
180205
cmd = config.get_command(message.text)
181206
if cmd and config.is_local_command(cmd):
182207
_handle_local_command(bot, message, config, cmd)

agent-sdk-client/security.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
"""Security module for Telegram Bot access control."""
2+
from telegram import Update
3+
4+
5+
def is_user_allowed(user_id: int, whitelist: list[int | str]) -> bool:
6+
"""Check if user is in whitelist.
7+
8+
Args:
9+
user_id: Telegram user ID to check.
10+
whitelist: List of allowed user IDs, or ['all'] to allow everyone.
11+
12+
Returns:
13+
True if user is allowed, False otherwise.
14+
"""
15+
if 'all' in whitelist:
16+
return True
17+
return user_id in whitelist
18+
19+
20+
def should_leave_group(update: Update, whitelist: list[int | str]) -> bool:
21+
"""Check if bot should leave a group based on who added it.
22+
23+
Args:
24+
update: Telegram Update object with my_chat_member event.
25+
whitelist: List of allowed user IDs who can add bot to groups.
26+
27+
Returns:
28+
True if bot should leave (added by unauthorized user), False otherwise.
29+
"""
30+
if not update.my_chat_member:
31+
return False
32+
33+
member_update = update.my_chat_member
34+
old_status = member_update.old_chat_member.status
35+
new_status = member_update.new_chat_member.status
36+
37+
# Bot being added to group (status changed from left/kicked to member/administrator)
38+
if old_status in ('left', 'kicked') and new_status in ('member', 'administrator'):
39+
inviter_id = member_update.from_user.id
40+
return not is_user_allowed(inviter_id, whitelist)
41+
42+
return False

0 commit comments

Comments
 (0)