Source Code.txt
Database: sourcedb
Creating customer table:
hive> create table customer(cust_id int,cust_f_name string,cust_l_name string,cust_ssn int,cust_street string,cust_city string,cust_state string,cust_country string,cust_zip int,cust_phone string,cust_email string) row format delimited fields terminated by ',' lines terminated by '\n' tblproperties("skip.header.line.count"="1");
Creating item table:
hive> create table item(item_code int,item_name string,item_price float) row format delimited fields terminated by ',' lines terminated by '\n' tblproperties("skip.header.line.count"="1");
Creating cust_item table:
hive> create table cust_item(tran_id int,tran_date date,cust_id int,item_code int,no_of_item int) row format delimited fields terminated by ',' lines terminated by '\n' tblproperties("skip.header.line.count"="1");
Loading the CSV files into the above tables:
hive> load data local inpath '/home/hduser/SourceLocal/CUSTOMER.csv' into table customer;
hive> load data local inpath '/home/hduser/SourceLocal/ITEM.csv' into table item;
hive> load data local inpath '/home/hduser/SourceLocal/CUST_ITEM.csv' into table cust_item;
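As a quick sanity check on the loads, the row counts can be inspected (a hedged sketch, not part of the original flow; it assumes the PySpark session used later in this document):

# Row counts per source table; exact numbers depend on the CSV files.
for t in ["customer", "item", "cust_item"]:
    spark.sql(f"select count(*) as cnt from sourcedb.{t}").show()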
Creating temporary table for cust_item:
hive> create temporary table temp_cust_item like cust_item tblproperties("skip.header.line.count"="1");
Loading data into temporary table:
hive> load data local inpath '/home/hduser/SourceLocal/CUST_ITEM.csv' into table temp_cust_item;
Creating external table tran_cust_item:
hive> create external table tran_cust_item(tran_id int,tran_date date,cust_id int,cust_f_name string,cust_l_name string,cust_city string,cust_state string,item_code int,item_name string,item_price double,no_of_item int) partitioned by (year int,month int) row format delimited fields terminated by ',' lines terminated by '\n' stored as parquet location '/user/hduser/outputdir/year_month_part/';
Inserting data into the tran_cust_item table using PySpark:
Note: set the dynamic partition mode to nonstrict first: spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")
spark.sql("insert into tran_cust_item partition(year,month) select t.tran_id,t.tran_date,c.cust_id,c.cust_f_name,c.cust_l_name,c.cust_city,c.cust_state,i.item_code,i.item_name,i.item_price,t.no_of_item,year(tran_date) as year,month(tran_date) as month from cust_item t join customer c on c.cust_id=t.cust_id join item i on i.item_code=t.item_code")
Database: targetdb
Processing Functional Requirements:
1) Summarized Statement:
create table temp_tbl1(customer_name string,product_name string,year int,month string,item_value double,mon int) row format delimited fields terminated by ',' lines terminated by '\n';
insert into temp_tbl1 select concat_ws(' ',cust_f_name,cust_l_name) as customer_name,item_name,year,date_format(tran_date,'MMMMM'),round(sum(item_price),2),month from sourcedb.tran_cust_item group by cust_f_name,cust_l_name,item_name,year,tran_date,month order by customer_name,year,month;
Creating cust_purc_year_mon_summ table:
create external table cust_purc_year_mon_summ(customer_name string,product_name string,year int,month string,item_value double) row format delimited fields terminated by ',' lines terminated by '\n' stored as parquet location '/user/hduser/outputdir/summary/';
Inserting the summarized data into the above table:
spark.sql("insert into targetdb.cust_purc_year_mon_summ select customer_name,product_name,year,month,item_value from targetdb.temp_tbl1")
2) Item_least_moved:
Creating item_least_moved table:
create external table targetdb.item_least_moved(customer_name string,product_name string,SOLD_VALUE double,YEAR int) row format delimited fields terminated by ',' lines terminated by '\n' stored as parquet location '/user/hduser/outputdir/item_least_moved/';
Inserting data into the above table:
spark.sql("insert into targetdb.item_least_moved select concat_ws(' ',cust_f_name,cust_l_name),item_name,round(sum(item_price * no_of_item),2), year from sourcedb.tran_cust_item where year between 2017 and 2020 group by 1,2,4 order by 3 limit 10")
3)Min,MAX & Avg purchase value:
df=spark.sql("select cust_state as state_wise,cust_city as city_wise,year,round(min(item_price*no_of_item),2) as min_sale,round(max(item_price*no_of_item),2) as max_sale,round(avg(item_price*no_of_item),2) as avg_sale from sourcedb.tran_cust_item where year between 2017 and 2020 group by cust_state,cust_city,year order by state_wise,city_wise,year")
df.repartition(1).write.format("csv").mode("overwrite").option("header","true").save("hdfs://localhost:50000/user/hduser/output/Loc_Item_Anz_Summ.csv")
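To confirm the write succeeded, the saved directory can be read back (a sketch; same session and path as above):

# Spark treats the .csv path as a directory of part files.
check = spark.read.option("header", "true").csv("hdfs://localhost:50000/user/hduser/output/Loc_Item_Anz_Summ.csv")
check.show(5)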
4) Top 3 items by total sales:
df=spark.sql("select item_name,sum(no_of_item) as Total_Sales,floor(sum(item_price*no_of_item)) as Value from tran_cust_item group by item_name order by Total_Sales desc").limit(3)
df.repartition(1).write.format("csv").mode("overwrite").option("header","true").save("hdfs://localhost:50000/user/hduser/output/Top_3_item.csv")
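The plotting script that follows runs on a local machine against a downloaded copy of the Top_3_item output, saved as ItemRank.csv. One way to pull it out of HDFS, assuming the hdfs CLI is available on the client (a sketch, not necessarily how the file was fetched):

import subprocess

# Merge the part files of the output directory into one local CSV.
subprocess.run(["hdfs", "dfs", "-getmerge",
                "/user/hduser/output/Top_3_item.csv",
                "ItemRank.csv"], check=True)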
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Load the top-3-items result downloaded from HDFS.
df = pd.read_csv("C:\\Users\\SUPER\\Downloads\\ItemRank.csv")

plt.figure(figsize=(6, 6))
plots = sns.barplot(x="item_name", y="Value", data=df)

# Label each bar with its value, offset slightly above the bar top.
for bar in plots.patches:
    plots.annotate(int(bar.get_height()),
                   (bar.get_x() + bar.get_width() / 2, bar.get_height()),
                   ha='center', va='center',
                   size=15, xytext=(0, 8),
                   textcoords='offset points')

plt.xlabel("Product_Name", size=14)
plt.ylabel("Value", size=14)
plt.title("Top 3 items Value wise")
plt.show()