Created
March 4, 2026 19:42
-
-
Save dinizime/cbb4b31c4f34afa2de2112241ade8dd4 to your computer and use it in GitHub Desktop.
Trata o raster classificado generalizando para escala 50k e conformando area edificada e massa dagua
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| """ | |
| /*************************************************************************** | |
| DsgTools | |
| A QGIS plugin | |
| Brazilian Army Cartographic Production Tools | |
| ------------------- | |
| begin : 2025-07-17 | |
| git sha : $Format:%H$ | |
| copyright : (C) 2025 by Philipe Borba - Cartographic Engineer @ Brazilian Army | |
| email : borba.philipe@eb.mil.br | |
| ***************************************************************************/ | |
| /*************************************************************************** | |
| * * | |
| * This program is free software; you can redistribute it and/or modify * | |
| * it under the terms of the GNU General Public License as published by * | |
| * the Free Software Foundation; either version 2 of the License, or * | |
| * (at your option) any later version. * | |
| * * | |
| ***************************************************************************/ | |
| """ | |
| import gc | |
| import os | |
| import json | |
| import shutil | |
| import tempfile | |
| import uuid | |
| import numpy as np | |
| from osgeo import gdal | |
| from qgis.PyQt.QtCore import QCoreApplication | |
| from qgis.core import ( | |
| QgsProcessing, | |
| QgsProcessingAlgorithm, | |
| QgsProcessingParameterRasterLayer, | |
| QgsProcessingParameterVectorLayer, | |
| QgsProcessingParameterNumber, | |
| QgsProcessingParameterString, | |
| QgsProcessingParameterRasterDestination, | |
| QgsProcessingException, | |
| QgsProcessingMultiStepFeedback, | |
| QgsRasterLayer, | |
| ) | |
| from DsgTools.core.DSGToolsProcessingAlgs.algRunner import AlgRunner | |
| import processing | |
| class TrataRasterAlgorithm(QgsProcessingAlgorithm): | |
| """ | |
| Algoritmo consolidado de tratamento de raster para classificação de vegetação. | |
| Substitui o modelo "02-Trata_Raster" por um único algoritmo Python. | |
| Fluxo: | |
| 1. Leitura do raster | |
| 2. Sieve inicial | |
| 3. Remover pixels de massa d'água e área edificada (torna nodata por valor DN) | |
| 4. Queimar polígonos de referência com valores corretos das classes | |
| 5. Reclassificar pixels adjacentes com valor nodata | |
| 6. Generalização V4 passo 1 (15625 m²) | |
| 7. Sieve pós-V4 | |
| 8. Generalização V4 passo 2 (62500 m²) | |
| 9. Sieve final | |
| 10. Escrita do raster de saída | |
| """ | |
| INPUT_RASTER = "INPUT_RASTER" | |
| AREA_EDIFICADA = "AREA_EDIFICADA" | |
| MASSA_DAGUA = "MASSA_DAGUA" | |
| AREA_EDIFICADA_VALUE = "AREA_EDIFICADA_VALUE" | |
| MASSA_DAGUA_VALUE = "MASSA_DAGUA_VALUE" | |
| NODATA_VALUE = "NODATA_VALUE" | |
| BUFFER_DISTANCE = "BUFFER_DISTANCE" | |
| SIEVE_THRESHOLD = "SIEVE_THRESHOLD" | |
| FIRST_PASS_MIN_AREA = "FIRST_PASS_MIN_AREA" | |
| SECOND_PASS_MIN_AREA = "SECOND_PASS_MIN_AREA" | |
| GENERALIZATION_RULES_PASS1 = "GENERALIZATION_RULES_PASS1" | |
| GENERALIZATION_RULES_PASS2 = "GENERALIZATION_RULES_PASS2" | |
| OUTPUT_RASTER = "OUTPUT_RASTER" | |
| DEFAULT_RULES_PASS1 = json.dumps( | |
| { | |
| "class_restrictions": {"4": [2, 3, 6]}, | |
| "size_thresholds": {"1": 2500}, | |
| "non_growing_classes": [1], | |
| } | |
| ) | |
| DEFAULT_RULES_PASS2 = json.dumps( | |
| { | |
| "class_restrictions": {"4": [2, 3, 6]}, | |
| "size_thresholds": {"1": 10000}, | |
| "non_growing_classes": [1], | |
| } | |
| ) | |
| def __init__(self): | |
| super().__init__() | |
| self.algRunner = AlgRunner() | |
| self._tmpDir = None | |
| self._tmpCounter = 0 | |
| def tr(self, string): | |
| return QCoreApplication.translate("TrataRasterAlgorithm", string) | |
| def createInstance(self): | |
| return TrataRasterAlgorithm() | |
| def name(self): | |
| return "trataraster" | |
| def displayName(self): | |
| return self.tr("Trata Raster (Consolidado)") | |
| def group(self): | |
| return self.tr("Raster Handling") | |
| def groupId(self): | |
| return "DSGTools - Raster Handling" | |
| def shortHelpString(self): | |
| return self.tr( | |
| "Algoritmo consolidado de tratamento de raster para classificação " | |
| "de vegetação.\n\n" | |
| "Realiza limpeza (sieve), correção de classes (massa d'água e " | |
| "área edificada), generalização (V4) e preenchimento de nodata.\n\n" | |
| "A correção de classes funciona em etapas:\n" | |
| "1. Remove TODOS os pixels do raster que possuem os valores DN " | |
| "de massa d'água e área edificada (torna nodata)\n" | |
| "2. Queima os polígonos de referência com os valores corretos\n" | |
| "3. Preenche nodata adjacente por vizinho mais próximo\n\n" | |
| "Parâmetros:\n" | |
| "- Raster: Raster classificado de entrada (banda única)\n" | |
| "- Área Edificada: Camada vetorial de polígonos\n" | |
| "- Massa d'Água: Camada vetorial de polígonos\n" | |
| "- Valor Área Edificada: Valor DN da classe (ex: 4)\n" | |
| "- Valor Massa d'Água: Valor DN da classe (ex: 1)\n" | |
| "- Valor NoData: (padrão: -9999)\n" | |
| "- Distância Buffer: Buffer negativo em unidades do mapa " | |
| "(padrão: -0.0001)\n" | |
| "- Limiar Sieve: Pixels mínimos por grupo (padrão: 50)\n" | |
| "- Área Mínima Passo 1: m² para V4 passo 1 (padrão: 15625)\n" | |
| "- Área Mínima Passo 2: m² para V4 passo 2 (padrão: 62500)\n" | |
| "- Regras de Generalização: JSON com restrições de classe" | |
| ) | |
| def initAlgorithm(self, config=None): | |
| self.addParameter( | |
| QgsProcessingParameterRasterLayer( | |
| self.INPUT_RASTER, | |
| self.tr("Raster de Entrada"), | |
| optional=False, | |
| ) | |
| ) | |
| self.addParameter( | |
| QgsProcessingParameterVectorLayer( | |
| self.AREA_EDIFICADA, | |
| self.tr("Área Edificada (Polígonos)"), | |
| [QgsProcessing.TypeVectorPolygon], | |
| optional=False, | |
| ) | |
| ) | |
| self.addParameter( | |
| QgsProcessingParameterVectorLayer( | |
| self.MASSA_DAGUA, | |
| self.tr("Massa d'Água (Polígonos)"), | |
| [QgsProcessing.TypeVectorPolygon], | |
| optional=False, | |
| ) | |
| ) | |
| self.addParameter( | |
| QgsProcessingParameterNumber( | |
| self.AREA_EDIFICADA_VALUE, | |
| self.tr("Valor DN da Classe Área Edificada"), | |
| type=QgsProcessingParameterNumber.Integer, | |
| defaultValue=2, | |
| ) | |
| ) | |
| self.addParameter( | |
| QgsProcessingParameterNumber( | |
| self.MASSA_DAGUA_VALUE, | |
| self.tr("Valor DN da Classe Massa d'Água"), | |
| type=QgsProcessingParameterNumber.Integer, | |
| defaultValue=1, | |
| ) | |
| ) | |
| self.addParameter( | |
| QgsProcessingParameterNumber( | |
| self.NODATA_VALUE, | |
| self.tr("Valor NoData"), | |
| type=QgsProcessingParameterNumber.Integer, | |
| defaultValue=-9999, | |
| ) | |
| ) | |
| self.addParameter( | |
| QgsProcessingParameterNumber( | |
| self.BUFFER_DISTANCE, | |
| self.tr("Distância do Buffer Negativo (unidades do CRS do raster)"), | |
| type=QgsProcessingParameterNumber.Double, | |
| defaultValue=-5, | |
| ) | |
| ) | |
| self.addParameter( | |
| QgsProcessingParameterNumber( | |
| self.SIEVE_THRESHOLD, | |
| self.tr("Limiar Sieve (pixels)"), | |
| type=QgsProcessingParameterNumber.Integer, | |
| defaultValue=50, | |
| ) | |
| ) | |
| self.addParameter( | |
| QgsProcessingParameterNumber( | |
| self.FIRST_PASS_MIN_AREA, | |
| self.tr("Área Mínima - Passo 1 (m²)"), | |
| type=QgsProcessingParameterNumber.Integer, | |
| defaultValue=15625, | |
| ) | |
| ) | |
| self.addParameter( | |
| QgsProcessingParameterNumber( | |
| self.SECOND_PASS_MIN_AREA, | |
| self.tr("Área Mínima - Passo 2 (m²)"), | |
| type=QgsProcessingParameterNumber.Integer, | |
| defaultValue=62500, | |
| ) | |
| ) | |
| self.addParameter( | |
| QgsProcessingParameterString( | |
| self.GENERALIZATION_RULES_PASS1, | |
| self.tr( | |
| "Regras de Generalização - Passo 1 (JSON). " | |
| 'Formato: {"class_restrictions": {...}, ' | |
| '"size_thresholds": {...}, ' | |
| '"non_growing_classes": [...]}' | |
| ), | |
| defaultValue=self.DEFAULT_RULES_PASS1, | |
| optional=True, | |
| ) | |
| ) | |
| self.addParameter( | |
| QgsProcessingParameterString( | |
| self.GENERALIZATION_RULES_PASS2, | |
| self.tr( | |
| "Regras de Generalização - Passo 2 (JSON). " | |
| "Usado no segundo passo V4." | |
| ), | |
| defaultValue=self.DEFAULT_RULES_PASS2, | |
| optional=True, | |
| ) | |
| ) | |
| self.addParameter( | |
| QgsProcessingParameterRasterDestination( | |
| self.OUTPUT_RASTER, | |
| self.tr("Raster de Saída"), | |
| optional=False, | |
| ) | |
| ) | |
| def processAlgorithm(self, parameters, context, feedback): | |
| try: | |
| from scipy.spatial import KDTree | |
| except ImportError: | |
| raise QgsProcessingException( | |
| self.tr( | |
| "Este algoritmo requer a biblioteca scipy. " | |
| "Instale com: pip install scipy" | |
| ) | |
| ) | |
| # ---- Leitura dos parâmetros ---- | |
| inputRaster = self.parameterAsRasterLayer( | |
| parameters, self.INPUT_RASTER, context | |
| ) | |
| areaEdificada = self.parameterAsVectorLayer( | |
| parameters, self.AREA_EDIFICADA, context | |
| ) | |
| massaDagua = self.parameterAsVectorLayer( | |
| parameters, self.MASSA_DAGUA, context | |
| ) | |
| areaEdificadaValue = self.parameterAsInt( | |
| parameters, self.AREA_EDIFICADA_VALUE, context | |
| ) | |
| massaDaguaValue = self.parameterAsInt( | |
| parameters, self.MASSA_DAGUA_VALUE, context | |
| ) | |
| nodata = self.parameterAsInt(parameters, self.NODATA_VALUE, context) | |
| bufferDist = self.parameterAsDouble(parameters, self.BUFFER_DISTANCE, context) | |
| sieveThreshold = self.parameterAsInt(parameters, self.SIEVE_THRESHOLD, context) | |
| firstPassMinArea = self.parameterAsInt( | |
| parameters, self.FIRST_PASS_MIN_AREA, context | |
| ) | |
| secondPassMinArea = self.parameterAsInt( | |
| parameters, self.SECOND_PASS_MIN_AREA, context | |
| ) | |
| rulesPass1 = self.parameterAsString( | |
| parameters, self.GENERALIZATION_RULES_PASS1, context | |
| ) | |
| rulesPass2 = self.parameterAsString( | |
| parameters, self.GENERALIZATION_RULES_PASS2, context | |
| ) | |
| outputRaster = self.parameterAsOutputLayer( | |
| parameters, self.OUTPUT_RASTER, context | |
| ) | |
| if not inputRaster or not inputRaster.isValid(): | |
| raise QgsProcessingException( | |
| self.tr("Raster de entrada inválido ou não fornecido.") | |
| ) | |
| self._tmpDir = tempfile.mkdtemp(prefix="trata_raster_") | |
| self._tmpCounter = 0 | |
| totalSteps = 8 | |
| multiStepFeedback = QgsProcessingMultiStepFeedback(totalSteps, feedback) | |
| step = 0 | |
| try: | |
| # ============================================================== | |
| # ETAPA 1: Leitura do raster | |
| # ============================================================== | |
| multiStepFeedback.pushInfo( | |
| self.tr("Etapa 1/10: Lendo raster de entrada...") | |
| ) | |
| npRaster, geotransform, projection = self._readRaster( | |
| inputRaster.source() | |
| ) | |
| multiStepFeedback.pushInfo( | |
| self.tr( | |
| f" Dimensões: {npRaster.shape[0]} x {npRaster.shape[1]}, " | |
| f"dtype: {npRaster.dtype}" | |
| ) | |
| ) | |
| # Preparar polígonos (feito uma vez, reutilizado) | |
| # Reprojeção para o CRS do raster é feita aqui se necessário, | |
| # porque gdal_rasterize não reprojeta automaticamente. | |
| rasterCrs = inputRaster.crs() | |
| multiStepFeedback.pushInfo( | |
| self.tr( | |
| f"Preparando polígonos (reprojeção, dissolve, buffer)... " | |
| f"CRS raster: {rasterCrs.authid()}" | |
| ) | |
| ) | |
| massaDaguaBuffered = self._prepareSinglePolygonLayer( | |
| massaDagua, bufferDist, rasterCrs, context, multiStepFeedback | |
| ) | |
| areaEdificadaBuffered = self._prepareSinglePolygonLayer( | |
| areaEdificada, bufferDist, rasterCrs, context, multiStepFeedback | |
| ) | |
| # ============================================================== | |
| # ETAPA 2: Sieve inicial | |
| # ============================================================== | |
| multiStepFeedback.setCurrentStep(step) | |
| multiStepFeedback.pushInfo( | |
| self.tr(f"Etapa 2/10: Sieve inicial (limiar={sieveThreshold})...") | |
| ) | |
| npRaster = self._runSieve( | |
| npRaster, geotransform, projection, sieveThreshold, | |
| context, multiStepFeedback, | |
| ) | |
| step += 1 | |
| if multiStepFeedback.isCanceled(): | |
| return {} | |
| # ============================================================== | |
| # ETAPA 3: Remover massa d'água e área edificada (nodata por DN) | |
| # ============================================================== | |
| multiStepFeedback.setCurrentStep(step) | |
| multiStepFeedback.pushInfo( | |
| self.tr( | |
| "Etapa 3/10: Removendo pixels de massa d'água " | |
| "e área edificada..." | |
| ) | |
| ) | |
| nPixelsMassa = int(np.sum(npRaster == massaDaguaValue)) | |
| nPixelsEdif = int(np.sum(npRaster == areaEdificadaValue)) | |
| multiStepFeedback.pushInfo( | |
| self.tr( | |
| f" {nPixelsMassa} pixels massa d'água (DN={massaDaguaValue}), " | |
| f"{nPixelsEdif} pixels área edificada (DN={areaEdificadaValue}) " | |
| f"convertidos para nodata ({nodata})." | |
| ) | |
| ) | |
| npRaster[npRaster == massaDaguaValue] = nodata | |
| npRaster[npRaster == areaEdificadaValue] = nodata | |
| step += 1 | |
| if multiStepFeedback.isCanceled(): | |
| return {} | |
| # ============================================================== | |
| # ETAPA 4: Queimar polígonos com valores corretos | |
| # ============================================================== | |
| multiStepFeedback.setCurrentStep(step) | |
| multiStepFeedback.pushInfo( | |
| self.tr("Etapa 4/10: Queimando polígonos com valores corretos...") | |
| ) | |
| multiStepFeedback.pushInfo( | |
| self.tr( | |
| f" Queimando massa d'água (DN={massaDaguaValue})..." | |
| ) | |
| ) | |
| npRaster = self._burnValueOnRaster( | |
| npRaster, geotransform, projection, | |
| massaDaguaBuffered, massaDaguaValue, | |
| context, multiStepFeedback, | |
| ) | |
| multiStepFeedback.pushInfo( | |
| self.tr( | |
| f" Queimando área edificada (DN={areaEdificadaValue})..." | |
| ) | |
| ) | |
| npRaster = self._burnValueOnRaster( | |
| npRaster, geotransform, projection, | |
| areaEdificadaBuffered, areaEdificadaValue, | |
| context, multiStepFeedback, | |
| ) | |
| step += 1 | |
| if multiStepFeedback.isCanceled(): | |
| return {} | |
| # ============================================================== | |
| # ETAPA 5: Reclassificar pixels adjacentes com valor nodata | |
| # ============================================================== | |
| multiStepFeedback.setCurrentStep(step) | |
| nNodataPixels = int(np.sum(npRaster == nodata)) | |
| multiStepFeedback.pushInfo( | |
| self.tr( | |
| f"Etapa 5/10: Reclassificando {nNodataPixels} pixels " | |
| f"nodata por vizinho mais próximo..." | |
| ) | |
| ) | |
| if nNodataPixels > 0: | |
| npRaster = self._reclassifyValuesToNearest( | |
| npRaster, valuesToReclassify=[nodata], nodata=None | |
| ) | |
| step += 1 | |
| if multiStepFeedback.isCanceled(): | |
| return {} | |
| # ============================================================== | |
| # ETAPA 6: Generalização V4 passo 1 (15625 m²) | |
| # ============================================================== | |
| multiStepFeedback.setCurrentStep(step) | |
| multiStepFeedback.pushInfo( | |
| self.tr( | |
| f"Etapa 6/10: Generalização V4 passo 1 " | |
| f"(min_area={firstPassMinArea} m²)..." | |
| ) | |
| ) | |
| npRaster = self._runGeneralizationV4( | |
| npRaster, geotransform, projection, nodata, | |
| firstPassMinArea, rulesPass1, context, multiStepFeedback, | |
| ) | |
| self._forceCleanup(multiStepFeedback) | |
| step += 1 | |
| if multiStepFeedback.isCanceled(): | |
| return {} | |
| # ============================================================== | |
| # ETAPA 7: Sieve pós-V4 | |
| # ============================================================== | |
| multiStepFeedback.setCurrentStep(step) | |
| multiStepFeedback.pushInfo( | |
| self.tr(f"Etapa 7/10: Sieve pós-V4 (limiar={sieveThreshold})...") | |
| ) | |
| npRaster = self._runSieve( | |
| npRaster, geotransform, projection, sieveThreshold, | |
| context, multiStepFeedback, | |
| ) | |
| step += 1 | |
| if multiStepFeedback.isCanceled(): | |
| return {} | |
| # ============================================================== | |
| # ETAPA 8: Generalização V4 passo 2 (62500 m²) | |
| # ============================================================== | |
| multiStepFeedback.setCurrentStep(step) | |
| multiStepFeedback.pushInfo( | |
| self.tr( | |
| f"Etapa 8/10: Generalização V4 passo 2 " | |
| f"(min_area={secondPassMinArea} m²)..." | |
| ) | |
| ) | |
| npRaster = self._runGeneralizationV4( | |
| npRaster, geotransform, projection, nodata, | |
| secondPassMinArea, rulesPass2, context, multiStepFeedback, | |
| ) | |
| self._forceCleanup(multiStepFeedback) | |
| step += 1 | |
| if multiStepFeedback.isCanceled(): | |
| return {} | |
| # ============================================================== | |
| # ETAPA 9: Sieve final | |
| # ============================================================== | |
| multiStepFeedback.pushInfo( | |
| self.tr(f"Etapa 9/10: Sieve final (limiar={sieveThreshold})...") | |
| ) | |
| npRaster = self._runSieve( | |
| npRaster, geotransform, projection, sieveThreshold, | |
| context, multiStepFeedback, | |
| ) | |
| # ============================================================== | |
| # ETAPA 10: Escrita do raster de saída | |
| # ============================================================== | |
| multiStepFeedback.pushInfo( | |
| self.tr("Etapa 10/10: Escrevendo raster de saída...") | |
| ) | |
| self._writeRaster( | |
| npRaster, geotransform, projection, outputRaster, nodata | |
| ) | |
| multiStepFeedback.pushInfo( | |
| self.tr("Processamento concluído com sucesso!") | |
| ) | |
| return {self.OUTPUT_RASTER: outputRaster} | |
| finally: | |
| if self._tmpDir and os.path.exists(self._tmpDir): | |
| shutil.rmtree(self._tmpDir, ignore_errors=True) | |
| # ================================================================== | |
| # Operações em memória (sem I/O de disco) | |
| # ================================================================== | |
| def _runSieve( | |
| self, npRaster, geotransform, projection, threshold, context, feedback | |
| ): | |
| """ | |
| Aplica Sieve do GDAL via algRunner.runSieve (processing.run). | |
| Escreve em disco para evitar access violation com o driver MEM | |
| no Windows (GDAL 3.4.x). | |
| """ | |
| tmpInput = self._uniqueTmpPath("sieve_in") | |
| tmpOutput = self._uniqueTmpPath("sieve_out") | |
| try: | |
| self._writeRaster( | |
| npRaster, geotransform, projection, tmpInput | |
| ) | |
| outputPath = self.algRunner.runSieve( | |
| inputRaster=tmpInput, | |
| threshold=threshold, | |
| context=context, | |
| feedback=feedback, | |
| outputRaster=tmpOutput, | |
| ) | |
| if not os.path.exists(outputPath): | |
| feedback.pushWarning( | |
| self.tr( | |
| "Sieve não produziu saída. Retornando sem alteração." | |
| ) | |
| ) | |
| return npRaster | |
| result, _, _ = self._readRaster(outputPath) | |
| return result | |
| finally: | |
| self._safeRemove(tmpInput) | |
| self._safeRemove(tmpOutput) | |
| def _reclassifyValuesToNearest( | |
| self, npRaster, valuesToReclassify, nodata=-9999 | |
| ): | |
| """ | |
| Reclassifica pixels com valores específicos para o vizinho mais | |
| próximo usando KDTree (scipy). | |
| Args: | |
| npRaster: Array numpy (rows, cols). | |
| valuesToReclassify: Lista de valores DN a substituir. | |
| nodata: Valor de nodata excluído como fonte. Se None, não | |
| exclui nenhum valor adicional. | |
| """ | |
| from scipy.spatial import KDTree | |
| result = npRaster.copy() | |
| fillMask = np.isin(result, valuesToReclassify) | |
| sourceMask = ~fillMask | |
| if nodata is not None: | |
| sourceMask = sourceMask & (result != nodata) | |
| if not np.any(fillMask) or not np.any(sourceMask): | |
| return result | |
| rows, cols = np.mgrid[0 : result.shape[0], 0 : result.shape[1]] | |
| sourceCoords = np.column_stack((rows[sourceMask], cols[sourceMask])) | |
| fillCoords = np.column_stack((rows[fillMask], cols[fillMask])) | |
| nearestIndices = KDTree(sourceCoords).query(fillCoords)[1] | |
| result[fillMask] = result[sourceMask][nearestIndices] | |
| return result | |
| # ================================================================== | |
| # Preparação de polígonos | |
| # ================================================================== | |
| def _prepareSinglePolygonLayer( | |
| self, polygonLayer, bufferDist, targetCrs, context, feedback | |
| ): | |
| """ | |
| Reprojeta para o CRS do raster (se necessário), depois dissolve | |
| e aplica buffer negativo. | |
| A ordem é importante: | |
| - A reprojeção é feita primeiro para que o buffer seja aplicado | |
| em unidades do CRS do raster (ex: metros em EPSG:3857) | |
| - O gdal_rasterize não reprojeta automaticamente, então os | |
| polígonos precisam estar no mesmo CRS do raster | |
| """ | |
| needsReproject = polygonLayer.crs() != targetCrs | |
| nSteps = 3 if needsReproject else 2 | |
| innerFeedback = QgsProcessingMultiStepFeedback(nSteps, feedback) | |
| currentStep = 0 | |
| # Reprojetar para o CRS do raster se necessário | |
| layerToProcess = polygonLayer | |
| if needsReproject: | |
| innerFeedback.setCurrentStep(currentStep) | |
| innerFeedback.pushInfo( | |
| self.tr( | |
| f" Reprojetando de " | |
| f"{polygonLayer.crs().authid()} para " | |
| f"{targetCrs.authid()}..." | |
| ) | |
| ) | |
| layerToProcess = processing.run( | |
| "native:reprojectlayer", | |
| { | |
| "INPUT": polygonLayer, | |
| "TARGET_CRS": targetCrs, | |
| "OUTPUT": "TEMPORARY_OUTPUT", | |
| }, | |
| context=context, | |
| feedback=innerFeedback, | |
| is_child_algorithm=True, | |
| )["OUTPUT"] | |
| currentStep += 1 | |
| # Dissolve (já no CRS do raster) | |
| innerFeedback.setCurrentStep(currentStep) | |
| dissolved = self.algRunner.runDissolve( | |
| inputLyr=layerToProcess, | |
| context=context, | |
| feedback=innerFeedback, | |
| is_child_algorithm=True, | |
| ) | |
| currentStep += 1 | |
| # Buffer negativo em unidades do CRS do raster (metros) | |
| innerFeedback.setCurrentStep(currentStep) | |
| buffered = self.algRunner.runBuffer( | |
| inputLayer=dissolved, | |
| distance=bufferDist, | |
| context=context, | |
| feedback=innerFeedback, | |
| is_child_algorithm=True, | |
| ) | |
| return buffered | |
| # ================================================================== | |
| # Operações com I/O (requerem arquivos temporários) | |
| # ================================================================== | |
| def _uniqueTmpPath(self, label, ext="tif"): | |
| """ | |
| Gera um caminho de arquivo temporário único usando um contador | |
| incremental. Evita conflitos de file handle no Windows quando o | |
| GDAL ou Qt não liberam o arquivo imediatamente. | |
| """ | |
| self._tmpCounter += 1 | |
| name = f"{label}_{self._tmpCounter}_{uuid.uuid4().hex[:8]}.{ext}" | |
| return os.path.join(self._tmpDir, name) | |
| def _forceCleanup(self, feedback=None): | |
| """ | |
| Força liberação de memória e file handles pendentes do GDAL. | |
| Importante no Windows onde file handles podem persistir. | |
| """ | |
| gdal.VSICurlClearCache() | |
| gc.collect() | |
| def _readRaster(self, rasterPath): | |
| """ | |
| Lê um raster como array numpy int16 e retorna (array, geotransform, projection). | |
| Garante fechamento do dataset GDAL. | |
| """ | |
| ds = gdal.Open(rasterPath) | |
| if ds is None: | |
| raise QgsProcessingException( | |
| self.tr(f"Não foi possível abrir: {rasterPath}") | |
| ) | |
| npRaster = ds.GetRasterBand(1).ReadAsArray().astype(np.int16) | |
| geotransform = ds.GetGeoTransform() | |
| projection = ds.GetProjection() | |
| ds = None | |
| return npRaster, geotransform, projection | |
| def _burnValueOnRaster( | |
| self, | |
| npRaster, | |
| geotransform, | |
| projection, | |
| vectorLayer, | |
| burnValue, | |
| context, | |
| feedback, | |
| ): | |
| """ | |
| Queima um valor fixo sobre o raster nas áreas cobertas pela | |
| camada vetorial. | |
| Usa gdal:rasterize_over_fixed_value que recebe QgsVectorLayer e | |
| QgsRasterLayer e modifica o raster in-place. | |
| Usa nomes únicos para evitar conflito de file handles no Windows. | |
| """ | |
| tmpPath = self._uniqueTmpPath(f"burn_{burnValue}") | |
| try: | |
| # Escreve raster temporário | |
| self._writeRaster(npRaster, geotransform, projection, tmpPath) | |
| # Cria QgsRasterLayer (obrigatório para a API) | |
| rasterLayer = QgsRasterLayer(tmpPath, "temp_burn") | |
| if not rasterLayer.isValid(): | |
| raise QgsProcessingException( | |
| self.tr( | |
| "Erro ao carregar raster temporário para rasterização." | |
| ) | |
| ) | |
| # Queima valor in-place no arquivo tmpPath | |
| self.algRunner.runGdalRasterizeOverFixedValue( | |
| inputLayer=vectorLayer, | |
| inputRaster=rasterLayer, | |
| value=burnValue, | |
| context=context, | |
| feedback=feedback, | |
| is_child_algorithm=True, | |
| ) | |
| # Liberar QgsRasterLayer explicitamente | |
| del rasterLayer | |
| gc.collect() | |
| # Ler resultado de volta | |
| result, _, _ = self._readRaster(tmpPath) | |
| return result | |
| finally: | |
| # No Windows, o arquivo pode ainda estar em uso. | |
| # Não falhar se não conseguir deletar. | |
| self._safeRemove(tmpPath) | |
| def _runGeneralizationV4( | |
| self, npRaster, geotransform, projection, nodata, | |
| minArea, rulesJson, context, feedback, | |
| ): | |
| """Executa reclassificação de grupos de pixels V4 via processing.run.""" | |
| tmpInput = self._uniqueTmpPath("v4_in") | |
| tmpOutput = self._uniqueTmpPath("v4_out") | |
| try: | |
| self._writeRaster( | |
| npRaster, geotransform, projection, tmpInput, nodata | |
| ) | |
| feedback.pushInfo( | |
| self.tr(f" V4 input: {tmpInput}") | |
| ) | |
| feedback.pushInfo( | |
| self.tr(f" V4 output: {tmpOutput}") | |
| ) | |
| processing.run( | |
| "dsgtools:reclassifygroupsofpixelstonearestneighboralgorithmv4", | |
| { | |
| "INPUT": tmpInput, | |
| "MIN_AREA": minArea, | |
| "NODATA_VALUE": nodata, | |
| "GENERALIZATION_RULES": rulesJson if rulesJson else "", | |
| "OUTPUT": tmpOutput, | |
| }, | |
| context=context, | |
| feedback=feedback, | |
| is_child_algorithm=True, | |
| ) | |
| if not os.path.exists(tmpOutput): | |
| feedback.pushWarning( | |
| self.tr( | |
| "V4 não produziu arquivo de saída. " | |
| "Retornando sem alteração." | |
| ) | |
| ) | |
| return npRaster | |
| result, _, _ = self._readRaster(tmpOutput) | |
| # Validar dimensões | |
| if result.shape != npRaster.shape: | |
| feedback.pushWarning( | |
| self.tr( | |
| f"V4 produziu raster com dimensões diferentes: " | |
| f"{result.shape} vs {npRaster.shape}. " | |
| f"Retornando sem alteração." | |
| ) | |
| ) | |
| return npRaster | |
| return result | |
| finally: | |
| self._safeRemove(tmpInput) | |
| self._safeRemove(tmpOutput) | |
| # ================================================================== | |
| # Utilitários de I/O | |
| # ================================================================== | |
| def _writeRaster( | |
| self, npRaster, geotransform, projection, outputPath, nodata=-9999 | |
| ): | |
| """Escreve array numpy como GeoTIFF com compressão LZW.""" | |
| rows, cols = npRaster.shape | |
| driver = gdal.GetDriverByName("GTiff") | |
| ds = driver.Create( | |
| outputPath, cols, rows, 1, gdal.GDT_Int16, | |
| options=["COMPRESS=LZW", "TILED=YES"], | |
| ) | |
| ds.SetGeoTransform(geotransform) | |
| ds.SetProjection(projection) | |
| band = ds.GetRasterBand(1) | |
| band.WriteArray(npRaster) | |
| band.SetNoDataValue(nodata) | |
| band.FlushCache() | |
| ds.FlushCache() | |
| band = None | |
| ds = None | |
| def _safeRemove(self, path): | |
| """ | |
| Remove arquivo temporário de forma segura. | |
| No Windows, arquivos podem estar travados pelo GDAL/Qt. | |
| Não falha se não conseguir deletar. | |
| """ | |
| if not path or not os.path.exists(path): | |
| return | |
| try: | |
| # Tentar liberar via GDAL primeiro | |
| gdal.Unlink(path) | |
| except Exception: | |
| pass | |
| # Se ainda existir, tentar via OS | |
| if os.path.exists(path): | |
| try: | |
| os.remove(path) | |
| except OSError: | |
| pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment