-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtodo.txt
More file actions
139 lines (134 loc) · 4.58 KB
/
todo.txt
File metadata and controls
139 lines (134 loc) · 4.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
create users and authentication
later:
- filter by priority
# new jwt imports
# router = APIRouter(
# prefix = '/auth',
# tags = ['authentication']
# )
# SECRET_KEY = os.getenv("SECRET_KEY")
# ALGORITHM = os.getenv("ALGORITHM")
# ACCESS_TOKEN_EXPIRE_MINUTES = os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")
#
# pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
#
#
# def verify_password(plain_password, hashed_password):
# print("plain pw", plain_password)
# print("hashed pw", hashed_password)
# pwd = pwd_context.verify(plain_password, hashed_password)
# print("verify pw2", pwd)
#
# return pwd_context.verify(plain_password, hashed_password)
#
# def get_password_hash(password):
# return pwd_context.hash(password)
#
# def get_user(username:str, db: Session):
# user = db.query(User).filter(User.username == username).first()
# print("get user")
# if user is None:
# raise ValueError(f"User with username '{username}' not found")
# user_dict = {key: value for key, value in user.__dict__.items() if not key.startswith("_")}
# print("user dict type =", user_dict)
# return UserInDB.model_validate(user_dict)
#
# def authenticate_user(username:str, password:str, db: Session):
# user = get_user(username, db)
# if not user:
# return False
# if not verify_password(password, user.hashed_password):
# print("verify not user")
# return False
#
# print("auth return")
# return user
#
#
#
# def create_access_token(data: dict, expires_delta: timedelta | None = None):
# to_encode = data.copy()
# print("timedelta create access token")
# if expires_delta:
# expire = datetime.now(timezone.utc) + expires_delta
# else:
# expire = datetime.now(timezone.utc) + timedelta(minutes = 15)
# to_encode.update({"exp":expire})
# encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm = ALGORITHM)
# return encoded_jwt
#
# async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], db: Session = Depends(get_db)):
# credentials_exception = HTTPException(
# status_code=status.HTTP_401_UNAUTHORIZED,
# detail="Could not validate credentials",
# headers={"WWW-Authenticate": "Bearer"}
# )
# try:
# payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# username = payload.get("sub")
# if username is None:
# raise credentials_exception
# token_data = TokenData(username=username)
# except InvalidTokenError:
# raise credentials_exception
# user = get_user(username=token_data.username, db=db)
# print("get_current_user")
# if user is None:
# raise credentials_exception
# return user
#
#
# async def get_current_active_user(
# current_user: Annotated[User, Depends(get_current_user)]
# ):
# if current_user.disabled:
# raise HTTPException(status_code=400, detail="Inactive user")
# return current_user
#
# # TODO: understand the code from this point down
# @app.post("/token")
# async def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: Session = Depends(get_db) ) -> Token:
# print("log in for access token1", form_data.password)
# user = authenticate_user(form_data.username, form_data.password, db)
# print(user)
# print("log in for access token2")
#
# if not user:
#
# raise HTTPException(
# status_code=status.HTTP_401_UNAUTHORIZED,
# detail="Incorrect username or password",
# headers = {"WWW-Authenticate": "Bearer"},
# )
# print("log in for access token3")
# access_token_expires = timedelta(minutes = int(ACCESS_TOKEN_EXPIRE_MINUTES))
# access_token = create_access_token(
# data={"sub": form_data.username}, expires_delta= access_token_expires
# )
# return Token(access_token=access_token, token_type="bearer")
#
# @app.get("/me")
# async def read_users_me(
# current_user: Annotated[User, Depends(get_current_active_user)],
# ):
# return current_user
#
# # @app.get("/me/items/")
# # async def read_own_items(
# # current_user: Annotated[User, Depends(get_current_active_user)],
# # ):
# # return [{"item_id": "Foo", "owner": current_user.username}]
# #
#
# if __name__ == "__main__":
# db_session = next(get_db())
#
# username = "test1"
#
# try:
# user_data = get_user(username, db_session)
# print("Returned User Object:", user_data)
# except Exception as e:
# print(f"Error: {e}")
#