Skip to content

Commit 576acab

Browse files
committed
Supports Asynchronous Runs in Interactive Beam
1 parent 108225d commit 576acab

File tree

6 files changed

+1459
-2
lines changed

6 files changed

+1459
-2
lines changed

sdks/python/apache_beam/runners/interactive/interactive_beam.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
from apache_beam.runners.interactive.display.pcoll_visualization import visualize
5858
from apache_beam.runners.interactive.display.pcoll_visualization import visualize_computed_pcoll
5959
from apache_beam.runners.interactive.options import interactive_options
60+
from apache_beam.runners.interactive.recording_manager import AsyncComputationResult
6061
from apache_beam.runners.interactive.utils import deferred_df_to_pcollection
6162
from apache_beam.runners.interactive.utils import elements_to_df
6263
from apache_beam.runners.interactive.utils import find_pcoll_name
@@ -1012,6 +1013,88 @@ def as_pcollection(pcoll_or_df):
10121013
return result_tuple
10131014

10141015

1016+
@progress_indicated
def compute(
    *pcolls: Union[Dict[Any, PCollection], Iterable[PCollection], PCollection],
    wait_for_inputs: bool = True,
    blocking: bool = False,
    runner=None,
    options=None,
    force_compute=False,
) -> Optional[AsyncComputationResult]:
  """Computes the given PCollections, potentially asynchronously.

  Every positional argument is flattened into individual PCollections,
  deduplicated, checked to belong to a single pipeline, and then handed to
  that pipeline's recording manager via ``compute_async``.

  Args:
    *pcolls: PCollections to compute. Each argument can be a single
      PCollection, an iterable of PCollections, or a dictionary with
      PCollections as values. Deferred dataframes (``DeferredBase``) are also
      accepted; they are converted to PCollections and watched under an
      ``anonymous_pcollection_<id>`` name.
    wait_for_inputs: Whether to wait until the asynchronous dependencies are
      computed. Setting this to False allows the computation to be scheduled
      immediately, but may also result in running the same pipeline stages
      multiple times.
    blocking: If False, the computation will run in non-blocking fashion. In
      Colab/IPython environment this mode will also provide the controls for
      the running pipeline. If True, the computation will block until the
      pipeline is done.
    runner: (optional) the runner with which to compute the results.
    options: (optional) any additional pipeline options to use to compute the
      results.
    force_compute: (optional) if True, forces recomputation rather than using
      cached PCollections.

  Returns:
    The result of the recording manager's ``compute_async`` call: an
    AsyncComputationResult object if blocking is False, otherwise None. Also
    returns None, without running anything, when no PCollections were given.

  Raises:
    ValueError: If an argument is neither a dict, an iterable nor a
      PCollection, or if the PCollections do not all belong to the resolved
      user pipeline.
    AssertionError: If a flattened element is not an
      apache_beam.pvalue.PCollection.
  """
  flatten_pcolls = []
  for pcoll_container in pcolls:
    if isinstance(pcoll_container, dict):
      flatten_pcolls.extend(pcoll_container.values())
    elif isinstance(pcoll_container, (beam.pvalue.PCollection, DeferredBase)):
      flatten_pcolls.append(pcoll_container)
    else:
      try:
        flatten_pcolls.extend(iter(pcoll_container))
      except TypeError as err:
        # Chain the original TypeError so the root cause stays visible in the
        # traceback.
        raise ValueError(
            f'The given pcoll {pcoll_container} is not a dict, an iterable or '
            'a PCollection.') from err

  pcolls_set = set()
  for pcoll in flatten_pcolls:
    if isinstance(pcoll, DeferredBase):
      pcoll, _ = deferred_df_to_pcollection(pcoll)
      watch({f'anonymous_pcollection_{id(pcoll)}': pcoll})
    # Explicit check instead of an `assert` statement: asserts are stripped
    # under `python -O`, which would let invalid inputs through. The exception
    # type and message are kept identical to the assert they replace.
    if not isinstance(pcoll, beam.pvalue.PCollection):
      raise AssertionError(
          f'{pcoll} is not an apache_beam.pvalue.PCollection.')
    pcolls_set.add(pcoll)

  if not pcolls_set:
    _LOGGER.info('No PCollections to compute.')
    return None

  # Use an arbitrary member of the set to resolve the pipeline; all members
  # are verified against it below.
  pcoll_pipeline = next(iter(pcolls_set)).pipeline
  user_pipeline = ie.current_env().user_pipeline(pcoll_pipeline)
  if not user_pipeline:
    # The environment does not know this pipeline yet: watch it ad hoc and
    # treat it as the user pipeline.
    watch({f'anonymous_pipeline_{id(pcoll_pipeline)}': pcoll_pipeline})
    user_pipeline = pcoll_pipeline

  # NOTE(review): the comparison is against `user_pipeline`, not
  # `pcoll_pipeline`; if `user_pipeline()` can return a different object than
  # the PCollection's own pipeline, this check rejects it -- confirm intended.
  for pcoll in pcolls_set:
    if pcoll.pipeline is not user_pipeline:
      raise ValueError('All PCollections must belong to the same pipeline.')

  recording_manager = ie.current_env().get_recording_manager(
      user_pipeline, create_if_absent=True)

  return recording_manager.compute_async(
      pcolls_set,
      wait_for_inputs=wait_for_inputs,
      blocking=blocking,
      runner=runner,
      options=options,
      force_compute=force_compute,
  )
1096+
1097+
10151098
@progress_indicated
10161099
def show_graph(pipeline):
10171100
"""Shows the current pipeline shape of a given Beam pipeline as a DAG.

0 commit comments

Comments
 (0)