all tables in lake storage configured cluster should use lake's bucke… #362
Conversation
force-pushed from 84ab0a0 to b69bc6b
force-pushed from b69bc6b to 12e2d38
Some high-level design suggestions:
- The cluster datalake configuration should align with table configuration, how about:
datalake.format = paimon/iceberg
datalake.paimon.catalog = hive
datalake.paimon.metastore = thrift://localhost:9083
datalake.paimon.warehouse = hdfs://localhost:9000/user/hive/warehouse
- Besides the table.datalake.format, we can also add the catalog and metastore information into the table config, so that the table is self-contained and connectors don't need to call describeLakeStorage (describeLakeStorage can be removed then). If there are any auth/security tokens to retrieve, we can add another RPC similar to getFileSystemSecurityToken, but that should be optional.
table.datalake.enabled = true/false
table.datalake.format = paimon/iceberg
// logical properties
table.datalake.paimon.catalog = hive
table.datalake.paimon.metastore = thrift://localhost:9083
table.datalake.paimon.warehouse = hdfs://localhost:9000/user/hive/warehouse
Considering the metastore location may change over time, it should be a logical property added during getTableInfo instead of a physical property stored in ZooKeeper, and thus the metastore properties shouldn't be settable or alterable.
- It's confusing that the key encoding differs between bucketing and storing. There is no point in keeping our own key encoding when the table is datalake-enabled. We also discussed that bucket rescaling can still be supported even if we use Paimon/Iceberg key encoding/bucketing. So we can introduce two separate interfaces for encoding and bucketing (see the sketch after this list).
interface KeyEncoder {
    // encodes the bucket key fields of the row into bytes
    byte[] encodeKey(InternalRow row);
}

interface BucketAssigner {
    // maps an (optional) encoded bucket key to a bucket id
    int assignBucket(@Nullable byte[] bucketKey, Cluster cluster);
}
- We should add tests to verify that the encoding and bucketing are the same as Paimon's.
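A minimal sketch of how the two interfaces could be used on the write path. The class below, the hash-mod scheme, and the numBuckets handling are illustrative assumptions, not Paimon's or Fluss's actual logic:

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative hash-mod assigner; a real lake-aware assigner would follow
// Paimon's/Iceberg's own bucketing rules to stay byte-for-byte compatible.
class HashModBucketAssigner implements BucketAssigner {

    private final int numBuckets; // assumed to come from table metadata

    HashModBucketAssigner(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    @Override
    public int assignBucket(@Nullable byte[] bucketKey, Cluster cluster) {
        if (bucketKey == null) {
            // no bucket key: spread rows, e.g. randomly (sticky/round-robin are alternatives)
            return ThreadLocalRandom.current().nextInt(numBuckets);
        }
        return (Arrays.hashCode(bucketKey) & Integer.MAX_VALUE) % numBuckets;
    }
}

// Write-path glue (sketch): encode once, then assign.
// KeyEncoder encoder = ...;   // default or Paimon-compatible, chosen by table.datalake.format
// byte[] key = encoder.encodeKey(row);
// int bucket = assigner.assignBucket(key, cluster);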
force-pushed from 021edb8 to 840980a
force-pushed from 840980a to 4a8108f
…ated in DataLake configured cluster (alibaba#362)
@@ -124,4 +124,14 @@ public static Map<String, String> extractConfigStartWith(
        }
        return extractedConfig;
    }

    private static Map<String, String> normalizeToPaimonConfigs(
Use datalake.paimon.metastore to replace datalake.paimon.catalog.
                datalakeFormat.toString(), normalizeToPaimonConfigs(datalakeConfig));
    }

    private static Map<String, String> normalizeToPaimonConfigs(
Use datalake.paimon.metastore to replace datalake.paimon.catalog.
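For illustration, a rough sketch of what such a normalization helper might do, assuming the cluster options use the datalake.paimon. prefix proposed above (the holder class and the exact mapping are assumptions, not the PR's actual code):

import java.util.HashMap;
import java.util.Map;

// Hypothetical holder class for the sketch; the real method lives in the Fluss codebase.
final class PaimonConfigNormalizer {

    // Strips the "datalake.paimon." prefix so the remaining keys can be handed to the Paimon catalog.
    static Map<String, String> normalizeToPaimonConfigs(Map<String, String> datalakeConfig) {
        final String prefix = "datalake.paimon.";
        Map<String, String> paimonConfig = new HashMap<>();
        for (Map.Entry<String, String> entry : datalakeConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix)) {
                key = key.substring(prefix.length());
            }
            paimonConfig.put(key, entry.getValue());
        }
        return paimonConfig;
    }
}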
@@ -186,6 +194,14 @@ private TableDescriptor applySystemDefaults(TableDescriptor tableDescriptor) {
            newDescriptor = newDescriptor.withReplicationFactor(defaultReplicationFactor);
        }

        // if lake storage is not null, we need to add the datalake type
        // to the property of the table
        if (dataLakeFormat != null) {
We should throw an exception if the table is datalake enabled but the cluster is not.
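A minimal sketch of the suggested validation, with assumed property names, accessors, and a generic exception type:

import java.util.Map;
import javax.annotation.Nullable;

// Hypothetical helper illustrating the check; the real code would live in applySystemDefaults(...)
// and would likely throw a Fluss-specific exception type.
final class DataLakeValidation {

    static void checkDataLakeEnabled(
            Map<String, String> tableProperties, @Nullable String clusterDataLakeFormat) {
        boolean tableEnabled = "true".equalsIgnoreCase(tableProperties.get("table.datalake.enabled"));
        if (tableEnabled && clusterDataLakeFormat == null) {
            throw new IllegalArgumentException(
                    "The table is datalake enabled, but the cluster has no datalake storage configured.");
        }
    }
}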
            this.lakeTableBucketAssigner = null;
            this.bucketKeyEncoder =
                    CompactedKeyEncoder.createKeyEncoder(lookupRowType, tableInfo.getBucketKeys());
            this.lakeBucketAssigner = null;
Constructing the bucketKeyEncoder is still very complex. However, we can unify the default and lake assigners into one util method.
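One possible shape for such a util method, sketched with an assumed PaimonCompatibleKeyEncoder class and the existing CompactedKeyEncoder factory call seen in the diff above (it also assumes both encoders would implement the proposed KeyEncoder interface):

import java.util.List;
import javax.annotation.Nullable;

// Hypothetical factory util that hides the construction complexity behind one call.
final class KeyEncoders {

    static KeyEncoder create(RowType rowType, List<String> bucketKeys, @Nullable DataLakeFormat lakeFormat) {
        if (lakeFormat == DataLakeFormat.PAIMON) {
            // datalake tables should encode keys the same way the lake does
            return new PaimonCompatibleKeyEncoder(rowType, bucketKeys); // hypothetical class
        }
        return CompactedKeyEncoder.createKeyEncoder(rowType, bucketKeys);
    }
}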
            } else {
                return lakeTableBucketAssigner.assignBucket(
                        keyBytes, key, metadataUpdater.getCluster());
                return lakeBucketAssigner.assignBucket(bucketKeyBytes);
It is still not unified, and assigning a bucket is complex. We can introduce a BucketingFunction#bucketing(byte[] key, int numBuckets) to unify the bucket assignment instead of using a util method and passing an optional lake bucket assigner. We should hold only one non-null bucket assigner; that will be easier to use.
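A sketch of what the suggested BucketingFunction could look like; only the interface shape comes from this comment, and the two implementations below are illustrative assumptions:

import java.util.Arrays;

// Single bucketing abstraction so callers always hold exactly one non-null instance.
interface BucketingFunction {
    int bucketing(byte[] key, int numBuckets);
}

// Default Fluss-style bucketing sketch: hash of the encoded key modulo the bucket count.
class DefaultBucketingFunction implements BucketingFunction {
    @Override
    public int bucketing(byte[] key, int numBuckets) {
        return (Arrays.hashCode(key) & Integer.MAX_VALUE) % numBuckets;
    }
}

// A datalake table would plug in a Paimon/Iceberg-compatible implementation here instead.
class LakeBucketingFunction implements BucketingFunction {
    @Override
    public int bucketing(byte[] key, int numBuckets) {
        // would delegate to the lake's own bucket computation to stay compatible
        throw new UnsupportedOperationException("sketch only");
    }
}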
force-pushed from 4a8108f to fffb61c
Please review the changes I just pushed @luoyuxia.
…lake enabled or not
force-pushed from fffb61c to e5f1628
@wuchong Thanks for the commit to unify encoding and bucket assigning. It looks much better after unifying, and it is simpler to use. LGTM
…ated in DataLake configured cluster (#362)
…t assigner
Purpose
Linked issue: close #343
Make all tables in a lake-storage-configured cluster use the lake's bucket assignment strategy:
- For a lake-storage-configured cluster, set table.datalake.format to the corresponding datalake format on created tables.
- For tables with table.datalake.format, use the lake's bucket assigner to assign buckets for rows: introduce StaticBucketAssigner and DynamicBucketAssigner. For tables with bucket keys, use StaticBucketAssigner to calculate the bucket id while converting a row into a WriteRecord.
Tests
- LakeTableManagerITCase to verify table.datalake.format will be set in a lake-storage-configured cluster.
- FlussLakeTableITCase to verify write/lookup will use the lake's bucket assigner…
API and Format
Documentation