Created
October 9, 2024 12:54
-
-
Save snyderra/144f21dd61bae49d807444c376bd4eb0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| # os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' | |
| from flask import Flask, request, jsonify | |
| from requests_oauthlib import OAuth2Session | |
| import uuid | |
| import time | |
| from cryptography import x509 | |
| from cryptography.x509.oid import NameOID | |
| from cryptography.hazmat.primitives import hashes, serialization | |
| from cryptography.hazmat.primitives.asymmetric import rsa | |
| from cryptography.hazmat.primitives.serialization import BestAvailableEncryption | |
| from datetime import datetime, timedelta | |
| from functools import wraps | |
| import logging | |
| import sys | |
| app = Flask(__name__) | |
| app.config['SECRET_KEY'] = 'your_secret_key' | |
| @app.before_request | |
| def log_request_headers(): | |
| print("Request:") | |
| print(request.headers) | |
| print() | |
| print(request.get_data(as_text=True)) | |
| @app.after_request | |
| def log_response(response): | |
| print("Response:") | |
| print(response.status) | |
| print() | |
| print(response.headers) | |
| print() | |
| print(response.get_data(as_text=True)) | |
| return response | |
| # In-memory storage for device codes | |
| device_codes = {} | |
| def require_authentication(func): | |
| @wraps(func) | |
| def wrapper(*args, **kwargs): | |
| # Check if the request has an access token | |
| access_token = request.headers.get('Authorization') | |
| if access_token: | |
| # Verify the access token against the device codes | |
| for device_code, data in device_codes.items(): | |
| if data.get('access_token') == access_token: | |
| # Authentication successful, proceed with the decorated function | |
| return func(*args, **kwargs) | |
| # Authentication failed, return an error response | |
| return jsonify({'error': 'authentication failed'}), 401 | |
| return wrapper | |
| @app.route('/activate_form', methods=['GET']) | |
| def activate_form(): | |
| return ''' | |
| <!DOCTYPE html> | |
| <html lang="en"> | |
| <head> | |
| <meta charset="UTF-8"> | |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
| <title>Responsive Form</title> | |
| <style> | |
| body { | |
| font-family: Arial, sans-serif; | |
| margin: 0; | |
| padding: 0; | |
| } | |
| form { | |
| max-width: 100%; | |
| margin: 20px auto; | |
| padding: 15px; | |
| box-sizing: border-box; | |
| } | |
| label, input { | |
| display: block; | |
| width: 100%; | |
| margin-bottom: 10px; | |
| } | |
| input[type="text"], input[type="submit"] { | |
| padding: 10px; | |
| font-size: 16px; | |
| box-sizing: border-box; | |
| } | |
| input[type="submit"] { | |
| background-color: #007bff; | |
| color: white; | |
| border: none; | |
| cursor: pointer; | |
| } | |
| input[type="submit"]:hover { | |
| background-color: #0056b3; | |
| } | |
| </style> | |
| </head> | |
| <body> | |
| <form method="POST" action="/activate"> | |
| <label for="user_code">User Code:</label> | |
| <input type="text" id="user_code" name="user_code"> | |
| <input type="submit" value="Activate"> | |
| </form> | |
| </body> | |
| </html> | |
| ''' | |
| @app.route('/oauth/device_authorization', methods=['POST']) | |
| def device_authorization(): | |
| client_id = request.form.get('client_id') | |
| scope = request.form.get('scope') | |
| # Generate device and user codes | |
| device_code = str(uuid.uuid4()) | |
| user_code = str(uuid.uuid4())[:4] | |
| # Store the device code with an expiration time | |
| device_codes[device_code] = { | |
| 'client_id': client_id, | |
| 'user_code': user_code, | |
| 'scope': scope, | |
| 'expires_in': 600, # 10 minutes | |
| 'created_at': time.time(), | |
| 'authorized': False | |
| } | |
| response = { | |
| 'device_code': device_code, | |
| 'user_code': user_code, | |
| 'verification_uri': 'https://test.example.com:8443/activate_form', | |
| 'expires_in': 600 | |
| } | |
| return jsonify(response) | |
| @app.route('/activate', methods=['POST']) | |
| def activate(): | |
| user_code = request.form.get('user_code') | |
| # Find the device code associated with the user code | |
| for device_code, data in device_codes.items(): | |
| if data['user_code'] == user_code: | |
| data['authorized'] = True | |
| return jsonify({'status': 'authorized'}) | |
| return jsonify({'status': 'invalid user code'}), 400 | |
| @app.route('/oauth/token', methods=['POST']) | |
| def token(): | |
| # Get the request headers | |
| headers = request.headers | |
| print(headers) | |
| print(request.form) | |
| grant_type = request.form.get('grant_type') | |
| if grant_type.endswith('device_code'): | |
| device_code = request.form.get('device_code') | |
| # Check if the device code is valid and authorized | |
| if device_code in device_codes: | |
| data = device_codes[device_code] | |
| if data['authorized']: | |
| # Generate an access token | |
| access_token = data['access_token'] = str(uuid.uuid4()) | |
| refresh_token = data['refresh_token'] = str(uuid.uuid4()) | |
| return jsonify({'access_token': access_token, 'token_type': 'bearer', 'expires_in': 3600, 'refresh_token': refresh_token}) | |
| else: | |
| return jsonify({'error': 'authorization_pending'}), 400 | |
| if grant_type.endswith('refresh_token'): | |
| refresh_token = request.form.get('refresh_token') | |
| for device_code, data in device_codes.items(): | |
| if data['refresh_token'] == refresh_token: | |
| access_token = data['access_token'] = str(uuid.uuid4()) | |
| refresh_token = data['refresh_token'] = str(uuid.uuid4()) | |
| return jsonify({'access_token': access_token, 'token_type': 'bearer', 'expires_in': 3600, 'refresh_token': refresh_token}) | |
| else: | |
| return jsonify({'error': 'expired_token'}), 400 | |
| return jsonify({'error': 'unsupported_grant_type'}), 400 | |
| @app.route('/query', methods=['GET']) | |
| @require_authentication | |
| def query(): | |
| access_token = request.headers.get('Authorization') | |
| if access_token: | |
| for device_code, data in device_codes.items(): | |
| if data.get('access_token') == access_token: | |
| return jsonify({'status': 'success'}) | |
| return jsonify({'status': 'invalid access token'}), 401 | |
| def generate_root_certificate(): | |
| # Generate a private key | |
| private_key = rsa.generate_private_key( | |
| public_exponent=65537, | |
| key_size=2048, | |
| ) | |
| # Create a self-signed certificate | |
| subject = issuer = x509.Name([ | |
| x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"), | |
| x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"OHIO"), | |
| x509.NameAttribute(NameOID.LOCALITY_NAME, u"CITY"), | |
| x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Company"), | |
| x509.NameAttribute(NameOID.COMMON_NAME, u"Root CA"), | |
| ]) | |
| certificate = x509.CertificateBuilder().subject_name( | |
| subject | |
| ).issuer_name( | |
| issuer | |
| ).public_key( | |
| private_key.public_key() | |
| ).serial_number( | |
| x509.random_serial_number() | |
| ).not_valid_before( | |
| datetime.utcnow() | |
| ).not_valid_after( | |
| # Certificate valid for 10 years | |
| datetime.utcnow() + timedelta(days=3650) | |
| ).add_extension( | |
| x509.BasicConstraints(ca=True, path_length=None), | |
| critical=True, | |
| ).sign(private_key, hashes.SHA256()) | |
| # Save the private key and certificate to files | |
| with open("root_private_key.pem", "wb") as f: | |
| f.write(private_key.private_bytes( | |
| encoding=serialization.Encoding.PEM, | |
| format=serialization.PrivateFormat.TraditionalOpenSSL, | |
| encryption_algorithm=BestAvailableEncryption(b"passphrase"), | |
| )) | |
| with open("root_certificate.pem", "wb") as f: | |
| f.write(certificate.public_bytes(serialization.Encoding.PEM)) | |
| def generate_certificate(): | |
| # Load the root private key | |
| with open("root_private_key.pem", "rb") as f: | |
| root_private_key = serialization.load_pem_private_key( | |
| f.read(), | |
| password=b"passphrase" | |
| ) | |
| # Load the root certificate | |
| with open("root_certificate.pem", "rb") as f: | |
| root_certificate = x509.load_pem_x509_certificate(f.read()) | |
| # Generate a private key | |
| private_key = rsa.generate_private_key( | |
| public_exponent=65537, | |
| key_size=2048, | |
| ) | |
| # Create a certificate signing request (CSR) | |
| subject = x509.Name([ | |
| x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"), | |
| x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"OHIO"), | |
| x509.NameAttribute(NameOID.LOCALITY_NAME, u"CITY"), | |
| x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Company"), | |
| x509.NameAttribute(NameOID.COMMON_NAME, u"test.example.com"), | |
| ]) | |
| csr = x509.CertificateSigningRequestBuilder().subject_name( | |
| subject | |
| ).add_extension( | |
| x509.SubjectAlternativeName([x509.DNSName(u"localhost"),x509.DNSName(u"test.example.com")]), | |
| critical=True, | |
| ).sign(private_key, hashes.SHA256()) | |
| # Create a certificate | |
| certificate = x509.CertificateBuilder().subject_name( | |
| csr.subject | |
| ).issuer_name( | |
| root_certificate.subject | |
| ).public_key( | |
| csr.public_key() | |
| ).serial_number( | |
| x509.random_serial_number() | |
| ).not_valid_before( | |
| datetime.utcnow() | |
| ).not_valid_after( | |
| # Certificate valid for 1 year | |
| datetime.utcnow() + timedelta(days=365) | |
| ).add_extension( | |
| x509.AuthorityKeyIdentifier.from_issuer_public_key(root_certificate.public_key()), | |
| critical=False, | |
| ).add_extension( | |
| x509.SubjectKeyIdentifier.from_public_key(csr.public_key()), | |
| critical=False, | |
| ).add_extension( | |
| x509.SubjectAlternativeName(csr.extensions.get_extension_for_class(x509.SubjectAlternativeName).value), | |
| critical=False, | |
| ).sign(root_private_key, hashes.SHA256()) | |
| # Save the private key and certificate to files | |
| with open("private_key.pem", "wb") as f: | |
| f.write(private_key.private_bytes( | |
| encoding=serialization.Encoding.PEM, | |
| format=serialization.PrivateFormat.TraditionalOpenSSL, | |
| # encryption_algorithm=BestAvailableEncryption(b"passphrase"), | |
| encryption_algorithm=serialization.NoEncryption(), | |
| )) | |
| with open("certificate.pem", "wb") as f: | |
| f.write(certificate.public_bytes(serialization.Encoding.PEM)) | |
| if __name__ == '__main__': | |
| # Generate a self-signed certificate | |
| if not os.path.isfile("root_certificate.pem"): | |
| generate_root_certificate() | |
| generate_certificate() | |
| # Run the Flask app with the generated certificate | |
| app.run(debug=True, host='0.0.0.0', port=8443, ssl_context=('certificate.pem', 'private_key.pem')) | |
| #app.run(debug=True, host='0.0.0.0', port=8080) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment