Skip to content

Instantly share code, notes, and snippets.

@dinizime
Created March 4, 2026 19:42
Show Gist options
  • Select an option

  • Save dinizime/cbb4b31c4f34afa2de2112241ade8dd4 to your computer and use it in GitHub Desktop.

Select an option

Save dinizime/cbb4b31c4f34afa2de2112241ade8dd4 to your computer and use it in GitHub Desktop.
Trata o raster classificado generalizando para escala 50k e conformando area edificada e massa dagua
# -*- coding: utf-8 -*-
"""
/***************************************************************************
DsgTools
A QGIS plugin
Brazilian Army Cartographic Production Tools
-------------------
begin : 2025-07-17
git sha : $Format:%H$
copyright : (C) 2025 by Philipe Borba - Cartographic Engineer @ Brazilian Army
email : borba.philipe@eb.mil.br
***************************************************************************/
/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/
"""
import gc
import os
import json
import shutil
import tempfile
import uuid
import numpy as np
from osgeo import gdal
from qgis.PyQt.QtCore import QCoreApplication
from qgis.core import (
QgsProcessing,
QgsProcessingAlgorithm,
QgsProcessingParameterRasterLayer,
QgsProcessingParameterVectorLayer,
QgsProcessingParameterNumber,
QgsProcessingParameterString,
QgsProcessingParameterRasterDestination,
QgsProcessingException,
QgsProcessingMultiStepFeedback,
QgsRasterLayer,
)
from DsgTools.core.DSGToolsProcessingAlgs.algRunner import AlgRunner
import processing
class TrataRasterAlgorithm(QgsProcessingAlgorithm):
"""
Algoritmo consolidado de tratamento de raster para classificação de vegetação.
Substitui o modelo "02-Trata_Raster" por um único algoritmo Python.
Fluxo:
1. Leitura do raster
2. Sieve inicial
3. Remover pixels de massa d'água e área edificada (torna nodata por valor DN)
4. Queimar polígonos de referência com valores corretos das classes
5. Reclassificar pixels adjacentes com valor nodata
6. Generalização V4 passo 1 (15625 m²)
7. Sieve pós-V4
8. Generalização V4 passo 2 (62500 m²)
9. Sieve final
10. Escrita do raster de saída
"""
INPUT_RASTER = "INPUT_RASTER"
AREA_EDIFICADA = "AREA_EDIFICADA"
MASSA_DAGUA = "MASSA_DAGUA"
AREA_EDIFICADA_VALUE = "AREA_EDIFICADA_VALUE"
MASSA_DAGUA_VALUE = "MASSA_DAGUA_VALUE"
NODATA_VALUE = "NODATA_VALUE"
BUFFER_DISTANCE = "BUFFER_DISTANCE"
SIEVE_THRESHOLD = "SIEVE_THRESHOLD"
FIRST_PASS_MIN_AREA = "FIRST_PASS_MIN_AREA"
SECOND_PASS_MIN_AREA = "SECOND_PASS_MIN_AREA"
GENERALIZATION_RULES_PASS1 = "GENERALIZATION_RULES_PASS1"
GENERALIZATION_RULES_PASS2 = "GENERALIZATION_RULES_PASS2"
OUTPUT_RASTER = "OUTPUT_RASTER"
DEFAULT_RULES_PASS1 = json.dumps(
{
"class_restrictions": {"4": [2, 3, 6]},
"size_thresholds": {"1": 2500},
"non_growing_classes": [1],
}
)
DEFAULT_RULES_PASS2 = json.dumps(
{
"class_restrictions": {"4": [2, 3, 6]},
"size_thresholds": {"1": 10000},
"non_growing_classes": [1],
}
)
def __init__(self):
super().__init__()
self.algRunner = AlgRunner()
self._tmpDir = None
self._tmpCounter = 0
def tr(self, string):
return QCoreApplication.translate("TrataRasterAlgorithm", string)
def createInstance(self):
return TrataRasterAlgorithm()
def name(self):
return "trataraster"
def displayName(self):
return self.tr("Trata Raster (Consolidado)")
def group(self):
return self.tr("Raster Handling")
def groupId(self):
return "DSGTools - Raster Handling"
def shortHelpString(self):
return self.tr(
"Algoritmo consolidado de tratamento de raster para classificação "
"de vegetação.\n\n"
"Realiza limpeza (sieve), correção de classes (massa d'água e "
"área edificada), generalização (V4) e preenchimento de nodata.\n\n"
"A correção de classes funciona em etapas:\n"
"1. Remove TODOS os pixels do raster que possuem os valores DN "
"de massa d'água e área edificada (torna nodata)\n"
"2. Queima os polígonos de referência com os valores corretos\n"
"3. Preenche nodata adjacente por vizinho mais próximo\n\n"
"Parâmetros:\n"
"- Raster: Raster classificado de entrada (banda única)\n"
"- Área Edificada: Camada vetorial de polígonos\n"
"- Massa d'Água: Camada vetorial de polígonos\n"
"- Valor Área Edificada: Valor DN da classe (ex: 4)\n"
"- Valor Massa d'Água: Valor DN da classe (ex: 1)\n"
"- Valor NoData: (padrão: -9999)\n"
"- Distância Buffer: Buffer negativo em unidades do mapa "
"(padrão: -0.0001)\n"
"- Limiar Sieve: Pixels mínimos por grupo (padrão: 50)\n"
"- Área Mínima Passo 1: m² para V4 passo 1 (padrão: 15625)\n"
"- Área Mínima Passo 2: m² para V4 passo 2 (padrão: 62500)\n"
"- Regras de Generalização: JSON com restrições de classe"
)
def initAlgorithm(self, config=None):
self.addParameter(
QgsProcessingParameterRasterLayer(
self.INPUT_RASTER,
self.tr("Raster de Entrada"),
optional=False,
)
)
self.addParameter(
QgsProcessingParameterVectorLayer(
self.AREA_EDIFICADA,
self.tr("Área Edificada (Polígonos)"),
[QgsProcessing.TypeVectorPolygon],
optional=False,
)
)
self.addParameter(
QgsProcessingParameterVectorLayer(
self.MASSA_DAGUA,
self.tr("Massa d'Água (Polígonos)"),
[QgsProcessing.TypeVectorPolygon],
optional=False,
)
)
self.addParameter(
QgsProcessingParameterNumber(
self.AREA_EDIFICADA_VALUE,
self.tr("Valor DN da Classe Área Edificada"),
type=QgsProcessingParameterNumber.Integer,
defaultValue=2,
)
)
self.addParameter(
QgsProcessingParameterNumber(
self.MASSA_DAGUA_VALUE,
self.tr("Valor DN da Classe Massa d'Água"),
type=QgsProcessingParameterNumber.Integer,
defaultValue=1,
)
)
self.addParameter(
QgsProcessingParameterNumber(
self.NODATA_VALUE,
self.tr("Valor NoData"),
type=QgsProcessingParameterNumber.Integer,
defaultValue=-9999,
)
)
self.addParameter(
QgsProcessingParameterNumber(
self.BUFFER_DISTANCE,
self.tr("Distância do Buffer Negativo (unidades do CRS do raster)"),
type=QgsProcessingParameterNumber.Double,
defaultValue=-5,
)
)
self.addParameter(
QgsProcessingParameterNumber(
self.SIEVE_THRESHOLD,
self.tr("Limiar Sieve (pixels)"),
type=QgsProcessingParameterNumber.Integer,
defaultValue=50,
)
)
self.addParameter(
QgsProcessingParameterNumber(
self.FIRST_PASS_MIN_AREA,
self.tr("Área Mínima - Passo 1 (m²)"),
type=QgsProcessingParameterNumber.Integer,
defaultValue=15625,
)
)
self.addParameter(
QgsProcessingParameterNumber(
self.SECOND_PASS_MIN_AREA,
self.tr("Área Mínima - Passo 2 (m²)"),
type=QgsProcessingParameterNumber.Integer,
defaultValue=62500,
)
)
self.addParameter(
QgsProcessingParameterString(
self.GENERALIZATION_RULES_PASS1,
self.tr(
"Regras de Generalização - Passo 1 (JSON). "
'Formato: {"class_restrictions": {...}, '
'"size_thresholds": {...}, '
'"non_growing_classes": [...]}'
),
defaultValue=self.DEFAULT_RULES_PASS1,
optional=True,
)
)
self.addParameter(
QgsProcessingParameterString(
self.GENERALIZATION_RULES_PASS2,
self.tr(
"Regras de Generalização - Passo 2 (JSON). "
"Usado no segundo passo V4."
),
defaultValue=self.DEFAULT_RULES_PASS2,
optional=True,
)
)
self.addParameter(
QgsProcessingParameterRasterDestination(
self.OUTPUT_RASTER,
self.tr("Raster de Saída"),
optional=False,
)
)
def processAlgorithm(self, parameters, context, feedback):
try:
from scipy.spatial import KDTree
except ImportError:
raise QgsProcessingException(
self.tr(
"Este algoritmo requer a biblioteca scipy. "
"Instale com: pip install scipy"
)
)
# ---- Leitura dos parâmetros ----
inputRaster = self.parameterAsRasterLayer(
parameters, self.INPUT_RASTER, context
)
areaEdificada = self.parameterAsVectorLayer(
parameters, self.AREA_EDIFICADA, context
)
massaDagua = self.parameterAsVectorLayer(
parameters, self.MASSA_DAGUA, context
)
areaEdificadaValue = self.parameterAsInt(
parameters, self.AREA_EDIFICADA_VALUE, context
)
massaDaguaValue = self.parameterAsInt(
parameters, self.MASSA_DAGUA_VALUE, context
)
nodata = self.parameterAsInt(parameters, self.NODATA_VALUE, context)
bufferDist = self.parameterAsDouble(parameters, self.BUFFER_DISTANCE, context)
sieveThreshold = self.parameterAsInt(parameters, self.SIEVE_THRESHOLD, context)
firstPassMinArea = self.parameterAsInt(
parameters, self.FIRST_PASS_MIN_AREA, context
)
secondPassMinArea = self.parameterAsInt(
parameters, self.SECOND_PASS_MIN_AREA, context
)
rulesPass1 = self.parameterAsString(
parameters, self.GENERALIZATION_RULES_PASS1, context
)
rulesPass2 = self.parameterAsString(
parameters, self.GENERALIZATION_RULES_PASS2, context
)
outputRaster = self.parameterAsOutputLayer(
parameters, self.OUTPUT_RASTER, context
)
if not inputRaster or not inputRaster.isValid():
raise QgsProcessingException(
self.tr("Raster de entrada inválido ou não fornecido.")
)
self._tmpDir = tempfile.mkdtemp(prefix="trata_raster_")
self._tmpCounter = 0
totalSteps = 8
multiStepFeedback = QgsProcessingMultiStepFeedback(totalSteps, feedback)
step = 0
try:
# ==============================================================
# ETAPA 1: Leitura do raster
# ==============================================================
multiStepFeedback.pushInfo(
self.tr("Etapa 1/10: Lendo raster de entrada...")
)
npRaster, geotransform, projection = self._readRaster(
inputRaster.source()
)
multiStepFeedback.pushInfo(
self.tr(
f" Dimensões: {npRaster.shape[0]} x {npRaster.shape[1]}, "
f"dtype: {npRaster.dtype}"
)
)
# Preparar polígonos (feito uma vez, reutilizado)
# Reprojeção para o CRS do raster é feita aqui se necessário,
# porque gdal_rasterize não reprojeta automaticamente.
rasterCrs = inputRaster.crs()
multiStepFeedback.pushInfo(
self.tr(
f"Preparando polígonos (reprojeção, dissolve, buffer)... "
f"CRS raster: {rasterCrs.authid()}"
)
)
massaDaguaBuffered = self._prepareSinglePolygonLayer(
massaDagua, bufferDist, rasterCrs, context, multiStepFeedback
)
areaEdificadaBuffered = self._prepareSinglePolygonLayer(
areaEdificada, bufferDist, rasterCrs, context, multiStepFeedback
)
# ==============================================================
# ETAPA 2: Sieve inicial
# ==============================================================
multiStepFeedback.setCurrentStep(step)
multiStepFeedback.pushInfo(
self.tr(f"Etapa 2/10: Sieve inicial (limiar={sieveThreshold})...")
)
npRaster = self._runSieve(
npRaster, geotransform, projection, sieveThreshold,
context, multiStepFeedback,
)
step += 1
if multiStepFeedback.isCanceled():
return {}
# ==============================================================
# ETAPA 3: Remover massa d'água e área edificada (nodata por DN)
# ==============================================================
multiStepFeedback.setCurrentStep(step)
multiStepFeedback.pushInfo(
self.tr(
"Etapa 3/10: Removendo pixels de massa d'água "
"e área edificada..."
)
)
nPixelsMassa = int(np.sum(npRaster == massaDaguaValue))
nPixelsEdif = int(np.sum(npRaster == areaEdificadaValue))
multiStepFeedback.pushInfo(
self.tr(
f" {nPixelsMassa} pixels massa d'água (DN={massaDaguaValue}), "
f"{nPixelsEdif} pixels área edificada (DN={areaEdificadaValue}) "
f"convertidos para nodata ({nodata})."
)
)
npRaster[npRaster == massaDaguaValue] = nodata
npRaster[npRaster == areaEdificadaValue] = nodata
step += 1
if multiStepFeedback.isCanceled():
return {}
# ==============================================================
# ETAPA 4: Queimar polígonos com valores corretos
# ==============================================================
multiStepFeedback.setCurrentStep(step)
multiStepFeedback.pushInfo(
self.tr("Etapa 4/10: Queimando polígonos com valores corretos...")
)
multiStepFeedback.pushInfo(
self.tr(
f" Queimando massa d'água (DN={massaDaguaValue})..."
)
)
npRaster = self._burnValueOnRaster(
npRaster, geotransform, projection,
massaDaguaBuffered, massaDaguaValue,
context, multiStepFeedback,
)
multiStepFeedback.pushInfo(
self.tr(
f" Queimando área edificada (DN={areaEdificadaValue})..."
)
)
npRaster = self._burnValueOnRaster(
npRaster, geotransform, projection,
areaEdificadaBuffered, areaEdificadaValue,
context, multiStepFeedback,
)
step += 1
if multiStepFeedback.isCanceled():
return {}
# ==============================================================
# ETAPA 5: Reclassificar pixels adjacentes com valor nodata
# ==============================================================
multiStepFeedback.setCurrentStep(step)
nNodataPixels = int(np.sum(npRaster == nodata))
multiStepFeedback.pushInfo(
self.tr(
f"Etapa 5/10: Reclassificando {nNodataPixels} pixels "
f"nodata por vizinho mais próximo..."
)
)
if nNodataPixels > 0:
npRaster = self._reclassifyValuesToNearest(
npRaster, valuesToReclassify=[nodata], nodata=None
)
step += 1
if multiStepFeedback.isCanceled():
return {}
# ==============================================================
# ETAPA 6: Generalização V4 passo 1 (15625 m²)
# ==============================================================
multiStepFeedback.setCurrentStep(step)
multiStepFeedback.pushInfo(
self.tr(
f"Etapa 6/10: Generalização V4 passo 1 "
f"(min_area={firstPassMinArea} m²)..."
)
)
npRaster = self._runGeneralizationV4(
npRaster, geotransform, projection, nodata,
firstPassMinArea, rulesPass1, context, multiStepFeedback,
)
self._forceCleanup(multiStepFeedback)
step += 1
if multiStepFeedback.isCanceled():
return {}
# ==============================================================
# ETAPA 7: Sieve pós-V4
# ==============================================================
multiStepFeedback.setCurrentStep(step)
multiStepFeedback.pushInfo(
self.tr(f"Etapa 7/10: Sieve pós-V4 (limiar={sieveThreshold})...")
)
npRaster = self._runSieve(
npRaster, geotransform, projection, sieveThreshold,
context, multiStepFeedback,
)
step += 1
if multiStepFeedback.isCanceled():
return {}
# ==============================================================
# ETAPA 8: Generalização V4 passo 2 (62500 m²)
# ==============================================================
multiStepFeedback.setCurrentStep(step)
multiStepFeedback.pushInfo(
self.tr(
f"Etapa 8/10: Generalização V4 passo 2 "
f"(min_area={secondPassMinArea} m²)..."
)
)
npRaster = self._runGeneralizationV4(
npRaster, geotransform, projection, nodata,
secondPassMinArea, rulesPass2, context, multiStepFeedback,
)
self._forceCleanup(multiStepFeedback)
step += 1
if multiStepFeedback.isCanceled():
return {}
# ==============================================================
# ETAPA 9: Sieve final
# ==============================================================
multiStepFeedback.pushInfo(
self.tr(f"Etapa 9/10: Sieve final (limiar={sieveThreshold})...")
)
npRaster = self._runSieve(
npRaster, geotransform, projection, sieveThreshold,
context, multiStepFeedback,
)
# ==============================================================
# ETAPA 10: Escrita do raster de saída
# ==============================================================
multiStepFeedback.pushInfo(
self.tr("Etapa 10/10: Escrevendo raster de saída...")
)
self._writeRaster(
npRaster, geotransform, projection, outputRaster, nodata
)
multiStepFeedback.pushInfo(
self.tr("Processamento concluído com sucesso!")
)
return {self.OUTPUT_RASTER: outputRaster}
finally:
if self._tmpDir and os.path.exists(self._tmpDir):
shutil.rmtree(self._tmpDir, ignore_errors=True)
# ==================================================================
# Operações em memória (sem I/O de disco)
# ==================================================================
def _runSieve(
self, npRaster, geotransform, projection, threshold, context, feedback
):
"""
Aplica Sieve do GDAL via algRunner.runSieve (processing.run).
Escreve em disco para evitar access violation com o driver MEM
no Windows (GDAL 3.4.x).
"""
tmpInput = self._uniqueTmpPath("sieve_in")
tmpOutput = self._uniqueTmpPath("sieve_out")
try:
self._writeRaster(
npRaster, geotransform, projection, tmpInput
)
outputPath = self.algRunner.runSieve(
inputRaster=tmpInput,
threshold=threshold,
context=context,
feedback=feedback,
outputRaster=tmpOutput,
)
if not os.path.exists(outputPath):
feedback.pushWarning(
self.tr(
"Sieve não produziu saída. Retornando sem alteração."
)
)
return npRaster
result, _, _ = self._readRaster(outputPath)
return result
finally:
self._safeRemove(tmpInput)
self._safeRemove(tmpOutput)
def _reclassifyValuesToNearest(
self, npRaster, valuesToReclassify, nodata=-9999
):
"""
Reclassifica pixels com valores específicos para o vizinho mais
próximo usando KDTree (scipy).
Args:
npRaster: Array numpy (rows, cols).
valuesToReclassify: Lista de valores DN a substituir.
nodata: Valor de nodata excluído como fonte. Se None, não
exclui nenhum valor adicional.
"""
from scipy.spatial import KDTree
result = npRaster.copy()
fillMask = np.isin(result, valuesToReclassify)
sourceMask = ~fillMask
if nodata is not None:
sourceMask = sourceMask & (result != nodata)
if not np.any(fillMask) or not np.any(sourceMask):
return result
rows, cols = np.mgrid[0 : result.shape[0], 0 : result.shape[1]]
sourceCoords = np.column_stack((rows[sourceMask], cols[sourceMask]))
fillCoords = np.column_stack((rows[fillMask], cols[fillMask]))
nearestIndices = KDTree(sourceCoords).query(fillCoords)[1]
result[fillMask] = result[sourceMask][nearestIndices]
return result
# ==================================================================
# Preparação de polígonos
# ==================================================================
def _prepareSinglePolygonLayer(
self, polygonLayer, bufferDist, targetCrs, context, feedback
):
"""
Reprojeta para o CRS do raster (se necessário), depois dissolve
e aplica buffer negativo.
A ordem é importante:
- A reprojeção é feita primeiro para que o buffer seja aplicado
em unidades do CRS do raster (ex: metros em EPSG:3857)
- O gdal_rasterize não reprojeta automaticamente, então os
polígonos precisam estar no mesmo CRS do raster
"""
needsReproject = polygonLayer.crs() != targetCrs
nSteps = 3 if needsReproject else 2
innerFeedback = QgsProcessingMultiStepFeedback(nSteps, feedback)
currentStep = 0
# Reprojetar para o CRS do raster se necessário
layerToProcess = polygonLayer
if needsReproject:
innerFeedback.setCurrentStep(currentStep)
innerFeedback.pushInfo(
self.tr(
f" Reprojetando de "
f"{polygonLayer.crs().authid()} para "
f"{targetCrs.authid()}..."
)
)
layerToProcess = processing.run(
"native:reprojectlayer",
{
"INPUT": polygonLayer,
"TARGET_CRS": targetCrs,
"OUTPUT": "TEMPORARY_OUTPUT",
},
context=context,
feedback=innerFeedback,
is_child_algorithm=True,
)["OUTPUT"]
currentStep += 1
# Dissolve (já no CRS do raster)
innerFeedback.setCurrentStep(currentStep)
dissolved = self.algRunner.runDissolve(
inputLyr=layerToProcess,
context=context,
feedback=innerFeedback,
is_child_algorithm=True,
)
currentStep += 1
# Buffer negativo em unidades do CRS do raster (metros)
innerFeedback.setCurrentStep(currentStep)
buffered = self.algRunner.runBuffer(
inputLayer=dissolved,
distance=bufferDist,
context=context,
feedback=innerFeedback,
is_child_algorithm=True,
)
return buffered
# ==================================================================
# Operações com I/O (requerem arquivos temporários)
# ==================================================================
def _uniqueTmpPath(self, label, ext="tif"):
"""
Gera um caminho de arquivo temporário único usando um contador
incremental. Evita conflitos de file handle no Windows quando o
GDAL ou Qt não liberam o arquivo imediatamente.
"""
self._tmpCounter += 1
name = f"{label}_{self._tmpCounter}_{uuid.uuid4().hex[:8]}.{ext}"
return os.path.join(self._tmpDir, name)
def _forceCleanup(self, feedback=None):
"""
Força liberação de memória e file handles pendentes do GDAL.
Importante no Windows onde file handles podem persistir.
"""
gdal.VSICurlClearCache()
gc.collect()
def _readRaster(self, rasterPath):
"""
Lê um raster como array numpy int16 e retorna (array, geotransform, projection).
Garante fechamento do dataset GDAL.
"""
ds = gdal.Open(rasterPath)
if ds is None:
raise QgsProcessingException(
self.tr(f"Não foi possível abrir: {rasterPath}")
)
npRaster = ds.GetRasterBand(1).ReadAsArray().astype(np.int16)
geotransform = ds.GetGeoTransform()
projection = ds.GetProjection()
ds = None
return npRaster, geotransform, projection
def _burnValueOnRaster(
self,
npRaster,
geotransform,
projection,
vectorLayer,
burnValue,
context,
feedback,
):
"""
Queima um valor fixo sobre o raster nas áreas cobertas pela
camada vetorial.
Usa gdal:rasterize_over_fixed_value que recebe QgsVectorLayer e
QgsRasterLayer e modifica o raster in-place.
Usa nomes únicos para evitar conflito de file handles no Windows.
"""
tmpPath = self._uniqueTmpPath(f"burn_{burnValue}")
try:
# Escreve raster temporário
self._writeRaster(npRaster, geotransform, projection, tmpPath)
# Cria QgsRasterLayer (obrigatório para a API)
rasterLayer = QgsRasterLayer(tmpPath, "temp_burn")
if not rasterLayer.isValid():
raise QgsProcessingException(
self.tr(
"Erro ao carregar raster temporário para rasterização."
)
)
# Queima valor in-place no arquivo tmpPath
self.algRunner.runGdalRasterizeOverFixedValue(
inputLayer=vectorLayer,
inputRaster=rasterLayer,
value=burnValue,
context=context,
feedback=feedback,
is_child_algorithm=True,
)
# Liberar QgsRasterLayer explicitamente
del rasterLayer
gc.collect()
# Ler resultado de volta
result, _, _ = self._readRaster(tmpPath)
return result
finally:
# No Windows, o arquivo pode ainda estar em uso.
# Não falhar se não conseguir deletar.
self._safeRemove(tmpPath)
def _runGeneralizationV4(
self, npRaster, geotransform, projection, nodata,
minArea, rulesJson, context, feedback,
):
"""Executa reclassificação de grupos de pixels V4 via processing.run."""
tmpInput = self._uniqueTmpPath("v4_in")
tmpOutput = self._uniqueTmpPath("v4_out")
try:
self._writeRaster(
npRaster, geotransform, projection, tmpInput, nodata
)
feedback.pushInfo(
self.tr(f" V4 input: {tmpInput}")
)
feedback.pushInfo(
self.tr(f" V4 output: {tmpOutput}")
)
processing.run(
"dsgtools:reclassifygroupsofpixelstonearestneighboralgorithmv4",
{
"INPUT": tmpInput,
"MIN_AREA": minArea,
"NODATA_VALUE": nodata,
"GENERALIZATION_RULES": rulesJson if rulesJson else "",
"OUTPUT": tmpOutput,
},
context=context,
feedback=feedback,
is_child_algorithm=True,
)
if not os.path.exists(tmpOutput):
feedback.pushWarning(
self.tr(
"V4 não produziu arquivo de saída. "
"Retornando sem alteração."
)
)
return npRaster
result, _, _ = self._readRaster(tmpOutput)
# Validar dimensões
if result.shape != npRaster.shape:
feedback.pushWarning(
self.tr(
f"V4 produziu raster com dimensões diferentes: "
f"{result.shape} vs {npRaster.shape}. "
f"Retornando sem alteração."
)
)
return npRaster
return result
finally:
self._safeRemove(tmpInput)
self._safeRemove(tmpOutput)
# ==================================================================
# Utilitários de I/O
# ==================================================================
def _writeRaster(
self, npRaster, geotransform, projection, outputPath, nodata=-9999
):
"""Escreve array numpy como GeoTIFF com compressão LZW."""
rows, cols = npRaster.shape
driver = gdal.GetDriverByName("GTiff")
ds = driver.Create(
outputPath, cols, rows, 1, gdal.GDT_Int16,
options=["COMPRESS=LZW", "TILED=YES"],
)
ds.SetGeoTransform(geotransform)
ds.SetProjection(projection)
band = ds.GetRasterBand(1)
band.WriteArray(npRaster)
band.SetNoDataValue(nodata)
band.FlushCache()
ds.FlushCache()
band = None
ds = None
def _safeRemove(self, path):
"""
Remove arquivo temporário de forma segura.
No Windows, arquivos podem estar travados pelo GDAL/Qt.
Não falha se não conseguir deletar.
"""
if not path or not os.path.exists(path):
return
try:
# Tentar liberar via GDAL primeiro
gdal.Unlink(path)
except Exception:
pass
# Se ainda existir, tentar via OS
if os.path.exists(path):
try:
os.remove(path)
except OSError:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment