|
| 1 | +import secrets |
| 2 | +import string |
| 3 | +from typing import Any |
| 4 | + |
1 | 5 | from sqlalchemy import insert, select |
2 | 6 | from sqlalchemy.dialects.postgresql import insert as pg_insert |
3 | 7 | from sqlalchemy.ext.asyncio import AsyncSession |
4 | 8 |
|
5 | | -from app.domains.auth.models import Permission, Role, role_permissions |
| 9 | +from app.core.security import PasswordSecurity |
| 10 | +from app.domains.auth.models import Permission, Role, User, role_permissions, user_roles |
6 | 11 |
|
7 | 12 |
|
8 | 13 | async def seed_roles(session: AsyncSession) -> None: |
@@ -62,7 +67,8 @@ async def seed_permissions(session: AsyncSession) -> None: |
62 | 67 | {"name": "ticket:update_status", "description": "Update ticket status"}, |
63 | 68 | ] |
64 | 69 |
|
65 | | - await session.execute(insert(Permission).values(permissions)) |
| 70 | + insert_stmt = pg_insert(Permission).values(permissions).on_conflict_do_nothing() |
| 71 | + await session.execute(insert_stmt) |
66 | 72 |
|
67 | 73 |
|
68 | 74 | async def seed_role_permissions(session: AsyncSession) -> None: |
@@ -90,3 +96,57 @@ async def seed_role_permissions(session: AsyncSession) -> None: |
90 | 96 | values = [{"role_id": role_id, "permission_id": perm_id} for perm_id in permission_ids] |
91 | 97 | insert_stmt = pg_insert(role_permissions).values(values).on_conflict_do_nothing() |
92 | 98 | await session.execute(insert_stmt) |
| 99 | + |
| 100 | + |
| 101 | +def generate_random_password(length: int = 16) -> str: |
| 102 | + alphabet = string.ascii_letters + string.digits + string.punctuation |
| 103 | + |
| 104 | + while True: |
| 105 | + password = "".join(secrets.choice(alphabet) for _ in range(length)) |
| 106 | + |
| 107 | + if ( |
| 108 | + any(c.islower() for c in password) |
| 109 | + and any(c.isupper() for c in password) |
| 110 | + and any(c.isdigit() for c in password) |
| 111 | + and any(c in string.punctuation for c in password) |
| 112 | + ): |
| 113 | + return password |
| 114 | + |
| 115 | + |
| 116 | +async def seed_users(session: AsyncSession) -> None: |
| 117 | + password_security = PasswordSecurity() |
| 118 | + |
| 119 | + admin_names: list[str] = ["angelina", "eduardo", "julia", "mafe", "pedro", "wesley"] |
| 120 | + default_password = "Admin@123!" |
| 121 | + users_payload: list[dict[str, Any]] = [] |
| 122 | + |
| 123 | + for name in admin_names: |
| 124 | + users_payload.append( |
| 125 | + { |
| 126 | + "email": f"{name}@syncdesk.pro", |
| 127 | + "password_hash": password_security.generate_password_hash(default_password), |
| 128 | + "username": name, |
| 129 | + "name": name, |
| 130 | + "must_change_password": False, |
| 131 | + "must_accept_terms": False |
| 132 | + } |
| 133 | + ) |
| 134 | + |
| 135 | + insert_stmt = pg_insert(User).values(users_payload).on_conflict_do_nothing() |
| 136 | + await session.execute(insert_stmt) |
| 137 | + |
| 138 | + role_result = await session.execute(select(Role.id).where(Role.name == "admin")) |
| 139 | + admin_role_id = role_result.scalar_one_or_none() |
| 140 | + if admin_role_id is None: |
| 141 | + return |
| 142 | + |
| 143 | + users_result = await session.execute( |
| 144 | + select(User.id).where(User.email.in_([user["email"] for user in users_payload])) |
| 145 | + ) |
| 146 | + user_ids = users_result.scalars().all() |
| 147 | + if not user_ids: |
| 148 | + return |
| 149 | + |
| 150 | + user_role_values = [{"user_id": user_id, "role_id": admin_role_id} for user_id in user_ids] |
| 151 | + role_insert_stmt = pg_insert(user_roles).values(user_role_values).on_conflict_do_nothing() |
| 152 | + await session.execute(role_insert_stmt) |
0 commit comments