1
+ # import required libraries
2
+ from pyspark .sql import SparkSession
3
+ from pyspark .sql .functions import col
4
+ from pyspark .sql .functions import udf
5
+ from pyspark .sql .types import StringType
6
+
7
+ #--------------------------------------------------
8
+ # Demo concept of Spark UDF (user-defined-function)
9
+ #--------------------------------------------------
10
+ # @author: Mahmoud Parsian
11
+ #--------------------------------------------------
12
def convert_case(name):
    """Upper-case the first letter of each space-separated word in *name*.

    Unlike ``str.title()`` / ``str.capitalize()``, the remaining characters
    of each word are left exactly as they were (not lower-cased).

    :param name: input string, or None
    :return: converted string; None for None input, "" for empty input
    """
    if name is None:
        return None
    if len(name) < 1:
        return ""
    # Build the result with a single join instead of repeated string
    # concatenation in a loop (which is quadratic in the worst case).
    # w[0:1] is safe for the empty tokens produced by consecutive spaces.
    return " ".join(w[0:1].upper() + w[1:] for w in name.split(" ")).strip()
#end-def
22
+ #--------------------------------------------------
23
def to_upper_case(name):
    """Return *name* converted to upper case.

    None is passed through unchanged; an empty string maps to "".
    """
    if name is None:
        return None
    # "".upper() is "" already, so the empty string needs no special case.
    return name.upper()
#end-def
28
#--------------------------------------------------
#
# create a SparkSession object (entry point to all Spark functionality)
spark = SparkSession.builder.appName('UDF-Learning').getOrCreate()

# define column names for the demo DataFrame
column_names = ["ID", "Name"]

# sample rows; the None value exercises the NULL-handling
# branches inside the UDF functions
some_data = [("100", "john jones"),
             ("200", "tracey smith"),
             ("300", "amy sanders"),
             ("400", None)]

# create a DataFrame
df = spark.createDataFrame(data=some_data, schema=column_names)

# display content of the DataFrame for testing/debugging
df.show(truncate=False)


# Convert the plain Python function into a Spark UDF.
# The function is passed directly (a `lambda p: convert_case(p)` wrapper
# adds nothing), and the return type is declared explicitly for
# consistency — StringType() happens to be the implicit default.
convert_case_udf = udf(convert_case, StringType())

# use the UDF in a select statement
df.select(col("ID"), convert_case_udf(col("Name")).alias("Name")).show(truncate=False)

# create a second UDF (again, no lambda wrapper needed)
upper_case_udf = udf(to_upper_case, StringType())

# apply a UDF using withColumn
df.withColumn("Upper Name", upper_case_udf(col("Name"))).show(truncate=False)

# Using a UDF from Spark SQL: register the function under a SQL name,
# expose the DataFrame as a temp view, then query it
spark.udf.register("convert_UDF", convert_case, StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select ID, convert_UDF(Name) as Name from NAME_TABLE").show(truncate=False)

# release the cluster/driver resources held by the session
spark.stop()
+
0 commit comments