base.py
import logging
from typing import Any, Callable, Dict, List

from pyspark.sql import DataFrame, SparkSession

from filters.constants import PrecomputedFeatureName
from utils import initialize_logger
from spark.constants import NUM_OUTPUT_PARTITIONS, SPARK_CACHE_DIR

FilterFunc = Callable[..., Any]
PrecomputedFeatures = Dict[PrecomputedFeatureName, DataFrame]

LOGGER: logging.Logger = initialize_logger()


class MetricFilterPipeline:
    def __init__(self):
        """
        Pipeline for applying filters to a dataset.
        """
        self.filters: List[FilterFunc] = []
        self.features: PrecomputedFeatures = {}
        # Declared here; populated later via register_spark_session().
        self.spark: SparkSession

    def register_filter(self) -> FilterFunc:
        """
        Decorator for registering a filter function to the pipeline.

        Returns:
            FilterFunc: Decorated filter function
        """

        def decorator(filter_func: FilterFunc) -> FilterFunc:
            def wrapper(*args, **kwargs) -> Any:
                return filter_func(*args, **kwargs)

            LOGGER.info(f"Registering filter {filter_func.__name__}...")
            self.filters.append(filter_func)

            return wrapper

        return decorator

    def register_features(self, features: PrecomputedFeatures) -> None:
        """
        Register precomputed features to the pipeline.

        Args:
            features (PrecomputedFeatures): Precomputed features

        Returns:
            None
        """
        LOGGER.info(f"Registering features {features.keys()}...")
        self.features.update(features)

    def register_spark_session(self, spark: SparkSession) -> None:
        """
        Register Spark session to the pipeline.

        Args:
            spark (SparkSession): Spark session

        Returns:
            None
        """
        self.spark = spark

    def transform(self, original: DataFrame) -> DataFrame:
        """
        Apply all filters to the dataset.

        Args:
            original (DataFrame): Original dataset

        Returns:
            DataFrame: Filtered dataset
        """
        current_dataset = original

        for filter_func in self.filters:
            # Checkpoint after each filter to side-step potential OOM issues.
            # Note: DataFrame.checkpoint() requires a checkpoint directory to be
            # set beforehand via SparkContext.setCheckpointDir().
            LOGGER.info(f"Running filter {filter_func.__name__}...")
            current_dataset = filter_func(current_dataset, self.features).checkpoint()

        return current_dataset


PIPELINE_SINGLETON = MetricFilterPipeline()
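

# --- Usage sketch (illustrative only, not part of the original module) ---
# A minimal example of wiring the pipeline together. The filter name, the
# "text" column, the app name, and the input path below are hypothetical;
# a real filter only needs to accept (DataFrame, PrecomputedFeatures) and
# return a DataFrame. We also assume SPARK_CACHE_DIR is a writable path,
# since transform() checkpoints after every filter.
if __name__ == "__main__":
    from pyspark.sql import functions as F

    @PIPELINE_SINGLETON.register_filter()
    def drop_empty_documents(dataset: DataFrame, features: PrecomputedFeatures) -> DataFrame:
        # Hypothetical filter: keep only rows with a non-empty "text" column.
        return dataset.filter(F.length(F.col("text")) > 0)

    spark = SparkSession.builder.appName("metric-filter-pipeline").getOrCreate()
    # checkpoint() fails without a configured checkpoint directory.
    spark.sparkContext.setCheckpointDir(SPARK_CACHE_DIR)
    PIPELINE_SINGLETON.register_spark_session(spark)

    filtered = PIPELINE_SINGLETON.transform(spark.read.parquet("path/to/input"))
    filtered.show()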