@@ -102,3 +102,38 @@ def build(self) -> HealpixGraph:
102102 culled_graph = cull (previous .graph , selected_keys )
103103 pixel_keys = {p : k for p , k in zip (selected_pixels , selected_keys )}
104104 return HealpixGraph (culled_graph , pixel_keys )
105+
106+
107+ class AlignAndApply (Operation ):
108+ def __init__ (self , input_ops : Sequence [Operation ], pixel_lists : Sequence [Sequence [HealpixPixel | None ]],
109+ func ,
110+ meta , output_pixels : Sequence [HealpixPixel ], * args , ** kwargs ):
111+ self .input_ops = input_ops
112+ self .pixel_lists = pixel_lists
113+ if len (self .input_ops ) != len (self .pixel_lists ):
114+ raise ValueError ("Inccorect Align and Apply Setup" )
115+ self .func = func
116+ self ._meta = meta
117+ self .output_pixels = output_pixels
118+ self .args = args
119+ self .kwargs = kwargs
120+
121+ @property
122+ def meta (self ) -> pd .DataFrame :
123+ return self ._meta
124+
125+ def build (self ) -> HealpixGraph :
126+ graphs = [op .build () if op is not None else None for op in self .input_ops ]
127+ metas = [op .meta if op is not None else None for op in self .input_ops ]
128+ graph = {}
129+ for g in graphs :
130+ graph = graphs | g
131+ for pixels in zip (* self .pixel_lists ):
132+ task_refs = []
133+ for g , m , p in zip (graphs , metas , pixels ):
134+ if g is None :
135+ task_refs .append (None )
136+ elif p is None or p not in g .pixel_to_key_map :
137+ task_refs .append (m )
138+ else :
139+ task_refs .append (TaskRef (g .pixel_to_key_map [p ]))
0 commit comments