Skip to content

Support different data types #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions parquet/generate-ch-schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,30 @@ def pq_to_ch_type(pq_type):
else:
ch_type = "Int256"

elif pq_type.logical_type.type == "INT" and pq_type.physical_type == "INT32":
ch_type = "Int32"

elif pq_type.logical_type.type == "STRING":
ch_type = "String"

elif pq_type.logical_type.type == "DATE":
ch_type = "Date"

elif pq_type.logical_type.type == "NONE" and pq_type.physical_type == "BOOLEAN":
ch_type = "Bool"

elif pq_type.logical_type.type == "NONE" and pq_type.physical_type == "DOUBLE":
ch_type = "Float64"

elif pq_type.logical_type.type == "NONE" and pq_type.physical_type == "INT32":
ch_type = "Int32"

elif pq_type.logical_type.type == "NONE" and pq_type.physical_type == "INT64":
ch_type = "Int64"

elif pq_type.logical_type.type == "NONE" and pq_type.physical_type == "INT96":
ch_type = "DateTime64"

else:
print(pq_type, type(pq_type))
raise Exception(f"Unknown type: {pq_type}")
Expand All @@ -55,7 +73,7 @@ def fail(msg):
exit(1)

def process():
# Print a header.
# Print a header.
print("-- Automatically generated DDL and INSERT for Parquet data")

# We need region to read S3 and generate URLs.
Expand All @@ -65,7 +83,7 @@ def process():
print("-- AWS_REGION: " + AWS_REGION)

# The AWS access key and secret key are optional for loading into S3.
# You can also use a bucket with open permissions.
# You can also use a bucket with open permissions.
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')

Expand All @@ -74,16 +92,16 @@ def process():
if not S3_DATASET_PATH:
fail("S3_DATASET_PATH environment variable not set")
print("-- S3_DATASET_PATH: " + S3_DATASET_PATH)
# We need to get the table from the path. It is the last whole word
# in the path before the trailing /.

# We need to get the table from the path. It is the last whole word
# in the path before the trailing /.
regex_match = re.search('([A-Za-z0-9_]+)/$', S3_DATASET_PATH)
if not regex_match:
fail("S3_DATASET_PATH must have form bucket/dir/.../dir/table_name/")
table_name = regex_match.group(1)
print("-- Table name: " + table_name)
# Open up the parquet dataset in S3.

# Open up the parquet dataset in S3.
s3 = fs.S3FileSystem(region=AWS_REGION)
pq_dataset = pq.ParquetDataset(S3_DATASET_PATH, filesystem=s3)

Expand All @@ -93,7 +111,7 @@ def process():
fail("S3_DATASET_PATH does not have any Parquet files!")
pq_fragment = pq_dataset.fragments[0]

# Generate CREATE TABLE command.
# Generate CREATE TABLE command.
ch_columns = pq_columns_to_ch_columns(pq_fragment.metadata)

print("CREATE TABLE IF NOT EXISTS {0} (".format(table_name))
Expand All @@ -106,7 +124,7 @@ def process():
print("ORDER BY tuple()")
print("")

# Generate INSERT with SELECT from S3 URL.
# Generate INSERT with SELECT from S3 URL.
if AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY:
aws_credentials = "'{0}', '{1}',".format(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
else:
Expand Down