Skip to content

Commit b9f540e

Browse files
zhljena authored and joymajumdar committed
Adding iceberg partition dateCreated override (Netflix#309)
1 parent 90d7c33 commit b9f540e

File tree

6 files changed

+43
-21
lines changed

6 files changed

+43
-21
lines changed

metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/iceberg/IcebergRequestMetrics.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ enum Type {
5252
private final String metricName;
5353

5454
IcebergRequestMetrics(final IcebergRequestMetrics.Type type, final String measure) {
55-
this.metricName = String.format("metacat.iceberg.%s.%s.%s", type.name(), type.name(), measure);
55+
this.metricName = String.format("metacat.iceberg.%s.%s", type.name(), measure);
5656
}
5757

5858
IcebergRequestMetrics(final String name) {

metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/iceberg/IcebergTableHandler.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,18 @@ public Map<String, ScanSummary.PartitionMetrics> getIcebergTablePartitionMap(
127127
* @return iceberg table
128128
*/
129129
public Table getIcebergTable(final QualifiedName tableName, final String tableMetadataLocation) {
130-
this.icebergTableCriteria.checkCriteria(tableName, tableMetadataLocation);
131130
final long start = this.registry.clock().wallTime();
132-
log.debug("Loading icebergTable {} from {}", tableName, tableMetadataLocation);
133-
final Table table = new IcebergMetastoreTables(tableMetadataLocation).load(tableName.toString());
134-
final long duration = registry.clock().wallTime() - start;
135-
log.info("Time taken to getIcebergTable {} is {} ms", tableName, duration);
136-
this.icebergTableRequestMetrics.recordTimer(IcebergRequestMetrics.TagLoadTable.getMetricName(), duration);
137-
this.icebergTableRequestMetrics.increaseCounter(IcebergRequestMetrics.TagLoadTable.getMetricName(), tableName);
138-
return table;
131+
try {
132+
this.icebergTableCriteria.checkCriteria(tableName, tableMetadataLocation);
133+
log.debug("Loading icebergTable {} from {}", tableName, tableMetadataLocation);
134+
return new IcebergMetastoreTables(tableMetadataLocation).load(tableName.toString());
135+
} finally {
136+
final long duration = registry.clock().wallTime() - start;
137+
log.info("Time taken to getIcebergTable {} is {} ms", tableName, duration);
138+
this.icebergTableRequestMetrics.recordTimer(IcebergRequestMetrics.TagLoadTable.getMetricName(), duration);
139+
this.icebergTableRequestMetrics.increaseCounter(
140+
IcebergRequestMetrics.TagLoadTable.getMetricName(), tableName);
141+
}
139142
}
140143

141144
/**

metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/monitoring/HiveMetrics.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ enum Type {
8484
private final String metricName;
8585

8686
HiveMetrics(final Type type, final String measure) {
87-
this.metricName = "metacat.hive." + type.name() + "." + type.name() + "." + measure;
87+
this.metricName = String.format("metacat.hive.%s.%s", type.name(), measure);
8888
}
8989
HiveMetrics(final String name) {
9090
this.metricName = name;

metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/sql/HiveConnectorFastPartitionService.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.netflix.metacat.common.server.connectors.ConnectorRequestContext;
2626
import com.netflix.metacat.common.server.connectors.ConnectorUtils;
2727
import com.netflix.metacat.common.server.connectors.exception.InvalidMetaException;
28+
import com.netflix.metacat.common.server.connectors.model.AuditInfo;
2829
import com.netflix.metacat.common.server.connectors.model.PartitionInfo;
2930
import com.netflix.metacat.common.server.connectors.model.PartitionListRequest;
3031
import com.netflix.metacat.common.server.connectors.model.StorageInfo;
@@ -47,7 +48,10 @@
4748
import org.apache.hadoop.hive.metastore.api.Table;
4849

4950
import javax.annotation.Nonnull;
51+
import javax.annotation.Nullable;
52+
import java.time.Instant;
5053
import java.util.Comparator;
54+
import java.util.Date;
5155
import java.util.List;
5256
import java.util.Map;
5357
import java.util.Set;
@@ -358,20 +362,24 @@ private List<PartitionInfo> getIcebergPartitionInfos(
358362
final Map<String, ScanSummary.PartitionMetrics> partitionMap
359363
= icebergTableHandler.getIcebergTablePartitionMap(tableName, partitionsRequest, icebergTable);
360364

361-
final List<PartitionInfo> filteredPartitionList;
362365
final List<String> partitionIds = partitionsRequest.getPartitionNames();
363366
final Sort sort = partitionsRequest.getSort();
367+
final AuditInfo tableAuditInfo = tableInfo.getAudit();
364368

365-
filteredPartitionList = partitionMap.keySet().stream()
369+
final List<PartitionInfo> filteredPartitionList = partitionMap.keySet().stream()
366370
.filter(partitionName -> partitionIds == null || partitionIds.contains(partitionName))
367371
.map(partitionName -> PartitionInfo.builder().name(
368372
QualifiedName.ofPartition(tableName.getCatalogName(),
369373
tableName.getDatabaseName(),
370374
tableName.getTableName(),
371375
partitionName))
372376
.dataMetrics(icebergTableHandler.getDataMetadataFromIcebergMetrics(partitionMap.get(partitionName)))
373-
.auditInfo(tableInfo.getAudit()).build())
377+
.auditInfo(AuditInfo.builder().createdBy(tableAuditInfo.getCreatedBy())
378+
.createdDate(fromEpochMilliToDate(partitionMap.get(partitionName).dataTimestampMillis()))
379+
.lastModifiedDate(fromEpochMilliToDate(partitionMap.get(partitionName).dataTimestampMillis()))
380+
.build()).build())
374381
.collect(Collectors.toList());
382+
375383
if (sort != null) {
376384
//it can only support sortBy partition Name
377385
final Comparator<PartitionInfo> nameComparator = Comparator.comparing(p -> p.getName().toString());
@@ -380,5 +388,7 @@ private List<PartitionInfo> getIcebergPartitionInfos(
380388
return ConnectorUtils.paginate(filteredPartitionList, pageable);
381389
}
382390

383-
391+
private Date fromEpochMilliToDate(@Nullable final Long l) {
392+
return (l == null) ? null : Date.from(Instant.ofEpochMilli(l));
393+
}
384394
}

metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/HiveConnectorFastPartitionSpec.groovy

+15-6
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,14 @@ class HiveConnectorFastPartitionSpec extends Specification {
6969

7070
def setupSpec() {
7171
conf.icebergEnabled >> true
72+
metric1.fileCount() >> 1
73+
metric1.dataTimestampMillis() >> 1234500000
74+
metric1.recordCount() >> 1
7275
hiveConnectorFastPartitionService.icebergTableHandler = icebergTableHandler
7376
icebergTableHandler.getIcebergTable(_,_) >> Mock(Table)
7477
icebergTableHandler.getIcebergTablePartitionMap(_,_,_) >> ["dateint=20170101/hour=1": metric1,
75-
"dateint=20170102/hour=1": metric1,
76-
"dateint=20170103/hour=1": metric1]
78+
"dateint=20170102/hour=1": metric1,
79+
"dateint=20170103/hour=1": metric1]
7780
}
7881

7982
@Unroll
@@ -84,12 +87,18 @@ class HiveConnectorFastPartitionSpec extends Specification {
8487
partitionListRequest, MetacatDataInfoProvider.getIcebergTableInfo("icebergtable"))
8588

8689
then:
87-
partionInfos.collect { it.getName().getPartitionName() }.flatten() == results
90+
partionInfos.collect { [it.getName().getPartitionName(),
91+
it.getAudit().createdDate.toInstant().toEpochMilli(),
92+
it.getAudit().lastModifiedDate.toInstant().toEpochMilli(),
93+
it.getAudit().createdBy] } == results
8894
where:
8995
partitionListRequest | results
90-
new PartitionListRequest(null, ["dateint=20170101/hour=1"],false, null, new Sort(), null ) | ["dateint=20170101/hour=1"]
91-
new PartitionListRequest(null, null, false, null, new Sort(), null) | ["dateint=20170101/hour=1", "dateint=20170102/hour=1", "dateint=20170103/hour=1"]
92-
new PartitionListRequest(null, null, false, null, new Sort(null, SortOrder.DESC), null) | ["dateint=20170103/hour=1", "dateint=20170102/hour=1", "dateint=20170101/hour=1"]
96+
new PartitionListRequest(null, ["dateint=20170101/hour=1"],false, null,
97+
new Sort(), null ) | [["dateint=20170101/hour=1", 1234500000, 1234500000, "metacat_test"]]
98+
new PartitionListRequest(null, null, false, null,
99+
new Sort(), null) | [["dateint=20170101/hour=1", 1234500000, 1234500000, "metacat_test"], ["dateint=20170102/hour=1", 1234500000, 1234500000, "metacat_test"], ["dateint=20170103/hour=1", 1234500000, 1234500000, "metacat_test"]]
100+
new PartitionListRequest(null, null, false, null,
101+
new Sort(null, SortOrder.DESC), null) | [["dateint=20170103/hour=1", 1234500000, 1234500000, "metacat_test"], ["dateint=20170102/hour=1", 1234500000, 1234500000, "metacat_test"], ["dateint=20170101/hour=1", 1234500000, 1234500000, "metacat_test"]]
93102
}
94103

95104
def "Test for get iceberg table partitionKeys" (){

metacat-testdata-provider/src/main/groovy/com/netflix/metacat/testdata/provider/MetacatDataInfoProvider.groovy

+1-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class MetacatDataInfoProvider {
116116
.serde( StorageInfo.builder().owner("test")
117117
.build())
118118
.metadata ( ['table_type': 'ICEBERG'])
119-
.auditInfo( AuditInfo.builder().build())
119+
.auditInfo( AuditInfo.builder().createdBy("metacat_test").build())
120120
.build()
121121
]
122122

0 commit comments

Comments (0)