
Commit ca4d3ba

added minmax_use_mappartitions_v2.py
1 parent 93cbf20 commit ca4d3ba

2 files changed: +187 -0 lines changed

minmax_use_mappartitions_v2.py
@@ -0,0 +1,176 @@
from __future__ import print_function
import sys
from pyspark.sql import SparkSession

#-----------------------------------------------------
# Find the minimum and maximum of all input by
# using the mapPartitions() transformation.
#
# The idea is that each partition will find
# (local_min, local_max, local_count)
# and then we find (final_min, final_max, final_count)
# over all partitions.
#
# input ---- N partitions ----> partition-1, partition-2, ..., partition-N
#
# partition-1 => local_1 = (local_min_1, local_max_1, local_count_1)
# partition-2 => local_2 = (local_min_2, local_max_2, local_count_2)
# ...
# partition-N => local_N = (local_min_N, local_max_N, local_count_N)
#
# final_min_max = minmax(local_1, local_2, ..., local_N)
#
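# For example, with two partitions of a toy input:
#   partition-1 = ["1,2,3"]       => local_1 = (1, 3, 3)
#   partition-2 = ["10,0", "5"]   => local_2 = (0, 10, 3)
#   final_min_max = minmax(local_1, local_2) = (0, 10, 6)
#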
#------------------------------------------------------
# Input Parameters:
#    INPUT_PATH as a file of numbers
#
# Example: sample_numbers.txt
#
# $ cat sample_numbers.txt
#23,24,22,44,66,77,44,44,555,666
#12,4,555,66,67,68,57,55,56,45,45,45,66,77
#34,35,36,97300,78,79
#120,44,444,445,345,345,555
#11,33,34,35,36,37,47,7777,8888,6666,44,55
#10,11,44,66,77,78,79,80,90,98,99,100,102,103,104,105
#6,7,8,9,10
#8,9,10,12,12
#7777
#222,333,444,555,666,111,112,5,113,114
#5555,4444,24
#
#-------------------------------------------------------
# @author Mahmoud Parsian
#-------------------------------------------------------

#
#==========================================
# Find (min, max, count) for a given single partition.
#
# partition_iterator is an iterator over the input
# records of a single partition; each input record
# has the format:
#    <number><,><number><,>...<,><number>
#
def minmax(partition_iterator):
    #
    # partition_iterator may be an itertools.chain or a generator
    print("type(partition_iterator)=", type(partition_iterator))
    #
    try:
        first_record = next(partition_iterator)
        print("first_record=", first_record)
    except StopIteration:
        # for empty partitions
        return [None]
    #
    numbers = [int(n) for n in first_record.split(",")]
    local_min = min(numbers)
    local_max = max(numbers)
    local_count = len(numbers)
    #
    # handle the remaining records of the partition
    for record in partition_iterator:
        #print("record=", record)
        numbers = [int(n) for n in record.split(",")]
        min2 = min(numbers)
        max2 = max(numbers)
        # update min, max, and count
        local_count += len(numbers)
        local_max = max(local_max, max2)
        local_min = min(local_min, min2)
    #end-for
    return [(local_min, local_max, local_count)]
#end-def
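#
# example (a local sanity check, no Spark needed):
#   list(minmax(iter(["1,2,3", "10,0"]))) returns [(0, 10, 5)]
#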
#
#==========================================
#
# find the final (min, max, count) over all partitions
# min_max_count_list = [
#    (min_1, max_1, count_1),
#    (min_2, max_2, count_2),
#    ...
#    (min_N, max_N, count_N)
# ]
#
def find_min_max_count(min_max_count_list):
    first_time = True
    # iterate over the tuple3 elements of min_max_count_list
    for local_min, local_max, local_count in min_max_count_list:
        if first_time:
            final_min = local_min
            final_max = local_max
            final_count = local_count
            first_time = False
        else:
            final_min = min(final_min, local_min)
            final_max = max(final_max, local_max)
            final_count += local_count
    #end-for
    return (final_min, final_max, final_count)
#end-def
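#
# example:
#   find_min_max_count([(1, 9, 4), (0, 5, 3)]) returns (0, 9, 7)
#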
#==========================================
#
# debug helper: print every element of a partition
def debug_partition(iterator):
    print("===begin-partition===")
    for x in iterator:
        print(x)
    print("===end-partition===")
#end-def
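#
# example usage (the output appears in the executors'
# stdout, not necessarily on the driver):
#   rdd.foreachPartition(debug_partition)
#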
#
#==========================================
# main():

if len(sys.argv) != 2:
    print("Usage: ", __file__, "<input-path>", file=sys.stderr)
    sys.exit(-1)

# create an instance of SparkSession
spark = SparkSession.builder.appName("minmax").getOrCreate()
#
# handle the input parameter
input_path = sys.argv[1]
print("input_path=", input_path)

#=====================================
# read input and apply mapPartitions()
#=====================================
# rdd: RDD[String]
rdd = spark.sparkContext.textFile(input_path)
print("rdd=", rdd)
print("rdd.count=", rdd.count())
print("rdd.collect()=", rdd.collect())
print("rdd.getNumPartitions()=", rdd.getNumPartitions())
#
#=====================================
# find (min, max, count) per partition
# with the custom function minmax
#=====================================
# min_max_count: RDD[(min, max, count)]
# min_max_count: [(min_1, max_1, count_1),
#                 (min_2, max_2, count_2),
#                 ...,
#                 (min_N, max_N, count_N)]
#
# apply mapPartitions() and then drop the None
# elements produced for empty partitions
min_max_count = rdd.mapPartitions(minmax).filter(lambda x: x is not None)
#
print("min_max_count=", min_max_count)
print("min_max_count.count=", min_max_count.count())
min_max_count_list = min_max_count.collect()
print("min_max_count.collect()=", min_max_count_list)

#=====================================
# find the final (min, max, count) over all partitions
#=====================================
final_min, final_max, final_count = find_min_max_count(min_max_count_list)
print("final: (min, max, count)= (", final_min, ", ", final_max, ", ", final_count, ")")

# done!
spark.stop()
@@ -0,0 +1,11 @@
#-----------------------------------------------------
# This is a shell script to run minmax_use_mappartitions_v2.py
#-----------------------------------------------------
# @author Mahmoud Parsian
#-----------------------------------------------------
export SPARK_HOME="/book/spark-3.2.0"
export INPUT_PATH="/book/code/chap10/sample_numbers.txt"
export SPARK_PROG="/book/code/chap10/minmax_use_mappartitions_v2.py"
#
# run the PySpark program:
$SPARK_HOME/bin/spark-submit $SPARK_PROG $INPUT_PATH
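#
# for the sample_numbers.txt shown above, the final output
# line should report (min, max, count) = (4, 97300, 89)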
