EMDAT Disaster Data Dashboard - Enhanced with Leafmap
Interactive Dashboard with Quantile-Based Choropleth Maps
This dashboard provides a comprehensive analysis of EMDAT disaster data, with enhanced subnational visualization that uses Leafmap for improved legend control and quantile-based classification.
Features:
Country Selection: Choose a specific country or view global data; subnational analysis covers administrative levels ADM1 and ADM2
Time Range Filtering: Adjust the year range using the slider
Multiple Visualization Tabs: Overview, disaster types, temporal trends, loss analysis, and subnational analysis
Choropleth Maps: Quantile-based classification (5 classes) for several metrics: deaths, affected population, and disaster count
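The 5-class quantile classification used for the choropleth layers can be sketched with pandas; the values below are hypothetical, standing in for a per-admin-unit metric such as deaths:

```python
import pandas as pd

# Hypothetical per-admin-unit values for one metric (e.g. Total Deaths)
values = pd.Series([3, 10, 25, 40, 80, 150, 400, 900, 1200, 5000])

# Five classes -> six bin edges at the 0/20/40/60/80/100th percentiles
bins = values.quantile([0, 0.2, 0.4, 0.6, 0.8, 1.0]).tolist()

# Assign each value to its quantile class
classes = pd.cut(values, bins=bins, include_lowest=True)
class_counts = classes.value_counts()
```

Quantile bins put roughly equal numbers of units in each class, which keeps the map readable even when the metric is heavily skewed.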
Required input:
EMDAT database: download the latest global dataset of natural disasters from EMDAT, or use this export: emdat_2025.xlsx
ADM unit boundaries: EMDAT geocoding is based on the GAUL 2015 dataset. A refined version of the dataset, produced for this tool, is available here: ADM_GAUL.gpkg
Usage Tips:
For best choropleth results: Select countries with high subnational data coverage
Legend management: Toggle layers on/off to show only relevant legends
Interactive maps: Click on administrative units for detailed information
Performance: Large datasets may take a few seconds to render choropleth maps
## CONFIGURATION PARAMETERS
EXCEL_FILE_PATH = 'emdat_2025.xlsx' # Set your EMDAT Excel file path here
GPKG_FILE_PATH = 'X:/Work/Geodata/ADM/ADM_GAUL.gpkg' # Set your ADM units GPKG file path here
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import folium
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output
import json
import warnings
import re
import os
warnings.filterwarnings('ignore')
# Optional: For advanced mapping
try:
import geopandas as gpd
GEOPANDAS_AVAILABLE = True
print("[OK] GeoPandas available - choropleth maps enabled")
except ImportError:
GEOPANDAS_AVAILABLE = False
print("[ERROR] GeoPandas not available - install with: pip install geopandas")
try:
from scipy import stats
SCIPY_AVAILABLE = True
except ImportError:
SCIPY_AVAILABLE = False
print("SciPy not installed. Some trend analysis features will be limited.")
class EMDATDashboard:
"""Main class for EMDAT disaster data dashboard"""
def __init__(self, excel_file_path):
"""Initialize dashboard with EMDAT Excel file"""
self.file_path = excel_file_path
self.df = None
self.selected_country = None
self.load_data()
def load_data(self):
"""Load EMDAT data from Excel file"""
try:
print(f"Loading data from {self.file_path}...")
self.df = pd.read_excel(self.file_path)
print(f"Data loaded successfully!")
print(f"Shape: {self.df.shape}")
print(f"Years covered: {self.df['Start Year'].min()} - {self.df['Start Year'].max()}")
print(f"Countries: {self.df['ISO'].nunique()}")
print(f"Disaster types: {self.df['Disaster Type'].nunique()}")
self.prepare_data()
except Exception as e:
print(f"Error loading data: {e}")
print("Please ensure your EMDAT Excel file is in the correct location")
def prepare_data(self):
"""Clean and prepare the data for analysis"""
# Handle missing values
numeric_columns = ['Total Deaths', 'Total Affected', 'No Injured',
'No Affected', 'No Homeless', 'Total Damages (\'000 US$)',
'Insured Damages (\'000 US$)', 'Reconstruction Costs (\'000 US$)']
for col in numeric_columns:
if col in self.df.columns:
self.df[col] = pd.to_numeric(self.df[col], errors='coerce')
self.df[col] = self.df[col].fillna(0)
# Create decade column for temporal analysis
self.df['Decade'] = (self.df['Start Year'] // 10) * 10
# Create simplified disaster categories
self.df['Disaster Category'] = self.df['Disaster Type'].apply(self.categorize_disaster)
# Calculate total losses (deaths + affected)
self.df['Total Losses'] = self.df['Total Deaths'] + self.df['Total Affected']
print("Data preparation completed!")
def categorize_disaster(self, disaster_type):
"""Categorize disasters into broader groups"""
if pd.isna(disaster_type):
return 'Other'
disaster_type = str(disaster_type).lower()
if any(word in disaster_type for word in ['flood', 'storm', 'cyclone', 'hurricane', 'typhoon']):
return 'Hydrometeorological'
elif any(word in disaster_type for word in ['earthquake', 'volcanic', 'landslide']):
return 'Geophysical'
elif any(word in disaster_type for word in ['drought', 'extreme temperature', 'wildfire']):
return 'Climatological'
elif any(word in disaster_type for word in ['epidemic', 'infestation']):
return 'Biological'
else:
return 'Other'
def parse_admin_units(admin_units_str):
"""Parse the JSON-formatted Admin Units string"""
if pd.isna(admin_units_str) or admin_units_str == '':
return []
try:
admin_str = str(admin_units_str).strip()
if admin_str.startswith('[') and admin_str.endswith(']'):
admin_data = json.loads(admin_str)
if isinstance(admin_data, list):
return admin_data
else:
return [admin_data]
elif admin_str.startswith('{') and admin_str.endswith('}'):
admin_data = json.loads(admin_str)
return [admin_data]
else:
admin_data = json.loads(admin_str)
return admin_data if isinstance(admin_data, list) else [admin_data]
except (json.JSONDecodeError, ValueError) as e:
# Fallback to regex parsing
try:
adm1_matches = re.findall(r'"adm1_name"\s*:\s*"([^"]+)"', str(admin_units_str))
adm1_codes = re.findall(r'"adm1_code"\s*:\s*(\d+)', str(admin_units_str))
adm2_matches = re.findall(r'"adm2_name"\s*:\s*"([^"]+)"', str(admin_units_str))
adm2_codes = re.findall(r'"adm2_code"\s*:\s*(\d+)', str(admin_units_str))
result = []
for i, name in enumerate(adm1_matches):
code = adm1_codes[i] if i < len(adm1_codes) else None
result.append({
"adm1_name": name,
"adm1_code": int(code) if code else None
})
for i, name in enumerate(adm2_matches):
code = adm2_codes[i] if i < len(adm2_codes) else None
result.append({
"adm2_name": name,
"adm2_code": int(code) if code else None
})
return result if result else []
except Exception:
return []
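For reference, the "Admin Units" field that parse_admin_units handles typically contains a JSON array of objects with adm1/adm2 names and codes. The string below is a hypothetical example of that format:

```python
import json

# Hypothetical example of the JSON-formatted "Admin Units" field in EMDAT
admin_units_str = ('[{"adm1_code": 1234, "adm1_name": "Example Province"},'
                   ' {"adm2_code": 5678, "adm2_name": "Example District"}]')

units = json.loads(admin_units_str)

# Split parsed units by administrative level
adm1_units = [u for u in units if 'adm1_name' in u]
adm2_units = [u for u in units if 'adm2_name' in u]
```

The regex fallback in parse_admin_units exists for rows where this string is truncated or otherwise not valid JSON.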
# ============================================================================
# DYNAMIC COUNTRY MAPPINGS FROM BOUNDARY FILE
# ============================================================================
# Global cache for country mappings to avoid reading file multiple times
_country_mappings_cache = {}
def build_country_mappings_from_boundaries(gpkg_file_path):
"""
Build dynamic country mappings from the ADM_0 layer in the boundary file.
Returns iso_to_gaul and iso_to_names mappings extracted from the data.
"""
global _country_mappings_cache
# Check if mappings are already cached
if gpkg_file_path in _country_mappings_cache:
return _country_mappings_cache[gpkg_file_path]
try:
print(f" [INFO] Building dynamic country mappings from {gpkg_file_path}...")
# Check if ADM_0 layer exists
available_layers = gpd.list_layers(gpkg_file_path)
if hasattr(available_layers, 'name'):
layer_names = available_layers['name'].tolist()
else:
layer_names = available_layers
# Try different possible ADM_0 layer names
adm0_layer_names = ['ADM_0', 'ADM0', 'admin_0', 'level_0', 'countries']
adm0_layer = None
for layer_name in adm0_layer_names:
if layer_name in layer_names:
adm0_layer = layer_name
break
if not adm0_layer:
print(f" [WARN] No ADM_0 layer found in available layers: {layer_names}")
return None, None
# Read ADM_0 layer
print(f" [INFO] Reading {adm0_layer} layer...")
adm0_gdf = gpd.read_file(gpkg_file_path, layer=adm0_layer)
print(f" [INFO] Loaded {len(adm0_gdf)} country records")
# Show available columns for debugging
print(f" [INFO] Available columns: {list(adm0_gdf.columns)}")
# Initialize mappings
iso_to_gaul = {}
iso_to_names = {}
# Define possible column name variations
iso_columns = ['ISO3166_a3', 'ISO_a3', 'ISO3', 'ISO', 'ADM0_A3']
gaul_columns = ['ADM0_CODE', 'GAUL_CODE', 'ADM0_CD', 'GAUL']
name_columns = ['ADM0_NAME', 'NAME_EN', 'COUNTRY', 'NAME', 'ADM0_NAME_EN']
# Find the actual column names
iso_col = None
gaul_col = None
name_col = None
for col in iso_columns:
if col in adm0_gdf.columns:
iso_col = col
break
for col in gaul_columns:
if col in adm0_gdf.columns:
gaul_col = col
break
for col in name_columns:
if col in adm0_gdf.columns:
name_col = col
break
print(f" [INFO] Using columns - ISO: {iso_col}, GAUL: {gaul_col}, NAME: {name_col}")
if not iso_col:
print(f" [ERROR] No ISO column found in {iso_columns}")
return None, None
if not gaul_col:
print(f" [ERROR] No GAUL code column found in {gaul_columns}")
return None, None
if not name_col:
print(f" [ERROR] No name column found in {name_columns}")
return None, None
# Build mappings from the data
mapping_count = 0
for idx, row in adm0_gdf.iterrows():
iso_code = row.get(iso_col)
gaul_code = row.get(gaul_col)
country_name = row.get(name_col)
# Skip rows with missing essential data
if pd.isna(iso_code) or pd.isna(gaul_code) or pd.isna(country_name):
continue
# Clean and standardize the values
iso_code = str(iso_code).strip().upper()
gaul_code = int(gaul_code) if pd.notna(gaul_code) else None
country_name = str(country_name).strip()
# Only process valid ISO codes (3 characters)
if len(iso_code) == 3 and gaul_code is not None:
iso_to_gaul[iso_code] = gaul_code
# Store country name as a list for consistency with original structure
iso_to_names[iso_code] = [country_name]
mapping_count += 1
print(f" [OK] Built mappings for {mapping_count} countries")
# Cache the results
_country_mappings_cache[gpkg_file_path] = (iso_to_gaul, iso_to_names)
# Show sample mappings for verification
sample_isos = list(iso_to_gaul.keys())[:5]
print(" [INFO] Sample mappings:")
for iso in sample_isos:
print(f" {iso}: GAUL={iso_to_gaul[iso]}, NAME={iso_to_names[iso][0]}")
return iso_to_gaul, iso_to_names
except Exception as e:
print(f" [ERROR] Error building country mappings: {e}")
return None, None
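The module-level `_country_mappings_cache` used above is a simple memoization by file path, so the ADM_0 layer is read from disk only once per session. The pattern can be sketched in isolation (the path and the ISO-to-GAUL mapping below are hypothetical):

```python
# Minimal sketch of the module-level cache pattern used by
# build_country_mappings_from_boundaries (path and mapping are hypothetical)
_cache = {}
reads = []  # records how many "expensive" reads actually happened

def cached_mappings(path):
    if path in _cache:  # cache hit: skip the costly file read
        return _cache[path]
    reads.append(path)  # stands in for reading the GPKG ADM_0 layer
    result = {'AFG': 1, 'ALB': 3}  # hypothetical ISO -> GAUL mapping
    _cache[path] = result
    return result
```

Repeated calls with the same path return the identical cached object without touching the file again.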
def get_country_boundaries_robust(boundaries_gdf, country_iso, country_name, gpkg_file_path=None):
"""Get country boundaries using multiple fallback methods - DYNAMIC VERSION"""
try:
print(f" [INFO] Searching for {country_iso} boundaries using dynamic mappings...")
# Method 1: Try ISO_a3 field (most common)
if 'ISO_a3' in boundaries_gdf.columns:
print(f" [INFO] Trying ISO_a3 field...")
country_boundaries = boundaries_gdf[boundaries_gdf['ISO_a3'] == country_iso].copy()
if not country_boundaries.empty:
print(f" [OK] Found {len(country_boundaries)} boundaries using ISO_a3")
return country_boundaries
# Method 2: Try other ISO fields
for iso_field in ['ISO3166_a3', 'ADM0_A3', 'ISO', 'ISO3', 'COUNTRY_ISO']:
if iso_field in boundaries_gdf.columns:
print(f" [INFO] Trying {iso_field} field...")
country_boundaries = boundaries_gdf[boundaries_gdf[iso_field].astype(str).str.upper() == country_iso.upper()].copy()
if not country_boundaries.empty:
print(f" [OK] Found {len(country_boundaries)} boundaries using {iso_field}")
return country_boundaries
# Method 3: Try ADM0_NAME with DYNAMIC country mapping
if 'ADM0_NAME' in boundaries_gdf.columns and gpkg_file_path:
print(f" [INFO] Trying ADM0_NAME field with dynamic country name mapping...")
# Get dynamic mappings from the boundary file
iso_to_gaul, iso_to_names = build_country_mappings_from_boundaries(gpkg_file_path)
if iso_to_names and country_iso in iso_to_names:
for country_name_variant in iso_to_names[country_iso]:
country_boundaries = boundaries_gdf[boundaries_gdf['ADM0_NAME'].str.contains(country_name_variant, case=False, na=False)].copy()
if not country_boundaries.empty:
print(f" [OK] Found {len(country_boundaries)} boundaries using ADM0_NAME: {country_name_variant}")
return country_boundaries
else:
print(f" [WARN] Dynamic name mapping not available or {country_iso} not found")
# Method 4: Try GAUL codes with DYNAMIC mapping
if 'ADM0_CODE' in boundaries_gdf.columns and gpkg_file_path:
print(f" [INFO] Trying GAUL codes with dynamic mapping...")
# Get dynamic mappings from the boundary file
iso_to_gaul, iso_to_names = build_country_mappings_from_boundaries(gpkg_file_path)
if iso_to_gaul and country_iso in iso_to_gaul:
gaul_code = iso_to_gaul[country_iso]
country_boundaries = boundaries_gdf[boundaries_gdf['ADM0_CODE'] == gaul_code].copy()
if not country_boundaries.empty:
print(f" [OK] Found {len(country_boundaries)} boundaries using dynamic GAUL code: {gaul_code}")
return country_boundaries
else:
print(f" [WARN] Dynamic GAUL mapping not available or {country_iso} not found")
print(f" [ERROR] No boundaries found for {country_iso} using any method")
# Debug: Show available country identifiers
print(" [INFO] Debug - Available fields in GPKG:")
for col in ['ADM0_NAME', 'ADM0_CODE', 'ADM0_A3', 'ISO_a3', 'ISO3166_a3', 'ISO', 'ISO3']:
if col in boundaries_gdf.columns:
sample_values = boundaries_gdf[col].dropna().unique()[:5]
print(f" {col}: {sample_values}")
return None
except Exception as e:
print(f" [ERROR] Error in dynamic boundary matching: {e}")
return None
def create_merged_boundary_data(admin_data, level, country_iso, country_name, gpkg_file_path):
"""Create merged boundary data - DYNAMIC VERSION with improved country matching"""
try:
# Determine GPKG layer and code column
if level == 'ADM1':
gpkg_layer = 'ADM_1'
code_col = 'ADM1_CODE'
name_col = 'ADM1_NAME'
elif level == 'ADM2':
gpkg_layer = 'ADM_2'
code_col = 'ADM2_CODE'
name_col = 'ADM2_NAME'
else:
print(f" [ERROR] Unsupported admin level: {level}")
return None
print(f" [INFO] Loading {gpkg_layer} boundaries for {country_iso}...")
# Check if the layer exists in the GPKG file
try:
available_layers = gpd.list_layers(gpkg_file_path)
if hasattr(available_layers, 'name'):
layer_names = available_layers['name'].tolist()
else:
layer_names = available_layers
if gpkg_layer not in layer_names:
print(f" [ERROR] Layer {gpkg_layer} not found in GPKG")
print(f" [INFO] Available layers: {layer_names}")
# Try alternative layer names
alt_names = {
'ADM_1': ['ADM1', 'admin_1', 'adm1', 'level_1'],
'ADM_2': ['ADM2', 'admin_2', 'adm2', 'level_2']
}
if gpkg_layer in alt_names:
for alt_name in alt_names[gpkg_layer]:
if alt_name in layer_names:
print(f" [INFO] Using alternative layer name: {alt_name}")
gpkg_layer = alt_name
break
else:
print(" [ERROR] No suitable alternative layer found")
return None
except Exception as e:
print(f" [WARN] Could not check layer availability: {e}")
boundaries_gdf = gpd.read_file(gpkg_file_path, layer=gpkg_layer)
print(f" [INFO] Loaded {len(boundaries_gdf)} boundaries from {gpkg_layer}")
# Get country boundaries using DYNAMIC method
country_boundaries = get_country_boundaries_robust(boundaries_gdf, country_iso, country_name, gpkg_file_path)
if country_boundaries is None or country_boundaries.empty:
print(f" [ERROR] No boundaries found for {country_iso} in {gpkg_layer}")
return None
print(f" [OK] Found {len(country_boundaries)} {level} boundaries")
# Aggregate admin data
admin_summary = admin_data.groupby(['Admin Code', 'Admin Unit']).agg({
'Deaths': 'sum',
'Affected': 'sum',
'Damage (000 USD)': 'sum',
'Year': 'count'
}).reset_index()
admin_summary.columns = ['Admin Code', 'Admin Unit', 'Total Deaths', 'Total Affected', 'Total Damage', 'Disaster Count']
# Remove null codes and convert to string
admin_summary = admin_summary[admin_summary['Admin Code'].notna()].copy()
admin_summary['Admin Code'] = admin_summary['Admin Code'].astype(str).str.strip()
country_boundaries[code_col] = country_boundaries[code_col].astype(str).str.strip()
# Merge with boundaries
merged_data = country_boundaries.merge(
admin_summary,
left_on=code_col,
right_on='Admin Code',
how='left'
)
# Fill NaN values
for col in ['Total Deaths', 'Total Affected', 'Total Damage', 'Disaster Count']:
merged_data[col] = merged_data[col].fillna(0)
# Add admin names from boundaries if missing
if name_col in merged_data.columns:
merged_data['Admin Unit'] = merged_data['Admin Unit'].fillna(merged_data[name_col])
matched_count = len(merged_data[merged_data['Disaster Count'] > 0])
print(f" β
Matched {matched_count}/{len(merged_data)} boundaries with disaster data")
return merged_data
except Exception as e:
print(f" [ERROR] Error creating merged boundary data: {e}")
return None
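The core of create_merged_boundary_data is a left join of the boundary table onto the aggregated disaster summary, keyed on the admin code, with unmatched units kept and zero-filled. A minimal sketch of that step (the codes and counts below are hypothetical, but the column names follow the function above):

```python
import pandas as pd

# Hypothetical boundary table (left side) and per-unit disaster summary (right side)
boundaries = pd.DataFrame({'ADM1_CODE': ['101', '102', '103']})
summary = pd.DataFrame({'Admin Code': ['101', '103'], 'Disaster Count': [4, 2]})

# Left join keeps every boundary polygon, matched or not
merged = boundaries.merge(summary, left_on='ADM1_CODE',
                          right_on='Admin Code', how='left')

# Unmatched units get 0 rather than NaN so they still render on the map
merged['Disaster Count'] = merged['Disaster Count'].fillna(0)
```

Keeping unmatched boundaries (how='left' plus fillna) is what lets the choropleth draw every admin unit, shading data-free units in the lowest class instead of leaving holes.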
# ============================================================================
# UPDATED CHOROPLETH FUNCTION WITH DYNAMIC MAPPINGS
# ============================================================================
def create_working_multi_layer_choropleth(admin_df, original_df, gpkg_file_path):
"""
DYNAMIC VERSION: Create a working multi-layer choropleth map with 6 layers
using dynamic country mappings from the boundary file
"""
if not GEOPANDAS_AVAILABLE:
print("[ERROR] GeoPandas required for choropleth maps")
return create_fallback_chart(admin_df)
if not os.path.exists(gpkg_file_path):
print(f"[ERROR] GPKG file not found: {gpkg_file_path}")
return create_fallback_chart(admin_df)
if admin_df.empty:
print("[ERROR] No admin data available")
return create_fallback_chart(admin_df)
country_iso = admin_df['ISO'].iloc[0]
country_name = admin_df['Country'].iloc[0]
print(f"[INFO] Creating DYNAMIC multi-layer choropleth for {country_name}...")
# Build dynamic country mappings from boundary file
print(f" [INFO] Building dynamic country mappings...")
iso_to_gaul, iso_to_names = build_country_mappings_from_boundaries(gpkg_file_path)
if iso_to_gaul and country_iso in iso_to_gaul:
print(f" [OK] Found dynamic mapping: {country_iso} -> GAUL {iso_to_gaul[country_iso]} ({iso_to_names[country_iso][0]})")
else:
print(f" [WARN] No dynamic mapping found for {country_iso}, will use fallback methods")
# Add comprehensive boundary file debugging info
try:
print(f" [INFO] Checking boundary file: {gpkg_file_path}")
available_layers = gpd.list_layers(gpkg_file_path)
if hasattr(available_layers, 'name'):
layer_names = available_layers['name'].tolist()
else:
layer_names = available_layers
print(f" [INFO] Available layers: {layer_names}")
# Quick column check for ADM_1
if 'ADM_1' in layer_names:
sample_adm1 = gpd.read_file(gpkg_file_path, layer='ADM_1', rows=1)
print(f" [INFO] Sample ADM_1 columns: {list(sample_adm1.columns)}")
except Exception as e:
print(f" [WARN] Could not analyze boundary file: {e}")
try:
# STEP 1: Prepare data by admin level
adm1_data = admin_df[admin_df['Admin Level'] == 'ADM1'].copy()
adm2_data = admin_df[admin_df['Admin Level'] == 'ADM2'].copy()
# Get merged boundary data for each level using DYNAMIC method
merged_datasets = {}
if not adm1_data.empty:
print(f" [INFO] Processing ADM1 data ({len(adm1_data)} records)...")
adm1_merged = create_merged_boundary_data(adm1_data, 'ADM1', country_iso, country_name, gpkg_file_path)
if adm1_merged is not None and not adm1_merged.empty:
merged_datasets['ADM1'] = adm1_merged
print(" [OK] ADM1 merged successfully")
else:
print(" [ERROR] ADM1 merge failed")
if not adm2_data.empty:
print(f" [INFO] Processing ADM2 data ({len(adm2_data)} records)...")
adm2_merged = create_merged_boundary_data(adm2_data, 'ADM2', country_iso, country_name, gpkg_file_path)
if adm2_merged is not None and not adm2_merged.empty:
merged_datasets['ADM2'] = adm2_merged
print(" [OK] ADM2 merged successfully")
else:
print(" [ERROR] ADM2 merge failed")
if not merged_datasets:
print("[ERROR] No merged boundary data available - check GPKG file and country codes")
print(f"[INFO] Country: {country_name} ({country_iso})")
print(f"[INFO] GPKG file: {gpkg_file_path}")
return create_fallback_chart(admin_df)
print(f" [OK] Successfully merged {len(merged_datasets)} admin levels using DYNAMIC mappings")
# Continue with the rest of the choropleth creation (same as before)...
# [Rest of the function remains unchanged - just using the dynamic merged data]
# STEP 2: Create base map with proper setup
# Calculate center from first available dataset
first_dataset = list(merged_datasets.values())[0]
if first_dataset is None:
print("[ERROR] First dataset is None")
return create_fallback_chart(admin_df)
bounds = first_dataset.total_bounds
center_lat = (bounds[1] + bounds[3]) / 2
center_lon = (bounds[0] + bounds[2]) / 2
# Calculate zoom
lat_range = bounds[3] - bounds[1]
lon_range = bounds[2] - bounds[0]
max_range = max(lat_range, lon_range)
zoom = 4 if max_range > 15 else 5 if max_range > 8 else 6 if max_range > 4 else 7
# Initialize map without default tiles
m = folium.Map(
location=[center_lat, center_lon],
zoom_start=zoom,
tiles=None,
width='100%',
height='600px'
)
# Add base tile layers
folium.TileLayer('OpenStreetMap', name='Street Map', control=True).add_to(m)
folium.TileLayer('CartoDB positron', name='Light Map', control=True).add_to(m)
folium.TileLayer('CartoDB dark_matter', name='Dark Map', control=True).add_to(m)
print(f" [OK] Base map created at ({center_lat:.3f}, {center_lon:.3f}), zoom: {zoom}")
# STEP 3: Define layer configurations
layer_configs = []
# Add ADM1 layers if data exists
if 'ADM1' in merged_datasets:
layer_configs.extend([
{
'name': 'ADM1 - Deaths',
'data': merged_datasets['ADM1'],
'column': 'Total Deaths',
'color_scheme': 'Reds',
'show': True, # Show first layer by default
'opacity': 0.7
},
{
'name': 'ADM1 - Affected',
'data': merged_datasets['ADM1'],
'column': 'Total Affected',
'color_scheme': 'Oranges',
'show': False,
'opacity': 0.6
},
{
'name': 'ADM1 - Disasters',
'data': merged_datasets['ADM1'],
'column': 'Disaster Count',
'color_scheme': 'Blues',
'show': False,
'opacity': 0.8
}
])
# Add ADM2 layers if data exists
if 'ADM2' in merged_datasets:
layer_configs.extend([
{
'name': 'ADM2 - Deaths',
'data': merged_datasets['ADM2'],
'column': 'Total Deaths',
'color_scheme': 'Reds',
'show': False,
'opacity': 0.7
},
{
'name': 'ADM2 - Affected',
'data': merged_datasets['ADM2'],
'column': 'Total Affected',
'color_scheme': 'Oranges',
'show': False,
'opacity': 0.6
},
{
'name': 'ADM2 - Disasters',
'data': merged_datasets['ADM2'],
'column': 'Disaster Count',
'color_scheme': 'Blues',
'show': False,
'opacity': 0.8
}
])
# STEP 4: Create FeatureGroups and choropleth layers
layers_created = 0
legend_top_position = 80 # Start position
compact_spacing = 100 # Reduced spacing for compact legends
for config in layer_configs:
try:
# Check if we have data for this metric
values = config['data'][config['column']].replace(0, np.nan).dropna()
if len(values) == 0:
print(f" [WARN] No data for {config['name']} - skipping")
continue
print(f" [INFO] Creating {config['name']}: {len(values)} areas, range {values.min():.0f}-{values.max():.0f}")
# Create FeatureGroup for this layer
fg = folium.FeatureGroup(
name=config['name'],
overlay=True, # Checkbox behavior
show=config['show']
).add_to(m)
# Calculate quantile bins for this specific metric
if len(values) >= 5:
quantiles = values.quantile([0, 0.2, 0.4, 0.6, 0.8, 1.0]).tolist()
bins = [0] + [q for q in quantiles if q > 0]
else:
bins = [0] + sorted(values.unique().tolist())
bins = sorted(list(set(bins))) # Remove duplicates
# Prepare data for choropleth (reset index to avoid conflicts)
layer_data = config['data'].copy()
layer_data = layer_data.reset_index(drop=True)
layer_data['choropleth_id'] = layer_data.index
layer_data[config['column']] = layer_data[config['column']].fillna(0)
# Create color mapping
color_map = create_color_mapping(layer_data, config['column'], config['color_scheme'], bins)
# Create GeoJson layer with custom styling.
# Bind color_map and opacity as default arguments: lambdas created in a loop
# otherwise capture the loop variable `config` by reference, so every layer
# would end up using the last config's opacity.
geojson_layer = folium.GeoJson(
layer_data.to_json(),
name=config['name'],
style_function=lambda feature, color_map=color_map, opacity=config['opacity']: {
'fillColor': color_map.get(feature['properties']['choropleth_id'], '#808080'),
'color': 'white',
'weight': 1,
'fillOpacity': opacity,
'opacity': 0.2
},
highlight_function=lambda feature: {
'weight': 3,
'color': '#666',
'dashArray': '',
'fillOpacity': 0.9
}
)
# Add GeoJson layer to FeatureGroup
geojson_layer.add_to(fg)
# Add hover tooltip
folium.GeoJsonTooltip(
fields=['Admin Unit', config['column']],
aliases=['Admin Unit:', f"{config['column']}:"],
localize=True,
sticky=True,
labels=True,
style="""
background-color: white;
border: 2px solid black;
border-radius: 3px;
box-shadow: 3px;
font-size: 12px;
padding: 10px;
"""
).add_to(geojson_layer)
# Create custom legend for this layer
add_custom_legend(m, config, bins, legend_top_position, layers_created == 0)
legend_top_position += compact_spacing
layers_created += 1
print(f" [OK] Added {config['name']} with {len(bins)-1} color classes")
except Exception as layer_error:
print(f" [ERROR] Failed to create {config['name']}: {layer_error}")
continue
if layers_created == 0:
print("[ERROR] No layers could be created")
return create_fallback_chart(admin_df)
# STEP 5: Add LayerControl LAST (critical for proper functionality)
folium.LayerControl(
position='topright',
collapsed=False,
autoZIndex=True
).add_to(m)
# STEP 6: Add JavaScript for legend visibility control
add_legend_visibility_control(m)
print(f" [OK] SUCCESS: Created {layers_created} choropleth layers with DYNAMIC mappings!")
return m
except Exception as e:
print(f"[ERROR] Error creating multi-layer choropleth: {e}")
import traceback
traceback.print_exc()
return create_fallback_chart(admin_df)
# ============================================================================
# FIXED MULTI-LAYER CHOROPLETH IMPLEMENTATION
# ============================================================================
def create_color_mapping(layer_data, column, color_scheme, bins):
"""Create color mapping for GeoJson styling"""
# Define color schemes
color_schemes = {
'Reds': ['#fee5d9', '#fcbba1', '#fc9272', '#fb6a4a', '#de2d26'],
'Oranges': ['#feedde', '#fdd0a2', '#fdae6b', '#fd8d3c', '#d94701'],
'Blues': ['#eff3ff', '#c6dbef', '#9ecae1', '#6baed6', '#2171b5']
}
colors = color_schemes.get(color_scheme, color_schemes['Blues'])
# Create color mapping based on quantile bins
color_map = {}
for idx, row in layer_data.iterrows():
value = row[column]
choropleth_id = row['choropleth_id']
# Find which bin this value falls into
color_idx = 0
for i in range(len(bins)-1):
if bins[i] <= value <= bins[i+1]:
color_idx = min(i, len(colors)-1)
break
color_map[choropleth_id] = colors[color_idx]
return color_map
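The bin-to-color lookup in create_color_mapping can be illustrated in isolation. This sketch uses the function's 'Reds' palette with hypothetical bin edges:

```python
# Standalone sketch of quantile-bin -> color lookup (bins are hypothetical;
# the palette matches the 'Reds' scheme defined above)
colors = ['#fee5d9', '#fcbba1', '#fc9272', '#fb6a4a', '#de2d26']
bins = [0, 10, 100, 1000, 10000, 100000]

def color_for(value):
    # Find the first bin interval containing the value, clamped to the palette
    for i in range(len(bins) - 1):
        if bins[i] <= value <= bins[i + 1]:
            return colors[min(i, len(colors) - 1)]
    return colors[-1]  # values above the last edge get the darkest color
```

As in the function above, the intervals are inclusive on both ends, so a value sitting exactly on an edge falls into the lower (lighter) class.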
def add_custom_legend(m, config, bins, top_position, is_visible):
"""Add a compact custom legend positioned on the left side"""
try:
# Define color schemes
color_schemes = {
'Reds': ['#fee5d9', '#fcbba1', '#fc9272', '#fb6a4a', '#de2d26'],
'Oranges': ['#feedde', '#fdd0a2', '#fdae6b', '#fd8d3c', '#d94701'],
'Blues': ['#eff3ff', '#c6dbef', '#9ecae1', '#6baed6', '#2171b5']
}
colors = color_schemes.get(config['color_scheme'], color_schemes['Blues'])
# Create legend entries
legend_entries = []
for i in range(min(len(bins)-1, len(colors))):
min_val = bins[i]
max_val = bins[i+1]
# Format labels based on value ranges - MORE COMPACT
if max_val < 1000:
if min_val == 0:
label = f"0-{max_val:.0f}"
else:
label = f"{min_val:.0f}-{max_val:.0f}"
elif max_val < 1000000:
if min_val == 0:
label = f"0-{max_val/1000:.1f}k"
else:
label = f"{min_val/1000:.1f}k-{max_val/1000:.1f}k"
else:
if min_val == 0:
label = f"0-{max_val/1000000:.1f}M"
else:
label = f"{min_val/1000000:.1f}M-{max_val/1000000:.1f}M"
color_idx = min(i, len(colors)-1)
legend_entries.append((label, colors[color_idx]))
# Create unique legend ID
legend_id = f"legend-{config['name'].replace(' ', '-').replace('-', '').lower()}"
# Create COMPACT legend HTML with reduced spacing
visibility = 'block' if is_visible else 'none'
legend_html = f"""
<div id="{legend_id}"
class="custom-legend compact-legend"
style="position: fixed;
left: 15px;
top: {top_position}px;
width: 100px;
background: rgba(255, 255, 255, 0.95);
border: 1px solid #666;
border-radius: 4px;
padding: 6px;
font-family: Arial, sans-serif;
font-size: 9px;
z-index: 1000;
box-shadow: 0 2px 6px rgba(0,0,0,0.1);
display: {visibility};">
<div style="font-weight: bold;
margin-bottom: 4px;
color: #333;
font-size: 10px;
border-bottom: 1px solid #ddd;
padding-bottom: 2px;">
{config['name']}
</div>
"""
# Add COMPACT legend entries with minimal spacing
for label, color in legend_entries:
legend_html += f"""
<div style="display: flex;
align-items: center;
margin-bottom: 2px;
padding: 0;">
<div style="width: 14px;
height: 10px;
background-color: {color};
margin-right: 6px;
border: 1px solid #777;
border-radius: 1px;
flex-shrink: 0;"></div>
<span style="font-size: 8px;
color: #444;
line-height: 1.0;">{label}</span>
</div>
"""
legend_html += "</div>"
# Add legend to map
m.get_root().html.add_child(folium.Element(legend_html))
print(f" [INFO] Added compact legend for {config['name']} at position {top_position}px")
except Exception as e:
print(f" [WARN] Failed to create legend for {config['name']}: {e}")
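The compact label formatting in add_custom_legend (raw values below 1,000, "k" up to a million, "M" beyond) can be factored into a small helper; `compact` and `range_label` are hypothetical names introduced here for illustration:

```python
# Standalone sketch of the compact range labels used in the legends
# (helper names are hypothetical, not part of the dashboard code)
def compact(v):
    if v < 1_000:
        return f"{v:.0f}"          # e.g. 500  -> "500"
    if v < 1_000_000:
        return f"{v / 1_000:.1f}k"  # e.g. 2500 -> "2.5k"
    return f"{v / 1_000_000:.1f}M"  # e.g. 3.4e6 -> "3.4M"

def range_label(lo, hi):
    return f"{compact(lo)}-{compact(hi)}"
```

Short labels like "0-2.5k" keep the fixed-width (100px) legend boxes from overflowing.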
def add_legend_visibility_control(m):
"""Add enhanced JavaScript to control legend visibility and dynamic positioning"""
legend_control_js = """
<script>
// Enhanced legend control with dynamic positioning
function repositionVisibleLegends() {
// Get all custom legends
var allLegends = document.querySelectorAll('.custom-legend');
var visibleLegends = [];
// Find visible legends and their layer names
var layerLegendMap = {
'ADM1 - Deaths': 'legend-adm1deaths',
'ADM1 - Affected': 'legend-adm1affected',
'ADM1 - Disasters': 'legend-adm1disasters',
'ADM2 - Deaths': 'legend-adm2deaths',
'ADM2 - Affected': 'legend-adm2affected',
'ADM2 - Disasters': 'legend-adm2disasters'
};
// Check layer control checkboxes to determine which legends should be visible
var layerControl = document.querySelector('.leaflet-control-layers');
if (layerControl) {
var checkboxes = layerControl.querySelectorAll('input[type="checkbox"]');
checkboxes.forEach(function(checkbox) {
var label = checkbox.nextSibling;
if (label && label.textContent) {
var layerName = label.textContent.trim();
var legendId = layerLegendMap[layerName];
if (legendId) {
var legend = document.getElementById(legendId);
if (legend) {
if (checkbox.checked) {
legend.style.display = 'block';
visibleLegends.push(legend);
} else {
legend.style.display = 'none';
}
}
}
}
});
}
// Dynamically position visible legends starting from top
var startTop = 80; // Starting position
var spacing = 100; // Compact spacing between legends
visibleLegends.forEach(function(legend, index) {
var newTop = startTop + (index * spacing);
legend.style.top = newTop + 'px';
console.log('Repositioned legend to: ' + newTop + 'px');
});
console.log('Repositioned ' + visibleLegends.length + ' visible legends');
}
// Wait for map to fully load
setTimeout(function() {
var layerControl = document.querySelector('.leaflet-control-layers');
if (layerControl) {
var checkboxes = layerControl.querySelectorAll('input[type="checkbox"]');
// Add event listeners to checkboxes
checkboxes.forEach(function(checkbox) {
checkbox.addEventListener('change', function() {
// Small delay to ensure DOM is updated
setTimeout(repositionVisibleLegends, 50);
});
});
// Initial positioning
repositionVisibleLegends();
console.log('Enhanced legend visibility and positioning controls initialized');
}
}, 1000);
// Also reposition on window resize
window.addEventListener('resize', function() {
setTimeout(repositionVisibleLegends, 100);
});
</script>
"""
m.get_root().html.add_child(folium.Element(legend_control_js))
print(" [INFO] Added enhanced JavaScript with dynamic legend positioning")
# ============================================================================
# SUBNATIONAL ANALYSIS FUNCTIONS (UNCHANGED - WORKING)
# ============================================================================
def create_subnational_analysis(df):
"""Create enhanced subnational analysis for single country"""
# Check if Admin Units column exists and has data
if 'Admin Units' not in df.columns:
print("No Admin Units column found. Creating summary analysis...")
return create_country_summary_analysis(df)
# Parse Admin Units data
admin_data = []
for idx, row in df.iterrows():
if pd.notna(row.get('Admin Units')):
admin_units = parse_admin_units(row['Admin Units'])
if admin_units:
for admin_unit in admin_units:
# Extract admin unit information
if isinstance(admin_unit, dict):
# Handle properly formatted admin data
admin_name = (admin_unit.get('adm1_name') or
admin_unit.get('adm2_name') or
admin_unit.get('name', 'Unknown'))
admin_code = (admin_unit.get('adm1_code') or
admin_unit.get('adm2_code') or
admin_unit.get('code', 'Unknown'))
admin_level = 'ADM1' if 'adm1_name' in admin_unit else 'ADM2'
else:
admin_name = str(admin_unit)
admin_code = 'Unknown'
admin_level = 'Unknown'
admin_data.append({
'Admin Unit': admin_name,
'Admin Code': admin_code,
'Admin Level': admin_level,
'Hazard Type': row['Disaster Type'],
'Disaster Subtype': row.get('Disaster Subtype', 'N/A'),
'Year': row['Start Year'],
'Start Date': row.get('Start Date', 'N/A'),
'Deaths': row['Total Deaths'],
'Affected': row['Total Affected'],
'Homeless': row.get('No Homeless', 0),
'Damage (000 USD)': row.get('Total Damages (\'000 US$)', 0),
'Country': row['Country'],
'ISO': row['ISO']
})
if not admin_data:
print("No valid admin units data found. Creating summary analysis...")
return create_country_summary_analysis(df)
# Create DataFrame from parsed data
admin_df = pd.DataFrame(admin_data)
# Create comprehensive subnational analysis
return create_subnational_visualization(admin_df, df)
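The `parse_admin_units` helper used above is defined earlier in the notebook. For orientation, here is a minimal sketch of the parsing it needs to perform — EMDAT's `Admin Units` column stores a JSON-encoded list of dicts with `adm1_*`/`adm2_*` keys. The function name and example values below are illustrative, not the notebook's actual implementation:

```python
import json

def parse_admin_units_sketch(raw_value):
    """Parse an EMDAT 'Admin Units' cell into a list of dicts.

    Cells typically hold a JSON array such as
    '[{"adm1_code": 1490, "adm1_name": "Herat"}]'.
    Returns [] when the cell is empty or not valid JSON.
    """
    if not isinstance(raw_value, str) or not raw_value.strip():
        return []
    try:
        parsed = json.loads(raw_value)
    except json.JSONDecodeError:
        return []
    if isinstance(parsed, dict):
        return [parsed]  # normalise a single dict to a one-element list
    return parsed if isinstance(parsed, list) else []
```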
def create_country_summary_analysis(df):
"""Create summary analysis when no subnational data is available"""
# Create summary by year and disaster type
summary = df.groupby(['Start Year', 'Disaster Type']).agg({
'DisNo.': 'count',
'Total Deaths': 'sum',
'Total Affected': 'sum',
'Total Damages (\'000 US$)': 'sum'
}).reset_index()
summary = summary.sort_values(['Start Year', 'Total Deaths'], ascending=[False, False]).head(30)
# Create table
fig = go.Figure(data=[
go.Table(
columnwidth=[80, 120, 60, 80, 100, 100],
header=dict(
values=['Year', 'Hazard Type', 'Count', 'Deaths', 'Affected', 'Damage (000 USD)'],
fill_color='lightblue',
align='center',
font=dict(size=12, color='black'),
height=40
),
cells=dict(
values=[
summary['Start Year'],
summary['Disaster Type'],
summary['DisNo.'],
summary['Total Deaths'].apply(lambda x: f"{x:,.0f}"),
summary['Total Affected'].apply(lambda x: f"{x:,.0f}"),
summary['Total Damages (\'000 US$)'].apply(lambda x: f"{x:,.0f}")
],
fill_color='white',
align='center',
font=dict(size=11),
height=30
)
)
])
country_name = df['Country'].iloc[0] if not df.empty else 'Selected Country'
fig.update_layout(
title=f'Disaster Summary for {country_name} (No Subnational Data Available)',
height=600,
margin=dict(t=80, b=20, l=20, r=20)
)
return fig
def create_subnational_visualization(admin_df, original_df):
"""Create comprehensive subnational visualization with table and charts"""
# Get country information
country_name = admin_df['Country'].iloc[0] if not admin_df.empty else 'Selected Country'
country_iso = admin_df['ISO'].iloc[0] if not admin_df.empty else 'XXX'
# Create subplots
fig = make_subplots(
rows=3, cols=2,
subplot_titles=[
'Subnational Disaster Data Table',
'', # Empty
'Disasters by Administrative Unit',
'Deaths by Administrative Unit',
'Affected Population by Administrative Unit',
'Hazard Type Distribution'
],
specs=[
[{"type": "table", "colspan": 2}, None],
[{"type": "bar"}, {"type": "bar"}],
[{"type": "bar"}, {"type": "pie"}]
],
vertical_spacing=0.08,
horizontal_spacing=0.20
)
# 1. Create detailed table
table_data = admin_df.sort_values(['Year', 'Deaths', 'Affected'], ascending=[False, False, False]).head(50)
fig.add_trace(
go.Table(
columnwidth=[100, 80, 100, 60, 60, 80, 80, 80, 100],
header=dict(
values=[
'Admin Unit', 'Admin Level', 'Hazard Type', 'Year',
'Deaths', 'Affected', 'Homeless', 'Damage (000 USD)', 'Start Date'
],
fill_color='lightblue',
align='center',
font=dict(size=11, color='black'),
height=35
),
cells=dict(
values=[
table_data['Admin Unit'],
table_data['Admin Level'],
table_data['Hazard Type'],
table_data['Year'],
table_data['Deaths'].apply(lambda x: f"{x:,.0f}"),
table_data['Affected'].apply(lambda x: f"{x:,.0f}"),
table_data['Homeless'].apply(lambda x: f"{x:,.0f}"),
table_data['Damage (000 USD)'].apply(lambda x: f"{x:,.0f}"),
table_data['Start Date']
],
fill_color='white',
align='center',
font=dict(size=10),
height=25
)
),
row=1, col=1
)
# 2. Aggregate data for charts
admin_summary = admin_df.groupby('Admin Unit').agg({
'Deaths': 'sum',
'Affected': 'sum',
'Damage (000 USD)': 'sum',
'Year': 'count' # Count of disasters
}).reset_index()
admin_summary.columns = ['Admin Unit', 'Deaths', 'Affected', 'Damage', 'Disaster Count']
admin_summary = admin_summary.sort_values('Disaster Count', ascending=True).tail(15) # Top 15
# 3. Disasters by Admin Unit
fig.add_trace(
go.Bar(
y=admin_summary['Admin Unit'],
x=admin_summary['Disaster Count'],
orientation='h',
name='Disaster Count',
marker_color='lightblue',
text=admin_summary['Disaster Count'],
textposition='outside'
),
row=2, col=1
)
# 4. Deaths by Admin Unit
deaths_data = admin_summary[admin_summary['Deaths'] > 0].sort_values('Deaths', ascending=True).tail(10)
if not deaths_data.empty:
fig.add_trace(
go.Bar(
y=deaths_data['Admin Unit'],
x=deaths_data['Deaths'],
orientation='h',
name='Deaths',
marker_color='red',
text=deaths_data['Deaths'].apply(lambda x: f"{x:,.0f}"),
textposition='outside'
),
row=2, col=2
)
# 5. Affected by Admin Unit
affected_data = admin_summary[admin_summary['Affected'] > 0].sort_values('Affected', ascending=True).tail(10)
if not affected_data.empty:
fig.add_trace(
go.Bar(
y=affected_data['Admin Unit'],
x=affected_data['Affected'],
orientation='h',
name='Affected',
marker_color='orange',
text=affected_data['Affected'].apply(lambda x: f"{x:,.0f}"),
textposition='outside'
),
row=3, col=1
)
# 6. Hazard types pie chart
hazard_summary = admin_df.groupby('Hazard Type').size().reset_index(name='Count')
fig.add_trace(
go.Pie(
labels=hazard_summary['Hazard Type'],
values=hazard_summary['Count'],
name='Hazard Types',
textinfo='label+percent'
),
row=3, col=2
)
# Update layout
fig.update_layout(
title=f'Subnational Disaster Analysis - {country_name} ({country_iso})',
height=1200,
showlegend=False,
margin=dict(t=120, b=60, l=100, r=100)
)
# Update axes labels
fig.update_xaxes(title_text="Number of Disasters", row=2, col=1)
fig.update_xaxes(title_text="Total Deaths", row=2, col=2)
fig.update_xaxes(title_text="Total Affected", row=3, col=1)
return fig
def parse_admin_units_for_choropleth(filtered_df):
"""Parse admin units data for choropleth mapping - optimized version"""
admin_data = []
try:
for idx, row in filtered_df.iterrows():
if pd.notna(row.get('Admin Units')):
admin_units = parse_admin_units(row['Admin Units'])
if admin_units:
for admin_unit in admin_units:
if isinstance(admin_unit, dict):
# Handle ADM1 data
if admin_unit.get('adm1_name') and admin_unit.get('adm1_code'):
admin_data.append({
'Admin Unit': admin_unit['adm1_name'],
'Admin Code': admin_unit.get('adm1_code'),
'Admin Level': 'ADM1',
'Deaths': row.get('Total Deaths', 0),
'Affected': row.get('Total Affected', 0),
'Damage (000 USD)': row.get('Total Damages (\'000 US$)', 0),
'Year': row.get('Start Year'),
'Country': row.get('Country'),
'ISO': row.get('ISO')
})
# Handle ADM2 data
if admin_unit.get('adm2_name') and admin_unit.get('adm2_code'):
admin_data.append({
'Admin Unit': admin_unit['adm2_name'],
'Admin Code': admin_unit.get('adm2_code'),
'Admin Level': 'ADM2',
'Deaths': row.get('Total Deaths', 0),
'Affected': row.get('Total Affected', 0),
'Damage (000 USD)': row.get('Total Damages (\'000 US$)', 0),
'Year': row.get('Start Year'),
'Country': row.get('Country'),
'ISO': row.get('ISO')
})
if admin_data:
print(f" π Parsed {len(admin_data)} admin unit records")
# Create summary by admin level
df_temp = pd.DataFrame(admin_data)
level_summary = df_temp.groupby('Admin Level').agg({
'Admin Unit': 'nunique',
'Deaths': 'sum',
'Affected': 'sum'
})
for level in level_summary.index:
print(f" {level}: {level_summary.loc[level, 'Admin Unit']} units, "
f"{level_summary.loc[level, 'Deaths']:.0f} deaths, "
f"{level_summary.loc[level, 'Affected']:.0f} affected")
return admin_data
except Exception as e:
print(f"β Error parsing admin units: {e}")
return []
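The choropleth layers advertised in the header use quantile-based classification with 5 classes per metric. The actual binning happens inside `create_working_multi_layer_choropleth` (defined elsewhere in the notebook), but the scheme can be illustrated in isolation with `pd.qcut`, using hypothetical per-unit values:

```python
import pandas as pd

# Hypothetical per-admin-unit death totals (illustrative values)
values = pd.Series([0, 2, 5, 9, 14, 20, 33, 51, 80, 120], name='Deaths')

# Five quantile classes: each class holds roughly the same number of units.
# duplicates='drop' guards against repeated bin edges when many units
# share the same value (common with zero-death records).
classes = pd.qcut(values, q=5, labels=False, duplicates='drop')

# The bin edges can feed a map legend: class k spans edges[k]..edges[k+1]
_, edges = pd.qcut(values, q=5, retbins=True, duplicates='drop')
print(classes.tolist())
print(edges)
```

Equal-count bins like these keep each color class populated even when the impact distribution is heavily skewed, which is why quantiles usually read better on disaster maps than equal-interval breaks.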
def create_fallback_chart(admin_df):
"""Create fallback bar chart when choropleth fails"""
if admin_df.empty:
return None
try:
country_name = admin_df['Country'].iloc[0] if 'Country' in admin_df.columns else 'Unknown'
# Aggregate data by admin unit
admin_summary = admin_df.groupby('Admin Unit').agg({
'Deaths': 'sum',
'Affected': 'sum',
'Year': 'count'
}).reset_index()
admin_summary.columns = ['Admin Unit', 'Total Deaths', 'Total Affected', 'Disaster Count']
admin_summary = admin_summary.sort_values('Disaster Count', ascending=True).tail(15)
# Create horizontal bar chart
fig = px.bar(
admin_summary,
y='Admin Unit',
x='Disaster Count',
color='Total Deaths',
color_continuous_scale='Reds',
orientation='h',
title=f'{country_name} - Administrative Units Disaster Impact (Choropleth Failed)',
labels={'Disaster Count': 'Number of Disasters', 'Total Deaths': 'Total Deaths'},
text='Disaster Count'
)
fig.update_traces(textposition='outside')
fig.update_layout(
height=500,
margin=dict(l=150, r=50, t=80, b=50),
title_x=0.5
)
return fig
except Exception as e:
print(f"β Error creating fallback chart: {e}")
return None
# ============================================================================
# ADDITIONAL VISUALIZATION FUNCTIONS (UNCHANGED - WORKING)
# ============================================================================
def create_overview_stats(df):
"""Create overview statistics HTML"""
total_disasters = len(df)
total_deaths = df['Total Deaths'].sum()
total_affected = df['Total Affected'].sum()
total_damage = df['Total Damages (\'000 US$)'].sum()
countries_affected = df['ISO'].nunique()
disaster_types = df['Disaster Type'].nunique()
html = f"""
<div style='background-color: #f0f0f0; padding: 20px; border-radius: 10px;'>
<h3>Overview Statistics</h3>
<div style='display: grid; grid-template-columns: repeat(3, 1fr); gap: 15px;'>
<div style='background: white; padding: 15px; border-radius: 5px;'>
<h4 style='color: #e74c3c;'>Total Disasters</h4>
<p style='font-size: 24px; font-weight: bold;'>{total_disasters:,}</p>
</div>
<div style='background: white; padding: 15px; border-radius: 5px;'>
<h4 style='color: #e67e22;'>Total Deaths</h4>
<p style='font-size: 24px; font-weight: bold;'>{total_deaths:,.0f}</p>
</div>
<div style='background: white; padding: 15px; border-radius: 5px;'>
<h4 style='color: #f39c12;'>Total Affected</h4>
<p style='font-size: 24px; font-weight: bold;'>{total_affected:,.0f}</p>
</div>
<div style='background: white; padding: 15px; border-radius: 5px;'>
<h4 style='color: #27ae60;'>Economic Damage</h4>
<p style='font-size: 24px; font-weight: bold;'>${total_damage:,.0f}k</p>
</div>
<div style='background: white; padding: 15px; border-radius: 5px;'>
<h4 style='color: #3498db;'>Countries Affected</h4>
<p style='font-size: 24px; font-weight: bold;'>{countries_affected}</p>
</div>
<div style='background: white; padding: 15px; border-radius: 5px;'>
<h4 style='color: #9b59b6;'>Disaster Types</h4>
<p style='font-size: 24px; font-weight: bold;'>{disaster_types}</p>
</div>
</div>
</div>
"""
return html
def create_disaster_type_chart(df):
"""Create disaster type analysis charts"""
# Aggregate data by disaster type
disaster_stats = df.groupby('Disaster Type').agg({
'DisNo.': 'count',
'Total Deaths': 'sum',
'Total Affected': 'sum',
'Total Damages (\'000 US$)': 'sum'
}).reset_index()
disaster_stats.columns = ['Disaster Type', 'Count', 'Deaths', 'Affected', 'Damage']
disaster_stats = disaster_stats.sort_values('Count', ascending=False).head(15)
# Create subplots
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('Number of Disasters', 'Total Deaths',
'Total Affected', 'Economic Damage (000 USD)'),
specs=[[{'type': 'bar'}, {'type': 'bar'}],
[{'type': 'bar'}, {'type': 'bar'}]]
)
# Add traces
fig.add_trace(
go.Bar(x=disaster_stats['Disaster Type'], y=disaster_stats['Count'],
name='Count', marker_color='lightblue'),
row=1, col=1
)
fig.add_trace(
go.Bar(x=disaster_stats['Disaster Type'], y=disaster_stats['Deaths'],
name='Deaths', marker_color='red'),
row=1, col=2
)
fig.add_trace(
go.Bar(x=disaster_stats['Disaster Type'], y=disaster_stats['Affected'],
name='Affected', marker_color='orange'),
row=2, col=1
)
fig.add_trace(
go.Bar(x=disaster_stats['Disaster Type'], y=disaster_stats['Damage'],
name='Damage', marker_color='green'),
row=2, col=2
)
# Update layout
fig.update_layout(height=800, showlegend=False, title_text="Disaster Impact by Type")
fig.update_xaxes(tickangle=-45)
return fig
def create_temporal_analysis(df):
"""Create temporal trend analysis"""
# Aggregate by year
yearly_stats = df.groupby('Start Year').agg({
'DisNo.': 'count',
'Total Deaths': 'sum',
'Total Affected': 'sum'
}).reset_index()
# Create figure with secondary y-axis
fig = make_subplots(
rows=2, cols=1,
subplot_titles=('Disaster Frequency Over Time', 'Deaths and Affected Population Over Time'),
specs=[[{"secondary_y": False}],
[{"secondary_y": True}]]
)
# Disaster frequency
fig.add_trace(
go.Scatter(x=yearly_stats['Start Year'], y=yearly_stats['DisNo.'],
mode='lines+markers', name='Number of Disasters',
line=dict(color='blue', width=2)),
row=1, col=1
)
# Deaths and affected
fig.add_trace(
go.Scatter(x=yearly_stats['Start Year'], y=yearly_stats['Total Deaths'],
mode='lines', name='Deaths', line=dict(color='red')),
row=2, col=1, secondary_y=False
)
fig.add_trace(
go.Scatter(x=yearly_stats['Start Year'], y=yearly_stats['Total Affected'],
mode='lines', name='Affected', line=dict(color='orange')),
row=2, col=1, secondary_y=True
)
# Update layout
fig.update_xaxes(title_text="Year", row=2, col=1)
fig.update_yaxes(title_text="Count", row=1, col=1)
fig.update_yaxes(title_text="Deaths", secondary_y=False, row=2, col=1)
fig.update_yaxes(title_text="Affected", secondary_y=True, row=2, col=1)
fig.update_layout(height=700, title_text="Temporal Analysis of Disasters")
return fig
def create_loss_analysis(df):
"""Create loss analysis by hazard category"""
# Aggregate by disaster category
category_stats = df.groupby('Disaster Category').agg({
'Total Deaths': 'sum',
'Total Affected': 'sum',
'Total Damages (\'000 US$)': 'sum',
'DisNo.': 'count'
}).reset_index()
# Create pie charts
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('Distribution of Disasters', 'Distribution of Deaths',
'Distribution of Affected', 'Distribution of Economic Damage'),
specs=[[{'type': 'pie'}, {'type': 'pie'}],
[{'type': 'pie'}, {'type': 'pie'}]]
)
# Add pie charts
fig.add_trace(
go.Pie(labels=category_stats['Disaster Category'],
values=category_stats['DisNo.'], name='Count'),
row=1, col=1
)
fig.add_trace(
go.Pie(labels=category_stats['Disaster Category'],
values=category_stats['Total Deaths'], name='Deaths'),
row=1, col=2
)
fig.add_trace(
go.Pie(labels=category_stats['Disaster Category'],
values=category_stats['Total Affected'], name='Affected'),
row=2, col=1
)
fig.add_trace(
go.Pie(labels=category_stats['Disaster Category'],
values=category_stats['Total Damages (\'000 US$)'], name='Damage'),
row=2, col=2
)
fig.update_layout(height=700, title_text="Loss Distribution by Disaster Category")
return fig
def create_geographic_distribution(df):
"""Create geographic distribution map"""
# Aggregate by country
country_stats = df.groupby(['ISO', 'Country']).agg({
'DisNo.': 'count',
'Total Deaths': 'sum',
'Total Affected': 'sum',
'Total Damages (\'000 US$)': 'sum'
}).reset_index()
# Create choropleth map
fig = px.choropleth(
country_stats,
locations='ISO',
locationmode='ISO-3',
color='DisNo.',
hover_name='Country',
hover_data={
'DisNo.': ':,',
'Total Deaths': ':,.0f',
'Total Affected': ':,.0f',
'Total Damages (\'000 US$)': ':,.0f'
},
color_continuous_scale='YlOrRd',
labels={'DisNo.': 'Number of Disasters'},
title='Geographic Distribution of Disasters'
)
fig.update_layout(
geo=dict(
showframe=False,
showcoastlines=True,
projection_type='natural earth'
),
height=600
)
return fig
# ============================================================================
# UPDATED VISUALIZATION FUNCTIONS
# ============================================================================
def create_visualizations(filtered_df, dashboard, gpkg_file_path=None):
"""Create all visualizations for the dashboard with FIXED subnational analysis"""
if filtered_df.empty:
print("No data available for selected filters")
return
# Create tabs for different visualizations
tab_contents = []
tab_titles = []
# Tab 1: Overview Statistics
overview_html = create_overview_stats(filtered_df)
tab_contents.append(widgets.HTML(overview_html))
tab_titles.append('Overview')
# Tab 2: Disaster Type Analysis
disaster_fig = create_disaster_type_chart(filtered_df)
tab_contents.append(widgets.Output())
with tab_contents[-1]:
disaster_fig.show()
tab_titles.append('Disaster Types')
# Tab 3: Temporal Analysis
temporal_fig = create_temporal_analysis(filtered_df)
tab_contents.append(widgets.Output())
with tab_contents[-1]:
temporal_fig.show()
tab_titles.append('Temporal Trends')
# Tab 4: Loss Analysis
loss_fig = create_loss_analysis(filtered_df)
tab_contents.append(widgets.Output())
with tab_contents[-1]:
loss_fig.show()
tab_titles.append('Loss Analysis')
# Tab 5: Geographic Distribution (if multiple countries)
if filtered_df['ISO'].nunique() > 1:
geo_fig = create_geographic_distribution(filtered_df)
tab_contents.append(widgets.Output())
with tab_contents[-1]:
geo_fig.show()
tab_titles.append('Geographic Distribution')
# Tab 6: FIXED Subnational Analysis (if single country selected)
if filtered_df['ISO'].nunique() == 1:
# Create the main subnational analysis
subnational_fig = create_subnational_analysis(filtered_df)
if subnational_fig:
tab_contents.append(widgets.Output())
with tab_contents[-1]:
subnational_fig.show()
# Add FIXED multi-layer choropleth if admin data exists and GPKG provided
if 'Admin Units' in filtered_df.columns and gpkg_file_path and os.path.exists(gpkg_file_path):
admin_data_count = filtered_df['Admin Units'].notna().sum()
print(f"\nπ Records with Admin Units: {admin_data_count}")
if admin_data_count > 0:
print(f"\nπΊοΈ CREATING FIXED MULTI-LAYER CHOROPLETH MAP")
print("="*60)
# Parse admin data for choropleth
admin_data = parse_admin_units_for_choropleth(filtered_df)
if admin_data:
admin_df = pd.DataFrame(admin_data)
print(f"β
Parsed {len(admin_df)} admin records")
print(f"ποΈ Admin levels: {admin_df['Admin Level'].value_counts().to_dict()}")
print(f"π Unique admin units: {admin_df['Admin Unit'].nunique()}")
# Create the FIXED multi-layer choropleth
choropleth_map = create_working_multi_layer_choropleth(
admin_df, filtered_df, gpkg_file_path
)
if choropleth_map:
print(f"π SUCCESS: Fixed multi-layer choropleth created!")
print(f"π Features:")
print(f" β
6 independent layers (ADM1/ADM2 Γ Deaths/Affected/Count)")
print(f" β
Proper color scaling (Reds/Oranges/Blues)")
print(f" β
Dynamic legends (250px width, left-side)")
print(f" β
Layer visibility controls")
print(f" β
Hover tooltips and info")
display(choropleth_map)
else:
print("β Failed to create choropleth map - check data and boundaries")
else:
print("β No valid admin data found after parsing")
else:
print("β οΈ No admin units data available for choropleth mapping")
else:
if not gpkg_file_path:
print("β οΈ No GPKG file path provided - choropleth mapping disabled")
elif not os.path.exists(gpkg_file_path):
print(f"β οΈ GPKG file not found: {gpkg_file_path}")
else:
print("β οΈ Admin Units column not found in data")
tab_titles.append('Subnational Analysis')
# Create and display tabs
tabs = widgets.Tab(children=tab_contents)
for i, title in enumerate(tab_titles):
tabs.set_title(i, title)
display(tabs)
def create_country_selector(dashboard, gpkg_file_path=None):
"""Create interactive country selector widget with FIXED choropleth"""
# Get unique countries with their ISO codes
countries_df = dashboard.df[['Country', 'ISO']].drop_duplicates().sort_values('Country')
country_list = [f"{row['Country']} ({row['ISO']})" for _, row in countries_df.iterrows()]
# Create dropdown widget
country_dropdown = widgets.Dropdown(
options=['All Countries'] + country_list,
value='All Countries',
description='Select Country:',
style={'description_width': 'initial'},
layout=widgets.Layout(width='400px')
)
# Create date range slider
year_range = widgets.IntRangeSlider(
value=[2000, 2024],
min=int(dashboard.df['Start Year'].min()),
max=int(dashboard.df['Start Year'].max()),
step=1,
description='Year Range:',
style={'description_width': 'initial'},
layout=widgets.Layout(width='600px')
)
# Output area for visualizations
output = widgets.Output()
def update_dashboard(change):
"""Update dashboard based on selection"""
with output:
clear_output(wait=True)
# Filter data based on selection
filtered_df = dashboard.df.copy()
# Filter by year range
filtered_df = filtered_df[
(filtered_df['Start Year'] >= year_range.value[0]) &
(filtered_df['Start Year'] <= year_range.value[1])
]
# Filter by country if not "All Countries"
if country_dropdown.value != 'All Countries':
country_iso = country_dropdown.value.split('(')[-1].strip(')')
filtered_df = filtered_df[filtered_df['ISO'] == country_iso]
display(HTML(f"<h2>Disaster Analysis for {country_dropdown.value}</h2>"))
# Show choropleth availability info for single country
if gpkg_file_path and 'Admin Units' in filtered_df.columns:
admin_count = filtered_df['Admin Units'].notna().sum()
if admin_count > 0:
coverage = (admin_count / len(filtered_df)) * 100
display(HTML(f"<p style='color: green;'><strong>β
Enhanced choropleth mapping available!</strong><br>"
f"π {admin_count}/{len(filtered_df)} records have subnational data ({coverage:.1f}% coverage)<br>"
f"πΊοΈ Look for the 'Subnational Analysis' tab with interactive maps</p>"))
else:
display(HTML(f"<p style='color: orange;'>β οΈ No subnational data available for choropleth mapping</p>"))
else:
display(HTML("<h2>Global Disaster Analysis</h2>"))
# Generate visualizations with FIXED choropleth
create_visualizations(filtered_df, dashboard, gpkg_file_path)
# Link widgets to update function
country_dropdown.observe(update_dashboard, names='value')
year_range.observe(update_dashboard, names='value')
# Display instructions
instructions_html = """
<div style='background-color: #f0f8ff; padding: 15px; border-radius: 8px; margin-bottom: 20px; border-left: 4px solid #007acc;'>
<h3 style='margin-top: 0; color: #007acc;'>🗺️ Instructions</h3>
<p><strong>For enhanced mapping:</strong></p>
<ul>
<li>Select a single country with subnational data</li>
<li>Navigate to the 'Subnational Analysis' tab</li>
<li>Use layer checkboxes to toggle between metrics independently</li>
<li>Legends automatically show/hide based on layer visibility</li>
<li>Hover over administrative units for detailed information</li>
</ul>
<p><strong>Available layers:</strong> ADM1/ADM2 levels × Deaths (Red) / Affected (Orange) / Disaster Count (Blue)</p>
</div>
"""
# Display widgets
display(widgets.VBox([
widgets.HTML("<h1>EMDAT Disaster Data Dashboard</h1>"),
widgets.HTML(instructions_html),
country_dropdown,
year_range,
output
]))
# Initial display
update_dashboard(None)
def quick_subnational_check(dashboard):
"""Quick check of subnational data availability across all countries"""
countries_with_admin = []
for country_iso in dashboard.df['ISO'].unique():
country_df = dashboard.df[dashboard.df['ISO'] == country_iso]
country_name = country_df['Country'].iloc[0]
if 'Admin Units' in country_df.columns:
admin_count = country_df['Admin Units'].notna().sum()
total_count = len(country_df)
if admin_count > 0:
coverage = (admin_count / total_count) * 100
countries_with_admin.append({
'Country': country_name,
'ISO': country_iso,
'Total_Disasters': total_count,
'With_Admin_Data': admin_count,
'Coverage_Percent': coverage
})
if countries_with_admin:
admin_summary_df = pd.DataFrame(countries_with_admin)
admin_summary_df = admin_summary_df.sort_values('Coverage_Percent', ascending=False)
print("Countries with Subnational Data:")
print("="*50)
for _, row in admin_summary_df.head(20).iterrows():
print(f"{row['Country']} ({row['ISO']}): {row['Coverage_Percent']:.1f}% coverage "
f"({row['With_Admin_Data']}/{row['Total_Disasters']} disasters)")
return admin_summary_df
else:
print("No countries found with subnational admin data.")
return None
def run_complete_dashboard():
"""Run the complete EMDAT dashboard with FIXED choropleth"""
print("=" * 60)
print("EMDAT DISASTER DATA DASHBOARD - FIXED CHOROPLETH")
print("=" * 60)
print("\nInitializing dashboard components...")
# Check if data is loaded
if dashboard.df is None:
print("Error: No data loaded. Please check your Excel file path.")
return
# Check GPKG file availability
if os.path.exists(GPKG_FILE_PATH):
print(f"β
GPKG boundaries available: {GPKG_FILE_PATH}")
print("πΊοΈ Enhanced choropleth mapping enabled!")
else:
print(f"β οΈ GPKG file not available - choropleth mapping disabled")
print(f"π Expected path: {GPKG_FILE_PATH}")
# Display data quality report
print("\nπ Data Quality Report:")
print(f" - Total records: {len(dashboard.df):,}")
print(f" - Missing values in key columns:")
for col in ['Total Deaths', 'Total Affected', 'Total Damages (\'000 US$)']:
if col in dashboard.df.columns:
missing = dashboard.df[col].isna().sum()
pct = (missing / len(dashboard.df)) * 100
print(f" β’ {col}: {missing:,} ({pct:.1f}%)")
# Check subnational data availability
if 'Admin Units' in dashboard.df.columns:
admin_count = dashboard.df['Admin Units'].notna().sum()
admin_pct = (admin_count / len(dashboard.df)) * 100
print(f"\nποΈ Subnational Data Availability:")
print(f" - Records with admin units: {admin_count:,} ({admin_pct:.1f}%)")
if admin_count > 0:
# Show top countries with subnational data
countries_with_admin = []
for country_iso in dashboard.df['ISO'].unique()[:10]: # Check top 10 countries
country_df = dashboard.df[dashboard.df['ISO'] == country_iso]
country_admin_count = country_df['Admin Units'].notna().sum()
if country_admin_count > 0:
coverage = (country_admin_count / len(country_df)) * 100
countries_with_admin.append({
'Country': country_df['Country'].iloc[0],
'ISO': country_iso,
'Coverage': coverage,
'Records': country_admin_count
})
if countries_with_admin:
countries_with_admin.sort(key=lambda x: x['Coverage'], reverse=True)
print(f" - Top countries for choropleth mapping:")
for country in countries_with_admin[:5]:
print(f" β’ {country['Country']} ({country['ISO']}): {country['Coverage']:.1f}% coverage")
# Display summary statistics
print("\nπ Dataset Summary:")
print(f" - Countries: {dashboard.df['ISO'].nunique()}")
print(f" - Time period: {dashboard.df['Start Year'].min()}-{dashboard.df['Start Year'].max()}")
print(f" - Disaster types: {dashboard.df['Disaster Type'].nunique()}")
print(f" - Total deaths: {dashboard.df['Total Deaths'].sum():,.0f}")
print(f" - Total affected: {dashboard.df['Total Affected'].sum():,.0f}")
print(f"\nπ Dashboard Features:")
print(f" β
Interactive country and time filtering")
print(f" β
Multiple visualization tabs")
print(f" β
Statistical analysis and charts")
if os.path.exists(GPKG_FILE_PATH):
print(f" β
Multi-layer choropleth maps with:")
print(f" β’ 6 independent layers (ADM1/ADM2 Γ 3 metrics)")
print(f" β’ Quantile-based color scaling")
print(f" β’ 250px left-side legends with visibility control")
print(f" β’ Layer checkboxes for independent toggling")
print(f" β’ Hover tooltips and administrative unit info")
else:
print(f" π Bar chart fallbacks when choropleth unavailable")
print("\nβ
Dashboard ready! Use the interactive controls to explore the data.")
print("-" * 60)
# ============================================================================
# MAIN EXECUTION
# ============================================================================
# Check file availability
print(f"π INITIALIZING EMDAT DASHBOARD WITH FIXED CHOROPLETH")
print("="*60)
print(f"\nπ Checking file availability...")
if os.path.exists(EXCEL_FILE_PATH):
print(f"β
EMDAT Excel file found: {EXCEL_FILE_PATH}")
else:
print(f"β EMDAT Excel file not found: {EXCEL_FILE_PATH}")
print(f" Please update EXCEL_FILE_PATH in the configuration above")
if os.path.exists(GPKG_FILE_PATH):
print(f"✅ GPKG file found: {GPKG_FILE_PATH}")
else:
print(f"❌ GPKG file not found: {GPKG_FILE_PATH}")
print(f" Please update GPKG_FILE_PATH in the configuration above")
print(f" Choropleth maps will not be available without GPKG boundaries")
# Initialize the dashboard
print(f"\nποΈ Initializing EMDAT Dashboard...")
dashboard = EMDATDashboard(EXCEL_FILE_PATH)
if dashboard.df is not None:
# Check subnational data availability
print("\nChecking subnational data availability...")
subnational_summary = quick_subnational_check(dashboard)
# Run the dashboard
run_complete_dashboard()
print(f"\nπ STARTING INTERACTIVE DASHBOARD")
print("="*50)
print("π― Select a country with subnational data to see enhanced choropleth maps!")
print("π Use the controls below to filter and explore the data.")
print("\nπ‘ Pro tip: Countries with higher subnational coverage will have better choropleth maps.")
# Start the interactive dashboard with FIXED implementation
create_country_selector(dashboard, GPKG_FILE_PATH if os.path.exists(GPKG_FILE_PATH) else None)
else:
print("β Cannot start dashboard - data not loaded")
print("Please check your EXCEL_FILE_PATH configuration and re-run the script.")
✅ GeoPandas available - choropleth maps enabled
🚀 INITIALIZING EMDAT DASHBOARD WITH FIXED CHOROPLETH
============================================================
🔍 Checking file availability...
✅ EMDAT Excel file found: emdat_2025.xlsx
✅ GPKG file found: X:/Work/Geodata/ADM/ADM_GAUL.gpkg
🏛️ Initializing EMDAT Dashboard...
Loading data from emdat_2025.xlsx...
Data loaded successfully!
Shape: (15739, 46)
Years covered: 1900 - 2025
Countries: 228
Disaster types: 10
Data preparation completed!
Checking subnational data availability...
Countries with Subnational Data:
==================================================
Saint Helena (SHN): 100.0% coverage (1/1 disasters)
Northern Mariana Islands (MNP): 100.0% coverage (5/5 disasters)
Timor-Leste (TLS): 100.0% coverage (10/10 disasters)
Cayman Islands (CYM): 100.0% coverage (7/7 disasters)
Qatar (QAT): 100.0% coverage (1/1 disasters)
South Sudan (SSD): 95.0% coverage (19/20 disasters)
North Macedonia (MKD): 91.3% coverage (21/23 disasters)
Burundi (BDI): 90.2% coverage (46/51 disasters)
State of Palestine (PSE): 87.5% coverage (7/8 disasters)
Angola (AGO): 85.7% coverage (48/56 disasters)
Saudi Arabia (SAU): 85.2% coverage (23/27 disasters)
Bosnia and Herzegovina (BIH): 83.9% coverage (26/31 disasters)
Croatia (HRV): 82.9% coverage (29/35 disasters)
Serbia (SRB): 81.8% coverage (27/33 disasters)
Namibia (NAM): 81.5% coverage (22/27 disasters)
Rwanda (RWA): 80.0% coverage (36/45 disasters)
Seychelles (SYC): 80.0% coverage (4/5 disasters)
Suriname (SUR): 80.0% coverage (4/5 disasters)
Serbia Montenegro (SCG): 78.6% coverage (11/14 disasters)
Tajikistan (TJK): 78.3% coverage (54/69 disasters)
============================================================
EMDAT DISASTER DATA DASHBOARD - FIXED CHOROPLETH
============================================================
Initializing dashboard components...
✅ GPKG boundaries available: X:/Work/Geodata/ADM/ADM_GAUL.gpkg
🗺️ Enhanced choropleth mapping enabled!
📊 Data Quality Report:
- Total records: 15,739
- Missing values in key columns:
• Total Deaths: 0 (0.0%)
• Total Affected: 0 (0.0%)
• Total Damages ('000 US$): 0 (0.0%)
🏛️ Subnational Data Availability:
- Records with admin units: 8,428 (53.5%)
- Top countries for choropleth mapping:
• Guatemala (GTM): 63.9% coverage
• Myanmar (MMR): 55.7% coverage
• India (IND): 49.9% coverage
• United States of America (USA): 48.4% coverage
• Saint Vincent and the Grenadines (VCT): 44.0% coverage
📊 Dataset Summary:
- Countries: 228
- Time period: 1900-2025
- Disaster types: 10
- Total deaths: 23,058,268
- Total affected: 8,810,390,058
🚀 Dashboard Features:
✅ Interactive country and time filtering
✅ Multiple visualization tabs
✅ Statistical analysis and charts
✅ FIXED multi-layer choropleth maps with:
• 6 independent layers (ADM1/ADM2 × 3 metrics)
• Proper quantile-based color scaling
• 250px left-side legends with visibility control
• Layer checkboxes for independent toggling
• Hover tooltips and administrative unit info
✅ Dashboard ready! Use the interactive controls to explore the data.
------------------------------------------------------------
🚀 STARTING INTERACTIVE DASHBOARD
==================================================
🎯 Select a country with subnational data to see enhanced choropleth maps!
📊 Use the controls below to filter and explore the data.
💡 Pro tip: Countries with higher subnational coverage will have better choropleth maps.