diff --git a/disk-buffering/build.gradle.kts b/disk-buffering/build.gradle.kts index 8bae11733..251f7a7c3 100644 --- a/disk-buffering/build.gradle.kts +++ b/disk-buffering/build.gradle.kts @@ -18,6 +18,7 @@ val protos by configurations.creating dependencies { api("io.opentelemetry:opentelemetry-sdk") implementation("io.opentelemetry:opentelemetry-api-incubator") + implementation("io.opentelemetry:opentelemetry-exporter-otlp-common") compileOnly("com.google.auto.value:auto-value-annotations") annotationProcessor("com.google.auto.value:auto-value") signature("com.toasttab.android:gummy-bears-api-21:0.12.0:coreLib@signature") @@ -63,9 +64,10 @@ wire { } root( - "opentelemetry.proto.trace.v1.TracesData", - "opentelemetry.proto.metrics.v1.MetricsData", - "opentelemetry.proto.logs.v1.LogsData", + // These are the types used by the Java SDK's OTLP exporters. + "opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest", + "opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest", + "opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest", ) } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/NoopSerializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/NoopSerializer.java new file mode 100644 index 000000000..715f538c4 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/NoopSerializer.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.exporter; + +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; + +class NoopSerializer implements SignalSerializer { + + @Override + public NoopSerializer initialize(Collection data) { + return this; + } + + @Override + public void writeBinaryTo(OutputStream output) throws IOException {} + + @Override + public int getBinarySerializedSize() { + return 0; + } + + @Override + public void reset() {} +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java index b54e3cc16..5b2dcd186 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java @@ -38,10 +38,11 @@ public static ToDiskExporterBuilder builder(Storage storage) { return new ToDiskExporterBuilder<>(storage); } - public CompletableResultCode export(Collection data) { + public synchronized CompletableResultCode export(Collection data) { logger.log("Intercepting exporter batch.", Level.FINER); try { - if (storage.write(serializer.serialize(data))) { + serializer.initialize(data); + if (storage.write(serializer)) { return CompletableResultCode.ofSuccess(); } logger.log("Could not store batch in disk. Exporting it right away."); @@ -52,6 +53,8 @@ public CompletableResultCode export(Collection data) { Level.WARNING, e); return exportFunction.apply(data); + } finally { + serializer.reset(); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java index 069e08986..be75a3976 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java @@ -14,7 +14,7 @@ public final class ToDiskExporterBuilder { - private SignalSerializer serializer = ts -> new byte[0]; + private SignalSerializer serializer = new NoopSerializer(); private final Storage storage; diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java index 5ac0007d9..cbbb4a0ad 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java @@ -7,7 +7,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.ProtoLogsDataMapper; import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; -import io.opentelemetry.proto.logs.v1.LogsData; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.sdk.logs.data.LogRecordData; import java.io.IOException; import java.util.List; @@ -24,7 +24,8 @@ static LogRecordDataDeserializer getInstance() { @Override public List deserialize(byte[] source) throws DeserializationException { try { - return ProtoLogsDataMapper.getInstance().fromProto(LogsData.ADAPTER.decode(source)); + return ProtoLogsDataMapper.getInstance() + .fromProto(ExportLogsServiceRequest.ADAPTER.decode(source)); } catch (IOException e) { throw new DeserializationException(e); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java index 34e88b3ef..d6410d4e7 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java @@ -7,7 +7,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics.ProtoMetricsDataMapper; import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; -import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.sdk.metrics.data.MetricData; import java.io.IOException; import java.util.List; @@ -24,7 +24,8 @@ static MetricDataDeserializer getInstance() { @Override public List deserialize(byte[] source) throws DeserializationException { try { - return ProtoMetricsDataMapper.getInstance().fromProto(MetricsData.ADAPTER.decode(source)); + return ProtoMetricsDataMapper.getInstance() + .fromProto(ExportMetricsServiceRequest.ADAPTER.decode(source)); } catch (IOException e) { throw new DeserializationException(e); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java index 457d5f268..eb4406ff3 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java @@ -7,7 +7,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.ProtoSpansDataMapper; import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; -import io.opentelemetry.proto.trace.v1.TracesData; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; import java.util.List; @@ -24,7 +24,8 @@ static SpanDataDeserializer getInstance() { @Override public List deserialize(byte[] source) throws DeserializationException { try { - return ProtoSpansDataMapper.getInstance().fromProto(TracesData.ADAPTER.decode(source)); + return ProtoSpansDataMapper.getInstance() + .fromProto(ExportTraceServiceRequest.ADAPTER.decode(source)); } catch (IOException e) { throw new DeserializationException(e); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/common/ByteStringMapper.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/common/ByteStringMapper.java index ca8366e8a..1234d25de 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/common/ByteStringMapper.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/common/ByteStringMapper.java @@ -16,10 +16,10 @@ public static ByteStringMapper getInstance() { } public ByteString stringToProto(String source) { - return ByteString.encodeUtf8(source); + return ByteString.decodeHex(source); } public String protoToString(ByteString source) { - return source.utf8(); + return source.hex(); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapper.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapper.java index 1d11c177f..021935f9a 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapper.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapper.java @@ -6,8 +6,8 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.BaseProtoSignalsDataMapper; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.logs.v1.LogRecord; -import io.opentelemetry.proto.logs.v1.LogsData; import io.opentelemetry.proto.logs.v1.ResourceLogs; import io.opentelemetry.proto.logs.v1.ScopeLogs; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; @@ -19,7 +19,7 @@ public final class ProtoLogsDataMapper extends BaseProtoSignalsDataMapper< - LogRecordData, LogRecord, LogsData, ResourceLogs, ScopeLogs> { + LogRecordData, LogRecord, ExportLogsServiceRequest, ResourceLogs, ScopeLogs> { private static final ProtoLogsDataMapper INSTANCE = new ProtoLogsDataMapper(); @@ -39,12 +39,12 @@ protected LogRecordData protoToSignalItem( } @Override - protected List getProtoResources(LogsData logsData) { + protected List getProtoResources(ExportLogsServiceRequest logsData) { return logsData.resource_logs; } @Override - protected LogsData createProtoData( + protected ExportLogsServiceRequest createProtoData( Map>> itemsByResource) { List items = new ArrayList<>(); itemsByResource.forEach( @@ -58,7 +58,7 @@ protected LogsData createProtoData( } items.add(resourceLogsBuilder.build()); }); - return new LogsData.Builder().resource_logs(items).build(); + return new ExportLogsServiceRequest.Builder().resource_logs(items).build(); } private ScopeLogs.Builder createProtoScopeBuilder(InstrumentationScopeInfo scopeInfo) { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapper.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapper.java index a81ab9957..ad67eee1c 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapper.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapper.java @@ -6,8 +6,8 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.BaseProtoSignalsDataMapper; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.metrics.v1.Metric; -import io.opentelemetry.proto.metrics.v1.MetricsData; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import io.opentelemetry.proto.metrics.v1.ScopeMetrics; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; @@ -19,7 +19,7 @@ public final class ProtoMetricsDataMapper extends BaseProtoSignalsDataMapper< - MetricData, Metric, MetricsData, ResourceMetrics, ScopeMetrics> { + MetricData, Metric, ExportMetricsServiceRequest, ResourceMetrics, ScopeMetrics> { private static final ProtoMetricsDataMapper INSTANCE = new ProtoMetricsDataMapper(); @@ -39,12 +39,12 @@ protected MetricData protoToSignalItem( } @Override - protected List getProtoResources(MetricsData protoData) { + protected List getProtoResources(ExportMetricsServiceRequest protoData) { return protoData.resource_metrics; } @Override - protected MetricsData createProtoData( + protected ExportMetricsServiceRequest createProtoData( Map>> itemsByResource) { List items = new ArrayList<>(); itemsByResource.forEach( @@ -58,7 +58,7 @@ protected MetricsData createProtoData( } items.add(resourceMetricsBuilder.build()); }); - return new MetricsData.Builder().resource_metrics(items).build(); + return new ExportMetricsServiceRequest.Builder().resource_metrics(items).build(); } private ScopeMetrics.Builder createProtoScopeBuilder(InstrumentationScopeInfo scopeInfo) { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapper.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapper.java index 18acf3a1f..12697c49d 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapper.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapper.java @@ -6,10 +6,10 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.BaseProtoSignalsDataMapper; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.trace.v1.ResourceSpans; import io.opentelemetry.proto.trace.v1.ScopeSpans; import io.opentelemetry.proto.trace.v1.Span; -import io.opentelemetry.proto.trace.v1.TracesData; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.data.SpanData; @@ -18,7 +18,8 @@ import java.util.Map; public final class ProtoSpansDataMapper - extends BaseProtoSignalsDataMapper { + extends BaseProtoSignalsDataMapper< + SpanData, Span, ExportTraceServiceRequest, ResourceSpans, ScopeSpans> { private static final ProtoSpansDataMapper INSTANCE = new ProtoSpansDataMapper(); @@ -32,7 +33,7 @@ protected Span signalItemToProto(SpanData sourceData) { } @Override - protected List getProtoResources(TracesData protoData) { + protected List getProtoResources(ExportTraceServiceRequest protoData) { return protoData.resource_spans; } @@ -43,7 +44,7 @@ protected SpanData protoToSignalItem( } @Override - protected TracesData createProtoData( + protected ExportTraceServiceRequest createProtoData( Map>> itemsByResource) { List items = new ArrayList<>(); itemsByResource.forEach( @@ -57,7 +58,7 @@ protected TracesData createProtoData( } items.add(resourceSpansBuilder.build()); }); - return new TracesData.Builder().resource_spans(items).build(); + return new ExportTraceServiceRequest.Builder().resource_spans(items).build(); } @Override diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/LogRecordDataSerializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/LogRecordDataSerializer.java index 72c654ffe..19bb1cf93 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/LogRecordDataSerializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/LogRecordDataSerializer.java @@ -5,33 +5,41 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers; -import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.ProtoLogsDataMapper; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools; -import io.opentelemetry.proto.logs.v1.LogsData; +import io.opentelemetry.exporter.internal.otlp.logs.LowAllocationLogsRequestMarshaler; import io.opentelemetry.sdk.logs.data.LogRecordData; -import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.Collection; public final class LogRecordDataSerializer implements SignalSerializer { - private static final LogRecordDataSerializer INSTANCE = new LogRecordDataSerializer(); - private LogRecordDataSerializer() {} + private final LowAllocationLogsRequestMarshaler marshaler = + new LowAllocationLogsRequestMarshaler(); - static LogRecordDataSerializer getInstance() { - return INSTANCE; + LogRecordDataSerializer() {} + + @CanIgnoreReturnValue + @Override + public LogRecordDataSerializer initialize(Collection data) { + marshaler.initialize(data); + return this; + } + + @Override + public void writeBinaryTo(OutputStream output) throws IOException { + ProtobufTools.writeRawVarint32(marshaler.getBinarySerializedSize(), output); + marshaler.writeBinaryTo(output); + } + + @Override + public int getBinarySerializedSize() { + return marshaler.getBinarySerializedSize(); } @Override - public byte[] serialize(Collection logRecordData) { - LogsData proto = ProtoLogsDataMapper.getInstance().toProto(logRecordData); - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - int size = LogsData.ADAPTER.encodedSize(proto); - ProtobufTools.writeRawVarint32(size, out); - proto.encode(out); - return out.toByteArray(); - } catch (IOException e) { - throw new IllegalStateException(e); - } + public void reset() { + marshaler.reset(); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/MetricDataSerializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/MetricDataSerializer.java index 077d4ade5..726b3185d 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/MetricDataSerializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/MetricDataSerializer.java @@ -5,33 +5,41 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers; -import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics.ProtoMetricsDataMapper; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools; -import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.exporter.internal.otlp.metrics.LowAllocationMetricsRequestMarshaler; import io.opentelemetry.sdk.metrics.data.MetricData; -import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.Collection; public final class MetricDataSerializer implements SignalSerializer { - private static final MetricDataSerializer INSTANCE = new MetricDataSerializer(); - private MetricDataSerializer() {} + private final LowAllocationMetricsRequestMarshaler marshaler = + new LowAllocationMetricsRequestMarshaler(); - static MetricDataSerializer getInstance() { - return INSTANCE; + MetricDataSerializer() {} + + @CanIgnoreReturnValue + @Override + public MetricDataSerializer initialize(Collection data) { + marshaler.initialize(data); + return this; + } + + @Override + public void writeBinaryTo(OutputStream output) throws IOException { + ProtobufTools.writeRawVarint32(marshaler.getBinarySerializedSize(), output); + marshaler.writeBinaryTo(output); + } + + @Override + public int getBinarySerializedSize() { + return marshaler.getBinarySerializedSize(); } @Override - public byte[] serialize(Collection metricData) { - MetricsData proto = ProtoMetricsDataMapper.getInstance().toProto(metricData); - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - int size = MetricsData.ADAPTER.encodedSize(proto); - ProtobufTools.writeRawVarint32(size, out); - proto.encode(out); - return out.toByteArray(); - } catch (IOException e) { - throw new IllegalStateException(e); - } + public void reset() { + marshaler.reset(); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java index c7d7e5c8c..4c306ceb7 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java @@ -8,21 +8,29 @@ import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.IOException; +import java.io.OutputStream; import java.util.Collection; public interface SignalSerializer { static SignalSerializer ofSpans() { - return SpanDataSerializer.getInstance(); + return new SpanDataSerializer(); } static SignalSerializer ofMetrics() { - return MetricDataSerializer.getInstance(); + return new MetricDataSerializer(); } static SignalSerializer ofLogs() { - return LogRecordDataSerializer.getInstance(); + return new LogRecordDataSerializer(); } - byte[] serialize(Collection items); + SignalSerializer initialize(Collection data); + + void writeBinaryTo(OutputStream output) throws IOException; + + int getBinarySerializedSize(); + + void reset(); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SpanDataSerializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SpanDataSerializer.java index 5a26426db..6e3276231 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SpanDataSerializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SpanDataSerializer.java @@ -5,33 +5,41 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers; -import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.ProtoSpansDataMapper; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools; -import io.opentelemetry.proto.trace.v1.TracesData; +import io.opentelemetry.exporter.internal.otlp.traces.LowAllocationTraceRequestMarshaler; import io.opentelemetry.sdk.trace.data.SpanData; -import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.Collection; public final class SpanDataSerializer implements SignalSerializer { - private static final SpanDataSerializer INSTANCE = new SpanDataSerializer(); - private SpanDataSerializer() {} + private final LowAllocationTraceRequestMarshaler marshaler = + new LowAllocationTraceRequestMarshaler(); - static SpanDataSerializer getInstance() { - return INSTANCE; + SpanDataSerializer() {} + + @CanIgnoreReturnValue + @Override + public SpanDataSerializer initialize(Collection data) { + marshaler.initialize(data); + return this; + } + + @Override + public void writeBinaryTo(OutputStream output) throws IOException { + ProtobufTools.writeRawVarint32(marshaler.getBinarySerializedSize(), output); + marshaler.writeBinaryTo(output); + } + + @Override + public int getBinarySerializedSize() { + return marshaler.getBinarySerializedSize(); } @Override - public byte[] serialize(Collection spanData) { - TracesData proto = ProtoSpansDataMapper.getInstance().toProto(spanData); - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - int size = TracesData.ADAPTER.encodedSize(proto); - ProtobufTools.writeRawVarint32(size, out); - proto.encode(out); - return out.toByteArray(); - } catch (IOException e) { - throw new IllegalStateException(e); - } + public void reset() { + marshaler.reset(); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java index 73a263490..86b5284ca 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java @@ -8,6 +8,7 @@ import static java.util.logging.Level.WARNING; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; @@ -49,14 +50,14 @@ public boolean isDebugEnabled() { /** * Attempts to write an item into a writable file. * - * @param item - The data that would be appended to the file. + * @param marshaler - The data that would be appended to the file. * @throws IOException If an unexpected error happens. */ - public boolean write(byte[] item) throws IOException { - return write(item, 1); + public boolean write(SignalSerializer marshaler) throws IOException { + return write(marshaler, 1); } - private boolean write(byte[] item, int attemptNumber) throws IOException { + private boolean write(SignalSerializer marshaler, int attemptNumber) throws IOException { if (isClosed.get()) { logger.log("Refusing to write to storage after being closed."); return false; @@ -69,11 +70,11 @@ private boolean write(byte[] item, int attemptNumber) throws IOException { writableFile = folderManager.createWritableFile(); logger.log("Created new writableFile: " + writableFile); } - WritableResult result = writableFile.append(item); + WritableResult result = writableFile.append(marshaler); if (result != WritableResult.SUCCEEDED) { // Retry with new file writableFile = null; - return write(item, ++attemptNumber); + return write(marshaler, ++attemptNumber); } return true; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java index 0f3d1d475..ce4e87ddf 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java @@ -8,6 +8,7 @@ import static io.opentelemetry.contrib.disk.buffering.internal.storage.util.ClockBuddy.nowMillis; import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; import io.opentelemetry.sdk.common.Clock; import java.io.File; @@ -43,9 +44,9 @@ public WritableFile( * reached the configured max size, the file stream is closed with the contents available in the * buffer before attempting to append the new data. * - * @param data - The new data line to add. + * @param marshaler - The new data line to add. */ - public synchronized WritableResult append(byte[] data) throws IOException { + public synchronized WritableResult append(SignalSerializer marshaler) throws IOException { if (isClosed.get()) { return WritableResult.FAILED; } @@ -53,12 +54,12 @@ public synchronized WritableResult append(byte[] data) throws IOException { close(); return WritableResult.FAILED; } - int futureSize = size + data.length; + int futureSize = size + marshaler.getBinarySerializedSize(); if (futureSize > configuration.getMaxFileSize()) { close(); return WritableResult.FAILED; } - out.write(data); + marshaler.writeBinaryTo(out); size = futureSize; return WritableResult.SUCCEEDED; } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java index ae503ecdf..2ea0d2b8a 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java @@ -86,7 +86,7 @@ private static List writeSomeSpans(Storage storage) throws Exception { SpanData span2 = makeSpan2(TraceFlags.getSampled(), now); List spans = Arrays.asList(span1, span2); - storage.write(SignalSerializer.ofSpans().serialize(spans)); + storage.write(SignalSerializer.ofSpans().initialize(spans)); storage.flush(); return spans; } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java index f7b6e3ff6..0a98061ac 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java @@ -5,8 +5,8 @@ package io.opentelemetry.contrib.disk.buffering.internal.exporter; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -30,8 +30,6 @@ class ToDiskExporterTest { private final List records = Arrays.asList("one", "two", "three"); - private final byte[] serialized = "one,two,three".getBytes(UTF_8); - @Mock private SignalSerializer serializer; @Mock private Storage storage; @@ -50,21 +48,20 @@ void setup() { return exportFnResultToReturn.get(); }; toDiskExporter = new ToDiskExporter<>(serializer, exportFn, storage); - when(serializer.serialize(records)).thenReturn(serialized); } @Test void whenWritingSucceedsOnExport_returnSuccessfulResultCode() throws Exception { - when(storage.write(serialized)).thenReturn(true); + when(storage.write(any())).thenReturn(true); CompletableResultCode completableResultCode = toDiskExporter.export(records); assertThat(completableResultCode.isSuccess()).isTrue(); - verify(storage).write(serialized); + verify(storage).write(any()); assertThat(exportedFnSeen).isNull(); } @Test void whenWritingFailsOnExport_doExportRightAway() throws Exception { - when(storage.write(serialized)).thenReturn(false); + when(storage.write(any())).thenReturn(false); exportFnResultToReturn.set(CompletableResultCode.ofSuccess()); CompletableResultCode completableResultCode = toDiskExporter.export(records); @@ -75,7 +72,7 @@ void whenWritingFailsOnExport_doExportRightAway() throws Exception { @Test void whenExceptionInWrite_doExportRightAway() throws Exception { - when(storage.write(serialized)).thenThrow(new IOException("boom")); + when(storage.write(any())).thenThrow(new IOException("boom")); exportFnResultToReturn.set(CompletableResultCode.ofFailure()); CompletableResultCode completableResultCode = toDiskExporter.export(records); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapperTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapperTest.java index 26c73502e..9a5d93cd8 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapperTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapperTest.java @@ -12,8 +12,8 @@ import io.opentelemetry.api.logs.Severity; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.models.LogRecordDataImpl; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.logs.v1.LogRecord; -import io.opentelemetry.proto.logs.v1.LogsData; import io.opentelemetry.proto.logs.v1.ResourceLogs; import io.opentelemetry.proto.logs.v1.ScopeLogs; import io.opentelemetry.sdk.logs.data.LogRecordData; @@ -104,7 +104,7 @@ class ProtoLogsDataMapperTest { void verifyConversionDataStructure() { List signals = Collections.singletonList(LOG_RECORD); - LogsData result = mapToProto(signals); + ExportLogsServiceRequest result = mapToProto(signals); List resourceLogsList = result.resource_logs; assertEquals(1, resourceLogsList.size()); @@ -118,7 +118,7 @@ void verifyConversionDataStructure() { void verifyMultipleLogsWithSameResourceAndScope() { List signals = Arrays.asList(LOG_RECORD, OTHER_LOG_RECORD); - LogsData proto = mapToProto(signals); + ExportLogsServiceRequest proto = mapToProto(signals); List resourceLogsList = proto.resource_logs; assertEquals(1, resourceLogsList.size()); @@ -139,7 +139,7 @@ void verifyMultipleLogsWithSameResourceDifferentScope() { List signals = Arrays.asList(LOG_RECORD, LOG_RECORD_WITH_DIFFERENT_SCOPE_SAME_RESOURCE); - LogsData proto = mapToProto(signals); + ExportLogsServiceRequest proto = mapToProto(signals); List resourceLogsList = proto.resource_logs; assertEquals(1, resourceLogsList.size()); @@ -159,7 +159,7 @@ void verifyMultipleLogsWithSameResourceDifferentScope() { void verifyMultipleLogsWithDifferentResource() { List signals = Arrays.asList(LOG_RECORD, LOG_RECORD_WITH_DIFFERENT_RESOURCE); - LogsData proto = mapToProto(signals); + ExportLogsServiceRequest proto = mapToProto(signals); List resourceLogsList = proto.resource_logs; assertEquals(2, resourceLogsList.size()); @@ -183,7 +183,7 @@ void verifyMultipleLogsWithDifferentResource() { void verifyLogWithEventName() { List signals = Collections.singletonList(LOG_RECORD_WITH_EVENT_NAME); - LogsData result = mapToProto(signals); + ExportLogsServiceRequest result = mapToProto(signals); List resourceLogsList = result.resource_logs; LogRecord firstLog = resourceLogsList.get(0).scope_logs.get(0).log_records.get(0); @@ -192,11 +192,11 @@ void verifyLogWithEventName() { assertThat(mapFromProto(result)).containsExactlyInAnyOrderElementsOf(signals); } - private static LogsData mapToProto(Collection signals) { + private static ExportLogsServiceRequest mapToProto(Collection signals) { return ProtoLogsDataMapper.getInstance().toProto(signals); } - private static List mapFromProto(LogsData protoData) { + private static List mapFromProto(ExportLogsServiceRequest protoData) { return ProtoLogsDataMapper.getInstance().fromProto(protoData); } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapperTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapperTest.java index b45e9c9e7..59d369704 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapperTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapperTest.java @@ -10,8 +10,8 @@ import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.metrics.v1.Metric; -import io.opentelemetry.proto.metrics.v1.MetricsData; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import io.opentelemetry.proto.metrics.v1.ScopeMetrics; import io.opentelemetry.sdk.metrics.data.MetricData; @@ -30,7 +30,7 @@ void verifyConversionDataStructure() { MetricData expectedGauge1 = TestData.makeLongGauge(TraceFlags.getSampled()); List expectedSignals = Collections.singletonList(expectedGauge1); - MetricsData proto = mapToProto(signals); + ExportMetricsServiceRequest proto = mapToProto(signals); List resourceMetrics = proto.resource_metrics; assertEquals(1, resourceMetrics.size()); @@ -49,7 +49,7 @@ void verifyMultipleMetricsWithSameResourceAndScope() { MetricData expectedGauge2 = TestData.makeLongGauge(TraceFlags.getSampled()); List expectedSignals = Arrays.asList(expectedGauge1, expectedGauge2); - MetricsData proto = mapToProto(signals); + ExportMetricsServiceRequest proto = mapToProto(signals); List resourceMetrics = proto.resource_metrics; assertEquals(1, resourceMetrics.size()); @@ -78,7 +78,7 @@ void verifyMultipleMetricsWithSameResourceDifferentScope() { List signals = Arrays.asList(gauge1, gauge2); List expectedSignals = Arrays.asList(expectedGauge1, expectedGauge2); - MetricsData proto = mapToProto(signals); + ExportMetricsServiceRequest proto = mapToProto(signals); List resourceMetrics = proto.resource_metrics; assertEquals(1, resourceMetrics.size()); @@ -113,7 +113,7 @@ void verifyMultipleMetricsWithDifferentResource() { // , LONG_GAUGE_METRIC_WITH_DIFFERENT_RESOURCE); // List expectedSignals = Arrays.asList(expected); - MetricsData proto = mapToProto(signals); + ExportMetricsServiceRequest proto = mapToProto(signals); List resourceMetrics = proto.resource_metrics; assertEquals(2, resourceMetrics.size()); @@ -133,11 +133,11 @@ void verifyMultipleMetricsWithDifferentResource() { assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(expectedSignals); } - private static MetricsData mapToProto(Collection signals) { + private static ExportMetricsServiceRequest mapToProto(Collection signals) { return ProtoMetricsDataMapper.getInstance().toProto(signals); } - private static List mapFromProto(MetricsData protoData) { + private static List mapFromProto(ExportMetricsServiceRequest protoData) { return ProtoMetricsDataMapper.getInstance().fromProto(protoData); } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapperTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapperTest.java index bdd9c053c..ca325496b 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapperTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapperTest.java @@ -11,10 +11,10 @@ import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.models.SpanDataImpl; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.trace.v1.ResourceSpans; import io.opentelemetry.proto.trace.v1.ScopeSpans; import io.opentelemetry.proto.trace.v1.Span; -import io.opentelemetry.proto.trace.v1.TracesData; import io.opentelemetry.sdk.trace.data.EventData; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; @@ -116,7 +116,7 @@ class ProtoSpansDataMapperTest { void verifyConversionDataStructure() { List signals = Collections.singletonList(SPAN_DATA); - TracesData proto = mapToProto(signals); + ExportTraceServiceRequest proto = mapToProto(signals); List resourceSpans = proto.resource_spans; assertEquals(1, resourceSpans.size()); @@ -130,7 +130,7 @@ void verifyConversionDataStructure() { void verifyMultipleSpansWithSameResourceAndScope() { List signals = Arrays.asList(SPAN_DATA, OTHER_SPAN_DATA); - TracesData proto = mapToProto(signals); + ExportTraceServiceRequest proto = mapToProto(signals); List resourceSpans = proto.resource_spans; assertEquals(1, resourceSpans.size()); @@ -146,7 +146,7 @@ void verifyMultipleSpansWithSameResourceAndScope() { void verifyMultipleSpansWithSameResourceDifferentScope() { List signals = Arrays.asList(SPAN_DATA, SPAN_DATA_WITH_DIFFERENT_SCOPE_SAME_RESOURCE); - TracesData proto = mapToProto(signals); + ExportTraceServiceRequest proto = mapToProto(signals); List resourceSpans = proto.resource_spans; assertEquals(1, resourceSpans.size()); @@ -166,7 +166,7 @@ void verifyMultipleSpansWithSameResourceDifferentScope() { void verifyMultipleSpansWithDifferentResource() { List signals = Arrays.asList(SPAN_DATA, SPAN_DATA_WITH_DIFFERENT_RESOURCE); - TracesData proto = mapToProto(signals); + ExportTraceServiceRequest proto = mapToProto(signals); List resourceSpans = proto.resource_spans; assertEquals(2, resourceSpans.size()); @@ -186,11 +186,11 @@ void verifyMultipleSpansWithDifferentResource() { assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(signals); } - private static TracesData mapToProto(Collection signals) { + private static ExportTraceServiceRequest mapToProto(Collection signals) { return ProtoSpansDataMapper.getInstance().toProto(signals); } - private static List mapFromProto(TracesData protoData) { + private static List mapFromProto(ExportTraceServiceRequest protoData) { return ProtoSpansDataMapper.getInstance().fromProto(protoData); } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/ByteArraySerializer.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/ByteArraySerializer.java new file mode 100644 index 000000000..7ad446729 --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/ByteArraySerializer.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; + +public final class ByteArraySerializer implements SignalSerializer { + + private final byte[] data; + + public ByteArraySerializer(byte[] data) { + this.data = data; + } + + @CanIgnoreReturnValue + @Override + public SignalSerializer initialize(Collection data) { + return null; + } + + @Override + public void writeBinaryTo(OutputStream output) throws IOException { + output.write(data); + } + + @Override + public int getBinarySerializedSize() { + return data.length; + } + + @Override + public void reset() {} +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java index 5b72c29e8..ad994c38d 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.ByteArraySerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile; import io.opentelemetry.sdk.common.Clock; @@ -83,7 +84,7 @@ void closeCurrentlyWritableFile_whenItIsReadyToBeRead_andNoOtherReadableFilesAre when(clock.now()).thenReturn(MILLISECONDS.toNanos(createdFileTime)); WritableFile writableFile = folderManager.createWritableFile(); - writableFile.append(new byte[3]); + writableFile.append(new ByteArraySerializer(new byte[3])); when(clock.now()) .thenReturn(MILLISECONDS.toNanos(createdFileTime + MIN_FILE_AGE_FOR_READ_MILLIS)); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java index 7f134afdc..d96b9a1bc 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.when; import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.ByteArraySerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; @@ -87,7 +88,7 @@ void whenReadingMultipleTimes_reuseReader() throws IOException { @Test void whenWritingMultipleTimes_reuseWriter() throws IOException { - byte[] data = new byte[1]; + ByteArraySerializer data = new ByteArraySerializer(new byte[1]); WritableFile anotherWriter = createWritableFile(); when(folderManager.createWritableFile()).thenReturn(writableFile).thenReturn(anotherWriter); @@ -108,7 +109,7 @@ void whenAttemptingToReadAfterClosed_returnFailed() throws IOException { @Test void whenAttemptingToWriteAfterClosed_returnFalse() throws IOException { storage.close(); - assertFalse(storage.write(new byte[1])); + assertFalse(storage.write(new ByteArraySerializer(new byte[1]))); } @Test @@ -159,7 +160,7 @@ void whenEveryNewFileFoundCannotBeRead_returnContentNotAvailable() throws IOExce @Test void appendDataToFile() throws IOException { when(folderManager.createWritableFile()).thenReturn(writableFile); - byte[] data = new byte[1]; + ByteArraySerializer data = new ByteArraySerializer(new byte[1]); storage.write(data); @@ -168,7 +169,7 @@ void appendDataToFile() throws IOException { @Test void whenWritingTimeoutHappens_retryWithNewFile() throws IOException { - byte[] data = new byte[1]; + ByteArraySerializer data = new ByteArraySerializer(new byte[1]); WritableFile workingWritableFile = createWritableFile(); when(folderManager.createWritableFile()) .thenReturn(writableFile) @@ -182,7 +183,7 @@ void whenWritingTimeoutHappens_retryWithNewFile() throws IOException { @Test void whenThereIsNoSpaceAvailableForWriting_retryWithNewFile() throws IOException { - byte[] data = new byte[1]; + ByteArraySerializer data = new ByteArraySerializer(new byte[1]); WritableFile workingWritableFile = createWritableFile(); when(folderManager.createWritableFile()) .thenReturn(writableFile) @@ -196,7 +197,7 @@ void whenThereIsNoSpaceAvailableForWriting_retryWithNewFile() throws IOException @Test void whenWritingResourceIsClosed_retryWithNewFile() throws IOException { - byte[] data = new byte[1]; + ByteArraySerializer data = new ByteArraySerializer(new byte[1]); WritableFile workingWritableFile = createWritableFile(); when(folderManager.createWritableFile()) .thenReturn(writableFile) @@ -210,7 +211,7 @@ void whenWritingResourceIsClosed_retryWithNewFile() throws IOException { @Test void whenEveryAttemptToWriteFails_returnFalse() throws IOException { - byte[] data = new byte[1]; + ByteArraySerializer data = new ByteArraySerializer(new byte[1]); when(folderManager.createWritableFile()).thenReturn(writableFile); when(writableFile.append(data)).thenReturn(WritableResult.FAILED); @@ -223,7 +224,7 @@ void whenEveryAttemptToWriteFails_returnFalse() throws IOException { void whenClosing_closeWriterAndReaderIfNotNull() throws IOException { when(folderManager.createWritableFile()).thenReturn(writableFile); when(folderManager.getReadableFile()).thenReturn(readableFile); - storage.write(new byte[1]); + storage.write(new ByteArraySerializer(new byte[1])); storage.readAndProcess(processing); storage.close(); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java index a94b5fb3d..a9d0eb5da 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java @@ -30,6 +30,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import org.junit.jupiter.api.AfterEach; @@ -105,14 +106,12 @@ void tearDown() throws IOException { } private static void addFileContents(File source) throws IOException { - List items = new ArrayList<>(); - items.add(SERIALIZER.serialize(Collections.singleton(FIRST_LOG_RECORD))); - items.add(SERIALIZER.serialize(Collections.singleton(SECOND_LOG_RECORD))); - items.add(SERIALIZER.serialize(Collections.singleton(THIRD_LOG_RECORD))); - try (FileOutputStream out = new FileOutputStream(source)) { - for (byte[] item : items) { - out.write(item); + for (LogRecordData item : + Arrays.asList(FIRST_LOG_RECORD, SECOND_LOG_RECORD, THIRD_LOG_RECORD)) { + SERIALIZER.initialize(Collections.singleton(item)); + SERIALIZER.writeBinaryTo(out); + SERIALIZER.reset(); } } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java index 3df37eb4c..cae1e9f64 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java @@ -14,6 +14,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.ByteArraySerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; import io.opentelemetry.sdk.common.Clock; @@ -71,8 +72,8 @@ void hasExpired_whenWriteAgeHasExpired() { void appendDataInNewLines_andIncreaseSize() throws IOException { byte[] line1 = getByteArrayLine("First line"); byte[] line2 = getByteArrayLine("Second line"); - writableFile.append(line1); - writableFile.append(line2); + writableFile.append(new ByteArraySerializer(line1)); + writableFile.append(new ByteArraySerializer(line2)); writableFile.close(); List lines = getWrittenLines(); @@ -85,9 +86,11 @@ void appendDataInNewLines_andIncreaseSize() throws IOException { @Test void whenAppendingData_andNotEnoughSpaceIsAvailable_closeAndReturnFailed() throws IOException { - assertEquals(WritableResult.SUCCEEDED, writableFile.append(new byte[MAX_FILE_SIZE])); + assertEquals( + WritableResult.SUCCEEDED, + writableFile.append(new ByteArraySerializer(new byte[MAX_FILE_SIZE]))); - assertEquals(WritableResult.FAILED, writableFile.append(new byte[1])); + assertEquals(WritableResult.FAILED, writableFile.append(new ByteArraySerializer(new byte[1]))); assertEquals(1, getWrittenLines().size()); assertEquals(MAX_FILE_SIZE, writableFile.getSize()); @@ -95,21 +98,21 @@ void whenAppendingData_andNotEnoughSpaceIsAvailable_closeAndReturnFailed() throw @Test void whenAppendingData_andHasExpired_closeAndReturnExpiredStatus() throws IOException { - writableFile.append(new byte[2]); + writableFile.append(new ByteArraySerializer(new byte[2])); when(clock.now()) .thenReturn(MILLISECONDS.toNanos(CREATED_TIME_MILLIS + MAX_FILE_AGE_FOR_WRITE_MILLIS)); - assertEquals(WritableResult.FAILED, writableFile.append(new byte[1])); + assertEquals(WritableResult.FAILED, writableFile.append(new ByteArraySerializer(new byte[1]))); assertEquals(1, getWrittenLines().size()); } @Test void whenAppendingData_andIsAlreadyClosed_returnFailedStatus() throws IOException { - writableFile.append(new byte[1]); + writableFile.append(new ByteArraySerializer(new byte[1])); writableFile.close(); - assertEquals(WritableResult.FAILED, writableFile.append(new byte[2])); + assertEquals(WritableResult.FAILED, writableFile.append(new ByteArraySerializer(new byte[2]))); } private static byte[] getByteArrayLine(String line) { diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java index adfc8fb2f..69186f812 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java @@ -12,6 +12,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.DelimitedProtoStreamReader; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.StreamReader; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -20,7 +21,17 @@ @SuppressWarnings("unchecked") public abstract class BaseSignalSerializerTest { protected byte[] serialize(SIGNAL_SDK_ITEM... items) { - return getSerializer().serialize(Arrays.asList(items)); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + SignalSerializer serializer = getSerializer(); + try { + serializer.initialize(Arrays.asList(items)); + serializer.writeBinaryTo(byteArrayOutputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + serializer.reset(); + } + return byteArrayOutputStream.toByteArray(); } protected List deserialize(byte[] source) {