Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ venv/*
env/*

.vscode/*
.claude/

# ignore http server log
atests/http_server/http_server.log
95 changes: 95 additions & 0 deletions atests/http_server/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@
# See AUTHORS and LICENSE for more information

from flask import Flask, Response, jsonify as flask_jsonify, request
from flask_httpauth import HTTPBasicAuth, HTTPDigestAuth

from .structures import CaseInsensitiveDict
from .helpers import get_dict, status_code
from .utils import weighted_choice


app = Flask(__name__)
app.config['SECRET_KEY'] = 'test-secret-key-for-digest-auth'

# Initialize authentication handlers
basic_auth = HTTPBasicAuth()
digest_auth = HTTPDigestAuth()


def jsonify(*args, **kwargs):
Expand Down Expand Up @@ -191,3 +197,92 @@ def redirect_to():
response.headers["Location"] = args["url"]

return response


# Basic auth verification callback
@basic_auth.verify_password
def verify_basic_password(username, password):
# Get expected credentials from the request path
path_parts = request.path.split('/')
if len(path_parts) >= 4 and path_parts[1] == 'basic-auth':
expected_user = path_parts[2]
expected_pass = path_parts[3]
return username == expected_user and password == expected_pass
return False


@app.route("/basic-auth/<user>/<passwd>")
@basic_auth.login_required
def basic_auth_endpoint(user, passwd):
"""Prompts the user for authorization using HTTP Basic Auth.
---
tags:
- Auth
parameters:
- in: path
name: user
type: string
required: true
- in: path
name: passwd
type: string
required: true
produces:
- application/json
responses:
200:
description: Successful authentication.
401:
description: Unsuccessful authentication.
"""
return jsonify(authenticated=True, user=basic_auth.current_user())


# Digest auth password callback
@digest_auth.get_password
def get_digest_password(username):
# Get expected credentials from the request path
path_parts = request.path.split('/')
if len(path_parts) >= 5 and path_parts[1] == 'digest-auth':
expected_user = path_parts[3]
expected_pass = path_parts[4]
if username == expected_user:
return expected_pass
return None


@app.route("/digest-auth/<qop>/<user>/<passwd>")
@app.route("/digest-auth/<qop>/<user>/<passwd>/<algorithm>")
@digest_auth.login_required
def digest_auth_endpoint(qop, user, passwd, algorithm='MD5'):
"""Prompts the user for authorization using HTTP Digest Auth.
---
tags:
- Auth
parameters:
- in: path
name: qop
type: string
required: true
- in: path
name: user
type: string
required: true
- in: path
name: passwd
type: string
required: true
- in: path
name: algorithm
type: string
required: false
default: MD5
produces:
- application/json
responses:
200:
description: Successful authentication.
401:
description: Unsuccessful authentication.
"""
return jsonify(authenticated=True, user=digest_auth.current_user())
145 changes: 0 additions & 145 deletions atests/http_server/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,126 +256,6 @@ def status_code(code):
return r


def check_basic_auth(user, passwd):
"""Checks user authentication using HTTP Basic Auth."""

auth = request.authorization
return auth and auth.username == user and auth.password == passwd


# Digest auth helpers
# qop is a quality of protection


def H(data, algorithm):
if algorithm == 'SHA-256':
return sha256(data).hexdigest()
elif algorithm == 'SHA-512':
return sha512(data).hexdigest()
else:
return md5(data).hexdigest()


def HA1(realm, username, password, algorithm):
"""Create HA1 hash by realm, username, password

HA1 = md5(A1) = MD5(username:realm:password)
"""
if not realm:
realm = u''
return H(b":".join([username.encode('utf-8'),
realm.encode('utf-8'),
password.encode('utf-8')]), algorithm)


def HA2(credentials, request, algorithm):
"""Create HA2 md5 hash

If the qop directive's value is "auth" or is unspecified, then HA2:
HA2 = md5(A2) = MD5(method:digestURI)
If the qop directive's value is "auth-int" , then HA2 is
HA2 = md5(A2) = MD5(method:digestURI:MD5(entityBody))
"""
if credentials.get("qop") == "auth" or credentials.get('qop') is None:
return H(b":".join([request['method'].encode('utf-8'), request['uri'].encode('utf-8')]), algorithm)
elif credentials.get("qop") == "auth-int":
for k in 'method', 'uri', 'body':
if k not in request:
raise ValueError("%s required" % k)
A2 = b":".join([request['method'].encode('utf-8'),
request['uri'].encode('utf-8'),
H(request['body'], algorithm).encode('utf-8')])
return H(A2, algorithm)
raise ValueError


def response(credentials, password, request):
"""Compile digest auth response

If the qop directive's value is "auth" or "auth-int" , then compute the response as follows:
RESPONSE = MD5(HA1:nonce:nonceCount:clienNonce:qop:HA2)
Else if the qop directive is unspecified, then compute the response as follows:
RESPONSE = MD5(HA1:nonce:HA2)

Arguments:
- `credentials`: credentials dict
- `password`: request user password
- `request`: request dict
"""
response = None
algorithm = credentials.get('algorithm')
HA1_value = HA1(
credentials.get('realm'),
credentials.get('username'),
password,
algorithm
)
HA2_value = HA2(credentials, request, algorithm)
if credentials.get('qop') is None:
response = H(b":".join([
HA1_value.encode('utf-8'),
credentials.get('nonce', '').encode('utf-8'),
HA2_value.encode('utf-8')
]), algorithm)
elif credentials.get('qop') == 'auth' or credentials.get('qop') == 'auth-int':
for k in 'nonce', 'nc', 'cnonce', 'qop':
if k not in credentials:
raise ValueError("%s required for response H" % k)
response = H(b":".join([HA1_value.encode('utf-8'),
credentials.get('nonce').encode('utf-8'),
credentials.get('nc').encode('utf-8'),
credentials.get('cnonce').encode('utf-8'),
credentials.get('qop').encode('utf-8'),
HA2_value.encode('utf-8')]), algorithm)
else:
raise ValueError("qop value are wrong")

return response


def check_digest_auth(user, passwd):
"""Check user authentication using HTTP Digest auth"""

if request.headers.get('Authorization'):
credentials = Authorization.from_header(request.headers.get('Authorization'))
if not credentials:
return
request_uri = request.script_root + request.path
if request.query_string:
request_uri += '?' + request.query_string
response_hash = response(credentials, passwd, dict(uri=request_uri,
body=request.data,
method=request.method))
if credentials.get('response') == response_hash:
return True
return False


def secure_cookie():
"""Return true if cookie should have secure attribute"""
return request.environ['wsgi.url_scheme'] == 'https'


def __parse_request_range(range_header_text):
""" Return a tuple describing the byte range requested in a GET request
If the range is open ended on the left or right side, then a value of None
Expand Down Expand Up @@ -453,28 +333,3 @@ def next_stale_after_value(stale_after):
return str(stal_after_count)
except ValueError:
return 'never'


def digest_challenge_response(app, qop, algorithm, stale=False):
response = app.make_response('')
response.status_code = 401

# RFC2616 Section4.2: HTTP headers are ASCII. That means
# request.remote_addr was originally ASCII, so I should be able to
# encode it back to ascii. Also, RFC2617 says about nonces: "The
# contents of the nonce are implementation dependent"
nonce = H(b''.join([
getattr(request, 'remote_addr', u'').encode('ascii'),
b':',
str(time.time()).encode('ascii'),
b':',
os.urandom(10)
]), algorithm)
opaque = H(os.urandom(10), algorithm)

auth = WWWAuthenticate("digest")
auth.set_digest('me@kennethreitz.com', nonce, opaque=opaque,
qop=('auth', 'auth-int') if qop is None else (qop,), algorithm=algorithm)
auth.stale = stale
response.headers['WWW-Authenticate'] = auth.to_header()
return response
16 changes: 8 additions & 8 deletions atests/test_authentication.robot
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
*** Settings ***
Library RequestsLibrary
Library customAuthenticator.py
Resource res_setup.robot


*** Test Cases ***
Get With Auth
[Tags] get get-cert
${auth}= Create List user passwd
Create Session httpbin https://httpbin.org auth=${auth} verify=${CURDIR}${/}cacert.pem
${resp}= GET On Session httpbin /basic-auth/user/passwd
Create Session authsession ${HTTP_LOCAL_SERVER} auth=${auth}
${resp}= GET On Session authsession /basic-auth/user/passwd
Should Be Equal As Strings ${resp.status_code} 200
Should Be Equal As Strings ${resp.json()['authenticated']} True

Get With Custom Auth
[Tags] get
${auth}= Get Custom Auth user passwd
Create Custom Session httpbin https://httpbin.org auth=${auth} verify=${CURDIR}${/}cacert.pem
${resp}= GET On Session httpbin /basic-auth/user/passwd
Create Custom Session authsession ${HTTP_LOCAL_SERVER} auth=${auth}
${resp}= GET On Session authsession /basic-auth/user/passwd
Should Be Equal As Strings ${resp.status_code} 200
Should Be Equal As Strings ${resp.json()['authenticated']} True

Get With Digest Auth
[Tags] get get-cert
${auth}= Create List user pass
Create Digest Session
... httpbin
... https://httpbin.org
... authsession
... ${HTTP_LOCAL_SERVER}
... auth=${auth}
... debug=3
... verify=${CURDIR}${/}cacert.pem
${resp}= GET On Session httpbin /digest-auth/auth/user/pass
${resp}= GET On Session authsession /digest-auth/auth/user/pass
Should Be Equal As Strings ${resp.status_code} 200
Should Be Equal As Strings ${resp.json()['authenticated']} True
12 changes: 6 additions & 6 deletions atests/test_ssl_certs.robot
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ Library RequestsLibrary
*** Test Cases ***
Get HTTPS & Verify Cert
[Tags] get get-cert
Create Session httpbin https://httpbin.org verify=True
${resp}= GET On Session httpbin /get
Create Session sslsession https://github.com verify=True
${resp}= GET On Session sslsession /
Should Be Equal As Strings ${resp.status_code} 200

Get HTTPS & Verify Cert with a CA bundle
[Tags] get get-cert
Create Session httpbin https://httpbin.org verify=${CURDIR}${/}cacert.pem
${resp}= GET On Session httpbin /get
Create Session sslsession https://github.com verify=${CURDIR}${/}cacert.pem
${resp}= GET On Session sslsession /
Should Be Equal As Strings ${resp.status_code} 200

Get HTTPS with Client Side Certificates
[Tags] get get-cert
@{client_certs}= Create List ${CURDIR}${/}clientcert.pem ${CURDIR}${/}clientkey.pem
Create Client Cert Session crtsession https://server.cryptomix.com/secure client_certs=@{client_certs}
${resp}= GET On Session crtsession /
Create Client Cert Session sslsession https://github.com client_certs=@{client_certs}
Comment thread
oboehmer marked this conversation as resolved.
Outdated
${resp}= GET On Session sslsession /
Should Be Equal As Strings ${resp.status_code} 200
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
Topic :: Software Development :: Testing
"""[1:-1]

TEST_REQUIRE = ['robotframework>=3.2.1', 'pytest', 'flask', 'six', 'coverage', 'flake8']
TEST_REQUIRE = ['robotframework>=3.2.1', 'pytest', 'flask', 'six', 'coverage', 'flake8', 'Flask-HTTPAuth==4.8.0']

VERSION = None
version_file = join(dirname(abspath(__file__)), 'src', 'RequestsLibrary', 'version.py')
Expand Down
Loading