diff --git a/dir_to_terrainrgb_mbtiles1.py b/dir_to_terrainrgb_mbtiles1.py new file mode 100644 index 0000000..0cba0db --- /dev/null +++ b/dir_to_terrainrgb_mbtiles1.py @@ -0,0 +1,421 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Crea un MBTiles Terrain-RGB (PNG) mosaicato da tutti i TIF in una directory. + +- Mosaico per-tile (FirstMethod) -> niente buchi ai bordi +- Resampling 'nearest' -> preserva i byte RGB (quote intatte) +- Buffer di tile (tile_buffer) per chiudere il "seam" agli zoom bassi +- mask=None NON considerata "vuota" (si salta solo mask presente e tutta 0) +- Scrittura MBTiles standard (tile_row in TMS) + +Requisiti: + pip install rio-tiler mercantile numpy pillow rasterio +""" + +import os +import math +import glob +import argparse +import sqlite3 +from typing import List, Tuple, Optional, Any + +import mercantile +import numpy as np +from rio_tiler.io import COGReader +from rio_tiler.mosaic import mosaic_reader +from rio_tiler.mosaic.methods import FirstMethod +from rio_tiler.utils import render + +# Compatibilit con rio-tiler (ImageData) +try: + from rio_tiler.models import ImageData +except Exception: + ImageData = None # type: ignore + +# ============================== +# Parametri di default +# ============================== + +DEFAULT_MIN_ZOOM = 5 +DEFAULT_MAX_ZOOM = 14 +DEFAULT_TILESIZE = 256 +DEFAULT_THREADS = 4 +DEFAULT_RESAMPLING = "nearest" +DEFAULT_SKIP_EMPTY = False # di default NON saltiamo tile +DEFAULT_BUFFER = 0 # buffer in pixel; auto-buffer se 0 +DEFAULT_BBOX_EPS = 0.001 # espansione bbox in gradi (100 m) + +# BBOX Italia (WGS84, lon/lat) approx +DEFAULT_ITALY_BBOX = (6.6, 36.6, 18.8, 47.3) # (W, S, E, N) + +# ============================== +# Utility +# ============================== + +def parse_bbox(bbox_str: Optional[str]) -> Optional[Tuple[float, float, float, float]]: + if not bbox_str: + return None + parts = bbox_str.split(",") + if len(parts) != 4: + raise ValueError("La bbox deve essere nel formato: minx,miny,maxx,maxy") + return tuple(map(float, parts)) # type: ignore + +def list_rasters_in_dir(input_dir: str) -> List[str]: + patterns = ["*.tif", "*.tiff", "*.TIF", "*.TIFF"] + files = [] + for p in patterns: + files.extend(glob.glob(os.path.join(input_dir, p))) + return sorted(files) + +def get_union_bounds(assets: List[str]) -> Tuple[float, float, float, float]: + """Unisce i bounds (lon/lat WGS84) dei COG/GeoTIFF. Aggiunge un piccolo epsilon.""" + minx = miny = math.inf + maxx = maxy = -math.inf + for a in assets: + with COGReader(a) as cog: + b = cog.bounds # (minx, miny, maxx, maxy) in EPSG:4326 + minx = min(minx, b[0]); miny = min(miny, b[1]) + maxx = max(maxx, b[2]); maxy = max(maxy, b[3]) + eps = 1e-10 + return (minx - eps, miny - eps, maxx + eps, maxy + eps) + +def expand_bbox(b: Tuple[float,float,float,float], eps: float) -> Tuple[float,float,float,float]: + """Espande la bbox di un epsilon in tutte le direzioni.""" + return (b[0]-eps, b[1]-eps, b[2]+eps, b[3]+eps) + +def intersect_bbox(a: Tuple[float, float, float, float], + b: Tuple[float, float, float, float]) -> Optional[Tuple[float, float, float, float]]: + """Intersezione di due bbox. Ritorna None se vuota.""" + minx = max(a[0], b[0]); miny = max(a[1], b[1]) + maxx = min(a[2], b[2]); maxy = min(a[3], b[3]) + if minx >= maxx or miny >= maxy: + return None + return (minx, miny, maxx, maxy) + +def reader(asset: str, x: int, y: int, z: int, **kwargs): + """Reader per mosaic_reader: restituisce un ImageData (o data/mask).""" + with COGReader(asset) as cog: + return cog.tile(x, y, z, **kwargs) + +def xyz_to_tms_y(z: int, y: int) -> int: + """Converte Y in TMS per MBTiles (capovolto).""" + return (2**z - 1) - y + +def init_mbtiles(path: str, metadata: dict, fast: bool = True) -> sqlite3.Connection: + """Crea un MBTiles vuoto con metadata. Se esiste, lo sovrascrive.""" + if os.path.exists(path): + os.remove(path) + con = sqlite3.connect(path) + cur = con.cursor() + + if fast: + cur.execute("PRAGMA synchronous=OFF;") + cur.execute("PRAGMA journal_mode=MEMORY;") + cur.execute("PRAGMA temp_store=MEMORY;") + + cur.executescript(""" + CREATE TABLE metadata (name TEXT, value TEXT); + CREATE TABLE tiles ( + zoom_level INTEGER, + tile_column INTEGER, + tile_row INTEGER, + tile_data BLOB + ); + CREATE UNIQUE INDEX tile_index ON tiles (zoom_level, tile_column, tile_row); + """) + + cur.executemany( + "INSERT INTO metadata (name, value) VALUES (?, ?)", + [(k, str(v)) for k, v in metadata.items()] + ) + con.commit() + return con + +def normalize_mask(mask: Any) -> Optional[np.ndarray]: + """ + Rende la mask un array 2D uint8 (0/255). + - Accetta: None, bool/uint8 array, o list/tuple contenenti array numerici. + - Ignora elementi non-numerici (es. liste di stringhe 'assets_used'). + """ + if mask is None: + return None + + def to_uint8_2d(arr: np.ndarray) -> Optional[np.ndarray]: + # 3D con 1 banda -> squeeze + if arr.ndim == 3 and arr.shape[0] == 1: + arr = arr[0] + elif arr.ndim == 3: + arr = arr.squeeze() + if arr.ndim != 2: + return None + # Porta a uint8 0/255 + if arr.dtype == np.uint8: + return arr + if arr.dtype == np.bool_: + return arr.astype(np.uint8) * 255 + if np.issubdtype(arr.dtype, np.number): + return (arr > 0).astype(np.uint8) * 255 + return None + + # Lista/Tupla -> OR per-pixel degli elementi numerici + if isinstance(mask, (list, tuple)): + masks: List[np.ndarray] = [] + for m in mask: + if m is None: + continue + arr = np.asarray(m) + if (not np.issubdtype(arr.dtype, np.number)) and (arr.dtype != np.bool_): + continue + conv = to_uint8_2d(arr) + if conv is not None: + masks.append(conv) + if not masks: + return None + # allinea shapes alla minima comune + h = min(m.shape[0] for m in masks) + w = min(m.shape[1] for m in masks) + if any((m.shape[0] != h or m.shape[1] != w) for m in masks): + masks = [m[:h, :w] for m in masks] + return np.maximum.reduce(masks) + + # Caso singolo array + arr = np.asarray(mask) + if (not np.issubdtype(arr.dtype, np.number)) and (arr.dtype != np.bool_): + return None + return to_uint8_2d(arr) + +def split_mosaic_output(out: Any) -> Tuple[np.ndarray, Optional[np.ndarray]]: + """ + Estrae (data, mask) dall'output di mosaic_reader a prova di versione: + - ImageData (rio_tiler >= 5/6): usa .data e .mask + - Tuple: (ImageData, assets_used) o (data, mask) o (data, assets_used, mask) + - Dict con 'data'/'mask' + """ + if ImageData is not None and isinstance(out, ImageData): + return out.data, out.mask + + if isinstance(out, dict) and "data" in out: + return out["data"], out.get("mask") + + if isinstance(out, (tuple, list)): + # (ImageData, assets_used) + if len(out) >= 1 and (ImageData is not None) and isinstance(out[0], ImageData): + img = out[0] + return img.data, img.mask + + arrays = [o for o in out if isinstance(o, np.ndarray)] + data = None + mask = None + for a in arrays: + if a.ndim == 3: + data = a + break + for a in arrays: + if a.ndim == 2: + mask = a + break + if mask is None: + for a in arrays: + if a.ndim == 3 and a.shape[0] == 1: + mask = a[0] + break + if data is None and arrays: + data = arrays[0] + if data is None: + raise RuntimeError("Impossibile determinare 'data' dalla risposta di mosaic_reader.") + return data, mask + + # fallback + try: + data, mask = out # type: ignore + return data, mask + except Exception: + raise RuntimeError("Impossibile determinare 'data'/'mask' dalla risposta di mosaic_reader (tipo sconosciuto).") + +def safe_mosaic_reader( + assets: List[str], + x: int, y: int, z: int, + tilesize: int, + resampling: str, + threads: int, + tile_buffer: int +) -> Any: + """ + Chiama mosaic_reader provando firme diverse (compatibilit tra versioni): + 1) mosaic_assets=..., tile_buffer=... + 2) mosaic_assets=... + 3) assets=..., tile_buffer=... + 4) assets=... + """ + base = dict( + reader=reader, + x=x, y=y, z=z, + pixel_selection=FirstMethod(), + tilesize=tilesize, + resampling_method=resampling, + threads=threads, + ) + # 1) mosaic_assets + tile_buffer + try: + return mosaic_reader(mosaic_assets=assets, **base, tile_buffer=tile_buffer) + except TypeError: + pass + # 2) mosaic_assets senza tile_buffer + try: + return mosaic_reader(mosaic_assets=assets, **base) + except TypeError: + pass + # 3) assets + tile_buffer + try: + return mosaic_reader(assets=assets, **base, tile_buffer=tile_buffer) + except TypeError: + pass + # 4) assets senza tile_buffer + return mosaic_reader(assets=assets, **base) + +# ============================== +# Core +# ============================== + +def generate_mbtiles_from_dir( + input_dir: str, + out_mbtiles: str, + minzoom: int = DEFAULT_MIN_ZOOM, + maxzoom: int = DEFAULT_MAX_ZOOM, + tilesize: int = DEFAULT_TILESIZE, + resampling: str = DEFAULT_RESAMPLING, + threads: int = DEFAULT_THREADS, + skip_empty: bool = DEFAULT_SKIP_EMPTY, + bbox: Optional[Tuple[float, float, float, float]] = None, + bbox_eps: float = DEFAULT_BBOX_EPS, + buffer: int = DEFAULT_BUFFER, +): + assets = list_rasters_in_dir(input_dir) + if not assets: + raise RuntimeError(f"Nessun file .tif trovato in: {input_dir}") + + # AOI: usa bbox se fornita (espansa), altrimenti union_bounds (espansa) + if bbox is None: + aoi = expand_bbox(get_union_bounds(assets), bbox_eps) + else: + aoi = expand_bbox(bbox, bbox_eps) + + # Metadati MBTiles + center_lon = (aoi[0] + aoi[2]) / 2.0 + center_lat = (aoi[1] + aoi[3]) / 2.0 + metadata = { + "name": f"Terrain-RGB z{minzoom}-{maxzoom}", + "type": "raster", + "format": "png", + "minzoom": minzoom, + "maxzoom": maxzoom, + "bounds": f"{aoi[0]},{aoi[1]},{aoi[2]},{aoi[3]}", + "center": f"{center_lon:.6f},{center_lat:.6f},{minzoom}", + } + + print(f"-> Trovati {len(assets)} raster in: {input_dir}") + print(f"-> AOI usata (WGS84, espansa di {bbox_eps}): {aoi}") + print(f"-> Zoom: {minzoom}..{maxzoom}, tilesize={tilesize}, resampling={resampling}, threads={threads}, buffer={buffer} (auto<=11)") + print(f"-> Output: {out_mbtiles}") + + con = init_mbtiles(out_mbtiles, metadata, fast=True) + cur = con.cursor() + + try: + for z in range(minzoom, maxzoom + 1): + # Auto-buffer: se non specificato, usa 2 px per zoom <= 11 + z_buffer = buffer + if z_buffer <= 0 and z <= 11: + z_buffer = 2 + + print(f"\n== Zoom {z} (tile_buffer={z_buffer}) ==") + tiles = list(mercantile.tiles(aoi[0], aoi[1], aoi[2], aoi[3], z)) + total = len(tiles) + for idx, t in enumerate(tiles, 1): + out = safe_mosaic_reader( + assets=assets, + x=t.x, y=t.y, z=z, + tilesize=tilesize, + resampling=resampling, + threads=threads, + tile_buffer=z_buffer, + ) + + data, raw_mask = split_mosaic_output(out) + mask = normalize_mask(raw_mask) + + # NON considerare mai mask=None come vuota; salta solo se mask presente e tutta 0 + if skip_empty and (mask is not None) and (mask.max() == 0): + continue + + png = render(data, mask=mask, img_format="PNG") + tms_y = xyz_to_tms_y(z, t.y) + + cur.execute( + "INSERT OR REPLACE INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?,?,?,?)", + (z, t.x, tms_y, sqlite3.Binary(png)) + ) + + if idx % 500 == 0 or idx == total: + print(f" {idx}/{total} tile...") + + con.commit() # commit per livello + + except KeyboardInterrupt: + print("\nInterrotto dall'utente. Scrittura parziale salvata.") + finally: + try: + cur.execute("ANALYZE;") + cur.execute("VACUUM;") + con.commit() + except Exception: + pass + con.close() + + print(f"\nCompletato: {out_mbtiles}") + +# ============================== +# CLI +# ============================== + +def main(): + ap = argparse.ArgumentParser( + description="Crea un MBTiles Terrain-RGB (PNG) mosaicato da tutti i TIF in una directory (z=5..14 di default)." + ) + ap.add_argument("input_dir", help="Directory con i TIF/TIFF RGB (COG o GeoTIFF).") + ap.add_argument("out_mbtiles", help="Percorso file .mbtiles da creare.") + ap.add_argument("--min-zoom", type=int, default=DEFAULT_MIN_ZOOM) + ap.add_argument("--max-zoom", type=int, default=DEFAULT_MAX_ZOOM) + ap.add_argument("--tilesize", type=int, default=DEFAULT_TILESIZE, choices=[256, 512]) + ap.add_argument("--threads", type=int, default=DEFAULT_THREADS) + ap.add_argument("--resampling", default=DEFAULT_RESAMPLING, choices=["nearest"]) + ap.add_argument("--skip-empty", action="store_true", default=DEFAULT_SKIP_EMPTY, + help="Se impostato, non scrive tile completamente vuote (mask=0).") + ap.add_argument("--bbox", type=str, default=None, + help="BBox WGS84 minx,miny,maxx,maxy (se non indicata, usa i bounds dei dati).") + ap.add_argument("--bbox-eps", type=float, default=DEFAULT_BBOX_EPS, + help="Espansione della bbox in gradi per includere tile al bordo (default 0.001).") + ap.add_argument("--buffer", type=int, default=DEFAULT_BUFFER, + help="Tile buffer in pixel (consigliato 1-2 per zoom bassi). Se 0, usa auto-buffer=2 per z<=11.") + args = ap.parse_args() + + bbox = parse_bbox(args.bbox) if args.bbox else None + + generate_mbtiles_from_dir( + input_dir=args.input_dir, + out_mbtiles=args.out_mbtiles, + minzoom=args.min_zoom, + maxzoom=args.max_zoom, + tilesize=args.tilesize, + resampling=args.resampling, + threads=args.threads, + skip_empty=args.skip_empty, + bbox=bbox, + bbox_eps=args.bbox_eps, + buffer=args.buffer, + ) + +if __name__ == "__main__": + main() \ No newline at end of file