Skip to content

Instantly share code, notes, and snippets.

@rjtshrm
Last active July 17, 2025 20:14
Show Gist options
  • Select an option

  • Save rjtshrm/4b004248611a44d3d8b26d9171e37cff to your computer and use it in GitHub Desktop.

Select an option

Save rjtshrm/4b004248611a44d3d8b26d9171e37cff to your computer and use it in GitHub Desktop.
import time
from pathlib import Path
import airflow.logging_config
import airflow.sdk.log
import structlog
from uuid6 import UUID
from airflow.sdk.api.datamodels._generated import TaskInstance
# Smoke-test script: emit one structlog event through the processor chain
# returned by airflow.sdk.log.logging_processors(), close the configured
# remote task-log handler, then restore the previous structlog configuration.

# Show which remote task-log handler (if any) is currently configured.
print(airflow.logging_config.REMOTE_TASK_LOG)

logger = structlog.get_logger()

# get_config()["processors"] is mutated in place below, so keep a copy of its
# contents (and the logger factory) to be able to restore both afterwards.
config = structlog.get_config()
processors = config["processors"]
old_processors = processors.copy()
old_logger_factory = config["logger_factory"]

log_path = Path("/opt/airflow/logs/")
task_log_file = log_path / f"log_{time.time()}.txt"

try:
    procs, _ = airflow.sdk.log.logging_processors(enable_pretty_log=False)
    processors.clear()
    processors.extend(procs)

    def drop(*args):
        """Terminal processor that discards every event it receives."""
        raise structlog.DropEvent()

    # Swap the last processor for `drop`, so the event is discarded at the end
    # of the chain. NOTE(review): presumably the earlier processors (e.g. a
    # remote-log one) are the part under test -- confirm.
    processors[-1] = drop

    task_log_file.touch()
    # The context manager guarantees the handle is closed; the original opened
    # the file inline and never closed it.
    with task_log_file.open("w+") as log_stream:
        structlog.configure(
            processors=processors,
            logger_factory=structlog.PrintLoggerFactory(file=log_stream),
        )
        log = structlog.get_logger()
        log.info("Hi", foo="bar")

        # Flush all the logs. Re-read the attribute here (as the original did)
        # rather than reusing an earlier snapshot of it.
        remote_log = airflow.logging_config.REMOTE_TASK_LOG
        if remote_log is not None:
            remote_log.close()
        else:
            # Avoid an opaque AttributeError on None when nothing is configured.
            print("REMOTE_TASK_LOG is None; no remote log handler to close")
finally:
    # Restore the original processors AND logger factory; otherwise structlog
    # would keep a PrintLoggerFactory bound to the closed, deleted file.
    processors.clear()
    processors.extend(old_processors)
    structlog.configure(
        processors=old_processors,
        logger_factory=old_logger_factory,
    )
    # Remove the scratch file even when an earlier step failed (it may not
    # exist if the failure happened before touch()).
    task_log_file.unlink(missing_ok=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment