Skip to content

Commit c95a33c

Browse files
Zhen Lizhljen
Zhen Li
authored andcommitted
Supporting table iceberg get partitions
1 parent 6b66d8d commit c95a33c

File tree

43 files changed

+1263
-190
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1263
-190
lines changed

build.gradle

+6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ buildscript {
2222
maven {
2323
url "http://repo.spring.io/milestone"
2424
}
25+
maven { url 'https://jitpack.io' }
2526
}
2627

2728
dependencies {
@@ -32,6 +33,7 @@ buildscript {
3233
}
3334
}
3435

36+
3537
plugins {
3638
id "com.github.kt3k.coveralls" version "2.8.1"
3739
id "nebula.netflixoss" version "5.0.0"
@@ -60,6 +62,9 @@ allprojects {
6062
maven {
6163
url "http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release"
6264
}
65+
maven {
66+
url 'https://jitpack.io'
67+
}
6368
}
6469
}
6570

@@ -156,6 +161,7 @@ configure(javaProjects) {
156161
}
157162

158163
dependencies {
164+
159165
/*******************************
160166
* Compile Dependencies
161167
*******************************/

metacat-common-server/src/main/java/com/netflix/metacat/common/server/connectors/ConnectorPartitionService.java

+16-7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.netflix.metacat.common.server.connectors.model.PartitionListRequest;
2323
import com.netflix.metacat.common.server.connectors.model.PartitionsSaveRequest;
2424
import com.netflix.metacat.common.server.connectors.model.PartitionsSaveResponse;
25+
import com.netflix.metacat.common.server.connectors.model.TableInfo;
2526

2627
import java.util.List;
2728
import java.util.Map;
@@ -39,14 +40,15 @@ public interface ConnectorPartitionService extends ConnectorBaseService<Partitio
3940
* @param context The Metacat request context
4041
* @param table table handle to get partition for
4142
* @param partitionsRequest The metadata for what kind of partitions to get from the table
43+
* @param tableInfo Table info object
4244
* @return filtered list of partitions
4345
* @throws UnsupportedOperationException If the connector doesn't implement this method
4446
*/
4547
default List<PartitionInfo> getPartitions(
4648
final ConnectorRequestContext context,
4749
final QualifiedName table,
48-
final PartitionListRequest partitionsRequest
49-
) {
50+
final PartitionListRequest partitionsRequest,
51+
final TableInfo tableInfo) {
5052
throw new UnsupportedOperationException(ConnectorBaseService.UNSUPPORTED_MESSAGE);
5153
}
5254

@@ -73,12 +75,14 @@ default PartitionsSaveResponse savePartitions(
7375
* @param context The Metacat request context
7476
* @param tableName table name
7577
* @param partitionNames list of partition names
78+
* @param tableInfo table info object
7679
* @throws UnsupportedOperationException If the connector doesn't implement this method
7780
*/
7881
default void deletePartitions(
7982
final ConnectorRequestContext context,
8083
final QualifiedName tableName,
81-
final List<String> partitionNames
84+
final List<String> partitionNames,
85+
final TableInfo tableInfo
8286
) {
8387
throw new UnsupportedOperationException(ConnectorBaseService.UNSUPPORTED_MESSAGE);
8488
}
@@ -88,12 +92,14 @@ default void deletePartitions(
8892
*
8993
* @param context The Metacat request context
9094
* @param table table handle
95+
* @param tableInfo table info object
9196
* @return Number of partitions
9297
* @throws UnsupportedOperationException If the connector doesn't implement this method
9398
*/
9499
default int getPartitionCount(
95100
final ConnectorRequestContext context,
96-
final QualifiedName table
101+
final QualifiedName table,
102+
final TableInfo tableInfo
97103
) {
98104
throw new UnsupportedOperationException(ConnectorBaseService.UNSUPPORTED_MESSAGE);
99105
}
@@ -121,14 +127,15 @@ default Map<String, List<QualifiedName>> getPartitionNames(
121127
* @param context The Metacat request context
122128
* @param table table handle to get partition for
123129
* @param partitionsRequest The metadata for what kind of partitions to get from the table
130+
* @param tableInfo table info object
124131
* @return filtered list of partition names
125132
* @throws UnsupportedOperationException If the connector doesn't implement this method
126133
*/
127134
default List<String> getPartitionKeys(
128135
final ConnectorRequestContext context,
129136
final QualifiedName table,
130-
final PartitionListRequest partitionsRequest
131-
) {
137+
final PartitionListRequest partitionsRequest,
138+
final TableInfo tableInfo) {
132139
throw new UnsupportedOperationException(ConnectorBaseService.UNSUPPORTED_MESSAGE);
133140
}
134141

@@ -138,13 +145,15 @@ default List<String> getPartitionKeys(
138145
* @param context The Metacat request context
139146
* @param table table handle to get partition for
140147
* @param partitionsRequest The metadata for what kind of partitions to get from the table
148+
* @param tableInfo table info object
141149
* @return filtered list of partition uris
142150
* @throws UnsupportedOperationException If the connector doesn't implement this method
143151
*/
144152
default List<String> getPartitionUris(
145153
final ConnectorRequestContext context,
146154
final QualifiedName table,
147-
final PartitionListRequest partitionsRequest
155+
final PartitionListRequest partitionsRequest,
156+
final TableInfo tableInfo
148157
) {
149158
throw new UnsupportedOperationException(ConnectorBaseService.UNSUPPORTED_MESSAGE);
150159
}

metacat-common-server/src/main/java/com/netflix/metacat/common/server/connectors/model/PartitionInfo.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package com.netflix.metacat.common.server.connectors.model;
1919

20+
import com.fasterxml.jackson.databind.node.ObjectNode;
2021
import com.netflix.metacat.common.QualifiedName;
2122
import lombok.AllArgsConstructor;
2223
import lombok.Builder;
@@ -29,7 +30,7 @@
2930
/**
3031
* Partition DTO.
3132
*
32-
* @author amajumdar
33+
* @author amajumdar, zhenl
3334
* @since 1.0.0
3435
*/
3536
@SuppressWarnings("unused")
@@ -39,6 +40,8 @@
3940
@NoArgsConstructor
4041
public final class PartitionInfo extends BaseInfo {
4142
private StorageInfo serde;
43+
//to populate the metrics from iceberg
44+
private ObjectNode dataMetrics;
4245

4346
/**
4447
* Constructor.
@@ -53,9 +56,11 @@ private PartitionInfo(
5356
final QualifiedName name,
5457
final AuditInfo auditInfo,
5558
final Map<String, String> metadata,
56-
final StorageInfo serde
59+
final StorageInfo serde,
60+
final ObjectNode dataMetrics
5761
) {
5862
super(name, auditInfo, metadata);
5963
this.serde = serde;
64+
this.dataMetrics = dataMetrics;
6065
}
6166
}

metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/Config.java

+7
Original file line numberDiff line numberDiff line change
@@ -374,5 +374,12 @@ public interface Config {
374374
* @return The metacat delete acl property
375375
*/
376376
Map<QualifiedName, Set<String>> getMetacatDeleteAcl();
377+
378+
/**
379+
* get Iceberg Table Summary Fetch Size.
380+
*
381+
* @return Iceberg Table Summary Fetch Size
382+
*/
383+
int getIcebergTableSummaryFetchSize();
377384
}
378385

metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/DefaultConfigImpl.java

+10
Original file line numberDiff line numberDiff line change
@@ -436,4 +436,14 @@ public Map<QualifiedName, Set<String>> getMetacatCreateAcl() {
436436
public Map<QualifiedName, Set<String>> getMetacatDeleteAcl() {
437437
return this.metacatProperties.getAuthorization().getDeleteAcl().getDeleteAclMap();
438438
}
439+
440+
441+
/**
442+
* {@inheritDoc}
443+
*/
444+
@Override
445+
public int getIcebergTableSummaryFetchSize() {
446+
return this.metacatProperties.getHive().getIceberg().getFetchSizeInTableSummary();
447+
}
448+
439449
}

metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/HiveProperties.java

+13
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public class HiveProperties {
3232

3333
@NonNull
3434
private Metastore metastore = new Metastore();
35+
@NonNull
36+
private Iceberg iceberg = new Iceberg();
3537

3638
/**
3739
* Metastore related properties.
@@ -85,4 +87,15 @@ public static class Whitelist {
8587
}
8688
}
8789
}
90+
91+
/**
92+
* Iceberg related properties.
93+
*
94+
* @author zhenl
95+
* @since 1.2.0
96+
*/
97+
@Data
98+
public static class Iceberg {
99+
private int fetchSizeInTableSummary = 100;
100+
}
88101
}

metacat-common-server/src/main/java/com/netflix/metacat/common/server/usermetadata/BaseUserMetadataService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ public void populateMetadata(
104104

105105
if (holder instanceof HasDataMetadata) {
106106
final HasDataMetadata dataDto = (HasDataMetadata) holder;
107-
if (dataDto.isDataExternal()) {
107+
//data Metadata can be populated from iceberg metrics directly
108+
if (dataDto.isDataExternal() || dataMetadata != null) {
108109
dataDto.setDataMetadata(dataMetadata);
109110
}
110111
}

metacat-connector-cassandra/src/test/groovy/com/netflix/metacat/connector/cassandra/CassandraConnectorPartitionServiceSpec.groovy

+10-5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import com.netflix.metacat.common.server.connectors.ConnectorRequestContext
2323
import com.netflix.metacat.common.server.connectors.model.PartitionInfo
2424
import com.netflix.metacat.common.server.connectors.model.PartitionListRequest
2525
import com.netflix.metacat.common.server.connectors.model.PartitionsSaveRequest
26+
import com.netflix.metacat.common.server.connectors.model.TableInfo
2627
import spock.lang.Shared
2728
import spock.lang.Specification
2829
import spock.lang.Unroll
@@ -57,7 +58,8 @@ class CassandraConnectorPartitionServiceSpec extends Specification {
5758
this.service.getPartitions(
5859
this.context,
5960
this.name,
60-
Mock(PartitionListRequest)
61+
Mock(PartitionListRequest),
62+
Mock(TableInfo)
6163
)
6264
}
6365
) | "getPartitions" | UnsupportedOperationException
@@ -75,13 +77,14 @@ class CassandraConnectorPartitionServiceSpec extends Specification {
7577
this.service.deletePartitions(
7678
this.context,
7779
this.name,
78-
Lists.newArrayList()
80+
Lists.newArrayList(),
81+
Mock(TableInfo)
7982
)
8083
}
8184
) | "deletePartitions" | UnsupportedOperationException
8285
(
8386
{
84-
this.service.getPartitionCount(this.context, this.name)
87+
this.service.getPartitionCount(this.context, this.name, Mock(TableInfo))
8588
}
8689
) | "getPartitionCount" | UnsupportedOperationException
8790
(
@@ -98,7 +101,8 @@ class CassandraConnectorPartitionServiceSpec extends Specification {
98101
this.service.getPartitionKeys(
99102
this.context,
100103
this.name,
101-
Mock(PartitionListRequest)
104+
Mock(PartitionListRequest),
105+
Mock(TableInfo)
102106
)
103107
}
104108
) | "getPartitionKeys" | UnsupportedOperationException
@@ -107,7 +111,8 @@ class CassandraConnectorPartitionServiceSpec extends Specification {
107111
this.service.getPartitionUris(
108112
this.context,
109113
this.name,
110-
Mock(PartitionListRequest)
114+
Mock(PartitionListRequest),
115+
Mock(TableInfo)
111116
)
112117
}
113118
) | "getPartitionUris" | UnsupportedOperationException

metacat-connector-druid/src/main/java/com/netflix/metacat/connector/druid/DruidConnectorPartitionService.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.netflix.metacat.common.server.connectors.ConnectorRequestContext;
2323
import com.netflix.metacat.common.server.connectors.model.PartitionInfo;
2424
import com.netflix.metacat.common.server.connectors.model.PartitionListRequest;
25+
import com.netflix.metacat.common.server.connectors.model.TableInfo;
2526
import com.netflix.metacat.connector.druid.converter.DataSource;
2627
import com.netflix.metacat.connector.druid.converter.DruidConnectorInfoConverter;
2728
import com.netflix.metacat.connector.druid.converter.DruidConverterUtil;
@@ -62,7 +63,8 @@ public DruidConnectorPartitionService(
6263
@Override
6364
public int getPartitionCount(
6465
final ConnectorRequestContext context,
65-
final QualifiedName name
66+
final QualifiedName name,
67+
final TableInfo tableInfo
6668
) {
6769
final ObjectNode node = this.druidClient.getAllDataByName(name.getTableName());
6870
return DruidConverterUtil.getSegmentCount(node);
@@ -75,8 +77,8 @@ public int getPartitionCount(
7577
public List<PartitionInfo> getPartitions(
7678
final ConnectorRequestContext context,
7779
final QualifiedName name,
78-
final PartitionListRequest partitionsRequest
79-
) {
80+
final PartitionListRequest partitionsRequest,
81+
final TableInfo tableInfo) {
8082
final ObjectNode node = this.druidClient.getAllDataByName(name.getTableName());
8183
final DataSource dataSource = DruidConverterUtil.getDatasourceFromAllSegmentJsonObject(node);
8284
final List<PartitionInfo> partitionInfos = new ArrayList<>();

metacat-connector-hive/build.gradle

+3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ dependencies {
2929
compile("com.github.rholder:guava-retrying") {
3030
exclude module: "guava"
3131
}
32+
3233
compile("commons-dbutils:commons-dbutils")
34+
compile('com.github.Netflix.iceberg:iceberg-common:0.3.0')
35+
compile('com.github.Netflix.iceberg:iceberg-core:0.3.0')
3336

3437
/*******************************
3538
* Provided Dependencies

0 commit comments

Comments
 (0)