Skip to content

Commit

Permalink
add message at various steps, export to parquet, utilities column
Browse files Browse the repository at this point in the history
  • Loading branch information
defuneste committed Sep 19, 2024
1 parent f1ae028 commit 3a80fdb
Showing 1 changed file with 55 additions and 7 deletions.
62 changes: 55 additions & 7 deletions data-raw/nbm_block.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# or just going in the FTP of US census
# our internal package
library(cori.db)
library(duckdb)

get_census_block <- function() {
con <- cori.db::connect_to_db("sch_census_tiger")
Expand All @@ -13,6 +14,7 @@ get_census_block <- function() {
geoid20 as geoid_bl
from
sch_census_tiger.source_tiger_2020_blocks"
message(sprintf("Starting to get Census Block: %s", Sys.time()))
DBI::dbGetQuery(con, statement_census)
}

Expand All @@ -21,15 +23,15 @@ census_blocks <- get_census_block()

stopifnot(nrow(census_blocks) == 8180866)

library(duckdb)

# I worked with a persistent db
# because I also like testing on duckdb cli
# and maybe cache some intermediary process
con <- dbConnect(duckdb(), dbdir = "nbm_block.duckdb")

release <- "2023-12-01"

message(sprintf("Release used: %s", release))

DBI::dbWriteTable(con, "nbm_block", census_blocks)

set_release <- sprintf("alter table nbm_block
Expand All @@ -39,6 +41,8 @@ set_release <- sprintf("alter table nbm_block

DBI::dbExecute(con, set_release)

message(sprintf("Creating a filtered table: %s", Sys.time()))

nbm_cori1 <- "create table staging (
frn char(10),
provider_id varchar(7),
Expand Down Expand Up @@ -110,6 +114,8 @@ where
DBI::dbExecute(con, nbm_cori2)


message(sprintf("Starting adding count of locations; %s", Sys.time()))

nbm_count1 <- sprintf("
alter table nbm_block
add column cnt_total_locations integer;
Expand Down Expand Up @@ -154,7 +160,7 @@ where

DBI::dbExecute(con, nbm_count2)

DBI::dbGetQuery(con, "select * from nbm_block limit 10")
stopifnot(ncol(DBI::dbGetQuery(con, "select * from nbm_block limit 10")) == 4L)

nbm_count3 <-
"alter table nbm_block add column cnt_fiber_locations integer;
Expand Down Expand Up @@ -274,7 +280,7 @@ nbm_count6 <- "

DBI::dbExecute(con, nbm_count6)


message(sprintf("Starting create combo frn and relation table: %s", Sys.time()))

combo_frn <-
"alter table nbm_block
Expand Down Expand Up @@ -338,6 +344,8 @@ frn_count <- "

DBI::dbExecute(con, frn_count)

message(sprintf("Starting creating utiliies columns: %s", Sys.time()))

states <-
"create temp table us_states (
state_abbr varchar(2) primary key,
Expand Down Expand Up @@ -402,7 +410,9 @@ insert into us_states (state_abbr, geoid_st) values
('WI', '55'), -- Wisconsin
('WY', '56'); -- Wyoming"

"alter table nbm_block
DBI::dbExecute(con, states)

utilities <- "alter table nbm_block
add column if not exists geoid_st char(2);
alter table nbm_block
add column if not exists geoid_co char(5);
Expand All @@ -418,8 +428,46 @@ set state_abbr = us_states.state_abbr
from us_states
where nbm_block.geoid_st = us_states.geoid_st;"

DBI::dbExecute(con, states)
DBI::dbExecute(con, utilities)

DBI::dbGetQuery(con, "describe nbm_block")
message(sprintf("Starting to create parquet file: %s", Sys.time()))

write_parquet <- "copy (
select
geoid_bl,
geoid_st,
geoid_co,
state_abbr,
cnt_total_locations,
cnt_bead_locations,
cnt_copper_locations,
cnt_cable_locations,
cnt_fiber_locations,
cnt_other_locations,
cnt_unlicensed_fixed_wireless_locations,
cnt_licensed_fixed_wireless_locations,
cnt_LBR_fixed_wireless_locations,
cnt_terrestrial_locations,
cnt_25_3,
cnt_100_20,
cnt_100_100,
cnt_distcint_frn,
array_frn,
combo_frn,
release
from
nbm_block
order by geoid_bl)
to 'nbm_block' (FORMAT PARQUET, PARTITION_BY(state_abbr), CODEC 'SNAPPY');"

DBI::dbExecute(con, write_parquet)

write_rel_combo <-
"copy (
from rel_combo_frn order by frn
) to 'rel_combo_frn.parquet' (format parquet, CODEC 'SNAPPY', ROW_GROUP_SIZE 100000)"

DBI::dbExecute(con, write_rel_combo)

DBI::dbDisconnect(con)

0 comments on commit 3a80fdb

Please sign in to comment.