@kenoir
Created March 26, 2026 09:35
Performance & memory investigation tests for OAI-PMH adapter pipeline
"""Tests investigating post-HTTP-response bottlenecks in window processing.
When running the reloader, the command hangs after the first HTTP 200 response:
Processing window 2026-03-25T18:00:00+00:00 -> 2026-03-25T19:00:00+00:00
HTTP Request: GET ... "HTTP/1.1 200 OK"
<hangs here>
Between the HTTP response and the next log message, the following silent
operations execute:
1. list() materialises *all* OAI records (follows resumptionToken pagination)
2. WindowRecordWriter serialises XML and calls AdapterStore.incremental_update
3. incremental_update:
a. Scans Iceberg table for existing records (REST API → S3 I/O)
b. Calls pyiceberg get_rows_to_update — row-by-row Python comparison
c. _preserve_content_for_deletions calls .to_pylist() on existing data
d. Commits Iceberg overwrite/append (S3 write I/O)
4. WindowStore.upsert writes another Iceberg transaction
These tests time each phase with realistic data sizes to pinpoint the
bottleneck. Local SQLite Iceberg tables eliminate S3 I/O so we can isolate
CPU-bound costs from network I/O.
Memory tests use tracemalloc to measure allocations during each phase. The Lambda also
uses suspiciously high memory even when there are no changes — several data
structures hold overlapping copies of the full ``content`` column in memory.
"""
from __future__ import annotations
import datetime
import sys
import time
import tracemalloc
from typing import Any
import pyarrow as pa
import pytest
from lxml import etree
from oai_pmh_client.models import Header, Record
from pyiceberg.table import Table as IcebergTable
from pyiceberg.table.upsert_util import get_rows_to_update
from adapters.oai_pmh.record_writer import WindowRecordWriter, _serialize_metadata
from adapters.utils.adapter_store import AdapterStore
from adapters.utils.schemata import ADAPTER_STORE_ARROW_SCHEMA
from adapters.utils.window_store import WindowStatusRecord, WindowStore
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
MARC21_TEMPLATE = """\
<record xmlns="http://www.loc.gov/MARC21/slim">
<leader>00000nam a2200000 a 4500</leader>
<controlfield tag="001">{identifier}</controlfield>
<controlfield tag="005">20260101120000.0</controlfield>
<datafield tag="245" ind1="1" ind2="0">
<subfield code="a">Title for record {identifier}</subfield>
</datafield>
<datafield tag="100" ind1="1" ind2=" ">
<subfield code="a">Author, Test</subfield>
</datafield>{extra_fields}
</record>"""
def _make_extra_fields(n: int = 20) -> str:
"""Generate extra MARC datafields to produce realistic record sizes."""
parts: list[str] = []
for i in range(n):
parts.append(
f'\n <datafield tag="{650 + i % 50}" ind1="0" ind2="0">'
f"\n <subfield code=\"a\">Subject heading {i} with extra padding "
f"to make the field realistically long like a real catalogue record</subfield>"
f"\n </datafield>"
)
return "".join(parts)
def _make_oai_record(identifier: str, extra_fields: int = 20) -> Record:
"""Create an OAI-PMH Record with realistic MARC21 XML content.
Wraps the MARC21 record in a ``<metadata>`` element to match
real OAI-PMH structure — the ``Record`` validator extracts the
first child of the metadata wrapper via ``v[0]``.
"""
marc_xml = MARC21_TEMPLATE.format(
identifier=identifier,
extra_fields=_make_extra_fields(extra_fields),
)
# Wrap in <metadata> so the Record validator picks up the whole <record>
wrapper_str = f"<metadata>{marc_xml}</metadata>"
metadata_element = etree.fromstring(wrapper_str)
header = Header(
identifier=identifier,
datestamp=datetime.datetime(2026, 3, 25, 18, 0, 0, tzinfo=datetime.UTC),
setSpec=[],
status=None,
)
return Record(header=header, metadata=metadata_element)
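# In lxml, indexing an element yields its children, so the validator's v[0]
# on the <metadata> wrapper returns the whole MARC <record> element, e.g.
#   etree.fromstring("<metadata><record/></metadata>")[0].tag == "record"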
def _make_adapter_row(
identifier: str,
extra_fields: int = 20,
namespace: str = "test_namespace",
) -> dict[str, Any]:
"""Create an adapter store row dict with realistic MARC21 content."""
xml_str = MARC21_TEMPLATE.format(
identifier=identifier,
extra_fields=_make_extra_fields(extra_fields),
)
return {
"namespace": namespace,
"id": identifier,
"content": xml_str,
"last_modified": datetime.datetime(2026, 3, 25, 12, 0, 0, tzinfo=datetime.UTC),
"deleted": False,
}
def _seed_adapter_store(
table: IcebergTable,
n_records: int,
extra_fields: int = 20,
namespace: str = "test_namespace",
) -> AdapterStore:
"""Seed an adapter store with n_records of realistic content."""
rows = [
_make_adapter_row(f"rec-{i:05d}", extra_fields, namespace)
for i in range(n_records)
]
arrow_table = pa.Table.from_pylist(rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
table.append(arrow_table)
return AdapterStore(table, namespace)
# ---------------------------------------------------------------------------
# Phase 0: Record size sanity check
# ---------------------------------------------------------------------------
class TestRecordSizes:
"""Verify test records are realistic sizes (real MARC21 records ~2-20KB)."""
def test_oai_record_serialises_to_realistic_size(self) -> None:
record = _make_oai_record("rec-00001", extra_fields=20)
serialised = _serialize_metadata(record)
assert serialised is not None
size = len(serialised)
print(f"\n Single record serialised size: {size} bytes")
# Real MARC21 records with holdings are typically 2-20KB
assert size > 500, (
f"Record too small ({size} bytes) — check the Record model's "
"metadata validator is preserving the full <record> element"
)
def test_adapter_row_content_size(self) -> None:
row = _make_adapter_row("rec-00001", extra_fields=20)
size = len(row["content"])
print(f"\n Adapter row content size: {size} bytes")
assert size > 500
# ---------------------------------------------------------------------------
# Phase 1: XML serialisation
# ---------------------------------------------------------------------------
class TestXMLSerialisationCost:
"""Measure cost of serialising MARC21 records from lxml to strings."""
@pytest.mark.parametrize("n_records", [100, 500, 1000])
def test_serialise_marc21_records(self, n_records: int) -> None:
records = [_make_oai_record(f"rec-{i:05d}") for i in range(n_records)]
start = time.perf_counter()
serialised = [_serialize_metadata(r) for r in records]
elapsed = time.perf_counter() - start
total_bytes = sum(len(s) for s in serialised if s)
avg_bytes = total_bytes // n_records
print(
f"\n Serialised {n_records} records in {elapsed:.3f}s "
f"(avg {avg_bytes} bytes/record, total {total_bytes / 1024:.0f} KB)"
)
# Serialisation should be fast — this is NOT the bottleneck
assert elapsed < 5.0, f"XML serialisation took {elapsed:.1f}s for {n_records} records"
# ---------------------------------------------------------------------------
# Phase 2: get_rows_to_update (pyiceberg row-by-row comparison)
# ---------------------------------------------------------------------------
class TestGetRowsToUpdateCost:
"""Measure cost of pyiceberg's get_rows_to_update with large content.
This function does row-by-row Python comparison:
for source_idx, target_idx in zip(...):
    source_val = source_table.slice(source_idx, 1).column(key)[0].as_py()
    target_val = target_table.slice(target_idx, 1).column(key)[0].as_py()
    if source_val != target_val: ...
For large MARC21 XML strings, each .as_py() deserialises the full content.
"""
@pytest.mark.parametrize("n_records", [100, 500, 1000])
def test_unchanged_records(self, n_records: int) -> None:
"""When all records are unchanged, every row is compared and none selected."""
rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
table = pa.Table.from_pylist(rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
# Compare content column only (what AdapterStore._compare_columns returns)
join_fields = ["namespace", "id"]
compare_cols = join_fields + ["content"]
source = table.select(compare_cols)
target = table.select(compare_cols)
start = time.perf_counter()
result = get_rows_to_update(source, target, join_fields)
elapsed = time.perf_counter() - start
print(
f"\n get_rows_to_update: {n_records} unchanged records in {elapsed:.3f}s"
)
assert result.num_rows == 0
# Flag if this is the bottleneck
if elapsed > 1.0:
print(
f" ⚠ SLOW: {elapsed:.1f}s for {n_records} records — "
"this row-by-row Python comparison is likely the bottleneck"
)
@pytest.mark.parametrize("n_records", [100, 500, 1000])
def test_all_records_changed(self, n_records: int) -> None:
"""When all records have changed content, every row is compared + selected."""
original_rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
updated_rows = [
{**row, "content": row["content"] + "<!-- updated -->"}
for row in original_rows
]
source = pa.Table.from_pylist(updated_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
target = pa.Table.from_pylist(original_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
join_fields = ["namespace", "id"]
compare_cols = join_fields + ["content"]
start = time.perf_counter()
result = get_rows_to_update(
source.select(compare_cols), target.select(compare_cols), join_fields
)
elapsed = time.perf_counter() - start
print(
f"\n get_rows_to_update: {n_records} changed records in {elapsed:.3f}s"
)
assert result.num_rows == n_records
if elapsed > 1.0:
print(
f" ⚠ SLOW: {elapsed:.1f}s for {n_records} records — "
"row-by-row comparison is likely the bottleneck"
)
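# For contrast with the row-by-row loop sketched in the class docstring above,
# this is a minimal vectorised equivalent using pyarrow.compute. Illustrative
# sketch only: it is not part of the adapter code, is not exercised by these
# tests, and assumes source and target are already aligned row-for-row on the
# join fields (as the tests above arrange).
def _vectorised_changed_count(source: pa.Table, target: pa.Table) -> int:
    import pyarrow.compute as pc

    # One C++-level kernel call over the whole column instead of per-row .as_py()
    changed = pc.invert(pc.equal(source.column("content"), target.column("content")))
    total = pc.sum(pc.cast(changed, pa.int64())).as_py()
    return total if total is not None else 0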
# ---------------------------------------------------------------------------
# Phase 3: Full incremental_update path
# ---------------------------------------------------------------------------
class TestIncrementalUpdateCost:
"""Measure end-to-end incremental_update time with existing data."""
@pytest.mark.parametrize("n_records", [100, 500])
def test_incremental_update_all_unchanged(
self, temporary_table: IcebergTable, n_records: int
) -> None:
"""Re-ingesting the same records should detect no changes.
Measures: Iceberg scan + get_rows_to_update + _preserve_content_for_deletions.
"""
store = _seed_adapter_store(temporary_table, n_records)
# Build identical incoming data (same content, same timestamps)
incoming_rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
incoming = pa.Table.from_pylist(incoming_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
t0 = time.perf_counter()
result = store.incremental_update(incoming)
elapsed = time.perf_counter() - t0
print(f"\n incremental_update ({n_records} unchanged): {elapsed:.3f}s")
# No changes expected
if result is not None:
print(
f" updates={len(result.updated_record_ids)}, "
f"inserts={len(result.inserted_record_ids)}"
)
@pytest.mark.parametrize("n_records", [100, 500])
def test_incremental_update_all_new(
self, temporary_table: IcebergTable, n_records: int
) -> None:
"""Inserting records into an empty table (no comparison needed)."""
store = AdapterStore(temporary_table, "test_namespace")
incoming_rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
incoming = pa.Table.from_pylist(incoming_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
t0 = time.perf_counter()
result = store.incremental_update(incoming)
elapsed = time.perf_counter() - t0
print(f"\n incremental_update ({n_records} new inserts): {elapsed:.3f}s")
assert result is not None
assert len(result.inserted_record_ids) == n_records
@pytest.mark.parametrize("n_records", [100, 500])
def test_incremental_update_all_changed(
self, temporary_table: IcebergTable, n_records: int
) -> None:
"""All records changed — measures full update path including overwrite."""
store = _seed_adapter_store(temporary_table, n_records)
# New data with updated content and newer timestamp
updated_rows = [
{
**_make_adapter_row(f"rec-{i:05d}"),
"content": _make_adapter_row(f"rec-{i:05d}")["content"]
+ "<!-- changed -->",
"last_modified": datetime.datetime(
2026, 3, 26, 12, 0, 0, tzinfo=datetime.UTC
),
}
for i in range(n_records)
]
incoming = pa.Table.from_pylist(updated_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
t0 = time.perf_counter()
result = store.incremental_update(incoming)
elapsed = time.perf_counter() - t0
print(f"\n incremental_update ({n_records} changed): {elapsed:.3f}s")
assert result is not None
print(
f" updates={len(result.updated_record_ids)}, "
f"inserts={len(result.inserted_record_ids)}"
)
# ---------------------------------------------------------------------------
# Phase 4: Full WindowRecordWriter callback with Iceberg
# ---------------------------------------------------------------------------
class TestWindowRecordWriterWithIceberg:
"""Measure the full record writer callback including Iceberg writes."""
@pytest.mark.parametrize("n_records", [100, 500])
def test_record_writer_empty_table(
self, temporary_table: IcebergTable, n_records: int
) -> None:
"""First harvest — all records are inserts."""
store = AdapterStore(temporary_table, "test_namespace")
writer = WindowRecordWriter(
namespace="test_namespace",
table_client=store,
job_id="perf-test",
window_range="2026-03-25T18:00:00Z-2026-03-25T19:00:00Z",
)
records: list[tuple[str, Record]] = [
(f"rec-{i:05d}", _make_oai_record(f"rec-{i:05d}"))
for i in range(n_records)
]
t0 = time.perf_counter()
result = writer(records)
elapsed = time.perf_counter() - t0
print(f"\n WindowRecordWriter ({n_records} records, empty table): {elapsed:.3f}s")
assert "changeset_id" in result["tags"]
@pytest.mark.parametrize("n_records", [100, 500])
def test_record_writer_with_existing_unchanged_data(
self, temporary_table: IcebergTable, n_records: int
) -> None:
"""Re-harvest — all records already exist and are unchanged."""
store = _seed_adapter_store(temporary_table, n_records)
writer = WindowRecordWriter(
namespace="test_namespace",
table_client=store,
job_id="perf-test",
window_range="2026-03-25T18:00:00Z-2026-03-25T19:00:00Z",
)
# Re-send the same records
records: list[tuple[str, Record]] = [
(f"rec-{i:05d}", _make_oai_record(f"rec-{i:05d}"))
for i in range(n_records)
]
t0 = time.perf_counter()
result = writer(records)
elapsed = time.perf_counter() - t0
print(
f"\n WindowRecordWriter ({n_records} records, all existing/unchanged): {elapsed:.3f}s"
)
# This exercises the full get_rows_to_update path — likely the slowest case
if elapsed > 2.0:
print(
f" ⚠ SLOW: {elapsed:.1f}s — the incremental_update comparison "
"is likely the bottleneck when records haven't changed"
)
# ---------------------------------------------------------------------------
# Breakdown: time each sub-step of incremental_update individually
# ---------------------------------------------------------------------------
class TestIncrementalUpdateBreakdown:
"""Time each sub-step of incremental_update to find the exact bottleneck."""
def test_breakdown_with_500_existing_records(
self, temporary_table: IcebergTable
) -> None:
n_records = 500
store = _seed_adapter_store(temporary_table, n_records)
incoming_rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
new_data = pa.Table.from_pylist(incoming_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
new_data = store.normalise_table(new_data)
# Step 1: Extract IDs and scan existing records
t0 = time.perf_counter()
from pyiceberg.expressions import In
new_ids_filter = In("id", store._extract_ids(new_data))
existing_data = store.get_namespace_records(new_ids_filter)
t_scan = time.perf_counter() - t0
# Step 2: Find updates (get_rows_to_update)
t0 = time.perf_counter()
updates = store._find_updates(existing_data, new_data)
t_find_updates = time.perf_counter() - t0
# Step 3: Filter by timestamp
t0 = time.perf_counter()
updates = store._filter_updates_by_timestamp(updates, existing_data)
t_filter_ts = time.perf_counter() - t0
# Step 4: Preserve content for deletions
t0 = time.perf_counter()
updates = store._transform_incremental_updates(updates, existing_data)
t_preserve = time.perf_counter() - t0
# Step 5: Find inserts
t0 = time.perf_counter()
inserts = store._find_inserts(existing_data, new_data)
t_inserts = time.perf_counter() - t0
# Step 6: Commit changeset
t0 = time.perf_counter()
result = store._commit_changeset(
updates if updates.num_rows > 0 else None,
inserts if inserts.num_rows > 0 else None,
)
t_commit = time.perf_counter() - t0
total = t_scan + t_find_updates + t_filter_ts + t_preserve + t_inserts + t_commit
print(f"\n === incremental_update breakdown ({n_records} records) ===")
print(f" 1. Iceberg scan (get_namespace_records): {t_scan:.3f}s")
print(f" 2. Find updates (get_rows_to_update): {t_find_updates:.3f}s")
print(f" 3. Filter by timestamp: {t_filter_ts:.3f}s")
print(f" 4. Preserve content for deletions: {t_preserve:.3f}s")
print(f" 5. Find inserts: {t_inserts:.3f}s")
print(f" 6. Commit changeset: {t_commit:.3f}s")
print(f" TOTAL: {total:.3f}s")
# Identify the bottleneck
steps = {
"Iceberg scan": t_scan,
"get_rows_to_update": t_find_updates,
"Filter timestamp": t_filter_ts,
"Preserve deletions": t_preserve,
"Find inserts": t_inserts,
"Commit changeset": t_commit,
}
slowest = max(steps, key=steps.get) # type: ignore[arg-type]
print(f"\n ➜ Slowest step: {slowest} ({steps[slowest]:.3f}s)")
# ---------------------------------------------------------------------------
# Phase 5: Memory profiling
# ---------------------------------------------------------------------------
def _mb(nbytes: int) -> float:
return nbytes / (1024 * 1024)
class TestMemoryUsage:
"""Measure memory consumed by each phase of the window processing pipeline.
The Lambda reportedly uses high memory even when there are no changes.
These tests identify which data structures contribute.
"""
def test_get_namespace_records_loads_full_content(
self,
temporary_table: IcebergTable,
) -> None:
"""get_namespace_records returns ALL columns including content.
incremental_update calls this to load existing data. Even though
_find_updates only compares ["namespace", "id", "content"],
the full table (with changeset, last_modified, deleted) is loaded
and kept alive for _preserve_content_for_deletions and
_filter_updates_by_timestamp.
"""
n_records = 500
store = _seed_adapter_store(temporary_table, n_records)
tracemalloc.start()
from pyiceberg.expressions import In
ids = [f"rec-{i:05d}" for i in range(n_records)]
existing = store.get_namespace_records(In("id", ids))
snapshot = tracemalloc.take_snapshot()
tracemalloc.stop()
# Memory for the Arrow table itself
arrow_bytes = existing.nbytes
# Memory as reported by tracemalloc (includes Python overhead)
total_traced = sum(stat.size for stat in snapshot.statistics("filename"))
print(f"\n get_namespace_records ({n_records} records):")
print(f" Arrow table nbytes: {_mb(arrow_bytes):.2f} MB")
print(f" Columns: {existing.column_names}")
print(f" Content column bytes: {_mb(existing.column('content').nbytes):.2f} MB")
print(f" tracemalloc total: {_mb(total_traced):.2f} MB")
def test_preserve_content_materialises_to_pylist(
self,
temporary_table: IcebergTable,
) -> None:
"""_preserve_content_for_deletions calls existing_data.to_pylist().
This converts the *entire* Arrow table to Python dicts in memory,
duplicating all content strings. Even when there are zero deletions,
it still builds the lookup dict from to_pylist().
"""
n_records = 500
store = _seed_adapter_store(temporary_table, n_records)
from pyiceberg.expressions import In
ids = [f"rec-{i:05d}" for i in range(n_records)]
existing = store.get_namespace_records(In("id", ids))
# Simulate an update set with no deletions (typical "no changes" case)
# _preserve_content_for_deletions is still called and builds a full dict
updates = existing.slice(0, 0) # Empty, but existing_data is still loaded
tracemalloc.start()
result1 = store._transform_incremental_updates(updates, existing)
snap1 = tracemalloc.take_snapshot()
tracemalloc.stop()
# Now with a non-empty updates set (forces the to_pylist() path)
updates_nonempty = existing.slice(0, min(10, n_records))
tracemalloc.start()
result2 = store._transform_incremental_updates(updates_nonempty, existing)
snap2 = tracemalloc.take_snapshot()
tracemalloc.stop()
mem1 = sum(stat.size for stat in snap1.statistics("filename"))
mem2 = sum(stat.size for stat in snap2.statistics("filename"))
print(f"\n _preserve_content_for_deletions ({n_records} existing):")
print(f" Empty updates → memory: {_mb(mem1):.2f} MB")
print(f" 10 updates → memory: {_mb(mem2):.2f} MB (to_pylist() activated)")
print(f" existing Arrow nbytes: {_mb(existing.nbytes):.2f} MB")
def test_load_status_map_full_table_scan(
self,
temporary_window_status_table: IcebergTable,
) -> None:
"""load_status_map scans the ENTIRE window_status table with no filters.
Each row contains a record_ids list that can hold hundreds of IDs.
Over months of 15-minute windows this table grows large:
- 96 windows/day × 365 days ≈ 35,000 rows
- Each row with ~100 record_ids ≈ 3.5M string entries
"""
store = WindowStore(temporary_window_status_table)
# Simulate several weeks of window history
n_windows = 500
records_per_window = 50
base = datetime.datetime(2026, 1, 1, tzinfo=datetime.UTC)
for i in range(n_windows):
start = base + datetime.timedelta(minutes=15 * i)
end = start + datetime.timedelta(minutes=15)
record_ids = tuple(f"rec-{j:05d}" for j in range(records_per_window))
store.upsert(
WindowStatusRecord(
window_key=f"{start.isoformat()}_{end.isoformat()}",
window_start=start,
window_end=end,
state="success",
attempts=1,
last_error=None,
record_ids=record_ids,
updated_at=end,
tags={"job_id": "test"},
)
)
tracemalloc.start()
status_map = store.load_status_map()
snapshot = tracemalloc.take_snapshot()
tracemalloc.stop()
total_mem = sum(stat.size for stat in snapshot.statistics("filename"))
total_record_ids = sum(
len(row.get("record_ids", [])) for row in status_map.values()
)
print(f"\n load_status_map ({n_windows} windows, {records_per_window} IDs each):")
print(f" Windows loaded: {len(status_map)}")
print(f" Total record_ids: {total_record_ids}")
print(f" tracemalloc total: {_mb(total_mem):.2f} MB")
def test_incremental_update_memory_no_changes(
self,
temporary_table: IcebergTable,
) -> None:
"""Measure total memory for incremental_update when nothing has changed.
This is the "suspiciously high memory" case. The pipeline:
1. Loads existing records (full content) into Arrow table
2. Builds projected copies for comparison
3. get_rows_to_update materialises rows into Python objects
4. _preserve_content_for_deletions may call to_pylist()
Multiple overlapping copies of the content exist simultaneously.
"""
n_records = 500
store = _seed_adapter_store(temporary_table, n_records)
incoming_rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
incoming = pa.Table.from_pylist(incoming_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
content_bytes = incoming.column("content").nbytes
tracemalloc.start()
before = tracemalloc.take_snapshot()
result = store.incremental_update(incoming)
after = tracemalloc.take_snapshot()
tracemalloc.stop()
diff_stats = after.compare_to(before, "lineno")
total_alloc = sum(s.size_diff for s in diff_stats if s.size_diff > 0)
print(f"\n incremental_update memory ({n_records} records, NO changes):")
print(f" Content column size: {_mb(content_bytes):.2f} MB")
print(f" New allocations during update: {_mb(total_alloc):.2f} MB")
print(f" Ratio (alloc / content): {total_alloc / content_bytes:.1f}x")
if total_alloc > content_bytes * 3:
print(
f" ⚠ HIGH MEMORY: allocated {total_alloc / content_bytes:.1f}x "
"the content size — multiple copies in memory"
)
def test_concurrent_data_copies_in_incremental_update(
self,
temporary_table: IcebergTable,
) -> None:
"""Count how many simultaneous copies of content data exist.
During incremental_update the following all coexist in memory:
- new_data Arrow table (incoming records with content)
- existing_data Arrow table (from Iceberg scan, full content)
- new_projected (select ["namespace", "id", "content"])
- existing_projected (select ["namespace", "id", "content"])
- get_rows_to_update internal: Python strings from .as_py()
This test instruments the data to count content column copies.
"""
n_records = 200
store = _seed_adapter_store(temporary_table, n_records)
incoming_rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
new_data = pa.Table.from_pylist(incoming_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
new_data = store.normalise_table(new_data)
from pyiceberg.expressions import In
new_ids_filter = In("id", store._extract_ids(new_data))
existing_data = store.get_namespace_records(new_ids_filter)
# At this point we already have two full copies of content in Arrow tables
new_content_bytes = new_data.column("content").nbytes
existing_content_bytes = existing_data.column("content").nbytes
# _find_updates creates projected copies
join_fields = ["namespace", "id"]
compare_cols = join_fields + ["content"]
new_projected = new_data.select(compare_cols)
existing_projected = existing_data.select(compare_cols)
# Arrow select() shares memory (zero-copy) — verify this
# by checking the buffers are the same objects
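# One way to do that (illustrative, not asserted here) is to compare buffer
# addresses of the projected and source content columns:
#   new_projected.column("content").chunk(0).buffers()[-1].address \
#       == new_data.column("content").chunk(0).buffers()[-1].address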
new_proj_bytes = new_projected.column("content").nbytes
exist_proj_bytes = existing_projected.column("content").nbytes
print(f"\n Content column copies ({n_records} records):")
print(f" new_data content: {_mb(new_content_bytes):.2f} MB")
print(f" existing_data content: {_mb(existing_content_bytes):.2f} MB")
print(f" new_projected content: {_mb(new_proj_bytes):.2f} MB")
print(f" existing_projected content:{_mb(exist_proj_bytes):.2f} MB")
# get_rows_to_update does row-by-row .as_py() — measure the cost
tracemalloc.start()
_ = get_rows_to_update(new_projected, existing_projected, join_fields)
snap = tracemalloc.take_snapshot()
tracemalloc.stop()
comparison_mem = sum(s.size for s in snap.statistics("filename"))
print(f" get_rows_to_update alloc: {_mb(comparison_mem):.2f} MB")
total_content = new_content_bytes + existing_content_bytes
print(f"\n Total content in memory: {_mb(total_content):.2f} MB")
print(f" (2x because both new + existing are loaded simultaneously)")
def test_window_status_record_ids_memory(
self,
temporary_window_status_table: IcebergTable,
) -> None:
"""Measure memory used by record_ids lists in WindowStatusRecords.
Each upsert writes the full record_ids tuple. Over time the
window_status table accumulates record_ids for every window ever
processed. load_status_map loads all of them into Python.
"""
store = WindowStore(temporary_window_status_table)
# Simulate a realistic window with many records
record_counts = [50, 100, 200, 500]
for count in record_counts:
base = datetime.datetime(2026, 1, 1, tzinfo=datetime.UTC)
start = base
end = base + datetime.timedelta(minutes=15)
record_ids = tuple(f"oai:folio:rec-{j:05d}" for j in range(count))
# Measure size of one WindowStatusRecord's record_ids in Python
ids_size = sys.getsizeof(record_ids)
ids_size += sum(sys.getsizeof(rid) for rid in record_ids)
print(f" record_ids({count}): {_mb(ids_size):.3f} MB per window")
# For 35,000 windows (1 year of 15-min windows) with avg 100 IDs:
est_windows = 35_000
avg_ids = 100
avg_id_len = len("oai:folio:rec-00000")
est_bytes = est_windows * avg_ids * (avg_id_len + sys.getsizeof(""))
print(f"\n Estimated memory for {est_windows} windows × {avg_ids} IDs: "
f"{_mb(est_bytes):.0f} MB")