"""
SnowEx API for querying measurement data.
LAMBDA INTEGRATION CONVENTIONS:
===============================
If you're adding a new measurement class that should be available
via the Lambda API, follow these naming conventions:
1. Class name MUST end with 'Measurements'
(e.g., WeatherMeasurements)
2. Class MUST have a 'MODEL' attribute pointing to the SQLAlchemy
model
3. Class MUST inherit from BaseDataset
Example:
class WeatherMeasurements(BaseDataset):
MODEL = WeatherData # Required for Lambda auto-discovery!
# Your implementation here...
The Lambda handler will automatically discover and expose your
class as:
client.weather_measurements.from_filter()
client.weather_measurements.all_instruments
etc.
See snowexsql.lambda_handler._get_measurement_classes() for
implementation details.
"""
import logging
import os
from contextlib import contextmanager
from sqlalchemy.sql import func
from sqlalchemy import cast, Numeric, exists, literal, select
# Initialize logger first
LOG = logging.getLogger(__name__)
# Import pandas - always available
import pandas as pd
from sqlalchemy.dialects import postgresql
def query_to_geopandas(query, engine, **kwargs):
"""
Convert SQLAlchemy query to GeoDataFrame (if geopandas available)
or DataFrame.
Execution context:
- Local power users: Returns GeoDataFrame with proper geometry objects
- Lambda environment: Returns pandas DataFrame (no geopandas dependency)
- DataFrame is serialized to JSON by lambda_handler
- lambda_client receives JSON and converts to GeoDataFrame client-side
Args:
query: SQLAlchemy Query object
engine: SQLAlchemy engine
**kwargs: Additional arguments passed to read_postgis or read_sql
Returns:
geopandas.GeoDataFrame if geopandas available,
otherwise pandas.DataFrame
"""
sql = query.statement.compile(dialect=postgresql.dialect())
try:
import geopandas as gpd
return gpd.read_postgis(sql, engine.connect(), **kwargs)
except ImportError:
# Geopandas not available (e.g., Lambda environment)
# Returns pandas DataFrame with geometry as WKB/WKT
# lambda_client will convert to GeoDataFrame client-side
return pd.read_sql(sql, engine, **kwargs)
def raster_to_rasterio(rasters):
"""Raster functionality requires rasterio"""
raise ImportError(
"Raster functionality not available in Lambda environment. "
"Use local API for raster operations."
)
from snowexsql.db import get_db
from snowexsql.tables import (
Campaign, CampaignObservation, DOI, ImageData, ImageObservation, Instrument,
LayerData, MeasurementType, Observer, PointData, PointObservation, Site
)
from snowexsql.db import db_session_with_credentials
# TODO:
# * Possible enums
# * implement 'like' or 'contains' method
[docs]
class LargeQueryCheckException(RuntimeError):
pass
def get_points():
# Lets grab a single row from the points table
with db_session_with_credentials() as (_engine, session):
qry = session.query(PointData).limit(1)
# Execute that query!
result = qry.all()
[docs]
class BaseDataset:
"""Base class for all SnowEx measurement dataset accessors.
Provides filtering, querying, and property-based access to measurements
stored in the SnowEx database. Subclasses must set ``MODEL`` to the
appropriate SQLAlchemy table class.
"""
MODEL = None
ALLOWED_QRY_KWARGS = [
"campaign", "date", "instrument", "type",
"utm_zone", "date_greater_equal", "date_less_equal",
"value_greater_equal", 'value_less_equal', "doi",
"observer"
]
SPECIAL_KWARGS = ["limit"]
# Default max record count
MAX_RECORD_COUNT = 1000
[docs]
@staticmethod
def retrieve_single_value_result(result):
"""
When we only request a single thing we still get a list of
lists this function filters it out. This usually looks like a
list of tuples.
"""
final = []
if len(result) != 0:
final = [r[0] for r in result]
return final
@classmethod
def _build_select_clause(cls, verbose=False):
"""
Build the SELECT clause for queries.
Override in subclasses to customize columns returned based on verbose
mode.
Args:
verbose: If True, return denormalized data with related table
columns
Returns:
List of SQLAlchemy column expressions for session.query()
"""
# Default: just return the model (all its direct columns)
return [cls.MODEL]
@classmethod
def _check_size(cls, qry, kwargs):
# Safeguard against accidental giant requests
count = qry.count()
if count > cls.MAX_RECORD_COUNT and "limit" not in kwargs:
raise LargeQueryCheckException(
f"Query will return {count} number of records,"
f" but we have a default max of {cls.MAX_RECORD_COUNT}."
f" If you want to proceed, set the 'limit' filter"
f" to the desired number of records."
)
@classmethod
def _filter_campaign(cls, qry, v):
qry = qry.filter(
Site.campaign.has(Campaign.name == v)
)
return qry
@classmethod
def _filter_observers(cls, qry, v):
qry = qry.join(
cls.MODEL.observers
).filter(Observer.name == v)
return qry
@classmethod
def _filter_instrument(cls, qry, value):
return qry.filter(
cls.MODEL.instrument.has(name=value)
)
@classmethod
def _filter_measurement_type(cls, qry, value):
return qry.join(
cls.MODEL.measurement_type
).filter(MeasurementType.name == value)
@classmethod
def _filter_doi(cls, qry, value):
return qry.join(cls.MODEL.doi).filter(DOI.doi == value)
[docs]
@classmethod
def extend_qry(cls, qry, check_size=True, **kwargs):
if cls.MODEL is None:
raise ValueError("You must use a class with a MODEL.")
# use the default kwargs
for k, v in kwargs.items():
# Handle special operations
if k in cls.ALLOWED_QRY_KWARGS:
qry_model = cls.MODEL
# Logic for filtering on date with LayerData
if "date" in k and cls.MODEL == LayerData:
qry = qry.join(LayerData.site)
qry_model = Site
elif cls.MODEL == PointData:
qry = qry.join(PointData.observation)
# standard filtering using qry.filter
if isinstance(v, list):
filter_col = getattr(qry_model, k)
if k == "date":
raise ValueError(
"We cannot search for a list of dates"
)
elif "_equal" in k:
raise ValueError(
"We cannot compare greater_equal or "
"less_equal with a list"
)
elif k == "site":
# Skip list handling here, will be handled below
pass
else:
qry = qry.filter(filter_col.in_(v))
LOG.debug(
f"Filtering {k} to value {v}"
)
else:
# Filter boundary
if "_greater_equal" in k:
key = k.split("_greater_equal")[0]
if key == "value":
qry = qry.filter(
cast(getattr(qry_model, key), Numeric) >= v
)
else:
qry = qry.filter(
getattr(qry_model, key) >= v
)
elif "_less_equal" in k:
key = k.split("_less_equal")[0]
if key == "value":
qry = qry.filter(
cast(getattr(qry_model, key), Numeric) <= v
)
else:
qry = qry.filter(
getattr(qry_model, key) <= v
)
# Filter linked columns
elif k == "instrument":
qry = cls._filter_instrument(qry, v)
elif k == "campaign":
qry = cls._filter_campaign(qry, v)
elif k == "site":
# Handle list of site names
if isinstance(v, list):
qry = qry.filter(
exists().where(
qry_model.site_id == Site.id
).where(
Site.name.in_(v)
)
)
else:
qry = qry.filter(
qry_model.site.has(name=v)
)
elif k == "observer":
qry = cls._filter_observers(qry, v)
elif k == "doi":
qry = cls._filter_doi(qry, v)
elif k == "type":
qry = cls._filter_measurement_type(qry, v)
# Filter to exact value
else:
qry = qry.filter(
getattr(qry_model, k) == v
)
LOG.debug(
f"Filtering {k} to list {v}"
)
# to avoid limit before filter
elif k in cls.SPECIAL_KWARGS:
if k == "limit":
qry = qry.limit(v)
else:
# Error out for not-allowed kwargs
raise ValueError(f"{k} is not an allowed filter")
if check_size:
cls._check_size(qry, kwargs)
return qry
[docs]
@classmethod
def from_unique_entries(cls, columns_to_search, **kwargs):
"""
Returns unique values from a column to help with filtering
"""
columns = [
getattr(cls.MODEL, column)
for column in columns_to_search
]
with db_session_with_credentials() as (_engine, session):
try:
qry = session.query(*columns)
# Hardcode the limit to
qry = cls.extend_qry(qry, check_size=False, **kwargs)
results = qry.distinct().all()
except Exception as e:
session.close()
LOG.error(
"Failed query finding options for filtering"
)
raise e
if len(columns_to_search) == 1:
results = cls.retrieve_single_value_result(results)
return results
[docs]
@classmethod
def from_filter(cls, verbose=False, **kwargs):
"""
Get data for the class by filtering by allowed arguments. The allowed
filters are cls.ALLOWED_QRY_KWARGS.
Args:
verbose: If True, return denormalized data with related table columns
kwargs: Filter arguments from ALLOWED_QRY_KWARGS
"""
with db_session_with_credentials() as (engine, session):
try:
select_clause = cls._build_select_clause(verbose)
qry = session.query(*select_clause)
# Add explicit joins for verbose mode to avoid cartesian
# products
if verbose and hasattr(cls, '_add_verbose_joins'):
qry = cls._add_verbose_joins(qry)
elif hasattr(cls, '_add_base_joins'):
# For verbose=False, still need basic joins (e.g.,
# Site for geom)
qry = cls._add_base_joins(qry)
qry = cls.extend_qry(qry, **kwargs)
# For debugging in the test suite and not
# recommended in production
# https://docs.sqlalchemy.org/en/20/faq/
# sqlexpressions.html#rendering-postcompile-
# parameters-as-bound-parameters
if 'DEBUG_QUERY' in os.environ:
full_sql_query = qry.statement.compile(
compile_kwargs={"literal_binds": True}
)
print("\n ** SQL query **")
print(full_sql_query)
df = query_to_geopandas(qry, engine)
except Exception as e:
session.close()
LOG.error("Failed query for PointData")
raise e
return df
[docs]
@classmethod
def from_area(
cls, verbose=False, shp=None, pt=None, buffer=None, crs=26912, **kwargs
):
"""
Get data for the class within a specific shapefile or
within a point and a known buffer. Uses PostGIS functions via ORM
for spatial operations, eliminating dependency on geoalchemy2/shapely.
Args:
verbose: If True, return denormalized data with related table columns
shp: shapely geometry in which to filter, or WKT string
pt: shapely point that will have a buffer applied, or WKT string
buffer: buffer distance in same units as point (meters if using geography)
crs: integer SRID/EPSG code (default 26912 = UTM Zone 12N)
kwargs: for more filtering or limiting (cls.ALLOWED_QRY_KWARGS)
Returns:
pandas DataFrame with results (includes geom column with WKT)
"""
if shp is None and pt is None:
raise ValueError(
"Inputs must be a shape description or a point "
"and buffer"
)
if ((pt is not None and buffer is None) or
(buffer is not None and pt is None)):
raise ValueError(
"pt and buffer must be given together"
)
# Convert shapely objects to WKT if needed
if shp is not None and hasattr(shp, 'wkt'):
shp_wkt = shp.wkt
elif isinstance(shp, str):
shp_wkt = shp
else:
shp_wkt = None
if pt is not None:
if hasattr(pt, 'wkt'):
pt_wkt = pt.wkt
elif isinstance(pt, str):
pt_wkt = pt
elif isinstance(pt, (tuple, list)) and len(pt) == 2:
# Handle (x, y) tuple format
pt_wkt = f"POINT ({pt[0]} {pt[1]})"
else:
pt_wkt = None
else:
pt_wkt = None
# Determine table structure
table_name = cls.MODEL.__tablename__
needs_site_join = (table_name == 'layers')
with db_session_with_credentials() as (engine, session):
try:
# Detect database SRID to avoid transforming indexed column
# Query first non-null geometry to determine database SRID
if needs_site_join:
srid_qry = session.query(func.ST_SRID(Site.geom)).filter(
Site.geom.isnot(None)
).limit(1)
else:
srid_qry = session.query(func.ST_SRID(cls.MODEL.geom)).filter(
cls.MODEL.geom.isnot(None)
).limit(1)
try:
db_srid_result = session.execute(srid_qry).first()
if not db_srid_result or db_srid_result[0] is None:
# No data in table yet - use input CRS as default
# This allows empty table queries to work
# (will return empty)
LOG.warning(
f"No geometries found in {table_name}, "
f"using input CRS {crs} as default"
)
db_srid = crs
else:
db_srid = db_srid_result[0]
LOG.debug(f"Detected database SRID: \
{db_srid} for table {table_name}")
except Exception as srid_error:
# If SRID detection fails, fall back to input CRS
LOG.warning(
f"SRID detection failed for {table_name}: {srid_error}."
f"Using input CRS {crs} as default"
)
db_srid = crs
# Build PostGIS search geometry
# Transform search geometry to match database SRID for index
# usage
if pt_wkt:
# Create point in input CRS, buffer it, then transform to
# DB SRID
# Buffer before transform to ensure correct distance units
search_geom = func.ST_Transform(
func.ST_Buffer(
func.ST_GeomFromText(literal(pt_wkt), literal(crs)),
literal(buffer)
),
literal(db_srid)
)
elif shp_wkt:
# Transform shape from input CRS to database SRID
search_geom = func.ST_Transform(
func.ST_GeomFromText(literal(shp_wkt), literal(crs)),
literal(db_srid)
)
else:
raise ValueError("Unable to parse geometry input")
# Build select clause (handles verbose mode)
select_clause = cls._build_select_clause(verbose) \
if hasattr(cls, '_build_select_clause') \
else [cls.MODEL]
qry = session.query(*select_clause)
# Add explicit joins for verbose mode to avoid
# cartesian products
if verbose and hasattr(cls, '_add_verbose_joins'):
qry = cls._add_verbose_joins(qry)
elif hasattr(cls, '_add_base_joins'):
# For verbose=False, still need basic joins
# (e.g., Site for geom)
qry = cls._add_base_joins(qry)
# Add spatial filter
if needs_site_join:
# For LayerData, join to Site for geometry
# SQLAlchemy handles duplicate joins from _add_*_joins above
qry = qry.join(cls.MODEL.site)
qry = qry.filter(func.ST_Intersects(Site.geom, search_geom))
else:
# For PointData, use direct geometry column
qry = qry.filter(func.ST_Intersects(cls.MODEL.geom,
search_geom))
# Add standard filters using existing extend_qry
# This handles type, instrument, campaign, date ranges, etc.
qry = cls.extend_qry(qry, **kwargs)
# Execute and convert to GeoDataFrame
df = query_to_geopandas(qry, engine)
except Exception as e:
session.close()
LOG.error(f"Failed query for {cls.__name__}")
raise e
return df
@property
def all_campaigns(self):
"""
Return all campaign names
"""
with db_session_with_credentials() as (_engine, session):
qry = session.query(Campaign.name).distinct()
result = qry.all()
return self.retrieve_single_value_result(result)
@property
def all_types(self):
"""
Return all types of the data
"""
with db_session_with_credentials() as (_engine, session):
qry = session.query(MeasurementType.name).distinct()
result = qry.all()
return self.retrieve_single_value_result(result)
@property
def all_dates(self):
"""
Return all distinct dates in the data
"""
with db_session_with_credentials() as (_engine, session):
qry = session.query(self.MODEL.date).distinct()
result = qry.all()
return self.retrieve_single_value_result(result)
@property
def all_observers(self):
"""
Return all distinct observers in the data
"""
with db_session_with_credentials() as (_engine, session):
qry = session.query(Observer.name).distinct()
result = qry.all()
return self.retrieve_single_value_result(result)
@property
def all_dois(self):
"""
Return all distinct DOIs in the data
"""
with db_session_with_credentials() as (_engine, session):
qry = session.query(DOI.doi).distinct()
result = qry.all()
return self.retrieve_single_value_result(result)
@property
def all_units(self):
"""
Return all distinct units in the data
"""
with db_session_with_credentials() as (_engine, session):
qry = session.query(self.MODEL.units).distinct()
result = qry.all()
return self.retrieve_single_value_result(result)
@property
def all_instruments(self):
"""
Return all distinct instruments in the data
"""
with db_session_with_credentials() as (_engine, session):
# Use EXISTS for better performance on large datasets
# (29GB+ tables)
qry = session.query(Instrument.name).filter(
exists().where(
self.MODEL.instrument_id == Instrument.id
)
)
result = qry.all()
return self.retrieve_single_value_result(result)
[docs]
class PointMeasurements(BaseDataset):
"""
API class for access to PointData
"""
MODEL = PointData
@classmethod
def _build_select_clause(cls, verbose=False):
"""
Build SELECT clause for PointMeasurements queries.
Args:
verbose: If False, return only core point columns.
If True, return denormalized data with observation,
instrument, and measurement type info.
"""
if verbose:
# Return denormalized data with meaningful column names
# Note: Must also join these tables explicitly in the query
return [
cls.MODEL.value,
cls.MODEL.datetime.label('date'),
cls.MODEL.elevation,
cls.MODEL.geom,
CampaignObservation.name.label('observation_name'),
CampaignObservation.description.label('obs_description'),
MeasurementType.name.label('type'),
MeasurementType.units.label('units'),
MeasurementType.derived.label('derived'),
Instrument.name.label('instrument_name'),
Instrument.model.label('instrument_model'),
Instrument.specifications.label('instrument_specifications'),
Campaign.name.label('campaign_name'),
Observer.name.label('observer_name')
]
else:
# Return only columns from points table
return [
cls.MODEL.id,
cls.MODEL.value,
cls.MODEL.datetime,
cls.MODEL.elevation,
cls.MODEL.geom
]
@classmethod
def _add_verbose_joins(cls, qry):
"""
Add explicit joins needed for verbose mode to avoid cartesian products.
"""
qry = qry.join(cls.MODEL.observation)
qry = qry.join(cls.MODEL.measurement_type)
qry = qry.join(PointObservation.instrument)
qry = qry.join(PointObservation.campaign)
qry = qry.join(PointObservation.observer)
return qry
@classmethod
def _filter_campaign(cls, qry, value):
return qry.join(
cls.MODEL.observation
).join(
PointObservation.campaign
).filter(
Campaign.name == value
)
@classmethod
def _filter_instrument(cls, qry, value):
return qry.join(
cls.MODEL.observation
).join(
PointObservation.instrument
).filter(
Instrument.name == value
)
@classmethod
def _filter_doi(cls, qry, value):
return qry.join(
cls.MODEL.observation
).join(
PointObservation.doi
).filter(
DOI.doi == value
)
@classmethod
def _filter_observers(cls, qry, value):
return qry.join(
cls.MODEL.observation
).join(
PointObservation.observer
).filter(
Observer.name == value
)
@property
def all_types(self):
"""
Return all measurement types that have data in the points table
"""
with db_session_with_credentials() as (_engine, session):
# Use EXISTS for better performance on large points table
qry = session.query(MeasurementType.name).filter(
exists().where(
PointData.measurement_type_id == MeasurementType.id
)
)
result = qry.all()
return self.retrieve_single_value_result(result)
@property
def all_instruments(self):
"""
Return all distinct instruments in the data
"""
with db_session_with_credentials() as (_engine, session):
result = session.query(Instrument.name).filter(
Instrument.id.in_(
session.query(
PointObservation.instrument_id
).distinct()
)
).distinct().all()
return self.retrieve_single_value_result(result)
class TooManyRastersException(Exception):
"""
Exception to report to users that their query will produce too
many rasters
"""
pass
[docs]
class LayerMeasurements(BaseDataset):
"""
API class for access to LayerData
"""
MODEL = LayerData
ALLOWED_QRY_KWARGS = [
"campaign", "site", "date", "instrument", "observer", "type",
"utm_zone", "date_greater_equal", "date_less_equal",
"doi", "value_greater_equal", 'value_less_equal'
]
@classmethod
def _filter_campaign(cls, qry, v):
return qry.join(
cls.MODEL.site
).join(
Site.campaign
).filter(
Campaign.name == v
)
@classmethod
def _filter_observers(cls, qry, v):
return qry.join(
cls.MODEL.site
).join(
Site.observers
).filter(
Observer.name == v
)
@classmethod
def _filter_doi(cls, qry, value):
return qry.join(
cls.MODEL.site
).join(
Site.doi
).filter(
DOI.doi == value
)
@classmethod
def _build_select_clause(cls, verbose=False):
"""
Build SELECT clause for LayerMeasurements queries.
Args:
verbose: If False, return only core layer columns (no joins).
If True, return denormalized data with site, instrument,
and measurement type info.
"""
if verbose:
# Return denormalized data with meaningful column names
# Note: Must also join these tables explicitly in the query
return [
cls.MODEL.depth,
cls.MODEL.bottom_depth,
cls.MODEL.value,
Site.name.label('site_name'),
Site.description.label('site_description'),
Site.slope_angle.label('slope_angle'),
Site.aspect.label('aspect'),
Site.air_temp.label('air_temp'),
Site.total_depth.label('total_depth'),
Site.weather_description.label('weather_description'),
Site.precip.label('precip'),
Site.sky_cover.label('sky_cover'),
Site.wind.label('wind'),
Site.ground_condition.label('ground_condition'),
Site.ground_roughness.label('ground_roughness'),
Site.ground_vegetation.label('ground_vegetation'),
Site.vegetation_height.label('vegetation_height'),
Site.tree_canopy.label('tree_canopy'),
Site.comments.label('site_comments'),
Site.datetime.label('date'),
Site.geom,
MeasurementType.name.label('type'),
MeasurementType.units.label('units'),
MeasurementType.derived.label('type_derived'),
Instrument.name.label('instrument_name'),
Instrument.model.label('instrument_model'),
Instrument.specifications.label('instrument_specifications'),
func.ST_AsText(Site.geom).label('geom_wkt')
]
else:
# Return only core columns from layers table plus geom for geopandas
return [
cls.MODEL.id,
cls.MODEL.depth,
cls.MODEL.bottom_depth,
cls.MODEL.value,
Site.geom # Required for GeoDataFrame
]
@classmethod
def _add_base_joins(cls, qry):
"""
Add minimal joins needed for non-verbose mode (just Site for geom).
"""
qry = qry.join(cls.MODEL.site)
return qry
@classmethod
def _add_verbose_joins(cls, qry):
"""
Add explicit joins needed for verbose mode to avoid cartesian products.
"""
qry = qry.join(cls.MODEL.site)
qry = qry.join(cls.MODEL.measurement_type)
qry = qry.join(cls.MODEL.instrument)
return qry
@property
def all_types(self):
"""
Return all measurement types that have data in the layers table
"""
with db_session_with_credentials() as (_engine, session):
# Use EXISTS for better performance on 208M row table
qry = session.query(MeasurementType.name).filter(
exists().where(
LayerData.measurement_type_id == MeasurementType.id
)
)
result = qry.all()
return self.retrieve_single_value_result(result)
@property
def all_sites(self):
"""
Return all specific site names
"""
with db_session_with_credentials() as (_engine, session):
result = session.query(
Site.name
).distinct().all()
return self.retrieve_single_value_result(result)
@property
def all_dates(self):
"""
Return all distinct dates in the data
"""
with db_session_with_credentials() as (_engine, session):
result = session.query(
Site.date
).distinct().all()
return self.retrieve_single_value_result(result)
@property
def all_units(self):
"""
Return all distinct units in the data
"""
with db_session_with_credentials() as (_engine, session):
result = session.query(
MeasurementType.units
).distinct().all()
return self.retrieve_single_value_result(result)
[docs]
@classmethod
def get_sites(cls, site_names=None, **kwargs):
"""
Get site information including geometries.
Args:
site_names: List of site names or single site name
kwargs: Additional filters (campaign, date, etc.)
Returns:
GeoDataFrame with site information
"""
with db_session_with_credentials() as (engine, session):
qry = session.query(Site.name,
Site.geom,
Site.description,
Site.datetime).distinct()
# others can be added
if site_names is not None:
if isinstance(site_names, list):
qry = qry.filter(Site.name.in_(site_names))
else:
qry = qry.filter(Site.name == site_names)
df = query_to_geopandas(qry, engine)
return df
class RasterMeasurements(BaseDataset):
MODEL = ImageData
ALLOWED_QRY_KWARGS = (
BaseDataset.ALLOWED_QRY_KWARGS + ['description']
)
@property
def all_types(self):
"""
Return all measurement types that have data in the images table
"""
with db_session_with_credentials() as (_engine, session):
result = session.query(
MeasurementType.name
).join(
ImageData, ImageData.observation_id == ImageObservation.id
).join(
ImageObservation.measurement_type
).distinct().all()
return self.retrieve_single_value_result(result)
@property
def all_descriptions(self):
with db_session_with_credentials() as (_engine, session):
qry = session.query(self.MODEL.description).distinct()
result = qry.all()
return self.retrieve_single_value_result(result)
@classmethod
def check_for_single_dataset(cls, **kwargs):
"""
At the moment there is not a clear path to how to deal with
multiple rasters so check that the user only requested one
dataset
"""
LOG.info(
"Checking raster query for single raster dataset..."
)
multi_raster_indicators = [
'instrument', 'date', 'observers', 'doi', 'type',
'description'
]
with db_session_with_credentials() as (_engine, session):
try:
# Form query and check if the query spans
# multiple rasters
for column in multi_raster_indicators:
values = cls.from_unique_entries(
[column], **kwargs
)
if len(values) > 1:
options = [f"'{v}'" for v in values]
raise TooManyRastersException(
f"More than one `{column}` suggests "
f"there are multiple raster datasets. "
f"Try filter {column} to one of the "
f"following values {', '.join(options)}."
)
except Exception as e:
session.close()
LOG.error("Failed query for Raster Data")
raise e
@classmethod
def from_filter(cls, **kwargs):
"""
Get data for the class by filtering by allowed arguments.
The allowed filters are cls.ALLOWED_QRY_KWARGS.
"""
cls.check_for_single_dataset(**kwargs)
with db_session_with_credentials() as (_engine, session):
try:
# Rebuild the query and form the raster
base_query = cls.MODEL.raster
qry = session.query(
func.ST_AsTiff(
func.ST_Union(base_query, type_=Raster)
)
)
qry = cls.extend_qry(qry, **kwargs)
rasters = qry.all()
# Get the rasterio object of the raster
datasets = raster_to_rasterio(rasters)
except Exception as e:
LOG.error("Failed query for Raster Data")
raise e
return datasets
@classmethod
def from_area(
cls, shp=None, pt=None, buffer=None, crs=26912, **kwargs
):
if shp is None and pt is None:
raise ValueError(
"We need a shape description or a point and buffer"
)
if ((pt is not None and buffer is None) or
(buffer is not None and pt is None)):
raise ValueError(
"pt and buffer must be given together"
)
# Check if geometric operations are available
if not HAS_CORE_GIS:
raise ImportError(
"geoalchemy2 and shapely are required for "
"geometric filtering. Install with: "
"pip install geoalchemy2 shapely"
)
with db_session_with_credentials() as (_engine, session):
try:
# Get shape ready for cropping with rasters
if shp:
db_shp = from_shape(shp, srid=crs)
else:
qry_pt = from_shape(pt)
qry = session.query(
gfunc.ST_SetSRID(
func.ST_Buffer(qry_pt, buffer), crs
)
)
db_shp = qry.all()[0][0]
# Grab the rasters, union and clip them
base_query = func.ST_AsTiff(
func.ST_Clip(
func.ST_Union(ImageData.raster, type_=Raster),
db_shp,
True
)
)
q = session.query(base_query)
# Find all the tiles that
q = q.filter(
gfunc.ST_Intersects(ImageData.raster, db_shp)
)
limit = kwargs.get("limit")
if limit:
kwargs.pop("limit")
q = cls.extend_qry(q, check_size=False, **kwargs)
rasters = q.all()
# Get the rasterio object of the raster
datasets = raster_to_rasterio(rasters)
if len(datasets) > 0:
dataset = datasets[0]
else:
dataset = datasets
return dataset
except Exception as e:
LOG.error("Failed query for Raster Data")
raise e