Skip to content

Commit

Permalink
Add Gamersclub data from kaggle
Browse files Browse the repository at this point in the history
  • Loading branch information
TeoCalvo authored and TeoCalvo committed Jan 11, 2024
1 parent ea02023 commit d6e569a
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 1 deletion.
6 changes: 6 additions & 0 deletions databases/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ CREATE DATABASE IF NOT EXISTS silver.dota;

-- COMMAND ----------

-- DBTITLE 1,Gamesclub
CREATE DATABASE IF NOT EXISTS bronze.gamersclub;
CREATE DATABASE IF NOT EXISTS silver.gamersclub;

-- COMMAND ----------

-- DBTITLE 1,IBGE
CREATE DATABASE IF NOT EXISTS bronze.ibge;
CREATE DATABASE IF NOT EXISTS silver.ibge;
Expand Down
49 changes: 49 additions & 0 deletions src/02.bronze/gamersclub/ingestion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Databricks notebook source
import sys

sys.path.insert(0, '../../lib/')

from ingestors import IngestaoBronze
import dbtools

# COMMAND ----------

database_name = 'bronze.gamersclub'
table_name = dbutils.widgets.get('table_name')
file_format = 'csv'
id_fields = []
timestamp_field = ''
partition_fields = []

path_full_load = f"/mnt/datalake/gc/{table_name}.csv"
path_incremental = ''

read_options = {
"header": "true",
"multiLine": "true",
"sep": ",",
}

# COMMAND ----------


ingest = IngestaoBronze(
path_full_load = path_full_load,
path_incremental = path_incremental,
file_format = file_format,
table_name = table_name,
database_name = database_name,
id_fields = id_fields,
timestamp_field = timestamp_field,
partition_fields = partition_fields,
read_options=read_options,
spark = spark,
)

# COMMAND ----------

if not dbtools.table_exists(spark, database_name, table_name):
print("Tabela não existente, realizando primeira carga...")
ingest.process_full()
print("Ok")

5 changes: 4 additions & 1 deletion src/lib/ingestors.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ def default_query(self):
NOW() as ingestion_at
FROM {table} """
ids = ",".join(self.id_fields)
window = f"""QUALIFY row_number() OVER (PARTITION BY {ids} ORDER BY {self.timestamp_field} DESC) = 1"""
if len(self.partition_fields) > 0:
window = f"""QUALIFY row_number() OVER (PARTITION BY {ids} ORDER BY {self.timestamp_field} DESC) = 1"""
else:
window = ""
return base_query + window

def load_stream(self):
Expand Down

0 comments on commit d6e569a

Please sign in to comment.