Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hackday: Protobuf! #52

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions protocol-models/build.gradle
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import org.jsonschema2pojo.SourceType

plugins {
id 'com.google.protobuf' version '0.9.4'
id 'com.github.eirnym.js2p' version '1.0'
}

dependencies {
implementation 'javax.validation:validation-api:1.1.0.Final'
implementation 'org.apache.commons:commons-lang3:3.11'

implementation 'com.google.protobuf:protobuf-java:3.25.1'
}

jsonSchema2Pojo {
@@ -40,3 +43,13 @@ task generateTypescriptProtocolClassFiles(type: Exec) {

commandLine 'bin/generate-typescript-classes-docker.sh'
}

protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.25.1' // Use the appropriate version
}

generateProtoTasks {
ofSourceSet('main')
}
}
Original file line number Diff line number Diff line change
@@ -4,6 +4,11 @@

package io.airbyte.protocol.models;

import io.airbyte.protocol.protos.AirbyteCatalog;
import io.airbyte.protocol.protos.AirbyteMessage;
import io.airbyte.protocol.protos.AirbyteStreamState;
import io.airbyte.protocol.protos.StreamDescriptor;

import java.util.List;

public class Field extends CommonField<JsonSchemaType> {
@@ -31,4 +36,11 @@ public List<Field> getSubFields() {
return subFields;
}

public static void main(String[] args) {
System.out.println("poop");

System.out.println(AirbyteStreamState.getDefaultInstance().isInitialized());
System.out.println(AirbyteStreamState.newBuilder().setStreamDescriptor(StreamDescriptor.newBuilder().setName("Test").build()));
System.out.println(AirbyteMessage.newBuilder().build());
}
}
211 changes: 211 additions & 0 deletions protocol-models/src/main/proto/airbyte_protocol.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
syntax = "proto3";

package airbyte.protocol;

option java_multiple_files = true;
option java_package = "io.airbyte.protocol.protos";

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

// Enumerations
enum SyncMode {
FULL_REFRESH = 0;
INCREMENTAL = 1;
}

enum DestinationSyncMode {
APPEND = 0;
OVERWRITE = 1;
APPEND_DEDUP = 2;
}

enum AirbyteStreamStatus {
STARTED = 0;
RUNNING = 1;
COMPLETE = 2;
INCOMPLETE = 3;
}

// Message Definitions
message AirbyteMessage {
oneof message {
AirbyteRecordMessage record = 1;
AirbyteStateMessage state = 2;
AirbyteLogMessage log = 3;
AirbyteTraceMessage trace = 4;
AirbyteControlMessage control = 5;
// ... other message types
}
}

message AirbyteRecordMessage {
string stream = 1; // required
google.protobuf.Timestamp emitted_at = 2; // required
ObjectData data = 3; // required
string namespace = 4;
}


message ObjectData {
map<string, Value> data = 1;
}

message ValueList {
repeated Value values = 1;
}

message Value {
oneof kind {
NullValue null_value = 1;
int64 int_value = 2;
double float_value = 3;
string string_value = 4;
bytes bytes_value = 5;
ObjectData object_value = 6;
ValueList list_value = 7;
}
}

enum NullValue {
NULL_VALUE = 0;
}

message StreamDescriptor {
string name = 1; // required
string namespace = 2;
}

message AirbyteLogMessage {
// ? Can enum have the same number as other fields ?
enum LEVEL {
INFO = 0;
WARN = 1;
ERROR = 2;
FATAL = 3;
}
LEVEL level = 1; // required
string message = 2; // required
google.protobuf.Timestamp emitted_at = 3;
string stack_trace = 4;
}

message AirbyteTraceMessage {
oneof trace_type {
AirbyteErrorTraceMessage error = 1;
AirbyteEstimateTraceMessage estimate = 2;
AirbyteStreamStatusTraceMessage stream_status = 3;
AirbyteAnalyticsTraceMessage analytics = 4;
// ... other trace types
}
}

message AirbyteErrorTraceMessage {
string message = 1; // required
string internal_message = 2;
string stack_trace = 3;

enum FAILURE_TYPE {
system_error = 0;
config_error = 1;
}
FAILURE_TYPE failure_type = 4;
StreamDescriptor stream_descriptor = 5;
}

message AirbyteStreamStatusTraceMessage {
StreamDescriptor stream_descriptor = 1; // required
AirbyteStreamStatus status = 2; // required
}

// CATALOG MESSAGES. SKIP FOR HACKDAY.
message AirbyteConnectionStatus {
string status = 1;
string message = 2;
// ... other fields as necessary
}

message AirbyteCatalog {
repeated AirbyteStream streams = 1;
// ... other fields as necessary
}

message AirbyteStream {
string name = 1;
google.protobuf.Any json_schema = 2;
repeated SyncMode supported_sync_modes = 3;
bool source_defined_cursor = 4;
repeated string default_cursor_field = 5;
repeated string source_defined_primary_key = 6;
string namespace = 7;
// ... other fields as necessary
}

message ConfiguredAirbyteCatalog {
repeated ConfiguredAirbyteStream streams = 1;
// ... other fields as necessary
}

message ConfiguredAirbyteStream {
AirbyteStream stream = 1;
SyncMode sync_mode = 2;
repeated string cursor_field = 3;
DestinationSyncMode destination_sync_mode = 4;
repeated string primary_key = 5;
// ... other fields as necessary
}

// SKIP ALL THESE FOR NOW FOR HACKDAY //
message AirbyteStateMessage {
AirbyteStateBlob data = 1;
// ... other fields as necessary
}

message AirbyteStreamState {
StreamDescriptor stream_descriptor = 1;
AirbyteStateBlob stream_state = 2;
// ... other fields as necessary
}

message AirbyteGlobalState {
AirbyteStateBlob shared_state = 1;
repeated AirbyteStreamState stream_states = 2;
// ... other fields as necessary
}

message AirbyteStateBlob {
google.protobuf.Any data = 1;
// ... other fields as necessary
}

message AirbyteStateStats {
double recordCount = 1;
// ... other fields as necessary
}

message AirbyteControlMessage {
string type = 1;
google.protobuf.Timestamp emitted_at = 2;
AirbyteControlConnectorConfigMessage connectorConfig = 3;
// ... other fields as necessary
}

message AirbyteControlConnectorConfigMessage {
map<string, google.protobuf.Any> config = 1;
// ... other fields as necessary
}

message AirbyteEstimateTraceMessage {
string name = 1;
string type = 2;
string namespace = 3;
int64 row_estimate = 4;
int64 byte_estimate = 5;
// ... other fields as necessary
}

message AirbyteAnalyticsTraceMessage {
string type = 1;
string value = 2;
// ... other fields as necessary
}
Original file line number Diff line number Diff line change
@@ -310,4 +310,5 @@ void testCatalogDiffStreamChangeWithNoFieldTransform() throws IOException {

Assertions.assertThat(actualDiff).containsExactlyInAnyOrderElementsOf(expectedDiff);
}

}