#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Crea un MBTiles Terrain-RGB (PNG) mosaicato da tutti i TIF in una directory. - Mosaico per-tile (FirstMethod) -> niente buchi ai bordi - Resampling 'nearest' -> preserva i byte RGB (quote intatte) - Buffer di tile (tile_buffer) per chiudere il "seam" agli zoom bassi - mask=None NON considerata "vuota" (si salta solo mask presente e tutta 0) - Scrittura MBTiles standard (tile_row in TMS) Requisiti: pip install rio-tiler mercantile numpy pillow rasterio """ import os import math import glob import argparse import sqlite3 from typing import List, Tuple, Optional, Any import mercantile import numpy as np from rio_tiler.io import COGReader from rio_tiler.mosaic import mosaic_reader from rio_tiler.mosaic.methods import FirstMethod from rio_tiler.utils import render # Compatibilit con rio-tiler (ImageData) try: from rio_tiler.models import ImageData except Exception: ImageData = None # type: ignore # ============================== # Parametri di default # ============================== DEFAULT_MIN_ZOOM = 5 DEFAULT_MAX_ZOOM = 14 DEFAULT_TILESIZE = 256 DEFAULT_THREADS = 4 DEFAULT_RESAMPLING = "nearest" DEFAULT_SKIP_EMPTY = False # di default NON saltiamo tile DEFAULT_BUFFER = 0 # buffer in pixel; auto-buffer se 0 DEFAULT_BBOX_EPS = 0.001 # espansione bbox in gradi (100 m) # BBOX Italia (WGS84, lon/lat) approx DEFAULT_ITALY_BBOX = (6.6, 36.6, 18.8, 47.3) # (W, S, E, N) # ============================== # Utility # ============================== def parse_bbox(bbox_str: Optional[str]) -> Optional[Tuple[float, float, float, float]]: if not bbox_str: return None parts = bbox_str.split(",") if len(parts) != 4: raise ValueError("La bbox deve essere nel formato: minx,miny,maxx,maxy") return tuple(map(float, parts)) # type: ignore def list_rasters_in_dir(input_dir: str) -> List[str]: patterns = ["*.tif", "*.tiff", "*.TIF", "*.TIFF"] files = [] for p in patterns: files.extend(glob.glob(os.path.join(input_dir, p))) return sorted(files) def get_union_bounds(assets: List[str]) -> Tuple[float, float, float, float]: """Unisce i bounds (lon/lat WGS84) dei COG/GeoTIFF. Aggiunge un piccolo epsilon.""" minx = miny = math.inf maxx = maxy = -math.inf for a in assets: with COGReader(a) as cog: b = cog.bounds # (minx, miny, maxx, maxy) in EPSG:4326 minx = min(minx, b[0]); miny = min(miny, b[1]) maxx = max(maxx, b[2]); maxy = max(maxy, b[3]) eps = 1e-10 return (minx - eps, miny - eps, maxx + eps, maxy + eps) def expand_bbox(b: Tuple[float,float,float,float], eps: float) -> Tuple[float,float,float,float]: """Espande la bbox di un epsilon in tutte le direzioni.""" return (b[0]-eps, b[1]-eps, b[2]+eps, b[3]+eps) def intersect_bbox(a: Tuple[float, float, float, float], b: Tuple[float, float, float, float]) -> Optional[Tuple[float, float, float, float]]: """Intersezione di due bbox. Ritorna None se vuota.""" minx = max(a[0], b[0]); miny = max(a[1], b[1]) maxx = min(a[2], b[2]); maxy = min(a[3], b[3]) if minx >= maxx or miny >= maxy: return None return (minx, miny, maxx, maxy) def reader(asset: str, x: int, y: int, z: int, **kwargs): """Reader per mosaic_reader: restituisce un ImageData (o data/mask).""" with COGReader(asset) as cog: return cog.tile(x, y, z, **kwargs) def xyz_to_tms_y(z: int, y: int) -> int: """Converte Y in TMS per MBTiles (capovolto).""" return (2**z - 1) - y def init_mbtiles(path: str, metadata: dict, fast: bool = True) -> sqlite3.Connection: """Crea un MBTiles vuoto con metadata. Se esiste, lo sovrascrive.""" if os.path.exists(path): os.remove(path) con = sqlite3.connect(path) cur = con.cursor() if fast: cur.execute("PRAGMA synchronous=OFF;") cur.execute("PRAGMA journal_mode=MEMORY;") cur.execute("PRAGMA temp_store=MEMORY;") cur.executescript(""" CREATE TABLE metadata (name TEXT, value TEXT); CREATE TABLE tiles ( zoom_level INTEGER, tile_column INTEGER, tile_row INTEGER, tile_data BLOB ); CREATE UNIQUE INDEX tile_index ON tiles (zoom_level, tile_column, tile_row); """) cur.executemany( "INSERT INTO metadata (name, value) VALUES (?, ?)", [(k, str(v)) for k, v in metadata.items()] ) con.commit() return con def normalize_mask(mask: Any) -> Optional[np.ndarray]: """ Rende la mask un array 2D uint8 (0/255). - Accetta: None, bool/uint8 array, o list/tuple contenenti array numerici. - Ignora elementi non-numerici (es. liste di stringhe 'assets_used'). """ if mask is None: return None def to_uint8_2d(arr: np.ndarray) -> Optional[np.ndarray]: # 3D con 1 banda -> squeeze if arr.ndim == 3 and arr.shape[0] == 1: arr = arr[0] elif arr.ndim == 3: arr = arr.squeeze() if arr.ndim != 2: return None # Porta a uint8 0/255 if arr.dtype == np.uint8: return arr if arr.dtype == np.bool_: return arr.astype(np.uint8) * 255 if np.issubdtype(arr.dtype, np.number): return (arr > 0).astype(np.uint8) * 255 return None # Lista/Tupla -> OR per-pixel degli elementi numerici if isinstance(mask, (list, tuple)): masks: List[np.ndarray] = [] for m in mask: if m is None: continue arr = np.asarray(m) if (not np.issubdtype(arr.dtype, np.number)) and (arr.dtype != np.bool_): continue conv = to_uint8_2d(arr) if conv is not None: masks.append(conv) if not masks: return None # allinea shapes alla minima comune h = min(m.shape[0] for m in masks) w = min(m.shape[1] for m in masks) if any((m.shape[0] != h or m.shape[1] != w) for m in masks): masks = [m[:h, :w] for m in masks] return np.maximum.reduce(masks) # Caso singolo array arr = np.asarray(mask) if (not np.issubdtype(arr.dtype, np.number)) and (arr.dtype != np.bool_): return None return to_uint8_2d(arr) def split_mosaic_output(out: Any) -> Tuple[np.ndarray, Optional[np.ndarray]]: """ Estrae (data, mask) dall'output di mosaic_reader a prova di versione: - ImageData (rio_tiler >= 5/6): usa .data e .mask - Tuple: (ImageData, assets_used) o (data, mask) o (data, assets_used, mask) - Dict con 'data'/'mask' """ if ImageData is not None and isinstance(out, ImageData): return out.data, out.mask if isinstance(out, dict) and "data" in out: return out["data"], out.get("mask") if isinstance(out, (tuple, list)): # (ImageData, assets_used) if len(out) >= 1 and (ImageData is not None) and isinstance(out[0], ImageData): img = out[0] return img.data, img.mask arrays = [o for o in out if isinstance(o, np.ndarray)] data = None mask = None for a in arrays: if a.ndim == 3: data = a break for a in arrays: if a.ndim == 2: mask = a break if mask is None: for a in arrays: if a.ndim == 3 and a.shape[0] == 1: mask = a[0] break if data is None and arrays: data = arrays[0] if data is None: raise RuntimeError("Impossibile determinare 'data' dalla risposta di mosaic_reader.") return data, mask # fallback try: data, mask = out # type: ignore return data, mask except Exception: raise RuntimeError("Impossibile determinare 'data'/'mask' dalla risposta di mosaic_reader (tipo sconosciuto).") def safe_mosaic_reader( assets: List[str], x: int, y: int, z: int, tilesize: int, resampling: str, threads: int, tile_buffer: int ) -> Any: """ Chiama mosaic_reader provando firme diverse (compatibilit tra versioni): 1) mosaic_assets=..., tile_buffer=... 2) mosaic_assets=... 3) assets=..., tile_buffer=... 4) assets=... """ base = dict( reader=reader, x=x, y=y, z=z, pixel_selection=FirstMethod(), tilesize=tilesize, resampling_method=resampling, threads=threads, ) # 1) mosaic_assets + tile_buffer try: return mosaic_reader(mosaic_assets=assets, **base, tile_buffer=tile_buffer) except TypeError: pass # 2) mosaic_assets senza tile_buffer try: return mosaic_reader(mosaic_assets=assets, **base) except TypeError: pass # 3) assets + tile_buffer try: return mosaic_reader(assets=assets, **base, tile_buffer=tile_buffer) except TypeError: pass # 4) assets senza tile_buffer return mosaic_reader(assets=assets, **base) # ============================== # Core # ============================== def generate_mbtiles_from_dir( input_dir: str, out_mbtiles: str, minzoom: int = DEFAULT_MIN_ZOOM, maxzoom: int = DEFAULT_MAX_ZOOM, tilesize: int = DEFAULT_TILESIZE, resampling: str = DEFAULT_RESAMPLING, threads: int = DEFAULT_THREADS, skip_empty: bool = DEFAULT_SKIP_EMPTY, bbox: Optional[Tuple[float, float, float, float]] = None, bbox_eps: float = DEFAULT_BBOX_EPS, buffer: int = DEFAULT_BUFFER, ): assets = list_rasters_in_dir(input_dir) if not assets: raise RuntimeError(f"Nessun file .tif trovato in: {input_dir}") # AOI: usa bbox se fornita (espansa), altrimenti union_bounds (espansa) if bbox is None: aoi = expand_bbox(get_union_bounds(assets), bbox_eps) else: aoi = expand_bbox(bbox, bbox_eps) # Metadati MBTiles center_lon = (aoi[0] + aoi[2]) / 2.0 center_lat = (aoi[1] + aoi[3]) / 2.0 metadata = { "name": f"Terrain-RGB z{minzoom}-{maxzoom}", "type": "raster", "format": "png", "minzoom": minzoom, "maxzoom": maxzoom, "bounds": f"{aoi[0]},{aoi[1]},{aoi[2]},{aoi[3]}", "center": f"{center_lon:.6f},{center_lat:.6f},{minzoom}", } print(f"-> Trovati {len(assets)} raster in: {input_dir}") print(f"-> AOI usata (WGS84, espansa di {bbox_eps}): {aoi}") print(f"-> Zoom: {minzoom}..{maxzoom}, tilesize={tilesize}, resampling={resampling}, threads={threads}, buffer={buffer} (auto<=11)") print(f"-> Output: {out_mbtiles}") con = init_mbtiles(out_mbtiles, metadata, fast=True) cur = con.cursor() try: for z in range(minzoom, maxzoom + 1): # Auto-buffer: se non specificato, usa 2 px per zoom <= 11 z_buffer = buffer if z_buffer <= 0 and z <= 11: z_buffer = 2 print(f"\n== Zoom {z} (tile_buffer={z_buffer}) ==") tiles = list(mercantile.tiles(aoi[0], aoi[1], aoi[2], aoi[3], z)) total = len(tiles) for idx, t in enumerate(tiles, 1): out = safe_mosaic_reader( assets=assets, x=t.x, y=t.y, z=z, tilesize=tilesize, resampling=resampling, threads=threads, tile_buffer=z_buffer, ) data, raw_mask = split_mosaic_output(out) mask = normalize_mask(raw_mask) # NON considerare mai mask=None come vuota; salta solo se mask presente e tutta 0 if skip_empty and (mask is not None) and (mask.max() == 0): continue png = render(data, mask=mask, img_format="PNG") tms_y = xyz_to_tms_y(z, t.y) cur.execute( "INSERT OR REPLACE INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?,?,?,?)", (z, t.x, tms_y, sqlite3.Binary(png)) ) if idx % 500 == 0 or idx == total: print(f" {idx}/{total} tile...") con.commit() # commit per livello except KeyboardInterrupt: print("\nInterrotto dall'utente. Scrittura parziale salvata.") finally: try: cur.execute("ANALYZE;") cur.execute("VACUUM;") con.commit() except Exception: pass con.close() print(f"\nCompletato: {out_mbtiles}") # ============================== # CLI # ============================== def main(): ap = argparse.ArgumentParser( description="Crea un MBTiles Terrain-RGB (PNG) mosaicato da tutti i TIF in una directory (z=5..14 di default)." ) ap.add_argument("input_dir", help="Directory con i TIF/TIFF RGB (COG o GeoTIFF).") ap.add_argument("out_mbtiles", help="Percorso file .mbtiles da creare.") ap.add_argument("--min-zoom", type=int, default=DEFAULT_MIN_ZOOM) ap.add_argument("--max-zoom", type=int, default=DEFAULT_MAX_ZOOM) ap.add_argument("--tilesize", type=int, default=DEFAULT_TILESIZE, choices=[256, 512]) ap.add_argument("--threads", type=int, default=DEFAULT_THREADS) ap.add_argument("--resampling", default=DEFAULT_RESAMPLING, choices=["nearest"]) ap.add_argument("--skip-empty", action="store_true", default=DEFAULT_SKIP_EMPTY, help="Se impostato, non scrive tile completamente vuote (mask=0).") ap.add_argument("--bbox", type=str, default=None, help="BBox WGS84 minx,miny,maxx,maxy (se non indicata, usa i bounds dei dati).") ap.add_argument("--bbox-eps", type=float, default=DEFAULT_BBOX_EPS, help="Espansione della bbox in gradi per includere tile al bordo (default 0.001).") ap.add_argument("--buffer", type=int, default=DEFAULT_BUFFER, help="Tile buffer in pixel (consigliato 1-2 per zoom bassi). Se 0, usa auto-buffer=2 per z<=11.") args = ap.parse_args() bbox = parse_bbox(args.bbox) if args.bbox else None generate_mbtiles_from_dir( input_dir=args.input_dir, out_mbtiles=args.out_mbtiles, minzoom=args.min_zoom, maxzoom=args.max_zoom, tilesize=args.tilesize, resampling=args.resampling, threads=args.threads, skip_empty=args.skip_empty, bbox=bbox, bbox_eps=args.bbox_eps, buffer=args.buffer, ) if __name__ == "__main__": main()