pyspark-session-2021-04-19.txt
$ ./bin/pyspark
Python 3.7.2 (v3.7.2:9a3ffc0492, Dec 24 2018, 02:44:43)
[Clang 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
21/04/19 20:20:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/
Using Python version 3.7.2 (v3.7.2:9a3ffc0492, Dec 24 2018 02:44:43)
Spark context Web UI available at http://10.0.0.93:4040
Spark context available as 'sc' (master = local[*], app id = local-1618888841845).
SparkSession available as 'spark'.
>>>
>>> # Create an RDD[(String, Integer)] as rdd
>>> kv = [('a', 3), ('a', 4), ('a', 5), ('b', 30), ('b', 40), ('b', 50), ('z', 3)]
>>> rdd = spark.sparkContext.parallelize(kv)
>>>
>>>
>>> rdd.count()
7
>>> rdd.collect()
[('a', 3), ('a', 4), ('a', 5), ('b', 30), ('b', 40), ('b', 50), ('z', 3)]
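
The pair RDD above is built from a plain Python list of (key, value) tuples. Since the startup banner shows the SparkContext is also bound to 'sc', a minimal equivalent sketch is the following (rdd_alt is just an illustrative name, assuming the same kv list):

    rdd_alt = sc.parallelize(kv)   # illustrative name; same RDD[(String, Integer)] as rdd
    rdd_alt.count()                # 7, matching rdd.count() above
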
>>> def mapfun1(e):
... k = e[0]
... v = e[1]
... return (k, (v, v+5))
...
>>> # Create an RDD[(String, (Integer, Integer))] as rdd2
>>> # rdd2 has key type of String and value type of (Integer, Integer)
>>> rdd2 = rdd.map(mapfun1)
>>>
>>> rdd2.collect()
[('a', (3, 8)), ('a', (4, 9)), ('a', (5, 10)), ('b', (30, 35)), ('b', (40, 45)), ('b', (50, 55)), ('z', (3, 8))]
>>> rdd2.count()
7
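
The named function mapfun1 only repackages each pair (the key stays, the value becomes (v, v+5)), so the same transformation can be written inline. A minimal sketch, assuming the rdd built above (rdd2_alt is just an illustrative name):

    rdd2_alt = rdd.map(lambda e: (e[0], (e[1], e[1] + 5)))  # illustrative name
    # rdd2_alt: RDD[(String, (Integer, Integer))], same contents as rdd2
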
>>> # rdd: RDD[(String, Integer)]
...
>>> # rdd2: RDD[(String, (Integer, Integer))]
>>>
>>> # Create an RDD[(String, Integer)] as rdd3
>>> rdd3 = rdd2.map(lambda x: (x[0], x[1][0]+x[1][1]))
>>> rdd3.count()
7
>>> rdd3.collect()
[('a', 11), ('a', 13), ('a', 15), ('b', 65), ('b', 85), ('b', 105), ('z', 11)]
>>>
>>> rdd31 = rdd2.mapValues(lambda v: v[0]+v[1])
>>> rdd31.count()
7
>>> rdd31.collect()
[('a', 11), ('a', 13), ('a', 15), ('b', 65), ('b', 85), ('b', 105), ('z', 11)]
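
rdd3 and rdd31 hold the same data: map receives the whole (key, value) pair and has to rebuild it, while mapValues receives only the value and leaves the key (and any partitioner) untouched. A small sketch of the same sum written with Python's built-in sum, assuming the rdd2 created above (rdd31_alt is an illustrative name):

    rdd31_alt = rdd2.mapValues(sum)   # illustrative name; sum((3, 8)) == 11, etc.
    rdd31_alt.collect()               # expected to match rdd3.collect() above
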
>>>
>>>
>>>
>>> strings = ["abc", "xyzt", "", "123"]
>>> rdd_strings = spark.sparkContext.parallelize(strings)
>>>
>>> rdd_strings.count()
4
>>> rdd_strings_2 = rdd_strings.flatMap(lambda v: v)
>>> rdd_strings_2.collect()
['a', 'b', 'c', 'x', 'y', 'z', 't', '1', '2', '3']
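
flatMap treats each string as an iterable of its characters and concatenates the results, which is why the empty string contributes nothing. A minimal sketch of the same flattening using the built-in list constructor, assuming the same rdd_strings (chars is an illustrative name):

    chars = rdd_strings.flatMap(list)   # illustrative name; list('abc') -> ['a', 'b', 'c']
    chars.count()                       # 10, matching the collect() above
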
>>>
>>> lists = [ [1, 2, 3], [], [6,7,8,9,10], [] ]
>>> rdd4 = spark.sparkContext.parallelize(lists)
>>> rdd4.collect()
[[1, 2, 3], [], [6, 7, 8, 9, 10], []]
>>> rdd4.count()
4
>>> rdd5 = rdd4.flatMap(lambda v: v)
>>> rdd5.collect()
[1, 2, 3, 6, 7, 8, 9, 10]
>>> rdd5.count()
8
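
For comparison, map would keep the nesting: it emits exactly one output element per input element, while flatMap concatenates the iterables returned for each element. A minimal sketch, assuming the same rdd4:

    rdd4.map(lambda v: v).count()       # 4 -- the four lists themselves
    rdd4.flatMap(lambda v: v).count()   # 8 -- their members; empty lists vanish
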
>>>
>>> lists = [ [7, (1,2), (2,4)], ["abc", 99], [6, (7, 7), (8, 8)], [] ]
>>> rdd9 = spark.sparkContext.parallelize(lists)
>>> rdd9.collect()
[[7, (1, 2), (2, 4)], ['abc', 99], [6, (7, 7), (8, 8)], []]
>>> rdd9.count()
4
>>> rdd10 = rdd9.flatMap(lambda v: v)
>>> rdd10.collect()
[7, (1, 2), (2, 4), 'abc', 99, 6, (7, 7), (8, 8)]
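
flatMap removes only one level of nesting here: the outer lists disappear, but the tuples (and the string 'abc') inside them survive as single elements of rdd10. A plain-Python view of that one-level flattening, assuming the lists value above (flat_once is an illustrative name):

    flat_once = [x for sub in lists for x in sub]   # illustrative name
    # -> [7, (1, 2), (2, 4), 'abc', 99, 6, (7, 7), (8, 8)]
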
>>>
>>>
>>> rdd11 = rdd10.flatMap(lambda v: v)
>>> rdd11.collect()
21/04/19 20:43:44 ERROR Executor: Exception in task 5.0 in stage 17.0 (TID 141)
TypeError: 'int' object is not iterable
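
The second flatMap fails because some elements of rdd10 (7, 99, 6) are plain ints, which Python cannot iterate. One hedged sketch of a workaround, assuming the goal is to expand tuples but pass everything else through unchanged; flatten_or_keep is an illustrative helper, not part of the original session:

    def flatten_or_keep(v):
        # illustrative helper: expand tuples/lists; wrap anything else so flatMap accepts it
        return v if isinstance(v, (tuple, list)) else [v]

    rdd10.flatMap(flatten_or_keep).collect()
    # expected: [7, 1, 2, 2, 4, 'abc', 99, 6, 7, 7, 8, 8]
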
>>>
>>> mylist = [(7, 1, 2), (2, 4), ('abc', 99, 6), (7, 7), (8, 8)]
>>> rdd = spark.sparkContext.parallelize(mylist)
>>> rdd.collect()
[(7, 1, 2), (2, 4), ('abc', 99, 6), (7, 7), (8, 8)]
>>> rdd2 = rdd.flatMap(lambda x: x)
>>> rdd2.collect()
[7, 1, 2, 2, 4, 'abc', 99, 6, 7, 7, 8, 8]
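
Here every top-level element of mylist is a tuple, so flatMap succeeds and yields the fully flattened list. The same flattening as a local plain-Python check, assuming the mylist defined above:

    [x for t in mylist for x in t]
    # -> [7, 1, 2, 2, 4, 'abc', 99, 6, 7, 7, 8, 8]
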
>>>
>>>