Performance & memory investigation tests for OAI-PMH adapter pipeline
| """Tests investigating post-HTTP-response bottlenecks in window processing. | |
| When running the reloader, the command hangs after the first HTTP 200 response: | |
| Processing window 2026-03-25T18:00:00+00:00 -> 2026-03-25T19:00:00+00:00 | |
| HTTP Request: GET ... "HTTP/1.1 200 OK" | |
| <hangs here> | |
| Between the HTTP response and the next log message, the following silent | |
| operations execute: | |
| 1. list() materialises *all* OAI records (follows resumptionToken pagination) | |
| 2. WindowRecordWriter serialises XML and calls AdapterStore.incremental_update | |
| 3. incremental_update: | |
| a. Scans Iceberg table for existing records (REST API → S3 I/O) | |
| b. Calls pyiceberg get_rows_to_update — row-by-row Python comparison | |
| c. _preserve_content_for_deletions calls .to_pylist() on existing data | |
| d. Commits Iceberg overwrite/append (S3 write I/O) | |
| 4. WindowStore.upsert writes another Iceberg transaction | |
| These tests time each phase with realistic data sizes to pinpoint the | |
| bottleneck. Local SQLite Iceberg tables eliminate S3 I/O so we can isolate | |
| CPU-bound costs from network I/O. | |
| Memory tests measure peak resident memory during each phase. The Lambda also | |
| uses suspiciously high memory even when there are no changes — several data | |
| structures hold overlapping copies of the full ``content`` column in memory. | |
| """ | |
from __future__ import annotations

import datetime
import sys
import time
import tracemalloc
from typing import Any

import pyarrow as pa
import pytest
from lxml import etree
from oai_pmh_client.models import Header, Record
from pyiceberg.table import Table as IcebergTable
from pyiceberg.table.upsert_util import get_rows_to_update

from adapters.oai_pmh.record_writer import WindowRecordWriter, _serialize_metadata
from adapters.utils.adapter_store import AdapterStore
from adapters.utils.schemata import ADAPTER_STORE_ARROW_SCHEMA
from adapters.utils.window_store import WindowStatusRecord, WindowStore
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

MARC21_TEMPLATE = """\
<record xmlns="http://www.loc.gov/MARC21/slim">
  <leader>00000nam a2200000 a 4500</leader>
  <controlfield tag="001">{identifier}</controlfield>
  <controlfield tag="005">20260101120000.0</controlfield>
  <datafield tag="245" ind1="1" ind2="0">
    <subfield code="a">Title for record {identifier}</subfield>
  </datafield>
  <datafield tag="100" ind1="1" ind2=" ">
    <subfield code="a">Author, Test</subfield>
  </datafield>{extra_fields}
</record>"""


def _make_extra_fields(n: int = 20) -> str:
    """Generate extra MARC datafields to produce realistic record sizes."""
    parts: list[str] = []
    for i in range(n):
        parts.append(
            f'\n  <datafield tag="{650 + i % 50}" ind1="0" ind2="0">'
            f'\n    <subfield code="a">Subject heading {i} with extra padding '
            f"to make the field realistically long like a real catalogue record</subfield>"
            f"\n  </datafield>"
        )
    return "".join(parts)
def _make_oai_record(identifier: str, extra_fields: int = 20) -> Record:
    """Create an OAI-PMH Record with realistic MARC21 XML content.

    Wraps the MARC21 record in a ``<metadata>`` element to match the real
    OAI-PMH structure — the ``Record`` validator extracts the first child
    of the metadata wrapper via ``v[0]``.
    """
    marc_xml = MARC21_TEMPLATE.format(
        identifier=identifier,
        extra_fields=_make_extra_fields(extra_fields),
    )
    # Wrap in <metadata> so the Record validator picks up the whole <record>
    wrapper_str = f"<metadata>{marc_xml}</metadata>"
    metadata_element = etree.fromstring(wrapper_str)
    header = Header(
        identifier=identifier,
        datestamp=datetime.datetime(2026, 3, 25, 18, 0, 0, tzinfo=datetime.UTC),
        setSpec=[],
        status=None,
    )
    return Record(header=header, metadata=metadata_element)
def _make_adapter_row(
    identifier: str,
    extra_fields: int = 20,
    namespace: str = "test_namespace",
) -> dict[str, Any]:
    """Create an adapter store row dict with realistic MARC21 content."""
    xml_str = MARC21_TEMPLATE.format(
        identifier=identifier,
        extra_fields=_make_extra_fields(extra_fields),
    )
    return {
        "namespace": namespace,
        "id": identifier,
        "content": xml_str,
        "last_modified": datetime.datetime(2026, 3, 25, 12, 0, 0, tzinfo=datetime.UTC),
        "deleted": False,
    }
def _seed_adapter_store(
    table: IcebergTable,
    n_records: int,
    extra_fields: int = 20,
    namespace: str = "test_namespace",
) -> AdapterStore:
    """Seed an adapter store with n_records of realistic content."""
    rows = [
        _make_adapter_row(f"rec-{i:05d}", extra_fields, namespace)
        for i in range(n_records)
    ]
    arrow_table = pa.Table.from_pylist(rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
    table.append(arrow_table)
    return AdapterStore(table, namespace)
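
# The phase tests below hand-roll time.perf_counter() pairs around each
# step. A context manager like this keeps that pattern in one place; a
# minimal sketch of a hypothetical helper, not used by the tests as
# written (usage: ``with _timed("Iceberg scan"): store.get_namespace_records(...)``):
from collections.abc import Iterator
from contextlib import contextmanager


@contextmanager
def _timed(label: str) -> Iterator[None]:
    """Print how long the wrapped block took."""
    start = time.perf_counter()
    try:
        yield
    finally:
        print(f" {label}: {time.perf_counter() - start:.3f}s")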
# ---------------------------------------------------------------------------
# Phase 0: Record size sanity check
# ---------------------------------------------------------------------------


class TestRecordSizes:
    """Verify test records are realistic sizes (real MARC21 records are ~2-20 KB)."""

    def test_oai_record_serialises_to_realistic_size(self) -> None:
        record = _make_oai_record("rec-00001", extra_fields=20)
        serialised = _serialize_metadata(record)
        assert serialised is not None
        size = len(serialised)
        print(f"\n Single record serialised size: {size} bytes")
        # Real MARC21 records with holdings are typically 2-20 KB
        assert size > 500, (
            f"Record too small ({size} bytes) — check the Record model's "
            "metadata validator is preserving the full <record> element"
        )

    def test_adapter_row_content_size(self) -> None:
        row = _make_adapter_row("rec-00001", extra_fields=20)
        size = len(row["content"])
        print(f"\n Adapter row content size: {size} bytes")
        assert size > 500
# ---------------------------------------------------------------------------
# Phase 1: XML serialisation
# ---------------------------------------------------------------------------


class TestXMLSerialisationCost:
    """Measure the cost of serialising MARC21 records from lxml to strings."""

    @pytest.mark.parametrize("n_records", [100, 500, 1000])
    def test_serialise_marc21_records(self, n_records: int) -> None:
        records = [_make_oai_record(f"rec-{i:05d}") for i in range(n_records)]
        start = time.perf_counter()
        serialised = [_serialize_metadata(r) for r in records]
        elapsed = time.perf_counter() - start
        total_bytes = sum(len(s) for s in serialised if s)
        avg_bytes = total_bytes // n_records
        print(
            f"\n Serialised {n_records} records in {elapsed:.3f}s "
            f"(avg {avg_bytes} bytes/record, total {total_bytes / 1024:.0f} KB)"
        )
        # Serialisation should be fast — this is NOT the bottleneck
        assert elapsed < 5.0, f"XML serialisation took {elapsed:.1f}s for {n_records} records"
# ---------------------------------------------------------------------------
# Phase 2: get_rows_to_update (pyiceberg row-by-row comparison)
# ---------------------------------------------------------------------------


class TestGetRowsToUpdateCost:
    """Measure the cost of pyiceberg's get_rows_to_update with large content.

    The function does a row-by-row Python comparison, roughly:

        for source_idx, target_idx in zip(...):
            source_val = source_table.slice(source_idx, 1).column(key)[0].as_py()
            target_val = target_table.slice(target_idx, 1).column(key)[0].as_py()
            if source_val != target_val: ...

    For large MARC21 XML strings, each .as_py() deserialises the full content.
    """

    @pytest.mark.parametrize("n_records", [100, 500, 1000])
    def test_unchanged_records(self, n_records: int) -> None:
        """When all records are unchanged, every row is compared and none is selected."""
        rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
        table = pa.Table.from_pylist(rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
        # Compare the content column only (what AdapterStore._compare_columns returns)
        join_fields = ["namespace", "id"]
        compare_cols = join_fields + ["content"]
        source = table.select(compare_cols)
        target = table.select(compare_cols)
        start = time.perf_counter()
        result = get_rows_to_update(source, target, join_fields)
        elapsed = time.perf_counter() - start
        print(f"\n get_rows_to_update: {n_records} unchanged records in {elapsed:.3f}s")
        assert result.num_rows == 0
        # Flag if this is the bottleneck
        if elapsed > 1.0:
            print(
                f" ⚠ SLOW: {elapsed:.1f}s for {n_records} records — "
                "this row-by-row Python comparison is likely the bottleneck"
            )
    @pytest.mark.parametrize("n_records", [100, 500, 1000])
    def test_all_records_changed(self, n_records: int) -> None:
        """When all records have changed content, every row is compared and selected."""
        original_rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
        updated_rows = [
            {**row, "content": row["content"] + "<!-- updated -->"}
            for row in original_rows
        ]
        source = pa.Table.from_pylist(updated_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
        target = pa.Table.from_pylist(original_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
        join_fields = ["namespace", "id"]
        compare_cols = join_fields + ["content"]
        start = time.perf_counter()
        result = get_rows_to_update(
            source.select(compare_cols), target.select(compare_cols), join_fields
        )
        elapsed = time.perf_counter() - start
        print(f"\n get_rows_to_update: {n_records} changed records in {elapsed:.3f}s")
        assert result.num_rows == n_records
        if elapsed > 1.0:
            print(
                f" ⚠ SLOW: {elapsed:.1f}s for {n_records} records — "
                "row-by-row comparison is likely the bottleneck"
            )
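
# If the tests above show get_rows_to_update dominating, a vectorised
# comparison is one candidate fix. This is a minimal sketch, NOT the
# pyiceberg implementation: it assumes source and target contain the
# same (namespace, id) pairs in the same order, so real code would need
# an aligning join first.
def _rows_changed_vectorised(source: pa.Table, target: pa.Table) -> pa.Table:
    import pyarrow.compute as pc

    # Compare whole columns at once instead of per-row .as_py() calls.
    changed = pc.invert(
        pc.equal(
            source.column("content").combine_chunks(),
            target.column("content").combine_chunks(),
        )
    )
    return source.filter(changed)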
# ---------------------------------------------------------------------------
# Phase 3: Full incremental_update path
# ---------------------------------------------------------------------------


class TestIncrementalUpdateCost:
    """Measure end-to-end incremental_update time with existing data."""

    @pytest.mark.parametrize("n_records", [100, 500])
    def test_incremental_update_all_unchanged(
        self, temporary_table: IcebergTable, n_records: int
    ) -> None:
        """Re-ingesting the same records should detect no changes.

        Measures: Iceberg scan + get_rows_to_update + _preserve_content_for_deletions.
        """
        store = _seed_adapter_store(temporary_table, n_records)
        # Build identical incoming data (same content, same timestamps)
        incoming_rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
        incoming = pa.Table.from_pylist(incoming_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
        t0 = time.perf_counter()
        result = store.incremental_update(incoming)
        elapsed = time.perf_counter() - t0
        print(f"\n incremental_update ({n_records} unchanged): {elapsed:.3f}s")
        # No changes expected
        if result is not None:
            print(
                f" updates={len(result.updated_record_ids)}, "
                f"inserts={len(result.inserted_record_ids)}"
            )

    @pytest.mark.parametrize("n_records", [100, 500])
    def test_incremental_update_all_new(
        self, temporary_table: IcebergTable, n_records: int
    ) -> None:
        """Inserting records into an empty table (no comparison needed)."""
        store = AdapterStore(temporary_table, "test_namespace")
        incoming_rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
        incoming = pa.Table.from_pylist(incoming_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
        t0 = time.perf_counter()
        result = store.incremental_update(incoming)
        elapsed = time.perf_counter() - t0
        print(f"\n incremental_update ({n_records} new inserts): {elapsed:.3f}s")
        assert result is not None
        assert len(result.inserted_record_ids) == n_records

    @pytest.mark.parametrize("n_records", [100, 500])
    def test_incremental_update_all_changed(
        self, temporary_table: IcebergTable, n_records: int
    ) -> None:
        """All records changed — measures the full update path including the overwrite."""
        store = _seed_adapter_store(temporary_table, n_records)
        # New data with updated content and a newer timestamp
        updated_rows = [
            {
                **_make_adapter_row(f"rec-{i:05d}"),
                "content": _make_adapter_row(f"rec-{i:05d}")["content"]
                + "<!-- changed -->",
                "last_modified": datetime.datetime(
                    2026, 3, 26, 12, 0, 0, tzinfo=datetime.UTC
                ),
            }
            for i in range(n_records)
        ]
        incoming = pa.Table.from_pylist(updated_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
        t0 = time.perf_counter()
        result = store.incremental_update(incoming)
        elapsed = time.perf_counter() - t0
        print(f"\n incremental_update ({n_records} changed): {elapsed:.3f}s")
        assert result is not None
        print(
            f" updates={len(result.updated_record_ids)}, "
            f"inserts={len(result.inserted_record_ids)}"
        )
# ---------------------------------------------------------------------------
# Phase 4: Full WindowRecordWriter callback with Iceberg
# ---------------------------------------------------------------------------


class TestWindowRecordWriterWithIceberg:
    """Measure the full record writer callback, including Iceberg writes."""

    @pytest.mark.parametrize("n_records", [100, 500])
    def test_record_writer_empty_table(
        self, temporary_table: IcebergTable, n_records: int
    ) -> None:
        """First harvest — all records are inserts."""
        store = AdapterStore(temporary_table, "test_namespace")
        writer = WindowRecordWriter(
            namespace="test_namespace",
            table_client=store,
            job_id="perf-test",
            window_range="2026-03-25T18:00:00Z-2026-03-25T19:00:00Z",
        )
        records: list[tuple[str, Record]] = [
            (f"rec-{i:05d}", _make_oai_record(f"rec-{i:05d}"))
            for i in range(n_records)
        ]
        t0 = time.perf_counter()
        result = writer(records)
        elapsed = time.perf_counter() - t0
        print(f"\n WindowRecordWriter ({n_records} records, empty table): {elapsed:.3f}s")
        assert "changeset_id" in result["tags"]

    @pytest.mark.parametrize("n_records", [100, 500])
    def test_record_writer_with_existing_unchanged_data(
        self, temporary_table: IcebergTable, n_records: int
    ) -> None:
        """Re-harvest — all records already exist and are unchanged."""
        store = _seed_adapter_store(temporary_table, n_records)
        writer = WindowRecordWriter(
            namespace="test_namespace",
            table_client=store,
            job_id="perf-test",
            window_range="2026-03-25T18:00:00Z-2026-03-25T19:00:00Z",
        )
        # Re-send the same records
        records: list[tuple[str, Record]] = [
            (f"rec-{i:05d}", _make_oai_record(f"rec-{i:05d}"))
            for i in range(n_records)
        ]
        t0 = time.perf_counter()
        result = writer(records)
        elapsed = time.perf_counter() - t0
        print(
            f"\n WindowRecordWriter ({n_records} records, all existing/unchanged): {elapsed:.3f}s"
        )
        # This exercises the full get_rows_to_update path — likely the slowest case
        if elapsed > 2.0:
            print(
                f" ⚠ SLOW: {elapsed:.1f}s — the incremental_update comparison "
                "is likely the bottleneck when records haven't changed"
            )
# ---------------------------------------------------------------------------
# Breakdown: time each sub-step of incremental_update individually
# ---------------------------------------------------------------------------


class TestIncrementalUpdateBreakdown:
    """Time each sub-step of incremental_update to find the exact bottleneck."""

    def test_breakdown_with_500_existing_records(
        self, temporary_table: IcebergTable
    ) -> None:
        n_records = 500
        store = _seed_adapter_store(temporary_table, n_records)
        incoming_rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
        new_data = pa.Table.from_pylist(incoming_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
        new_data = store.normalise_table(new_data)

        # Step 1: Extract IDs and scan existing records
        t0 = time.perf_counter()
        from pyiceberg.expressions import In

        new_ids_filter = In("id", store._extract_ids(new_data))
        existing_data = store.get_namespace_records(new_ids_filter)
        t_scan = time.perf_counter() - t0

        # Step 2: Find updates (get_rows_to_update)
        t0 = time.perf_counter()
        updates = store._find_updates(existing_data, new_data)
        t_find_updates = time.perf_counter() - t0

        # Step 3: Filter by timestamp
        t0 = time.perf_counter()
        updates = store._filter_updates_by_timestamp(updates, existing_data)
        t_filter_ts = time.perf_counter() - t0

        # Step 4: Preserve content for deletions
        t0 = time.perf_counter()
        updates = store._transform_incremental_updates(updates, existing_data)
        t_preserve = time.perf_counter() - t0

        # Step 5: Find inserts
        t0 = time.perf_counter()
        inserts = store._find_inserts(existing_data, new_data)
        t_inserts = time.perf_counter() - t0

        # Step 6: Commit the changeset
        t0 = time.perf_counter()
        result = store._commit_changeset(
            updates if updates.num_rows > 0 else None,
            inserts if inserts.num_rows > 0 else None,
        )
        t_commit = time.perf_counter() - t0

        total = t_scan + t_find_updates + t_filter_ts + t_preserve + t_inserts + t_commit
        print(f"\n === incremental_update breakdown ({n_records} records) ===")
        print(f" 1. Iceberg scan (get_namespace_records): {t_scan:.3f}s")
        print(f" 2. Find updates (get_rows_to_update): {t_find_updates:.3f}s")
        print(f" 3. Filter by timestamp: {t_filter_ts:.3f}s")
        print(f" 4. Preserve content for deletions: {t_preserve:.3f}s")
        print(f" 5. Find inserts: {t_inserts:.3f}s")
        print(f" 6. Commit changeset: {t_commit:.3f}s")
        print(f" TOTAL: {total:.3f}s")

        # Identify the bottleneck
        steps = {
            "Iceberg scan": t_scan,
            "get_rows_to_update": t_find_updates,
            "Filter timestamp": t_filter_ts,
            "Preserve deletions": t_preserve,
            "Find inserts": t_inserts,
            "Commit changeset": t_commit,
        }
        slowest = max(steps, key=steps.get)  # type: ignore[arg-type]
        print(f"\n ➜ Slowest step: {slowest} ({steps[slowest]:.3f}s)")
# ---------------------------------------------------------------------------
# Phase 5: Memory profiling
# ---------------------------------------------------------------------------


def _mb(nbytes: int) -> float:
    return nbytes / (1024 * 1024)
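
# The memory tests below sum tracemalloc snapshot statistics, which
# reflect live allocations at the moment a snapshot is taken. For the
# high-water mark during a phase, tracemalloc.get_traced_memory()
# returns (current, peak). A minimal sketch of a hypothetical helper
# built on it, not used by the tests as written:
from collections.abc import Callable


def _peak_traced_mb(fn: Callable[[], Any]) -> float:
    """Run fn() and return the peak traced allocation in MB."""
    tracemalloc.start()
    try:
        fn()
        _current, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return _mb(peak)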
class TestMemoryUsage:
    """Measure memory consumed by each phase of the window processing pipeline.

    The Lambda reportedly uses high memory even when there are no changes.
    These tests identify which data structures contribute.
    """

    def test_get_namespace_records_loads_full_content(
        self,
        temporary_table: IcebergTable,
    ) -> None:
        """get_namespace_records returns ALL columns, including content.

        incremental_update calls this to load existing data. Even though
        _find_updates only compares ["namespace", "id", "content"], the
        full table (with changeset, last_modified, deleted) is loaded and
        kept alive for _preserve_content_for_deletions and
        _filter_updates_by_timestamp.
        """
        n_records = 500
        store = _seed_adapter_store(temporary_table, n_records)
        tracemalloc.start()
        from pyiceberg.expressions import In

        ids = [f"rec-{i:05d}" for i in range(n_records)]
        existing = store.get_namespace_records(In("id", ids))
        snapshot = tracemalloc.take_snapshot()
        tracemalloc.stop()
        # Memory for the Arrow table itself
        arrow_bytes = existing.nbytes
        # Memory as reported by tracemalloc (includes Python-object overhead).
        # NB: tracemalloc only sees allocations routed through Python's
        # allocator; buffers allocated by Arrow's own memory pool are
        # generally not counted here.
        total_traced = sum(stat.size for stat in snapshot.statistics("filename"))
        print(f"\n get_namespace_records ({n_records} records):")
        print(f" Arrow table nbytes: {_mb(arrow_bytes):.2f} MB")
        print(f" Columns: {existing.column_names}")
        print(f" Content column bytes: {_mb(existing.column('content').nbytes):.2f} MB")
        print(f" tracemalloc total: {_mb(total_traced):.2f} MB")
    def test_preserve_content_materialises_to_pylist(
        self,
        temporary_table: IcebergTable,
    ) -> None:
        """_preserve_content_for_deletions calls existing_data.to_pylist().

        This converts the *entire* Arrow table to Python dicts in memory,
        duplicating all content strings. Even when there are zero deletions,
        it still builds the lookup dict from to_pylist().
        """
        n_records = 500
        store = _seed_adapter_store(temporary_table, n_records)
        from pyiceberg.expressions import In

        ids = [f"rec-{i:05d}" for i in range(n_records)]
        existing = store.get_namespace_records(In("id", ids))

        # Simulate an update set with no deletions (the typical "no changes" case);
        # _preserve_content_for_deletions is still called and builds a full dict
        updates = existing.slice(0, 0)  # Empty, but existing_data is still loaded
        tracemalloc.start()
        result1 = store._transform_incremental_updates(updates, existing)
        snap1 = tracemalloc.take_snapshot()
        tracemalloc.stop()

        # Now with a non-empty updates set (forces the to_pylist() path)
        updates_nonempty = existing.slice(0, min(10, n_records))
        tracemalloc.start()
        result2 = store._transform_incremental_updates(updates_nonempty, existing)
        snap2 = tracemalloc.take_snapshot()
        tracemalloc.stop()

        mem1 = sum(stat.size for stat in snap1.statistics("filename"))
        mem2 = sum(stat.size for stat in snap2.statistics("filename"))
        print(f"\n _preserve_content_for_deletions ({n_records} existing):")
        print(f" Empty updates → memory: {_mb(mem1):.2f} MB")
        print(f" 10 updates → memory: {_mb(mem2):.2f} MB (to_pylist() activated)")
        print(f" existing Arrow nbytes: {_mb(existing.nbytes):.2f} MB")
    def test_load_status_map_full_table_scan(
        self,
        temporary_window_status_table: IcebergTable,
    ) -> None:
        """load_status_map scans the ENTIRE window_status table with no filters.

        Each row contains a record_ids list that can hold hundreds of IDs.
        Over months of 15-minute windows this table grows large:

        - 96 windows/day × 365 days ≈ 35,000 rows
        - 35,000 rows × ~100 record_ids each ≈ 3.5M string entries
        """
        store = WindowStore(temporary_window_status_table)
        # Simulate several weeks of window history
        n_windows = 500
        records_per_window = 50
        base = datetime.datetime(2026, 1, 1, tzinfo=datetime.UTC)
        for i in range(n_windows):
            start = base + datetime.timedelta(minutes=15 * i)
            end = start + datetime.timedelta(minutes=15)
            record_ids = tuple(f"rec-{j:05d}" for j in range(records_per_window))
            store.upsert(
                WindowStatusRecord(
                    window_key=f"{start.isoformat()}_{end.isoformat()}",
                    window_start=start,
                    window_end=end,
                    state="success",
                    attempts=1,
                    last_error=None,
                    record_ids=record_ids,
                    updated_at=end,
                    tags={"job_id": "test"},
                )
            )
        tracemalloc.start()
        status_map = store.load_status_map()
        snapshot = tracemalloc.take_snapshot()
        tracemalloc.stop()
        total_mem = sum(stat.size for stat in snapshot.statistics("filename"))
        total_record_ids = sum(
            len(row.get("record_ids", [])) for row in status_map.values()
        )
        print(f"\n load_status_map ({n_windows} windows, {records_per_window} IDs each):")
        print(f" Windows loaded: {len(status_map)}")
        print(f" Total record_ids: {total_record_ids}")
        print(f" tracemalloc total: {_mb(total_mem):.2f} MB")
    def test_incremental_update_memory_no_changes(
        self,
        temporary_table: IcebergTable,
    ) -> None:
        """Measure total memory for incremental_update when nothing has changed.

        This is the "suspiciously high memory" case. The pipeline:

        1. Loads existing records (full content) into an Arrow table
        2. Builds projected copies for comparison
        3. get_rows_to_update materialises rows into Python objects
        4. _preserve_content_for_deletions may call to_pylist()

        Multiple overlapping copies of the content exist simultaneously.
        """
        n_records = 500
        store = _seed_adapter_store(temporary_table, n_records)
        incoming_rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
        incoming = pa.Table.from_pylist(incoming_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
        content_bytes = incoming.column("content").nbytes

        tracemalloc.start()
        before = tracemalloc.take_snapshot()
        result = store.incremental_update(incoming)
        after = tracemalloc.take_snapshot()
        tracemalloc.stop()

        diff_stats = after.compare_to(before, "lineno")
        total_alloc = sum(s.size_diff for s in diff_stats if s.size_diff > 0)
        print(f"\n incremental_update memory ({n_records} records, NO changes):")
        print(f" Content column size: {_mb(content_bytes):.2f} MB")
        print(f" New allocations during update: {_mb(total_alloc):.2f} MB")
        print(f" Ratio (alloc / content): {total_alloc / content_bytes:.1f}x")
        if total_alloc > content_bytes * 3:
            print(
                f" ⚠ HIGH MEMORY: allocated {total_alloc / content_bytes:.1f}x "
                "the content size — multiple copies in memory"
            )
    def test_concurrent_data_copies_in_incremental_update(
        self,
        temporary_table: IcebergTable,
    ) -> None:
        """Count how many simultaneous copies of the content data exist.

        During incremental_update, the following all coexist in memory:

        - the new_data Arrow table (incoming records with content)
        - the existing_data Arrow table (from the Iceberg scan, full content)
        - new_projected (select ["namespace", "id", "content"])
        - existing_projected (select ["namespace", "id", "content"])
        - get_rows_to_update internals: Python strings from .as_py()

        This test measures the size of each content column copy.
        """
        n_records = 200
        store = _seed_adapter_store(temporary_table, n_records)
        incoming_rows = [_make_adapter_row(f"rec-{i:05d}") for i in range(n_records)]
        new_data = pa.Table.from_pylist(incoming_rows, schema=ADAPTER_STORE_ARROW_SCHEMA)
        new_data = store.normalise_table(new_data)
        from pyiceberg.expressions import In

        new_ids_filter = In("id", store._extract_ids(new_data))
        existing_data = store.get_namespace_records(new_ids_filter)

        # At this point we already have two full copies of the content in Arrow tables
        new_content_bytes = new_data.column("content").nbytes
        existing_content_bytes = existing_data.column("content").nbytes

        # _find_updates creates projected copies
        join_fields = ["namespace", "id"]
        compare_cols = join_fields + ["content"]
        new_projected = new_data.select(compare_cols)
        existing_projected = existing_data.select(compare_cols)

        # Arrow select() shares buffers (zero-copy), so the projections report
        # the same content sizes without allocating new copies
        new_proj_bytes = new_projected.column("content").nbytes
        exist_proj_bytes = existing_projected.column("content").nbytes

        print(f"\n Content column copies ({n_records} records):")
        print(f" new_data content: {_mb(new_content_bytes):.2f} MB")
        print(f" existing_data content: {_mb(existing_content_bytes):.2f} MB")
        print(f" new_projected content: {_mb(new_proj_bytes):.2f} MB")
        print(f" existing_projected content: {_mb(exist_proj_bytes):.2f} MB")

        # get_rows_to_update does row-by-row .as_py() — measure the cost
        tracemalloc.start()
        _ = get_rows_to_update(new_projected, existing_projected, join_fields)
        snap = tracemalloc.take_snapshot()
        tracemalloc.stop()
        comparison_mem = sum(s.size for s in snap.statistics("filename"))
        print(f" get_rows_to_update alloc: {_mb(comparison_mem):.2f} MB")

        total_content = new_content_bytes + existing_content_bytes
        print(f"\n Total content in memory: {_mb(total_content):.2f} MB")
        print(" (2x because both new + existing are loaded simultaneously)")
    def test_window_status_record_ids_memory(self) -> None:
        """Measure the memory used by record_ids lists in WindowStatusRecords.

        Each upsert writes the full record_ids tuple. Over time the
        window_status table accumulates record_ids for every window ever
        processed, and load_status_map loads all of them into Python.
        """
        # Measure the in-Python size of record_ids tuples of realistic lengths
        record_counts = [50, 100, 200, 500]
        for count in record_counts:
            record_ids = tuple(f"oai:folio:rec-{j:05d}" for j in range(count))
            ids_size = sys.getsizeof(record_ids)
            ids_size += sum(sys.getsizeof(rid) for rid in record_ids)
            print(f" record_ids({count}): {_mb(ids_size):.3f} MB per window")

        # For 35,000 windows (1 year of 15-minute windows) with an average of
        # 100 IDs per window:
        est_windows = 35_000
        avg_ids = 100
        avg_id_len = len("oai:folio:rec-00000")
        est_bytes = est_windows * avg_ids * (avg_id_len + sys.getsizeof(""))
        print(
            f"\n Estimated memory for {est_windows} windows × {avg_ids} IDs: "
            f"{_mb(est_bytes):.0f} MB"
        )