RWI normalisation using population density#
Instructions#
Download this notebook and put it in your working directory with the rest of the script, together with common.py, input_utils.py and .env.
Run the notebook in RDL-tools environment
Download the RWI data for the country of interest from HDX
Run the code below and use the interface to select the country area, the boundaries level for aggregation, the RWI data and the population data to use for normalisation.
Output is plotted on the map, results are saved as geopackage in the output folder
import tkinter as tk
from tkinter import filedialog
import ipywidgets as widgets
from IPython.display import display, clear_output
import geopandas as gpd
import pandas as pd
import rasterio
from rasterio.warp import reproject, Resampling
from rasterio.features import rasterize
from rasterio import transform
import numpy as np
import os
import sys
import warnings
import folium
from branca.colormap import LinearColormap
from rasterstats import zonal_stats
# Import from your existing codebase
from common import DATA_DIR
from input_utils import get_adm_data
# Read the countries list file
df = pd.read_csv('countries.csv')
# Create dictionaries mapping country names to ISO_A3 codes and vice versa
country_dict = dict(zip(df['NAM_0'], df['ISO_A3']))
iso_to_country = dict(zip(df['ISO_A3'], df['NAM_0']))
# Function to select file using tkinter
def select_file():
root = tk.Tk()
root.attributes('-topmost', True)
root.geometry("1x1+0+0")
root.withdraw()
root.after(0, lambda: root.focus_force())
file_selected = filedialog.askopenfilename(master=root)
root.destroy()
return file_selected
# GUI elements
country_selector = widgets.Combobox(
placeholder='Type country name...',
options=list(country_dict.keys()),
description='Country:',
layout=widgets.Layout(width='300px'),
ensure_option=True
)
adm_level_selector = widgets.Dropdown(
options=[(str(i), i) for i in range(1, 4)],
value=1,
description='Admin Level:',
layout=widgets.Layout(width='300px')
)
select_pop_file_button = widgets.Button(
description='Select Population File',
button_style='info',
layout=widgets.Layout(width='300px')
)
pop_file_path_display = widgets.Label(
value='No file selected',
layout=widgets.Layout(width='500px')
)
select_rwi_file_button = widgets.Button(
description='Select RWI CSV File',
button_style='info',
layout=widgets.Layout(width='300px')
)
rwi_file_path_display = widgets.Label(
value='No file selected',
layout=widgets.Layout(width='500px')
)
run_button = widgets.Button(
description='Run Normalisation',
button_style='danger',
layout=widgets.Layout(width='300px')
)
output = widgets.Output()
map_output = widgets.Output()
# Function to handle file selection
def on_select_pop_file_button_clicked(b):
file_path = select_file()
if file_path:
pop_file_path_display.value = file_path
else:
pop_file_path_display.value = 'No file selected'
def on_select_rwi_file_button_clicked(b):
file_path = select_file()
if file_path:
rwi_file_path_display.value = file_path
else:
rwi_file_path_display.value = 'No file selected'
select_pop_file_button.on_click(on_select_pop_file_button_clicked)
select_rwi_file_button.on_click(on_select_rwi_file_button_clicked)
# Plot function
def plot_results(result_df, country, adm_level, nRWI, analysis_type):
# Convert result_df to GeoDataFrame if it's not already
if not isinstance(result_df, gpd.GeoDataFrame):
result_df = gpd.GeoDataFrame(result_df, geometry='geometry')
# Ensure the CRS is EPSG:4326
result_df = result_df.to_crs(epsg=4326)
# Determine the key column
key_column = f'HASC_{adm_level}'
# Use the nRWI column
column = nRWI
# Set fixed min and max values for the colormap
vmin, vmax = -1.5, 1.5
# Create a custom colormap
colors = ['#FF0000', '#FFFF00', '#00FF00'] # Red, Yellow, Green
colormap = LinearColormap(colors=colors, vmin=vmin, vmax=vmax, index=[-1.5, 0, 1.5])
# Create a style function that colors features based on their value
def style_function(feature):
value = feature['properties'][column]
return {
'fillColor': colormap(value),
'fillOpacity': 0.7,
'color': 'black',
'weight': 1,
}
# Suppress the specific warning
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="Geometry is in a geographic CRS")
# Calculate the center coordinates
center_y = result_df.geometry.centroid.y.mean()
center_x = result_df.geometry.centroid.x.mean()
# Create a map centered on the mean of the centroids
m = folium.Map(location=[center_y, center_x], zoom_start=6)
# Add the GeoJson layer to the map
folium.GeoJson(
result_df,
style_function=style_function,
name=column
).add_to(m)
# Add colormap to the map
colormap.add_to(m)
colormap.caption = f'{column} (-1.5 to +1.5)'
return m
# Main analysis function
def run_analysis(b):
with output:
clear_output()
print("Starting analysis...")
selected_country = country_selector.value
country = country_dict[selected_country]
adm_level = adm_level_selector.value
pop_path = pop_file_path_display.value # Correct variable name
rwi_path = rwi_file_path_display.value
if pop_path == 'No file selected':
print("Error: No population file selected.")
return
if rwi_path == 'No file selected':
print("Error: No RWI CSV file selected.")
return
# Step 1: Process RWI CSV
try:
rwi_df = pd.read_csv(rwi_path)
rwi_gdf = gpd.GeoDataFrame(
rwi_df,
geometry=gpd.points_from_xy(rwi_df['longitude'], rwi_df['latitude'])
)
rwi_gdf.set_crs('epsg:4326', inplace=True)
print("RWI CSV loaded and converted to GeoDataFrame.")
except Exception as e:
print(f"Error processing RWI CSV: {str(e)}")
return
# Step 2: Get boundary data
adm_data = get_adm_data(country, adm_level)
if adm_data.crs is None:
adm_data.set_crs(epsg=4326, inplace=True)
print(f"Boundary data for {selected_country} at ADM{adm_level} level retrieved.")
# Step 3: Process population data
try:
with rasterio.open(pop_path) as src: # Using the correct pop_path variable
pop_data = src.read(1)
pop_meta = src.meta.copy()
pop_data = pop_data.astype('float32')
print("Population raster loaded.")
except Exception as e:
print(f"Error processing population file: {str(e)}")
return
# Step 4: Rasterize RWI data
try:
# Get the extent of the population raster
bounds = rasterio.transform.array_bounds(pop_meta['height'], pop_meta['width'], pop_meta['transform'])
# Create a new transform for the RWI raster
resolution = 0.021972700
new_width = int((bounds[2] - bounds[0]) / resolution)
new_height = int((bounds[3] - bounds[1]) / resolution)
new_transform = rasterio.transform.from_bounds(*bounds, new_width, new_height)
# Rasterize the RWI points
rwi_raster = rasterize(
[(geom, value) for geom, value in zip(rwi_gdf.geometry, rwi_gdf['rwi'])],
out_shape=(new_height, new_width),
transform=new_transform,
fill=-99999,
dtype='float32'
)
print("RWI data rasterized.")
except Exception as e:
print(f"Error rasterizing RWI data: {str(e)}")
return
# Save the RWI raster
rwi_output_path = os.path.join(DATA_DIR, "Output", f"{country}_rwi_rast.tif")
with rasterio.open(rwi_output_path, 'w', **pop_meta) as dst:
dst.write(rwi_raster, 1)
print(f"Raster RWI saved to: {rwi_output_path}")
# Step 5: Normalize RWI with population and calculate nRWI for ADM areas
try:
# Read the population data
with rasterio.open(pop_path) as src:
pop_data = src.read(1)
pop_meta = src.meta
pop_nodata = src.nodata
# Read the RWI raster
with rasterio.open(rwi_output_path) as src:
rwi_data = src.read(1)
rwi_nodata = src.nodata
# Ensure pop_data and rwi_data have the same shape
assert pop_data.shape == rwi_data.shape, "Population and RWI rasters have different shapes"
print(f"Population data shape: {pop_data.shape}")
print(f"RWI data shape: {rwi_data.shape}")
# Define nodata value for output
out_nodata = -99999
# Multiply RWI and population
rwi_norm = np.where((pop_data > 0) & (pop_data != pop_nodata) & (rwi_data != rwi_nodata),
rwi_data * pop_data,
out_nodata)
print("RWI multiplied with population.")
# Save RWI_norm for inspection
rwi_norm_output_path = os.path.join(DATA_DIR, "Output", f"{country}_rwi_norm.tif")
with rasterio.open(rwi_norm_output_path, 'w', **pop_meta) as dst:
dst.write(rwi_norm, 1)
print(f"RWI_norm saved to: {rwi_norm_output_path}")
# Calculate zonal statistics
rwi_norm_sum = zonal_stats(adm_data, rwi_norm, affine=pop_meta['transform'], stats=['sum'], nodata=out_nodata)
pop_sum = zonal_stats(adm_data, pop_data, affine=pop_meta['transform'], stats=['sum'], nodata=pop_nodata)
# Calculate nRWI for each ADM area
adm_data['nRWI'] = [rwi_sum['sum'] / pop_sum['sum'] if pop_sum['sum'] > 0 else out_nodata
for rwi_sum, pop_sum in zip(rwi_norm_sum, pop_sum)]
print("nRWI calculated for each ADM area.")
# Save results
output_path = os.path.join(DATA_DIR, "Output", f"{country}_ADM{adm_level}_nRWI.gpkg")
adm_data.to_file(output_path, driver="GPKG")
print(f"Results saved to {output_path}")
# Print some statistics for verification
valid_nrwi = [nrwi for nrwi in adm_data['nRWI'] if nrwi != out_nodata]
if valid_nrwi:
print(f"nRWI statistics:")
print(f" Min: {min(valid_nrwi)}")
print(f" Max: {max(valid_nrwi)}")
print(f" Mean: {sum(valid_nrwi) / len(valid_nrwi)}")
print(f" Median: {sorted(valid_nrwi)[len(valid_nrwi)//2]}")
else:
print("No valid nRWI values calculated.")
except Exception as e:
print(f"Error normalizing RWI with population: {str(e)}")
import traceback
traceback.print_exc()
return
# Plot results
try:
with map_output:
clear_output()
m = plot_results(adm_data, country, adm_level, 'nRWI', 'Function')
if m is not None:
display(m)
else:
print("No map to display.")
print("Results plotted on map.")
except Exception as e:
print(f"Error plotting results: {str(e)}")
import traceback
traceback.print_exc() # This will print the full traceback for debugging
return
print("Analysis completed successfully.")
# Connect button to analysis function
run_button.on_click(run_analysis)
# Display GUI
display(widgets.VBox([
country_selector,
adm_level_selector,
select_pop_file_button,
pop_file_path_display,
select_rwi_file_button,
rwi_file_path_display,
run_button,
output,
map_output
]))