pyspark-session-2020-10-12.txt
$ ls -l /tmp/data/
-rw-r--r-- 1 mparsian wheel 72 Oct 12 20:00 file1
-rw-r--r-- 1 mparsian wheel 94 Oct 12 20:01 file2
-rw-r--r-- 1 mparsian wheel 35 Oct 12 20:01 file3
$ cat /tmp/data/file1
file1: this is record 1
file1: this is record 2
file1: this is record 3
$ cat /tmp/data/file2
file2: this is record 1
file2: this is record 2
file2: this is fox 3
file2: this is it 4
$ cat /tmp/data/file3
file3: record 1
file3: ewcord 2222
$ ./bin/pyspark
Python 3.7.2 (v3.7.2:9a3ffc0492, Dec 24 2018, 02:44:43)
[Clang 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/
Using Python version 3.7.2 (v3.7.2:9a3ffc0492, Dec 24 2018 02:44:43)
SparkSession available as 'spark'.
>>> input_path = '/tmp/data'
>>>
>>> recs = spark.sparkContext.textFile(input_path)
>>> recs.count()
9
>>> recs.collect()
['file3: record 1', 'file3: ewcord 2222', 'file2: this is record 1', 'file2: this is record 2', 'file2: this is fox 3', 'file2: this is it 4', 'file1: this is record 1', 'file1: this is record 2', 'file1: this is record 3']
>>>
>>> union2 = recs.union(recs)
>>> union2.count()
18
>>> union2.collect()
['file3: record 1', 'file3: ewcord 2222', 'file2: this is record 1', 'file2: this is record 2', 'file2: this is fox 3', 'file2: this is it 4', 'file1: this is record 1', 'file1: this is record 2', 'file1: this is record 3', 'file3: record 1', 'file3: ewcord 2222', 'file2: this is record 1', 'file2: this is record 2', 'file2: this is fox 3', 'file2: this is it 4', 'file1: this is record 1', 'file1: this is record 2', 'file1: this is record 3']
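union() simply concatenates the two RDDs, so every record appears twice here (18 = 9 + 9); it does not remove duplicates. A small sketch, assuming the recs RDD from the session, of dropping the duplicates afterwards:

union2 = recs.union(recs)       # 18 records, duplicates kept
deduped = union2.distinct()     # removes the duplicate lines
print(deduped.count())          # back to 9 for this data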
>>> records = [('A', 1), ('B', 10), ('A', 2), ('A', 3), ('B', 20), ('B', 60)]
>>>
>>>
>>> records
[('A', 1), ('B', 10), ('A', 2), ('A', 3), ('B', 20), ('B', 60)]
>>>
>>> recs_rdd = spark.sparkContext.parallelize(records)
>>> recs_rdd.count()
6
>>> recs_rdd.collect()
[('A', 1), ('B', 10), ('A', 2), ('A', 3), ('B', 20), ('B', 60)]
>>> # recs_rdd: RDD[(String, Integer)]
...
>>> sum_per_key = recs_rdd.reduceByKey(lambda x, y: x+y)
>>> sum_per_key.count()
2
>>> sum_per_key.collect()
[('B', 90), ('A', 6)]
>>> # avg_by_key: [('B', 30), ('A', 2)]
...
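The function passed to reduceByKey() must be associative and commutative, because Spark applies it pairwise within each partition and then again across partitions. That is why the average cannot be computed by reducing the raw values directly: a reducer like lambda x, y: (x + y) / 2 gives an answer that depends on how the values happen to be grouped. A small plain-Python sketch of the problem, using the 'A' values 1, 2, 3 from records:

from functools import reduce

values = [1, 2, 3]                               # the 'A' values from records
bad = reduce(lambda x, y: (x + y) / 2, values)   # ((1 + 2)/2 + 3)/2 = 2.25, grouping-dependent
good = sum(values) / len(values)                 # 2.0, the true average
print(bad, good)

This is why the session below carries (sum, count) pairs through reduceByKey() and only divides at the end.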
>>>
>>> sum_count = recs_rdd.mapValues(lambda v: (v, 1))
>>>
>>> sum_count.collect()
[('A', (1, 1)), ('B', (10, 1)), ('A', (2, 1)), ('A', (3, 1)), ('B', (20, 1)), ('B', (60, 1))]
>>>
>>>
>>> sum_count1 = (10, 1)
>>> sum_count2 = (20, 2)
>>> # (10+20, 1+2)
... # (30, 3)
...
>>> sum_count_per_key = sum_count.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
>>> sum_count_per_key.collect()
[('B', (90, 3)), ('A', (6, 3))]
>>>
>>> avg_by_key = sum_count_per_key.mapValues(lambda sum_and_count_tuple : sum_and_count_tuple[0] / sum_and_count_tuple[1])
>>> avg_by_key.count()
2
>>> avg_by_key.collect()
[('B', 30.0), ('A', 2.0)]
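Putting the steps together, the average-by-key pattern is: map each value to a (value, 1) pair, reduce by key with element-wise addition, then divide sum by count. A consolidated sketch, assuming the same records list and the SparkSession named spark from this shell:

records = [('A', 1), ('B', 10), ('A', 2), ('A', 3), ('B', 20), ('B', 60)]
rdd = spark.sparkContext.parallelize(records)

avg_by_key = (
    rdd.mapValues(lambda v: (v, 1))                              # (key, (value, 1))
       .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))     # (key, (sum, count))
       .mapValues(lambda sc: sc[0] / sc[1])                      # (key, average)
)
print(avg_by_key.collect())     # [('B', 30.0), ('A', 2.0)]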
>>>
>>> sum_count.collect()
[('A', (1, 1)), ('B', (10, 1)), ('A', (2, 1)), ('A', (3, 1)), ('B', (20, 1)), ('B', (60, 1))]
>>> def add_sum_count(x, y):
...     sum2 = x[0] + y[0]
...     count = x[1] + y[1]
...     return (sum2, count)
...
>>>
>>> sum_count_per_key = sum_count.reduceByKey(lambda x, y: add_sum_count(x, y))
>>> sum_count_per_key.collect()
[('B', (90, 3)), ('A', (6, 3))]
>>> avg_per_key = sum_count_per_key.mapValues(lambda tuple: tuple[0] / tuple[1])
>>> avg_per_key.collect()
[('B', 30.0), ('A', 2.0)]
>>>
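The same (sum, count) accumulation can also be expressed with aggregateByKey(), which starts from a zero value instead of requiring the prior mapValues() step. This alternative is not used in the session above; a sketch under the same assumptions (recs_rdd as defined earlier):

sum_count_per_key = recs_rdd.aggregateByKey(
    (0, 0),                                      # zero value: (sum, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),     # fold one value into an accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1])      # merge two accumulators
)
avg_per_key = sum_count_per_key.mapValues(lambda sc: sc[0] / sc[1])
print(avg_per_key.collect())     # [('B', 30.0), ('A', 2.0)]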