Commit 1d3f892

add test + changes after review
1 parent f55dd06 commit 1d3f892

2 files changed, +50 -15 lines


src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 4 additions & 15 deletions
@@ -511,9 +511,8 @@ IdToName buildIdToNameMap(const Poco::JSON::Object::Ptr & metadata_obj)
     for (size_t i = 0; i < schemas->size(); ++i)
     {
         auto schema = schemas->getObject(i);
-        if (!schema || !schema->has("schema-id"))
-            continue;
-        if (schema->getValue<Int32>("schema-id") != current_schema_id)
+
+        if (!schema || !schema->has("schema-id") || (schema->getValue<Int32>("schema-id") != current_schema_id))
             continue;
 
         if (auto fields = schema->getArray("fields"))
@@ -1169,23 +1168,13 @@ std::optional<size_t> IcebergMetadata::totalBytes(ContextPtr local_context) const
 std::optional<String> IcebergMetadata::partitionKey(ContextPtr) const
 {
     SharedLockGuard lock(mutex);
-    if (relevant_snapshot->partition_key.has_value())
-    {
-        return relevant_snapshot->partition_key;
-    }
-
-    return std::nullopt;
+    return relevant_snapshot->partition_key;
 }
 
 std::optional<String> IcebergMetadata::sortingKey(ContextPtr) const
 {
     SharedLockGuard lock(mutex);
-    if (relevant_snapshot->sorting_key.has_value())
-    {
-        return relevant_snapshot->sorting_key;
-    }
-
-    return std::nullopt;
+    return relevant_snapshot->sorting_key;
 }
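Note on the two simplifications above: the snapshot's partition_key and sorting_key are already std::optional<String> (matching the functions' return type), so the removed has_value() branches were redundant; returning the stored optional by value yields std::nullopt when it is empty. A minimal standalone sketch of that equivalence (the Snapshot struct and its field here are illustrative stand-ins, not the commit's actual types):

#include <iostream>
#include <optional>
#include <string>

using String = std::string;  // stand-in for ClickHouse's String alias

struct Snapshot
{
    std::optional<String> partition_key;  // may be empty
};

// Old shape: explicit has_value() branch plus a std::nullopt fallback.
std::optional<String> partitionKeyOld(const Snapshot & s)
{
    if (s.partition_key.has_value())
        return s.partition_key;
    return std::nullopt;
}

// New shape: copying the optional already covers the empty case.
std::optional<String> partitionKeyNew(const Snapshot & s)
{
    return s.partition_key;
}

int main()
{
    Snapshot empty{};
    Snapshot populated{String{"bucket(16, id), day(ts)"}};

    // Both shapes agree whether the key is absent or present.
    std::cout << (partitionKeyOld(empty) == partitionKeyNew(empty)) << '\n';          // prints 1
    std::cout << (partitionKeyOld(populated) == partitionKeyNew(populated)) << '\n';  // prints 1
}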

tests/integration/test_storage_iceberg/test.py

Lines changed: 46 additions & 0 deletions
@@ -3189,3 +3189,49 @@ def execute_spark_query(query: str):
         table_select_expression = table_creation_expression
 
     instance.query(f"SELECT * FROM {table_select_expression} ORDER BY ALL")
+
+
+@pytest.mark.parametrize("storage_type", ["s3"])
+def test_system_tables_partition_sorting_keys(started_cluster, storage_type):
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+
+    table_name = f"test_sys_tables_keys_{storage_type}_{uuid.uuid4().hex[:8]}"
+    fq_table = f"spark_catalog.default.{table_name}"
+
+    spark.sql(f"DROP TABLE IF EXISTS {fq_table}")
+    spark.sql(f"""
+        CREATE TABLE {fq_table} (
+            id INT,
+            ts TIMESTAMP,
+            payload STRING
+        )
+        USING iceberg
+        PARTITIONED BY (bucket(16, id), day(ts))
+        TBLPROPERTIES ('format-version' = '2')
+    """)
+    spark.sql(f"ALTER TABLE {fq_table} WRITE ORDERED BY (id DESC NULLS LAST, hour(ts))")
+    spark.sql(f"""
+        INSERT INTO {fq_table} VALUES
+            (1, timestamp'2024-01-01 10:00:00', 'a'),
+            (2, timestamp'2024-01-02 11:00:00', 'b'),
+            (NULL, timestamp'2024-01-03 12:00:00', 'c')
+    """)
+
+    time.sleep(2)
+    default_upload_directory(
+        started_cluster,
+        storage_type,
+        f"/iceberg_data/default/{table_name}/",
+        f"/iceberg_data/default/{table_name}/",
+    )
+
+    create_iceberg_table(storage_type, instance, table_name, started_cluster)
+
+    res = instance.query(f"""
+        SELECT partition_key, sorting_key
+        FROM system.tables
+        WHERE name = '{table_name}' FORMAT csv
+    """).strip().lower()
+
+    assert res == '"bucket(16, id), day(ts)","iddescnulls last, hour(ts)ascnulls first"'
