Source code for snowexsql.lambda_client

"""
SnowEx Lambda API Client

Lightweight client for accessing SnowEx database via AWS Lambda
function. Provides serverless access to snow data without requiring
AWS credentials or heavy geospatial dependencies.

Uses Lambda Function URL for public HTTPS access.
"""

import json
import os
import pandas as pd
import requests
from typing import Dict, Any, Optional
from datetime import datetime, date
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class SnowExLambdaClient:
    """
    Client for accessing SnowEx data via AWS Lambda Function URL

    This client provides serverless access to the SnowEx database through
    a public Lambda Function URL, eliminating the need for AWS credentials,
    database connections, or heavy geospatial dependencies. The Lambda
    function handles all database credentials securely via AWS Secrets
    Manager.

    The client mirrors the api.py class structure, providing access to:
    - PointMeasurements: Point data measurements
    - LayerMeasurements: Layer/profile data measurements
    - RasterMeasurements: Raster/image data
    - System functions: DOI queries, connection testing

    Example:
        >>> client = SnowExLambdaClient()
        >>> client.test_connection()
        {'connected': True, 'version': 'PostgreSQL 17.6...'}
        >>> # Use class-based approach (mirrors api.py)
        >>> data = client.layer_measurements.from_filter(
        ...     instrument='reflectance', limit=10
        ... )
        >>> instruments = client.point_measurements.all_instruments
    """

    # Default production Lambda Function URL
    DEFAULT_FUNCTION_URL = (
        'https://izwsawyfkxss5vawq5v64mruqy0ahxek'
        '.lambda-url.us-west-2.on.aws'
    )

    # Request timeout in seconds
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, function_url: Optional[str] = None):
        """
        Initialize the Lambda client with Function URL.

        No AWS credentials required - uses public HTTP endpoint.

        The Lambda Function URL can be set in three ways (in order of
        precedence):
        1. Pass directly to constructor:
           SnowExLambdaClient(function_url='https://...')
        2. Set SNOWEX_LAMBDA_URL environment variable
        3. Uses DEFAULT_FUNCTION_URL class constant

        Args:
            function_url: Lambda Function URL
                (https://....lambda-url.us-west-2.on.aws). If None, uses
                SNOWEX_LAMBDA_URL environment variable or default
                production URL.

        Raises:
            ValueError: If no URL can be resolved from any of the sources.
            ImportError: If the measurement classes cannot be discovered.
        """
        # Get Function URL from parameter, environment, or default
        self.function_url = (
            function_url
            or os.environ.get('SNOWEX_LAMBDA_URL')
            or self.DEFAULT_FUNCTION_URL
        )

        # Validate URL (only reachable when DEFAULT_FUNCTION_URL is
        # overridden with an empty value)
        if not self.function_url:
            raise ValueError(
                "\n\n" + "=" * 70 + "\n" +
                "Lambda Function URL Not Configured\n" +
                "=" * 70 + "\n\n" +
                "Please provide the Lambda Function URL in one of "
                "these ways:\n\n" +
                "1. Pass directly to constructor:\n" +
                " client = SnowExLambdaClient("
                "function_url='https://...')\n\n" +
                "2. Set environment variable:\n" +
                " export SNOWEX_LAMBDA_URL='https://...'\n\n" +
                "3. Update DEFAULT_FUNCTION_URL in lambda_client.py\n\n" +
                "Contact the SnowEx team if you need the Function URL.\n" +
                "=" * 70
            )

        # Remove trailing slash if present
        self.function_url = self.function_url.rstrip('/')

        # Setup HTTP session with retries for reliability
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["POST"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)

        # Dynamically create class-based accessors from available
        # measurement classes
        self._create_measurement_clients()

    def query(self, sql_query: str) -> pd.DataFrame:
        """
        Execute a raw SQL query against the database via Lambda.

        Args:
            sql_query: Raw SQL query string to execute

        Returns:
            pd.DataFrame: Query results as a DataFrame (empty when the
            response carries no 'data' key)
        """
        result = self._invoke_lambda('query', sql=sql_query)
        return pd.DataFrame(result.get('data', []))

    @staticmethod
    def _camel_to_snake(class_name: str) -> str:
        """Convert a CamelCase class name to its snake_case attribute name."""
        return ''.join(
            '_' + char.lower() if char.isupper() else char
            for char in class_name
        ).lstrip('_')

    @staticmethod
    def _discover_measurement_class_names() -> list:
        """
        List the names in snowexsql.api that end in 'Measurements'.

        Raises:
            ImportError: If snowexsql.api cannot be imported.
        """
        # Import here to avoid circular imports
        from snowexsql import api

        return [
            name for name in dir(api)
            if name.endswith('Measurements') and hasattr(api, name)
        ]

    def _create_measurement_clients(self):
        """
        Dynamically create measurement client attributes based on
        available measurement classes.

        Discovery convention:
        - Classes ending in 'Measurements'
        - Available in the snowexsql.api module
        - Creates snake_case attributes
          (e.g., PointMeasurements -> point_measurements)

        Raises:
            ImportError: If snowexsql.api cannot be imported.
        """
        try:
            class_names = self._discover_measurement_class_names()
        except ImportError as e:
            # If local discovery fails
            raise ImportError(
                f"Could not auto-discover measurement classes "
                f"from snowexsql.api: {e}. "
                "This usually indicates a packaging or import "
                "issue. Check that the snowexsql package is "
                "properly installed."
            ) from e

        # Create the client accessors dynamically
        for class_name in class_names:
            setattr(
                self,
                self._camel_to_snake(class_name),
                _LambdaDatasetClient(self, class_name)
            )

    def get_measurement_classes(self) -> Dict[str, Any]:
        """
        Get all measurement client objects as a dictionary for easy
        unpacking.

        The discovered classes are returned under their original
        CamelCase names, making them easy to use as drop-in replacements
        for direct API imports. Classes without a matching client
        attribute on this instance are skipped.

        Returns:
            Dict mapping class names (str) to client objects

        Raises:
            ImportError: If snowexsql.api cannot be imported.

        Example:
            >>> from snowexsql.lambda_client import SnowExLambdaClient
            >>> client = SnowExLambdaClient()
            >>>
            >>> # Get all measurement classes
            >>> classes = client.get_measurement_classes()
            >>> PointMeasurements = classes['PointMeasurements']
            >>> LayerMeasurements = classes['LayerMeasurements']
            >>>
            >>> # Use exactly like the direct API
            >>> df = PointMeasurements.from_filter(type='depth', limit=10)
            >>> df.plot(column='value', cmap='jet')
        """
        try:
            class_names = self._discover_measurement_class_names()
        except ImportError as e:
            raise ImportError(
                f"Could not discover measurement classes: {e}"
            ) from e

        # Sentinel distinguishes "attribute missing" from a None value
        missing = object()
        result = {}
        for class_name in class_names:
            accessor = getattr(
                self, self._camel_to_snake(class_name), missing
            )
            if accessor is not missing:
                result[class_name] = accessor
        return result

    def _serialize_payload(self, obj):
        """
        Recursively serialize payload objects to JSON-compatible format.

        datetime/date values become ISO-8601 strings, objects exposing
        ``__geo_interface__`` (e.g. Shapely geometries) become their
        GeoJSON-like mapping, and dicts/lists/tuples are walked
        recursively (tuples come back as lists). Anything else is
        returned unchanged.

        Args:
            obj: Object to serialize

        Returns:
            JSON-serializable version of the object
        """
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        if hasattr(obj, '__geo_interface__'):
            # Handle Shapely geometry objects (Point, Polygon, etc.)
            return obj.__geo_interface__
        if isinstance(obj, dict):
            return {
                key: self._serialize_payload(value)
                for key, value in obj.items()
            }
        if isinstance(obj, (list, tuple)):
            return [self._serialize_payload(item) for item in obj]
        return obj

    def _invoke_lambda(self, action: str, **kwargs) -> dict:
        """
        Internal method to invoke Lambda function via HTTP POST

        Args:
            action: The action to perform (e.g., 'test_connection',
                'get_layer_measurements')
            **kwargs: Additional parameters to pass to the Lambda function

        Returns:
            Dict containing the Lambda function response

        Raises:
            Exception: If the request fails, the response is not HTTP 200,
                the body is not a JSON object, or the body reports an
                error.
        """
        # Serialize datetime objects and other non-JSON-serializable types
        payload = self._serialize_payload({'action': action, **kwargs})

        # Only the network call is guarded by the transport handlers, so
        # a JSON decoding failure below cannot be mistaken for one.
        try:
            response = self.session.post(
                self.function_url,
                json=payload,
                headers={'Content-Type': 'application/json'},
                timeout=self.REQUEST_TIMEOUT_SECONDS
            )
        except requests.exceptions.Timeout as e:
            raise Exception(
                f"Request timed out after "
                f"{self.REQUEST_TIMEOUT_SECONDS} seconds. The query "
                f"may be too complex or the database is slow. Try "
                f"adding a 'limit' parameter to reduce result size."
            ) from e
        except requests.exceptions.ConnectionError as e:
            raise Exception(
                f"Could not connect to Lambda function at:\n"
                f"{self.function_url}\n\n"
                f"Possible causes:\n"
                f"1. Check your internet connectivity\n"
                f"2. Verify the Function URL is correct\n"
                f"3. If the issue persists, the service may be "
                f"temporarily unavailable - contact the SnowEx team\n\n"
                f"Connection error: {str(e)}"
            ) from e
        except requests.exceptions.RequestException as e:
            raise Exception(f"HTTP request failed: {str(e)}") from e

        # Check HTTP status
        if response.status_code != 200:
            error_text = (
                response.text[:500] if response.text else 'No response body'
            )
            raise Exception(
                f"Lambda returned HTTP {response.status_code}: {error_text}"
            )

        # Parse JSON response. Every JSON decode error (stdlib json,
        # simplejson, and the requests wrapper) derives from ValueError.
        try:
            result = response.json()
        except ValueError as e:
            response_preview = (
                response.text[:200] if hasattr(response, 'text') else 'N/A'
            )
            raise Exception(
                f"Failed to parse Lambda response as JSON: {str(e)}\n"
                f"Response preview: {response_preview}"
            ) from e

        # The checks below (and every caller) need a JSON object
        if not isinstance(result, dict):
            raise Exception(
                f"Unexpected Lambda response type: "
                f"{type(result).__name__} (expected a JSON object)"
            )

        # Check for Lambda errors
        if 'errorMessage' in result:
            raise Exception(f"Lambda error: {result['errorMessage']}")

        # Check for application-level errors
        if 'error' in result:
            raise Exception(f"Query error: {result['error']}")

        if not result.get('success', True):
            error_msg = result.get('error', 'Unknown error')
            raise Exception(f"Request failed: {error_msg}")

        return result

    def test_connection(self) -> Dict[str, Any]:
        """
        Test database connection through Lambda

        Returns:
            Dict with connection status and database version info
        """
        return self._invoke_lambda('test_connection')
class _LambdaDatasetClient:
    """
    Dynamic proxy client that automatically mirrors api.py BaseDataset
    classes

    This class uses Python's __getattr__ magic method to dynamically
    handle any method or property call, eliminating the need to manually
    synchronize with changes in the underlying API classes.

    Supported patterns:
    - Properties starting with 'all_': all_instruments, all_campaigns, etc.
    - Known methods: from_filter, from_unique_entries, from_area, get_sites
    - Any other name starting with 'get_' or 'find_' is proxied as a method
    """

    # Known methods that return DataFrames
    _DATAFRAME_METHODS = {'from_filter', 'from_area', 'get_sites'}

    # Known methods and the parameter names their positional arguments
    # map to, in order
    _KNOWN_METHODS = {
        'from_filter': ['filters'],
        'from_unique_entries': ['columns', 'filters'],
        'from_area': ['shp', 'pt', 'buffer', 'crs'],
        'get_sites': ['site_names']
    }

    def __init__(
        self, parent_client: "SnowExLambdaClient", class_name: str
    ):
        """
        Args:
            parent_client: Client whose ``_invoke_lambda`` performs the
                actual requests.
            class_name: Name of the mirrored API class; used as the
                prefix of every Lambda action.
        """
        self._client = parent_client
        self._class_name = class_name

    def __getattr__(self, name: str):
        """
        Dynamic attribute access - handles any property or method call

        This magic method is called when an attribute is accessed that
        doesn't exist on the object. It routes the call to the
        appropriate handler based on naming patterns.

        Raises:
            AttributeError: If the name matches none of the patterns.
        """
        # Pattern 1: Properties starting with 'all_'
        if name.startswith('all_'):
            return self._get_property(name)

        # Pattern 2: Known methods from BaseDataset
        if name in self._KNOWN_METHODS:
            return self._create_method_proxy(name)

        # Pattern 3: Other potential methods (extensible)
        if name.startswith('get_') or name.startswith('find_'):
            return self._create_method_proxy(name)

        # Pattern 4: Handle unknown attributes with helpful error.
        # Read the class name straight from __dict__: on an instance
        # whose __init__ has not run (copy/pickle reconstruction),
        # `self._class_name` would re-enter __getattr__ and recurse
        # until RecursionError.
        class_name = self.__dict__.get('_class_name', type(self).__name__)
        methods_list = list(self._KNOWN_METHODS.keys())
        raise AttributeError(
            f"'{class_name}' has no attribute '{name}'. "
            f"Available patterns: all_* (properties), "
            f"{methods_list} (methods)"
        )

    def _create_method_proxy(self, method_name: str):
        """
        Create a proxy function for a method that will invoke Lambda

        Args:
            method_name: Name of the remote method to proxy.

        Returns:
            A callable accepting ``*args``, ``**kwargs`` and the
            keyword-only ``as_geodataframe`` flag.
        """

        def method_proxy(*args, as_geodataframe=True, **kwargs):
            # Convert positional args to kwargs based on known method
            # signatures; positional args beyond the known parameters
            # are ignored.
            if args and method_name in self._KNOWN_METHODS:
                param_names = self._KNOWN_METHODS[method_name]
                for param_name, arg in zip(param_names, args):
                    kwargs[param_name] = arg

            # Shape the payload to match what the Lambda handler expects
            if method_name == 'from_filter':
                # from_filter: expects a single 'filters' dict. Start
                # from an explicit dict (a non-dict 'filters' value is
                # dropped), then fold every other kwarg into it.
                explicit = kwargs.pop('filters', None)
                provided_filters = (
                    dict(explicit) if isinstance(explicit, dict) else {}
                )
                provided_filters.update(kwargs)
                kwargs = {'filters': provided_filters}

            elif method_name == 'from_unique_entries':
                # from_unique_entries: expects 'columns' list and
                # optional 'filters' dict
                columns = kwargs.get('columns')
                explicit = kwargs.get('filters')
                provided_filters = (
                    dict(explicit) if isinstance(explicit, dict) else {}
                )
                # Move any unrecognized keys into filters
                provided_filters.update({
                    key: value for key, value in kwargs.items()
                    if key not in ('columns', 'filters')
                })

                shaped = {}
                if columns is not None:
                    shaped['columns'] = columns
                if provided_filters:
                    shaped['filters'] = provided_filters
                # With nothing to shape, pass the kwargs through as-is
                kwargs = shaped if shaped else kwargs

            elif method_name == 'from_area':
                # from_area: spatial filtering is delegated to the
                # Lambda side
                return self._handle_from_area_server_side(
                    kwargs, as_geodataframe
                )

            # Invoke Lambda with the method call
            action = f'{self._class_name}.{method_name}'
            result = self._client._invoke_lambda(action, **kwargs)

            if 'error' in result:
                raise Exception(
                    f"Method call failed: {result['error']}"
                )

            # Smart return type handling based on method
            if method_name in self._DATAFRAME_METHODS:
                df = pd.DataFrame(result['data'])
                # Convert to GeoDataFrame if requested and possible
                if as_geodataframe and \
                        self._can_convert_to_geodataframe(df):
                    return self._to_geodataframe(df)
                return df
            return result['data']

        # Add helpful docstring to the proxy function
        method_proxy.__doc__ = (
            f"Proxy for {self._class_name}.{method_name}() - "
            f"calls Lambda backend\n\n"
            f"Args:\n"
            f" as_geodataframe (bool): If True (default), return GeoDataFrame "
            f"when geometry data is available.\n"
            f" If False, return regular DataFrame.\n"
            f" Requires geopandas to be installed."
        )
        method_proxy.__name__ = method_name

        return method_proxy

    def _handle_from_area_server_side(
        self, kwargs: dict, as_geodataframe: bool
    ):
        """
        Handle from_area() by sending the geometry to Lambda as WKT

        Steps:
        1. Convert geometry to WKT (Well-Known Text) format
        2. Send to Lambda together with buffer, crs and other filters
        3. Build a DataFrame from the returned 'data'

        Args:
            kwargs: Parameters including pt/shp, buffer, crs, and other
                filters. Modified in place.
            as_geodataframe: Whether to return as GeoDataFrame

        Returns:
            Filtered GeoDataFrame or DataFrame; an empty DataFrame is
            returned as-is without conversion.

        Raises:
            ImportError: If shapely is not installed.
            ValueError: If neither 'pt' nor 'shp' is given, 'pt' is given
                without 'buffer', or a geometry has an unsupported type.
        """
        try:
            from shapely.geometry import Point
        except ImportError:
            raise ImportError(
                "shapely is required for from_area(). "
                "Install with: pip install shapely"
            )

        # Extract spatial parameters
        pt = kwargs.pop('pt', None)
        shp = kwargs.pop('shp', None)
        buffer_dist = kwargs.pop('buffer', None)
        crs = kwargs.pop('crs', 4326)  # Default to WGS84

        # Validate parameters
        if pt is None and shp is None:
            raise ValueError(
                "Either 'pt' or 'shp' parameter is required "
                "for from_area"
            )

        if pt is not None and buffer_dist is None:
            raise ValueError(
                "'buffer' parameter is required when using 'pt'"
            )

        # Whatever is left after popping the spatial keys is a filter
        filters = dict(kwargs)
        kwargs.clear()

        # Convert geometry to WKT for transmission to Lambda; 'pt'
        # takes precedence when both are given
        if pt is not None:
            if isinstance(pt, Point):
                pt_wkt = pt.wkt
            elif isinstance(pt, (tuple, list)) and len(pt) == 2:
                pt_wkt = Point(pt[0], pt[1]).wkt
            else:
                raise ValueError(
                    "pt must be a shapely Point or (x, y) tuple"
                )
            kwargs['pt_wkt'] = pt_wkt
            kwargs['buffer'] = buffer_dist
        else:
            if not hasattr(shp, 'wkt'):
                raise ValueError("shp must be a shapely geometry object")
            kwargs['shp_wkt'] = shp.wkt

        kwargs['crs'] = crs

        if filters:
            kwargs['filters'] = filters

        # Invoke Lambda with the spatial query
        action = f'{self._class_name}.from_area'
        result = self._client._invoke_lambda(action, **kwargs)

        # Convert result to DataFrame
        df = pd.DataFrame(result.get('data', []))

        if df.empty:
            return df

        # Convert to GeoDataFrame if requested
        if as_geodataframe:
            df = self._to_geodataframe(df)

        return df

    def _can_convert_to_geodataframe(self, df: pd.DataFrame) -> bool:
        """
        Check if DataFrame can be converted to GeoDataFrame

        Args:
            df: DataFrame to check

        Returns:
            bool: True if a 'geometry' or 'geom' column is present
        """
        return 'geometry' in df.columns or 'geom' in df.columns

    def _to_geodataframe(self, df: pd.DataFrame):
        """
        Convert pandas DataFrame to GeoDataFrame

        Handles geometry columns returned from Lambda:
        - 'geom' column: tried as WKB hex, then WKT, then GeoJSON-like
          dict; the parsed values are stored in a new 'geometry' column
        - 'geometry' column already present: parsed as WKT when possible

        The result is always tagged with CRS EPSG:4326. The input frame
        is modified in place when a geometry column is parsed.

        Args:
            df: DataFrame to convert

        Returns:
            GeoDataFrame if conversion successful, otherwise original
            DataFrame (with a UserWarning when geopandas is missing or
            the conversion raises).
        """
        try:
            import geopandas as gpd
            from shapely import wkb, wkt
            from shapely.geometry import shape

            # Case 1: DataFrame has 'geom' column (PostGIS standard)
            if 'geom' in df.columns:
                if df['geom'].dtype == 'object':
                    # Most common encoding first: WKB hex, then WKT,
                    # then GeoJSON __geo_interface__ dict
                    parsers = (
                        lambda value: wkb.loads(value, hex=True),
                        wkt.loads,
                        shape,
                    )
                    for parse in parsers:
                        try:
                            df['geometry'] = df['geom'].apply(
                                lambda value, parse=parse: (
                                    parse(value) if value else None
                                )
                            )
                            return gpd.GeoDataFrame(
                                df, geometry='geometry', crs='EPSG:4326'
                            )
                        except Exception:
                            # Wrong encoding; try the next parser
                            continue
                    # All parsers failed: fall through to return df

            # Case 2: DataFrame already has geometry column
            elif 'geometry' in df.columns:
                # Try to parse as WKT if it's a string
                if df['geometry'].dtype == 'object':
                    try:
                        df['geometry'] = df['geometry'].apply(
                            lambda value: (
                                wkt.loads(value) if value else None
                            )
                        )
                    except Exception:
                        # Already geometry objects, or not WKT; let
                        # GeoDataFrame construction decide below
                        pass
                return gpd.GeoDataFrame(
                    df, geometry='geometry', crs='EPSG:4326'
                )

            # Case 3: No spatial data available
            return df

        except ImportError:
            # If geopandas not available, return regular DataFrame
            import warnings
            warnings.warn(
                "geopandas not installed. Returning pandas DataFrame. "
                "Install geopandas for spatial plotting: pip install geopandas",
                UserWarning
            )
            return df
        except Exception as e:
            # If conversion fails for any other reason
            import warnings
            warnings.warn(
                f"Could not convert to GeoDataFrame: {e}. "
                f"Returning pandas DataFrame.",
                UserWarning
            )
            return df

    def _get_property(self, property_name: str):
        """Handle property access via Lambda and return its 'data'."""
        action = f'{self._class_name}.{property_name}'
        result = self._client._invoke_lambda(action)

        if 'error' in result:
            raise Exception(
                f"Property access failed: {result['error']}"
            )

        return result['data']

    def __repr__(self):
        """Helpful representation for debugging"""
        return f"<{self._class_name}Client via Lambda>"


# Convenience function for quick client creation
def create_client(function_url: Optional[str] = None) -> SnowExLambdaClient:
    """
    Build a ready-to-use SnowExLambdaClient.

    Args:
        function_url: Lambda Function URL (optional). Forwarded
            unchanged to the SnowExLambdaClient constructor.

    Returns:
        SnowExLambdaClient instance
    """
    client = SnowExLambdaClient(function_url=function_url)
    return client