Json to Avro converter supports capturing failures #23

Merged
16 changes: 16 additions & 0 deletions README.md
@@ -234,6 +234,22 @@ JsonAvroConverter converter = JsonAvroConverter.builder()

By default, both `_ab_additional_properties` and `_airbyte_additional_properties` are treated as additional properties field names on the JSON object.

### Field Conversion Failure Listener

A listener can be set to react to conversion failures at the field level. It will be called with metadata about the field and failure, and it may do any of the following:

* return a replacement value for the field
* call `pushPostProcessingAction` to register a function to apply to the record (e.g., to add metadata about the failure)
* (re)throw an exception if the failure is unrecoverable

Note that it may not edit the record itself. This is to avoid race conditions and other issues that might arise from modifying the record while it is being processed.

```java
JsonAvroConverter converter = JsonAvroConverter.builder()
    .setFieldConversionFailureListener(listener)
    .build();
```
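
The three options above can be illustrated with a minimal sketch. It is not the library's code: `SketchListener` is a hypothetical stand-in for `FieldConversionFailureListener` with a shortened method signature, a plain `Map` replaces `GenericData.Record` so the example compiles without Avro, and the rethrow policy and the `changes` key are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for FieldConversionFailureListener (Map replaces GenericData.Record).
abstract class SketchListener {
    private final List<Function<Map<String, Object>, Map<String, Object>>> actions = new ArrayList<>();

    protected final void pushPostProcessingAction(Function<Map<String, Object>, Map<String, Object>> action) {
        actions.add(action);
    }

    // Shortened signature; the real method also receives the schema, path, and both field names.
    abstract Object onFieldConversionFailure(String fieldName, Object value, Exception exception);

    final Map<String, Object> applyPostProcessingActions(Map<String, Object> record) {
        for (Function<Map<String, Object>, Map<String, Object>> action : actions) {
            record = action.apply(record);
        }
        actions.clear();
        return record;
    }
}

class NullingListener extends SketchListener {
    @Override
    Object onFieldConversionFailure(String fieldName, Object value, Exception exception) {
        // Assumed policy: treat IllegalStateException as unrecoverable and rethrow it.
        if (exception instanceof IllegalStateException) {
            throw (IllegalStateException) exception;
        }
        String reason = exception.getMessage();
        // Do not touch the record here; defer the metadata update until conversion is done.
        pushPostProcessingAction(record -> {
            @SuppressWarnings("unchecked")
            List<String> changes = (List<String>) record.computeIfAbsent("changes", k -> new ArrayList<String>());
            changes.add(fieldName + ": " + reason);
            return record;
        });
        return null; // replacement value for the failed field
    }
}

class ListenerSketch {
    static Map<String, Object> demo() {
        NullingListener listener = new NullingListener();
        Map<String, Object> record = new HashMap<>();
        record.put("id", 7);
        String raw = "not-a-number";
        try {
            // Simulates a field conversion that fails.
            record.put("age", Integer.parseInt(raw));
        } catch (NumberFormatException e) {
            record.put("age", listener.onFieldConversionFailure("age", raw, e));
        }
        // Runs once the whole record has been converted.
        return listener.applyPostProcessingActions(record);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the failed `age` field ends up as `null`, and the failure reason is recorded under `changes` only after the rest of the record has been built.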

## Build
- The build is upgraded to use Java 14 and Gradle 7.2 to match the build environment of Airbyte.
- Maven staging and publishing are removed because they are incompatible with the new build environment.
4 changes: 2 additions & 2 deletions converter/build.gradle
@@ -6,7 +6,7 @@ buildscript {
}
}
dependencies {
- classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.14.0'
+ classpath 'com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.0.0'
}
}

@@ -20,7 +20,7 @@ plugins {
id 'pmd'
}

- apply plugin: 'com.commercehub.gradle.plugin.avro'
+ apply plugin: 'com.github.davidmc24.gradle.plugin.avro'
apply plugin: 'idea'

configurations {
@@ -0,0 +1,59 @@
package tech.allegro.schema.json2avro.converter;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;

public abstract class FieldConversionFailureListener {
    /**
     * This is to support behavior like v2 destinations change capture.
     *
     * Specifically, when a field fails to convert:
     * * the field, change, and reason are added to `_airbyte_metadata.changes[]`
     * * the field is nulled or truncated
     *
     * At the time of failure, the _airbyte_metadata.changes[] field might
     * * exist and be empty
     * * exist and already contain changes
     * * not have been parsed yet (meta == null)
     * * have been parsed, but contain a changes field that has not been parsed (meta.changes == null)
     *
     * Therefore, the simplest general feature that will support the desired behavior is
     * * listener may return a new value for the affected field only
     * * listener may not mutate any other part of the record on failure
     * * listener may only push post-processing actions for the record (after required fields definitely exist)
     *
     */

    private final List<Function<GenericData.Record, GenericData.Record>> postProcessingActions = new LinkedList<>();

    protected final void pushPostProcessingAction(Function<GenericData.Record, GenericData.Record> action) {
        postProcessingActions.add(action);
    }

    @Nullable
    public abstract Object onFieldConversionFailure(@Nonnull String avroName,
                                                    @Nonnull String originalName,
                                                    @Nonnull Schema schema,
                                                    @Nonnull Object value,
                                                    @Nonnull String path,
                                                    @Nonnull Exception exception);

    @Nonnull
    public final GenericData.Record applyPostProcessingActions(@Nonnull GenericData.Record record) {
        for (Function<GenericData.Record, GenericData.Record> action : postProcessingActions) {
            record = action.apply(record);
        }
        postProcessingActions.clear();
        return record;
    }

    public final void clearPostProcessingActions() {
        postProcessingActions.clear();
    }
}
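
Because the action list lives on the listener instance, it carries state from one record to the next. The sketch below illustrates why both `applyPostProcessingActions` and `clearPostProcessingActions` exist. `ActionQueue` is a hypothetical stand-in that copies only the queue logic of the class above, with `String` in place of `GenericData.Record`. How the converter actually calls these methods is not shown in this diff, so the call sequence here is an assumption.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in that reproduces only the queue logic of the listener.
class ActionQueue<R> {
    private final List<Function<R, R>> actions = new LinkedList<>();

    void push(Function<R, R> action) {
        actions.add(action);
    }

    // Applies the actions in insertion order, then empties the queue.
    R apply(R record) {
        for (Function<R, R> action : actions) {
            record = action.apply(record);
        }
        actions.clear();
        return record;
    }

    void clear() {
        actions.clear();
    }
}

class QueueLifecycleSketch {
    static String[] demo() {
        ActionQueue<String> queue = new ActionQueue<>();

        // Record 1: two field failures were handled, then the record completed.
        queue.push(r -> r + "+a");
        queue.push(r -> r + "+b");
        String first = queue.apply("r1");

        // Record 2: an action was queued, then conversion aborted (assumed scenario).
        queue.push(r -> r + "+stale");
        queue.clear(); // without this, the stale action would run on record 3

        // Record 3: no failures, so nothing is applied.
        String third = queue.apply("r3");
        return new String[] {first, third};
    }

    public static void main(String[] args) {
        String[] out = demo();
        System.out.println(out[0] + " " + out[1]); // prints "r1+a+b r3"
    }
}
```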
@@ -57,6 +57,11 @@ public Builder setAvroAdditionalPropsFieldName(String avroAdditionalPropsFieldNa
return this;
}

public Builder setFieldConversionFailureListener(FieldConversionFailureListener listener) {
recordReaderBuilder.setFieldConversionFailureListener(listener);
return this;
}

public JsonAvroConverter build() {
return new JsonAvroConverter(recordReaderBuilder.build());
}