- Tổng Quan
- Lựa Chọn Công Nghệ (Technology Stack Decisions)
- Kiến Trúc Tổng Thể
- Các Kỹ Thuật Tối Ưu Chính
- Chiến Lược Xử Lý Dữ Liệu
- Chuẩn Hóa & Transform Dữ Liệu (Data Engineering)
- Cấu Hình Linh Hoạt & Không Hardcode
- Tối Ưu API Calls
- Tối Ưu Database Operations
- Xử Lý Lỗi & Đảm Bảo Độ Tin Cậy
- File I/O & Race Condition Prevention
- Tối Ưu Bộ Nhớ & Hiệu Suất
- Monitoring & Observability
- Best Practices & Lessons Learned
- Đồng Bộ Webhook TikTok (Redis Stream → Worker → Mongo)
Hệ thống đồng bộ dữ liệu từ các sàn thương mại điện tử (Shopee, TikTok, Lazada) với quy mô:
- Thời gian: Nhiều năm trước đến hiện tại
- Khối lượng: Hàng chục triệu bản ghi
- Độ phức tạp: Nhiều loại dữ liệu (orders, products, transactions, returns, tracking status)
- Yêu cầu: Không được miss dữ liệu, hệ thống chạy ổn định, kiểm soát tốt
- Quy mô dữ liệu khổng lồ: Hàng chục triệu records cần xử lý
- API Rate Limits: Giới hạn số lượng requests từ các platform
- Memory Constraints: Không thể load toàn bộ data vào RAM
- Network Reliability: API calls có thể fail, timeout
- Database Performance: MongoDB write operations cần tối ưu
- Data Consistency: Đảm bảo không duplicate, không miss data
- Error Recovery: Xử lý lỗi và retry thông minh
Lý do chọn Python:
- ✅ Thư viện phong phú:
pymongo,celery,pandas,requests- mature và stable - ✅ Data manipulation: Dễ xử lý dữ liệu phức tạp (nested objects, arrays, transformations)
- ✅ Type hints: Hỗ trợ type checking với
mypyđể đảm bảo code quality - ✅ Dataclasses: Dễ tạo DTOs và data models với
@dataclass
- ✅ Async I/O:
asynciocho concurrent API calls (nếu cần) - ✅ GIL không phải vấn đề: Với I/O-bound tasks (API calls, DB operations), GIL không ảnh hưởng nhiều
- ✅ Multiprocessing: Celery workers chạy separate processes → bypass GIL
- ✅ Readability: Code dễ đọc, dễ maintain cho team
- ✅ Debugging: Tools tốt (
pdb,ipdb, IDE support) - ✅ Testing:
pytest,unittest- mature testing frameworks
- ❌ Node.js:
- Event loop tốt cho I/O nhưng callback hell với complex data transformations
- TypeScript giúp nhưng vẫn phức tạp hơn Python cho data processing
- Ecosystem cho data processing ít hơn Python
Kết luận: Python phù hợp hơn cho data synchronization tasks với nhiều transformations và validations.
Lý do chọn MongoDB:
- ✅ Dynamic Schema: Dữ liệu từ các platform (TikTok, Shopee) có cấu trúc khác nhau, thay đổi theo thời gian
- ✅ Nested Documents: Dễ lưu nested objects (line_items, packages, addresses) mà không cần JOIN
- ✅ No Migration: Thêm fields mới không cần migration scripts
Ví dụ:
# MongoDB: Lưu trực tiếp nested structure
order = {
"order_id": "123",
"line_items": [
{"product_id": "p1", "quantity": 2, "price": 100},
{"product_id": "p2", "quantity": 1, "price": 200}
],
"recipient_address": {
"name": "John",
"street": "123 Main St",
"city": "Hanoi"
}
}
mongo.db.orders.insert_one(order)
# MySQL/PostgreSQL: Cần nhiều tables và JOINs
# orders table
# order_line_items table (foreign key)
# addresses table (foreign key)
# → Phức tạp hơn, nhiều queries hơn- ✅ Bulk Write:
bulk_write()vớiordered=False→ parallel writes trong MongoDB - ✅ No Transactions Overhead: Với sync operations, không cần ACID transactions → faster writes
- ✅ Document-level Locking: Nhanh hơn row-level locking trong RDBMS
Performance Comparison:
MongoDB bulk_write (300 orders):
- Time: ~0.3 giây
- Throughput: ~1000 ops/second
PostgreSQL bulk insert (300 orders + line_items):
- Time: ~1.5 giây (với JOINs và foreign keys)
- Throughput: ~200 ops/second
- ✅ Sharding: Dễ scale horizontally khi data lớn
- ✅ Replica Sets: High availability với automatic failover
- ✅ No JOINs: Không cần JOINs → dễ distribute data
- ✅ Hàng chục triệu records: MongoDB handle tốt với proper indexing
- ✅ JSON Storage: Native JSON → không cần serialization/deserialization
- ✅ Compression: WiredTiger storage engine có compression → tiết kiệm disk
MySQL/PostgreSQL:
- ❌ Fixed Schema: Cần migration khi thay đổi structure
- ❌ JOINs: Phức tạp với nested data, chậm hơn
- ❌ Transactions: Overhead không cần thiết cho sync operations
- ✅ ACID: Tốt cho financial transactions (nhưng không cần cho sync)
Kết luận: MongoDB phù hợp hơn cho:
- Dữ liệu có schema linh hoạt (e-commerce platforms)
- Write-heavy workloads (sync operations)
- Nested data structures
- Horizontal scaling requirements
Lý do chọn Redis làm Celery Broker:
- ✅ Lightweight: Redis nhẹ, dễ setup, ít dependencies
- ✅ Fast: In-memory → latency thấp (< 1ms)
- ✅ Simple API:
LPUSH,BRPOP- đơn giản cho task queue
Performance Comparison:
Redis (in-memory):
- Latency: < 1ms
- Throughput: 100,000+ ops/second
- Memory usage: Low (chỉ lưu task metadata)
RabbitMQ:
- Latency: 5-10ms
- Throughput: 20,000-50,000 ops/second
- Memory usage: Higher (persistent queues)
Kafka:
- Latency: 10-50ms (overkill cho task queue)
- Throughput: 1M+ ops/second (không cần thiết)
- Complexity: High (Zookeeper, partitions, etc.)
- ✅ Task Queue: Celery tasks là short-lived, không cần persistence lâu
- ✅ No Message Persistence Needed: Task results không lưu trong broker (dùng MongoDB logs)
- ✅ Rate Limiting: Redis có thể dùng cho rate limiting (nếu cần)
MongoDB Broker:
- ❌ Slower: Disk-based → latency cao hơn Redis
- ❌ Overhead: MongoDB đã dùng cho data storage → không nên dùng cho queue
- ❌ Complexity: Cần setup MongoDB chỉ cho broker
RabbitMQ:
- ✅ Features: Message persistence, routing, exchanges
- ❌ Overkill: Không cần features phức tạp cho simple task queue
- ❌ Heavier: Nhiều dependencies, setup phức tạp hơn
Kafka:
- ✅ High Throughput: 1M+ messages/second
- ❌ Overkill: Quá phức tạp cho task queue
- ❌ Latency: Higher latency (10-50ms)
- ❌ Complexity: Cần Zookeeper, partitions management
Architecture:
Celery Worker ←→ Redis (Broker) ←→ Celery Client
↓
MongoDB (Result Backend - optional)
Benefits:
- ✅ Fast Task Distribution: Redis in-memory → tasks được distribute nhanh
- ✅ Simple Setup: Chỉ cần Redis server
- ✅ No Result Backend: Không lưu results trong Redis (dùng MongoDB logs)
- ✅ Rate Limiting: Có thể dùng Redis cho rate limiting nếu cần
Kết luận: Redis là lựa chọn tối ưu cho Celery broker vì:
- Đơn giản, nhanh
- Phù hợp với use case (task queue, không cần persistence)
- Dễ scale và maintain
┌─────────────────────────────────────────────────────────┐
│ Celery Task Orchestrator │
│ (trigger_all_channels_sync, trigger_channel_sync) │
└──────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Chunk Processing Layer │
│ (process_shop_chunk - chia nhỏ theo date range) │
└──────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Sync Service Layer │
│ (sync_shop_data - batch processing, streaming) │
└──────────────────┬──────────────────────────────────────┘
│
┌──────────┴──────────┐
▼ ▼
┌──────────────┐ ┌──────────────┐
│ TikTok API │ │ Shopee API │
│ Service │ │ Service │
└──────┬───────┘ └──────┬───────┘
│ │
└──────────┬──────────┘
▼
┌──────────────────┐
│ MongoDB │
│ (Bulk Write) │
└──────────────────┘
│
▼
┌──────────────────┐
│ Sync Buffer │
│ (File-based) │
└──────────────────┘
1. Orchestrator Task
↓
2. Chia Date Range thành Chunks (ví dụ: 1 ngày/chunk)
↓
3. Enqueue Chunk Tasks (với rate limiting)
↓
4. Worker xử lý từng Chunk:
├─ Fetch Products (nếu chunk đầu tiên)
├─ Fetch Orders (streaming, batch)
├─ Fetch Order Details (batch API calls)
├─ Fetch Tracking Status (batch API calls)
├─ Fetch Transactions (streaming, batch)
└─ Fetch Returns (streaming, batch)
↓
5. Persist vào MongoDB (bulk_write)
↓
6. Nếu MongoDB fail → Lưu vào Buffer
↓
7. Replay Buffer sau khi sync xong
Mục đích: Chia nhỏ khoảng thời gian lớn thành các chunk nhỏ để xử lý song song và dễ quản lý.
Implementation:
# utils/date_splitter.py
def split_date_range(start_date: str, end_date: str, chunk_days: int = 1) -> List[Tuple[str, str]]:
"""
Chia khoảng ngày thành các cửa sổ cố định.
Ví dụ: 2020-01-01 -> 2024-12-31 với chunk_days=1
→ Tạo ~1800 chunks, mỗi chunk xử lý 1 ngày
"""
chunks = []
cursor = start_dt
while cursor <= end_dt:
chunk_end = min(cursor + timedelta(days=chunk_days - 1), end_dt)
chunks.append((cursor.strftime("%Y-%m-%d"), chunk_end.strftime("%Y-%m-%d")))
cursor = chunk_end + timedelta(days=1)
return chunksLợi ích:
- ✅ Xử lý song song nhiều chunks cùng lúc
- ✅ Dễ retry khi 1 chunk fail
- ✅ Kiểm soát memory usage (chỉ xử lý 1 chunk tại 1 thời điểm)
- ✅ Progress tracking chi tiết theo chunk
Config:
SYNC_CHUNK_DAYS = 1 # Mỗi chunk = 1 ngàyMục đích: Xử lý dữ liệu theo batch, không load toàn bộ vào RAM.
Implementation:
# services/tiktok_service.py
def fetch_orders_batch(self, start_date: str, end_date: str, batch_size: int = 300):
"""
Generator function - yield từng batch thay vì return toàn bộ list.
Giúp giảm memory footprint khi xử lý hàng triệu orders.
"""
current_batch = []
for order_summary in order_summaries:
# Fetch order details theo batch (50 IDs/lần)
order_details = self.fetch_order_details_batch(order_ids_batch)
current_batch.extend(order_details)
if len(current_batch) >= batch_size:
yield current_batch # Yield batch, không giữ trong memory
current_batch = [] # Reset để tiếp tục
if current_batch:
yield current_batch # Yield batch cuối cùngLợi ích:
- ✅ Memory footprint thấp: chỉ giữ 1 batch trong RAM
- ✅ Có thể xử lý hàng triệu records mà không bị OOM
- ✅ Persist ngay sau mỗi batch → giảm risk mất data
Config:
SYNC_BATCH_SIZE_ORDERS = 300
SYNC_BATCH_SIZE_TRANSACTIONS = 300
SYNC_BATCH_SIZE_RETURNS = 300Mục đích: Giảm số lượng API calls bằng cách gom nhiều IDs vào 1 request.
Vấn đề ban đầu (N+1 Problem):
# ❌ TRƯỚC: 300 orders → 300 API calls
for order_summary in order_summaries:
order_detail = self.fetch_order_detail(order_id) # 1 API call/order
# 300 orders × 300ms = 90 giâyGiải pháp (Batch API):
# ✅ SAU: 300 orders → 6 API calls (50 IDs/batch)
def fetch_order_details_batch(self, order_ids: List[str], batch_size: int = 50):
"""
Gom nhiều order IDs vào 1 API call.
TikTok API hỗ trợ parameter 'ids' với comma-separated values.
"""
all_details = []
for i in range(0, len(order_ids), batch_size):
batch_ids = order_ids[i:i + batch_size]
ids_param = ','.join(batch_ids) # "id1,id2,id3,...,id50"
# 1 API call cho 50 orders
response = self._make_request(..., params={'ids': ids_param})
all_details.extend(response.get('data', {}).get('order_list', []))
return all_details
# 300 orders ÷ 50 = 6 API calls × 500ms = 3 giây
# Cải thiện: 30x nhanh hơn!Lợi ích:
- ✅ Giảm số lượng API calls: 300 → 6 (với batch_size=50)
- ✅ Giảm thời gian: 90s → 3s (30x nhanh hơn)
- ✅ Giảm rate limit pressure
- ✅ Giảm network overhead
Config:
TIKTOK_ORDER_DETAILS_BATCH_SIZE = 50 # TikTok API limit
TIKTOK_TRACKING_STATUS_BATCH_SIZE = 50
SHOPEE_ORDER_DETAILS_BATCH_SIZE = 50Mục đích: Sử dụng bulk_write thay vì từng update_one để tối ưu performance.
Vấn đề ban đầu:
# ❌ TRƯỚC: 300 orders → 300 MongoDB operations
for order in orders:
mongo.db.orders.update_one(
{'order_id': order['id'], 'shop_id': shop_id},
{'$set': order},
upsert=True
)
# 300 operations × 10ms = 3 giâyGiải pháp (Bulk Write):
# ✅ SAU: 300 orders → 1 bulk_write operation
def _persist_orders_bulk(self, orders: List[Dict], ...):
bulk_operations = []
for order in orders:
bulk_operations.append(
UpdateOne(
{'order_id': order['id'], 'shop_id': shop_id, 'channel_id': channel_id},
{'$set': {...order, 'synced_at': iso_utc7_now()}},
upsert=True
)
)
# 1 operation cho 300 orders
result = mongo.db.orders.bulk_write(bulk_operations, ordered=False)
# 300 orders × 1ms = 0.3 giây
# Cải thiện: 10x nhanh hơn!Lợi ích:
- ✅ Giảm số lượng database round-trips
- ✅ Tăng throughput: 10-100x nhanh hơn
- ✅ Atomic operation: tất cả hoặc không có gì
- ✅
ordered=False: xử lý song song trong MongoDB
Collections sử dụng bulk_write:
orders→_persist_orders_bulk()transactions→_persist_transactions_bulk()returns→_persist_returns_bulk()order_tracking_status→_persist_tracking_status_bulk()products→ bulk_write trong_sync_shop_data_batch()
Mục đích: Xử lý nhiều chunks song song để tăng throughput.
Implementation:
# workers/sync_tasks.py
@celery.task(name='sync.trigger_channel_sync')
def trigger_channel_sync(channel_id, start_date, end_date):
"""
Orchestrator: Chia date range thành chunks và enqueue song song.
"""
date_chunks = split_date_range(start_date, end_date, chunk_days=1)
total_chunks = len(date_chunks) * len(shops)
rate_slots = defaultdict(float) # Rate limiting per platform
for shop in shops:
for chunk_start, chunk_end in date_chunks:
# Tính delay dựa trên rate limit
countdown = _compute_platform_delay(platform, rate_slots)
# Enqueue chunk task
task = process_shop_chunk.apply_async(
kwargs={
'shop_id': shop_id,
'chunk_start': chunk_start,
'chunk_end': chunk_end,
...
},
countdown=countdown, # Rate limiting
queue=Config.SYNC_CHUNK_QUEUE
)Rate Limiting:
def _compute_platform_delay(platform: str, slots: Dict[str, float]) -> float:
"""
Điều tiết tốc độ enqueue dựa trên rate limit.
Ví dụ: TikTok limit = 100 requests/minute
→ Mỗi chunk cách nhau 60/100 = 0.6 giây
"""
limit = PLATFORM_RATE_LIMITS.get(platform.lower())
if not limit or limit <= 0:
return 0.0
step = 60.0 / limit # Giây giữa các requests
delay = slots[platform_lower]
slots[platform_lower] += step
return delayLợi ích:
- ✅ Xử lý song song nhiều chunks
- ✅ Tận dụng tối đa resources (CPU, network, I/O)
- ✅ Rate limiting tự động để tránh vượt API limits
- ✅ Fault isolation: 1 chunk fail không ảnh hưởng chunks khác
Config:
PLATFORM_RATE_LIMIT_TIKTOK = 100 # requests/minute
PLATFORM_RATE_LIMIT_SHOPEE = 200
SYNC_MAX_PARALLEL_TASKS = 5 # Số workers chạy song songMục đích: Lưu tạm data khi MongoDB fail, replay sau để đảm bảo không mất dữ liệu.
Implementation:
# utils/sync_buffer.py
class SyncBuffer:
def save_batch(self, collection: str, batch_data: List[Dict],
batch_metadata: Dict, error: Exception) -> str:
"""
Lưu batch data vào file JSON khi MongoDB write fail.
File path: data/buffer/sync_tasks/{shop_slug}/{run_timestamp}/failed_batches/
"""
buffer_entry = {
"collection": collection,
"batch_data": batch_data, # Raw data từ API
"batch_metadata": {
"shop_id": shop_id,
"channel_id": channel_id,
"platform": platform,
"convert_fn_type": "tiktok_order",
"bulk_operations_count": len(bulk_operations),
"date_window": {"start": start_date, "end": end_date}
},
"status": "pending",
"retry_count": 0,
"error": {
"type": type(error).__name__,
"message": str(error)
}
}
# Lưu vào file
buffer_file_path = os.path.join(self.task_buffer_dir, filename)
with open(buffer_file_path, 'w') as f:
json.dump(buffer_entry, f)
return buffer_file_pathReplay Mechanism:
# services/sync_service.py
def _replay_sync_buffer(self, sync_buffer, mongo, ...):
"""
Replay các batches đã lưu vào buffer sau khi sync xong.
"""
pending_batches = sync_buffer.get_pending_batches()
for buffer_entry in pending_batches:
collection = buffer_entry['collection']
batch_data = buffer_entry['batch_data']
batch_metadata = buffer_entry['batch_metadata']
# Rebuild convert_fn từ metadata
if collection == 'orders':
convert_fn = self._make_tiktok_order_converter(...)
self._persist_orders_bulk(
orders=batch_data,
convert_fn=convert_fn,
sync_buffer=None # Không buffer khi replay
)
# Mark completed nếu thành công
sync_buffer.mark_batch_completed(buffer_file_path)Lợi ích:
- ✅ Zero Data Loss: Đảm bảo không mất dữ liệu khi MongoDB fail
- ✅ Automatic Retry: Tự động replay sau khi sync xong
- ✅ Fault Tolerance: Hệ thống vẫn chạy được khi MongoDB có vấn đề
- ✅ Resumable: Có thể replay lại bất cứ lúc nào
Config:
SYNC_BUFFER_ENABLED = True
SYNC_BUFFER_DIR = "data/buffer/sync_tasks"
SYNC_BUFFER_REPLAY_ENABLED = True
SYNC_BUFFER_MAX_RETRIES = 3Mục đích: Xử lý lỗi thông minh, chỉ retry các lỗi có thể recover.
Non-Retryable Errors:
# config.py
SYNC_NON_RETRYABLE_ERRORS = [
'unknown platform',
'invalid platform',
'unsupported platform'
]
def _should_retry_error(error_message: str) -> bool:
"""
Chỉ retry các lỗi có thể recover (network, timeout, 5xx).
Không retry các lỗi logic (unknown platform, invalid config).
"""
error_lower = error_message.lower()
non_retryable = Config.SYNC_NON_RETRYABLE_ERRORS
return not any(pattern in error_lower for pattern in non_retryable)Retry Strategy:
# services/tiktok_service.py
def _make_request(self, method: str, path: str, ...):
"""
Retry với exponential backoff cho các lỗi có thể recover.
"""
for attempt in range(self.MAX_REQUEST_RETRIES):
try:
response = requests.request(...)
if response.status_code in self.RETRYABLE_STATUS_CODES:
# Retry với backoff
time.sleep(self.REQUEST_BACKOFF_SECONDS * (2 ** attempt))
continue
return response.json()
except self.RETRYABLE_EXCEPTIONS as e:
# Network errors → retry
if attempt < self.MAX_REQUEST_RETRIES - 1:
time.sleep(self.REQUEST_BACKOFF_SECONDS * (2 ** attempt))
continue
raiseLợi ích:
- ✅ Tránh retry vô ích các lỗi không thể recover
- ✅ Tự động retry các lỗi tạm thời (network, timeout)
- ✅ Exponential backoff để tránh thêm pressure lên API
Mục đích: Giảm memory footprint khi xử lý hàng triệu records.
Kỹ thuật áp dụng:
# Thay vì return toàn bộ list
def fetch_orders_batch(...) -> Iterator[List[Dict]]:
# Yield từng batch, không giữ toàn bộ trong memory
for batch in batches:
yield batch# Xử lý ngay sau khi fetch, không accumulate
for orders_batch in svc.fetch_orders_batch(...):
# Persist ngay → giải phóng memory
self._persist_orders_bulk(orders_batch, ...)
# Batch đã được persist → có thể garbage collect# Chỉ load các fields cần thiết khi check existing data
existing_products = mongo.db.products.find(
{'shop_id': shop_id, 'channel_id': channel_id},
{'product_id': 1, 'update_time': 1} # Chỉ load 2 fields
)# Chỉ upsert products có thay đổi (update_time khác)
for product in products:
api_update_time = product.get('update_time')
db_update_time = existing_map.get(product_id)
if api_update_time != db_update_time:
products_to_upsert.append(product) # Chỉ upsert thay đổiLợi ích:
- ✅ Memory footprint thấp: có thể xử lý hàng triệu records
- ✅ Không bị OOM (Out of Memory)
- ✅ Tận dụng tối đa memory cho batch processing
Mục đích: Đảm bảo không duplicate data, có thể chạy lại an toàn.
Unique Keys:
# Orders: (order_id, shop_id, channel_id)
filter_dict = {
'order_id': order_id,
'shop_id': shop_id,
'channel_id': channel_id
}
# Transactions (TikTok): (transaction_id, shop_id, channel_id)
# Transactions (Shopee): (order_id, release_time, shop_id, channel_id)
# Returns: (return_id, shop_id, channel_id)
# Tracking Status: (order_id, shop_id, channel_id)Upsert Pattern:
mongo.db.orders.update_one(
filter_dict,
{'$set': order_doc},
upsert=True # Insert nếu chưa có, update nếu đã có
)Lợi ích:
- ✅ Idempotent: Chạy lại nhiều lần không tạo duplicate
- ✅ Safe Retry: Có thể retry an toàn mà không lo duplicate
- ✅ Data Consistency: Đảm bảo mỗi record chỉ có 1 bản trong DB
Mục đích: Theo dõi chi tiết quá trình sync để debug và optimize.
# utils/mongo_logger.py
class MongoSyncLogger:
"""
Structured logging vào MongoDB với metrics chi tiết.
"""
def log_progress(self, message: str, stage: str = None):
# Log progress với timestamp, stage
def increment_metric(self, metric_name: str, value: int = 1):
# Track metrics: orders_fetched, orders_inserted, etc.
def add_error(self, error: str, stage: str = None):
# Log errors với contextMetrics Tracked:
orders_fetched,orders_inserted,orders_updatedproducts_fetched,products_inserted,products_updatedtransactions_fetched,transactions_inserted,transactions_updatedapi_calls_count,api_calls_failed,api_response_time_avgduration_seconds,progress_percentage
# utils/logger.py
# Daily logs: logs/sync_YYYYMMDD.log
# Error logs: logs/syncdata_errors.log
# Context-aware logging với SyncLogContext# utils/sync_buffer.py
# summary.json trong mỗi sync run:
{
"stats": {
"total_days": 365,
"success_days": 360,
"failed_days": 5,
"buffered_days": 3,
"total_chunks": 365,
"completed_chunks": 360,
"failed_chunks": 5
},
"problem_days": {
"buffered": ["2024-01-15", "2024-02-20"],
"failed": ["2024-03-10"]
}
}Lợi ích:
- ✅ Full Visibility: Biết chính xác sync đang ở đâu
- ✅ Debug Friendly: Dễ debug khi có vấn đề
- ✅ Performance Analysis: Phân tích performance và optimize
- ✅ Alerting: Có thể alert khi có vấn đề
API Platform (TikTok/Shopee)
↓
Streaming Fetch (Generator)
↓
Batch Accumulation (300 items/batch)
↓
Data Transformation (DTO Conversion)
↓
Bulk MongoDB Write
↓
Success? → Continue
↓
Fail? → Save to Buffer → Replay Later
-
Products (chỉ sync ở chunk đầu tiên)
- Load existing products từ DB (chỉ
product_id,update_time) - So sánh
update_time→ chỉ upsert thay đổi - Giảm I/O và processing time
- Load existing products từ DB (chỉ
-
Orders (streaming, batch)
- Fetch order summaries (pagination)
- Batch fetch order details (50 IDs/batch)
- Persist ngay sau mỗi batch
-
Order Tracking Status (sau khi orders xong)
- Collect order IDs từ orders đã fetch
- Batch fetch tracking status (50 IDs/batch)
- Persist vào
order_tracking_statuscollection
-
Transactions (streaming, batch)
- Fetch transactions theo date range
- Persist ngay sau mỗi batch
-
Returns (streaming, batch)
- Fetch returns theo date range
- Persist ngay sau mỗi batch
- Streaming: Không accumulate toàn bộ data
- Batch Size: 300 items/batch (cân bằng memory vs throughput)
- Immediate Persist: Persist ngay sau mỗi batch → giải phóng memory
- Selective Loading: Chỉ load fields cần thiết khi check existing data
Luồng xử lý dữ liệu chuyên nghiệp như Data Engineer:
Raw API Response
↓
DTO Layer (Data Transfer Object)
↓
Validation & Normalization
↓
Type Conversion (Decimal, DateTime, etc.)
↓
MongoDB-ready Document
↓
Persist to MongoDB
Mục đích: Tách biệt data transformation logic, đảm bảo data consistency.
Implementation:
# models/orders/order_dto.py
@dataclass
class OrderDetailDTO(BaseMongoDTO):
"""
DTO cho Order Detail từ TikTok Shop API
Chuyển đổi và validate dữ liệu trước khi lưu vào MongoDB
"""
id: str
shop_id: str
channel_id: str
status: str
create_time: Optional[int] = None
paid_time: Optional[int] = None
# ... các fields khác
def validate(self) -> tuple[bool, List[str]]:
"""
Validate dữ liệu trước khi lưu.
Returns: (is_valid, errors)
"""
errors = []
if not self.id:
errors.append("Order ID is required")
if not self.status:
errors.append("Order status is required")
return len(errors) == 0, errors
def to_dict(self) -> dict:
"""
Convert về dict để lưu vào MongoDB.
Format datetime, Decimal, nested objects.
"""
return {
'order_id': self.id,
'shop_id': self.shop_id,
'channel_id': self.channel_id,
'status': self.status,
'create_time': self.to_iso_utc7(self.create_time), # Convert Unix timestamp → ISO string
'paid_time': self.to_iso_utc7(self.paid_time),
# ... các fields đã được format
}Lợi ích:
- ✅ Separation of Concerns: Logic transform tách biệt khỏi business logic
- ✅ Reusability: DTO có thể dùng ở nhiều nơi
- ✅ Type Safety: Type hints giúp catch errors sớm
- ✅ Validation: Validate trước khi lưu → đảm bảo data quality
Chuẩn hóa các operations chung:
# models/orders/base_dto.py
class BaseMongoDTO:
"""Base class cung cấp helper chuẩn hóa dữ liệu trước khi lưu MongoDB."""
@staticmethod
def to_decimal(value: Any, default: str = "0") -> Decimal:
"""
Convert các giá trị số sang Decimal.
Xử lý None, empty string, Decimal128 từ MongoDB.
"""
if isinstance(value, Decimal):
return value
if isinstance(value, Decimal128):
return value.to_decimal()
if value is None or value == "":
value = default
return Decimal(str(value))
@staticmethod
def to_decimal128(value: Any, default: str = "0") -> Decimal128:
"""
Convert sang Decimal128 cho MongoDB.
MongoDB lưu số tiền dạng Decimal128 để tránh floating point errors.
"""
if isinstance(value, Decimal128):
return value
if isinstance(value, Decimal):
return Decimal128(str(value))
if value is None or value == "":
return Decimal128(str(default))
return Decimal128(str(value))
@staticmethod
def to_iso_utc7(timestamp: Optional[int]) -> Optional[str]:
"""
Convert Unix timestamp → ISO string (UTC+7).
Đảm bảo timezone consistency trong toàn bộ hệ thống.
"""
if timestamp is None:
return None
dt = datetime.fromtimestamp(timestamp, tz=UTC_PLUS_7)
return dt.isoformat()Lợi ích:
- ✅ Consistency: Tất cả DTOs dùng cùng format (Decimal128, ISO datetime)
- ✅ DRY: Không duplicate code
- ✅ Maintainability: Sửa format ở 1 chỗ → áp dụng toàn bộ
Strategy Pattern cho mỗi platform:
# services/sync_service.py
def _make_tiktok_order_converter(self, shop_id: str, channel_id: str, ...):
"""
Tạo converter function cho TikTok orders.
Sử dụng OrderDetailDTO để transform và validate.
"""
def converter(order: Dict[str, Any]) -> Optional[Dict[str, Any]]:
try:
dto = OrderDetailDTO.from_tiktok_response(
order_data=order,
shop_id=shop_id,
channel_id=channel_id
)
is_valid, errors = dto.validate()
result = dto.to_dict()
result['is_valid'] = is_valid
result['validation_errors'] = errors if not is_valid else []
return result
except Exception as exc:
# Log error nhưng vẫn lưu raw data với metadata lỗi
return {
'order_id': order.get('id', 'UNKNOWN'),
'shop_id': shop_id,
'channel_id': channel_id,
'is_valid': False,
'validation_errors': [f"Parse error: {str(exc)}"],
'parse_error': str(exc),
# Lưu các fields có thể lấy được từ raw data
'status': order.get('status'),
'create_time': order.get('create_time'),
}
return converterLợi ích:
- ✅ Platform Abstraction: Mỗi platform có converter riêng
- ✅ Consistent Interface: Tất cả converters trả về cùng format
- ✅ Error Handling: Xử lý lỗi riêng cho từng platform
# ❌ TRƯỚC: Dùng float → floating point errors
price = float(order.get('price', 0)) # 99.99 có thể thành 99.989999999
# ✅ SAU: Dùng Decimal128 cho MongoDB
price = BaseMongoDTO.to_decimal128(order.get('price', 0)) # Chính xác# ❌ TRƯỚC: Lưu Unix timestamp trực tiếp
create_time = order.get('create_time') # 1701234567
# ✅ SAU: Convert sang ISO string với timezone
create_time = BaseMongoDTO.to_iso_utc7(order.get('create_time'))
# → "2024-11-29T10:30:00+07:00"# ❌ TRƯỚC: Lưu nested objects trực tiếp từ API
order['line_items'] = api_response.get('line_items') # Có thể có None, missing fields
# ✅ SAU: Transform qua DTO
line_items = [
LineItem.from_api_response(item) for item in api_response.get('line_items', [])
]
order['line_items'] = [item.to_dict() for item in line_items]# ❌ TRƯỚC: Lưu None trực tiếp
order['buyer_email'] = api_response.get('buyer_email') # Có thể là None
# ✅ SAU: Clean dict, loại bỏ None (trừ các fields quan trọng)
order = BaseMongoDTO.clean_dict(order, keep_keys=['order_id', 'status'])Validation trước khi lưu:
def _persist_orders_bulk(self, orders: List[Dict], convert_fn: Callable, ...):
bulk_operations = []
for order in orders:
try:
order_doc = convert_fn(order) # Transform + Validate
if not order_doc:
continue
# Check validation errors
if not order_doc.get('is_valid', True):
validation_errors = order_doc.get('validation_errors', [])
logger.warning(f"Invalid order {order_doc.get('order_id')}: {validation_errors}")
# Vẫn lưu nhưng mark là invalid để review sau
bulk_operations.append(UpdateOne(...))
except Exception as exc:
logger.error(f"Failed to normalize order: {exc}")
# Lưu raw data với error metadata
...Lợi ích:
- ✅ Data Quality: Đảm bảo data đúng format trước khi lưu
- ✅ Error Tracking: Track validation errors để fix sau
- ✅ Backward Compatibility: Vẫn lưu invalid data để không mất thông tin
Vấn đề với Hardcode:
# ❌ HARDCODE - Không linh hoạt
def fetch_orders_batch(self):
batch_size = 300 # Hardcode
page_size = 100 # Hardcode
max_retries = 3 # Hardcode
timeout = 120 # Hardcode
# Vấn đề:
# - Không thể thay đổi mà không sửa code
# - Khác nhau giữa dev/staging/production
# - Khó test với các giá trị khác nhau
# - Không thể tune performance dễ dàngGiải pháp: Configuration-based:
# ✅ CONFIG - Linh hoạt
class Config:
SYNC_BATCH_SIZE_ORDERS = int(os.getenv('SYNC_BATCH_SIZE_ORDERS', '300'))
TIKTOK_ORDER_DETAILS_BATCH_SIZE = int(os.getenv('TIKTOK_ORDER_DETAILS_BATCH_SIZE', '50'))
MAX_REQUEST_RETRIES = int(os.getenv('MAX_REQUEST_RETRIES', '3'))
REQUEST_TIMEOUT = int(os.getenv('REQUEST_TIMEOUT', '120'))
def fetch_orders_batch(self):
batch_size = config.SYNC_BATCH_SIZE_ORDERS # Từ config
# Có thể thay đổi qua environment variablesFile config.py - Single Source of Truth:
# config.py
class Config:
# Database
MONGO_URI = os.getenv('MONGO_URI')
MONGO_DBNAME = os.getenv('MONGO_DBNAME')
# Sync Orchestration
SYNC_CHUNK_DAYS = int(os.getenv('SYNC_CHUNK_DAYS', '1'))
SYNC_MAX_PARALLEL_TASKS = int(os.getenv('SYNC_MAX_PARALLEL_TASKS', '5'))
SYNC_CHUNK_TIMEOUT_SECONDS = int(os.getenv('SYNC_CHUNK_TIMEOUT_SECONDS', '9600'))
# Rate Limiting
PLATFORM_RATE_LIMIT_TIKTOK = int(os.getenv('PLATFORM_RATE_LIMIT_TIKTOK', '100'))
PLATFORM_RATE_LIMIT_SHOPEE = int(os.getenv('PLATFORM_RATE_LIMIT_SHOPEE', '200'))
# Batch Sizes
SYNC_BATCH_SIZE_ORDERS = int(os.getenv('SYNC_BATCH_SIZE_ORDERS', '300'))
TIKTOK_ORDER_DETAILS_BATCH_SIZE = int(os.getenv('TIKTOK_ORDER_DETAILS_BATCH_SIZE', '50'))
# Buffer
SYNC_BUFFER_ENABLED = os.getenv('SYNC_BUFFER_ENABLED', 'true').lower() == 'true'
SYNC_BUFFER_DIR = os.getenv('SYNC_BUFFER_DIR', 'data/buffer/sync_tasks')
SYNC_BUFFER_MAX_RETRIES = int(os.getenv('SYNC_BUFFER_MAX_RETRIES', '3'))
# Celery
CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')Lợi ích:
- ✅ Single Source of Truth: Tất cả config ở 1 chỗ
- ✅ Environment-based: Dễ thay đổi giữa dev/staging/prod
- ✅ Default Values: Có giá trị mặc định hợp lý
- ✅ Type Safety: Convert sang đúng type (int, bool, str)
.env file cho từng environment:
# .env.development
SYNC_BATCH_SIZE_ORDERS=100
SYNC_CHUNK_DAYS=1
PLATFORM_RATE_LIMIT_TIKTOK=50
# .env.production
SYNC_BATCH_SIZE_ORDERS=300
SYNC_CHUNK_DAYS=1
PLATFORM_RATE_LIMIT_TIKTOK=100Lợi ích:
- ✅ Environment-specific: Config khác nhau cho dev/staging/prod
- ✅ Security: Không commit sensitive data vào git
- ✅ Easy Deployment: Chỉ cần thay đổi .env file
Ví dụ: Tune batch size theo API limits:
# TikTok API limit: 50 IDs/batch
TIKTOK_ORDER_DETAILS_BATCH_SIZE = 50 # Không thể lớn hơn
# Shopee API limit: 50 order_sns/batch
SHOPEE_ORDER_DETAILS_BATCH_SIZE = 50
# MongoDB bulk_write: Không giới hạn nhưng 300 là optimal
SYNC_BATCH_SIZE_ORDERS = 300 # Có thể tune theo performanceVí dụ: Tune theo Performance:
# Nếu MongoDB chậm → giảm batch size
SYNC_BATCH_SIZE_ORDERS = 100 # Thay vì 300
# Nếu network nhanh → tăng batch size
SYNC_BATCH_SIZE_ORDERS = 500 # Nếu MongoDB handle đượcVí dụ: Tune theo Business Requirements:
# Sync nhanh hơn → giảm chunk_days
SYNC_CHUNK_DAYS = 1 # 1 ngày/chunk → nhiều chunks song song
# Sync ít overhead hơn → tăng chunk_days
SYNC_CHUNK_DAYS = 7 # 7 ngày/chunk → ít chunks hơnOrganization:
project/
├── config.py # Single config file
├── services/
│ ├── sync_service.py # Business logic
│ ├── tiktok_service.py # Platform-specific
│ └── shopee_service.py
├── models/
│ └── orders/
│ ├── order_dto.py # Data transformation
│ └── base_dto.py
├── utils/
│ ├── sync_buffer.py # File I/O utilities
│ └── mongo_logger.py
└── workers/
└── sync_tasks.py # Celery tasks
Principles:
- ✅ Separation of Concerns: Mỗi module có responsibility rõ ràng
- ✅ DRY: Không duplicate code
- ✅ Single Responsibility: Mỗi file làm 1 việc
- ✅ Easy to Navigate: Dễ tìm code
Flexibility:
- ✅ Thay đổi config mà không cần deploy code mới
- ✅ Tune performance dễ dàng
- ✅ Test với các giá trị khác nhau
Maintainability:
- ✅ Dễ hiểu: Tất cả config ở 1 chỗ
- ✅ Dễ document: Comment trong config.py
- ✅ Dễ review: Config changes dễ review hơn code changes
Scalability:
- ✅ Scale theo environment (dev/staging/prod)
- ✅ Scale theo platform (TikTok vs Shopee có limits khác nhau)
- ✅ Scale theo infrastructure (MongoDB performance, network speed)
TikTok Order Details:
# Trước: 300 API calls
for order_id in order_ids:
order_detail = fetch_order_detail(order_id)
# Sau: 6 API calls (50 IDs/batch)
order_details = fetch_order_details_batch(order_ids, batch_size=50)TikTok Tracking Status:
# Trước: 300 API calls
for order_id in order_ids:
tracking = fetch_order_tracking_status(order_id)
# Sau: 6 API calls (50 IDs/batch)
tracking_list = fetch_order_tracking_status_batch(order_ids, batch_size=50)Shopee Order Details:
# Đã hỗ trợ batch từ đầu
order_sn_list = ','.join(order_sns) # "sn1,sn2,sn3,...,sn50"
order_details = get_order_detail(order_sn_list)Platform-specific Limits:
PLATFORM_RATE_LIMIT_TIKTOK = 100 # requests/minute
PLATFORM_RATE_LIMIT_SHOPEE = 200Implementation:
def _compute_platform_delay(platform: str, slots: Dict[str, float]) -> float:
"""
Tính delay giữa các requests để không vượt rate limit.
"""
limit = PLATFORM_RATE_LIMITS.get(platform.lower())
step = 60.0 / limit # Giây giữa các requests
delay = slots[platform_lower]
slots[platform_lower] += step
return delayMAX_REQUEST_RETRIES = 3
REQUEST_BACKOFF_SECONDS = 2
for attempt in range(MAX_REQUEST_RETRIES):
try:
response = requests.request(...)
return response.json()
except RETRYABLE_EXCEPTIONS:
if attempt < MAX_REQUEST_RETRIES - 1:
time.sleep(REQUEST_BACKOFF_SECONDS * (2 ** attempt))
continue
raiseCollections sử dụng bulk_write:
orders→_persist_orders_bulk()transactions→_persist_transactions_bulk()returns→_persist_returns_bulk()order_tracking_status→_persist_tracking_status_bulk()products→ bulk_write trong products sync
Performance Gain:
- 300 individual updates: ~3 giây
- 1 bulk_write (300 operations): ~0.3 giây
- 10x nhanh hơn
# Chỉ load fields cần thiết khi check existing
existing_products = mongo.db.products.find(
{'shop_id': shop_id, 'channel_id': channel_id},
{'product_id': 1, 'update_time': 1} # Chỉ 2 fields
)# Chỉ upsert products có thay đổi
for product in products:
api_update_time = product.get('update_time')
db_update_time = existing_map.get(product_id)
if api_update_time != db_update_time:
products_to_upsert.append(product)Khi MongoDB Write Fail:
try:
result = mongo.db.orders.bulk_write(bulk_operations)
except Exception as exc:
# Lưu vào buffer để replay sau
sync_buffer.save_batch(
collection='orders',
batch_data=orders, # Raw data từ API
batch_metadata={...},
error=exc
)Replay sau khi Sync Xong:
# Tự động replay các batches đã lưu
buffer_replay_result = self._replay_sync_buffer(
sync_buffer=sync_buffer,
mongo=mongo,
...
)Max Retries:
SYNC_BUFFER_MAX_RETRIES = 3 # Retry tối đa 3 lầnNon-Retryable Errors:
SYNC_NON_RETRYABLE_ERRORS = [
'unknown platform',
'invalid platform',
'unsupported platform'
]MongoDB Logging:
mongo_logger.add_error(error_msg, stage='orders_fetch')File Logging:
# logs/syncdata_errors.log
logger.error(f"Failed to fetch orders: {error}")Summary Tracking:
# summary.json
{
"failed_dates": [
{
"date": "2024-01-15",
"status": "failed",
"note": "MongoDB connection timeout",
"timestamp": "2024-01-15T10:30:00+07:00"
}
]
}Khi nhiều processes cùng ghi vào 1 file:
# ❌ KHÔNG AN TOÀN: Nhiều processes cùng ghi
def write_summary(data):
# Process A: Load file
with open('summary.json', 'r') as f:
existing = json.load(f)
# Process B: Cũng load file (cùng lúc với A)
with open('summary.json', 'r') as f:
existing = json.load(f)
# Process A: Update và ghi
existing['stats']['completed_chunks'] += 1
with open('summary.json', 'w') as f:
json.dump(existing, f) # Ghi đè
# Process B: Update và ghi (ghi đè mất thay đổi của A)
existing['stats']['completed_chunks'] += 1
with open('summary.json', 'w') as f:
json.dump(existing, f) # Mất thay đổi của A!
# → Race condition: Chỉ có 1 thay đổi được lưuImplementation:
# utils/sync_buffer.py
import fcntl # Unix/Linux/Mac file locking
class ShopSyncSummary:
def _write(self):
"""
Ghi summary.json với file locking và atomic write.
"""
temp_path = f"{self.summary_path}.tmp"
lock_path = f"{self.summary_path}.lock"
for attempt in range(max_retries):
lock_file = None
try:
# 1. Acquire exclusive lock
lock_file = open(lock_path, 'w')
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
# LOCK_EX = Exclusive lock (chỉ 1 process có thể giữ)
# LOCK_NB = Non-blocking (raise exception nếu không acquire được)
# 2. Load existing data
existing_data = {}
if os.path.exists(self.summary_path):
with open(self.summary_path, 'r') as f:
existing_data = json.load(f)
# 3. Merge với existing data
merged_data = existing_data.copy()
# ... merge logic ...
# 4. Atomic write: Ghi vào temp file trước
with open(temp_path, 'w') as f:
json.dump(merged_data, f)
# 5. Atomic rename (atomic operation trên filesystem)
os.replace(temp_path, self.summary_path)
# rename là atomic → không có race condition
# 6. Release lock
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
lock_file.close()
os.remove(lock_path)
return
except BlockingIOError:
# Lock đang được giữ bởi process khác → retry
if lock_file:
lock_file.close()
time.sleep(retry_delay * (attempt + 1))
continueLợi ích:
- ✅ Exclusive Lock: Chỉ 1 process có thể ghi tại 1 thời điểm
- ✅ Atomic Write: Ghi vào temp file → rename (atomic)
- ✅ No Data Loss: Không mất thay đổi của processes khác
- ✅ Retry Logic: Tự động retry nếu không acquire được lock
Windows không có fcntl:
# File locking support
try:
import fcntl
HAS_FCNTL = True
except ImportError:
HAS_FCNTL = False # Windows
def _write(self):
if HAS_FCNTL:
# Unix/Linux/Mac: Dùng fcntl
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
else:
# Windows: Dùng file existence check (less safe nhưng better than nothing)
if os.path.exists(lock_path) and attempt < max_retries - 1:
time.sleep(retry_delay * (attempt + 1))
continueLợi ích:
- ✅ Cross-platform: Hoạt động trên Unix/Linux/Mac và Windows
- ✅ Graceful Degradation: Windows dùng fallback method
Atomic Write Pattern:
# ❌ KHÔNG ATOMIC: Ghi trực tiếp
with open('summary.json', 'w') as f:
json.dump(data, f) # Nếu crash giữa chừng → file corrupt
# ✅ ATOMIC: Ghi vào temp file → rename
temp_path = f"{self.summary_path}.tmp"
with open(temp_path, 'w') as f:
json.dump(data, f) # Ghi vào temp file
os.replace(temp_path, self.summary_path) # Rename là atomic
# Nếu crash → temp file vẫn còn, có thể recoverLợi ích:
- ✅ No Corruption: Nếu crash → file gốc không bị corrupt
- ✅ Atomic: Rename là atomic operation trên filesystem
- ✅ Recovery: Có thể recover từ temp file nếu cần
Vấn đề: Nhiều chunks cùng update counters trong summary.json
Giải pháp: Merge thay vì ghi đè:
def _write(self):
# Load existing data
existing_data = {}
if os.path.exists(self.summary_path):
with open(self.summary_path, 'r') as f:
existing_data = json.load(f)
# Merge thay vì ghi đè
merged_data = existing_data.copy()
# Merge stats: Cộng dồn thay vì ghi đè
existing_stats = merged_data.get('stats', {})
new_stats = self.data.get('stats', {})
# Cộng dồn counters
existing_stats['completed_chunks'] = existing_stats.get('completed_chunks', 0) + new_stats.get('completed_chunks', 0)
existing_stats['failed_chunks'] = existing_stats.get('failed_chunks', 0) + new_stats.get('failed_chunks', 0)
merged_data['stats'] = existing_stats
# Ghi merged data
with open(temp_path, 'w') as f:
json.dump(merged_data, f)
os.replace(temp_path, self.summary_path)Lợi ích:
- ✅ No Overwrite: Không ghi đè thay đổi của processes khác
- ✅ Accumulative: Cộng dồn counters từ nhiều chunks
- ✅ Consistency: Đảm bảo summary.json luôn đúng
Vấn đề: Nhiều processes cùng tạo buffer files → trùng tên
Giải pháp: Scan existing files để tìm max index:
def save_batch(self, collection: str, batch_data: List[Dict], ...):
# Scan existing files để tìm max index
counter_key = f"{collection}:{chunk_label}"
if counter_key not in self._batch_counters:
max_index = 0
if os.path.exists(self.task_buffer_dir):
pattern_prefix = f"{chunk_label}_{collection}_batch_"
for filename in os.listdir(self.task_buffer_dir):
if filename.startswith(pattern_prefix) and filename.endswith('.json'):
# Extract số thứ tự: YYYYMMDD_collection_batch_XXX.json
parts = filename.replace('.json', '').split('_')
if len(parts) >= 4 and parts[-1].isdigit():
index = int(parts[-1])
max_index = max(max_index, index)
self._batch_counters[counter_key] = max_index
# Tăng counter
next_index = self._batch_counters[counter_key] + 1
self._batch_counters[counter_key] = next_index
# Tạo filename với index unique
buffer_filename = f"{chunk_label}_{collection}_batch_{next_index:03d}.json"Lợi ích:
- ✅ No Collision: Mỗi file có tên unique
- ✅ Sequential: Files được đánh số tuần tự
- ✅ Recovery-friendly: Dễ scan và replay
DO:
- ✅ Luôn dùng file locking khi nhiều processes
- ✅ Atomic write (temp file → rename)
- ✅ Merge thay vì ghi đè
- ✅ Retry logic với exponential backoff
- ✅ Error handling và fallback
DON'T:
- ❌ Ghi trực tiếp vào file chính
- ❌ Không có locking khi nhiều processes
- ❌ Ghi đè thay vì merge
- ❌ Không có retry logic
def fetch_orders_batch(...) -> Iterator[List[Dict]]:
# Yield từng batch, không return toàn bộ list
for batch in batches:
yield batch# Xử lý ngay, không accumulate
for orders_batch in svc.fetch_orders_batch(...):
self._persist_orders_bulk(orders_batch, ...)
# Batch đã persist → có thể GCSYNC_BATCH_SIZE_ORDERS = 300 # Cân bằng memory vs throughput
SYNC_BATCH_SIZE_TRANSACTIONS = 300
SYNC_BATCH_SIZE_RETURNS = 300- Sử dụng
dictthay vìlistkhi cần lookup nhanh - Chỉ giữ data cần thiết trong memory
- Release references ngay sau khi dùng xong
Structure:
{
"operation": "sync_shop_chunk",
"status": "completed",
"platform": "tiktok",
"shop_id": "...",
"channel_id": "...",
"date_range_start": "2024-01-01",
"date_range_end": "2024-01-01",
"orders_fetched": 1500,
"orders_inserted": 1200,
"orders_updated": 300,
"duration_seconds": 45.5,
"api_calls_count": 50,
"api_response_time_avg": 0.25,
"stages": [...],
"chunks": [...],
"errors": [...],
"warnings": [...]
}Daily Logs:
logs/sync_20241129.log
Error Logs:
logs/syncdata_errors.log
Location:
data/buffer/sync_tasks/{shop_slug}/{run_timestamp}/summary.json
Content:
{
"stats": {
"total_days": 365,
"success_days": 360,
"failed_days": 5,
"buffered_days": 3,
"total_chunks": 365,
"completed_chunks": 360,
"failed_chunks": 5
},
"problem_days": {
"buffered": ["2024-01-15"],
"failed": ["2024-03-10"]
},
"result": {
"orders_count": 150000,
"products_count": 5000,
"transactions_count": 120000
}
}# Get recent syncs
recent_syncs = SyncMetrics.get_recent_syncs(limit=10, platform='tiktok')
# Get sync stats
stats = SyncMetrics.get_sync_stats(
start_date=datetime(2024, 11, 1),
end_date=datetime(2024, 11, 4),
platform='tiktok'
)
# Get failed syncs
failed = SyncMetrics.get_failed_syncs(limit=20)
# Get slow syncs
slow = SyncMetrics.get_slow_syncs(min_duration_seconds=60)✅ DO:
- Chia nhỏ date range thành chunks (1 ngày/chunk)
- Xử lý song song nhiều chunks
- Retry từng chunk độc lập
❌ DON'T:
- Sync toàn bộ date range trong 1 task
- Quá nhiều chunks nhỏ (overhead)
- Quá ít chunks lớn (memory issues)
✅ DO:
- Batch size = 300 cho orders/transactions/returns
- Batch size = 50 cho API calls (theo API limits)
- Monitor và adjust dựa trên performance
❌ DON'T:
- Batch quá nhỏ (overhead)
- Batch quá lớn (memory issues, timeout)
✅ DO:
- Lưu vào buffer khi MongoDB fail
- Retry với exponential backoff
- Track errors chi tiết
❌ DON'T:
- Throw exception ngay khi fail
- Retry vô hạn
- Bỏ qua errors
✅ DO:
- Sử dụng generator functions
- Persist ngay sau mỗi batch
- Selective field loading
❌ DON'T:
- Accumulate toàn bộ data trong memory
- Load unnecessary fields
- Giữ references không cần thiết
✅ DO:
- Log chi tiết vào MongoDB
- Track metrics quan trọng
- Alert khi có vấn đề
❌ DON'T:
- Log quá nhiều (performance impact)
- Bỏ qua monitoring
- Không track metrics
- Redis Stream nhẹ, triển khai nhanh, phù hợp throughput trung bình và độ trễ thấp; không cần cluster phức tạp.
- Hỗ trợ consumer group, pending list giúp retry tự nhiên; idempotent dựa trên
idempotency_key. - Kafka phù hợp throughput cực lớn và yêu cầu retention dài; bài toán hiện thiên về xử lý theo event gần-real-time, độ phức tạp vận hành thấp.
- Dự án hiện tại đang sử dụng Redis -> tái sử dụng, tránh setup thêm lib khác.
- Teammate có kinh nghiệm triển khai tốt hơn với Redis, giúp triển khai nhanh và chuẩn xác hơn.
- Nhận webhook, validate keys bắt buộc, ghi vào Redis Stream.
- Worker (consumer group) đọc batch, gom ID theo shop (orders/returns/products).
- Khởi tạo service theo shop (check credential); chỉ fetch chi tiết các ID chưa có trong Mongo.
- Orders: map
OrderDetailDTOđể chuẩn hóa timestamp ISO+7, tiền tệ Decimal128, thêm metadata; upsert idempotent. - Returns/Products: tương tự như với Orders thôi.
- Ack sau khi upsert thành công; lỗi Mongo/Redis giữ pending để retry.
- Lưu ý : Còn có cronjob để chạy mỗi 6h để crawl đơn hàng trong ngày, để đảm bảo không bị miss dữ liệu.
- DTO chuẩn hóa để đồng nhất format giữa webhook và sync định kỳ (Thêm metadata, convert định dạng số cho tiền, định dạng timestamp chung)
- Bulk upsert Mongo (
ordered=False) tăng throughput, tránh overwrite toàn bộ. - Missing-only fetch giảm số call API, tránh ghi đè dữ liệu đã có.
- Metadata đầy đủ (
platform,channel_id,shop_id,details_fetched_at,synced_at) phục vụ audit/trace.
- Không ack khi lỗi Mongo/Redis để auto-retry qua pending list.
- Idempotency bằng
idempotency_key+ upsert theo khóa{order_id|return_id|product_id, shop_id}. - Có thể điều chỉnh
maxlenstream để cân bằng lưu trữ và bảo toàn webhook.
Hệ thống Event-driven để thông báo tới các services khác (CRM, Reporting) trong hệ thống Microservices biết có sự thay đổi dữ liệu.
- Update later...
Hệ thống đồng bộ dữ liệu này đã áp dụng nhiều kỹ thuật tối ưu để xử lý hàng chục triệu records một cách hiệu quả:
- Chunking & Parallel Processing: Chia nhỏ và xử lý song song
- Streaming & Batch Processing: Giảm memory footprint
- Batch API Calls: Giảm số lượng API calls (30x nhanh hơn)
- Bulk MongoDB Operations: Tăng throughput (10x nhanh hơn)
- Sync Buffer System: Đảm bảo zero data loss
- Smart Error Handling: Retry thông minh
- Memory Optimization: Có thể xử lý hàng triệu records
- Monitoring & Observability: Full visibility
Với các kỹ thuật này, hệ thống có thể:
- ✅ Xử lý hàng chục triệu records
- ✅ Chạy ổn định, không miss data
- ✅ Kiểm soát tốt memory và performance
- ✅ Tự động recover khi có lỗi
- ✅ Monitor và debug dễ dàng
Tác giả: Bienpx
Ngày tạo: 2025-11-29
Phiên bản: 1.0