Skip to content

Instantly share code, notes, and snippets.

@dinizime
Created March 15, 2025 09:34
Show Gist options
  • Select an option

  • Save dinizime/327f1cfd8247d3bdadde085937773501 to your computer and use it in GitHub Desktop.

Select an option

Save dinizime/327f1cfd8247d3bdadde085937773501 to your computer and use it in GitHub Desktop.
Rotina em python para zipar tiles em paralelo agrupando eles.
import os
import glob
import zipfile
import math
import shutil
import argparse
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import cpu_count
from tqdm import tqdm
import time
import logging
# Configuração do logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
def get_available_zoom_levels(tile_dir):
"""Identifica todos os níveis de zoom disponíveis no diretório de tiles."""
zoom_levels = []
# Procura por diretórios que sejam apenas números
for item in os.listdir(tile_dir):
if os.path.isdir(os.path.join(tile_dir, item)) and item.isdigit():
zoom_levels.append(int(item))
zoom_levels.sort() # Ordena os níveis de zoom
if not zoom_levels:
logger.warning(f"Nenhum nível de zoom encontrado em {tile_dir}")
return []
logger.info(f"Níveis de zoom disponíveis: {zoom_levels}")
return zoom_levels
def count_tiles(tile_dir, zoom_level):
"""Conta quantos tiles existem para um determinado nível de zoom e calcula tamanho médio."""
zoom_path = os.path.join(tile_dir, str(zoom_level))
if not os.path.exists(zoom_path):
raise ValueError(f"Caminho para o nível de zoom {zoom_level} não encontrado em {tile_dir}")
# Conta e amostra em uma única passagem
total_tiles = 0
sample_files = []
max_samples = 1000
logger.info(f"Contando tiles no nível de zoom {zoom_level}...")
for root, _, files in os.walk(zoom_path, topdown=True):
total_tiles += len(files)
# Amostra aleatória para tamanho médio
for file in files:
file_path = os.path.join(root, file)
if os.path.isfile(file_path) and len(sample_files) < max_samples:
sample_files.append(file_path)
# Calcular tamanho médio dos tiles
if sample_files:
avg_size = sum(os.path.getsize(f) for f in sample_files) / len(sample_files)
else:
avg_size = 10 * 1024 # 10KB padrão se não houver arquivos
logger.info(f"Total de tiles encontrados: {total_tiles}")
logger.info(f"Tamanho médio do tile: {avg_size/1024:.2f}KB")
return total_tiles, avg_size
def calculate_optimal_splits(total_tiles, avg_size, target_zip_size=100*1024*1024):
"""Calcula o número ótimo de divisões para arquivos ZIP."""
estimated_total_size = total_tiles * avg_size
optimal_splits = math.ceil(estimated_total_size / target_zip_size)
# Ajustar para um número razoável de divisões
if optimal_splits > 1000:
optimal_splits = 1000
# Garantir pelo menos 1 divisão
if optimal_splits < 1:
optimal_splits = 1
# Determinar a estratégia de divisão (quadrantes)
# Encontrar a melhor divisão de grade (mais próxima de uma raiz quadrada)
grid_size = int(math.sqrt(optimal_splits))
if grid_size * grid_size < optimal_splits:
grid_size += 1
logger.info(f"Tamanho total estimado: {estimated_total_size/(1024*1024*1024):.2f}GB")
logger.info(f"Número ótimo de arquivos ZIP: {optimal_splits}")
logger.info(f"Usando grade de {grid_size}x{grid_size} para divisão")
return grid_size
def create_zip_file(args):
"""Cria um arquivo ZIP com os tiles especificados."""
zoom_level, tile_dir, x_start, x_end, y_start, y_end, quadrant_id = args
output_file = f"tiles_z{zoom_level}_q{quadrant_id:04d}_x{x_start}-{x_end}_y{y_start}-{y_end}.zip"
temp_output_file = f"temp_{output_file}"
# Lista para armazenar os arquivos a serem compactados
files_to_zip = []
try:
# Percorre os diretórios X dentro do intervalo
for x in range(x_start, x_end + 1):
x_dir = os.path.join(tile_dir, str(zoom_level), str(x))
if not os.path.exists(x_dir):
continue
# Para cada diretório X, procura arquivos Y dentro do intervalo
for y_file in os.listdir(x_dir):
try:
# Valida o nome do arquivo: extrai o valor Y
y_name = os.path.splitext(y_file)[0]
if not y_name.isdigit():
logger.warning(f"Arquivo não-numérico ignorado: {x_dir}/{y_file}")
continue
y = int(y_name)
if y_start <= y <= y_end:
files_to_zip.append(os.path.join(x_dir, y_file))
except ValueError:
# Ignora arquivos que não são tiles válidos
logger.debug(f"Arquivo ignorado: {x_dir}/{y_file}")
continue
# Se não houver arquivos, retorna sem criar o ZIP
if not files_to_zip:
logger.info(f"Nenhum tile encontrado para o quadrante {quadrant_id} no zoom {zoom_level}, ZIP não criado.")
return None, 0
# Cria o arquivo ZIP
with zipfile.ZipFile(temp_output_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file in files_to_zip:
# Preserva a estrutura de diretórios correta (z/x/y.ext -> x/y.ext)
arcname = os.path.relpath(file, os.path.join(tile_dir, str(zoom_level)))
zipf.write(file, arcname)
# Renomeia o arquivo temporário para o nome final
shutil.move(temp_output_file, output_file)
return output_file, len(files_to_zip)
except Exception as e:
logger.error(f"Erro ao criar ZIP para quadrante {quadrant_id} no zoom {zoom_level}: {str(e)}")
if os.path.exists(temp_output_file):
try:
os.remove(temp_output_file)
except:
logger.warning(f"Não foi possível remover arquivo temporário: {temp_output_file}")
return None, 0
def zip_tiles_in_parallel(tile_dir, zoom_level, num_workers=None, target_zip_size=100*1024*1024):
"""Zipa os tiles em paralelo usando uma estratégia de quadrantes para um nível de zoom."""
start_time = time.time()
logger.info(f"====== Processando nível de zoom {zoom_level} ======")
# Conta tiles e calcula o tamanho médio
try:
total_tiles, avg_size = count_tiles(tile_dir, zoom_level)
except ValueError as e:
logger.error(str(e))
return [], []
# Determina o tamanho da grade para divisão
grid_size = calculate_optimal_splits(total_tiles, avg_size, target_zip_size)
# Determina o limite de coordenadas para o nível de zoom
max_coord = 2**zoom_level - 1
# Calcula o tamanho de cada "bloco" no grid
x_block_size = math.ceil((max_coord + 1) / grid_size)
y_block_size = math.ceil((max_coord + 1) / grid_size)
logger.info(f"Dimensão do nível de zoom {zoom_level}: 0-{max_coord} (largura e altura)")
logger.info(f"Tamanho de cada bloco: {x_block_size}x{y_block_size} tiles")
# Prepara os argumentos para cada tarefa de compactação
zip_tasks = []
quadrant_id = 0
for x_block in range(grid_size):
x_start = x_block * x_block_size
x_end = min(x_start + x_block_size - 1, max_coord)
for y_block in range(grid_size):
y_start = y_block * y_block_size
y_end = min(y_start + y_block_size - 1, max_coord)
zip_tasks.append((zoom_level, tile_dir, x_start, x_end, y_start, y_end, quadrant_id))
quadrant_id += 1
# Determina o número de workers
if num_workers is None:
num_workers = max(1, cpu_count() - 1)
logger.info(f"Iniciando compactação com {num_workers} processos paralelos...")
logger.info(f"Total de quadrantes a processar: {len(zip_tasks)}")
# Executa as tarefas em paralelo com melhor tratamento de erros
results = []
errors = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
# Submete todas as tarefas
future_to_task = {executor.submit(create_zip_file, task): task for task in zip_tasks}
# Usa tqdm para mostrar o progresso
for future in tqdm(as_completed(future_to_task), total=len(zip_tasks),
desc=f"Zoom {zoom_level}"):
task = future_to_task[future]
try:
result = future.result()
if result[0]: # Se o arquivo foi criado
results.append(result)
except Exception as e:
quadrant_id = task[6]
errors.append((quadrant_id, str(e)))
logger.error(f"Erro no processamento do quadrante {quadrant_id}: {str(e)}")
# Calcula estatísticas
total_zipped = sum(count for _, count in results)
elapsed_time = time.time() - start_time
logger.info(f"Compactação do nível {zoom_level} concluída em {elapsed_time:.2f} segundos")
logger.info(f"Total de arquivos ZIP criados: {len(results)}")
logger.info(f"Total de tiles compactados: {total_zipped}")
if errors:
logger.warning(f"Ocorreram {len(errors)} erros durante o processamento")
return results, errors
def zip_all_zoom_levels(tile_dir, min_zoom=None, max_zoom=None, num_workers=None, target_zip_size=100*1024*1024):
"""Processa e zipa todos os níveis de zoom disponíveis dentro do intervalo especificado."""
start_time = time.time()
# Obtém todos os níveis de zoom disponíveis
all_zoom_levels = get_available_zoom_levels(tile_dir)
if not all_zoom_levels:
logger.error("Nenhum nível de zoom válido encontrado. Verifique a estrutura do diretório.")
return
# Filtra os níveis de zoom com base nos limites especificados
if min_zoom is not None:
all_zoom_levels = [z for z in all_zoom_levels if z >= min_zoom]
if max_zoom is not None:
all_zoom_levels = [z for z in all_zoom_levels if z <= max_zoom]
if not all_zoom_levels:
logger.error(f"Nenhum nível de zoom dentro do intervalo especificado: min={min_zoom}, max={max_zoom}")
return
logger.info(f"Processando níveis de zoom: {all_zoom_levels}")
# Estatísticas globais
total_results = []
total_errors = []
# Processa cada nível de zoom
for zoom_level in all_zoom_levels:
results, errors = zip_tiles_in_parallel(tile_dir, zoom_level, num_workers, target_zip_size)
total_results.extend(results)
total_errors.extend(errors)
# Estatísticas finais
total_zips = len(total_results)
total_tiles = sum(count for _, count in total_results)
elapsed_time = time.time() - start_time
logger.info("=" * 60)
logger.info("RESUMO FINAL")
logger.info("=" * 60)
logger.info(f"Total de níveis de zoom processados: {len(all_zoom_levels)}")
logger.info(f"Total de arquivos ZIP criados: {total_zips}")
logger.info(f"Total de tiles compactados: {total_tiles}")
logger.info(f"Total de erros: {len(total_errors)}")
logger.info(f"Tempo total de processamento: {elapsed_time:.2f} segundos")
return total_results, total_errors
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Compactador paralelo de tiles XYZ para múltiplos níveis de zoom')
parser.add_argument('tile_dir', type=str, help='Diretório raiz dos tiles')
parser.add_argument('--min-zoom', type=int, default=None,
help='Nível de zoom mínimo para processar (opcional)')
parser.add_argument('--max-zoom', type=int, default=None,
help='Nível de zoom máximo para processar (opcional)')
parser.add_argument('--workers', type=int, default=None,
help='Número de processos paralelos (padrão: CPU cores - 1)')
parser.add_argument('--target-size', type=int, default=100,
help='Tamanho alvo para cada arquivo ZIP em MB (padrão: 100MB)')
args = parser.parse_args()
target_zip_size = args.target_size * 1024 * 1024 # Converter MB para bytes
results, errors = zip_all_zoom_levels(
args.tile_dir,
args.min_zoom,
args.max_zoom,
args.workers,
target_zip_size
)
#python tile_zipper.py /caminho/para/tiles
#python tile_zipper.py /caminho/para/tiles --min-zoom 10 --max-zoom 15
#python tile_zipper.py /caminho/para/tiles --target-size 50
#python tile_zipper.py /caminho/para/tiles --workers 4
#python tile_zipper.py /caminho/para/tiles --min-zoom 16 --target-size 200 --workers 8
#python tile_zipper.py /caminho/para/tiles --max-zoom 10 --target-size 25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment