HARVESTING AND RESAMPLING OF BUILT-UP DATA FROM WSF-2019

This script downloads WSF 2019 binary built-up data at 10 m resolution for a given country, resamples it to 100 m, and rescales the values to the 0–1 range.

import os
import requests
import geopandas as gpd
import shutil
from osgeo import gdal
from shapely.geometry import shape, Polygon, MultiPolygon
from shapely.ops import unary_union
from tqdm import tqdm  # For progress bar
from gdal_utils import merge_tifs, gdalwarp_wsf19, gdal_calc_wsf19  # Custom functions for geoprocessing
import concurrent.futures  # For threading
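# merge_tifs, gdalwarp_wsf19 and gdal_calc_wsf19 live in a local gdal_utils
# module that is not shown here. A minimal sketch of what they might look like
# (hypothetical implementations, assuming EPSG:4326 tiles and the GDAL Python
# bindings imported above):
#
# def merge_tifs(folder):
#     """Mosaic every .tif in `folder` into <folder_name>.tif via a VRT."""
#     tifs = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith('.tif')]
#     vrt = gdal.BuildVRT(os.path.join(folder, 'merged.vrt'), tifs)
#     gdal.Translate(os.path.join(folder, os.path.basename(folder) + '.tif'), vrt)
#     vrt = None  # close the dataset to flush it to disk
#
# def gdalwarp_wsf19(src, dst):
#     """Resample the 10 m mosaic to ~100 m (~0.0009 deg) with average resampling."""
#     gdal.Warp(dst, src, xRes=0.0008983, yRes=0.0008983, resampleAlg='average')
#
# def gdal_calc_wsf19(src, dst):
#     """Rescale 0-255 built-up values to a 0-1 float raster."""
#     ds = gdal.Open(src)
#     arr = ds.GetRasterBand(1).ReadAsArray().astype('float32') / 255.0
#     out = gdal.GetDriverByName('GTiff').Create(dst, ds.RasterXSize, ds.RasterYSize, 1, gdal.GDT_Float32)
#     out.SetGeoTransform(ds.GetGeoTransform())
#     out.SetProjection(ds.GetProjection())
#     out.GetRasterBand(1).WriteArray(arr)
#     out = None  # close to flush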

# Define the REST API URL
rest_api_url = "https://services.arcgis.com/iQ1dY19aHwbSDYIF/ArcGIS/rest/services/World_Bank_Global_Administrative_Divisions_VIEW/FeatureServer"

# Function to get the correct layer ID based on administrative level
def get_layer_id_for_adm(adm_level):
    layers_url = f"{rest_api_url}/layers"
    response = requests.get(layers_url, params={'f': 'json'})

    if response.status_code == 200:
        layers_info = response.json().get('layers', [])
        target_layer_name = f"WB_GAD_ADM{adm_level}"
        
        for layer in layers_info:
            if layer['name'] == target_layer_name:
                return layer['id']
        
        print(f"Layer matching {target_layer_name} not found.")
        return None
    else:
        print(f"Failed to fetch layers. Status code: {response.status_code}")
        return None

# Function to fetch the ADM data using the correct layer ID
def get_adm_data(country, adm_level):
    layer_id = get_layer_id_for_adm(adm_level)
    
    if layer_id is not None:
        query_url = f"{rest_api_url}/{layer_id}/query"
        params = {
            'where': f"ISO_A3 = '{country}'",
            'outFields': '*',
            'f': 'geojson'
        }
        
        response = requests.get(query_url, params=params)
        
        if response.status_code == 200:
            data = response.json()
            features = data.get('features', [])
            if features:
                geometry = [shape(feature['geometry']) for feature in features]
                properties = [feature['properties'] for feature in features]
                gdf = gpd.GeoDataFrame(properties, geometry=geometry)

                return gdf
            else:
                print("No features found for the specified query.")
                return None
        else:
            print(f"Error fetching data: {response.status_code}")
            return None
    else:
        print("Invalid administrative level or layer mapping not found.")
        return None

# Function to download files with progress bar
def download_file(url, dest_folder):
    os.makedirs(dest_folder, exist_ok=True)

    local_filename = os.path.join(dest_folder, url.split('/')[-1])

    if not os.path.exists(local_filename):
        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            with open(local_filename, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)

    return local_filename
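# Note: requests.get above streams with no timeout or retries; for flaky
# connections one might pass e.g. requests.get(url, stream=True, timeout=60)
# and wrap the call in a simple retry loop.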

# Mapping of administrative levels to field names
adm_field_mapping = {
    0: {'code': 'HASC_0', 'name': 'NAM_0'},
    1: {'code': 'HASC_1', 'name': 'NAM_1'},
    2: {'code': 'HASC_2', 'name': 'NAM_2'},
    # Add mappings for additional levels as needed
}
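# (Not used directly below, but downstream steps can use it to pick label
#  fields, e.g. adm_field_mapping[adm_level]['name'] -> 'NAM_2' for adm_level = 2.)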

# SPECIFY COUNTRY TO DOWNLOAD
country = 'RWA'  # Country ISO_A3 code
adm_level = 2    # Administrative level (e.g., 2)

# Fetch the data
adm_data = get_adm_data(country, adm_level)

if adm_data is not None:
    # Step 1: Extract and combine the geometry into a single geometry
    ADM_area = adm_data.unary_union  # Combine all geometries into one (geopandas >= 1.0 prefers adm_data.union_all())

    if isinstance(ADM_area, Polygon):
        # Wrap a single Polygon in a MultiPolygon so later steps can assume one type
        ADM_area = MultiPolygon([ADM_area])

    # Convert to bounding box
    bbox = ADM_area.bounds  # (minx, miny, maxx, maxy)

    # Step 2: Construct the STAC search query
    search_query = {
        "bbox": list(bbox),  # [minx, miny, maxx, maxy]
        "collections": ["WSF_2019"],  # Assuming this is the correct collection name
        "limit": 100  # Adjust as needed
    }

    # Step 3: Send the POST request to the STAC API
    stac_search_url = "https://geoservice.dlr.de/eoc/ogc/stac/v1/search"
    headers = {
        "Content-Type": "application/json"
    }

    response = requests.post(stac_search_url, headers=headers, json=search_query)

    # Step 4: Check the response status and parse the results
    if response.status_code == 200:
        search_results = response.json()
        items = search_results.get("features", [])
        if items:
            print(f"Found {len(items)} items.")
            # Directory to save downloaded files
            subfolder_name = f"{country}_tifs"
            download_folder = os.path.join(f"wd/data/{country}_WSF_2019/", subfolder_name)
            os.makedirs(download_folder, exist_ok=True)

            # Step 5: Sequentially download .tif assets into the subdirectory
            tif_files = []
            total_files = sum(1 for item in items for asset in item['assets'].values() if asset['href'].endswith('.tif'))
            
            with tqdm(total=total_files, desc="Downloading .tif files") as pbar:
                for item in items:
                    for asset_key, asset_value in item['assets'].items():
                        if asset_value['href'].endswith('.tif'):
                            tif_file = download_file(asset_value['href'], download_folder)
                            tif_files.append(tif_file)
                            pbar.update(1)
                            
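            # The concurrent.futures import above could be used to parallelise
            # this download step instead of the sequential loop (sketch; result
            # order is not guaranteed):
            # with concurrent.futures.ThreadPoolExecutor(max_workers=4) as ex:
            #     futs = [ex.submit(download_file, a['href'], download_folder)
            #             for it in items for a in it['assets'].values()
            #             if a['href'].endswith('.tif')]
            #     tif_files = [f.result() for f in concurrent.futures.as_completed(futs)]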
            # Step 6: Mosaic the downloaded .tif files using the subdirectory
            merged_tif_path = os.path.join(download_folder, f"{subfolder_name}.tif")
            output_filename = os.path.join(f"wd/data/{country}_WSF_2019/", f"{country}_WSF-2019.tif")
            if tif_files and not os.path.exists(output_filename):
                print("Mosaicing downloaded .tif files...")
                merge_tifs(download_folder)
                os.rename(merged_tif_path, output_filename)
            else:
                print("Mosaic already exists, skipping mosaicing.")  
                
            # Step 7: Resample the mosaicked file to 100 m and normalise it
            input_file = output_filename
            output_file = os.path.join(f"wd/data/{country}_WSF_2019/", f"{country}_WSF-2019_100m.tif")
            output_calc_file = os.path.join(f"wd/data/{country}_WSF_2019/", f"{country}_BU.tif")
                
            if not os.path.exists(output_file):
                print("Resampling WSF 2019 to 100m...")
                gdalwarp_wsf19(input_file, output_file)
            else:
                print(f"{output_file} already exists, skipping upscaling.")
            
            if not os.path.exists(output_calc_file):
                print("Normalising WSF 2019 to the [0, 1] range...")
                gdal_calc_wsf19(output_file, output_calc_file)
                print(f"Mosaicked, resampled and normalised file saved as {output_calc_file}")
            else:
                print(f"{output_calc_file} already exists, skipping normalisation.")

            # Delete the tiles subfolder after mosaicing
            if os.path.exists(download_folder):
                shutil.rmtree(download_folder)
                print(f"Deleted temporary folder: {download_folder}")
        else:
            print("No items found for the specified query.")
    else:
        print(f"Error {response.status_code}: {response.text}")
else:
    print("Missing ADM data!")
Found 4 items.
Downloading .tif files: 100%|████████████████████████████████████████████████████████████████████| 4/4 [00:00<?, ?it/s]
Mosaicking downloaded .tif files...

Resampling WSF 2019 to 100m...
Normalising WSF 2019 to the [0, 1] range...
Mosaicked, resampled and normalised file saved as wd/data/RWA_WSF_2019/RWA_BU.tif
Deleted temporary folder: wd/data/RWA_WSF_2019/RWA_tifs