diff --git a/parquet/generate-ch-schema.py b/parquet/generate-ch-schema.py index 7629668..e8f1744 100644 --- a/parquet/generate-ch-schema.py +++ b/parquet/generate-ch-schema.py @@ -23,12 +23,30 @@ def pq_to_ch_type(pq_type): else: ch_type = "Int256" + elif pq_type.logical_type.type == "INT" and pq_type.physical_type == "INT32": + ch_type = "Int32" + elif pq_type.logical_type.type == "STRING": ch_type = "String" elif pq_type.logical_type.type == "DATE": ch_type = "Date" + elif pq_type.logical_type.type == "NONE" and pq_type.physical_type == "BOOLEAN": + ch_type = "Bool" + + elif pq_type.logical_type.type == "NONE" and pq_type.physical_type == "DOUBLE": + ch_type = "Float64" + + elif pq_type.logical_type.type == "NONE" and pq_type.physical_type == "INT32": + ch_type = "Int32" + + elif pq_type.logical_type.type == "NONE" and pq_type.physical_type == "INT64": + ch_type = "Int64" + + elif pq_type.logical_type.type == "NONE" and pq_type.physical_type == "INT96": + ch_type = "DateTime64" + else: print(pq_type, type(pq_type)) raise Exception(f"Unknown type: {pq_type}") @@ -55,7 +73,7 @@ def fail(msg): exit(1) def process(): - # Print a header. + # Print a header. print("-- Automatically generated DDL and INSERT for Parquet data") # We need region to read S3 and generate URLs. @@ -65,7 +83,7 @@ def process(): print("-- AWS_REGION: " + AWS_REGION) # The AWS access key and secret key are optional for loading into S3. - # You can also use a bucket with open permissions. + # You can also use a bucket with open permissions. AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID') AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY') @@ -74,16 +92,16 @@ def process(): if not S3_DATASET_PATH: fail("S3_DATASET_PATH environment variable not set") print("-- S3_DATASET_PATH: " + S3_DATASET_PATH) - - # We need to get the table from the path. It is the last whole word - # in the path before the trailing /. + + # We need to get the table from the path. It is the last whole word + # in the path before the trailing /. regex_match = re.search('([A-Za-z0-9_]+)/$', S3_DATASET_PATH) if not regex_match: fail("S3_DATASET_PATH must have form bucket/dir/.../dir/table_name/") table_name = regex_match.group(1) print("-- Table name: " + table_name) - - # Open up the parquet dataset in S3. + + # Open up the parquet dataset in S3. s3 = fs.S3FileSystem(region=AWS_REGION) pq_dataset = pq.ParquetDataset(S3_DATASET_PATH, filesystem=s3) @@ -93,7 +111,7 @@ def process(): fail("S3_DATASET_PATH does not have any Parquet files!") pq_fragment = pq_dataset.fragments[0] - # Generate CREATE TABLE command. + # Generate CREATE TABLE command. ch_columns = pq_columns_to_ch_columns(pq_fragment.metadata) print("CREATE TABLE IF NOT EXISTS {0} (".format(table_name)) @@ -106,7 +124,7 @@ def process(): print("ORDER BY tuple()") print("") - # Generate INSERT with SELECT from S3 URL. + # Generate INSERT with SELECT from S3 URL. if AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY: aws_credentials = "'{0}', '{1}',".format(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) else: