From 90cdb1bec5b33aa508953d7b31f3599b808e258d Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Sun, 19 Nov 2023 17:35:24 -0800 Subject: [PATCH 1/6] V0. --- protocol-models/build.gradle | 14 ++ .../v0/airbyte_protocol.proto | 174 ++++++++++++++++++ .../protocol/models/CatalogHelpersTest.java | 1 + 3 files changed, 189 insertions(+) create mode 100644 protocol-models/src/main/resources/airbyte_protocol_proto/v0/airbyte_protocol.proto diff --git a/protocol-models/build.gradle b/protocol-models/build.gradle index 1f85b3c..5803d8f 100644 --- a/protocol-models/build.gradle +++ b/protocol-models/build.gradle @@ -1,12 +1,15 @@ import org.jsonschema2pojo.SourceType plugins { + id 'com.google.protobuf' version '0.9.4' id 'com.github.eirnym.js2p' version '1.0' } dependencies { implementation 'javax.validation:validation-api:1.1.0.Final' implementation 'org.apache.commons:commons-lang3:3.11' + + implementation 'com.google.protobuf:protobuf-java:3.25.1' } jsonSchema2Pojo { @@ -40,3 +43,14 @@ task generateTypescriptProtocolClassFiles(type: Exec) { commandLine 'bin/generate-typescript-classes-docker.sh' } + +protobuf { + protoc { + artifact = 'com.google.protobuf:protoc:3.25.1' // Use the appropriate version + } + + generateProtoTasks { + ofSourceSet('main')*.protoPath = 'src/main/resources/airbyte_protocol_proto/v0' +// ofSourceSet('test')*.protoPath = 'src/test/proto' + } +} diff --git a/protocol-models/src/main/resources/airbyte_protocol_proto/v0/airbyte_protocol.proto b/protocol-models/src/main/resources/airbyte_protocol_proto/v0/airbyte_protocol.proto new file mode 100644 index 0000000..2b3d6a1 --- /dev/null +++ b/protocol-models/src/main/resources/airbyte_protocol_proto/v0/airbyte_protocol.proto @@ -0,0 +1,174 @@ +syntax = "proto3"; + +package airbyte; + +import "google/protobuf/any.proto"; +import "google/protobuf/timestamp.proto"; + +// Enumerations +enum SyncMode { + FULL_REFRESH = 0; + INCREMENTAL = 1; + // ... other sync modes +} + +enum DestinationSyncMode { + APPEND = 0; + OVERWRITE = 1; + APPEND_DEDUP = 2; + // ... other destination sync modes +} + +enum AirbyteStreamStatus { + ACTIVE = 0; + DEPRECATED = 1; + // ... other stream statuses +} + +// Message Definitions +message AirbyteMessage { + oneof message { + AirbyteRecordMessage record = 1; + AirbyteStateMessage state = 2; + AirbyteLogMessage log = 3; + AirbyteTraceMessage trace = 4; + AirbyteControlMessage control = 5; + // ... other message types + } +} + +message AirbyteRecordMessage { + string stream = 1; + google.protobuf.Timestamp emitted_at = 2; + google.protobuf.Any data = 3; + // ... other fields as necessary +} + +message AirbyteStateMessage { + AirbyteStateBlob data = 1; + // ... other fields as necessary +} + +message AirbyteLogMessage { + string level = 1; + string message = 2; + google.protobuf.Timestamp emitted_at = 3; + // ... other fields as necessary +} + +message AirbyteTraceMessage { + oneof trace_type { + AirbyteErrorTraceMessage error = 1; + AirbyteEstimateTraceMessage estimate = 2; + AirbyteStreamStatusTraceMessage stream_status = 3; + AirbyteAnalyticsTraceMessage analytics = 4; + // ... other trace types + } +} + +// Additional message definitions based on JSON schema +message AirbyteStreamState { + StreamDescriptor stream_descriptor = 1; + AirbyteStateBlob stream_state = 2; + // ... other fields as necessary +} + +message AirbyteGlobalState { + AirbyteStateBlob shared_state = 1; + repeated AirbyteStreamState stream_states = 2; + // ... other fields as necessary +} + +message StreamDescriptor { + string name = 1; + string namespace = 2; + // ... other fields as necessary +} + +message AirbyteStateBlob { + google.protobuf.Any data = 1; + // ... other fields as necessary +} + +message AirbyteStateStats { + double recordCount = 1; + // ... other fields as necessary +} + +message AirbyteErrorTraceMessage { + string message = 1; + string internal_message = 2; + string stack_trace = 3; + string failure_type = 4; + StreamDescriptor stream_descriptor = 5; + // ... other fields as necessary +} + +message AirbyteEstimateTraceMessage { + string name = 1; + string type = 2; + string namespace = 3; + int64 row_estimate = 4; + int64 byte_estimate = 5; + // ... other fields as necessary +} + +message AirbyteStreamStatusTraceMessage { + StreamDescriptor stream_descriptor = 1; + AirbyteStreamStatus status = 2; + // ... other fields as necessary +} + +message AirbyteAnalyticsTraceMessage { + string type = 1; + string value = 2; + // ... other fields as necessary +} + +message AirbyteControlMessage { + string type = 1; + google.protobuf.Timestamp emitted_at = 2; + AirbyteControlConnectorConfigMessage connectorConfig = 3; + // ... other fields as necessary +} + +message AirbyteControlConnectorConfigMessage { + map config = 1; + // ... other fields as necessary +} + +message AirbyteConnectionStatus { + string status = 1; + string message = 2; + // ... other fields as necessary +} + +message AirbyteCatalog { + repeated AirbyteStream streams = 1; + // ... other fields as necessary +} + +message AirbyteStream { + string name = 1; + google.protobuf.Any json_schema = 2; + repeated SyncMode supported_sync_modes = 3; + bool source_defined_cursor = 4; + repeated string default_cursor_field = 5; + repeated string source_defined_primary_key = 6; + string namespace = 7; + // ... other fields as necessary +} + +message ConfiguredAirbyteCatalog { + repeated ConfiguredAirbyteStream streams = 1; + // ... other fields as necessary +} + +message ConfiguredAirbyteStream { + AirbyteStream stream = 1; + SyncMode sync_mode = 2; + repeated string cursor_field = 3; + DestinationSyncMode destination_sync_mode = 4; + repeated string primary_key = 5; + // ... other fields as necessary +} diff --git a/protocol-models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java b/protocol-models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java index 97bde7a..2467c9e 100644 --- a/protocol-models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java +++ b/protocol-models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java @@ -310,4 +310,5 @@ void testCatalogDiffStreamChangeWithNoFieldTransform() throws IOException { Assertions.assertThat(actualDiff).containsExactlyInAnyOrderElementsOf(expectedDiff); } + } From de7e412f299ebbea8aa60b5353df7f1e27075d2c Mon Sep 17 00:00:00 2001 From: Michel Tricot Date: Mon, 20 Nov 2023 10:16:58 -0800 Subject: [PATCH 2/6] progress --- .../v0 => proto}/airbyte_protocol.proto | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) rename protocol-models/src/main/{resources/airbyte_protocol_proto/v0 => proto}/airbyte_protocol.proto (97%) diff --git a/protocol-models/src/main/resources/airbyte_protocol_proto/v0/airbyte_protocol.proto b/protocol-models/src/main/proto/airbyte_protocol.proto similarity index 97% rename from protocol-models/src/main/resources/airbyte_protocol_proto/v0/airbyte_protocol.proto rename to protocol-models/src/main/proto/airbyte_protocol.proto index 2b3d6a1..bc08b77 100644 --- a/protocol-models/src/main/resources/airbyte_protocol_proto/v0/airbyte_protocol.proto +++ b/protocol-models/src/main/proto/airbyte_protocol.proto @@ -1,6 +1,9 @@ syntax = "proto3"; -package airbyte; +package airbyte.protocol; + +option java_multiple_files = true; +option java_package = "io.airbyte.protocol.protos"; import "google/protobuf/any.proto"; import "google/protobuf/timestamp.proto"; @@ -9,14 +12,12 @@ import "google/protobuf/timestamp.proto"; enum SyncMode { FULL_REFRESH = 0; INCREMENTAL = 1; - // ... other sync modes } enum DestinationSyncMode { APPEND = 0; OVERWRITE = 1; APPEND_DEDUP = 2; - // ... other destination sync modes } enum AirbyteStreamStatus { From af9603f7722440a9eeeeb8b2de112bda26ef5ac4 Mon Sep 17 00:00:00 2001 From: Michel Tricot Date: Mon, 20 Nov 2023 10:17:09 -0800 Subject: [PATCH 3/6] progress --- protocol-models/build.gradle | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/protocol-models/build.gradle b/protocol-models/build.gradle index 5803d8f..22244ca 100644 --- a/protocol-models/build.gradle +++ b/protocol-models/build.gradle @@ -50,7 +50,6 @@ protobuf { } generateProtoTasks { - ofSourceSet('main')*.protoPath = 'src/main/resources/airbyte_protocol_proto/v0' -// ofSourceSet('test')*.protoPath = 'src/test/proto' + ofSourceSet('main') } } From 2e1f30369d11274c8aea64d15d389845ef487776 Mon Sep 17 00:00:00 2001 From: Michel Tricot Date: Mon, 20 Nov 2023 13:13:11 -0800 Subject: [PATCH 4/6] remove any --- .../src/main/proto/airbyte_protocol.proto | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/protocol-models/src/main/proto/airbyte_protocol.proto b/protocol-models/src/main/proto/airbyte_protocol.proto index bc08b77..ae41d87 100644 --- a/protocol-models/src/main/proto/airbyte_protocol.proto +++ b/protocol-models/src/main/proto/airbyte_protocol.proto @@ -41,7 +41,7 @@ message AirbyteMessage { message AirbyteRecordMessage { string stream = 1; google.protobuf.Timestamp emitted_at = 2; - google.protobuf.Any data = 3; + ObjectData data = 3; // ... other fields as necessary } @@ -173,3 +173,27 @@ message ConfiguredAirbyteStream { repeated string primary_key = 5; // ... other fields as necessary } + +message ObjectData { + map data = 1; +} + +message Value { + oneof kind { + NullValue null_value = 1; + int64 int_value = 2; + double float_value = 3; + string string_value = 4; + bytes bytes_value = 5; + ObjectData object_value = 6; + ValueList list_value = 7; + } +} + +enum NullValue { + NULL_VALUE = 0; +} + +message ValueList { + repeated Value values = 1; +} From a1a0be77ed102a6933f3c92fb3a4335758249677 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 20 Nov 2023 13:16:58 -0800 Subject: [PATCH 5/6] Figure out the data field. --- .../io/airbyte/protocol/models/Field.java | 12 ++ .../src/main/proto/airbyte_protocol.proto | 121 +++++++++--------- 2 files changed, 73 insertions(+), 60 deletions(-) diff --git a/protocol-models/src/main/java/io/airbyte/protocol/models/Field.java b/protocol-models/src/main/java/io/airbyte/protocol/models/Field.java index e7f5b77..36dd129 100644 --- a/protocol-models/src/main/java/io/airbyte/protocol/models/Field.java +++ b/protocol-models/src/main/java/io/airbyte/protocol/models/Field.java @@ -4,6 +4,11 @@ package io.airbyte.protocol.models; +import io.airbyte.protocol.protos.AirbyteCatalog; +import io.airbyte.protocol.protos.AirbyteMessage; +import io.airbyte.protocol.protos.AirbyteStreamState; +import io.airbyte.protocol.protos.StreamDescriptor; + import java.util.List; public class Field extends CommonField { @@ -31,4 +36,11 @@ public List getSubFields() { return subFields; } + public static void main(String[] args) { + System.out.println("poop"); + + System.out.println(AirbyteStreamState.getDefaultInstance().isInitialized()); + System.out.println(AirbyteStreamState.newBuilder().setStreamDescriptor(StreamDescriptor.newBuilder().setName("Test").build())); + System.out.println(AirbyteMessage.newBuilder().build()); + } } diff --git a/protocol-models/src/main/proto/airbyte_protocol.proto b/protocol-models/src/main/proto/airbyte_protocol.proto index ae41d87..435592b 100644 --- a/protocol-models/src/main/proto/airbyte_protocol.proto +++ b/protocol-models/src/main/proto/airbyte_protocol.proto @@ -42,11 +42,13 @@ message AirbyteRecordMessage { string stream = 1; google.protobuf.Timestamp emitted_at = 2; ObjectData data = 3; + optional string namespace = 4; // ... other fields as necessary } -message AirbyteStateMessage { - AirbyteStateBlob data = 1; +message StreamDescriptor { + string name = 1; + string namespace = 2; // ... other fields as necessary } @@ -67,35 +69,6 @@ message AirbyteTraceMessage { } } -// Additional message definitions based on JSON schema -message AirbyteStreamState { - StreamDescriptor stream_descriptor = 1; - AirbyteStateBlob stream_state = 2; - // ... other fields as necessary -} - -message AirbyteGlobalState { - AirbyteStateBlob shared_state = 1; - repeated AirbyteStreamState stream_states = 2; - // ... other fields as necessary -} - -message StreamDescriptor { - string name = 1; - string namespace = 2; - // ... other fields as necessary -} - -message AirbyteStateBlob { - google.protobuf.Any data = 1; - // ... other fields as necessary -} - -message AirbyteStateStats { - double recordCount = 1; - // ... other fields as necessary -} - message AirbyteErrorTraceMessage { string message = 1; string internal_message = 2; @@ -105,39 +78,12 @@ message AirbyteErrorTraceMessage { // ... other fields as necessary } -message AirbyteEstimateTraceMessage { - string name = 1; - string type = 2; - string namespace = 3; - int64 row_estimate = 4; - int64 byte_estimate = 5; - // ... other fields as necessary -} - message AirbyteStreamStatusTraceMessage { StreamDescriptor stream_descriptor = 1; AirbyteStreamStatus status = 2; // ... other fields as necessary } -message AirbyteAnalyticsTraceMessage { - string type = 1; - string value = 2; - // ... other fields as necessary -} - -message AirbyteControlMessage { - string type = 1; - google.protobuf.Timestamp emitted_at = 2; - AirbyteControlConnectorConfigMessage connectorConfig = 3; - // ... other fields as necessary -} - -message AirbyteControlConnectorConfigMessage { - map config = 1; - // ... other fields as necessary -} - message AirbyteConnectionStatus { string status = 1; string message = 2; @@ -178,6 +124,10 @@ message ObjectData { map data = 1; } +message ValueList { + repeated Value values = 1; +} + message Value { oneof kind { NullValue null_value = 1; @@ -194,6 +144,57 @@ enum NullValue { NULL_VALUE = 0; } -message ValueList { - repeated Value values = 1; +// SKIP ALL THESE FOR NOW FOR HACKDAY // +message AirbyteStateMessage { + AirbyteStateBlob data = 1; + // ... other fields as necessary +} + +message AirbyteStreamState { + StreamDescriptor stream_descriptor = 1; + AirbyteStateBlob stream_state = 2; + // ... other fields as necessary +} + +message AirbyteGlobalState { + AirbyteStateBlob shared_state = 1; + repeated AirbyteStreamState stream_states = 2; + // ... other fields as necessary +} + +message AirbyteStateBlob { + google.protobuf.Any data = 1; + // ... other fields as necessary +} + +message AirbyteStateStats { + double recordCount = 1; + // ... other fields as necessary +} + +message AirbyteControlMessage { + string type = 1; + google.protobuf.Timestamp emitted_at = 2; + AirbyteControlConnectorConfigMessage connectorConfig = 3; + // ... other fields as necessary +} + +message AirbyteControlConnectorConfigMessage { + map config = 1; + // ... other fields as necessary +} + +message AirbyteEstimateTraceMessage { + string name = 1; + string type = 2; + string namespace = 3; + int64 row_estimate = 4; + int64 byte_estimate = 5; + // ... other fields as necessary +} + +message AirbyteAnalyticsTraceMessage { + string type = 1; + string value = 2; + // ... other fields as necessary } From c9473edd63a27f8d7005bb996b46e3c41ec174f6 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 20 Nov 2023 13:40:50 -0800 Subject: [PATCH 6/6] First complete V0 protocol version. --- .../src/main/proto/airbyte_protocol.proto | 97 +++++++++++-------- 1 file changed, 54 insertions(+), 43 deletions(-) diff --git a/protocol-models/src/main/proto/airbyte_protocol.proto b/protocol-models/src/main/proto/airbyte_protocol.proto index 435592b..211301c 100644 --- a/protocol-models/src/main/proto/airbyte_protocol.proto +++ b/protocol-models/src/main/proto/airbyte_protocol.proto @@ -21,9 +21,10 @@ enum DestinationSyncMode { } enum AirbyteStreamStatus { - ACTIVE = 0; - DEPRECATED = 1; - // ... other stream statuses + STARTED = 0; + RUNNING = 1; + COMPLETE = 2; + INCOMPLETE = 3; } // Message Definitions @@ -39,24 +40,54 @@ message AirbyteMessage { } message AirbyteRecordMessage { - string stream = 1; - google.protobuf.Timestamp emitted_at = 2; - ObjectData data = 3; - optional string namespace = 4; - // ... other fields as necessary + string stream = 1; // required + google.protobuf.Timestamp emitted_at = 2; // required + ObjectData data = 3; // required + string namespace = 4; +} + + +message ObjectData { + map data = 1; +} + +message ValueList { + repeated Value values = 1; +} + +message Value { + oneof kind { + NullValue null_value = 1; + int64 int_value = 2; + double float_value = 3; + string string_value = 4; + bytes bytes_value = 5; + ObjectData object_value = 6; + ValueList list_value = 7; + } +} + +enum NullValue { + NULL_VALUE = 0; } message StreamDescriptor { - string name = 1; + string name = 1; // required string namespace = 2; - // ... other fields as necessary } message AirbyteLogMessage { - string level = 1; - string message = 2; + // ? Can enum have the same number as other fields ? + enum LEVEL { + INFO = 0; + WARN = 1; + ERROR = 2; + FATAL = 3; + } + LEVEL level = 1; // required + string message = 2; // required google.protobuf.Timestamp emitted_at = 3; - // ... other fields as necessary + string stack_trace = 4; } message AirbyteTraceMessage { @@ -70,20 +101,24 @@ message AirbyteTraceMessage { } message AirbyteErrorTraceMessage { - string message = 1; + string message = 1; // required string internal_message = 2; string stack_trace = 3; - string failure_type = 4; + + enum FAILURE_TYPE { + system_error = 0; + config_error = 1; + } + FAILURE_TYPE failure_type = 4; StreamDescriptor stream_descriptor = 5; - // ... other fields as necessary } message AirbyteStreamStatusTraceMessage { - StreamDescriptor stream_descriptor = 1; - AirbyteStreamStatus status = 2; - // ... other fields as necessary + StreamDescriptor stream_descriptor = 1; // required + AirbyteStreamStatus status = 2; // required } +// CATALOG MESSAGES. SKIP FOR HACKDAY. message AirbyteConnectionStatus { string status = 1; string message = 2; @@ -120,30 +155,6 @@ message ConfiguredAirbyteStream { // ... other fields as necessary } -message ObjectData { - map data = 1; -} - -message ValueList { - repeated Value values = 1; -} - -message Value { - oneof kind { - NullValue null_value = 1; - int64 int_value = 2; - double float_value = 3; - string string_value = 4; - bytes bytes_value = 5; - ObjectData object_value = 6; - ValueList list_value = 7; - } -} - -enum NullValue { - NULL_VALUE = 0; -} - // SKIP ALL THESE FOR NOW FOR HACKDAY // message AirbyteStateMessage { AirbyteStateBlob data = 1;