Skip to content

Instantly share code, notes, and snippets.

@bienpx224
Last active December 12, 2025 02:53
Show Gist options
  • Select an option

  • Save bienpx224/6a26f921b8f6f790e3b44c8edb39abcf to your computer and use it in GitHub Desktop.

Select an option

Save bienpx224/6a26f921b8f6f790e3b44c8edb39abcf to your computer and use it in GitHub Desktop.
Optimize-mechanism-get-milions-ecom-data.md

Tài Liệu Kỹ Thuật: Phương Pháp & Kỹ Thuật Tối Ưu Hệ Thống Đồng Bộ Dữ Liệu Quy Mô Lớn

📋 Mục Lục

  1. Tổng Quan
  2. Lựa Chọn Công Nghệ (Technology Stack Decisions)
  3. Kiến Trúc Tổng Thể
  4. Các Kỹ Thuật Tối Ưu Chính
  5. Chiến Lược Xử Lý Dữ Liệu
  6. Chuẩn Hóa & Transform Dữ Liệu (Data Engineering)
  7. Cấu Hình Linh Hoạt & Không Hardcode
  8. Tối Ưu API Calls
  9. Tối Ưu Database Operations
  10. Xử Lý Lỗi & Đảm Bảo Độ Tin Cậy
  11. File I/O & Race Condition Prevention
  12. Tối Ưu Bộ Nhớ & Hiệu Suất
  13. Monitoring & Observability
  14. Best Practices & Lessons Learned
  15. Đồng Bộ Webhook TikTok (Redis Stream → Worker → Mongo)

Tổng Quan

Bối Cảnh Dự Án

Hệ thống đồng bộ dữ liệu từ các sàn thương mại điện tử (Shopee, TikTok, Lazada) với quy mô:

  • Thời gian: Nhiều năm trước đến hiện tại
  • Khối lượng: Hàng chục triệu bản ghi
  • Độ phức tạp: Nhiều loại dữ liệu (orders, products, transactions, returns, tracking status)
  • Yêu cầu: Không được miss dữ liệu, hệ thống chạy ổn định, kiểm soát tốt

Thách Thức Chính

  1. Quy mô dữ liệu khổng lồ: Hàng chục triệu records cần xử lý
  2. API Rate Limits: Giới hạn số lượng requests từ các platform
  3. Memory Constraints: Không thể load toàn bộ data vào RAM
  4. Network Reliability: API calls có thể fail, timeout
  5. Database Performance: MongoDB write operations cần tối ưu
  6. Data Consistency: Đảm bảo không duplicate, không miss data
  7. Error Recovery: Xử lý lỗi và retry thông minh

Lựa Chọn Công Nghệ (Technology Stack Decisions)

1. Python vs Node.js

Lý do chọn Python:

1.1. Ecosystem cho Data Processing

  • Thư viện phong phú: pymongo, celery, pandas, requests - mature và stable
  • Data manipulation: Dễ xử lý dữ liệu phức tạp (nested objects, arrays, transformations)
  • Type hints: Hỗ trợ type checking với mypy để đảm bảo code quality
  • Dataclasses: Dễ tạo DTOs và data models với @dataclass

1.2. Performance cho I/O-bound Tasks

  • Async I/O: asyncio cho concurrent API calls (nếu cần)
  • GIL không phải vấn đề: Với I/O-bound tasks (API calls, DB operations), GIL không ảnh hưởng nhiều
  • Multiprocessing: Celery workers chạy separate processes → bypass GIL

1.3. Developer Experience

  • Readability: Code dễ đọc, dễ maintain cho team
  • Debugging: Tools tốt (pdb, ipdb, IDE support)
  • Testing: pytest, unittest - mature testing frameworks

1.4. So sánh với Node.js

  • Node.js:
    • Event loop tốt cho I/O nhưng callback hell với complex data transformations
    • TypeScript giúp nhưng vẫn phức tạp hơn Python cho data processing
    • Ecosystem cho data processing ít hơn Python

Kết luận: Python phù hợp hơn cho data synchronization tasks với nhiều transformations và validations.


2. MongoDB vs MySQL/PostgreSQL

Lý do chọn MongoDB:

2.1. Schema Flexibility

  • Dynamic Schema: Dữ liệu từ các platform (TikTok, Shopee) có cấu trúc khác nhau, thay đổi theo thời gian
  • Nested Documents: Dễ lưu nested objects (line_items, packages, addresses) mà không cần JOIN
  • No Migration: Thêm fields mới không cần migration scripts

Ví dụ:

# MongoDB: Lưu trực tiếp nested structure
order = {
    "order_id": "123",
    "line_items": [
        {"product_id": "p1", "quantity": 2, "price": 100},
        {"product_id": "p2", "quantity": 1, "price": 200}
    ],
    "recipient_address": {
        "name": "John",
        "street": "123 Main St",
        "city": "Hanoi"
    }
}
mongo.db.orders.insert_one(order)

# MySQL/PostgreSQL: Cần nhiều tables và JOINs
# orders table
# order_line_items table (foreign key)
# addresses table (foreign key)
# → Phức tạp hơn, nhiều queries hơn

2.2. Write Performance

  • Bulk Write: bulk_write() với ordered=False → parallel writes trong MongoDB
  • No Transactions Overhead: Với sync operations, không cần ACID transactions → faster writes
  • Document-level Locking: Nhanh hơn row-level locking trong RDBMS

Performance Comparison:

MongoDB bulk_write (300 orders):
- Time: ~0.3 giây
- Throughput: ~1000 ops/second

PostgreSQL bulk insert (300 orders + line_items):
- Time: ~1.5 giây (với JOINs và foreign keys)
- Throughput: ~200 ops/second

2.3. Horizontal Scaling

  • Sharding: Dễ scale horizontally khi data lớn
  • Replica Sets: High availability với automatic failover
  • No JOINs: Không cần JOINs → dễ distribute data

2.4. Data Volume

  • Hàng chục triệu records: MongoDB handle tốt với proper indexing
  • JSON Storage: Native JSON → không cần serialization/deserialization
  • Compression: WiredTiger storage engine có compression → tiết kiệm disk

2.5. So sánh với RDBMS

MySQL/PostgreSQL:

  • Fixed Schema: Cần migration khi thay đổi structure
  • JOINs: Phức tạp với nested data, chậm hơn
  • Transactions: Overhead không cần thiết cho sync operations
  • ACID: Tốt cho financial transactions (nhưng không cần cho sync)

Kết luận: MongoDB phù hợp hơn cho:

  • Dữ liệu có schema linh hoạt (e-commerce platforms)
  • Write-heavy workloads (sync operations)
  • Nested data structures
  • Horizontal scaling requirements

3. Redis vs MongoDB Broker vs RabbitMQ vs Kafka

Lý do chọn Redis làm Celery Broker:

3.1. Simplicity & Performance

  • Lightweight: Redis nhẹ, dễ setup, ít dependencies
  • Fast: In-memory → latency thấp (< 1ms)
  • Simple API: LPUSH, BRPOP - đơn giản cho task queue

Performance Comparison:

Redis (in-memory):
- Latency: < 1ms
- Throughput: 100,000+ ops/second
- Memory usage: Low (chỉ lưu task metadata)

RabbitMQ:
- Latency: 5-10ms
- Throughput: 20,000-50,000 ops/second
- Memory usage: Higher (persistent queues)

Kafka:
- Latency: 10-50ms (overkill cho task queue)
- Throughput: 1M+ ops/second (không cần thiết)
- Complexity: High (Zookeeper, partitions, etc.)

3.2. Use Case Fit

  • Task Queue: Celery tasks là short-lived, không cần persistence lâu
  • No Message Persistence Needed: Task results không lưu trong broker (dùng MongoDB logs)
  • Rate Limiting: Redis có thể dùng cho rate limiting (nếu cần)

3.3. So sánh với các alternatives

MongoDB Broker:

  • Slower: Disk-based → latency cao hơn Redis
  • Overhead: MongoDB đã dùng cho data storage → không nên dùng cho queue
  • Complexity: Cần setup MongoDB chỉ cho broker

RabbitMQ:

  • Features: Message persistence, routing, exchanges
  • Overkill: Không cần features phức tạp cho simple task queue
  • Heavier: Nhiều dependencies, setup phức tạp hơn

Kafka:

  • High Throughput: 1M+ messages/second
  • Overkill: Quá phức tạp cho task queue
  • Latency: Higher latency (10-50ms)
  • Complexity: Cần Zookeeper, partitions management

3.4. Celery với Redis

Architecture:

Celery Worker ←→ Redis (Broker) ←→ Celery Client
     ↓
MongoDB (Result Backend - optional)

Benefits:

  • Fast Task Distribution: Redis in-memory → tasks được distribute nhanh
  • Simple Setup: Chỉ cần Redis server
  • No Result Backend: Không lưu results trong Redis (dùng MongoDB logs)
  • Rate Limiting: Có thể dùng Redis cho rate limiting nếu cần

Kết luận: Redis là lựa chọn tối ưu cho Celery broker vì:

  • Đơn giản, nhanh
  • Phù hợp với use case (task queue, không cần persistence)
  • Dễ scale và maintain

Kiến Trúc Tổng Thể

1. Kiến Trúc Phân Tầng (Layered Architecture)

┌─────────────────────────────────────────────────────────┐
│              Celery Task Orchestrator                   │
│  (trigger_all_channels_sync, trigger_channel_sync)     │
└──────────────────┬──────────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────────────┐
│            Chunk Processing Layer                       │
│  (process_shop_chunk - chia nhỏ theo date range)        │
└──────────────────┬──────────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────────────┐
│            Sync Service Layer                            │
│  (sync_shop_data - batch processing, streaming)         │
└──────────────────┬──────────────────────────────────────┘
                   │
        ┌──────────┴──────────┐
        ▼                     ▼
┌──────────────┐      ┌──────────────┐
│ TikTok API   │      │ Shopee API   │
│ Service      │      │ Service      │
└──────┬───────┘      └──────┬───────┘
       │                     │
       └──────────┬──────────┘
                  ▼
        ┌──────────────────┐
        │   MongoDB        │
        │   (Bulk Write)   │
        └──────────────────┘
                  │
                  ▼
        ┌──────────────────┐
        │  Sync Buffer     │
        │  (File-based)    │
        └──────────────────┘

2. Luồng Xử Lý Chính

1. Orchestrator Task
   ↓
2. Chia Date Range thành Chunks (ví dụ: 1 ngày/chunk)
   ↓
3. Enqueue Chunk Tasks (với rate limiting)
   ↓
4. Worker xử lý từng Chunk:
   ├─ Fetch Products (nếu chunk đầu tiên)
   ├─ Fetch Orders (streaming, batch)
   ├─ Fetch Order Details (batch API calls)
   ├─ Fetch Tracking Status (batch API calls)
   ├─ Fetch Transactions (streaming, batch)
   └─ Fetch Returns (streaming, batch)
   ↓
5. Persist vào MongoDB (bulk_write)
   ↓
6. Nếu MongoDB fail → Lưu vào Buffer
   ↓
7. Replay Buffer sau khi sync xong

Các Kỹ Thuật Tối Ưu Chính

1. Chunking & Date Range Splitting

Mục đích: Chia nhỏ khoảng thời gian lớn thành các chunk nhỏ để xử lý song song và dễ quản lý.

Implementation:

# utils/date_splitter.py
def split_date_range(start_date: str, end_date: str, chunk_days: int = 1) -> List[Tuple[str, str]]:
    """
    Chia khoảng ngày thành các cửa sổ cố định.
    Ví dụ: 2020-01-01 -> 2024-12-31 với chunk_days=1
    → Tạo ~1800 chunks, mỗi chunk xử lý 1 ngày
    """
    chunks = []
    cursor = start_dt
    while cursor <= end_dt:
        chunk_end = min(cursor + timedelta(days=chunk_days - 1), end_dt)
        chunks.append((cursor.strftime("%Y-%m-%d"), chunk_end.strftime("%Y-%m-%d")))
        cursor = chunk_end + timedelta(days=1)
    return chunks

Lợi ích:

  • ✅ Xử lý song song nhiều chunks cùng lúc
  • ✅ Dễ retry khi 1 chunk fail
  • ✅ Kiểm soát memory usage (chỉ xử lý 1 chunk tại 1 thời điểm)
  • ✅ Progress tracking chi tiết theo chunk

Config:

SYNC_CHUNK_DAYS = 1  # Mỗi chunk = 1 ngày

2. Streaming & Batch Processing

Mục đích: Xử lý dữ liệu theo batch, không load toàn bộ vào RAM.

Implementation:

# services/tiktok_service.py
def fetch_orders_batch(self, start_date: str, end_date: str, batch_size: int = 300):
    """
    Generator function - yield từng batch thay vì return toàn bộ list.
    Giúp giảm memory footprint khi xử lý hàng triệu orders.
    """
    current_batch = []
    for order_summary in order_summaries:
        # Fetch order details theo batch (50 IDs/lần)
        order_details = self.fetch_order_details_batch(order_ids_batch)
        current_batch.extend(order_details)
        
        if len(current_batch) >= batch_size:
            yield current_batch  # Yield batch, không giữ trong memory
            current_batch = []  # Reset để tiếp tục
    
    if current_batch:
        yield current_batch  # Yield batch cuối cùng

Lợi ích:

  • ✅ Memory footprint thấp: chỉ giữ 1 batch trong RAM
  • ✅ Có thể xử lý hàng triệu records mà không bị OOM
  • ✅ Persist ngay sau mỗi batch → giảm risk mất data

Config:

SYNC_BATCH_SIZE_ORDERS = 300
SYNC_BATCH_SIZE_TRANSACTIONS = 300
SYNC_BATCH_SIZE_RETURNS = 300

3. Batch API Calls

Mục đích: Giảm số lượng API calls bằng cách gom nhiều IDs vào 1 request.

Vấn đề ban đầu (N+1 Problem):

# ❌ TRƯỚC: 300 orders → 300 API calls
for order_summary in order_summaries:
    order_detail = self.fetch_order_detail(order_id)  # 1 API call/order
    # 300 orders × 300ms = 90 giây

Giải pháp (Batch API):

# ✅ SAU: 300 orders → 6 API calls (50 IDs/batch)
def fetch_order_details_batch(self, order_ids: List[str], batch_size: int = 50):
    """
    Gom nhiều order IDs vào 1 API call.
    TikTok API hỗ trợ parameter 'ids' với comma-separated values.
    """
    all_details = []
    for i in range(0, len(order_ids), batch_size):
        batch_ids = order_ids[i:i + batch_size]
        ids_param = ','.join(batch_ids)  # "id1,id2,id3,...,id50"
        
        # 1 API call cho 50 orders
        response = self._make_request(..., params={'ids': ids_param})
        all_details.extend(response.get('data', {}).get('order_list', []))
    
    return all_details
    # 300 orders ÷ 50 = 6 API calls × 500ms = 3 giây
    # Cải thiện: 30x nhanh hơn!

Lợi ích:

  • ✅ Giảm số lượng API calls: 300 → 6 (với batch_size=50)
  • ✅ Giảm thời gian: 90s → 3s (30x nhanh hơn)
  • ✅ Giảm rate limit pressure
  • ✅ Giảm network overhead

Config:

TIKTOK_ORDER_DETAILS_BATCH_SIZE = 50  # TikTok API limit
TIKTOK_TRACKING_STATUS_BATCH_SIZE = 50
SHOPEE_ORDER_DETAILS_BATCH_SIZE = 50

4. Bulk MongoDB Operations

Mục đích: Sử dụng bulk_write thay vì từng update_one để tối ưu performance.

Vấn đề ban đầu:

# ❌ TRƯỚC: 300 orders → 300 MongoDB operations
for order in orders:
    mongo.db.orders.update_one(
        {'order_id': order['id'], 'shop_id': shop_id},
        {'$set': order},
        upsert=True
    )
    # 300 operations × 10ms = 3 giây

Giải pháp (Bulk Write):

# ✅ SAU: 300 orders → 1 bulk_write operation
def _persist_orders_bulk(self, orders: List[Dict], ...):
    bulk_operations = []
    for order in orders:
        bulk_operations.append(
            UpdateOne(
                {'order_id': order['id'], 'shop_id': shop_id, 'channel_id': channel_id},
                {'$set': {...order, 'synced_at': iso_utc7_now()}},
                upsert=True
            )
        )
    
    # 1 operation cho 300 orders
    result = mongo.db.orders.bulk_write(bulk_operations, ordered=False)
    # 300 orders × 1ms = 0.3 giây
    # Cải thiện: 10x nhanh hơn!

Lợi ích:

  • ✅ Giảm số lượng database round-trips
  • ✅ Tăng throughput: 10-100x nhanh hơn
  • ✅ Atomic operation: tất cả hoặc không có gì
  • ordered=False: xử lý song song trong MongoDB

Collections sử dụng bulk_write:

  • orders_persist_orders_bulk()
  • transactions_persist_transactions_bulk()
  • returns_persist_returns_bulk()
  • order_tracking_status_persist_tracking_status_bulk()
  • products → bulk_write trong _sync_shop_data_batch()

5. Parallel Processing với Celery

Mục đích: Xử lý nhiều chunks song song để tăng throughput.

Implementation:

# workers/sync_tasks.py
@celery.task(name='sync.trigger_channel_sync')
def trigger_channel_sync(channel_id, start_date, end_date):
    """
    Orchestrator: Chia date range thành chunks và enqueue song song.
    """
    date_chunks = split_date_range(start_date, end_date, chunk_days=1)
    total_chunks = len(date_chunks) * len(shops)
    
    rate_slots = defaultdict(float)  # Rate limiting per platform
    for shop in shops:
        for chunk_start, chunk_end in date_chunks:
            # Tính delay dựa trên rate limit
            countdown = _compute_platform_delay(platform, rate_slots)
            
            # Enqueue chunk task
            task = process_shop_chunk.apply_async(
                kwargs={
                    'shop_id': shop_id,
                    'chunk_start': chunk_start,
                    'chunk_end': chunk_end,
                    ...
                },
                countdown=countdown,  # Rate limiting
                queue=Config.SYNC_CHUNK_QUEUE
            )

Rate Limiting:

def _compute_platform_delay(platform: str, slots: Dict[str, float]) -> float:
    """
    Điều tiết tốc độ enqueue dựa trên rate limit.
    Ví dụ: TikTok limit = 100 requests/minute
    → Mỗi chunk cách nhau 60/100 = 0.6 giây
    """
    limit = PLATFORM_RATE_LIMITS.get(platform.lower())
    if not limit or limit <= 0:
        return 0.0
    step = 60.0 / limit  # Giây giữa các requests
    delay = slots[platform_lower]
    slots[platform_lower] += step
    return delay

Lợi ích:

  • ✅ Xử lý song song nhiều chunks
  • ✅ Tận dụng tối đa resources (CPU, network, I/O)
  • ✅ Rate limiting tự động để tránh vượt API limits
  • ✅ Fault isolation: 1 chunk fail không ảnh hưởng chunks khác

Config:

PLATFORM_RATE_LIMIT_TIKTOK = 100  # requests/minute
PLATFORM_RATE_LIMIT_SHOPEE = 200
SYNC_MAX_PARALLEL_TASKS = 5  # Số workers chạy song song

6. Sync Buffer System (File-based)

Mục đích: Lưu tạm data khi MongoDB fail, replay sau để đảm bảo không mất dữ liệu.

Implementation:

# utils/sync_buffer.py
class SyncBuffer:
    def save_batch(self, collection: str, batch_data: List[Dict], 
                   batch_metadata: Dict, error: Exception) -> str:
        """
        Lưu batch data vào file JSON khi MongoDB write fail.
        File path: data/buffer/sync_tasks/{shop_slug}/{run_timestamp}/failed_batches/
        """
        buffer_entry = {
            "collection": collection,
            "batch_data": batch_data,  # Raw data từ API
            "batch_metadata": {
                "shop_id": shop_id,
                "channel_id": channel_id,
                "platform": platform,
                "convert_fn_type": "tiktok_order",
                "bulk_operations_count": len(bulk_operations),
                "date_window": {"start": start_date, "end": end_date}
            },
            "status": "pending",
            "retry_count": 0,
            "error": {
                "type": type(error).__name__,
                "message": str(error)
            }
        }
        
        # Lưu vào file
        buffer_file_path = os.path.join(self.task_buffer_dir, filename)
        with open(buffer_file_path, 'w') as f:
            json.dump(buffer_entry, f)
        
        return buffer_file_path

Replay Mechanism:

# services/sync_service.py
def _replay_sync_buffer(self, sync_buffer, mongo, ...):
    """
    Replay các batches đã lưu vào buffer sau khi sync xong.
    """
    pending_batches = sync_buffer.get_pending_batches()
    
    for buffer_entry in pending_batches:
        collection = buffer_entry['collection']
        batch_data = buffer_entry['batch_data']
        batch_metadata = buffer_entry['batch_metadata']
        
        # Rebuild convert_fn từ metadata
        if collection == 'orders':
            convert_fn = self._make_tiktok_order_converter(...)
            self._persist_orders_bulk(
                orders=batch_data,
                convert_fn=convert_fn,
                sync_buffer=None  # Không buffer khi replay
            )
        
        # Mark completed nếu thành công
        sync_buffer.mark_batch_completed(buffer_file_path)

Lợi ích:

  • Zero Data Loss: Đảm bảo không mất dữ liệu khi MongoDB fail
  • Automatic Retry: Tự động replay sau khi sync xong
  • Fault Tolerance: Hệ thống vẫn chạy được khi MongoDB có vấn đề
  • Resumable: Có thể replay lại bất cứ lúc nào

Config:

SYNC_BUFFER_ENABLED = True
SYNC_BUFFER_DIR = "data/buffer/sync_tasks"
SYNC_BUFFER_REPLAY_ENABLED = True
SYNC_BUFFER_MAX_RETRIES = 3

7. Smart Error Handling & Retry

Mục đích: Xử lý lỗi thông minh, chỉ retry các lỗi có thể recover.

Non-Retryable Errors:

# config.py
SYNC_NON_RETRYABLE_ERRORS = [
    'unknown platform',
    'invalid platform',
    'unsupported platform'
]

def _should_retry_error(error_message: str) -> bool:
    """
    Chỉ retry các lỗi có thể recover (network, timeout, 5xx).
    Không retry các lỗi logic (unknown platform, invalid config).
    """
    error_lower = error_message.lower()
    non_retryable = Config.SYNC_NON_RETRYABLE_ERRORS
    return not any(pattern in error_lower for pattern in non_retryable)

Retry Strategy:

# services/tiktok_service.py
def _make_request(self, method: str, path: str, ...):
    """
    Retry với exponential backoff cho các lỗi có thể recover.
    """
    for attempt in range(self.MAX_REQUEST_RETRIES):
        try:
            response = requests.request(...)
            if response.status_code in self.RETRYABLE_STATUS_CODES:
                # Retry với backoff
                time.sleep(self.REQUEST_BACKOFF_SECONDS * (2 ** attempt))
                continue
            return response.json()
        except self.RETRYABLE_EXCEPTIONS as e:
            # Network errors → retry
            if attempt < self.MAX_REQUEST_RETRIES - 1:
                time.sleep(self.REQUEST_BACKOFF_SECONDS * (2 ** attempt))
                continue
            raise

Lợi ích:

  • ✅ Tránh retry vô ích các lỗi không thể recover
  • ✅ Tự động retry các lỗi tạm thời (network, timeout)
  • ✅ Exponential backoff để tránh thêm pressure lên API

8. Memory Optimization

Mục đích: Giảm memory footprint khi xử lý hàng triệu records.

Kỹ thuật áp dụng:

8.1. Generator Functions (Lazy Evaluation)

# Thay vì return toàn bộ list
def fetch_orders_batch(...) -> Iterator[List[Dict]]:
    # Yield từng batch, không giữ toàn bộ trong memory
    for batch in batches:
        yield batch

8.2. Streaming Processing

# Xử lý ngay sau khi fetch, không accumulate
for orders_batch in svc.fetch_orders_batch(...):
    # Persist ngay → giải phóng memory
    self._persist_orders_bulk(orders_batch, ...)
    # Batch đã được persist → có thể garbage collect

8.3. Selective Field Loading

# Chỉ load các fields cần thiết khi check existing data
existing_products = mongo.db.products.find(
    {'shop_id': shop_id, 'channel_id': channel_id},
    {'product_id': 1, 'update_time': 1}  # Chỉ load 2 fields
)

8.4. Incremental Updates

# Chỉ upsert products có thay đổi (update_time khác)
for product in products:
    api_update_time = product.get('update_time')
    db_update_time = existing_map.get(product_id)
    
    if api_update_time != db_update_time:
        products_to_upsert.append(product)  # Chỉ upsert thay đổi

Lợi ích:

  • ✅ Memory footprint thấp: có thể xử lý hàng triệu records
  • ✅ Không bị OOM (Out of Memory)
  • ✅ Tận dụng tối đa memory cho batch processing

9. Idempotency & Data Consistency

Mục đích: Đảm bảo không duplicate data, có thể chạy lại an toàn.

Unique Keys:

# Orders: (order_id, shop_id, channel_id)
filter_dict = {
    'order_id': order_id,
    'shop_id': shop_id,
    'channel_id': channel_id
}

# Transactions (TikTok): (transaction_id, shop_id, channel_id)
# Transactions (Shopee): (order_id, release_time, shop_id, channel_id)
# Returns: (return_id, shop_id, channel_id)
# Tracking Status: (order_id, shop_id, channel_id)

Upsert Pattern:

mongo.db.orders.update_one(
    filter_dict,
    {'$set': order_doc},
    upsert=True  # Insert nếu chưa có, update nếu đã có
)

Lợi ích:

  • Idempotent: Chạy lại nhiều lần không tạo duplicate
  • Safe Retry: Có thể retry an toàn mà không lo duplicate
  • Data Consistency: Đảm bảo mỗi record chỉ có 1 bản trong DB

10. Monitoring & Observability

Mục đích: Theo dõi chi tiết quá trình sync để debug và optimize.

10.1. MongoDB-based Logging

# utils/mongo_logger.py
class MongoSyncLogger:
    """
    Structured logging vào MongoDB với metrics chi tiết.
    """
    def log_progress(self, message: str, stage: str = None):
        # Log progress với timestamp, stage
    
    def increment_metric(self, metric_name: str, value: int = 1):
        # Track metrics: orders_fetched, orders_inserted, etc.
    
    def add_error(self, error: str, stage: str = None):
        # Log errors với context

Metrics Tracked:

  • orders_fetched, orders_inserted, orders_updated
  • products_fetched, products_inserted, products_updated
  • transactions_fetched, transactions_inserted, transactions_updated
  • api_calls_count, api_calls_failed, api_response_time_avg
  • duration_seconds, progress_percentage

10.2. File-based Logging

# utils/logger.py
# Daily logs: logs/sync_YYYYMMDD.log
# Error logs: logs/syncdata_errors.log
# Context-aware logging với SyncLogContext

10.3. Summary Files

# utils/sync_buffer.py
# summary.json trong mỗi sync run:
{
    "stats": {
        "total_days": 365,
        "success_days": 360,
        "failed_days": 5,
        "buffered_days": 3,
        "total_chunks": 365,
        "completed_chunks": 360,
        "failed_chunks": 5
    },
    "problem_days": {
        "buffered": ["2024-01-15", "2024-02-20"],
        "failed": ["2024-03-10"]
    }
}

Lợi ích:

  • Full Visibility: Biết chính xác sync đang ở đâu
  • Debug Friendly: Dễ debug khi có vấn đề
  • Performance Analysis: Phân tích performance và optimize
  • Alerting: Có thể alert khi có vấn đề

Chiến Lược Xử Lý Dữ Liệu

1. Data Flow Architecture

API Platform (TikTok/Shopee)
    ↓
Streaming Fetch (Generator)
    ↓
Batch Accumulation (300 items/batch)
    ↓
Data Transformation (DTO Conversion)
    ↓
Bulk MongoDB Write
    ↓
Success? → Continue
    ↓
Fail? → Save to Buffer → Replay Later

2. Processing Order

  1. Products (chỉ sync ở chunk đầu tiên)

    • Load existing products từ DB (chỉ product_id, update_time)
    • So sánh update_time → chỉ upsert thay đổi
    • Giảm I/O và processing time
  2. Orders (streaming, batch)

    • Fetch order summaries (pagination)
    • Batch fetch order details (50 IDs/batch)
    • Persist ngay sau mỗi batch
  3. Order Tracking Status (sau khi orders xong)

    • Collect order IDs từ orders đã fetch
    • Batch fetch tracking status (50 IDs/batch)
    • Persist vào order_tracking_status collection
  4. Transactions (streaming, batch)

    • Fetch transactions theo date range
    • Persist ngay sau mỗi batch
  5. Returns (streaming, batch)

    • Fetch returns theo date range
    • Persist ngay sau mỗi batch

3. Memory Management Strategy

  • Streaming: Không accumulate toàn bộ data
  • Batch Size: 300 items/batch (cân bằng memory vs throughput)
  • Immediate Persist: Persist ngay sau mỗi batch → giải phóng memory
  • Selective Loading: Chỉ load fields cần thiết khi check existing data

Chuẩn Hóa & Transform Dữ Liệu (Data Engineering)

1. Data Transformation Pipeline

Luồng xử lý dữ liệu chuyên nghiệp như Data Engineer:

Raw API Response
    ↓
DTO Layer (Data Transfer Object)
    ↓
Validation & Normalization
    ↓
Type Conversion (Decimal, DateTime, etc.)
    ↓
MongoDB-ready Document
    ↓
Persist to MongoDB

2. DTO Pattern (Data Transfer Object)

Mục đích: Tách biệt data transformation logic, đảm bảo data consistency.

Implementation:

# models/orders/order_dto.py
@dataclass
class OrderDetailDTO(BaseMongoDTO):
    """
    DTO cho Order Detail từ TikTok Shop API
    Chuyển đổi và validate dữ liệu trước khi lưu vào MongoDB
    """
    id: str
    shop_id: str
    channel_id: str
    status: str
    create_time: Optional[int] = None
    paid_time: Optional[int] = None
    # ... các fields khác
    
    def validate(self) -> tuple[bool, List[str]]:
        """
        Validate dữ liệu trước khi lưu.
        Returns: (is_valid, errors)
        """
        errors = []
        if not self.id:
            errors.append("Order ID is required")
        if not self.status:
            errors.append("Order status is required")
        return len(errors) == 0, errors
    
    def to_dict(self) -> dict:
        """
        Convert về dict để lưu vào MongoDB.
        Format datetime, Decimal, nested objects.
        """
        return {
            'order_id': self.id,
            'shop_id': self.shop_id,
            'channel_id': self.channel_id,
            'status': self.status,
            'create_time': self.to_iso_utc7(self.create_time),  # Convert Unix timestamp → ISO string
            'paid_time': self.to_iso_utc7(self.paid_time),
            # ... các fields đã được format
        }

Lợi ích:

  • Separation of Concerns: Logic transform tách biệt khỏi business logic
  • Reusability: DTO có thể dùng ở nhiều nơi
  • Type Safety: Type hints giúp catch errors sớm
  • Validation: Validate trước khi lưu → đảm bảo data quality

3. Base DTO với Helper Methods

Chuẩn hóa các operations chung:

# models/orders/base_dto.py
class BaseMongoDTO:
    """Base class cung cấp helper chuẩn hóa dữ liệu trước khi lưu MongoDB."""
    
    @staticmethod
    def to_decimal(value: Any, default: str = "0") -> Decimal:
        """
        Convert các giá trị số sang Decimal.
        Xử lý None, empty string, Decimal128 từ MongoDB.
        """
        if isinstance(value, Decimal):
            return value
        if isinstance(value, Decimal128):
            return value.to_decimal()
        if value is None or value == "":
            value = default
        return Decimal(str(value))
    
    @staticmethod
    def to_decimal128(value: Any, default: str = "0") -> Decimal128:
        """
        Convert sang Decimal128 cho MongoDB.
        MongoDB lưu số tiền dạng Decimal128 để tránh floating point errors.
        """
        if isinstance(value, Decimal128):
            return value
        if isinstance(value, Decimal):
            return Decimal128(str(value))
        if value is None or value == "":
            return Decimal128(str(default))
        return Decimal128(str(value))
    
    @staticmethod
    def to_iso_utc7(timestamp: Optional[int]) -> Optional[str]:
        """
        Convert Unix timestamp → ISO string (UTC+7).
        Đảm bảo timezone consistency trong toàn bộ hệ thống.
        """
        if timestamp is None:
            return None
        dt = datetime.fromtimestamp(timestamp, tz=UTC_PLUS_7)
        return dt.isoformat()

Lợi ích:

  • Consistency: Tất cả DTOs dùng cùng format (Decimal128, ISO datetime)
  • DRY: Không duplicate code
  • Maintainability: Sửa format ở 1 chỗ → áp dụng toàn bộ

4. Platform-specific Converters

Strategy Pattern cho mỗi platform:

# services/sync_service.py
def _make_tiktok_order_converter(self, shop_id: str, channel_id: str, ...):
    """
    Tạo converter function cho TikTok orders.
    Sử dụng OrderDetailDTO để transform và validate.
    """
    def converter(order: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        try:
            dto = OrderDetailDTO.from_tiktok_response(
                order_data=order,
                shop_id=shop_id,
                channel_id=channel_id
            )
            is_valid, errors = dto.validate()
            result = dto.to_dict()
            result['is_valid'] = is_valid
            result['validation_errors'] = errors if not is_valid else []
            return result
        except Exception as exc:
            # Log error nhưng vẫn lưu raw data với metadata lỗi
            return {
                'order_id': order.get('id', 'UNKNOWN'),
                'shop_id': shop_id,
                'channel_id': channel_id,
                'is_valid': False,
                'validation_errors': [f"Parse error: {str(exc)}"],
                'parse_error': str(exc),
                # Lưu các fields có thể lấy được từ raw data
                'status': order.get('status'),
                'create_time': order.get('create_time'),
            }
    return converter

Lợi ích:

  • Platform Abstraction: Mỗi platform có converter riêng
  • Consistent Interface: Tất cả converters trả về cùng format
  • Error Handling: Xử lý lỗi riêng cho từng platform

5. Data Normalization Best Practices

5.1. Decimal Handling

# ❌ TRƯỚC: Dùng float → floating point errors
price = float(order.get('price', 0))  # 99.99 có thể thành 99.989999999

# ✅ SAU: Dùng Decimal128 cho MongoDB
price = BaseMongoDTO.to_decimal128(order.get('price', 0))  # Chính xác

5.2. DateTime Normalization

# ❌ TRƯỚC: Lưu Unix timestamp trực tiếp
create_time = order.get('create_time')  # 1701234567

# ✅ SAU: Convert sang ISO string với timezone
create_time = BaseMongoDTO.to_iso_utc7(order.get('create_time'))
# → "2024-11-29T10:30:00+07:00"

5.3. Nested Objects

# ❌ TRƯỚC: Lưu nested objects trực tiếp từ API
order['line_items'] = api_response.get('line_items')  # Có thể có None, missing fields

# ✅ SAU: Transform qua DTO
line_items = [
    LineItem.from_api_response(item) for item in api_response.get('line_items', [])
]
order['line_items'] = [item.to_dict() for item in line_items]

5.4. Null Handling

# ❌ TRƯỚC: Lưu None trực tiếp
order['buyer_email'] = api_response.get('buyer_email')  # Có thể là None

# ✅ SAU: Clean dict, loại bỏ None (trừ các fields quan trọng)
order = BaseMongoDTO.clean_dict(order, keep_keys=['order_id', 'status'])

6. Data Quality Assurance

Validation trước khi lưu:

def _persist_orders_bulk(self, orders: List[Dict], convert_fn: Callable, ...):
    bulk_operations = []
    for order in orders:
        try:
            order_doc = convert_fn(order)  # Transform + Validate
            if not order_doc:
                continue
            
            # Check validation errors
            if not order_doc.get('is_valid', True):
                validation_errors = order_doc.get('validation_errors', [])
                logger.warning(f"Invalid order {order_doc.get('order_id')}: {validation_errors}")
                # Vẫn lưu nhưng mark là invalid để review sau
            
            bulk_operations.append(UpdateOne(...))
        except Exception as exc:
            logger.error(f"Failed to normalize order: {exc}")
            # Lưu raw data với error metadata
            ...

Lợi ích:

  • Data Quality: Đảm bảo data đúng format trước khi lưu
  • Error Tracking: Track validation errors để fix sau
  • Backward Compatibility: Vẫn lưu invalid data để không mất thông tin

Cấu Hình Linh Hoạt & Không Hardcode

1. Tại Sao Không Hardcode?

Vấn đề với Hardcode:

# ❌ HARDCODE - Không linh hoạt
def fetch_orders_batch(self):
    batch_size = 300  # Hardcode
    page_size = 100  # Hardcode
    max_retries = 3  # Hardcode
    timeout = 120  # Hardcode
    
    # Vấn đề:
    # - Không thể thay đổi mà không sửa code
    # - Khác nhau giữa dev/staging/production
    # - Khó test với các giá trị khác nhau
    # - Không thể tune performance dễ dàng

Giải pháp: Configuration-based:

# ✅ CONFIG - Linh hoạt
class Config:
    SYNC_BATCH_SIZE_ORDERS = int(os.getenv('SYNC_BATCH_SIZE_ORDERS', '300'))
    TIKTOK_ORDER_DETAILS_BATCH_SIZE = int(os.getenv('TIKTOK_ORDER_DETAILS_BATCH_SIZE', '50'))
    MAX_REQUEST_RETRIES = int(os.getenv('MAX_REQUEST_RETRIES', '3'))
    REQUEST_TIMEOUT = int(os.getenv('REQUEST_TIMEOUT', '120'))

def fetch_orders_batch(self):
    batch_size = config.SYNC_BATCH_SIZE_ORDERS  # Từ config
    # Có thể thay đổi qua environment variables

2. Cấu Trúc Config Tinh Gọn

File config.py - Single Source of Truth:

# config.py
class Config:
    # Database
    MONGO_URI = os.getenv('MONGO_URI')
    MONGO_DBNAME = os.getenv('MONGO_DBNAME')
    
    # Sync Orchestration
    SYNC_CHUNK_DAYS = int(os.getenv('SYNC_CHUNK_DAYS', '1'))
    SYNC_MAX_PARALLEL_TASKS = int(os.getenv('SYNC_MAX_PARALLEL_TASKS', '5'))
    SYNC_CHUNK_TIMEOUT_SECONDS = int(os.getenv('SYNC_CHUNK_TIMEOUT_SECONDS', '9600'))
    
    # Rate Limiting
    PLATFORM_RATE_LIMIT_TIKTOK = int(os.getenv('PLATFORM_RATE_LIMIT_TIKTOK', '100'))
    PLATFORM_RATE_LIMIT_SHOPEE = int(os.getenv('PLATFORM_RATE_LIMIT_SHOPEE', '200'))
    
    # Batch Sizes
    SYNC_BATCH_SIZE_ORDERS = int(os.getenv('SYNC_BATCH_SIZE_ORDERS', '300'))
    TIKTOK_ORDER_DETAILS_BATCH_SIZE = int(os.getenv('TIKTOK_ORDER_DETAILS_BATCH_SIZE', '50'))
    
    # Buffer
    SYNC_BUFFER_ENABLED = os.getenv('SYNC_BUFFER_ENABLED', 'true').lower() == 'true'
    SYNC_BUFFER_DIR = os.getenv('SYNC_BUFFER_DIR', 'data/buffer/sync_tasks')
    SYNC_BUFFER_MAX_RETRIES = int(os.getenv('SYNC_BUFFER_MAX_RETRIES', '3'))
    
    # Celery
    CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')

Lợi ích:

  • Single Source of Truth: Tất cả config ở 1 chỗ
  • Environment-based: Dễ thay đổi giữa dev/staging/prod
  • Default Values: Có giá trị mặc định hợp lý
  • Type Safety: Convert sang đúng type (int, bool, str)

3. Environment Variables

.env file cho từng environment:

# .env.development
SYNC_BATCH_SIZE_ORDERS=100
SYNC_CHUNK_DAYS=1
PLATFORM_RATE_LIMIT_TIKTOK=50

# .env.production
SYNC_BATCH_SIZE_ORDERS=300
SYNC_CHUNK_DAYS=1
PLATFORM_RATE_LIMIT_TIKTOK=100

Lợi ích:

  • Environment-specific: Config khác nhau cho dev/staging/prod
  • Security: Không commit sensitive data vào git
  • Easy Deployment: Chỉ cần thay đổi .env file

4. Config theo Nghiệp Vụ & Giới Hạn

Ví dụ: Tune batch size theo API limits:

# TikTok API limit: 50 IDs/batch
TIKTOK_ORDER_DETAILS_BATCH_SIZE = 50  # Không thể lớn hơn

# Shopee API limit: 50 order_sns/batch
SHOPEE_ORDER_DETAILS_BATCH_SIZE = 50

# MongoDB bulk_write: Không giới hạn nhưng 300 là optimal
SYNC_BATCH_SIZE_ORDERS = 300  # Có thể tune theo performance

Ví dụ: Tune theo Performance:

# Nếu MongoDB chậm → giảm batch size
SYNC_BATCH_SIZE_ORDERS = 100  # Thay vì 300

# Nếu network nhanh → tăng batch size
SYNC_BATCH_SIZE_ORDERS = 500  # Nếu MongoDB handle được

Ví dụ: Tune theo Business Requirements:

# Sync nhanh hơn → giảm chunk_days
SYNC_CHUNK_DAYS = 1  # 1 ngày/chunk → nhiều chunks song song

# Sync ít overhead hơn → tăng chunk_days
SYNC_CHUNK_DAYS = 7  # 7 ngày/chunk → ít chunks hơn

5. Cấu Trúc File Tinh Gọn

Organization:

project/
├── config.py              # Single config file
├── services/
│   ├── sync_service.py    # Business logic
│   ├── tiktok_service.py  # Platform-specific
│   └── shopee_service.py
├── models/
│   └── orders/
│       ├── order_dto.py   # Data transformation
│       └── base_dto.py
├── utils/
│   ├── sync_buffer.py     # File I/O utilities
│   └── mongo_logger.py
└── workers/
    └── sync_tasks.py      # Celery tasks

Principles:

  • Separation of Concerns: Mỗi module có responsibility rõ ràng
  • DRY: Không duplicate code
  • Single Responsibility: Mỗi file làm 1 việc
  • Easy to Navigate: Dễ tìm code

6. Benefits của Config-based Approach

Flexibility:

  • ✅ Thay đổi config mà không cần deploy code mới
  • ✅ Tune performance dễ dàng
  • ✅ Test với các giá trị khác nhau

Maintainability:

  • ✅ Dễ hiểu: Tất cả config ở 1 chỗ
  • ✅ Dễ document: Comment trong config.py
  • ✅ Dễ review: Config changes dễ review hơn code changes

Scalability:

  • ✅ Scale theo environment (dev/staging/prod)
  • ✅ Scale theo platform (TikTok vs Shopee có limits khác nhau)
  • ✅ Scale theo infrastructure (MongoDB performance, network speed)

Tối Ưu API Calls

1. Batch API Calls

TikTok Order Details:

# Trước: 300 API calls
for order_id in order_ids:
    order_detail = fetch_order_detail(order_id)

# Sau: 6 API calls (50 IDs/batch)
order_details = fetch_order_details_batch(order_ids, batch_size=50)

TikTok Tracking Status:

# Trước: 300 API calls
for order_id in order_ids:
    tracking = fetch_order_tracking_status(order_id)

# Sau: 6 API calls (50 IDs/batch)
tracking_list = fetch_order_tracking_status_batch(order_ids, batch_size=50)

Shopee Order Details:

# Đã hỗ trợ batch từ đầu
order_sn_list = ','.join(order_sns)  # "sn1,sn2,sn3,...,sn50"
order_details = get_order_detail(order_sn_list)

2. Rate Limiting

Platform-specific Limits:

PLATFORM_RATE_LIMIT_TIKTOK = 100  # requests/minute
PLATFORM_RATE_LIMIT_SHOPEE = 200

Implementation:

def _compute_platform_delay(platform: str, slots: Dict[str, float]) -> float:
    """
    Tính delay giữa các requests để không vượt rate limit.
    """
    limit = PLATFORM_RATE_LIMITS.get(platform.lower())
    step = 60.0 / limit  # Giây giữa các requests
    delay = slots[platform_lower]
    slots[platform_lower] += step
    return delay

3. Retry với Exponential Backoff

MAX_REQUEST_RETRIES = 3
REQUEST_BACKOFF_SECONDS = 2

for attempt in range(MAX_REQUEST_RETRIES):
    try:
        response = requests.request(...)
        return response.json()
    except RETRYABLE_EXCEPTIONS:
        if attempt < MAX_REQUEST_RETRIES - 1:
            time.sleep(REQUEST_BACKOFF_SECONDS * (2 ** attempt))
            continue
        raise

Tối Ưu Database Operations

1. Bulk Write Operations

Collections sử dụng bulk_write:

  • orders_persist_orders_bulk()
  • transactions_persist_transactions_bulk()
  • returns_persist_returns_bulk()
  • order_tracking_status_persist_tracking_status_bulk()
  • products → bulk_write trong products sync

Performance Gain:

  • 300 individual updates: ~3 giây
  • 1 bulk_write (300 operations): ~0.3 giây
  • 10x nhanh hơn

2. Selective Field Loading

# Chỉ load fields cần thiết khi check existing
existing_products = mongo.db.products.find(
    {'shop_id': shop_id, 'channel_id': channel_id},
    {'product_id': 1, 'update_time': 1}  # Chỉ 2 fields
)

3. Incremental Updates

# Chỉ upsert products có thay đổi
for product in products:
    api_update_time = product.get('update_time')
    db_update_time = existing_map.get(product_id)
    
    if api_update_time != db_update_time:
        products_to_upsert.append(product)

Xử Lý Lỗi & Đảm Bảo Độ Tin Cậy

1. Sync Buffer System

Khi MongoDB Write Fail:

try:
    result = mongo.db.orders.bulk_write(bulk_operations)
except Exception as exc:
    # Lưu vào buffer để replay sau
    sync_buffer.save_batch(
        collection='orders',
        batch_data=orders,  # Raw data từ API
        batch_metadata={...},
        error=exc
    )

Replay sau khi Sync Xong:

# Tự động replay các batches đã lưu
buffer_replay_result = self._replay_sync_buffer(
    sync_buffer=sync_buffer,
    mongo=mongo,
    ...
)

2. Retry Strategy

Max Retries:

SYNC_BUFFER_MAX_RETRIES = 3  # Retry tối đa 3 lần

Non-Retryable Errors:

SYNC_NON_RETRYABLE_ERRORS = [
    'unknown platform',
    'invalid platform',
    'unsupported platform'
]

3. Error Tracking

MongoDB Logging:

mongo_logger.add_error(error_msg, stage='orders_fetch')

File Logging:

# logs/syncdata_errors.log
logger.error(f"Failed to fetch orders: {error}")

Summary Tracking:

# summary.json
{
    "failed_dates": [
        {
            "date": "2024-01-15",
            "status": "failed",
            "note": "MongoDB connection timeout",
            "timestamp": "2024-01-15T10:30:00+07:00"
        }
    ]
}

File I/O & Race Condition Prevention

1. Vấn Đề Race Condition

Khi nhiều processes cùng ghi vào 1 file:

# ❌ KHÔNG AN TOÀN: Nhiều processes cùng ghi
def write_summary(data):
    # Process A: Load file
    with open('summary.json', 'r') as f:
        existing = json.load(f)
    
    # Process B: Cũng load file (cùng lúc với A)
    with open('summary.json', 'r') as f:
        existing = json.load(f)
    
    # Process A: Update và ghi
    existing['stats']['completed_chunks'] += 1
    with open('summary.json', 'w') as f:
        json.dump(existing, f)  # Ghi đè
    
    # Process B: Update và ghi (ghi đè mất thay đổi của A)
    existing['stats']['completed_chunks'] += 1
    with open('summary.json', 'w') as f:
        json.dump(existing, f)  # Mất thay đổi của A!
    
    # → Race condition: Chỉ có 1 thay đổi được lưu

2. Giải Pháp: File Locking với fcntl

Implementation:

# utils/sync_buffer.py
import fcntl  # Unix/Linux/Mac file locking

class ShopSyncSummary:
    def _write(self):
        """
        Ghi summary.json với file locking và atomic write.
        """
        temp_path = f"{self.summary_path}.tmp"
        lock_path = f"{self.summary_path}.lock"
        
        for attempt in range(max_retries):
            lock_file = None
            try:
                # 1. Acquire exclusive lock
                lock_file = open(lock_path, 'w')
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                # LOCK_EX = Exclusive lock (chỉ 1 process có thể giữ)
                # LOCK_NB = Non-blocking (raise exception nếu không acquire được)
                
                # 2. Load existing data
                existing_data = {}
                if os.path.exists(self.summary_path):
                    with open(self.summary_path, 'r') as f:
                        existing_data = json.load(f)
                
                # 3. Merge với existing data
                merged_data = existing_data.copy()
                # ... merge logic ...
                
                # 4. Atomic write: Ghi vào temp file trước
                with open(temp_path, 'w') as f:
                    json.dump(merged_data, f)
                
                # 5. Atomic rename (atomic operation trên filesystem)
                os.replace(temp_path, self.summary_path)
                # rename là atomic → không có race condition
                
                # 6. Release lock
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
                lock_file.close()
                os.remove(lock_path)
                return
                
            except BlockingIOError:
                # Lock đang được giữ bởi process khác → retry
                if lock_file:
                    lock_file.close()
                time.sleep(retry_delay * (attempt + 1))
                continue

Lợi ích:

  • Exclusive Lock: Chỉ 1 process có thể ghi tại 1 thời điểm
  • Atomic Write: Ghi vào temp file → rename (atomic)
  • No Data Loss: Không mất thay đổi của processes khác
  • Retry Logic: Tự động retry nếu không acquire được lock

3. Windows Fallback

Windows không có fcntl:

# File locking support
try:
    import fcntl
    HAS_FCNTL = True
except ImportError:
    HAS_FCNTL = False  # Windows

def _write(self):
    if HAS_FCNTL:
        # Unix/Linux/Mac: Dùng fcntl
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    else:
        # Windows: Dùng file existence check (less safe nhưng better than nothing)
        if os.path.exists(lock_path) and attempt < max_retries - 1:
            time.sleep(retry_delay * (attempt + 1))
            continue

Lợi ích:

  • Cross-platform: Hoạt động trên Unix/Linux/Mac và Windows
  • Graceful Degradation: Windows dùng fallback method

4. Atomic Operations

Atomic Write Pattern:

# ❌ KHÔNG ATOMIC: Ghi trực tiếp
with open('summary.json', 'w') as f:
    json.dump(data, f)  # Nếu crash giữa chừng → file corrupt

# ✅ ATOMIC: Ghi vào temp file → rename
temp_path = f"{self.summary_path}.tmp"
with open(temp_path, 'w') as f:
    json.dump(data, f)  # Ghi vào temp file
os.replace(temp_path, self.summary_path)  # Rename là atomic
# Nếu crash → temp file vẫn còn, có thể recover

Lợi ích:

  • No Corruption: Nếu crash → file gốc không bị corrupt
  • Atomic: Rename là atomic operation trên filesystem
  • Recovery: Có thể recover từ temp file nếu cần

5. Merge Strategy để Tránh Ghi Đè

Vấn đề: Nhiều chunks cùng update counters trong summary.json

Giải pháp: Merge thay vì ghi đè:

def _write(self):
    # Load existing data
    existing_data = {}
    if os.path.exists(self.summary_path):
        with open(self.summary_path, 'r') as f:
            existing_data = json.load(f)
    
    # Merge thay vì ghi đè
    merged_data = existing_data.copy()
    
    # Merge stats: Cộng dồn thay vì ghi đè
    existing_stats = merged_data.get('stats', {})
    new_stats = self.data.get('stats', {})
    
    # Cộng dồn counters
    existing_stats['completed_chunks'] = existing_stats.get('completed_chunks', 0) + new_stats.get('completed_chunks', 0)
    existing_stats['failed_chunks'] = existing_stats.get('failed_chunks', 0) + new_stats.get('failed_chunks', 0)
    
    merged_data['stats'] = existing_stats
    
    # Ghi merged data
    with open(temp_path, 'w') as f:
        json.dump(merged_data, f)
    os.replace(temp_path, self.summary_path)

Lợi ích:

  • No Overwrite: Không ghi đè thay đổi của processes khác
  • Accumulative: Cộng dồn counters từ nhiều chunks
  • Consistency: Đảm bảo summary.json luôn đúng

6. Batch Counter để Tránh Trùng Tên File

Vấn đề: Nhiều processes cùng tạo buffer files → trùng tên

Giải pháp: Scan existing files để tìm max index:

def save_batch(self, collection: str, batch_data: List[Dict], ...):
    # Scan existing files để tìm max index
    counter_key = f"{collection}:{chunk_label}"
    if counter_key not in self._batch_counters:
        max_index = 0
        if os.path.exists(self.task_buffer_dir):
            pattern_prefix = f"{chunk_label}_{collection}_batch_"
            for filename in os.listdir(self.task_buffer_dir):
                if filename.startswith(pattern_prefix) and filename.endswith('.json'):
                    # Extract số thứ tự: YYYYMMDD_collection_batch_XXX.json
                    parts = filename.replace('.json', '').split('_')
                    if len(parts) >= 4 and parts[-1].isdigit():
                        index = int(parts[-1])
                        max_index = max(max_index, index)
        self._batch_counters[counter_key] = max_index
    
    # Tăng counter
    next_index = self._batch_counters[counter_key] + 1
    self._batch_counters[counter_key] = next_index
    
    # Tạo filename với index unique
    buffer_filename = f"{chunk_label}_{collection}_batch_{next_index:03d}.json"

Lợi ích:

  • No Collision: Mỗi file có tên unique
  • Sequential: Files được đánh số tuần tự
  • Recovery-friendly: Dễ scan và replay

7. Best Practices cho File I/O

DO:

  • ✅ Luôn dùng file locking khi nhiều processes
  • ✅ Atomic write (temp file → rename)
  • ✅ Merge thay vì ghi đè
  • ✅ Retry logic với exponential backoff
  • ✅ Error handling và fallback

DON'T:

  • ❌ Ghi trực tiếp vào file chính
  • ❌ Không có locking khi nhiều processes
  • ❌ Ghi đè thay vì merge
  • ❌ Không có retry logic

Tối Ưu Bộ Nhớ & Hiệu Suất

1. Generator Functions

def fetch_orders_batch(...) -> Iterator[List[Dict]]:
    # Yield từng batch, không return toàn bộ list
    for batch in batches:
        yield batch

2. Streaming Processing

# Xử lý ngay, không accumulate
for orders_batch in svc.fetch_orders_batch(...):
    self._persist_orders_bulk(orders_batch, ...)
    # Batch đã persist → có thể GC

3. Batch Size Tuning

SYNC_BATCH_SIZE_ORDERS = 300  # Cân bằng memory vs throughput
SYNC_BATCH_SIZE_TRANSACTIONS = 300
SYNC_BATCH_SIZE_RETURNS = 300

4. Memory-efficient Data Structures

  • Sử dụng dict thay vì list khi cần lookup nhanh
  • Chỉ giữ data cần thiết trong memory
  • Release references ngay sau khi dùng xong

Monitoring & Observability

1. MongoDB Sync Logs

Structure:

{
    "operation": "sync_shop_chunk",
    "status": "completed",
    "platform": "tiktok",
    "shop_id": "...",
    "channel_id": "...",
    "date_range_start": "2024-01-01",
    "date_range_end": "2024-01-01",
    "orders_fetched": 1500,
    "orders_inserted": 1200,
    "orders_updated": 300,
    "duration_seconds": 45.5,
    "api_calls_count": 50,
    "api_response_time_avg": 0.25,
    "stages": [...],
    "chunks": [...],
    "errors": [...],
    "warnings": [...]
}

2. File-based Logs

Daily Logs:

logs/sync_20241129.log

Error Logs:

logs/syncdata_errors.log

3. Summary Files

Location:

data/buffer/sync_tasks/{shop_slug}/{run_timestamp}/summary.json

Content:

{
    "stats": {
        "total_days": 365,
        "success_days": 360,
        "failed_days": 5,
        "buffered_days": 3,
        "total_chunks": 365,
        "completed_chunks": 360,
        "failed_chunks": 5
    },
    "problem_days": {
        "buffered": ["2024-01-15"],
        "failed": ["2024-03-10"]
    },
    "result": {
        "orders_count": 150000,
        "products_count": 5000,
        "transactions_count": 120000
    }
}

4. Metrics Queries

# Get recent syncs
recent_syncs = SyncMetrics.get_recent_syncs(limit=10, platform='tiktok')

# Get sync stats
stats = SyncMetrics.get_sync_stats(
    start_date=datetime(2024, 11, 1),
    end_date=datetime(2024, 11, 4),
    platform='tiktok'
)

# Get failed syncs
failed = SyncMetrics.get_failed_syncs(limit=20)

# Get slow syncs
slow = SyncMetrics.get_slow_syncs(min_duration_seconds=60)

Best Practices & Lessons Learned

1. Chunking Strategy

DO:

  • Chia nhỏ date range thành chunks (1 ngày/chunk)
  • Xử lý song song nhiều chunks
  • Retry từng chunk độc lập

DON'T:

  • Sync toàn bộ date range trong 1 task
  • Quá nhiều chunks nhỏ (overhead)
  • Quá ít chunks lớn (memory issues)

2. Batch Size Tuning

DO:

  • Batch size = 300 cho orders/transactions/returns
  • Batch size = 50 cho API calls (theo API limits)
  • Monitor và adjust dựa trên performance

DON'T:

  • Batch quá nhỏ (overhead)
  • Batch quá lớn (memory issues, timeout)

3. Error Handling

DO:

  • Lưu vào buffer khi MongoDB fail
  • Retry với exponential backoff
  • Track errors chi tiết

DON'T:

  • Throw exception ngay khi fail
  • Retry vô hạn
  • Bỏ qua errors

4. Memory Management

DO:

  • Sử dụng generator functions
  • Persist ngay sau mỗi batch
  • Selective field loading

DON'T:

  • Accumulate toàn bộ data trong memory
  • Load unnecessary fields
  • Giữ references không cần thiết

5. Monitoring

DO:

  • Log chi tiết vào MongoDB
  • Track metrics quan trọng
  • Alert khi có vấn đề

DON'T:

  • Log quá nhiều (performance impact)
  • Bỏ qua monitoring
  • Không track metrics

Đồng Bộ Webhook TikTok (Redis Stream → Worker → Mongo)

Vì sao chọn Redis Stream (so với Kafka/Queue khác)

  • Redis Stream nhẹ, triển khai nhanh, phù hợp throughput trung bình và độ trễ thấp; không cần cluster phức tạp.
  • Hỗ trợ consumer group, pending list giúp retry tự nhiên; idempotent dựa trên idempotency_key.
  • Kafka phù hợp throughput cực lớn và yêu cầu retention dài; bài toán hiện thiên về xử lý theo event gần-real-time, độ phức tạp vận hành thấp.
  • Dự án hiện tại đang sử dụng Redis -> tái sử dụng, tránh setup thêm lib khác.
  • Teammate có kinh nghiệm triển khai tốt hơn với Redis, giúp triển khai nhanh và chuẩn xác hơn.

Luồng xử lý rút gọn

  • Nhận webhook, validate keys bắt buộc, ghi vào Redis Stream.
  • Worker (consumer group) đọc batch, gom ID theo shop (orders/returns/products).
  • Khởi tạo service theo shop (check credential); chỉ fetch chi tiết các ID chưa có trong Mongo.
  • Orders: map OrderDetailDTO để chuẩn hóa timestamp ISO+7, tiền tệ Decimal128, thêm metadata; upsert idempotent.
  • Returns/Products: tương tự như với Orders thôi.
  • Ack sau khi upsert thành công; lỗi Mongo/Redis giữ pending để retry.
  • Lưu ý : Còn có cronjob để chạy mỗi 6h để crawl đơn hàng trong ngày, để đảm bảo không bị miss dữ liệu.

Kỹ thuật nổi bật

  • DTO chuẩn hóa để đồng nhất format giữa webhook và sync định kỳ (Thêm metadata, convert định dạng số cho tiền, định dạng timestamp chung)
  • Bulk upsert Mongo (ordered=False) tăng throughput, tránh overwrite toàn bộ.
  • Missing-only fetch giảm số call API, tránh ghi đè dữ liệu đã có.
  • Metadata đầy đủ (platform, channel_id, shop_id, details_fetched_at, synced_at) phục vụ audit/trace.

Chiến lược lỗi & tin cậy

  • Không ack khi lỗi Mongo/Redis để auto-retry qua pending list.
  • Idempotency bằng idempotency_key + upsert theo khóa {order_id|return_id|product_id, shop_id}.
  • Có thể điều chỉnh maxlen stream để cân bằng lưu trữ và bảo toàn webhook.

Hệ thống Event-driven để thông báo tới các services khác (CRM, Reporting) trong hệ thống Microservices biết có sự thay đổi dữ liệu.

  • Update later...

Kết Luận

Hệ thống đồng bộ dữ liệu này đã áp dụng nhiều kỹ thuật tối ưu để xử lý hàng chục triệu records một cách hiệu quả:

  1. Chunking & Parallel Processing: Chia nhỏ và xử lý song song
  2. Streaming & Batch Processing: Giảm memory footprint
  3. Batch API Calls: Giảm số lượng API calls (30x nhanh hơn)
  4. Bulk MongoDB Operations: Tăng throughput (10x nhanh hơn)
  5. Sync Buffer System: Đảm bảo zero data loss
  6. Smart Error Handling: Retry thông minh
  7. Memory Optimization: Có thể xử lý hàng triệu records
  8. Monitoring & Observability: Full visibility

Với các kỹ thuật này, hệ thống có thể:

  • ✅ Xử lý hàng chục triệu records
  • ✅ Chạy ổn định, không miss data
  • ✅ Kiểm soát tốt memory và performance
  • ✅ Tự động recover khi có lỗi
  • ✅ Monitor và debug dễ dàng

Tác giả: Bienpx Ngày tạo: 2025-11-29
Phiên bản: 1.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment