Merged
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ scripts/run-with-local-http.sh

This project includes a convenience script to index and serve a remote STAC catalog. This script will fully index the remote STAC catalog each time it is run. This may not be the most efficient way to meet your needs, but it does help demonstrate some of this project's capabilities.

> [!NOTE]
> This script should not be used in a production environment. It is intended for local testing only.

This script can optionally be called with a comma-separated list of STAC item JSON fixers, invoking the behaviour described [here](./docs/index-config.md#fixes).

```sh
Expand Down
41 changes: 0 additions & 41 deletions docs/suitability.md
Expand Up @@ -65,44 +65,3 @@ Since 1.1.0 STAC-GeoParquet does not _require_ each collection to exist in a dif
#### stac-fastapi-geoparquet

The [stac-fastapi-geoparquet](https://pypi.org/project/stac-fastapi-geoparquet/) project aims to augment STAC-GeoParquet with a STAC API interface, however this project does not currently appear to offer a production-ready solution.

## Other Considerations

### Paging Tokens

Paging tokens, included with search and collection-items responses that span multiple pages, work differently in this project than in some other projects. The approach is considered safe, reasonable, and justified, and may be of interest to users evaluating this project's suitability for a use case.

The API itself is ignorant of paging state, which helps support a serverless deployment: no state or token information is stored within the API or its associated storage. The paging process still requires some knowledge of state _somewhere_, so that state is contained within the paging tokens received and submitted by the client.

Paging tokens are included in the `next` and `prev` links in a multi-page response. Each token is a [JSON Web Token (JWT)](https://jwt.io/introduction) that provides all information required by the API to progress or regress across pages.

#### SQL

The payload of the JWT includes a parameterised SQL query and parameters that will be used to fetch the next or previous page of results. It also includes the ID of the most recent data load which the API uses to determine if it is paging across a data change, and which supports the behaviour described in the main README's [pagination section](../README.md#pagination).
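To make this concrete, below is a minimal sketch of how such a token could be assembled using only the Python standard library. The payload field names, the secret value, and the SQL text are illustrative assumptions, not this project's actual implementation.

```python
import base64
import hashlib
import hmac
import json

SECRET = b"example-secret-key"  # illustrative; supplied at deployment time in practice


def _b64url(data: bytes) -> str:
    # JWTs use unpadded URL-safe base64 for each segment
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def encode_paging_token(payload: dict) -> str:
    # header.payload.signature, signed with HMAC-SHA256 (HS256)
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = "{}.{}".format(
        _b64url(json.dumps(header).encode()),
        _b64url(json.dumps(payload).encode()),
    )
    signature = hmac.new(SECRET, signing_input.encode(), hashlib.sha256).digest()
    return "{}.{}".format(signing_input, _b64url(signature))


# Assumed payload shape: a parameterised query, its parameters, and the
# ID of the most recent data load
token = encode_paging_token(
    {
        "query": "SELECT * FROM src:items:src WHERE collection = ? LIMIT ? OFFSET ?",
        "params": ["joplin", 10, 10],
        "last_load_id": "load-0001",
    }
)
print(token.count("."))  # a JWT has three dot-separated segments, so this prints 2
```

A client never needs to understand this structure; it simply returns the token unchanged with its next page request.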

With a paging token the client provides the API with the SQL query it should execute. In many scenarios this might raise security concerns. See [SQL Safety](#sql-safety) for how such concerns are addressed and negated.

#### SQL Safety

The JWT is signed using the HS256 algorithm and a secret key that must be provided at deployment time, so its payload is considered tamper-evident. HMAC-signed JWTs are generally considered safe for verifying integrity, and therefore should prevent SQL query tampering. Integration tests verify that the payload cannot be modified by a client between page requests ([example](https://github.com/sparkgeo/STAC-API-Serverless/blob/212f1a97f091efe19bd6f9edb6084b7f3d508d20/tests/with_environment/integration_tests/test_get_search.py#L201)). Even if a malicious actor became able, such as through credential theft, to modify and re-sign a paging token JWT, this is still not considered a significant concern. In a standard deployment the API reads Parquet index files with [read-only](https://github.com/sparkgeo/STAC-API-Serverless/blob/212f1a97f091efe19bd6f9edb6084b7f3d508d20/iac/cdk_deployment/cdk_deployment_stack.py#L68) access to an S3 bucket, which should prevent tampering with the index via SQL. If a malicious user is somehow able to modify files within the API container via a modified SQL query, any changes will be destroyed by the next Lambda invocation.

The JWT secret key can be rotated at any time to reduce the risk of credential theft. Rotation will interrupt any clients actively paging across results, at which point affected clients can reissue their queries.
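As a sketch of why tampering fails, the stdlib-only snippet below shows that editing the SQL in a token's payload invalidates the HMAC signature. The secret, payload shape, and query text are assumptions for illustration only.

```python
import base64
import hashlib
import hmac
import json

SECRET = b"example-secret-key"  # illustrative deployment-time secret


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _sign(signing_input: str) -> str:
    return _b64url(hmac.new(SECRET, signing_input.encode(), hashlib.sha256).digest())


def verify(token: str) -> bool:
    header_b64, payload_b64, signature = token.split(".")
    # constant-time comparison of the expected vs presented signature
    return hmac.compare_digest(_sign(f"{header_b64}.{payload_b64}"), signature)


header_b64 = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
payload_b64 = _b64url(json.dumps({"query": "SELECT 1", "offset": 10}).encode())
token = f"{header_b64}.{payload_b64}." + _sign(f"{header_b64}.{payload_b64}")

# A client who edits the payload cannot produce a matching signature
# without the secret, so the original signature no longer verifies
evil_b64 = _b64url(json.dumps({"query": "DELETE FROM items", "offset": 10}).encode())
tampered = f"{header_b64}.{evil_b64}." + token.split(".")[2]

print(verify(token), verify(tampered))  # True False
```

Rotating the secret has the same effect as tampering: every outstanding token's signature stops verifying, which is why in-flight paging sessions are interrupted.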

#### SQL Visibility

This approach exposes the API's SQL query content to clients, but no privileged information can be exposed this way. The content of the SQL query is composed entirely of:
1. information that can be gleaned from a review of this repository, and
2. parameters provided by the client.

The API uses placeholders to represent the location of the parquet index files it queries (e.g. an S3 URI) and replaces these immediately prior to SQL execution, so clients have no additional visibility of a deployment's storage infrastructure via a paging token.
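A minimal sketch of that substitution, modelled on the `src:<name>:src` placeholder style visible in this PR's diff of `db.py`, is shown below; the URI map and SQL text are illustrative assumptions.

```python
import re

# Assumed mapping from logical query object names to Parquet index URIs
parquet_uris = {"items": "s3://example-bucket/items.parquet"}


def prepare_statement(statement: str) -> str:
    # Replace each src:<name>:src placeholder with its quoted Parquet URI
    def _sub(match: re.Match) -> str:
        name = match.group(1)
        if name not in parquet_uris:
            raise KeyError(f"unknown query object '{name}'")
        return "'{}'".format(parquet_uris[name])

    return re.sub(r"src:([^:]+):src", _sub, statement)


sql = "SELECT * FROM src:items:src WHERE collection = ?"
print(prepare_statement(sql))
# → SELECT * FROM 's3://example-bucket/items.parquet' WHERE collection = ?
```

Because the paging token is minted before this substitution runs, clients only ever see the placeholder form.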

The image below shows the content of a sample paging token returned in response to the following search query:

```sh
curl -X 'POST' \
'https://host/search' \
-H 'Content-Type: application/json' \
-d '{"collections": ["joplin"]}'
```

![sample paging token jwt](./images/sample-paging-token-jwt.png "Sample Paging Token JWT")
39 changes: 8 additions & 31 deletions src/stac_fastapi/indexed/db.py
@@ -1,4 +1,3 @@
import re
from datetime import UTC, datetime
from logging import Logger, getLogger
from os import environ
Expand All @@ -16,11 +15,6 @@

_logger: Final[Logger] = getLogger(__name__)
_query_timing_precision: Final[int] = 3
_query_object_identifier_prefix: Final[str] = "src:"
_query_object_identifier_suffix: Final[str] = ":src"
_query_object_identifier_template: Final[str] = (
f"{_query_object_identifier_prefix}{{}}{_query_object_identifier_suffix}"
)

_root_db_connection: DuckDBPyConnection = None
_parquet_uris: Dict[str, str] = {}
Expand Down Expand Up @@ -76,18 +70,19 @@ async def disconnect_from_db() -> None:
_logger.error(e)


# SQL queries include placeholder strings that are replaced with Parquet URIs prior to query execution.
# This improves query performance relative to creating views in DuckDB from Parquet files and querying those.
# Placeholders are used until the point of query execution so that API search pagination tokens,
# which are JWT-encoded SQL queries and visible to the client, do not leak implementation detail around
# Parquet URI locations.
def format_query_object_name(object_name: str) -> str:
return _query_object_identifier_template.format(object_name)
if object_name in _parquet_uris:
return "'{}'".format(_parquet_uris[object_name])
raise Exception(
"Attempt to use non-existent query object name '{bad_name}'. Available object names: '{availables}'".format(
bad_name=object_name,
availables="', '".join(list(_parquet_uris.keys())),
)
)


def _execute(statement: str, params: Optional[List[Any]] = None) -> None:
start = time()
statement = _prepare_statement(statement)
_get_db_connection().execute(statement, params)
_sql_log_message(statement, time() - start, None, params)

Expand All @@ -100,7 +95,6 @@ async def fetchone(
if perform_latest_data_check:
await _ensure_latest_data()
start = time()
statement = _prepare_statement(statement)
result = _get_db_connection().execute(statement, params).fetchone()
_sql_log_message(statement, time() - start, 1 if result is not None else 0, params)
return result
Expand All @@ -114,7 +108,6 @@ async def fetchall(
if perform_latest_data_check:
await _ensure_latest_data()
start = time()
statement = _prepare_statement(statement)
result = _get_db_connection().execute(statement, params).fetchall()
_sql_log_message(statement, time() - start, len(result), params)
return result
Expand All @@ -130,22 +123,6 @@ def _get_db_connection():
return _root_db_connection.cursor()


def _prepare_statement(statement: str) -> str:
query_object_identifier_regex = rf"\b{re.escape(_query_object_identifier_prefix)}([^:]+){re.escape(_query_object_identifier_suffix)}\b"
for query_object_name in re.findall(query_object_identifier_regex, statement):
if query_object_name not in _parquet_uris:
_logger.warning(
f"{query_object_name} not in parquet URI map, query will likely fail"
)
continue
statement = re.sub(
rf"\b{re.escape(_query_object_identifier_prefix)}{re.escape(query_object_name)}{re.escape(_query_object_identifier_suffix)}\b",
f"'{_parquet_uris[query_object_name]}'",
statement,
)
return statement


def _sql_log_message(
statement: str,
duration: float,
Expand Down
75 changes: 37 additions & 38 deletions src/stac_fastapi/indexed/search/query_info.py
@@ -1,43 +1,49 @@
from dataclasses import asdict, dataclass, field
from datetime import datetime
from json import JSONEncoder
from re import escape, match
from typing import Any, Dict, Final, List, Optional, Type, cast
from dataclasses import asdict, dataclass, replace
from json import JSONEncoder, loads
from typing import Any, Dict, Final, List, Optional, Self, Type, cast

_datetime_field_prefix: Final[str] = "datetime::"
from geojson_pydantic.geometries import parse_geometry_obj
from stac_pydantic.api.extensions.sort import SortExtension
from stac_pydantic.api.search import Intersection
from stac_pydantic.shared import BBox

# Increment this value if query structure changes, so that paging tokens from
# older query structures can be rejected.
current_query_version: Final[int] = 1


@dataclass(kw_only=True)
class QueryInfo:
query: str
params: List[Any] = field(default_factory=list)
query_version: int
ids: Optional[List[str]] = None
collections: Optional[List[str]] = None
bbox: Optional[BBox] = None
intersects: Optional[Intersection] = None
datetime: Optional[str] = None
filter: Optional[Dict[str, Any]] = None
filter_lang: str
order: Optional[List[SortExtension]] = None
limit: int
offset: Optional[int] = None
last_load_id: str

def next(self) -> "QueryInfo":
return QueryInfo(
query=self.query,
params=self.params,
limit=self.limit,
return replace(
self,
offset=(self.offset + self.limit)
if self.offset is not None
else self.limit,
last_load_id=self.last_load_id,
)

def previous(self) -> "QueryInfo":
# Assume the logic validating that a "previous" link is required (i.e. there is currently a non-None offset) is applied elsewhere.
# Technically we could apply that logic here, but we cannot determine in this module whether a "next" link is required, so that would be inconsistent.
current_offset = cast(int, self.offset)
return QueryInfo(
query=self.query,
params=self.params,
limit=self.limit,
return replace(
self,
offset=(current_offset - self.limit)
if current_offset > self.limit
else None,
last_load_id=self.last_load_id,
)

def to_dict(self) -> Dict[str, Any]:
Expand All @@ -47,29 +53,22 @@ def to_dict(self) -> Dict[str, Any]:
def json_encoder() -> Type:
return _CustomJSONEncoder

def json_post_decoder(self) -> "QueryInfo":
new_params = []
for param in self.params:
if isinstance(param, str):
datetime_match = match(rf"^{escape(_datetime_field_prefix)}(.+)", param)
if datetime_match:
new_params.append(datetime.fromisoformat(datetime_match.group(1)))
continue
new_params.append(param)
return QueryInfo(
query=self.query,
params=new_params,
limit=self.limit,
offset=self.offset,
last_load_id=self.last_load_id,
def json_post_decoder(self: Self) -> "QueryInfo":
return replace(
self,
intersects=parse_geometry_obj(loads(cast(str, self.intersects)))
if self.intersects is not None
else None,
order=[SortExtension(**loads(cast(str, entry))) for entry in self.order]
if self.order is not None
else None,
)


class _CustomJSONEncoder(JSONEncoder):
def default(self, obj: Any) -> Any:
if isinstance(obj, datetime):
return "{}{}".format(
_datetime_field_prefix,
obj.isoformat(),
)
if isinstance(obj, Intersection):
return cast(Intersection, obj).model_dump_json()
if isinstance(obj, SortExtension):
return cast(SortExtension, obj).model_dump_json()
return JSONEncoder.default(self, obj)