-
-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathroutes.py
More file actions
135 lines (112 loc) · 4.21 KB
/
routes.py
File metadata and controls
135 lines (112 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import time
from flask import Blueprint, request, session
from flask import render_template, redirect, jsonify
from werkzeug.security import gen_salt
from authlib.integrations.flask_oauth2 import current_token
from authlib.oauth2 import OAuth2Error
from .models import db, User, OAuth2Client
from .oauth2 import authorization, require_oauth
# Blueprint that carries every route defined in this module.
# Flask's signature is Blueprint(name, import_name): the short blueprint name
# comes first (it must not contain a dot), and the dotted module path
# ``__name__`` is the import name Flask uses to locate the package.
bp = Blueprint('home', __name__)
def current_user():
    """Return the ``User`` referenced by the session, or ``None``.

    ``None`` is returned when the session has no ``'id'`` key; otherwise the
    result of ``User.query.get`` for the stored id is returned.
    """
    if 'id' not in session:
        return None
    return User.query.get(session['id'])
@bp.route('/', methods=('GET', 'POST'))
def home():
    """Render the landing page, or log a user in on POST.

    POST: reads ``username`` from the form, looks the user up by name and
    creates the row when it does not exist yet, stores the user's id in the
    session and redirects to ``/``. A missing or empty ``username`` is
    ignored: nothing is created and the session is left untouched.

    GET: renders ``home.html`` with the current user (or ``None``) and the
    list of OAuth2 clients whose ``user_id`` matches that user.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        # Guard against an absent/blank field; without it a user whose
        # username is None or '' would be created and logged in.
        if username:
            user = User.query.filter_by(username=username).first()
            if not user:
                user = User(username=username)
                db.session.add(user)
                db.session.commit()
            session['id'] = user.id
        return redirect('/')

    user = current_user()
    if user:
        clients = OAuth2Client.query.filter_by(user_id=user.id).all()
    else:
        clients = []
    return render_template('home.html', user=user, clients=clients)
def split_by_crlf(s):
    """Split *s* on line boundaries and return the non-empty lines as a list."""
    return list(filter(None, s.splitlines()))
@bp.route('/create_client', methods=('GET', 'POST'))
def create_client():
    """Show the client-registration form (GET) or register a client (POST).

    Anonymous visitors are redirected to ``/``. On POST a new ``OAuth2Client``
    owned by the current user is built from the submitted form fields, stored,
    and the browser is redirected to ``/``.

    The client secret is empty when the submitted
    ``token_endpoint_auth_method`` is ``'none'``; otherwise a random
    48-character secret is generated.
    """
    user = current_user()
    if not user:
        return redirect('/')
    if request.method == 'GET':
        return render_template('create_client.html')

    form = request.form
    client = OAuth2Client(client_id=gen_salt(24), user_id=user.id)
    # The model mixin does not set the issue date, so record it here.
    client.client_id_issued_at = int(time.time())

    client_metadata = {
        "client_name": form["client_name"],
        "client_uri": form["client_uri"],
        "grant_types": split_by_crlf(form["grant_type"]),
        "redirect_uris": split_by_crlf(form["redirect_uri"]),
        "response_types": split_by_crlf(form["response_type"]),
        "scope": form["scope"],
        "token_endpoint_auth_method": form["token_endpoint_auth_method"]
    }
    client.set_client_metadata(client_metadata)

    # Decide on the secret from the submitted auth method, and only after the
    # metadata is in place: inspecting the freshly constructed client earlier
    # cannot reflect what the form asked for.
    if form["token_endpoint_auth_method"] == 'none':
        client.client_secret = ''
    else:
        client.client_secret = gen_salt(48)

    db.session.add(client)
    db.session.commit()
    return redirect('/')
@bp.route('/edit_client/<client_id>', methods=('GET', 'POST'))
def edit_client(client_id=None):
    """Show the edit form for a client (GET) or apply the edits (POST).

    Anonymous visitors are redirected to ``/``. The client is looked up by
    ``client_id`` *and* the current user's id, so a user can only edit clients
    they own; when no such client exists the browser is redirected to ``/``.

    On POST the client metadata is replaced with the submitted form fields.
    The secret is cleared when ``token_endpoint_auth_method`` is ``'none'``,
    and a new 48-character secret is generated when the method is anything
    else and the client currently has an empty secret.
    """
    user = current_user()
    if not user:
        return redirect('/')

    # Scope the lookup to the owner: the id comes straight from the URL.
    client = OAuth2Client.query.filter_by(
        client_id=client_id, user_id=user.id
    ).first()
    if client is None:
        # Unknown id or someone else's client: nothing to edit.
        return redirect('/')

    if request.method == 'GET':
        return render_template('edit_client.html', client_id=client_id, client=client)

    form = request.form
    client_metadata = {
        "client_name": form["client_name"],
        "client_uri": form["client_uri"],
        "grant_types": split_by_crlf(form["grant_type"]),
        "redirect_uris": split_by_crlf(form["redirect_uri"]),
        "response_types": split_by_crlf(form["response_type"]),
        "scope": form["scope"],
        "token_endpoint_auth_method": form["token_endpoint_auth_method"]
    }
    client.set_client_metadata(client_metadata)

    if form['token_endpoint_auth_method'] == 'none':
        client.client_secret = ''
    elif client.client_secret == '':
        # Switching away from 'none': the client needs a secret again.
        client.client_secret = gen_salt(48)

    db.session.commit()
    return redirect('/')
@bp.route('/oauth/authorize', methods=['GET', 'POST'])
def authorize():
    """OAuth2 authorization endpoint.

    GET: validates the consent request for the current user and renders
    ``authorize.html`` with the resulting grant. If validation raises
    ``OAuth2Error`` the error body is returned as JSON instead.

    POST: if nobody is logged in and the form carries ``username``, that user
    is looked up by name. The authorization response is then created with the
    user as ``grant_user`` when the form's ``confirm`` field is present and
    non-empty, and with ``grant_user=None`` otherwise.
    """
    user = current_user()
    if request.method == 'GET':
        try:
            grant = authorization.validate_consent_request(end_user=user)
        except OAuth2Error as error:
            return jsonify(dict(error.get_body()))
        return render_template('authorize.html', user=user, grant=grant)

    if not user and 'username' in request.form:
        username = request.form.get('username')
        user = User.query.filter_by(username=username).first()

    # Use .get(): HTML forms omit unchecked checkboxes entirely, so a missing
    # ``confirm`` must be treated as a refusal rather than a malformed request.
    grant_user = user if request.form.get('confirm') else None
    return authorization.create_authorization_response(grant_user=grant_user)
@bp.route('/oauth/token', methods=['POST'])
def issue_token():
    """Token endpoint: return whatever ``authorization.create_token_response`` produces."""
    token_response = authorization.create_token_response()
    return token_response
@bp.route('/api/me')
@require_oauth('profile')
def api_me():
    """Return the id and username of the user attached to the current token as JSON.

    The route is wrapped in ``require_oauth('profile')``.
    """
    token_owner = current_token.user
    payload = {'id': token_owner.id, 'username': token_owner.username}
    return jsonify(payload)