-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpostproc.py
More file actions
92 lines (80 loc) · 2.8 KB
/
postproc.py
File metadata and controls
92 lines (80 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""Commandline script to rechunk data to easier-to-read parquet file. First run main.py"""
import polars as pl
from pathlib import Path
import datetime
# Version tag embedded in the names of the parquet files written by main().
DB_VERSION = "v1.2"
# Folder that is searched recursively for the input parquet files.
INPUT_FOLDER = Path("processed_data", "database_flat")
# Folder that receives the output parquet files.
OUTPUT_FOLDER = Path("processed_data", "database")
# parents=True: also create "processed_data" when it is missing instead of
# raising FileNotFoundError; exist_ok=True keeps reruns from failing.
OUTPUT_FOLDER.mkdir(parents=True, exist_ok=True)
def _log(message):
    """Print *message* preceded by the current local timestamp."""
    print(datetime.datetime.now(), message)


def _mention_rate():
    """Return the ``mention_rate`` expression.

    The rate is ``n_both / n_location``, or 0.0 where ``n_location`` is 0.
    """
    n_location = pl.col("n_location")
    return (
        pl.when(n_location == 0)
        .then(pl.lit(0.0))
        .otherwise(pl.col("n_both") / n_location)
        .alias("mention_rate")
    )


def main():
    """Build the monthly and yearly disease databases from the flat files.

    Reads every parquet file found under ``INPUT_FOLDER``, expands the data to
    all combinations of the unique disease, year, month and cbscode values
    (nulls left after the expansion are filled with 0), adds a
    ``mention_rate`` column and writes the result to ``OUTPUT_FOLDER``. A
    second file holding the counts summed per disease, year and cbscode is
    written next to it. Progress is printed with timestamps.
    """
    monthly_keys = ["disease", "year", "month", "cbscode"]
    yearly_keys = ["disease", "year", "cbscode"]
    count_columns = ["n_location", "n_both"]

    _log("| Reading data in memory...")
    source_pattern = INPUT_FOLDER / "**" / "*.parquet"
    raw = pl.read_parquet(source_pattern, missing_columns="insert")
    _log("| Finished reading data in memory.")

    _log("| Creating completed dataset.")
    # A single row holding the list of unique values of each key column;
    # exploding the columns one after another yields every combination.
    grid = raw.select(pl.col(monthly_keys).unique().implode())
    for key in monthly_keys:
        grid = grid.explode(key)
    completed = (
        grid.join(raw, on=monthly_keys, how="left")
        .select(monthly_keys + count_columns)
        .fill_null(0)
    )
    _log("| Finished completing dataset.")

    _log("| Cleaning dataset.")
    lowercase_disease = pl.col("disease").str.to_lowercase().cast(pl.Categorical)
    monthly = (
        completed.with_columns(_mention_rate())
        .sort(monthly_keys)
        .with_columns(lowercase_disease)
        .select([*monthly_keys, "mention_rate", *count_columns])
    )

    _log("| Writing data.")
    monthly_path = OUTPUT_FOLDER / f"disease_database_{DB_VERSION}.parquet"
    monthly.write_parquet(monthly_path, statistics="full")

    _log("| Producing yearly dataset.")
    yearly_totals = [pl.col(name).sum() for name in count_columns]
    yearly = (
        monthly.group_by(yearly_keys)
        .agg(yearly_totals)
        .with_columns(_mention_rate())
        .sort(yearly_keys)
        .select([*yearly_keys, "mention_rate", *count_columns])
    )

    _log("| Writing data.")
    yearly_path = OUTPUT_FOLDER / f"disease_database_yearly_{DB_VERSION}.parquet"
    yearly.write_parquet(yearly_path, statistics="full")
# Run the post-processing only when executed as a script, not on import.
if __name__ == "__main__":
    main()