"""
This module contains functions to predict runoff using a machine learning model.
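By default the model weights are loaded from 'swmm_model/weights.npz'; the path can be
overridden with the CATCHMENT_SIMULATION_WEIGHTS_PATH environment variable or the
Django setting SWMM_MODEL_WEIGHTS_PATH.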
The predict_runoff function takes a SWMM model object as input and returns an array of
predicted runoff values for each subcatchment.
This is a pure NumPy implementation - no TensorFlow/Keras required.
"""
import logging
import os
import threading
import time
import numpy as np
import swmmio
logger = logging.getLogger(__name__)
# (stray "[docs]" link marker removed; likely a documentation-export artifact)
class SimpleMLPModel:
    """
    A simple MLP model for inference using pure NumPy.

    Architecture:
    Input (8) -> Normalization -> Dense(8, ReLU) -> Dense(8, ReLU) -> Dense(1, ReLU)
    """

    def __init__(self, weights_path: str):
        """
        Load model weights from .npz file.

        Parameters
        ----------
        weights_path : str
            Path to the .npz file containing model weights.

        Raises
        ------
        FileNotFoundError
            If ``weights_path`` does not exist. The failure is logged and
            the original exception is re-raised.
        KeyError
            If the archive does not contain one of the expected arrays.
        """
        try:
            archive = np.load(weights_path)
        except FileNotFoundError:
            logger.error("Cannot load model weights: %s", weights_path)
            raise

        # np.load() on a .npz returns a lazily-read archive that keeps the
        # file open. Every array is pulled into memory inside the ``with``
        # block so the handle is released as soon as loading finishes.
        with archive as weights:
            # Normalization parameters
            self.norm_mean = weights["norm_mean"]
            self.norm_variance = weights["norm_variance"]

            # Dense layer weights
            self.dense_0_kernel = weights["dense_0_kernel"]
            self.dense_0_bias = weights["dense_0_bias"]
            self.dense_1_kernel = weights["dense_1_kernel"]
            self.dense_1_bias = weights["dense_1_bias"]
            self.dense_2_kernel = weights["dense_2_kernel"]
            self.dense_2_bias = weights["dense_2_bias"]

        logger.debug("Model weights loaded successfully")

    def _normalize(self, x: np.ndarray) -> np.ndarray:
        """Standardize ``x`` with the stored mean and variance."""
        # The small epsilon keeps the division finite for zero variance.
        return (x - self.norm_mean) / np.sqrt(self.norm_variance + 1e-7)

    def _relu(self, x: np.ndarray) -> np.ndarray:
        """Apply ReLU activation (element-wise ``max(0, x)``)."""
        return np.maximum(0, x)

    def predict(self, x: np.ndarray) -> np.ndarray:
        """
        Run inference on input data.

        Parameters
        ----------
        x : np.ndarray
            Input array of shape (batch_size, 8).

        Returns
        -------
        np.ndarray
            Predictions of shape (batch_size, 1).
        """
        # Ensure float32
        x = np.asarray(x, dtype=np.float32)

        # Normalization
        x = self._normalize(x)

        # Dense layer 0
        x = x @ self.dense_0_kernel + self.dense_0_bias
        x = self._relu(x)

        # Dense layer 1
        x = x @ self.dense_1_kernel + self.dense_1_bias
        x = self._relu(x)

        # Dense layer 2 (output); the final ReLU clamps predictions at zero.
        x = x @ self.dense_2_kernel + self.dense_2_bias
        x = self._relu(x)
        return x
# Module-level cache for the loaded model; stays None until the first load.
_model: SimpleMLPModel | None = None
# Held while the cached model is being created so only one thread loads it.
_model_lock = threading.Lock()
def _default_weights_path() -> str:
    """Return the fallback weights path, located relative to this module.

    The result is ``<module dir>/../swmm_model/weights.npz``; the ``..``
    segment is left in the path rather than being normalized away.
    """
    module_dir, _ = os.path.split(os.path.abspath(__file__))
    relative_parts = ("..", "swmm_model", "weights.npz")
    return os.path.join(module_dir, *relative_parts)
def _get_weights_path() -> str:
    """Work out which weights file to load.

    Lookup order:

    1. the ``CATCHMENT_SIMULATION_WEIGHTS_PATH`` environment variable,
    2. the Django setting ``SWMM_MODEL_WEIGHTS_PATH``,
    3. the default path next to this package.

    Empty values are treated as "not set". Django is optional: if it is
    not installed, or its settings are not configured, the default is used.
    """
    override = os.environ.get("CATCHMENT_SIMULATION_WEIGHTS_PATH")
    if override:
        return override

    try:
        from django.conf import settings
        from django.core.exceptions import ImproperlyConfigured
    except ImportError:
        # Predictor can run outside Django environments.
        return _default_weights_path()

    try:
        from_settings = getattr(settings, "SWMM_MODEL_WEIGHTS_PATH", None)
    except ImproperlyConfigured:
        # Settings exist but are not configured; behave as if unset.
        from_settings = None

    return str(from_settings) if from_settings else _default_weights_path()
def _get_or_create_model() -> tuple[SimpleMLPModel, bool]:
    """Fetch the cached model, building it on first use.

    Returns
    -------
    tuple[SimpleMLPModel, bool]
        The model, and ``True`` only if this call created it.
    """
    global _model

    # Fast path: the model already exists, so the lock is not needed.
    if _model is not None:
        return _model, False

    with _model_lock:
        # Check again under the lock: another thread may have finished
        # loading while this one was waiting.
        created_here = _model is None
        if created_here:
            _model = SimpleMLPModel(_get_weights_path())
        return _model, created_here
def _get_model() -> SimpleMLPModel:
    """Return the shared model instance, loading it on first use."""
    return _get_or_create_model()[0]
# (stray "[docs]" link marker removed; likely a documentation-export artifact)
def preload_model() -> bool:
    """Load the model weights now instead of on the first prediction.

    Returns
    -------
    bool
        ``True`` if this call loaded the model, ``False`` if it was
        already in memory. The load time is logged only in the first case.
    """
    t_start = time.perf_counter()
    created = _get_or_create_model()[1]
    if not created:
        return False

    elapsed_ms = (time.perf_counter() - t_start) * 1000
    logger.info("ANN predictor model loaded in %.1f ms", elapsed_ms)
    return True
# (stray "[docs]" link marker removed; likely a documentation-export artifact)
def predict_runoff(swmmio_model: swmmio.Model) -> np.ndarray:
    """
    Estimate runoff for every subcatchment with the ANN predictor.

    Eight columns are read from the model's subcatchment dataframe and
    passed to the shared predictor, which is loaded on first use.

    Parameters
    ----------
    swmmio_model : swmmio.Model
        A SWMM model object with subcatchment data.

    Returns
    -------
    np.ndarray
        A 1-D array with one predicted runoff value per subcatchment row.

    Example
    -------
    Illustrative only; actual values depend on the model and weights.

    >>> swmm_model = swmmio.Model("example.inp")
    >>> predict_runoff(swmm_model)
    array([0.1, 0.2, 0.3, 0.4, 0.5])
    """
    # Resolve the predictor first so a weights-loading failure surfaces
    # before any subcatchment data is read.
    predictor = _get_model()

    # NOTE(review): column order presumably must match the order used when
    # the weights were trained - confirm before reordering.
    feature_columns = [
        "PercImperv",
        "Width",
        "PercSlope",
        "N-Imperv",
        "N-Perv",
        "S-Imperv",
        "S-Perv",
        "PctZero",
    ]
    subcatchments = swmmio_model.subcatchments.dataframe
    features = subcatchments[feature_columns].values

    # Collapse the (rows, 1) prediction matrix into a flat vector.
    predictions = predictor.predict(features)
    return predictions.flatten()