 import infra.pd
 import infra.platform

-# Configs
-day_intervals = 7
-# IMPORTANT: Run get_date_range() to update these values when loading in a new dataset!
-max_date = infra.constants.MAX_DATE
-
-def cohort_as_date_interval(x):
-    cohort_start = max_date - datetime.timedelta(day_intervals * x + day_intervals - 1)
-    cohort_end = max_date - datetime.timedelta(day_intervals * x)
-
-    return cohort_start.strftime("%Y/%m/%d") + "-" + cohort_end.strftime("%Y/%m/%d")
-
-def cohort_as_date(x):
-    day = max_date - datetime.timedelta(day_intervals * x)
-    return day.strftime("%Y/%m/%d")
-
-def get_cohort(x):
-    return x["start"].apply(lambda x_1: (max_date - x_1).days // day_intervals, meta=('start', 'int64'))
-
-def get_date(x):
-    return x["cohort"].apply(cohort_as_date_interval, meta=('cohort', 'object'))
-
-def get_throughput_data(flows):
-    # Make the index a column and select the "start", "bytes_up", "bytes_down" columns
-    query = flows.reset_index()[["start", "bytes_up", "bytes_down"]]
-    # Map each start time to a cohort
-    query = query.assign(cohort=get_cohort)
-    # Group by cohorts across all the users
-    query = query.groupby("cohort")
-    # Sum up all of the bytes_up and bytes_down
-    query = query.sum()
-    # Get the cohort column back
-    query = query.reset_index()
-    # Get the total usage per cohort
-    query["total_bytes"] = query["bytes_up"] + query["bytes_down"]
-    # Map each cohort to its date interval
-    query = query.assign(date=get_date)
-
-    return query
-

 def reduce_to_pandas(outfile, dask_client):
     typical = infra.dask.read_parquet("data/clean/flows/typical_fqdn_org_category_local_TM_DIV_none_INDEX_start")[["bytes_up", "bytes_down"]]
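For reference, the deleted helpers bucket each flow into a 7-day cohort counted back from max_date and then total bytes per cohort. Below is a minimal pandas-only sketch of that bucketing; the max_date value and the flow rows are made up for illustration (the real code takes max_date from infra.constants.MAX_DATE and applies the same logic to a Dask dataframe, which is why its .apply calls carry meta= hints).

import datetime

import pandas as pd

# Stand-ins for the removed module-level config; the real max_date came from
# infra.constants.MAX_DATE, so the value here is purely illustrative.
day_intervals = 7
max_date = pd.Timestamp("2020-05-01")

def cohort_as_date_interval(x):
    # Label cohort x with the 7-day date range it covers, counted back from max_date.
    cohort_start = max_date - datetime.timedelta(day_intervals * x + day_intervals - 1)
    cohort_end = max_date - datetime.timedelta(day_intervals * x)
    return cohort_start.strftime("%Y/%m/%d") + "-" + cohort_end.strftime("%Y/%m/%d")

# Hypothetical flows: one row per flow with its start time and byte counts.
flows = pd.DataFrame({
    "start": pd.to_datetime(["2020-04-30", "2020-04-28", "2020-04-20", "2020-04-12"]),
    "bytes_up": [10, 20, 30, 40],
    "bytes_down": [100, 200, 300, 400],
})

# Bucket each flow into a weekly cohort; cohort 0 is the week ending at max_date.
flows["cohort"] = (max_date - flows["start"]).dt.days // day_intervals

# Aggregate per cohort, mirroring the output of the removed get_throughput_data().
summary = flows.groupby("cohort", as_index=False)[["bytes_up", "bytes_down"]].sum()
summary["total_bytes"] = summary["bytes_up"] + summary["bytes_down"]
summary["date"] = summary["cohort"].apply(cohort_as_date_interval)
print(summary)

On this made-up data, cohort 0 is labeled 2020/04/25-2020/05/01 and totals 330 bytes (30 up, 300 down), which is the shape of table the removed get_throughput_data() produced before it was deleted.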