From a559a027add41aed02af04d4a4c518da5b9ea7c0 Mon Sep 17 00:00:00 2001 From: David Leifker Date: Wed, 29 Jan 2025 16:42:04 -0600 Subject: [PATCH] fix(json-converter): fix iceberg json converter --- metadata-ingestion/src/datahub/entrypoints.py | 14 +++- .../rest/common/IcebergJsonConverter.java | 83 +++++++++++++++++++ .../rest/common/IcebergSpringWebConfig.java | 22 +---- .../config/SpringWebSchemaRegistryConfig.java | 17 +--- metadata-service/war/build.gradle | 1 + .../java/com/linkedin/gms/ServletConfig.java | 16 ++++ 6 files changed, 114 insertions(+), 39 deletions(-) create mode 100644 metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/rest/common/IcebergJsonConverter.java diff --git a/metadata-ingestion/src/datahub/entrypoints.py b/metadata-ingestion/src/datahub/entrypoints.py index cdcbffc5c5c2b..5ae465bbc89ee 100644 --- a/metadata-ingestion/src/datahub/entrypoints.py +++ b/metadata-ingestion/src/datahub/entrypoints.py @@ -20,7 +20,6 @@ from datahub.cli.env_utils import get_boolean_env_variable from datahub.cli.exists_cli import exists from datahub.cli.get_cli import get -from datahub.cli.iceberg_cli import iceberg from datahub.cli.ingest_cli import ingest from datahub.cli.migrate import migrate from datahub.cli.put_cli import put @@ -183,7 +182,18 @@ def init(use_password: bool = False) -> None: datahub.add_command(datacontract) datahub.add_command(assertions) datahub.add_command(container) -datahub.add_command(iceberg) + +try: + from datahub.cli.iceberg_cli import iceberg + + datahub.add_command(iceberg) +except ImportError as e: + logger.debug(f"Failed to load datahub iceberg command: {e}") + datahub.add_command( + make_shim_command( + "iceberg", "run `pip install 'acryl-datahub[iceberg-catalog]'`" + ) + ) try: from datahub.cli.lite_cli import lite diff --git a/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/rest/common/IcebergJsonConverter.java 
b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/rest/common/IcebergJsonConverter.java new file mode 100644 index 0000000000000..4936f09fc42c3 --- /dev/null +++ b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/rest/common/IcebergJsonConverter.java @@ -0,0 +1,83 @@ +package io.datahubproject.iceberg.catalog.rest.common; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.lang.reflect.GenericArrayType; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.lang.reflect.WildcardType; +import javax.annotation.Nonnull; +import org.springframework.http.MediaType; +import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; + +public class IcebergJsonConverter extends MappingJackson2HttpMessageConverter { + private static final String ICEBERG_PACKAGE_PREFIX = "org.apache.iceberg."; + + public IcebergJsonConverter(ObjectMapper objectMapper) { + super(objectMapper); + } + + @Override + protected boolean supports(@Nonnull Class clazz) { + return isClassInPackage(clazz); + } + + @Override + public boolean canRead(@Nonnull Type type, Class contextClass, MediaType mediaType) { + return hasTypeInPackage(type) && super.canRead(type, contextClass, mediaType); + } + + @Override + public boolean canWrite(@Nonnull Class clazz, MediaType mediaType) { + return isClassInPackage(clazz) && super.canWrite(clazz, mediaType); + } + + private boolean hasTypeInPackage(Type type) { + if (type instanceof Class) { + return isClassInPackage((Class) type); + } + + if (type instanceof ParameterizedType) { + ParameterizedType paramType = (ParameterizedType) type; + + // Check raw type + Type rawType = paramType.getRawType(); + if (rawType instanceof Class && isClassInPackage((Class) rawType)) { + return true; + } + + // Recursively check type arguments + for (Type typeArg : paramType.getActualTypeArguments()) { + if (hasTypeInPackage(typeArg)) { + return true; 
+ } + } + } + + if (type instanceof WildcardType) { + WildcardType wildcardType = (WildcardType) type; + // Check upper bounds + for (Type bound : wildcardType.getUpperBounds()) { + if (hasTypeInPackage(bound)) { + return true; + } + } + // Check lower bounds + for (Type bound : wildcardType.getLowerBounds()) { + if (hasTypeInPackage(bound)) { + return true; + } + } + } + + if (type instanceof GenericArrayType) { + GenericArrayType arrayType = (GenericArrayType) type; + return hasTypeInPackage(arrayType.getGenericComponentType()); + } + + return false; + } + + private static boolean isClassInPackage(@Nonnull Class clazz) { + return clazz.getName().startsWith(ICEBERG_PACKAGE_PREFIX); + } +} diff --git a/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/rest/common/IcebergSpringWebConfig.java b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/rest/common/IcebergSpringWebConfig.java index 954cc512da8cc..79ee41417d878 100644 --- a/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/rest/common/IcebergSpringWebConfig.java +++ b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/rest/common/IcebergSpringWebConfig.java @@ -1,34 +1,14 @@ package io.datahubproject.iceberg.catalog.rest.common; -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategies; import io.datahubproject.iceberg.catalog.credentials.CachingCredentialProvider; import io.datahubproject.iceberg.catalog.credentials.CredentialProvider; import io.datahubproject.iceberg.catalog.credentials.S3CredentialProvider; -import java.util.List; -import org.apache.iceberg.rest.RESTSerializers; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import 
org.springframework.http.converter.*; -import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; -import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; @Configuration -public class IcebergSpringWebConfig implements WebMvcConfigurer { - @Override - public void extendMessageConverters(List<HttpMessageConverter<?>> converters) { - for (HttpMessageConverter converter : converters) { - if (converter instanceof MappingJackson2HttpMessageConverter jsonConverter) { - ObjectMapper objectMapper = jsonConverter.getObjectMapper(); - objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); - objectMapper.setPropertyNamingStrategy(new PropertyNamingStrategies.KebabCaseStrategy()); - RESTSerializers.registerAll(objectMapper); - break; - } - } - } +public class IcebergSpringWebConfig { @Bean public CredentialProvider credentialProvider() { diff --git a/metadata-service/schema-registry-servlet/src/main/java/io/datahubproject/openapi/schema/registry/config/SpringWebSchemaRegistryConfig.java b/metadata-service/schema-registry-servlet/src/main/java/io/datahubproject/openapi/schema/registry/config/SpringWebSchemaRegistryConfig.java index c2200eef6e60f..56b4e57530179 100644 --- a/metadata-service/schema-registry-servlet/src/main/java/io/datahubproject/openapi/schema/registry/config/SpringWebSchemaRegistryConfig.java +++ b/metadata-service/schema-registry-servlet/src/main/java/io/datahubproject/openapi/schema/registry/config/SpringWebSchemaRegistryConfig.java @@ -8,7 +8,6 @@ import io.swagger.v3.oas.annotations.info.Info; import io.swagger.v3.oas.annotations.servers.Server; import jakarta.servlet.http.HttpServletRequest; -import java.util.List; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -16,13 +15,7 @@ import org.springframework.context.annotation.ComponentScan; import 
org.springframework.context.annotation.Configuration; import org.springframework.core.annotation.Order; -import org.springframework.http.converter.ByteArrayHttpMessageConverter; -import org.springframework.http.converter.FormHttpMessageConverter; -import org.springframework.http.converter.HttpMessageConverter; -import org.springframework.http.converter.StringHttpMessageConverter; -import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.web.servlet.config.annotation.EnableWebMvc; -import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; @Slf4j @EnableWebMvc @@ -35,7 +28,7 @@ havingValue = InternalSchemaRegistryFactory.TYPE) @Configuration @ComponentScan(basePackages = {"io.datahubproject.openapi.schema.registry"}) -public class SpringWebSchemaRegistryConfig implements WebMvcConfigurer { +public class SpringWebSchemaRegistryConfig { @Bean public SchemaRegistryController schemaRegistryController( @@ -44,12 +37,4 @@ public SchemaRegistryController schemaRegistryController( @Qualifier("schemaRegistryService") SchemaRegistryService schemaRegistryService) { return new SchemaRegistryController(objectMapper, request, schemaRegistryService); } - - @Override - public void configureMessageConverters(List<HttpMessageConverter<?>> messageConverters) { - messageConverters.add(new StringHttpMessageConverter()); - messageConverters.add(new ByteArrayHttpMessageConverter()); - messageConverters.add(new FormHttpMessageConverter()); - messageConverters.add(new MappingJackson2HttpMessageConverter()); - } } diff --git a/metadata-service/war/build.gradle b/metadata-service/war/build.gradle index b173a1dd607f3..59e0f3d80a24a 100644 --- a/metadata-service/war/build.gradle +++ b/metadata-service/war/build.gradle @@ -24,6 +24,7 @@ dependencies { implementation project(':metadata-service:openapi-analytics-servlet') implementation project(':metadata-service:schema-registry-servlet') implementation project(':metadata-service:iceberg-catalog') + 
implementation 'org.apache.iceberg:iceberg-core:1.6.1' runtimeOnly project(':metadata-jobs:mce-consumer') runtimeOnly project(':metadata-jobs:mae-consumer') runtimeOnly project(':metadata-jobs:pe-consumer') diff --git a/metadata-service/war/src/main/java/com/linkedin/gms/ServletConfig.java b/metadata-service/war/src/main/java/com/linkedin/gms/ServletConfig.java index fbd8d72b76c8f..e09b38a4f1dbb 100644 --- a/metadata-service/war/src/main/java/com/linkedin/gms/ServletConfig.java +++ b/metadata-service/war/src/main/java/com/linkedin/gms/ServletConfig.java @@ -7,16 +7,21 @@ import com.datahub.gms.servlet.Config; import com.datahub.gms.servlet.ConfigSearchExport; import com.datahub.gms.servlet.HealthCheck; +import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.core.StreamReadConstraints; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; import com.linkedin.r2.transport.http.server.RAPJakartaServlet; import com.linkedin.restli.server.RestliHandlerServlet; +import io.datahubproject.iceberg.catalog.rest.common.IcebergJsonConverter; import io.datahubproject.openapi.converter.StringToChangeCategoryConverter; import java.util.List; import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; +import org.apache.iceberg.rest.RESTSerializers; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.boot.web.servlet.ServletRegistrationBean; @@ -129,6 +134,7 @@ public void configureMessageConverters(List<HttpMessageConverter<?>> messageConv messageConverters.add(new StringHttpMessageConverter()); messageConverters.add(new ByteArrayHttpMessageConverter()); messageConverters.add(new FormHttpMessageConverter()); + 
messageConverters.add(createIcebergMessageConverter()); ObjectMapper objectMapper = new ObjectMapper(); int maxSize = @@ -145,6 +151,16 @@ public void configureMessageConverters(List<HttpMessageConverter<?>> messageConv messageConverters.add(jsonConverter); } + private HttpMessageConverter createIcebergMessageConverter() { + ObjectMapper objectMapper = new ObjectMapper(); + MappingJackson2HttpMessageConverter jsonConverter = new IcebergJsonConverter(objectMapper); + + objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + objectMapper.setPropertyNamingStrategy(new PropertyNamingStrategies.KebabCaseStrategy()); + RESTSerializers.registerAll(objectMapper); + return jsonConverter; + } + @Override public void addFormatters(FormatterRegistry registry) { registry.addConverter(new StringToChangeCategoryConverter());