-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.py
More file actions
121 lines (94 loc) · 4.05 KB
/
main.py
File metadata and controls
121 lines (94 loc) · 4.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
import models
import schemas
from database import SessionLocal, engine
from typing import List
# Application bootstrap. `app` is the FastAPI instance that every route
# decorator in this module registers against.
app = FastAPI()
# Emit the schema for everything registered on models.Base against `engine`.
# This is a module-level statement, so it executes whenever this module is
# imported, before any route below is defined.
models.Base.metadata.create_all(bind=engine)
# Dependency for the database session
def get_db():
    """Yield one database session and close it when the consumer is done.

    Written as a generator so it can be handed to ``Depends``: the ``finally``
    clause guarantees ``close()`` is called whether the code using the session
    finished normally or raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Always release the session, including on error paths.
        session.close()
# Utility Functions
def get_book_or_404(book_id: int, db: Session):
    """Return the book whose ``id`` equals ``book_id``.

    Args:
        book_id: Identifier to look up.
        db: Open SQLAlchemy session used for the query.

    Returns:
        The first matching ``models.Book`` row.

    Raises:
        HTTPException: 404 with detail "Book not found" when no row matches.
    """
    id_matches = models.Book.id == book_id
    found = db.query(models.Book).filter(id_matches).first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Book not found")
# CRUD Operations
@app.post("/books/", response_model=schemas.Book)
def create_book(book: schemas.BookCreate, db: Session = Depends(get_db)):
    # Create a book from the request payload, commit it, and return the stored
    # row (serialized through schemas.Book by the route's response_model).
    payload = book.dict()
    new_row = models.Book(**payload)
    db.add(new_row)
    db.commit()
    # Reload after the commit so the returned object carries the values held
    # by the database (presumably including the generated id - confirm against
    # models.Book).
    db.refresh(new_row)
    return new_row
@app.get("/books/", response_model=List[schemas.Book])
def read_books(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
    # Return one page of books: skip the first `skip` rows, then take at most
    # `limit` rows (defaults: skip=0, limit=10).
    #
    # The explicit ORDER BY is what makes paging stable. Without it the
    # database is free to return rows in any order, so two consecutive pages
    # could repeat or miss rows. Ordering by id gives every row a fixed
    # position in the sequence.
    page = (
        db.query(models.Book)
        .order_by(models.Book.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
    return page
@app.get("/books/{book_id}", response_model=schemas.Book)
def read_book(book_id: int, db: Session = Depends(get_db)):
    # Return the book identified by `book_id`.
    #
    # The lookup and the 404 response (detail "Book not found") are delegated
    # to this module's get_book_or_404 helper instead of being repeated here,
    # so the not-found behaviour is defined in a single place.
    return get_book_or_404(book_id, db)
@app.put("/books/{book_id}", response_model=schemas.Book)
def update_book(book_id: int, book: schemas.BookCreate, db: Session = Depends(get_db)):
    # Copy every field of the request payload onto the stored book, commit,
    # and return the refreshed row.
    #
    # get_book_or_404 performs the lookup and raises the 404
    # ("Book not found") when the id does not exist, replacing the duplicated
    # inline query this handler used to carry.
    db_book = get_book_or_404(book_id, db)
    for field, value in book.dict().items():
        setattr(db_book, field, value)
    db.commit()
    # Reload so the response reflects what the database now holds.
    db.refresh(db_book)
    return db_book
@app.delete("/books/{book_id}", response_model=schemas.Book)
def delete_book(book_id: int, db: Session = Depends(get_db)):
    # Delete the book identified by `book_id` and return the object that was
    # removed.
    #
    # get_book_or_404 performs the lookup and raises the 404
    # ("Book not found") when the id does not exist, replacing the duplicated
    # inline query this handler used to carry.
    db_book = get_book_or_404(book_id, db)
    db.delete(db_book)
    db.commit()
    return db_book
@app.post("/publishers/", response_model=schemas.Publisher)
def create_publisher(publisher: schemas.PublisherCreate, db: Session = Depends(get_db)):
    # Create a publisher from the request payload, commit it, and return the
    # stored row (serialized through schemas.Publisher by the response_model).
    payload = publisher.dict()
    new_row = models.Publisher(**payload)
    db.add(new_row)
    db.commit()
    # Reload after the commit so the returned object carries the values held
    # by the database (presumably including the generated id - confirm against
    # models.Publisher).
    db.refresh(new_row)
    return new_row
@app.get("/publishers/", response_model=List[schemas.Publisher])
def read_publishers(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
    # Return one page of publishers: skip the first `skip` rows, then take at
    # most `limit` rows (defaults: skip=0, limit=10).
    #
    # The explicit ORDER BY is what makes paging stable. Without it the
    # database is free to return rows in any order, so two consecutive pages
    # could repeat or miss rows. Ordering by id gives every row a fixed
    # position in the sequence.
    page = (
        db.query(models.Publisher)
        .order_by(models.Publisher.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
    return page
@app.get("/publishers/{publisher_id}", response_model=schemas.Publisher)
def read_publisher(publisher_id: int, db: Session = Depends(get_db)):
    # Return the publisher whose id equals `publisher_id`, or answer with a
    # 404 ("Publisher not found") when the query yields no row.
    id_matches = models.Publisher.id == publisher_id
    found = db.query(models.Publisher).filter(id_matches).first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Publisher not found")
@app.put("/publishers/{publisher_id}", response_model=schemas.Publisher)
def update_publisher(
    publisher_id: int,
    publisher: schemas.PublisherCreate,
    db: Session = Depends(get_db),
):
    # Copy every field of the request payload onto the stored publisher,
    # commit, and return the refreshed row. Answers with a 404
    # ("Publisher not found") when no publisher has the given id.
    id_matches = models.Publisher.id == publisher_id
    stored = db.query(models.Publisher).filter(id_matches).first()
    if not stored:
        raise HTTPException(status_code=404, detail="Publisher not found")
    changes = publisher.dict()
    for field in changes:
        setattr(stored, field, changes[field])
    db.commit()
    # Reload so the response reflects what the database now holds.
    db.refresh(stored)
    return stored
@app.delete("/publishers/{publisher_id}", response_model=schemas.Publisher)
def delete_publisher(publisher_id: int, db: Session = Depends(get_db)):
    # Delete the publisher whose id equals `publisher_id` and return the
    # object that was removed. Answers with a 404 ("Publisher not found") when
    # no publisher has the given id.
    id_matches = models.Publisher.id == publisher_id
    doomed = db.query(models.Publisher).filter(id_matches).first()
    if not doomed:
        raise HTTPException(status_code=404, detail="Publisher not found")
    db.delete(doomed)
    db.commit()
    return doomed