Source code for ecosound.environment.erddap

# -*- coding: utf-8 -*-
#!/usr/bin/env python3
"""
ERDDAP Data Fetcher
Class-based interface for retrieving data from ERDDAP servers.

"""

from __future__ import annotations
from typing import Tuple, Iterable, List, Optional, Dict
import xarray as xr
import pandas as pd
import numpy as np
import os
import tempfile
from urllib.request import urlretrieve
import xml.etree.ElementTree as ET


[docs] class ERDDAPDataFetcher: """ Class for fetching data from ERDDAP servers. Handles data retrieval, dateline crossing, quality masking, and spatial/temporal striding. """
[docs] def __init__(self, server: str, dataset_id: Optional[str] = None): """ Initialize the ERDDAP data fetcher. Args: server: ERDDAP server URL (e.g., "https://comet.nefsc.noaa.gov/erddap") dataset_id: Dataset ID (optional, can be set later for fetch operations) """ self.server = server.rstrip('/') # Remove trailing slash if present self.dataset_id = dataset_id
[docs] def list_datasets(self, search_text: Optional[str] = None) -> pd.DataFrame: """ List all datasets available on the ERDDAP server. Args: search_text: Optional text to filter datasets (searches in title and summary) Returns: DataFrame with columns: dataset_id, title, summary, institution """ # Use ERDDAP's allDatasets endpoint with CSV response url = f"{self.server}/info/index.csv" try: # Try using requests if available try: import requests response = requests.get(url, timeout=30) response.raise_for_status() from io import StringIO df = pd.read_csv(StringIO(response.text)) except ImportError: # Fallback to pandas read_csv directly df = pd.read_csv(url) # Filter columns of interest columns_to_keep = ['Dataset ID', 'Title', 'Summary', 'Institution'] available_columns = [col for col in columns_to_keep if col in df.columns] df = df[available_columns] # Rename for consistency df = df.rename(columns={ 'Dataset ID': 'dataset_id', 'Title': 'title', 'Summary': 'summary', 'Institution': 'institution' }) # Filter by search text if provided if search_text: mask = ( df['title'].str.contains(search_text, case=False, na=False) | df['summary'].str.contains(search_text, case=False, na=False) ) df = df[mask] return df except Exception as e: print(f"Error fetching dataset list: {e}") print(f"Attempted URL: {url}") return pd.DataFrame()
[docs] def get_dataset_info(self, dataset_id: Optional[str] = None) -> Dict: """ Get detailed information about a specific dataset. Args: dataset_id: Dataset ID (uses instance dataset_id if not provided) Returns: Dictionary with dataset metadata """ ds_id = dataset_id or self.dataset_id if not ds_id: raise ValueError("No dataset_id specified") url = f"{self.server}/info/{ds_id}/index.csv" try: df = pd.read_csv(url) # Convert to dictionary info = {} for _, row in df.iterrows(): if 'Variable Name' in df.columns and 'Attribute Name' in df.columns: var_name = row.get('Variable Name', '') attr_name = row.get('Attribute Name', '') value = row.get('Value', '') if var_name not in info: info[var_name] = {} info[var_name][attr_name] = value return info except Exception as e: print(f"Error fetching dataset info: {e}") return {}
[docs] def list_variables(self, dataset_id: Optional[str] = None) -> List[str]: """ List all variables available in a dataset. Args: dataset_id: Dataset ID (uses instance dataset_id if not provided) Returns: List of variable names """ info = self.get_dataset_info(dataset_id) # Filter out coordinate variables and metadata variables = [ var for var in info.keys() if var and var not in ['NC_GLOBAL', '', 'time', 'latitude', 'longitude', 'altitude', 'depth'] ] return variables
@staticmethod def _wrap_to_180(lon: float) -> float: """Wrap longitude to [-180, 180] range.""" x = ((lon + 180) % 360) - 180 return 180.0 if abs(x - (-180.0)) < 1e-9 else x @staticmethod def _iso_noon(d) -> str: """Convert date to ISO format at noon UTC.""" return pd.to_datetime(d).strftime("%Y-%m-%dT12:00:00Z") def _constraint_index_range( self, t0_iso: str, t1_iso: str, lat_min: float, lat_max: float, lon_min: float, lon_max: float, time_stride: int = 1, spatial_stride: int = 1 ) -> str: """ Build ERDDAP value-style constraint indices for [time][lat][lon]. """ return ( f"[({t0_iso}):{time_stride}:({t1_iso})]" f"[({lat_min}):{spatial_stride}:({lat_max})]" f"[({lon_min}):{spatial_stride}:({lon_max})]" ) def _build_urls( self, variables: Iterable[str], t0_iso: str, t1_iso: str, lat_bounds: Tuple[float, float], lon_bounds: Tuple[float, float], time_stride: int = 1, spatial_stride: int = 1, dataset_id: Optional[str] = None ) -> List[str]: """ Create one or two ERDDAP .nc URLs. If the box crosses the dateline, split into [lon_min, 180] + [-180, lon_max]. """ ds_id = dataset_id or self.dataset_id if not ds_id: raise ValueError("No dataset_id specified") lat1, lat2 = sorted(lat_bounds) lo1, lo2 = self._wrap_to_180(lon_bounds[0]), self._wrap_to_180(lon_bounds[1]) base = f"{self.server}/griddap/{ds_id}.nc?" def url_for(lmin, lmax): idx = self._constraint_index_range( t0_iso, t1_iso, lat1, lat2, lmin, lmax, time_stride=time_stride, spatial_stride=spatial_stride ) vars_clause = ",".join(f"{v}{idx}" for v in variables) return base + vars_clause if lo1 <= lo2: return [url_for(lo1, lo2)] else: # Dateline crossing return [url_for(lo1, 180.0), url_for(-180.0, lo2)] @staticmethod def _open_erddap_nc(url: str) -> xr.Dataset: """Download ERDDAP .nc subset to a temp file, then open with xarray.""" tmp_path = None try: try: import requests r = requests.get(url, stream=True, timeout=180) r.raise_for_status() fd, tmp_path = tempfile.mkstemp(suffix=".nc") os.close(fd) with open(tmp_path, "wb") as f: for chunk in r.iter_content(1024 * 1024): if chunk: f.write(chunk) except (ImportError, Exception): # Fallback to stdlib if requests not available tmp_path, _ = urlretrieve(url) ds = xr.open_dataset(tmp_path) return ds.load() # Load so we can safely delete the temp file finally: if tmp_path and os.path.exists(tmp_path): try: os.remove(tmp_path) except Exception: pass
[docs] def fetch_data( self, variables: str | List[str], *, date: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, lat_min: float, lat_max: float, lon_min: float, lon_max: float, dataset_id: Optional[str] = None, include_quality: bool = False, quality_variable: str = "quality_level", quality_mask_value: Optional[int] = None, spatial_stride: int = 1, time_stride: int = 1, max_request_duration_days: int = 31 ) -> xr.Dataset: """ Fetch gridded data from ERDDAP for a single day or date range. Automatically splits large time ranges into chunks to avoid overwhelming the server. Args: variables: Variable name(s) to fetch (string or list of strings) date: Single date (YYYY-MM-DD) - mutually exclusive with start_date/end_date start_date: Start date (YYYY-MM-DD) for range end_date: End date (YYYY-MM-DD) for range lat_min: Minimum latitude lat_max: Maximum latitude lon_min: Minimum longitude lon_max: Maximum longitude dataset_id: Dataset ID (uses instance dataset_id if not provided) include_quality: Whether to include quality variable quality_variable: Name of quality variable (default: "quality_level") quality_mask_value: Mask data by quality value (e.g., 5 for best quality) spatial_stride: Spatial stride for thinning data (1=all, 2=every other point, etc.) time_stride: Temporal stride for thinning data (1=all, 2=every other day, etc.) max_request_duration_days: Maximum time range per request (default: 31 days/~1 month) Large ranges are split into multiple requests automatically. Returns: xarray.Dataset with requested data (seamlessly combined from multiple requests if needed) Examples: Single day:: ds = fetcher.fetch_data( "sea_surface_temperature", date="2018-01-01", lat_min=42, lat_max=43, lon_min=-71, lon_max=-69) Large date range (automatically chunked into monthly requests):: ds = fetcher.fetch_data( ["sst", "chlorophyll"], start_date="2018-01-01", end_date="2018-06-30", lat_min=42, lat_max=43, lon_min=-71, lon_max=-69, max_request_duration_days=31) """ # Normalize time inputs if date is not None: t0 = t1 = pd.to_datetime(date) else: if start_date is None or end_date is None: raise ValueError("Provide either `date` OR both `start_date` and `end_date`.") t0, t1 = sorted([pd.to_datetime(start_date), pd.to_datetime(end_date)]) # Normalize variables to list if isinstance(variables, str): variables = [variables] vars_to_request = list(variables) if include_quality or quality_mask_value is not None: if quality_variable not in vars_to_request: vars_to_request.append(quality_variable) # Check if we need to split the request into chunks time_range_days = (t1 - t0).days + 1 # +1 to include both endpoints if time_range_days <= max_request_duration_days: # Single request - no chunking needed print(f"Fetching {time_range_days} days of data in 1 request...") ds = self._fetch_single_timerange( vars_to_request, t0, t1, lat_min, lat_max, lon_min, lon_max, time_stride, spatial_stride, dataset_id ) else: # Split into chunks num_chunks = int(np.ceil(time_range_days / max_request_duration_days)) print(f"Fetching {time_range_days} days of data in {num_chunks} requests (max {max_request_duration_days} days each)...") # Create time chunks time_chunks = [] current_start = t0 while current_start <= t1: current_end = min(current_start + pd.Timedelta(days=max_request_duration_days - 1), t1) time_chunks.append((current_start, current_end)) current_start = current_end + pd.Timedelta(days=1) # Fetch each chunk chunk_datasets = [] for i, (chunk_start, chunk_end) in enumerate(time_chunks, 1): chunk_days = (chunk_end - chunk_start).days + 1 print(f" Request {i}/{num_chunks}: {chunk_start.date()} to {chunk_end.date()} ({chunk_days} days)") chunk_ds = self._fetch_single_timerange( vars_to_request, chunk_start, chunk_end, lat_min, lat_max, lon_min, lon_max, time_stride, spatial_stride, dataset_id ) chunk_datasets.append(chunk_ds) # Concatenate all chunks along time dimension print(f" Combining {len(chunk_datasets)} chunks...") ds = xr.concat(chunk_datasets, dim="time") # Optional: mask by quality level if quality_variable in ds and quality_mask_value is not None: for var in variables: if var in ds: ds[var] = ds[var].where(ds[quality_variable] == quality_mask_value) # Sort coordinates (helps with plotting and time operations) for coord in ("time", "latitude", "longitude"): if coord in ds.coords: ds = ds.sortby(coord) print(f"Fetched data with shape: {ds.dims}") return ds
def _fetch_single_timerange( self, variables: List[str], t0: pd.Timestamp, t1: pd.Timestamp, lat_min: float, lat_max: float, lon_min: float, lon_max: float, time_stride: int, spatial_stride: int, dataset_id: Optional[str] ) -> xr.Dataset: """ Internal method to fetch data for a single time range. Handles dateline crossing by splitting into multiple spatial requests if needed. """ t0_iso = self._iso_noon(t0) t1_iso = self._iso_noon(t1) # Build URLs (may split if crossing dateline) urls = self._build_urls( variables, t0_iso, t1_iso, (lat_min, lat_max), (lon_min, lon_max), time_stride=time_stride, spatial_stride=spatial_stride, dataset_id=dataset_id ) # Download and open (stitch if crossing dateline) parts = [self._open_erddap_nc(u) for u in urls] ds = xr.concat(parts, dim="longitude") if len(parts) > 1 else parts[0] return ds
# Example usage if __name__ == "__main__": # ERDDAP server and datasets ERDDAP_SERVER = "https://comet.nefsc.noaa.gov/erddap" ACSPO_REANALYSIS = "noaa_coastwatch_acspo_v2_reanalysis" ACSPO_NRT = "noaa_coastwatch_acspo_v2_nrt" # Initialize fetcher fetcher = ERDDAPDataFetcher(server=ERDDAP_SERVER, dataset_id=ACSPO_REANALYSIS) # Example 1: List all datasets on the server print("Example 1: List all datasets") print("=" * 80) datasets = fetcher.list_datasets() print(f"Found {len(datasets)} datasets") print(datasets.head(10)) print() # Example 2: Search for specific datasets print("Example 2: Search for SST datasets") print("=" * 80) sst_datasets = fetcher.list_datasets(search_text="temperature") print(f"Found {len(sst_datasets)} datasets containing 'temperature'") print(sst_datasets[['dataset_id', 'title']].head()) print() # Example 3: Get dataset info print("Example 3: Get dataset information") print("=" * 80) info = fetcher.get_dataset_info() print(f"Dataset: {fetcher.dataset_id}") if 'NC_GLOBAL' in info: print("Global attributes:") for key, value in list(info['NC_GLOBAL'].items())[:5]: print(f" {key}: {value}") print() # Example 4: List variables print("Example 4: List available variables") print("=" * 80) variables = fetcher.list_variables() print(f"Available variables: {variables}") print() # Example 5: Fetch SST data print("Example 5: Fetch SST data") print("=" * 80) ds = fetcher.fetch_data( "sea_surface_temperature", start_date="2018-01-01", end_date="2018-01-03", lat_min=42.0, lat_max=43.0, lon_min=-71.0, lon_max=-69.0, include_quality=True, quality_mask_value=5, # Best quality only spatial_stride=1, time_stride=1 ) print(ds) print() # Example 6: Fetch multiple variables print("Example 6: Fetch with different dataset") print("=" * 80) fetcher2 = ERDDAPDataFetcher(server=ERDDAP_SERVER, dataset_id=ACSPO_REANALYSIS) ds2 = fetcher2.fetch_data( "sea_surface_temperature", date="2018-01-01", lat_min=42.0, lat_max=43.0, lon_min=-71.0, lon_max=-69.0 ) print(ds2)