Skip to content

Commit dd8713f

Browse files
authored
Merge pull request #929 from kaizo-com/deflate-compression
Datadog 'Deflate' compression
2 parents 9e99cd0 + 41c3420 commit dd8713f

File tree

5 files changed

+90
-6
lines changed

5 files changed

+90
-6
lines changed

reporters/kamon-datadog/src/main/resources/reference.conf

+3-2
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ kamon {
6262
connect-timeout = 5 seconds
6363
read-timeout = 5 seconds
6464
write-timeout = 5 seconds
65+
66+
# Use 'Deflate' compression when posting to the Datadog API
67+
compression = false
6568
}
6669

6770

@@ -111,5 +114,3 @@ kamon {
111114
}
112115
}
113116
}
114-
115-
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package kamon.datadog
2+
3+
import okhttp3.Interceptor.Chain
4+
import okhttp3.{Interceptor, MediaType, RequestBody, Response}
5+
import okio.{BufferedSink, DeflaterSink, Okio}
6+
7+
import java.util.zip.Deflater
8+
9+
class DeflateInterceptor extends Interceptor {
10+
11+
def intercept(chain: Chain): Response = {
12+
val originalRequest = chain.request
13+
14+
Option(originalRequest.body) match {
15+
case Some(body) if Option(originalRequest.header("Content-Encoding")).isEmpty =>
16+
val compressedRequest = originalRequest
17+
.newBuilder
18+
.header("Content-Encoding", "deflate")
19+
.method(
20+
originalRequest.method,
21+
encodeDeflate(body)
22+
)
23+
.build
24+
25+
chain.proceed(compressedRequest)
26+
case _ =>
27+
chain.proceed(originalRequest)
28+
}
29+
}
30+
31+
private def encodeDeflate(body: RequestBody): RequestBody = new RequestBody() {
32+
def contentType: MediaType = body.contentType
33+
34+
def writeTo(sink: BufferedSink): Unit = {
35+
val deflateSink = Okio.buffer(new DeflaterSink(sink, new Deflater()))
36+
body.writeTo(deflateSink)
37+
deflateSink.close
38+
}
39+
}
40+
}

reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala

+6-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ package object datadog {
3636
}
3737
}
3838

39-
private[datadog] case class HttpClient(apiUrl: String, apiKey: Option[String], usingAgent: Boolean, connectTimeout: Duration,
39+
private[datadog] case class HttpClient(apiUrl: String, apiKey: Option[String], usingCompression: Boolean, usingAgent: Boolean, connectTimeout: Duration,
4040
readTimeout: Duration, writeTimeout: Duration) {
4141

4242
val httpClient: OkHttpClient = createHttpClient()
@@ -45,6 +45,7 @@ package object datadog {
4545
this(
4646
config.getString("api-url"),
4747
if (usingAgent) None else Some(config.getString("api-key")),
48+
if (usingAgent) false else config.getBoolean("compression"),
4849
usingAgent,
4950
config.getDuration("connect-timeout"),
5051
config.getDuration("read-timeout"),
@@ -97,12 +98,14 @@ package object datadog {
9798

9899
// Apparently okhttp doesn't require explicit closing of the connection
99100
private def createHttpClient(): OkHttpClient = {
100-
new OkHttpClient.Builder()
101+
val builder = new OkHttpClient.Builder()
101102
.connectTimeout(connectTimeout.toMillis, TimeUnit.MILLISECONDS)
102103
.readTimeout(readTimeout.toMillis, TimeUnit.MILLISECONDS)
103104
.writeTimeout(writeTimeout.toMillis, TimeUnit.MILLISECONDS)
104105
.retryOnConnectionFailure(false)
105-
.build()
106+
107+
if (usingCompression) builder.addInterceptor(new DeflateInterceptor).build()
108+
else builder.build()
106109
}
107110
}
108111

reporters/kamon-datadog/src/test/resources/application.conf

+3
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,8 @@ kamon {
1818
include-instance = "no"
1919
include-host = "no"
2020
}
21+
api {
22+
compression = false
23+
}
2124
}
2225
}

reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogAPIReporterSpec.scala

+38-1
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package kamon.datadog
22

33
import java.time.Instant
4-
54
import kamon.Kamon
65
import kamon.metric.Distribution.Percentile
76
import kamon.metric._
87
import kamon.module.ModuleFactory
98
import kamon.tag.TagSet
109
import kamon.testkit.Reconfigure
1110
import okhttp3.mockwebserver.MockResponse
11+
import okio.{Buffer, InflaterSource, Okio}
1212
import org.scalatest.Matchers
1313
import play.api.libs.json.Json
1414

15+
import java.util.zip.Inflater
1516
import scala.concurrent.ExecutionContext
1617

1718
class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Reconfigure {
@@ -20,10 +21,45 @@ class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Rec
2021
val reporter = new DatadogAPIReporterFactory().create(ModuleFactory.Settings(Kamon.config(), ExecutionContext.global))
2122
val now = Instant.ofEpochMilli(1523395554)
2223

24+
"sends metrics - compressed" in {
25+
val baseUrl = mockResponse("/test", new MockResponse().setStatus("HTTP/1.1 200 OK"))
26+
applyConfig("kamon.datadog.api.api-url = \"" + baseUrl + "\"")
27+
applyConfig("kamon.datadog.api.api-key = \"dummy\"")
28+
applyConfig("kamon.datadog.api.compression = true")
29+
reporter.reconfigure(Kamon.config())
30+
31+
reporter.reportPeriodSnapshot(
32+
PeriodSnapshot.apply(
33+
now.minusMillis(1000),
34+
now,
35+
MetricSnapshot.ofValues[Long](
36+
"test.counter",
37+
"test",
38+
Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO),
39+
Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 0L) :: Nil
40+
) :: Nil,
41+
Nil,
42+
Nil,
43+
Nil,
44+
Nil
45+
)
46+
)
47+
48+
val request = server.takeRequest()
49+
50+
val decompressedBody = Okio.buffer(new InflaterSource(request.getBody.buffer(), new Inflater())).readByteString().utf8()
51+
52+
Json.parse(decompressedBody) shouldEqual Json
53+
.parse(
54+
"""{"series":[{"metric":"test.counter","interval":1,"points":[[1523394,0]],"type":"count","host":"test","tags":["env:staging","service:kamon-application","tag1:value1"]}]}"""
55+
)
56+
}
57+
2358
"sends counter metrics" in {
2459
val baseUrl = mockResponse("/test", new MockResponse().setStatus("HTTP/1.1 200 OK"))
2560
applyConfig("kamon.datadog.api.api-url = \"" + baseUrl + "\"")
2661
applyConfig("kamon.datadog.api.api-key = \"dummy\"")
62+
applyConfig("kamon.datadog.api.compression = false")
2763
reporter.reconfigure(Kamon.config())
2864

2965
reporter.reportPeriodSnapshot(
@@ -56,6 +92,7 @@ class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Rec
5692
val baseUrl = mockResponse("/test", new MockResponse().setStatus("HTTP/1.1 200 OK"))
5793
applyConfig("kamon.datadog.api.api-url = \"" + baseUrl + "\"")
5894
applyConfig("kamon.datadog.api.api-key = \"dummy\"")
95+
applyConfig("kamon.datadog.api.compression = false")
5996
reporter.reconfigure(Kamon.config())
6097

6198
val distribution = new Distribution {

0 commit comments

Comments
 (0)