Skip to content

Commit 7313b9a

Browse files
authored
[java] Accept byte array from avro fields for BQ (#33712)
1 parent 1b0d147 commit 7313b9a

File tree

2 files changed

+31
-11
lines changed

2 files changed

+31
-11
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public class AvroGenericRecordToStorageApiProto {
8686
.put(Schema.Type.STRING, Object::toString)
8787
.put(Schema.Type.BOOLEAN, Function.identity())
8888
.put(Schema.Type.ENUM, o -> o.toString())
89-
.put(Schema.Type.BYTES, o -> ByteString.copyFrom(((ByteBuffer) o).duplicate()))
89+
.put(Schema.Type.BYTES, AvroGenericRecordToStorageApiProto::convertBytes)
9090
.build();
9191

9292
// A map of supported logical types to their encoding functions.
@@ -145,6 +145,16 @@ static ByteString convertDecimal(LogicalType logicalType, Object value) {
145145
return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
146146
}
147147

148+
static ByteString convertBytes(Object value) {
149+
if (value instanceof byte[]) {
150+
// for backward compatibility
151+
// this is not accepted by the avro spec, but users may have abused it
152+
return ByteString.copyFrom((byte[]) value);
153+
} else {
154+
return ByteString.copyFrom(((ByteBuffer) value).duplicate());
155+
}
156+
}
157+
148158
/**
149159
* Given an Avro Schema, returns a protocol-buffer TableSchema that can be used to write data
150160
* through BigQuery Storage API.

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ enum TestEnum {
7777
SchemaBuilder.record("TestRecord")
7878
.fields()
7979
.optionalBytes("bytesValue")
80+
.optionalBytes("byteBufferValue")
8081
.requiredInt("intValue")
8182
.optionalLong("longValue")
8283
.optionalFloat("floatValue")
@@ -138,64 +139,71 @@ enum TestEnum {
138139
.build())
139140
.addField(
140141
FieldDescriptorProto.newBuilder()
141-
.setName("intvalue")
142+
.setName("bytebuffervalue")
142143
.setNumber(2)
144+
.setType(Type.TYPE_BYTES)
145+
.setLabel(Label.LABEL_OPTIONAL)
146+
.build())
147+
.addField(
148+
FieldDescriptorProto.newBuilder()
149+
.setName("intvalue")
150+
.setNumber(3)
143151
.setType(Type.TYPE_INT64)
144152
.setLabel(Label.LABEL_OPTIONAL)
145153
.build())
146154
.addField(
147155
FieldDescriptorProto.newBuilder()
148156
.setName("longvalue")
149-
.setNumber(3)
157+
.setNumber(4)
150158
.setType(Type.TYPE_INT64)
151159
.setLabel(Label.LABEL_OPTIONAL)
152160
.build())
153161
.addField(
154162
FieldDescriptorProto.newBuilder()
155163
.setName("floatvalue")
156-
.setNumber(4)
164+
.setNumber(5)
157165
.setType(Type.TYPE_DOUBLE)
158166
.setLabel(Label.LABEL_OPTIONAL)
159167
.build())
160168
.addField(
161169
FieldDescriptorProto.newBuilder()
162170
.setName("doublevalue")
163-
.setNumber(5)
171+
.setNumber(6)
164172
.setType(Type.TYPE_DOUBLE)
165173
.setLabel(Label.LABEL_OPTIONAL)
166174
.build())
167175
.addField(
168176
FieldDescriptorProto.newBuilder()
169177
.setName("stringvalue")
170-
.setNumber(6)
178+
.setNumber(7)
171179
.setType(Type.TYPE_STRING)
172180
.setLabel(Label.LABEL_OPTIONAL)
173181
.build())
174182
.addField(
175183
FieldDescriptorProto.newBuilder()
176184
.setName("booleanvalue")
177-
.setNumber(7)
185+
.setNumber(8)
178186
.setType(Type.TYPE_BOOL)
179187
.setLabel(Label.LABEL_OPTIONAL)
180188
.build())
181189
.addField(
182190
FieldDescriptorProto.newBuilder()
183191
.setName("arrayvalue")
184-
.setNumber(8)
192+
.setNumber(9)
185193
.setType(Type.TYPE_STRING)
186194
.setLabel(Label.LABEL_REPEATED)
187195
.build())
188196
.addField(
189197
FieldDescriptorProto.newBuilder()
190198
.setName("enumvalue")
191-
.setNumber(9)
199+
.setNumber(10)
192200
.setType(Type.TYPE_STRING)
193201
.setLabel(Label.LABEL_OPTIONAL)
194202
.build())
195203
.addField(
196204
FieldDescriptorProto.newBuilder()
197205
.setName("fixedvalue")
198-
.setNumber(10)
206+
.setNumber(11)
199207
.setType(Type.TYPE_BYTES)
200208
.setLabel(Label.LABEL_REQUIRED)
201209
.build())
@@ -309,7 +317,8 @@ enum TestEnum {
309317
Instant now = Instant.now();
310318
baseRecord =
311319
new GenericRecordBuilder(BASE_SCHEMA)
312-
.set("bytesValue", ByteBuffer.wrap(BYTES))
320+
.set("bytesValue", BYTES)
321+
.set("byteBufferValue", ByteBuffer.wrap(BYTES))
313322
.set("intValue", (int) 3)
314323
.set("longValue", (long) 4)
315324
.set("floatValue", (float) 3.14)
@@ -346,6 +355,7 @@ enum TestEnum {
346355
baseProtoExpectedFields =
347356
ImmutableMap.<String, Object>builder()
348357
.put("bytesvalue", ByteString.copyFrom(BYTES))
358+
.put("bytebuffervalue", ByteString.copyFrom(BYTES))
349359
.put("intvalue", (long) 3)
350360
.put("longvalue", (long) 4)
351361
.put("floatvalue", (double) 3.14)

0 commit comments

Comments
 (0)