"""AWS Lambda handler that runs a DuckDB query against a file fetched from S3."""

import contextlib
import datetime
import json
import os
import re
from typing import Any, Dict, List, Optional, Tuple

import boto3
import duckdb


def construct_prepared_sql_and_params(
    sql_template: str,
    bind_params: Optional[Dict[str, Any]],
) -> Tuple[str, List[Any]]:
    """Turn ``{name}`` placeholders into ``?`` markers plus a positional list.

    Every ``{key}`` occurrence whose key is present in *bind_params* is
    replaced by ``?`` (scalar value) or by ``?, ?, ...`` with one marker per
    element (list value).  The returned parameter list is built in the order
    the placeholders appear in *sql_template*, so it always lines up with the
    generated ``?`` markers -- including when the placeholders are written in
    a different order than the dict keys, or when one placeholder is used
    several times.

    Args:
        sql_template: SQL text containing ``{key}`` placeholders.
        bind_params: Mapping of placeholder name to a scalar or a list of
            values.  ``None`` is treated as an empty mapping.

    Returns:
        A ``(sql, params)`` tuple ready for a positional prepared statement.
        Placeholders without a matching key are left untouched, and keys that
        never occur in the template contribute no parameters.

    Example:
        >>> construct_prepared_sql_and_params(
        ...     "SELECT * FROM t WHERE b = {b} AND a IN ({a})",
        ...     {"a": [1, 2], "b": "x"},
        ... )
        ('SELECT * FROM t WHERE b = ? AND a IN (?, ?)', ['x', 1, 2])
    """
    if not bind_params:
        return sql_template, []

    # Map the literal token ("{key}") to its value so a single regex pass can
    # both rewrite the SQL and collect the values in matching order.
    tokens = {f"{{{key}}}": value for key, value in bind_params.items()}
    token_pattern = re.compile(
        "|".join(re.escape(token) for token in sorted(tokens, key=len, reverse=True))
    )

    flat_params: List[Any] = []

    def _substitute(match: "re.Match[str]") -> str:
        value = tokens[match.group(0)]
        if isinstance(value, list):
            flat_params.extend(value)
            # NOTE: an empty list yields an empty string (e.g. "IN ()"),
            # exactly as before; callers must avoid binding empty lists.
            return ", ".join(["?"] * len(value))
        flat_params.append(value)
        return "?"

    prepared_sql = token_pattern.sub(_substitute, sql_template)
    return prepared_sql, flat_params


def serialize_dates(row: Dict[str, Any]) -> Dict[str, Any]:
    """Replace date values in *row* with ``DD-MM-YYYY`` strings, in place.

    Args:
        row: A mapping of column name to value; it is mutated.

    Returns:
        The same *row* object, for convenient use in comprehensions.
    """
    for key, value in row.items():
        # NOTE(review): datetime.datetime is a subclass of datetime.date, so
        # timestamps are formatted here too and lose their time component.
        # Kept as-is because consumers may rely on this output format.
        if isinstance(value, datetime.date):
            row[key] = value.strftime('%d-%m-%Y')
    return row


def lambda_handler(event: Dict[str, Any], context):
    """Download a file from S3 and run a parameterised DuckDB query on it.

    Event keys read:
        file_name: S3 object key; the object is downloaded to
            ``/tmp/<file_name>``.  Required.
        sql: Query text.  It is %-formatted with the local file path, so it
            needs a single ``%s`` where the path goes and any literal percent
            sign must be written as ``%%``.  It may also contain ``{name}``
            placeholders resolved from ``bind_params``.  Required.
        bind_params: Optional mapping of placeholder name to a scalar or a
            list of values.
        return_type: ``'Single'`` returns the first row (or ``{}`` when the
            query returns nothing); any other value returns all rows.
            Defaults to ``'List'``.

    The bucket name is read from the ``BUCKET_NAME`` environment variable.

    Args:
        event: The Lambda invocation payload described above.
        context: The Lambda context object (unused).

    Returns:
        A dict (``return_type == 'Single'``) or a list of dicts mapping column
        names to values, with dates rendered by :func:`serialize_dates`.

    Raises:
        ValueError: If ``file_name`` or ``sql`` is missing from the event.
    """
    print("Duckdb Lambda received event: " + json.dumps(event, indent=4))

    file_name = event.get('file_name')
    sql = event.get('sql')
    # `or {}` also covers an explicit `"bind_params": null` in the payload.
    bind_params = event.get('bind_params') or {}
    return_type = event.get('return_type', 'List')

    # Fail early with a clear message instead of downloading the key "None"
    # or hitting a TypeError on `None % str` further down.
    if not file_name:
        raise ValueError("event['file_name'] is required")
    if not sql:
        raise ValueError("event['sql'] is required")

    s3 = boto3.client('s3')
    bucket_name = os.getenv('BUCKET_NAME')
    local_file_name = f"/tmp/{file_name}"

    # The key may contain "/" so make sure the nested directory exists.
    os.makedirs(os.path.dirname(local_file_name), exist_ok=True)
    s3.download_file(bucket_name, file_name, local_file_name)

    try:
        conn = duckdb.connect(database=':memory:', read_only=False)
        try:
            # Formatting happens inside the try so the connection is closed
            # even when the SQL template is malformed.
            prepared_sql = sql % local_file_name
            prepared_sql, flat_params = construct_prepared_sql_and_params(
                prepared_sql, bind_params
            )
            print(prepared_sql)
            print(f"flat_params -> {flat_params}")

            cursor = conn.execute(prepared_sql, parameters=flat_params)
            columns = [description[0] for description in cursor.description]
            fetched_results = [
                serialize_dates(dict(zip(columns, row)))
                for row in cursor.fetchall()
            ]

            if return_type == 'Single':
                results = fetched_results[0] if fetched_results else {}
            else:
                results = fetched_results
        finally:
            conn.close()
    finally:
        # /tmp survives warm invocations and is size-limited; the file is
        # re-downloaded on every call anyway, so remove it (best effort).
        with contextlib.suppress(OSError):
            os.remove(local_file_name)

    print("Duckdb Lambda processing completed")
    return results