With #181 landing and #185 coming soon, we realized that we are getting away from one of the core goals of stac-task: make it as easy as possible for task authors to follow best practices. The problem is that we now have too many boilerplate methods that authors need to be aware of and call, when needed and in the correct order, in their process methods. We'd rather things just happen automatically, but we have to weigh doing things automatically against allowing authors to opt in or out of certain behaviors in an easy way.
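To illustrate the problem, a process method today ends up looking roughly like the following. This is a hedged sketch: the helper names are invented for illustration (not the actual stac-task API), and a plain dict stands in for a pystac Item.

```python
# Hypothetical sketch of the status quo: the author must know which
# helpers exist and call them in the right order. Helper names are
# invented for illustration, not the real stac-task API.


def add_projection_extension(item: dict) -> dict:
    item.setdefault("stac_extensions", []).append("projection")
    return item


def fix_antimeridian(item: dict) -> dict:
    # imagine geometry correction here
    return item


def upload_assets(item: dict) -> dict:
    # imagine asset uploads here
    return item


def process(item: dict) -> dict:
    # every author must remember all of these, and this ordering
    item = add_projection_extension(item)
    item = fix_antimeridian(item)
    item = upload_assets(item)
    return item
```

Nothing enforces the ordering or completeness of those calls; that burden sits entirely on the task author.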
As a result, we want to experiment and implement a PoC for how we could expose a common set of operations via a declarative configuration. Examples of possible operations include, but are certainly not limited to:
- adding STAC extensions and/or other metadata enrichment steps, like:
  - the raster extension
  - the processing extension
  - the projection extension
  - file extension info for assets
- extracting an asset footprint for an item's geometry
- using the antimeridian package to correct geometries
- uploading item assets to object storage
- uploading an item to object storage
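As one concrete example, the "file extension info" step might look something like the sketch below. This is illustrative only: a plain dict stands in for a pystac Item, the helper name is invented, and only the `file:size` field of the file extension is shown.

```python
# Hedged sketch: dicts stand in for pystac Items, and the helper
# name is invented for illustration.
from typing import Any, Dict

FILE_EXT_SCHEMA = "https://stac-extensions.github.io/file/v2.1.0/schema.json"


def add_file_extension_info(item: Dict[str, Any]) -> Dict[str, Any]:
    """Record each asset's size under the file extension's 'file:size' field."""
    item.setdefault("stac_extensions", []).append(FILE_EXT_SCHEMA)
    for asset in item.get("assets", {}).values():
        data = asset.get("data", b"")
        asset["file:size"] = len(data)
    return item


item = {"id": "x", "assets": {"thumb": {"data": b"1234"}}}
item = add_file_extension_info(item)
print(item["assets"]["thumb"]["file:size"])  # 4
```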
To make this explicit and self-documenting, we could create a processing config object that takes an instance for each supported processing action. Then we could have something like an ItemProcessing monad to compose an output item with a given configuration. That might look something like this:
```python
import abc
import functools
from dataclasses import dataclass, field
from typing import Any, Callable, ClassVar, List, Sequence

from pystac import Item


# something to make subclass definition easy, kinda like a pydantic BaseModel?
class BaseProcessor(abc.ABC):
    _order: ClassVar[int]

    def __call__(self, item: Item) -> Item:
        raise NotImplementedError


# leaving most of these config classes to the imagination
@dataclass
class RasterExtensionConfig(BaseProcessor):
    _order = 100
    ...


@dataclass
class ProcessingExtensionConfig(BaseProcessor):
    _order = 200
    ...


@dataclass
class AssetUploadConfig(BaseProcessor):
    _order = 300
    includes: List[str] = field(default_factory=list)
    excludes: List[str] = field(default_factory=list)
    ...


@dataclass
class ItemUploadConfig(BaseProcessor):
    _order = 400
    check_existing_item_for_created_datetime: bool = True
    ...

    def __call__(self, item: Item) -> Item:
        # upload_item_to_s3 is an assumed helper
        return upload_item_to_s3(item)


# compose all of the processors together
@dataclass
class ItemProcessingConfig:
    raster_extension: RasterExtensionConfig | None
    processing_extension: ProcessingExtensionConfig | None
    asset_upload: AssetUploadConfig | None
    item_upload: ItemUploadConfig | None
    _functions: List[Callable[[Item], Item]] = field(init=False)

    def __post_init__(self):
        self._functions = sorted(
            (
                function
                for function in [
                    self.raster_extension,
                    self.processing_extension,
                    self.asset_upload,
                    self.item_upload,
                ]
                if function is not None
            ),
            key=lambda x: x._order,
        )

    def __call__(self, item: Item) -> Item:
        return functools.reduce(lambda x, func: func(x), self._functions, item)


# a wrapper to compose an item with a config
@dataclass
class ItemProcessing:
    item: Item
    config: ItemProcessingConfig

    def __call__(self) -> Item:
        return self.config(self.item)

    # provide a convenience constructor to make it easy
    # for users to apply the same config to multiple items
    @classmethod
    def for_items(
        cls, items: Sequence[Item], config: ItemProcessingConfig
    ) -> List["ItemProcessing"]:
        return [cls(item=item, config=config) for item in items]
```

Then process would be updated to have a type signature returning `List[dict[str, Any] | ItemProcessing]`, and we could have a new post-processing method that does something like:
```python
for item_processing in filter(lambda x: isinstance(x, ItemProcessing), outputs):
    item_processing()
```

One thing this idea doesn't address is how to allow customizations, i.e., user-defined processors (note that "processor" might be a terrible name; we should do better) to be added to the chain. We could go with an entirely dynamic mechanism for this, but that would not work with type checking and would effectively duplicate the current situation, where users have to be aware of what processors are available. Then this would be no different from having to know what functions are available, just with the added complexity/indirection of this whole processor thing.
Maybe we solve this by adding another optional field to ItemProcessingConfig (again, workshop the names; none of the names here are necessarily good) that takes a Sequence of BaseProcessor instances. Users would then have to configure (or explicitly opt out of) any built-in processors, but would have the option of adding any arbitrary ones they want (I can see end users creating their own processors to enforce their own best practices within a pipeline).
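To make that concrete, here is a minimal, self-contained sketch of the idea: plain dicts stand in for pystac Items, and the `extra_processors` name, along with every class here, is a placeholder rather than a proposed API.

```python
# Hedged sketch: all names here are placeholders, not an agreed-upon API.
from dataclasses import dataclass
from typing import ClassVar, Optional, Sequence

Item = dict  # stand-in for pystac.Item, just for this sketch


class BaseProcessor:
    _order: ClassVar[int] = 0

    def __call__(self, item: Item) -> Item:
        raise NotImplementedError


@dataclass
class ItemUploadConfig(BaseProcessor):
    _order = 400

    def __call__(self, item: Item) -> Item:
        item["uploaded"] = True  # imagine a real upload here
        return item


@dataclass
class RedactSecrets(BaseProcessor):
    """A user-defined processor enforcing a pipeline-specific rule."""

    _order = 350  # slots in before the upload step

    def __call__(self, item: Item) -> Item:
        item.pop("secret", None)
        return item


@dataclass
class ItemProcessingConfig:
    item_upload: Optional[ItemUploadConfig] = None
    # user-supplied processors join the same ordered chain
    extra_processors: Sequence[BaseProcessor] = ()

    def __call__(self, item: Item) -> Item:
        built_in = [p for p in (self.item_upload,) if p is not None]
        for processor in sorted(
            [*built_in, *self.extra_processors], key=lambda p: p._order
        ):
            item = processor(item)
        return item


config = ItemProcessingConfig(
    item_upload=ItemUploadConfig(),
    extra_processors=[RedactSecrets()],
)
result = config({"id": "an-item", "secret": "shh"})
print("secret" in result, result["uploaded"])  # False True
```

Because user processors share the `_order` mechanism with built-ins, they interleave anywhere in the chain without any special casing.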
Anyway, the above is just an idea, and the code was written in the issue text editor. It might be a bad idea, or other ideas might simply be better. Whatever the case, we need to prototype how we can do something like this. Once we have a viable and accepted option, we'll want to create issues to:
- make a production implementation of the option
- add any "processors" that we want to have built in, such as all the ideas bullet-pointed above (and any others that come up)