pyspark-session-2020-07-06-word-count.txt
$ cat /tmp/foxy.txt
a Fox jumped high and high and jumped and jumped
fox of red jumped fox of red jumped fox of red jumped
oh no
fox of blue jumped
oh boy
a Fox is a red fox of hen
a fox is a high fox
orange fox is high and blue and blue
mparsian@usfc-olw-025011 ~/spark-3.0.0 $ ./bin/pyspark
Python 3.7.2 (v3.7.2:9a3ffc0492, Dec 24 2018, 02:44:43)
[Clang 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
20/07/06 17:59:22 WARN Utils: Your hostname, Mahmouds-MacBook.local resolves to a loopback address: 127.0.0.1; using 10.0.0.93 instead (on interface en0)
20/07/06 17:59:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/07/06 17:59:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/
Using Python version 3.7.2 (v3.7.2:9a3ffc0492, Dec 24 2018 02:44:43)
SparkSession available as 'spark'.
>>>
>>>
>>> spark
<pyspark.sql.session.SparkSession object at 0x7fca5613ec18>
>>>
>>> input_path = '/tmp/foxy.txt'
>>> input_path
'/tmp/foxy.txt'
>>>
>>> recs = spark.sparkContext.textFile(input_path)
>>> recs.count()
8
>>> recs.collect()
[
'a Fox jumped high and high and jumped and jumped',
'fox of red jumped fox of red jumped fox of red jumped',
'oh no',
'fox of blue jumped',
'oh boy',
'a Fox is a red fox of hen',
'a fox is a high fox',
'orange fox is high and blue and blue'
]
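
Note: textFile() returns an RDD with one element per line, so recs.count()
is the number of lines (8), not the number of words. The same file could
also be read through the DataFrame API -- a minimal sketch, not part of the
original session:

    # hypothetical alternative, assuming the same /tmp/foxy.txt path
    df = spark.read.text(input_path)           # one Row per line, column "value"
    recs2 = df.rdd.map(lambda row: row.value)  # back to an RDD of strings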
>>>
>>>
>>>
>>> splitted = recs.map(lambda x: x.split(" "))
>>> splitted.count()
8
>>> splitted.collect()
[
['a', 'Fox', 'jumped', 'high', 'and', 'high', 'and', 'jumped', 'and', 'jumped'],
['fox', 'of', 'red', 'jumped', 'fox', 'of', 'red', 'jumped', 'fox', 'of', 'red', 'jumped'],
['oh', 'no'],
['fox', 'of', 'blue', 'jumped'],
['oh', 'boy'],
['a', 'Fox', 'is', 'a', 'red', 'fox', 'of', 'hen'],
['a', 'fox', 'is', 'a', 'high', 'fox'],
['orange', 'fox', 'is', 'high', 'and', 'blue', 'and', 'blue']
]
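
Note: map() is one-to-one, so splitted still has 8 elements -- each element
is now a list of words rather than a line. The flatMap() in the next step
flattens these lists into a single RDD of words.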
>>>
>>>
>>> words = splitted.flatMap(lambda x: x)
>>> words.count()
52
>>> words.collect()
['a', 'Fox', 'jumped', 'high', 'and', 'high', 'and', 'jumped', 'and', 'jumped', 'fox', 'of', 'red', 'jumped', 'fox', 'of', 'red', 'jumped', 'fox', 'of', 'red', 'jumped', 'oh', 'no', 'fox', 'of', 'blue', 'jumped', 'oh', 'boy', 'a', 'Fox', 'is', 'a', 'red', 'fox', 'of', 'hen', 'a', 'fox', 'is', 'a', 'high', 'fox', 'orange', 'fox', 'is', 'high', 'and', 'blue', 'and', 'blue']
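
Note: the map()-then-flatMap() pair above can be collapsed into a single
flatMap(), since flatMap() both applies the function and flattens the
result -- an equivalent sketch, not part of the original session:

    words2 = recs.flatMap(lambda line: line.split(" "))
    # words2.count() would also be 52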
>>>
>>>
>>> pairs = words.map(lambda x : (x, 1))
>>> pairs.collect()
[('a', 1), ('Fox', 1), ('jumped', 1), ('high', 1), ('and', 1), ('high', 1), ('and', 1), ('jumped', 1), ('and', 1), ('jumped', 1), ('fox', 1), ('of', 1), ('red', 1), ('jumped', 1), ('fox', 1), ('of', 1), ('red', 1), ('jumped', 1), ('fox', 1), ('of', 1), ('red', 1), ('jumped', 1), ('oh', 1), ('no', 1), ('fox', 1), ('of', 1), ('blue', 1), ('jumped', 1), ('oh', 1), ('boy', 1), ('a', 1), ('Fox', 1), ('is', 1), ('a', 1), ('red', 1), ('fox', 1), ('of', 1), ('hen', 1), ('a', 1), ('fox', 1), ('is', 1), ('a', 1), ('high', 1), ('fox', 1), ('orange', 1), ('fox', 1), ('is', 1), ('high', 1), ('and', 1), ('blue', 1), ('and', 1), ('blue', 1)]
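
Note: mapping every word to the pair (word, 1) is the classic first half of
word count; reduceByKey() then sums the 1s per key. The lambda used below
could equally be replaced by the standard-library add function -- a sketch:

    from operator import add
    freq_alt = pairs.reduceByKey(add)   # equivalent to lambda a, b: a + b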
>>>
>>>
>>> freq = pairs.reduceByKey(lambda a, b: a+b)
>>>
>>> freq.collect()
[('Fox', 2), ('high', 4), ('of', 5), ('oh', 2), ('no', 1), ('boy', 1), ('is', 3), ('hen', 1), ('orange', 1), ('a', 5), ('jumped', 7), ('and', 5), ('fox', 8), ('red', 4), ('blue', 3)]
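
Note: collect() returns the pairs in no particular order. To rank words by
frequency, one could sort before collecting -- a sketch, not part of the
original session:

    top = freq.sortBy(lambda kv: kv[1], ascending=False).collect()
    # ('fox', 8) and ('jumped', 7) would come first; ties in no fixed order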
>>>
>>>
>>> grouped = pairs.groupByKey()
>>> grouped.collect()
[
('Fox', <pyspark.resultiterable.ResultIterable object at 0x7fca56198ef0>),
('high', <pyspark.resultiterable.ResultIterable object at 0x7fca56198b00>),
('of', <pyspark.resultiterable.ResultIterable object at 0x7fca56198dd8>),
('oh', <pyspark.resultiterable.ResultIterable object at 0x7fca56198e80>),
('no', <pyspark.resultiterable.ResultIterable object at 0x7fca56198d30>),
('boy', <pyspark.resultiterable.ResultIterable object at 0x7fca56198da0>),
('is', <pyspark.resultiterable.ResultIterable object at 0x7fca56198be0>),
('hen', <pyspark.resultiterable.ResultIterable object at 0x7fca56198eb8>),
('orange', <pyspark.resultiterable.ResultIterable object at 0x7fca56198710>),
('a', <pyspark.resultiterable.ResultIterable object at 0x7fca561989e8>),
('jumped', <pyspark.resultiterable.ResultIterable object at 0x7fca56198cc0>),
('and', <pyspark.resultiterable.ResultIterable object at 0x7fca561988d0>),
('fox', <pyspark.resultiterable.ResultIterable object at 0x7fca56198828>),
('red', <pyspark.resultiterable.ResultIterable object at 0x7fca56198668>),
('blue', <pyspark.resultiterable.ResultIterable object at 0x7fca561ab080>)
]
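
Note: groupByKey() returns a lazy ResultIterable per key, which is why the
values print as object references; the mapValues(list) call below
materializes them into plain Python lists.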
>>>
>>> grouped.mapValues(lambda iter: list(iter)).collect()
[
('Fox', [1, 1]),
('high', [1, 1, 1, 1]),
('of', [1, 1, 1, 1, 1]),
('oh', [1, 1]),
('no', [1]),
('boy', [1]),
('is', [1, 1, 1]),
('hen', [1]),
('orange', [1]),
('a', [1, 1, 1, 1, 1]),
('jumped', [1, 1, 1, 1, 1, 1, 1]),
('and', [1, 1, 1, 1, 1]),
('fox', [1, 1, 1, 1, 1, 1, 1, 1]),
('red', [1, 1, 1, 1]),
('blue', [1, 1, 1])
]
>>> freq2 = grouped.mapValues(lambda iter: sum(iter))
>>> freq2.collect()
[('Fox', 2), ('high', 4), ('of', 5), ('oh', 2), ('no', 1), ('boy', 1), ('is', 3), ('hen', 1), ('orange', 1), ('a', 5), ('jumped', 7), ('and', 5), ('fox', 8), ('red', 4), ('blue', 3)]
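
Note: groupByKey() + sum gives the same counts as reduceByKey(), but it
shuffles every (word, 1) pair across the network before summing, while
reduceByKey() pre-aggregates within each partition first; reduceByKey() is
generally preferred for this reason. For small results there is also a
one-call action -- a sketch, not part of the original session:

    counts = words.countByValue()   # returns a dict of counts on the driver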
>>>
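
Note: the session counts 'Fox' (2) and 'fox' (8) as different words. A
consolidated, case-normalized version of the whole pipeline might look like
this -- a sketch, not part of the original session:

    freq_ci = (spark.sparkContext.textFile('/tmp/foxy.txt')
               .flatMap(lambda line: line.lower().split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
    # here 'Fox' and 'fox' would merge into ('fox', 10)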