From d6e569a2e43220f5341a77a19011edf3199d5526 Mon Sep 17 00:00:00 2001 From: TeoCalvo Date: Thu, 11 Jan 2024 13:39:41 +0000 Subject: [PATCH] Add Gamersclub data from kaggle --- databases/create.sql | 6 ++++ src/02.bronze/gamersclub/ingestion.py | 49 +++++++++++++++++++++++++++ src/lib/ingestors.py | 5 ++- 3 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 src/02.bronze/gamersclub/ingestion.py diff --git a/databases/create.sql b/databases/create.sql index 74e3add..21267bd 100644 --- a/databases/create.sql +++ b/databases/create.sql @@ -11,6 +11,12 @@ CREATE DATABASE IF NOT EXISTS silver.dota; -- COMMAND ---------- +-- DBTITLE 1,Gamesclub +CREATE DATABASE IF NOT EXISTS bronze.gamersclub; +CREATE DATABASE IF NOT EXISTS silver.gamersclub; + +-- COMMAND ---------- + -- DBTITLE 1,IBGE CREATE DATABASE IF NOT EXISTS bronze.ibge; CREATE DATABASE IF NOT EXISTS silver.ibge; diff --git a/src/02.bronze/gamersclub/ingestion.py b/src/02.bronze/gamersclub/ingestion.py new file mode 100644 index 0000000..cfcdff4 --- /dev/null +++ b/src/02.bronze/gamersclub/ingestion.py @@ -0,0 +1,49 @@ +# Databricks notebook source +import sys + +sys.path.insert(0, '../../lib/') + +from ingestors import IngestaoBronze +import dbtools + +# COMMAND ---------- + +database_name = 'bronze.gamersclub' +table_name = dbutils.widgets.get('table_name') +file_format = 'csv' +id_fields = [] +timestamp_field = '' +partition_fields = [] + +path_full_load = f"/mnt/datalake/gc/{table_name}.csv" +path_incremental = '' + +read_options = { + "header": "true", + "multiLine": "true", + "sep": ",", + } + +# COMMAND ---------- + + +ingest = IngestaoBronze( + path_full_load = path_full_load, + path_incremental = path_incremental, + file_format = file_format, + table_name = table_name, + database_name = database_name, + id_fields = id_fields, + timestamp_field = timestamp_field, + partition_fields = partition_fields, + read_options=read_options, + spark = spark, + ) + +# COMMAND ---------- + +if not dbtools.table_exists(spark, database_name, table_name): + print("Tabela não existente, realizando primeira carga...") + ingest.process_full() + print("Ok") + diff --git a/src/lib/ingestors.py b/src/lib/ingestors.py index aee0263..67b8491 100644 --- a/src/lib/ingestors.py +++ b/src/lib/ingestors.py @@ -117,7 +117,10 @@ def default_query(self): NOW() as ingestion_at FROM {table} """ ids = ",".join(self.id_fields) - window = f"""QUALIFY row_number() OVER (PARTITION BY {ids} ORDER BY {self.timestamp_field} DESC) = 1""" + if len(self.partition_fields) > 0: + window = f"""QUALIFY row_number() OVER (PARTITION BY {ids} ORDER BY {self.timestamp_field} DESC) = 1""" + else: + window = "" return base_query + window def load_stream(self):