Commit efc6bae
Remove redundant infra naming
1 parent 8d850df

35 files changed, +263 -263 lines changed
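
This commit mechanically renames the helper modules `infra.dask_infra` to `infra.dask` and `infra.pd_infra` to `infra.pd` at every call site; only a subset of the 35 changed files is shown below. A repo-wide rename of this kind could be applied with a small script along the following lines. The script is purely illustrative (its name and the assumption that only these two module paths change are not part of the commit).

# rename_infra_modules.py -- illustrative sketch only; the commit contains just
# the resulting diff, and this helper is not part of the repository.
import pathlib

# Old module path -> new module path (the two renames this commit performs).
RENAMES = {
    "infra.dask_infra": "infra.dask",
    "infra.pd_infra": "infra.pd",
}

def rename_imports(repo_root="."):
    """Rewrite every .py file under repo_root, replacing old module paths with new ones."""
    for path in pathlib.Path(repo_root).rglob("*.py"):
        text = path.read_text()
        updated = text
        for old, new in RENAMES.items():
            updated = updated.replace(old, new)
        if updated != text:
            path.write_text(updated)
            print(f"rewrote {path}")

if __name__ == "__main__":
    rename_imports()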

annotate_category_org_local.py (+9 -9)

@@ -6,7 +6,7 @@
 import ipaddress
 import os
 
-import infra.dask_infra
+import infra.dask
 import mappers.domains
 import infra.platform
 
@@ -19,7 +19,7 @@ def _categorize_user(in_path, out_path):
 
     Requires the input parquet file specify an `fqdn` column, protocol, and ports
     """
-    frame = infra.dask_infra.read_parquet(in_path)
+    frame = infra.dask.read_parquet(in_path)
 
     # First pass assign by FQDN
     processor = mappers.domains.FqdnProcessor()
@@ -65,7 +65,7 @@ def _categorize_user(in_path, out_path):
         axis="columns",
         meta=("local", bool))
 
-    return infra.dask_infra.clean_write_parquet(frame, out_path, compute=False)
+    return infra.dask.clean_write_parquet(frame, out_path, compute=False)
 
 
 def _assign_org_from_ip(ip, current):
@@ -144,7 +144,7 @@ def _augment_user_flows_with_stun_state(in_path, out_path):
 
     Will not be correct unless the flow is indexed by time
     """
-    flow_frame = infra.dask_infra.read_parquet(in_path)
+    flow_frame = infra.dask.read_parquet(in_path)
     # Bookeeping for building a new frame
     max_rows_per_division = 100000
     out_chunk = list()
@@ -231,7 +231,7 @@ def _augment_user_flows_with_stun_state(in_path, out_path):
         force=True)
     out_frame = out_frame.categorize(columns=["fqdn_source", "org", "category"])
 
-    infra.dask_infra.clean_write_parquet(out_frame, out_path)
+    infra.dask.clean_write_parquet(out_frame, out_path)
     print("Finished writing user", in_path)
 
 
@@ -285,7 +285,7 @@ def merge_parquet_frames(in_parent_directory, out_frame_path):
     div_on_disk = sorted(os.listdir(in_parent_directory))
     for div in div_on_disk:
         div_path = os.path.join(in_parent_directory, div)
-        frame = infra.dask_infra.read_parquet(div_path)
+        frame = infra.dask.read_parquet(div_path)
 
         if merged_frame is None:
             merged_frame = frame
@@ -299,11 +299,11 @@ def merge_parquet_frames(in_parent_directory, out_frame_path):
         force=True
     )
 
-    infra.dask_infra.clean_write_parquet(merged_frame, out_frame_path)
+    infra.dask.clean_write_parquet(merged_frame, out_frame_path)
 
 
 def _print_heavy_hitter_unmapped_domains(infile):
-    df = infra.dask_infra.read_parquet(infile)
+    df = infra.dask.read_parquet(infile)
 
     unmapped = df.loc[((df["org"] == "Unknown (Not Mapped)") | (df["category"] == "Unknown (Not Mapped)"))]
     df = unmapped.groupby("fqdn").sum()
@@ -328,7 +328,7 @@ def _print_heavy_hitter_unmapped_domains(infile):
 
     if platform.large_compute_support:
         print("To see execution status, check out the dask status page at localhost:8787 while the computation is running.")
-        client = infra.dask_infra.setup_platform_tuned_dask_client(20, platform)
+        client = infra.dask.setup_platform_tuned_dask_client(20, platform)
 
         #augment_all_user_flows(in_parent_directory, annotated_parent_directory, client)
         stun_augment_all_user_flows(annotated_parent_directory, stun_annotated_parent_directory, client)
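
The body of the renamed `infra/dask.py` module is not part of this diff; only its call sites are. From those call sites it appears to expose at least `read_parquet`, `clean_write_parquet`, `setup_dask_client`, and `setup_platform_tuned_dask_client(per_worker_memory_GB, platform)`. A minimal sketch of thin wrappers with that shape, assuming they simply delegate to dask.dataframe and dask.distributed, might look like the following; the bodies are illustrative guesses, not the repository's actual code.

# infra/dask.py -- illustrative sketch of the wrapper interface implied by the
# call sites above; the real implementation is not shown in this commit.
import shutil

import dask.dataframe
from dask.distributed import Client, LocalCluster


def read_parquet(path):
    # Thin passthrough to dask.dataframe (assumption).
    return dask.dataframe.read_parquet(path)


def clean_write_parquet(dataframe, path, compute=True):
    # "Clean" presumably means removing any stale output directory first (assumption).
    shutil.rmtree(path, ignore_errors=True)
    return dataframe.to_parquet(path, compute=compute)


def setup_dask_client():
    # Default local client (assumption).
    return Client()


def setup_platform_tuned_dask_client(per_worker_memory_GB, platform):
    # Size a local cluster from the platform description (assumption).
    cluster = LocalCluster(
        n_workers=getattr(platform, "max_workers", 4),
        memory_limit=f"{per_worker_memory_GB}GB",
    )
    return Client(cluster)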

bytes_per_category.py (+9 -9)

@@ -5,8 +5,8 @@
 import pandas as pd
 
 import infra.constants
-import infra.dask_infra
-import infra.pd_infra
+import infra.dask
+import infra.pd
 import infra.platform
 
 
@@ -18,7 +18,7 @@
 
 
 def reduce_to_pandas(outfile, dask_client):
-    flows = infra.dask_infra.read_parquet(
+    flows = infra.dask.read_parquet(
         "data/clean/flows/typical_fqdn_org_category_local_TM_DIV_none_INDEX_start")[["category", "org", "bytes_up", "bytes_down", "protocol", "dest_port"]]
 
     # Compress to days
@@ -30,11 +30,11 @@ def reduce_to_pandas(outfile, dask_client):
     flows = flows.groupby(["start_bin", "category", "org"]).sum()
     flows = flows.compute()
 
-    infra.pd_infra.clean_write_parquet(flows, outfile)
+    infra.pd.clean_write_parquet(flows, outfile)
 
 
 def make_category_plot(infile):
-    grouped_flows = infra.pd_infra.read_parquet(infile)
+    grouped_flows = infra.pd.read_parquet(infile)
     grouped_flows = grouped_flows.reset_index()
     grouped_flows["bytes_total"] = grouped_flows["bytes_up"] + grouped_flows["bytes_down"]
 
@@ -121,7 +121,7 @@ def make_category_plot(infile):
 
 
 def make_category_aggregate_bar_chart(infile):
-    grouped_flows = infra.pd_infra.read_parquet(infile).reset_index()
+    grouped_flows = infra.pd.read_parquet(infile).reset_index()
 
     # Consolidate by week instead of by day
     grouped_flows = grouped_flows[
@@ -167,7 +167,7 @@ def make_category_aggregate_bar_chart(infile):
 
 
 def compute_stats(infile, dimension):
-    grouped_flows = infra.pd_infra.read_parquet(infile)
+    grouped_flows = infra.pd.read_parquet(infile)
     grouped_flows = grouped_flows.reset_index()
     grouped_flows["bytes_total"] = grouped_flows["bytes_up"] + grouped_flows["bytes_down"]
 
@@ -193,7 +193,7 @@ def compute_stats(infile, dimension):
 def make_org_plot(infile):
     """ Generate plots to explore the traffic distribution across organizations
     """
-    grouped_flows = infra.pd_infra.read_parquet(infile)
+    grouped_flows = infra.pd.read_parquet(infile)
     grouped_flows = grouped_flows.reset_index()
     grouped_flows["bytes_total"] = grouped_flows["bytes_up"] + grouped_flows["bytes_down"]
 
@@ -345,7 +345,7 @@ def make_org_plot(infile):
 
     graph_temporary_file = "scratch/graphs/bytes_per_category"
     if platform.large_compute_support:
-        client = infra.dask_infra.setup_platform_tuned_dask_client(10, platform)
+        client = infra.dask.setup_platform_tuned_dask_client(10, platform)
         reduce_to_pandas(outfile=graph_temporary_file, dask_client=client)
         client.close()
 
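
This file also renames the pandas-side helper, `infra.pd_infra` to `infra.pd`. As with `infra.dask`, the module itself is outside this diff; the call sites imply `read_parquet` and `clean_write_parquet` wrappers over pandas. A sketch under that assumption follows; the bodies are illustrative, not the repository's actual code.

# infra/pd.py -- illustrative sketch of the pandas-side counterpart implied by
# the call sites; the actual implementation is not part of this commit.
import os

import pandas as pd


def read_parquet(path):
    # Thin passthrough to pandas (assumption).
    return pd.read_parquet(path)


def clean_write_parquet(dataframe, path):
    # Remove any stale output file before writing the new one (assumption).
    if os.path.exists(path):
        os.remove(path)
    dataframe.to_parquet(path)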

bytes_per_category_per_quantile.py (+7 -7)

@@ -4,32 +4,32 @@
 import altair as alt
 import pandas as pd
 
-import infra.dask_infra
-import infra.pd_infra
+import infra.dask
+import infra.pd
 import infra.platform
 
 
 def reduce_to_pandas(outfile, dask_client):
-    flows = infra.dask_infra.read_parquet(
+    flows = infra.dask.read_parquet(
         "data/clean/flows/typical_fqdn_org_category_local_TM_DIV_none_INDEX_start")[["user", "category", "org", "bytes_up", "bytes_down"]]
 
     # Do the grouping
     flows = flows.groupby(["user", "category", "org"]).sum()
     flows = flows.compute()
 
-    infra.pd_infra.clean_write_parquet(flows, outfile)
+    infra.pd.clean_write_parquet(flows, outfile)
 
 
 def make_category_quantiles_plots(infile):
-    grouped_flows = infra.pd_infra.read_parquet(infile)
+    grouped_flows = infra.pd.read_parquet(infile)
     grouped_flows = grouped_flows.reset_index()
     grouped_flows["bytes_total"] = grouped_flows["bytes_up"] + grouped_flows["bytes_down"]
     user_category_total = grouped_flows[["user", "category", "bytes_total"]].groupby(
         ["user", "category"]
     ).sum().reset_index()
 
     # Filter users by time in network to eliminate early incomplete samples
-    user_active_ranges = infra.pd_infra.read_parquet(
+    user_active_ranges = infra.pd.read_parquet(
         "data/clean/user_active_deltas.parquet")[["user", "days_since_first_active", "days_active", "days_online"]]
     # Drop users that joined less than a week ago.
     users_to_analyze = user_active_ranges.loc[
@@ -207,7 +207,7 @@ def make_category_quantiles_plots(infile):
 
     if platform.large_compute_support:
         print("Running compute tasks")
-        client = infra.dask_infra.setup_platform_tuned_dask_client(10, platform)
+        client = infra.dask.setup_platform_tuned_dask_client(10, platform)
         reduce_to_pandas(outfile=graph_temporary_file, dask_client=client)
         client.close()
 

bytes_per_category_per_user.py (+7 -7)

@@ -4,32 +4,32 @@
 import altair as alt
 import pandas as pd
 
-import infra.dask_infra
-import infra.pd_infra
+import infra.dask
+import infra.pd
 import infra.platform
 
 
 def reduce_to_pandas(outfile, dask_client):
-    flows = infra.dask_infra.read_parquet(
+    flows = infra.dask.read_parquet(
         "data/clean/flows/typical_fqdn_org_category_local_TM_DIV_none_INDEX_start")[["user", "category", "org", "bytes_up", "bytes_down"]]
 
     # Do the grouping
     flows = flows.groupby(["user", "category", "org"]).sum()
     flows = flows.compute()
 
-    infra.pd_infra.clean_write_parquet(flows, outfile)
+    infra.pd.clean_write_parquet(flows, outfile)
 
 
 def make_category_per_user_plots(infile):
-    grouped_flows = infra.pd_infra.read_parquet(infile)
+    grouped_flows = infra.pd.read_parquet(infile)
     grouped_flows = grouped_flows.reset_index()
     grouped_flows["bytes_total"] = grouped_flows["bytes_up"] + grouped_flows["bytes_down"]
     user_category_total = grouped_flows[["user", "category", "bytes_total"]].groupby(
         ["user", "category"]
     ).sum().reset_index()
 
     # Filter users by time in network to eliminate early incomplete samples
-    user_active_ranges = infra.pd_infra.read_parquet(
+    user_active_ranges = infra.pd.read_parquet(
         "data/clean/user_active_deltas.parquet")[["user", "days_since_first_active", "days_active", "days_online"]]
     # Drop users that joined less than a week ago.
     users_to_analyze = user_active_ranges.loc[
@@ -151,7 +151,7 @@ def make_category_per_user_plots(infile):
 
     if platform.large_compute_support:
         print("Running compute tasks")
-        client = infra.dask_infra.setup_platform_tuned_dask_client(10, platform)
+        client = infra.dask.setup_platform_tuned_dask_client(10, platform)
         reduce_to_pandas(outfile=graph_temporary_file, dask_client=client)
         client.close()
 

bytes_per_day_of_week.py (+10 -10)

@@ -2,18 +2,18 @@
 import numpy as np
 import pandas as pd
 
-import infra.dask_infra
-import infra.pd_infra
+import infra.dask
+import infra.pd
 import infra.platform
 
 
 def create_all_flows(dask_client):
-    typical_flows = infra.dask_infra.read_parquet(
+    typical_flows = infra.dask.read_parquet(
         "data/clean/flows/typical_fqdn_category_local_TM_DIV_none_INDEX_start")[["end", "protocol", "bytes_up", "bytes_down"]]
 
-    p2p_flows = infra.dask_infra.read_parquet("data/clean/flows/p2p_TM_DIV_none_INDEX_start")[["end", "protocol", "bytes_a_to_b", "bytes_b_to_a"]]
+    p2p_flows = infra.dask.read_parquet("data/clean/flows/p2p_TM_DIV_none_INDEX_start")[["end", "protocol", "bytes_a_to_b", "bytes_b_to_a"]]
 
-    nouser_flows = infra.dask_infra.read_parquet("data/clean/flows/nouser_TM_DIV_none_INDEX_start")[["end", "protocol", "bytes_a_to_b", "bytes_b_to_a"]]
+    nouser_flows = infra.dask.read_parquet("data/clean/flows/nouser_TM_DIV_none_INDEX_start")[["end", "protocol", "bytes_a_to_b", "bytes_b_to_a"]]
 
     typical_flows["bytes_total"] = typical_flows["bytes_up"] + typical_flows["bytes_down"]
     p2p_flows["bytes_total"] = p2p_flows["bytes_a_to_b"] + p2p_flows["bytes_b_to_a"]
@@ -25,11 +25,11 @@ def create_all_flows(dask_client):
 
     all_flows = typical_flows.append(p2p_flows).append(nouser_flows)
     all_flows = all_flows.set_index("start").repartition(partition_size="128M", force=True)
-    infra.dask_infra.clean_write_parquet(all_flows, "data/clean/flows/all_TM_DIV_none_INDEX_start")
+    infra.dask.clean_write_parquet(all_flows, "data/clean/flows/all_TM_DIV_none_INDEX_start")
 
 
 def reduce_to_pandas(outfile, dask_client):
-    flows = infra.dask_infra.read_parquet(
+    flows = infra.dask.read_parquet(
         "data/clean/flows/all_TM_DIV_none_INDEX_start")[["bytes_total"]]
 
     # Compress to days
@@ -43,11 +43,11 @@ def reduce_to_pandas(outfile, dask_client):
 
     flows = flows.compute()
 
-    infra.pd_infra.clean_write_parquet(flows, outfile)
+    infra.pd.clean_write_parquet(flows, outfile)
 
 
 def make_plot(infile):
-    grouped_flows = infra.pd_infra.read_parquet(infile)
+    grouped_flows = infra.pd.read_parquet(infile)
     grouped_flows = grouped_flows.reset_index()
 
     days = ['Monday', 'Tuesday', 'Wednesday',
@@ -151,7 +151,7 @@ def make_plot(infile):
 
     if platform.large_compute_support:
         print("Performing compute operations")
-        client = infra.dask_infra.setup_dask_client()
+        client = infra.dask.setup_dask_client()
         # create_all_flows(client)
         reduce_to_pandas(outfile=graph_temporary_file, dask_client=client)
         client.close()

bytes_per_online_day_per_user.py (+7 -7)

@@ -5,13 +5,13 @@
 import numpy as np
 import pandas as pd
 
-import infra.dask_infra
-import infra.pd_infra
+import infra.dask
+import infra.pd
 import infra.platform
 
 
 def reduce_to_pandas(outpath, dask_client):
-    flows = infra.dask_infra.read_parquet(
+    flows = infra.dask.read_parquet(
         "data/clean/flows/typical_fqdn_org_category_local_TM_DIV_none_INDEX_start")[["user", "bytes_up", "bytes_down"]]
 
     flows["bytes_total"] = flows["bytes_up"] + flows["bytes_down"]
@@ -27,7 +27,7 @@ def reduce_to_pandas(outpath, dask_client):
     flows = flows.reset_index()[["start_bin", "user", "bytes_total"]]
     flows = flows.compute()
 
-    infra.pd_infra.clean_write_parquet(flows, outpath)
+    infra.pd.clean_write_parquet(flows, outpath)
 
 
 def compute_cdf(frame, value_column, base_column):
@@ -40,13 +40,13 @@ def compute_cdf(frame, value_column, base_column):
 
 
 def make_plot(inpath):
-    flows = infra.pd_infra.read_parquet(inpath)
+    flows = infra.pd.read_parquet(inpath)
     flows = flows.reset_index()
     flows["MB"] = flows["bytes_total"] / (1000**2)
     user_total = flows[["user", "MB"]]
     user_total = user_total.groupby(["user"]).sum().reset_index()
 
-    activity = infra.pd_infra.read_parquet("data/clean/user_active_deltas.parquet")
+    activity = infra.pd.read_parquet("data/clean/user_active_deltas.parquet")
 
     df = user_total.merge(activity[["user", "days_online", "optimistic_days_online", "days_active"]], on="user")
     df["MB_per_online_day"] = df["MB"] / df["days_online"]
@@ -104,7 +104,7 @@ def make_plot(inpath):
 
     if platform.large_compute_support:
         print("Running compute subcommands")
-        client = infra.dask_infra.setup_platform_tuned_dask_client(per_worker_memory_GB=10, platform=platform)
+        client = infra.dask.setup_platform_tuned_dask_client(per_worker_memory_GB=10, platform=platform)
         reduce_to_pandas(outpath=graph_temporary_file, dask_client=client)
         client.close()
 
