"""Views and helper functions for the main Django application."""
import datetime
import json
import logging
import os
import re
import shutil
import uuid
from codecs import getincrementaldecoder
from contextlib import suppress
from copy import deepcopy
from functools import lru_cache, wraps
from io import BytesIO, StringIO
import numpy as np
import pandas as pd
import swmmio
from django import forms
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import login_required
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.core.files.uploadhandler import FileUploadHandler, StopUpload
from django.http import (
FileResponse,
HttpRequest,
HttpResponse,
HttpResponseForbidden,
HttpResponseRedirect,
JsonResponse,
)
from django.http.multipartparser import MultiPartParserError
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.views.decorators.http import require_GET, require_POST
from pydantic import ValidationError as PydanticValidationError
from pyswmm import Simulation
from catchment_simulation.analysis import runoff_volume, time_to_peak
from catchment_simulation.catchment_features_simulation import FeaturesSimulation
from catchment_simulation.schemas import SimulationMethodParams
from main.forms import ContactForm, SimulationForm, TimeseriesForm, UserProfileForm
from main.predictor import predict_runoff
from main.schemas import ContactMessage
from main.services import send_message
logger = logging.getLogger(__name__)
SIM_FORM_STATE_SESSION_KEY = "sim_form_state"
TS_FORM_STATE_SESSION_KEY = "ts_form_state"
SIM_FORM_STATE_FIELDS = ("option", "start", "stop", "step", "catchment_name")
TS_FORM_STATE_FIELDS = ("mode", "feature", "start", "stop", "step", "catchment_name")
EXCEL_CONTENT_TYPE = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
SIM_RESULT_TOKEN_SESSION_KEY = "sim_result_token"
TS_RESULT_TOKEN_SESSION_KEY = "ts_result_token"
RESULT_CACHE_TTL_SECONDS = 30 * 60
MAX_RESULT_CACHE_BYTES = 2 * 1024 * 1024
UPLOAD_SUBDIR = "uploaded_files"
SI_FLOW_UNITS = frozenset({"CMS", "LPS", "MLD"})
US_FLOW_UNITS = frozenset({"CFS", "GPM", "MGD"})
UPLOAD_SESSION_KEYS = (
SIM_FORM_STATE_SESSION_KEY,
TS_FORM_STATE_SESSION_KEY,
"_subcatchment_ids",
"_subcatchment_ids_file",
)
SIM_SESSION_DEFAULTS = {
"show_download_button": False,
"chart_config": None,
"results_columns": [],
"results_data": [],
"feature_name": "",
"output_file_name": None,
"download_token": None,
}
TS_SESSION_DEFAULTS = {
"ts_chart_config": None,
"ts_time_to_peak": None,
"ts_runoff_volume": None,
"ts_show_results": False,
"output_file_name": None,
"download_token": None,
}
SESSION_VARIABLES_TO_CLEAR = (
"show_download_button",
"chart_config",
"results_columns",
"results_data",
"feature_name",
"output_file_name",
"ts_chart_config",
"ts_time_to_peak",
"ts_runoff_volume",
"ts_show_results",
"ts_output_file_name",
SIM_RESULT_TOKEN_SESSION_KEY,
TS_RESULT_TOKEN_SESSION_KEY,
SIM_FORM_STATE_SESSION_KEY,
TS_FORM_STATE_SESSION_KEY,
)
[docs]
def ajax_login_required(view_func):
"""Like ``@login_required`` but returns 401 JSON for AJAX requests."""
@wraps(view_func)
def wrapper(request, *args, **kwargs):
if not request.user.is_authenticated:
login_url = settings.LOGIN_URL
is_ajax = request.headers.get("X-Requested-With") == "XMLHttpRequest"
if is_ajax:
return JsonResponse(
{"error": "Authentication required.", "login_url": login_url},
status=401,
)
return redirect(f"{login_url}?next={request.path}")
return view_func(request, *args, **kwargs)
return wrapper
def _load_chart_json(filename: str, x_key: str, y_key: str) -> list[dict]:
"""Load JSON from ``data/`` dir and validate it has numeric x/y keys."""
data_dir = os.path.join(settings.BASE_DIR, "data")
path = os.path.join(data_dir, filename)
with open(path, encoding="utf-8") as file:
payload = json.load(file)
if not isinstance(payload, list):
raise ValueError(f"{filename} must contain a JSON list")
for idx, row in enumerate(payload):
if not isinstance(row, dict):
raise ValueError(f"{filename} row {idx} is not an object")
if x_key not in row or y_key not in row:
raise ValueError(f"{filename} row {idx} missing required keys")
if not isinstance(row[x_key], int | float) or not isinstance(row[y_key], int | float):
raise ValueError(f"{filename} row {idx} contains non-numeric values")
return payload
@lru_cache(maxsize=1)
def _load_static_chart_data_cached() -> dict:
"""Load static chart data once per process."""
try:
return {
"slope": _load_chart_json("df_slope.json", "slope", "runoff"),
"area": _load_chart_json("df_area.json", "area", "runoff"),
"width": _load_chart_json("df_width.json", "width", "runoff"),
}
except Exception:
logger.exception("Failed to load static chart data from JSON files")
return {"slope": [], "area": [], "width": []}
def _load_static_chart_data() -> dict:
"""Return a defensive copy so cache data cannot be mutated by callers."""
return deepcopy(_load_static_chart_data_cached())
def _result_cache_key(scope: str, user_id: int, token: str) -> str:
return f"result:{scope}:{user_id}:{token}"
def _delete_cached_result(scope: str, user_id: int, token: str | None) -> None:
if token:
cache.delete(_result_cache_key(scope, user_id, token))
def _store_cached_result(scope: str, user_id: int, payload: dict) -> str:
"""Store result payload in cache and return opaque token."""
serialized = json.dumps(payload, separators=(",", ":"))
payload_size = len(serialized.encode("utf-8"))
if payload_size > MAX_RESULT_CACHE_BYTES:
raise ResultPayloadTooLargeError("Result payload too large")
token = uuid.uuid4().hex
cache.set(
_result_cache_key(scope, user_id, token), serialized, timeout=RESULT_CACHE_TTL_SECONDS
)
return token
def _load_cached_result(scope: str, user_id: int, token: str | None) -> dict | None:
if not token:
return None
serialized = cache.get(_result_cache_key(scope, user_id, token))
if serialized is None:
return None
try:
payload = json.loads(serialized)
except (TypeError, json.JSONDecodeError):
_delete_cached_result(scope, user_id, token)
return None
if not isinstance(payload, dict):
_delete_cached_result(scope, user_id, token)
return None
return payload
def _safe_download_filename(name: str | None, fallback: str, extension: str = ".xlsx") -> str:
"""Sanitize user-facing filenames used in Content-Disposition."""
candidate = os.path.basename(name or "").strip()
if not candidate:
candidate = fallback
candidate = re.sub(r'[\r\n"]+', "_", candidate)
base_name = os.path.splitext(candidate)[0] or os.path.splitext(fallback)[0] or "results"
if not extension:
normalized_extension = ""
else:
normalized_extension = extension if extension.startswith(".") else f".{extension}"
safe_name = f"{base_name}{normalized_extension}"
return safe_name[:150]
def _normalize_sheet_name(raw_name: str, used_names: set[str]) -> str:
"""Create Excel-safe unique sheet names (<=31 chars)."""
sanitized = "".join("_" if char in r"[]:*?/\\" else char for char in (raw_name or "sheet"))
sanitized = sanitized.strip() or "sheet"
base = sanitized[:31]
candidate = base
suffix = 1
while candidate.lower() in used_names:
suffix_text = f"_{suffix}"
candidate = f"{base[: 31 - len(suffix_text)]}{suffix_text}"
suffix += 1
used_names.add(candidate.lower())
return candidate
def _is_valid_result_token(token: str | None) -> bool:
return bool(token and re.fullmatch(r"[0-9a-f]{32}", token))
def _user_upload_dir(user_id: int) -> str:
"""Return absolute per-user upload directory rooted in MEDIA_ROOT."""
return os.path.join(settings.MEDIA_ROOT, UPLOAD_SUBDIR, str(user_id))
def _safe_remove_file(path: str | None) -> None:
"""Best-effort file removal safe for concurrent requests."""
if not path:
return
with suppress(OSError):
os.remove(path)
def _file_too_large_message() -> str:
return f"File too large. Maximum size is {MAX_UPLOAD_SIZE // (1024 * 1024)} MB."
def _timestamp_suffix() -> str:
return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
def _default_uploaded_file_path(request: HttpRequest) -> str:
return request.session.get(
"uploaded_file_path", os.path.abspath("catchment_simulation/example.inp")
)
def _clear_upload_session_state(request: HttpRequest) -> None:
for key in UPLOAD_SESSION_KEYS:
request.session.pop(key, None)
def _set_uploaded_file_for_session(request: HttpRequest, file_path: str) -> None:
old_path = request.session.get("uploaded_file_path")
if old_path and old_path != file_path:
_safe_remove_file(old_path)
request.session["uploaded_file_path"] = file_path
_clear_upload_session_state(request)
def _store_result_payload_for_user(
request: HttpRequest,
*,
scope: str,
session_key: str,
payload: dict,
) -> None:
old_token = request.session.get(session_key)
token = _store_cached_result(scope, request.user.id, payload)
_delete_cached_result(scope, request.user.id, old_token)
request.session[session_key] = token
def _get_download_payload(
request: HttpRequest,
*,
scope: str,
redirect_name: str,
missing_message: str,
) -> tuple[dict | None, HttpResponseRedirect | None]:
token = request.POST.get("token")
if not _is_valid_result_token(token):
messages.error(request, "Invalid download token.")
return None, redirect(redirect_name)
payload = _load_cached_result(scope, request.user.id, token)
if not payload:
messages.error(request, missing_message)
return None, redirect(redirect_name)
return payload, None
def _default_simulation_session_data() -> dict:
return dict(SIM_SESSION_DEFAULTS)
def _default_timeseries_session_data() -> dict:
return dict(TS_SESSION_DEFAULTS)
def _coerce_input_validation_error(error: Exception) -> InputValidationError | None:
"""Normalize known user-facing errors to InputValidationError."""
if isinstance(error, InputValidationError):
return error
if isinstance(error, PydanticValidationError | ValidationError):
return InputValidationError("validation")
if isinstance(error, FileNotFoundError):
return InputValidationError("missing_file")
if isinstance(error, UnicodeDecodeError):
return InputValidationError("encoding")
if isinstance(error, ValueError):
lowered = str(error).lower()
if "subcatchment" in lowered and "not found" in lowered:
return InputValidationError("subcatchment")
if "must be <=" in lowered:
return InputValidationError("range")
if "could not convert string to float" in lowered:
return InputValidationError("non_numeric")
if "expected numeric" in lowered:
return InputValidationError("non_numeric")
return None
def _format_input_error_message(error: InputValidationError) -> str:
"""Return safe, user-facing message without leaking internal details."""
code = str(error)
if code == "missing_file":
return "Input file is missing. Please upload the model again."
if code == "encoding":
return "Input file encoding is not supported."
if code == "subcatchment":
return "Selected catchment was not found in the uploaded model."
if code == "range":
return "Invalid range values. Ensure start is less than or equal to stop."
if code == "non_numeric":
return "Input file contains non-numeric values where numbers are required."
return "Input file error. Please validate your model and selected parameters."
[docs]
def main_view(request: HttpRequest) -> HttpResponse:
"""Landing page with static slope/area/width charts."""
context = {"chart_data": _load_static_chart_data()}
return render(request, "main/main_view.html", context)
[docs]
def about(request: HttpRequest) -> HttpResponse:
return render(request, "main/about.html")
def _build_user_profile_form(user, *, data: object | None = None) -> UserProfileForm:
form_kwargs = {}
if data is not None:
form_kwargs["data"] = data
try:
return UserProfileForm(instance=user.userprofile, **form_kwargs)
except AttributeError:
if data is None:
logger.debug("User %s has no profile, creating empty form", user.id)
else:
logger.debug("User %s has no profile, creating new form for POST", user.id)
return UserProfileForm(initial={"user": user, "bio": ""}, **form_kwargs)
def _set_profile_form_read_only(form: UserProfileForm) -> None:
for field in form.fields.values():
field.disabled = True
form.helper.inputs = []
[docs]
def user_profile(request: HttpRequest, user_id: int) -> HttpResponse:
"""Display user profile; owners can edit, others get read-only view."""
user = get_object_or_404(get_user_model(), id=user_id)
if request.method == "POST":
if request.user != user:
return HttpResponseForbidden("You are not allowed to edit this profile.")
form = _build_user_profile_form(user, data=request.POST)
if form.is_valid():
form.save()
return HttpResponseRedirect(reverse("userprofile", args=[user_id]))
else:
form = _build_user_profile_form(user)
if request.user != user:
_set_profile_form_read_only(form)
return render(request, "main/userprofile.html", {"form": form})
MAX_UPLOAD_SIZE = settings.INP_UPLOAD_MAX_BYTES
MAX_UPLOAD_BODY_SIZE = settings.INP_UPLOAD_MAX_BODY_BYTES
UPLOAD_VALIDATION_CHUNK_SIZE = 8192
UPLOAD_VALIDATION_MAX_LINE_BUFFER = 8192
MIN_SWMM_SECTION_MATCHES = 2
SWMM_SECTION_HEADERS = frozenset(
{"[TITLE]", "[OPTIONS]", "[RAINGAGES]", "[SUBCATCHMENTS]", "[SUBAREAS]"}
)
def _sanitize_filename(filename: str) -> str:
"""Sanitize filename to prevent path traversal attacks."""
# Remove path separators and other dangerous characters
sanitized = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", filename)
# Remove leading/trailing dots and spaces
sanitized = sanitized.strip(". ")
# Limit length
return sanitized[:100] if sanitized else "uploaded_file"
def _validate_inp_file_content(file_content: bytes) -> bool:
"""Check in-memory bytes for SWMM section headers. Handles BOM and mixed line endings."""
try:
if file_content.startswith(b"\xef\xbb\xbf"):
file_content = file_content[3:]
elif file_content.startswith(b"\xff\xfe") or file_content.startswith(b"\xfe\xff"):
file_content = file_content[2:]
try:
content_str = file_content.decode("utf-8")
except UnicodeDecodeError:
content_str = file_content.decode("latin-1")
content_upper = content_str.replace("\r\n", "\n").replace("\r", "\n").upper()
# Check for common SWMM section headers
swmm_sections = ["[TITLE]", "[OPTIONS]", "[RAINGAGES]", "[SUBCATCHMENTS]", "[SUBAREAS]"]
return any(section in content_upper for section in swmm_sections)
except Exception:
return False
def _detect_inp_encoding_prefix(prefix: bytes) -> tuple[str, int]:
"""Detect encoding from BOM prefix and return (encoding, BOM bytes to strip)."""
if prefix.startswith(b"\xef\xbb\xbf"):
return "utf-8", 3
if prefix.startswith(b"\xff\xfe"):
return "utf-16-le", 2
if prefix.startswith(b"\xfe\xff"):
return "utf-16-be", 2
return "utf-8", 0
def _validate_inp_file_stream(
uploaded_file: object, chunk_size: int = UPLOAD_VALIDATION_CHUNK_SIZE
) -> bool:
"""Validate SWMM INP headers without loading the entire file into memory."""
decoder = None
pending = ""
matched_sections: set[str] = set()
try:
for chunk in uploaded_file.chunks(chunk_size):
if decoder is None:
encoding, bom_size = _detect_inp_encoding_prefix(chunk[:4])
decoder = getincrementaldecoder(encoding)(errors="replace")
chunk = chunk[bom_size:]
pending += decoder.decode(chunk, final=False)
pending = pending.replace("\r\n", "\n").replace("\r", "\n")
lines = pending.split("\n")
pending = lines.pop()[-UPLOAD_VALIDATION_MAX_LINE_BUFFER:] if lines else pending
for line in lines:
normalized_line = line.strip().upper()
if normalized_line in SWMM_SECTION_HEADERS:
matched_sections.add(normalized_line)
if len(matched_sections) >= MIN_SWMM_SECTION_MATCHES:
return True
if decoder is None:
return False
pending += decoder.decode(b"", final=True)
pending = pending.replace("\r\n", "\n").replace("\r", "\n")
for line in pending.split("\n"):
normalized_line = line.strip().upper()
if normalized_line in SWMM_SECTION_HEADERS:
matched_sections.add(normalized_line)
if len(matched_sections) >= MIN_SWMM_SECTION_MATCHES:
return True
return False
except (OSError, UnicodeError, AttributeError):
logger.warning("Failed to validate uploaded INP file stream", exc_info=True)
return False
finally:
with suppress(Exception):
uploaded_file.seek(0)
[docs]
class BodySizeLimitUploadHandler(FileUploadHandler):
"""Abort multipart parsing when streamed request body exceeds configured limit."""
def __init__(self, request: HttpRequest, max_bytes: int):
super().__init__(request)
self.max_bytes = max_bytes
self.bytes_received = 0
[docs]
def receive_data_chunk(self, raw_data: bytes, start: int) -> bytes:
self.bytes_received += len(raw_data)
if self.bytes_received > self.max_bytes:
self.request._upload_body_too_large = True
raise StopUpload(connection_reset=True)
return raw_data
[docs]
def file_complete(self, file_size: int) -> None:
return None
[docs]
@require_POST
@ajax_login_required
def upload(request: HttpRequest) -> JsonResponse:
"""Accept a user-uploaded .inp file (via Dropzone.js) after size and content validation."""
raw_content_length = request.META.get("CONTENT_LENGTH")
if raw_content_length in ("",):
return JsonResponse({"error": "Invalid Content-Length header."}, status=400)
try:
content_length = int(raw_content_length or 0)
except (TypeError, ValueError):
return JsonResponse({"error": "Invalid Content-Length header."}, status=400)
if content_length < 0:
return JsonResponse({"error": "Invalid Content-Length header."}, status=400)
if content_length > MAX_UPLOAD_BODY_SIZE:
return JsonResponse({"error": _file_too_large_message()}, status=413)
request.upload_handlers = [
BodySizeLimitUploadHandler(request, MAX_UPLOAD_BODY_SIZE),
*request.upload_handlers,
]
try:
files = request.FILES
except MultiPartParserError:
return JsonResponse({"error": "Malformed multipart request."}, status=400)
if getattr(request, "_upload_body_too_large", False):
return JsonResponse({"error": _file_too_large_message()}, status=413)
if "file" not in files:
return JsonResponse({"error": "No file provided."}, status=400)
uploaded_file = files["file"]
filename, file_extension = os.path.splitext(uploaded_file.name)
# Check file extension
if file_extension.lower() != ".inp":
return JsonResponse(
{"error": "Invalid file type. Please upload a .inp file."},
status=400,
)
# Check file size
if uploaded_file.size > MAX_UPLOAD_SIZE:
return JsonResponse({"error": _file_too_large_message()}, status=413)
# Validate file content
if not _validate_inp_file_stream(uploaded_file):
logger.warning("Invalid .inp file content uploaded: %s", uploaded_file.name)
return JsonResponse(
{
"error": "Invalid file content. The file does not appear to be a valid SWMM .inp file."
},
status=400,
)
# Sanitize filename and scope to user
safe_filename = _sanitize_filename(filename)
user_dir = _user_upload_dir(request.user.id)
file_path = os.path.join(user_dir, safe_filename + file_extension)
# Ensure upload directory exists
os.makedirs(user_dir, exist_ok=True)
with open(file_path, "wb+") as destination:
for chunk in uploaded_file.chunks():
destination.write(chunk)
_set_uploaded_file_for_session(request, file_path)
logger.info("File uploaded successfully: %s", file_path)
return JsonResponse({"message": "File was sent."})
[docs]
@require_POST
@ajax_login_required
def upload_sample(request: HttpRequest) -> JsonResponse:
"""Load bundled sample INP file into the current user session."""
sample_source_path = os.path.join(settings.BASE_DIR, "data", "example.inp")
if not os.path.exists(sample_source_path):
logger.error("Sample INP file is missing: %s", sample_source_path)
return JsonResponse({"error": "Sample file is not available."}, status=500)
try:
with open(sample_source_path, "rb") as sample_file:
if not _validate_inp_file_content(sample_file.read()):
logger.error("Sample INP file failed validation: %s", sample_source_path)
return JsonResponse({"error": "Sample file is invalid."}, status=500)
except OSError:
logger.exception("Failed to read sample INP file at %s", sample_source_path)
return JsonResponse({"error": "Sample file is not available."}, status=500)
user_dir = _user_upload_dir(request.user.id)
os.makedirs(user_dir, exist_ok=True)
file_path = os.path.join(user_dir, "example.inp")
try:
shutil.copyfile(sample_source_path, file_path)
except OSError:
logger.exception("Failed to copy sample INP file for user %s", request.user.id)
return JsonResponse({"error": "Failed to load sample file."}, status=500)
_set_uploaded_file_for_session(request, file_path)
try:
sample_size = os.path.getsize(file_path)
except OSError:
sample_size = 0
return JsonResponse(
{
"message": "Sample data loaded.",
"filename": os.path.basename(file_path),
"size": sample_size,
}
)
[docs]
@require_GET
@ajax_login_required
def upload_status(request: HttpRequest) -> JsonResponse:
"""Return ``{has_file, filename, size}`` for the current session upload."""
file_path = request.session.get("uploaded_file_path")
if file_path:
try:
size = os.path.getsize(file_path)
return JsonResponse(
{
"has_file": True,
"filename": os.path.basename(file_path),
"size": size,
}
)
except OSError:
# File disappeared between session set and now – clean up
request.session.pop("uploaded_file_path", None)
return JsonResponse({"has_file": False})
[docs]
@ajax_login_required
def upload_clear(request: HttpRequest) -> JsonResponse:
"""Remove uploaded file from disk and clear related session state."""
if request.method != "POST":
return JsonResponse({"error": "Method not allowed."}, status=405)
file_path = request.session.pop("uploaded_file_path", None)
_safe_remove_file(file_path)
_clear_upload_session_state(request)
return JsonResponse({"message": "Upload cleared."})
def _get_subcatchment_ids(request: HttpRequest) -> list[str]:
"""Return subcatchment IDs from the uploaded INP, cached per session/file path."""
file_path = request.session.get("uploaded_file_path")
cached_file = request.session.get("_subcatchment_ids_file")
if file_path and file_path == cached_file:
cached_ids = request.session.get("_subcatchment_ids")
if cached_ids is not None:
return cached_ids
if not file_path or not os.path.exists(file_path):
request.session.pop("_subcatchment_ids", None)
request.session.pop("_subcatchment_ids_file", None)
return []
try:
model = swmmio.Model(file_path)
ids = list(model.inp.subcatchments.index)
except Exception:
logger.warning("Failed to read subcatchments from %s", file_path, exc_info=True)
ids = []
request.session["_subcatchment_ids"] = ids
request.session["_subcatchment_ids_file"] = file_path
return ids
def _get_catchment_choices(request: HttpRequest) -> list[tuple[str, str]]:
"""Build ``(value, label)`` choices for the catchment ``<select>`` widget."""
ids = _get_subcatchment_ids(request)
if ids:
return [("", "--- Select catchment ---")] + [(sid, sid) for sid in ids]
return [("", "--- Upload a file first ---")]
[docs]
@require_GET
@ajax_login_required
def subcatchments(request: HttpRequest) -> JsonResponse:
"""AJAX endpoint returning subcatchment IDs from the uploaded INP."""
return JsonResponse({"subcatchments": _get_subcatchment_ids(request)})
[docs]
def get_feature_name(method_name: str) -> str:
"""Map simulation method name (e.g. ``simulate_area``) to INP column name."""
feature_map = {
"simulate_percent_slope": "PercSlope",
"simulate_area": "Area",
"simulate_width": "Width",
"simulate_percent_impervious": "PercImperv",
"simulate_percent_zero_imperv": "Zero-Imperv",
"simulate_curb_length": "CurbLength",
"simulate_n_imperv": "N-Imperv",
"simulate_n_perv": "N-Perv",
"simulate_s_imperv": "Destore-Imperv",
"simulate_s_perv": "Destore-Perv",
}
return feature_map.get(method_name, "")
def _normalize_flow_units(flow_units: str | None) -> str:
"""Normalize FLOW_UNITS token for comparisons and labels."""
return str(flow_units or "").strip().upper()
def _read_flow_units(inp_path: str) -> str | None:
"""Read FLOW_UNITS from a SWMM input file."""
if not inp_path or not os.path.exists(inp_path):
return None
try:
options = swmmio.Model(inp_path).inp.options
except Exception:
logger.warning("Failed to read FLOW_UNITS from %s", inp_path, exc_info=True)
return None
flow_units: object | None = None
if isinstance(options, pd.DataFrame):
if "FLOW_UNITS" in options.index and not options.columns.empty:
value_column = "Value" if "Value" in options.columns else options.columns[0]
flow_units = options.loc["FLOW_UNITS", value_column]
elif isinstance(options, dict):
flow_units = options.get("FLOW_UNITS")
if isinstance(flow_units, pd.Series):
flow_units = flow_units.iloc[0] if not flow_units.empty else None
normalized = _normalize_flow_units(str(flow_units) if flow_units is not None else None)
return normalized or None
def _unit_system(flow_units: str | None) -> str:
"""Classify FLOW_UNITS into SI/US/UNKNOWN buckets."""
normalized = _normalize_flow_units(flow_units)
if normalized in SI_FLOW_UNITS:
return "SI"
if normalized in US_FLOW_UNITS:
return "US"
return "UNKNOWN"
def _unit_labels(flow_units: str | None) -> dict[str, str]:
"""Return display units for chart labels."""
normalized = _normalize_flow_units(flow_units)
system = _unit_system(normalized)
if system == "SI":
return {
"length": "m",
"area": "ha",
"storage": "mm",
"depth_rate": "mm/h",
"volume": "m3",
"flow_rate": normalized,
}
if system == "US":
return {
"length": "ft",
"area": "acre",
"storage": "in",
"depth_rate": "in/h",
"volume": "ft3",
"flow_rate": normalized,
}
return {
"length": "model length units",
"area": "model area units",
"storage": "model storage units",
"depth_rate": "model depth/time units",
"volume": "model volume units",
"flow_rate": normalized or "model flow units",
}
def _build_simulation_axis_labels(
feature_name: str, y_columns: list[str], flow_units: str | None
) -> tuple[str, dict[str, str]]:
"""Build X and Y labels for simulation charts."""
units = _unit_labels(flow_units)
x_templates = {
"PercSlope": "Percent Slope [%]",
"Area": "Area [{area}]",
"Width": "Width [{length}]",
"PercImperv": "Impervious Area [%]",
"Zero-Imperv": "Zero-Impervious Area [%]",
"CurbLength": "Curb Length [{length}]",
"N-Imperv": "Manning n (Impervious) [-]",
"N-Perv": "Manning n (Pervious) [-]",
"Destore-Imperv": "Depression Storage (Impervious) [{storage}]",
"Destore-Perv": "Depression Storage (Pervious) [{storage}]",
}
y_templates = {
"runoff": "Total Runoff Volume [{volume}]",
"peak_runoff_rate": "Peak Runoff Rate [{flow_rate}]",
"infiltration": "Total Infiltration Volume [{volume}]",
"evaporation": "Total Evaporation Volume [{volume}]",
}
x_template = x_templates.get(feature_name)
x_label = x_template.format(**units) if x_template else feature_name or "Parameter"
y_labels = {}
for column in y_columns:
y_template = y_templates.get(column)
y_labels[column] = y_template.format(**units) if y_template else column
return x_label, y_labels
def _build_timeseries_axis_labels(
columns: list[str], flow_units: str | None
) -> tuple[str, dict[str, str]]:
"""Build X and Y labels for timeseries charts."""
units = _unit_labels(flow_units)
y_templates = {
"rainfall": "Rainfall Intensity [{depth_rate}]",
"runoff": "Runoff Rate [{flow_rate}]",
"infiltration_loss": "Infiltration Loss [{depth_rate}]",
"evaporation_loss": "Evaporation Loss [{depth_rate}]",
"runon": "Runon Rate [{flow_rate}]",
}
y_labels = {}
for column in columns:
y_template = y_templates.get(column)
y_labels[column] = y_template.format(**units) if y_template else column
return "Time", y_labels
def _excel_attachment_response(
output_file_name: str, sheets: dict[str, pd.DataFrame]
) -> HttpResponse:
"""Return an in-memory Excel attachment from sheet->DataFrame mapping."""
buffer = BytesIO()
used_names: set[str] = set()
with pd.ExcelWriter(buffer, engine="openpyxl") as writer:
for sheet_name, df in sheets.items():
normalized_sheet_name = _normalize_sheet_name(sheet_name, used_names)
df.to_excel(writer, sheet_name=normalized_sheet_name, index=False)
buffer.seek(0)
safe_name = _safe_download_filename(output_file_name, "results.xlsx")
return FileResponse(
buffer,
as_attachment=True,
filename=safe_name,
content_type=EXCEL_CONTENT_TYPE,
)
[docs]
def get_session_variables(request: HttpRequest) -> dict:
"""Load simulation result context from cache for template rendering."""
user = getattr(request, "user", None)
user_id = user.id if getattr(user, "is_authenticated", False) else None
if user_id is None:
return _default_simulation_session_data()
token = request.session.get(SIM_RESULT_TOKEN_SESSION_KEY)
payload = _load_cached_result("sim", user_id, token)
if not payload:
if token:
request.session.pop(SIM_RESULT_TOKEN_SESSION_KEY, None)
return _default_simulation_session_data()
return {
"show_download_button": True,
"chart_config": payload.get("chart_config"),
"results_columns": payload.get("results_columns", []),
"results_data": payload.get("results_data", []),
"feature_name": payload.get("feature_name", ""),
"output_file_name": payload.get("output_file_name"),
"download_token": token,
}
[docs]
def clear_session_variables(request: HttpRequest) -> None:
"""Purge all cached results and form state from the session."""
user = getattr(request, "user", None)
user_id = user.id if getattr(user, "is_authenticated", False) else None
if user_id is not None:
_delete_cached_result("sim", user_id, request.session.get(SIM_RESULT_TOKEN_SESSION_KEY))
_delete_cached_result("ts", user_id, request.session.get(TS_RESULT_TOKEN_SESSION_KEY))
for variable in SESSION_VARIABLES_TO_CLEAR:
if variable in request.session:
del request.session[variable]
def _save_form_state(
request: HttpRequest,
session_key: str,
cleaned_data: dict,
fields: tuple[str, ...],
) -> None:
"""Persist non-None form values in session so the form is pre-filled on next GET."""
state = {}
for field_name in fields:
value = cleaned_data.get(field_name)
if value is not None:
state[field_name] = value
request.session[session_key] = state
def _get_form_initial(
request: HttpRequest,
session_key: str,
catchment_choices: list[tuple[str, str]],
form_class: type[forms.Form],
fields: tuple[str, ...],
) -> dict:
"""Restore form values from session, validating against current choices and field types."""
state = request.session.get(session_key)
if not isinstance(state, dict):
return {}
initial = {}
for field_name in fields:
if field_name not in state:
continue
field = form_class.base_fields.get(field_name)
if field is None:
continue
try:
coerced_value = field.to_python(state[field_name])
except (ValidationError, TypeError, ValueError):
continue
if coerced_value is None:
continue
if isinstance(field, forms.ChoiceField) and field_name != "catchment_name":
if not field.valid_value(coerced_value):
continue
initial[field_name] = coerced_value
valid_catchments = {value for value, _ in catchment_choices if value}
catchment_name = initial.get("catchment_name")
if catchment_name and catchment_name not in valid_catchments:
initial["catchment_name"] = ""
if initial != state:
request.session[session_key] = initial
return initial
def _run_simulation_dataframe(
*,
params: SimulationMethodParams,
uploaded_file_path: str,
is_predefined: bool,
) -> tuple[pd.DataFrame, str, list[str]]:
with FeaturesSimulation(
subcatchment_id=params.catchment_name, raw_file=uploaded_file_path
) as model:
feature_name = get_feature_name(params.method_name)
method = getattr(model, params.method_name)
if is_predefined:
df = method()
else:
df = method(start=params.start, stop=params.stop, step=params.step)
other_columns = [column for column in df.columns if column != feature_name]
return df[[feature_name] + other_columns], feature_name, other_columns
def _build_simulation_payload(
*,
dataframe: pd.DataFrame,
feature_name: str,
y_columns: list[str],
flow_units: str | None,
username: str,
) -> dict:
x_label, y_labels = _build_simulation_axis_labels(
feature_name=feature_name,
y_columns=y_columns,
flow_units=flow_units,
)
output_file_name = f"{username}_simulation_result_{_timestamp_suffix()}.xlsx"
chart_config = {
"data": json.loads(dataframe.to_json(orient="records")),
"x": feature_name,
"y": y_columns,
"title": f"Dependence of runoff on subcatchment {feature_name}.",
"xLabel": x_label,
"yLabels": y_labels,
}
return {
"chart_config": chart_config,
"results_columns": dataframe.columns.tolist(),
"results_data": dataframe.values.tolist(),
"feature_name": feature_name,
"output_file_name": output_file_name,
}
def _serialize_timeseries_dataframe(ts_df: pd.DataFrame) -> list[dict]:
ts_df_reset = ts_df.reset_index()
ts_df_reset["datetime"] = ts_df_reset["datetime"].astype(str)
return json.loads(ts_df_reset.to_json(orient="records"))
def _timeseries_metrics(ts_df: pd.DataFrame) -> tuple[str, str]:
try:
ttp_value = str(time_to_peak(ts_df, column="runoff"))
except ValueError:
ttp_value = "N/A"
try:
runoff_value = f"{runoff_volume(ts_df, column='runoff'):.4f}"
except ValueError:
runoff_value = "N/A"
return ttp_value, runoff_value
def _build_single_timeseries_payload(
*,
ts_df: pd.DataFrame,
catchment_name: str,
flow_units: str | None,
username: str,
) -> dict:
ts_columns = list(FeaturesSimulation.TIMESERIES_KEYS)
data = _serialize_timeseries_dataframe(ts_df)
ttp_str, runoff_volume_str = _timeseries_metrics(ts_df)
x_label, y_labels = _build_timeseries_axis_labels(ts_columns, flow_units)
output_file_name = f"{username}_timeseries_{_timestamp_suffix()}.xlsx"
chart_config = {
"mode": "single",
"data": data,
"columns": ts_columns,
"title": f"Timeseries for subcatchment {catchment_name}",
"xLabel": x_label,
"yLabels": y_labels,
}
return {
"mode": "single",
"data": data,
"output_file_name": output_file_name,
"chart_config": chart_config,
"ts_time_to_peak": ttp_str,
"ts_runoff_volume": runoff_volume_str,
"ts_show_results": True,
}
def _build_sweep_timeseries_payload(
*,
sweep_results: dict,
feature: str,
catchment_name: str,
flow_units: str | None,
username: str,
) -> dict:
ts_columns = list(FeaturesSimulation.TIMESERIES_KEYS)
sweep_data = {
str(parameter_value): _serialize_timeseries_dataframe(ts_df)
for parameter_value, ts_df in sweep_results.items()
}
x_label, y_labels = _build_timeseries_axis_labels(ts_columns, flow_units)
output_file_name = f"{username}_ts_sweep_{_timestamp_suffix()}.xlsx"
chart_config = {
"mode": "sweep",
"data": sweep_data,
"columns": ts_columns,
"title": f"Timeseries sweep: {feature} for {catchment_name}",
"feature": feature,
"catchment": catchment_name,
"xLabel": x_label,
"yLabels": y_labels,
}
return {
"mode": "sweep",
"data": sweep_data,
"output_file_name": output_file_name,
"chart_config": chart_config,
"ts_show_results": True,
"ts_time_to_peak": None,
"ts_runoff_volume": None,
}
def _get_timeseries_session_data(request: HttpRequest) -> dict:
token = request.session.get(TS_RESULT_TOKEN_SESSION_KEY)
payload = _load_cached_result("ts", request.user.id, token)
if not payload:
if token:
request.session.pop(TS_RESULT_TOKEN_SESSION_KEY, None)
return _default_timeseries_session_data()
return {
"ts_chart_config": payload.get("chart_config"),
"ts_time_to_peak": payload.get("ts_time_to_peak"),
"ts_runoff_volume": payload.get("ts_runoff_volume"),
"ts_show_results": payload.get("ts_show_results", False),
"output_file_name": payload.get("output_file_name"),
"download_token": token,
}
[docs]
@login_required
def simulation_view(request: HttpRequest) -> HttpResponse:
"""Run a subcatchment feature simulation and display results with charts."""
session_data = {}
if request.method == "POST":
catchment_choices = _get_catchment_choices(request)
form = SimulationForm(request.POST, catchment_choices=catchment_choices)
if form.is_valid():
option = form.cleaned_data["option"]
is_predefined = option in SimulationForm.PREDEFINED_METHODS
uploaded_file_path = _default_uploaded_file_path(request)
try:
params = SimulationMethodParams(
method_name=option,
start=form.cleaned_data.get("start") if not is_predefined else None,
stop=form.cleaned_data.get("stop") if not is_predefined else None,
step=form.cleaned_data.get("step") if not is_predefined else None,
catchment_name=form.cleaned_data["catchment_name"],
)
dataframe, feature_name, y_columns = _run_simulation_dataframe(
params=params,
uploaded_file_path=uploaded_file_path,
is_predefined=is_predefined,
)
payload = _build_simulation_payload(
dataframe=dataframe,
feature_name=feature_name,
y_columns=y_columns,
flow_units=_read_flow_units(uploaded_file_path),
username=request.user.username,
)
_store_result_payload_for_user(
request,
scope="sim",
session_key=SIM_RESULT_TOKEN_SESSION_KEY,
payload=payload,
)
_save_form_state(
request, SIM_FORM_STATE_SESSION_KEY, form.cleaned_data, SIM_FORM_STATE_FIELDS
)
return redirect("main:simulation")
except ResultPayloadTooLargeError:
messages.error(
request,
"Result set is too large to keep for download. Narrow the simulation range.",
)
except Exception as error:
input_error = _coerce_input_validation_error(error)
if input_error:
logger.warning("Simulation input validation failed", exc_info=True)
messages.error(request, _format_input_error_message(input_error))
return render(
request,
"main/simulation.html",
{"form": form, **get_session_variables(request)},
)
logger.exception("Simulation failed")
messages.error(request, "An error occurred while running the simulation.")
else:
catchment_choices = _get_catchment_choices(request)
initial = _get_form_initial(
request,
SIM_FORM_STATE_SESSION_KEY,
catchment_choices,
SimulationForm,
SIM_FORM_STATE_FIELDS,
)
form = SimulationForm(catchment_choices=catchment_choices, initial=initial)
session_data = get_session_variables(request)
return render(
request,
"main/simulation.html",
{"form": form, **session_data},
)
[docs]
@login_required
@require_POST
def download_simulation_results(request: HttpRequest) -> HttpResponse:
"""Download simulation results as an Excel file generated in memory."""
payload, redirect_response = _get_download_payload(
request,
scope="sim",
redirect_name="main:simulation",
missing_message="No simulation results available to download.",
)
if redirect_response:
return redirect_response
try:
df = pd.DataFrame(payload["results_data"], columns=payload["results_columns"])
return _excel_attachment_response(payload.get("output_file_name"), {"results": df})
except Exception:
logger.exception("Failed to generate simulation download")
messages.error(request, "Failed to generate the simulation output file.")
return redirect("main:simulation")
[docs]
@login_required
@require_POST
def download_timeseries_results(request: HttpRequest) -> HttpResponse:
"""Download timeseries analysis results as an in-memory Excel file."""
payload, redirect_response = _get_download_payload(
request,
scope="ts",
redirect_name="main:timeseries",
missing_message="No timeseries results available to download.",
)
if redirect_response:
return redirect_response
mode = payload.get("mode")
data = payload.get("data")
try:
if mode == "single" and isinstance(data, list):
df = pd.DataFrame(data)
return _excel_attachment_response(payload.get("output_file_name"), {"timeseries": df})
if mode == "sweep" and isinstance(data, dict):
sheets = {}
for param, rows in data.items():
sheet_name = f"val_{param}"
sheets[sheet_name] = pd.DataFrame(rows)
return _excel_attachment_response(payload.get("output_file_name"), sheets)
except Exception:
logger.exception("Failed to generate timeseries download")
messages.error(request, "Failed to generate the timeseries output file.")
return redirect("main:timeseries")
def _timeseries_payload_to_csv_df(payload: dict) -> pd.DataFrame:
"""Convert cached timeseries payload to a single CSV-friendly DataFrame."""
mode = payload.get("mode")
data = payload.get("data")
if mode == "single" and isinstance(data, list):
return pd.DataFrame(data)
if mode == "sweep" and isinstance(data, dict):
frames: list[pd.DataFrame] = []
for parameter_value, rows in data.items():
frame = pd.DataFrame(rows)
frame.insert(0, "parameter_value", parameter_value)
frames.append(frame)
if frames:
return pd.concat(frames, ignore_index=True)
return pd.DataFrame(columns=["parameter_value"])
raise ValueError("Invalid timeseries payload for CSV export.")
[docs]
@login_required
@require_POST
def download_timeseries_csv(request: HttpRequest) -> HttpResponse:
"""Download timeseries analysis results as CSV."""
payload, redirect_response = _get_download_payload(
request,
scope="ts",
redirect_name="main:timeseries",
missing_message="No timeseries results available to download.",
)
if redirect_response:
return redirect_response
try:
df = _timeseries_payload_to_csv_df(payload)
buffer = StringIO()
df.to_csv(buffer, index=False)
output_file_name = _safe_download_filename(
payload.get("output_file_name"),
"timeseries_results.csv",
extension=".csv",
)
response = HttpResponse(buffer.getvalue(), content_type="text/csv; charset=utf-8")
response["Content-Disposition"] = f'attachment; filename="{output_file_name}"'
return response
except Exception:
logger.exception("Failed to generate timeseries CSV download")
messages.error(request, "Failed to generate the timeseries CSV file.")
return redirect("main:timeseries")
[docs]
@login_required
def timeseries_view(request: HttpRequest) -> HttpResponse:
"""Timeseries analysis: single run with metrics or parameter sweep with overlaid hydrographs."""
session_data = {}
if request.method == "POST":
catchment_choices = _get_catchment_choices(request)
form = TimeseriesForm(request.POST, catchment_choices=catchment_choices)
if form.is_valid():
mode = form.cleaned_data["mode"]
catchment_name = form.cleaned_data["catchment_name"]
uploaded_file_path = _default_uploaded_file_path(request)
flow_units = _read_flow_units(uploaded_file_path)
try:
payload = None
with FeaturesSimulation(
subcatchment_id=catchment_name, raw_file=uploaded_file_path
) as model:
if mode == "single":
payload = _build_single_timeseries_payload(
ts_df=model.calculate_timeseries(),
catchment_name=catchment_name,
flow_units=flow_units,
username=request.user.username,
)
elif mode == "sweep":
payload = _build_sweep_timeseries_payload(
sweep_results=model.simulate_subcatchment_timeseries(
feature=form.cleaned_data["feature"],
start=form.cleaned_data["start"],
stop=form.cleaned_data["stop"],
step=form.cleaned_data["step"],
),
feature=form.cleaned_data["feature"],
catchment_name=catchment_name,
flow_units=flow_units,
username=request.user.username,
)
if payload is not None:
_store_result_payload_for_user(
request,
scope="ts",
session_key=TS_RESULT_TOKEN_SESSION_KEY,
payload=payload,
)
_save_form_state(
request,
TS_FORM_STATE_SESSION_KEY,
form.cleaned_data,
TS_FORM_STATE_FIELDS,
)
return redirect("main:timeseries")
except ResultPayloadTooLargeError:
messages.error(
request,
"Result set is too large to keep for download. Narrow the timeseries range.",
)
except Exception as error:
input_error = _coerce_input_validation_error(error)
if input_error:
logger.warning("Timeseries input validation failed", exc_info=True)
messages.error(request, _format_input_error_message(input_error))
return render(
request,
"main/timeseries.html",
{"form": form, "ts_show_results": False},
)
logger.exception("Timeseries analysis failed")
messages.error(request, "An error occurred while running the analysis.")
else:
catchment_choices = _get_catchment_choices(request)
initial = _get_form_initial(
request,
TS_FORM_STATE_SESSION_KEY,
catchment_choices,
TimeseriesForm,
TS_FORM_STATE_FIELDS,
)
form = TimeseriesForm(catchment_choices=catchment_choices, initial=initial)
session_data = _get_timeseries_session_data(request)
return render(request, "main/timeseries.html", {"form": form, **session_data})
def _cleanup_swmm_side_files(inp_path: str) -> None:
"""Remove .rpt and .out files generated by SWMM alongside a .inp file."""
base = os.path.splitext(inp_path)[0]
for ext in FeaturesSimulation.SWMM_SIDE_EXTENSIONS:
if ext == ".inp":
continue
try:
os.remove(base + ext)
except OSError:
pass
[docs]
def calculations(request: HttpRequest) -> HttpResponse:
"""Run SWMM + ANN prediction and compare runoff volumes per subcatchment."""
df = None
if request.method == "POST":
uploaded_file_path = request.session.get("uploaded_file_path", None)
if not uploaded_file_path:
messages.error(request, "Please upload a file first.")
else:
user_dir = os.path.realpath(_user_upload_dir(request.user.id))
abs_uploaded_file = os.path.realpath(uploaded_file_path)
try:
common_path = os.path.commonpath([abs_uploaded_file, user_dir])
except ValueError:
common_path = None
if common_path != user_dir:
logger.warning(
"File path traversal attempt or cross-user access: %s", uploaded_file_path
)
messages.error(request, "Invalid file path detected.")
else:
try:
with Simulation(uploaded_file_path) as sim:
for _ in sim:
pass
# Build the model after SWMM run so report-derived columns are available.
swmmio_model = swmmio.Model(uploaded_file_path)
ann_predictions = predict_runoff(swmmio_model).transpose()
df = pd.DataFrame(
data={
"Name": swmmio_model.subcatchments.dataframe.index,
"SWMM_Runoff_m3": swmmio_model.subcatchments.dataframe[
"TotalRunoffMG"
].values,
"ANN_Runoff_m3": np.round(ann_predictions, 2),
},
)
except Exception:
logger.exception("Error while performing calculations.")
messages.error(
request,
"An error occurred while performing calculations.",
)
finally:
_cleanup_swmm_side_files(uploaded_file_path)
df_is_empty = df is None or df.empty
return render(request, "main/calculations.html", {"df": df, "df_is_empty": df_is_empty})