Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,21 @@
beam.Row(recipe='pie', fruit='blueberry', quantity=1, unit_price=2.00),
beam.Row(recipe='muffin', fruit='blueberry', quantity=2, unit_price=2.00),
beam.Row(recipe='muffin', fruit='banana', quantity=3, unit_price=1.00),
beam.Row(recipe='pie', fruit='strawberry', quantity=3, unit_price=1.50),
]
# [END groupby_table]


def groupby_attr(test=None):
with beam.Pipeline() as p:
# [START groupby_attr]
grouped = (
p
| beam.Create(GROCERY_LIST)
| beam.GroupBy('recipe')
| beam.Map(print))
grouped = (p | beam.Create(GROCERY_LIST) | beam.GroupBy('recipe'))
# [END groupby_attr]

if test:
test(grouped)
if test:
test(grouped)
else:
grouped | beam.Map(print)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we've fixed this, we should



if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
beam.Row(recipe='pie', fruit='blueberry', quantity=1, unit_price=2.00),
beam.Row(recipe='muffin', fruit='blueberry', quantity=2, unit_price=2.00),
beam.Row(recipe='muffin', fruit='banana', quantity=3, unit_price=1.00),
beam.Row(recipe='pie', fruit='strawberry', quantity=3, unit_price=1.50),
]
# [END groupby_table]

Expand All @@ -53,12 +54,13 @@ def groupby_attr_expr(test=None):
grouped = (
p
| beam.Create(GROCERY_LIST)
| beam.GroupBy('recipe', is_berry=lambda x: 'berry' in x.fruit)
| beam.Map(print))
| beam.GroupBy('recipe', is_berry=lambda x: 'berry' in x.fruit))
# [END groupby_attr_expr]

if test:
test(grouped)
if test:
test(grouped)
else:
grouped | beam.Map(print)


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ def groupby_expr(test=None):
p
| beam.Create(
['strawberry', 'raspberry', 'blueberry', 'blackberry', 'banana'])
| beam.GroupBy(lambda s: s[0])
| beam.Map(print))
| beam.GroupBy(lambda s: s[0]))
# [END groupby_expr]
if test:
test(grouped)
else:
grouped | beam.Map(print)


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
beam.Row(recipe='pie', fruit='blueberry', quantity=1, unit_price=2.00),
beam.Row(recipe='muffin', fruit='blueberry', quantity=2, unit_price=2.00),
beam.Row(recipe='muffin', fruit='banana', quantity=3, unit_price=1.00),
beam.Row(recipe='pie', fruit='strawberry', quantity=3, unit_price=1.50),
]
# [END groupby_table]

Expand All @@ -55,12 +56,13 @@ def expr_aggregate(test=None):
| beam.Create(GROCERY_LIST)
| beam.GroupBy('recipe').aggregate_field(
'quantity', sum, 'total_quantity').aggregate_field(
lambda x: x.quantity * x.unit_price, sum, 'price')
| beam.Map(print))
lambda x: x.quantity * x.unit_price, sum, 'price'))
# [END expr_aggregate]

if test:
test(grouped)
if test:
test(grouped)
else:
grouped | beam.Map(print)


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
beam.Row(recipe='pie', fruit='blueberry', quantity=1, unit_price=2.00),
beam.Row(recipe='muffin', fruit='blueberry', quantity=2, unit_price=2.00),
beam.Row(recipe='muffin', fruit='banana', quantity=3, unit_price=1.00),
beam.Row(recipe='pie', fruit='strawberry', quantity=3, unit_price=1.50),
]
# [END groupby_table]

Expand All @@ -57,11 +58,12 @@ def global_aggregate(test=None):
| beam.GroupBy().aggregate_field(
'unit_price', min, 'min_price').aggregate_field(
'unit_price', MeanCombineFn(), 'mean_price').aggregate_field(
'unit_price', max, 'max_price')
| beam.Map(print))
'unit_price', max, 'max_price'))
# [END global_aggregate]
if test:
test(grouped)
else:
grouped | beam.Map(print)


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
beam.Row(recipe='pie', fruit='blueberry', quantity=1, unit_price=2.00),
beam.Row(recipe='muffin', fruit='blueberry', quantity=2, unit_price=2.00),
beam.Row(recipe='muffin', fruit='banana', quantity=3, unit_price=1.00),
beam.Row(recipe='pie', fruit='strawberry', quantity=3, unit_price=1.50),
]
# [END groupby_table]

Expand All @@ -54,9 +55,9 @@ def simple_aggregate(test=None):
p
| beam.Create(GROCERY_LIST)
| beam.GroupBy('fruit').aggregate_field(
'quantity', sum, 'total_quantity')
| beam.Map(print))
'quantity', sum, 'total_quantity'))
# [END simple_aggregate]

if test:
test(grouped)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@
from .groupby_simple_aggregate import simple_aggregate
from .groupby_two_exprs import groupby_two_exprs

#
# TODO: Remove early returns in check functions
# https://github.com/apache/beam/issues/30778
skip_due_to_30778 = True


class UnorderedList(object):
Expand Down Expand Up @@ -80,8 +76,6 @@ def normalize_kv(k, v):


def check_groupby_expr_result(grouped):
if skip_due_to_30778:
return
assert_that(
grouped | beam.MapTuple(normalize_kv),
equal_to([
Expand All @@ -94,8 +88,6 @@ def check_groupby_expr_result(grouped):


def check_groupby_two_exprs_result(grouped):
if skip_due_to_30778:
return
assert_that(
grouped | beam.MapTuple(normalize_kv),
equal_to([
Expand All @@ -109,8 +101,6 @@ def check_groupby_two_exprs_result(grouped):


def check_groupby_attr_result(grouped):
if skip_due_to_30778:
return
assert_that(
grouped | beam.MapTuple(normalize_kv),
equal_to([
Expand Down Expand Up @@ -157,8 +147,6 @@ def check_groupby_attr_result(grouped):


def check_groupby_attr_expr_result(grouped):
if skip_due_to_30778:
return
assert_that(
grouped | beam.MapTuple(normalize_kv),
equal_to([
Expand Down Expand Up @@ -209,10 +197,8 @@ def check_groupby_attr_expr_result(grouped):


def check_simple_aggregate_result(grouped):
if skip_due_to_30778:
return
assert_that(
grouped | beam.MapTuple(normalize_kv),
grouped,
equal_to([
#[START simple_aggregate_result]
NamedTuple(fruit='strawberry', total_quantity=3),
Expand All @@ -225,8 +211,6 @@ def check_simple_aggregate_result(grouped):


def check_expr_aggregate_result(grouped):
if skip_due_to_30778:
return
assert_that(
grouped | beam.Map(normalize),
equal_to([
Expand All @@ -238,8 +222,6 @@ def check_expr_aggregate_result(grouped):


def check_global_aggregate_result(grouped):
if skip_due_to_30778:
return
assert_that(
grouped | beam.Map(normalize),
equal_to([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ def groupby_two_exprs(test=None):
p
| beam.Create(
['strawberry', 'raspberry', 'blueberry', 'blackberry', 'banana'])
| beam.GroupBy(letter=lambda s: s[0], is_berry=lambda s: 'berry' in s)
| beam.Map(print))
| beam.GroupBy(letter=lambda s: s[0], is_berry=lambda s: 'berry' in s))
# [END groupby_two_exprs]

if test:
test(grouped)
if test:
test(grouped)
else:
grouped | beam.Map(print)


if __name__ == '__main__':
Expand Down
Loading