Created
May 16, 2025 08:19
-
-
Save mik9/d11ea2127768522854e27bc0cb8cb5ee to your computer and use it in GitHub Desktop.
google_apis_oauth_assertion.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import time | |
| import jwt # PyJWT library | |
| import requests # For making HTTP requests | |
| from cryptography.hazmat.primitives import serialization # For loading private key | |
# --- Configuration ---
# Path to the service account JSON key file. It is a relative path, so it is
# resolved against the current working directory when the script runs.
# Replace with the actual path to your service account JSON key file.
SERVICE_ACCOUNT_FILE = 'Downloads/pcapi.json'

# Replace with the desired scope(s) for your API access.
# You can find the required scopes in the Google API documentation.
# For example, for Google Drive API: 'https://www.googleapis.com/auth/drive'
# For multiple scopes, separate them with a space: 'scope1 scope2'
SCOPES = 'https://www.googleapis.com/auth/androidpublisher'  # Example scope

# Token endpoint. Used both as the JWT audience ('aud' claim) and as the URL
# the signed assertion is POSTed to.
# NOTE(review): the key file's own 'token_uri' field is required by the loader
# but never read afterwards; confirm this hard-coded endpoint is the intended one.
GOOGLE_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'

# Number of seconds added to the issue time to compute the JWT 'exp' claim.
TOKEN_EXPIRY_SECONDS = 3600  # Token valid for 1 hour
def load_service_account_creds(filepath):
    """Load and validate service account credentials from a JSON key file.

    Args:
        filepath (str): Path to the service account JSON key file.

    Returns:
        dict | None: The parsed credentials, or None when the file cannot be
        read, is not valid JSON, or does not look like a service account key.
        A message describing the problem is printed in every failure case.
    """
    # Fields that must be present for the file to be accepted as a key file.
    required_keys = (
        'type',
        'project_id',
        'private_key_id',
        'private_key',
        'client_email',
        'client_id',
        'token_uri',
    )
    try:
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(filepath, 'r', encoding='utf-8') as f:
            creds = json.load(f)

        # A valid JSON document may be a list, number or string. Reject those
        # explicitly so the key checks below cannot raise TypeError.
        if not isinstance(creds, dict):
            raise ValueError("Service account JSON must be an object at the top level.")
        if any(key not in creds for key in required_keys):
            raise ValueError("Service account JSON is missing one or more required keys.")
        if creds['type'] != 'service_account':
            raise ValueError("JSON file does not appear to be a service account key.")
        return creds
    except FileNotFoundError:
        print(f"Error: Service account key file not found at {filepath}")
        return None
    except OSError as os_err:
        # Covers unreadable paths (directory, permission denied, ...), which
        # would otherwise escape as an unhandled exception.
        print(f"Error: Could not read service account key file {filepath}: {os_err}")
        return None
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from {filepath}")
        return None
    except ValueError as ve:
        print(f"Error in service account file content: {ve}")
        return None
def create_signed_assertion(creds, scopes):
    """Create a signed JWT assertion for the Google OAuth2 JWT-bearer flow.

    Args:
        creds (dict): The loaded service account credentials. Must contain
            'client_email', 'private_key' and 'private_key_id'.
        scopes (str): The space-separated scopes for which to request access.

    Returns:
        str: The signed JWT assertion, or None if the credentials are empty,
        a required field is missing, or signing fails. Failures other than
        empty credentials print a message describing the problem.
    """
    if not creds:
        return None

    # Check the fields this function reads before using them, so incomplete
    # credentials yield the documented None instead of an uncaught KeyError.
    needed_fields = ('client_email', 'private_key', 'private_key_id')
    missing = [field for field in needed_fields if field not in creds]
    if missing:
        print(f"Error creating signed JWT: missing credential field(s): {', '.join(missing)}")
        return None

    issued_at = int(time.time())
    expires_at = issued_at + TOKEN_EXPIRY_SECONDS

    # JWT payload
    payload = {
        'iss': creds['client_email'],  # Issuer: service account email
        'scope': scopes,               # Scopes
        'aud': GOOGLE_TOKEN_URI,       # Audience: Google OAuth2 token endpoint
        'exp': expires_at,             # Expiration time
        'iat': issued_at,              # Issued at time
    }

    try:
        # The private key from the service account JSON is expected to be an
        # unencrypted PEM string; parse it into a key object for signing.
        private_key = serialization.load_pem_private_key(
            creds['private_key'].encode('utf-8'),
            password=None,  # No password if the key is not encrypted
        )

        # Sign the JWT with RS256 and put the key ID in the header.
        signed_jwt = jwt.encode(
            payload,
            private_key,
            algorithm='RS256',
            headers={'kid': creds['private_key_id']},  # Key ID from credentials
        )
    except Exception as e:
        # Deliberately broad: key parsing and signing can fail with several
        # unrelated exception types, and the contract is "print and return None".
        print(f"Error creating signed JWT: {e}")
        return None

    # PyJWT 1.x returns bytes from encode(), 2.x returns str. Normalize so the
    # documented str return type holds on both.
    if isinstance(signed_jwt, bytes):
        signed_jwt = signed_jwt.decode('utf-8')
    return signed_jwt
def exchange_assertion_for_token(signed_assertion, timeout=30):
    """Exchange the signed JWT assertion for an access token.

    Args:
        signed_assertion (str): The signed JWT.
        timeout (float | tuple): Seconds to wait for the token endpoint,
            passed straight to ``requests.post``. Without a timeout the
            request can block indefinitely on an unresponsive server.

    Returns:
        dict: The JSON response from Google containing the access token,
        or None if the assertion is empty, the request fails, the server
        answers with an error status, or the body is not valid JSON.
    """
    if not signed_assertion:
        return None

    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    data = {
        'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
        'assertion': signed_assertion,
    }

    try:
        response = requests.post(
            GOOGLE_TOKEN_URI, headers=headers, data=data, timeout=timeout
        )
        response.raise_for_status()  # Raises an HTTPError for bad responses (4XX or 5XX)
    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
        # .text never raises on undecodable bytes, unlike .content.decode().
        print(f"Response content: {response.text}")
        return None
    except requests.exceptions.RequestException as req_err:
        print(f"Request error occurred: {req_err}")
        return None

    # Decode separately from the request. Newer requests versions raise a JSON
    # error that is also a RequestException, so a handler listed after the one
    # above would never run. Every JSON decode error is a ValueError.
    try:
        return response.json()
    except ValueError:
        print("Error: Could not decode JSON response from token endpoint.")
        print(f"Response content: {response.text}")  # Log raw response for debugging
        return None
if __name__ == '__main__':
    # Script entry point: load the key file, build the signed assertion and
    # print it. As written, the script stops there and does not call
    # exchange_assertion_for_token.
    print("Attempting to get Google API access token using service account credentials...")

    # Step 1: read and validate the service account key file.
    print(f"Loading service account credentials from: {SERVICE_ACCOUNT_FILE}")
    credentials = load_service_account_creds(SERVICE_ACCOUNT_FILE)

    if not credentials:
        print("\n--- Failed to load service account credentials ---")
        print("Please ensure SERVICE_ACCOUNT_FILE path is correct and the file is valid.")
    else:
        print("Credentials loaded successfully.")

        # Step 2: sign a JWT for the configured scopes.
        print(f"Creating signed assertion for scopes: '{SCOPES}'")
        assertion = create_signed_assertion(credentials, SCOPES)

        # The assertion is printed in full on success. It is a signed
        # credential, so treat the output as sensitive.
        print(
            f"\nSigned JWT Assertion:\n{assertion}\n"
            if assertion
            else "\n--- Failed to create signed assertion ---"
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment