Skip to content

Instantly share code, notes, and snippets.

@snyderra
Created October 9, 2024 12:54
Show Gist options
  • Select an option

  • Save snyderra/144f21dd61bae49d807444c376bd4eb0 to your computer and use it in GitHub Desktop.

Select an option

Save snyderra/144f21dd61bae49d807444c376bd4eb0 to your computer and use it in GitHub Desktop.
import os
# os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
from flask import Flask, request, jsonify
from requests_oauthlib import OAuth2Session
import uuid
import time
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import BestAvailableEncryption
from datetime import datetime, timedelta
from functools import wraps
import logging
import sys
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'
@app.before_request
def log_request_headers():
print("Request:")
print(request.headers)
print()
print(request.get_data(as_text=True))
@app.after_request
def log_response(response):
print("Response:")
print(response.status)
print()
print(response.headers)
print()
print(response.get_data(as_text=True))
return response
# In-memory storage for device codes
device_codes = {}
def require_authentication(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Check if the request has an access token
access_token = request.headers.get('Authorization')
if access_token:
# Verify the access token against the device codes
for device_code, data in device_codes.items():
if data.get('access_token') == access_token:
# Authentication successful, proceed with the decorated function
return func(*args, **kwargs)
# Authentication failed, return an error response
return jsonify({'error': 'authentication failed'}), 401
return wrapper
@app.route('/activate_form', methods=['GET'])
def activate_form():
return '''
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Responsive Form</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 0;
}
form {
max-width: 100%;
margin: 20px auto;
padding: 15px;
box-sizing: border-box;
}
label, input {
display: block;
width: 100%;
margin-bottom: 10px;
}
input[type="text"], input[type="submit"] {
padding: 10px;
font-size: 16px;
box-sizing: border-box;
}
input[type="submit"] {
background-color: #007bff;
color: white;
border: none;
cursor: pointer;
}
input[type="submit"]:hover {
background-color: #0056b3;
}
</style>
</head>
<body>
<form method="POST" action="/activate">
<label for="user_code">User Code:</label>
<input type="text" id="user_code" name="user_code">
<input type="submit" value="Activate">
</form>
</body>
</html>
'''
@app.route('/oauth/device_authorization', methods=['POST'])
def device_authorization():
client_id = request.form.get('client_id')
scope = request.form.get('scope')
# Generate device and user codes
device_code = str(uuid.uuid4())
user_code = str(uuid.uuid4())[:4]
# Store the device code with an expiration time
device_codes[device_code] = {
'client_id': client_id,
'user_code': user_code,
'scope': scope,
'expires_in': 600, # 10 minutes
'created_at': time.time(),
'authorized': False
}
response = {
'device_code': device_code,
'user_code': user_code,
'verification_uri': 'https://test.example.com:8443/activate_form',
'expires_in': 600
}
return jsonify(response)
@app.route('/activate', methods=['POST'])
def activate():
user_code = request.form.get('user_code')
# Find the device code associated with the user code
for device_code, data in device_codes.items():
if data['user_code'] == user_code:
data['authorized'] = True
return jsonify({'status': 'authorized'})
return jsonify({'status': 'invalid user code'}), 400
@app.route('/oauth/token', methods=['POST'])
def token():
# Get the request headers
headers = request.headers
print(headers)
print(request.form)
grant_type = request.form.get('grant_type')
if grant_type.endswith('device_code'):
device_code = request.form.get('device_code')
# Check if the device code is valid and authorized
if device_code in device_codes:
data = device_codes[device_code]
if data['authorized']:
# Generate an access token
access_token = data['access_token'] = str(uuid.uuid4())
refresh_token = data['refresh_token'] = str(uuid.uuid4())
return jsonify({'access_token': access_token, 'token_type': 'bearer', 'expires_in': 3600, 'refresh_token': refresh_token})
else:
return jsonify({'error': 'authorization_pending'}), 400
if grant_type.endswith('refresh_token'):
refresh_token = request.form.get('refresh_token')
for device_code, data in device_codes.items():
if data['refresh_token'] == refresh_token:
access_token = data['access_token'] = str(uuid.uuid4())
refresh_token = data['refresh_token'] = str(uuid.uuid4())
return jsonify({'access_token': access_token, 'token_type': 'bearer', 'expires_in': 3600, 'refresh_token': refresh_token})
else:
return jsonify({'error': 'expired_token'}), 400
return jsonify({'error': 'unsupported_grant_type'}), 400
@app.route('/query', methods=['GET'])
@require_authentication
def query():
access_token = request.headers.get('Authorization')
if access_token:
for device_code, data in device_codes.items():
if data.get('access_token') == access_token:
return jsonify({'status': 'success'})
return jsonify({'status': 'invalid access token'}), 401
def generate_root_certificate():
# Generate a private key
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Create a self-signed certificate
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"OHIO"),
x509.NameAttribute(NameOID.LOCALITY_NAME, u"CITY"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Company"),
x509.NameAttribute(NameOID.COMMON_NAME, u"Root CA"),
])
certificate = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
private_key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.utcnow()
).not_valid_after(
# Certificate valid for 10 years
datetime.utcnow() + timedelta(days=3650)
).add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True,
).sign(private_key, hashes.SHA256())
# Save the private key and certificate to files
with open("root_private_key.pem", "wb") as f:
f.write(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=BestAvailableEncryption(b"passphrase"),
))
with open("root_certificate.pem", "wb") as f:
f.write(certificate.public_bytes(serialization.Encoding.PEM))
def generate_certificate():
# Load the root private key
with open("root_private_key.pem", "rb") as f:
root_private_key = serialization.load_pem_private_key(
f.read(),
password=b"passphrase"
)
# Load the root certificate
with open("root_certificate.pem", "rb") as f:
root_certificate = x509.load_pem_x509_certificate(f.read())
# Generate a private key
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Create a certificate signing request (CSR)
subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"OHIO"),
x509.NameAttribute(NameOID.LOCALITY_NAME, u"CITY"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Company"),
x509.NameAttribute(NameOID.COMMON_NAME, u"test.example.com"),
])
csr = x509.CertificateSigningRequestBuilder().subject_name(
subject
).add_extension(
x509.SubjectAlternativeName([x509.DNSName(u"localhost"),x509.DNSName(u"test.example.com")]),
critical=True,
).sign(private_key, hashes.SHA256())
# Create a certificate
certificate = x509.CertificateBuilder().subject_name(
csr.subject
).issuer_name(
root_certificate.subject
).public_key(
csr.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.utcnow()
).not_valid_after(
# Certificate valid for 1 year
datetime.utcnow() + timedelta(days=365)
).add_extension(
x509.AuthorityKeyIdentifier.from_issuer_public_key(root_certificate.public_key()),
critical=False,
).add_extension(
x509.SubjectKeyIdentifier.from_public_key(csr.public_key()),
critical=False,
).add_extension(
x509.SubjectAlternativeName(csr.extensions.get_extension_for_class(x509.SubjectAlternativeName).value),
critical=False,
).sign(root_private_key, hashes.SHA256())
# Save the private key and certificate to files
with open("private_key.pem", "wb") as f:
f.write(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
# encryption_algorithm=BestAvailableEncryption(b"passphrase"),
encryption_algorithm=serialization.NoEncryption(),
))
with open("certificate.pem", "wb") as f:
f.write(certificate.public_bytes(serialization.Encoding.PEM))
if __name__ == '__main__':
# Generate a self-signed certificate
if not os.path.isfile("root_certificate.pem"):
generate_root_certificate()
generate_certificate()
# Run the Flask app with the generated certificate
app.run(debug=True, host='0.0.0.0', port=8443, ssl_context=('certificate.pem', 'private_key.pem'))
#app.run(debug=True, host='0.0.0.0', port=8080)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment