Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions migrations/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from alembic import context
from sqlalchemy import engine_from_config, pool

from print_service.models import Model
from print_service.models import BaseDbModel
from print_service.settings import get_settings


Expand All @@ -20,7 +20,7 @@
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Model.metadata
target_metadata = BaseDbModel.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
Expand Down
34 changes: 34 additions & 0 deletions migrations/versions/90539e2253b3_add_soft_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""add soft delete

Revision ID: 90539e2253b3
Revises: a68c6bb2972c
Create Date: 2025-06-01 17:29:08.641697

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '90539e2253b3'
down_revision = 'a68c6bb2972c'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('file', 'source',
existing_type=sa.VARCHAR(),
nullable=False)
op.add_column('union_member', sa.Column('is_deleted', sa.Boolean(), nullable=False))
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('union_member', 'is_deleted')
op.alter_column('file', 'source',
existing_type=sa.VARCHAR(),
nullable=True)
# ### end Alembic commands ###
24 changes: 23 additions & 1 deletion print_service/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,30 @@
settings = get_settings()


class PrintAPIError(Exception):
eng: str
ru: str

def __init__(self, eng: str, ru: str) -> None:
self.eng = eng
self.ru = ru
super().__init__(eng)


class ObjectNotFound(Exception):
pass
def __init__(self, obj: type, obj_id_or_name: int | str):
super().__init__(
f"Object {obj.__name__} {obj_id_or_name=} not found",
f"Объект {obj.__name__} с идентификатором {obj_id_or_name} не найден",
)


class AlreadyExists(PrintAPIError):
def __init__(self, obj: type, obj_id_or_name: int | str):
super().__init__(
f"Object {obj.__name__}, {obj_id_or_name=} already exists",
f"Объект {obj.__name__} с идентификатором {obj_id_or_name=} уже существует",
)


class TerminalTokenNotFound(ObjectNotFound):
Expand Down
93 changes: 3 additions & 90 deletions print_service/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,92 +1,5 @@
from __future__ import annotations
from .base import Base, BaseDbModel
from .db import *

import math
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.ext.declarative import as_declarative
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql.schema import ForeignKey
from sqlalchemy.sql.sqltypes import Boolean


@as_declarative()
class Model:
pass


class UnionMember(Model):
__tablename__ = 'union_member'

id: Mapped[int] = mapped_column(Integer, primary_key=True)
surname: Mapped[str] = mapped_column(String, nullable=False)
union_number: Mapped[str] = mapped_column(String, nullable=True)
student_number: Mapped[str] = mapped_column(String, nullable=True)

files: Mapped[list[File]] = relationship('File', back_populates='owner')
print_facts: Mapped[list[PrintFact]] = relationship('PrintFact', back_populates='owner')


class File(Model):
__tablename__ = 'file'

id: Mapped[int] = Column(Integer, primary_key=True)
pin: Mapped[str] = Column(String, nullable=False)
file: Mapped[str] = Column(String, nullable=False)
owner_id: Mapped[int] = Column(Integer, ForeignKey('union_member.id'), nullable=False)
option_pages: Mapped[str] = Column(String)
option_copies: Mapped[int] = Column(Integer)
option_two_sided: Mapped[bool] = Column(Boolean)
created_at: Mapped[datetime] = Column(DateTime, nullable=False, default=datetime.utcnow)
updated_at: Mapped[datetime] = Column(
DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow
)
number_of_pages: Mapped[int] = Column(Integer)
source: Mapped[str] = Column(String, default='unknown', nullable=False)

owner: Mapped[UnionMember] = relationship('UnionMember', back_populates='files')
print_facts: Mapped[list[PrintFact]] = relationship('PrintFact', back_populates='file')

@property
def flatten_pages(self) -> list[int] | None:
'''Возвращает расширенный список из элементов списков внутренних целочисленных точек переданного множества отрезков
"1-5, 3, 2" --> [1, 2, 3, 4, 5, 3, 2]'''
if self.number_of_pages is None:
return None
result = list()
if self.option_pages == '':
return result
for part in self.option_pages.split(','):
x = part.split('-')
result.extend(range(int(x[0]), int(x[-1]) + 1))
return result

@property
def sheets_count(self) -> int | None:
'''Возвращает количество элементов списков внутренних целочисленных точек переданного множества отрезков
"1-5, 3, 2" --> 7
P.S. 1, 2, 3, 4, 5, 3, 2 -- 7 чисел'''
if self.number_of_pages is None:
return None
if not self.flatten_pages:
return (
math.ceil(self.number_of_pages - (self.option_two_sided * self.number_of_pages / 2))
* self.option_copies
)
if self.option_two_sided:
return math.ceil(len(self.flatten_pages) / 2) * self.option_copies
else:
return len(self.flatten_pages) * self.option_copies


class PrintFact(Model):
__tablename__ = 'print_fact'

id: Mapped[int] = Column(Integer, primary_key=True)
file_id: Mapped[int] = Column(Integer, ForeignKey('file.id'), nullable=False)
owner_id: Mapped[int] = Column(Integer, ForeignKey('union_member.id'), nullable=False)
created_at: Mapped[datetime] = Column(DateTime, nullable=False, default=datetime.utcnow)

owner: Mapped[UnionMember] = relationship('UnionMember', back_populates='print_facts')
file: Mapped[File] = relationship('File', back_populates='print_facts')
sheets_used: Mapped[int] = Column(Integer)
__all__ = ["Base", "BaseDbModel", "UnionMember", "File", "PrintFact"]
87 changes: 87 additions & 0 deletions print_service/models/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from __future__ import annotations

import re

from sqlalchemy import not_
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Query, Session, as_declarative, declared_attr

from ..exceptions import AlreadyExists, ObjectNotFound


@as_declarative()
class Base:
"""Base class for all database entities"""

@declared_attr
def __tablename__(cls) -> str: # pylint: disable=no-self-argument
"""Generate database table name automatically.
Convert CamelCase class name to snake_case db table name.
"""
return re.sub(r"(?<!^)(?=[A-Z])", "_", cls.__name__).lower()

def __repr__(self):
attrs = []
for c in self.__table__.columns:
attrs.append(f"{c.name}={getattr(self, c.name)}")
return "{}({})".format(c.__class__.__name__, ', '.join(attrs))


class BaseDbModel(Base):
__abstract__ = True

@classmethod
def create(cls, *, session: Session, **kwargs) -> BaseDbModel:
obj = cls(**kwargs)
session.add(obj)
session.flush()
return obj

@classmethod
def query(cls, *, with_deleted: bool = False, session: Session) -> Query:
"""Get all objects with soft deletes"""
objs = session.query(cls)
if not with_deleted and hasattr(cls, "is_deleted"):
objs = objs.filter(not_(cls.is_deleted))
return objs

@classmethod
def get(cls, id: int | str, *, with_deleted=False, session: Session) -> BaseDbModel:
"""Get object with soft deletes"""
objs = session.query(cls)
if not with_deleted and hasattr(cls, "is_deleted"):
objs = objs.filter(not_(cls.is_deleted))
try:
if hasattr(cls, "uuid"):
return objs.filter(cls.uuid == id).one()
return objs.filter(cls.id == id).one()
except NoResultFound:
raise ObjectNotFound(obj=cls, obj_id_or_name=id)

@classmethod
def update(cls, id: int | str, *, session: Session, **kwargs) -> BaseDbModel:
"""Update model with new values from kwargs.
If no new values are given, raise HTTP 409 error.
"""
get_new_values = False
obj = cls.get(id, session=session)
for k, v in kwargs.items():
cur_v = getattr(obj, k)
if cur_v != v:
setattr(obj, k, v)
get_new_values = True
if not get_new_values:
raise AlreadyExists(cls, id)
session.add(obj)
session.flush()
return obj

@classmethod
def delete(cls, id: int | str, *, session: Session) -> None:
"""Soft delete object if possible, else hard delete"""
obj = cls.get(id, session=session)
if hasattr(obj, "is_deleted"):
obj.is_deleted = True
else:
session.delete(obj)
session.flush()
97 changes: 97 additions & 0 deletions print_service/models/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from __future__ import annotations

import math
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql.schema import ForeignKey
from sqlalchemy.sql.sqltypes import Boolean

from .base import BaseDbModel


class UnionMember(BaseDbModel):
__tablename__ = 'union_member'

id: Mapped[int] = mapped_column(Integer, primary_key=True)
surname: Mapped[str] = mapped_column(String, nullable=False)
union_number: Mapped[str] = mapped_column(String, nullable=True)
student_number: Mapped[str] = mapped_column(String, nullable=True)

files: Mapped[list[File]] = relationship('File', back_populates='owner')
print_facts: Mapped[list[PrintFact]] = relationship('PrintFact', back_populates='owner')
is_deleted: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)


class File(BaseDbModel):
__tablename__ = 'file'

id: Mapped[int] = Column(Integer, primary_key=True)
pin: Mapped[str] = Column(String, nullable=False)
file: Mapped[str] = Column(String, nullable=False)
owner_id: Mapped[int] = Column(Integer, ForeignKey('union_member.id'), nullable=False)
option_pages: Mapped[str] = Column(String)
option_copies: Mapped[int] = Column(Integer)
option_two_sided: Mapped[bool] = Column(Boolean)
created_at: Mapped[datetime] = Column(DateTime, nullable=False, default=datetime.utcnow)
updated_at: Mapped[datetime] = Column(
DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow
)
number_of_pages: Mapped[int] = Column(Integer)
source: Mapped[str] = Column(String, default='unknown', nullable=False)

owner: Mapped[UnionMember] = relationship(
'UnionMember',
primaryjoin="and_(File.owner_id==UnionMember.id, not_(UnionMember.is_deleted))",
back_populates='files',
)
print_facts: Mapped[list[PrintFact]] = relationship('PrintFact', back_populates='file')

@property
def flatten_pages(self) -> list[int] | None:
'''Возвращает расширенный список из элементов списков внутренних целочисленных точек переданного множества отрезков
"1-5, 3, 2" --> [1, 2, 3, 4, 5, 3, 2]'''
if self.number_of_pages is None:
return None
result = list()
if self.option_pages == '':
return result
for part in self.option_pages.split(','):
x = part.split('-')
result.extend(range(int(x[0]), int(x[-1]) + 1))
return result

@property
def sheets_count(self) -> int | None:
'''Возвращает количество элементов списков внутренних целочисленных точек переданного множества отрезков
"1-5, 3, 2" --> 7
P.S. 1, 2, 3, 4, 5, 3, 2 -- 7 чисел'''
if self.number_of_pages is None:
return None
if not self.flatten_pages:
return (
math.ceil(self.number_of_pages - (self.option_two_sided * self.number_of_pages / 2))
* self.option_copies
)
if self.option_two_sided:
return math.ceil(len(self.flatten_pages) / 2) * self.option_copies
else:
return len(self.flatten_pages) * self.option_copies


class PrintFact(BaseDbModel):
__tablename__ = 'print_fact'

id: Mapped[int] = Column(Integer, primary_key=True)
file_id: Mapped[int] = Column(Integer, ForeignKey('file.id'), nullable=False)
owner_id: Mapped[int] = Column(Integer, ForeignKey('union_member.id'), nullable=False)
created_at: Mapped[datetime] = Column(DateTime, nullable=False, default=datetime.utcnow)

owner: Mapped[UnionMember] = relationship(
'UnionMember',
primaryjoin="and_(PrintFact.owner_id==UnionMember.id, not_(UnionMember.is_deleted))",
back_populates='print_facts',
)
file: Mapped[File] = relationship('File', back_populates='print_facts')
sheets_used: Mapped[int] = Column(Integer)
2 changes: 1 addition & 1 deletion print_service/routes/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging

from auth_lib.fastapi import UnionAuth
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends
from redis import Redis

from print_service.exceptions import TerminalTokenNotFound
Expand Down
1 change: 0 additions & 1 deletion print_service/routes/exc_handlers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import requests.models
import starlette.requests
from starlette.responses import JSONResponse

Expand Down
Loading
Loading