Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ lazy val scala3Projects: Seq[ProjectReference] = Seq(
sprayjson,
ziojson,
clientsttp,
`client-zio-http`,
akkastreams,
pekkostreams,
reactivestreamsakka,
Expand Down Expand Up @@ -344,6 +345,12 @@ lazy val clienthttp4s = (project in file("elastic4s-client-http4s"))
.settings(scala3Settings)
.settings(libraryDependencies ++= Seq(http4sClient, http4sEmberClient % Test))

lazy val `client-zio-http` = (project in file("elastic4s-client-zio-http"))
.dependsOn(core, zio, testkit % "test")
.settings(name := "elastic4s-client-zio-http")
.settings(scala3Settings)
.settings(libraryDependencies ++= Seq(Dependencies.zioHttp, Dependencies.zio))

lazy val tests = (project in file("elastic4s-tests"))
.settings(name := "elastic4s-tests")
.dependsOn(core, jackson, testkit % Test)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.sksamuel.elastic4s.ziohttp

import com.sksamuel.elastic4s
import com.sksamuel.elastic4s.HttpEntity.{ByteArrayEntity, FileEntity, StringEntity}
import com.sksamuel.elastic4s.{ElasticRequest, HttpEntity, HttpResponse}
import zio.http.Header.ContentType
import zio.http.codec.TextBinaryCodec.fromSchema
import zio.http.{Body, Client, Header, Headers, Path, QueryParams, Request, Scheme, URL}
import zio.{Task, UIO, ZIO}

import java.nio.charset.Charset
import java.nio.charset.StandardCharsets.UTF_8

object ZIOHttpClient {
def apply(client: Client, endpoint: elastic4s.ElasticNodeEndpoint): elastic4s.HttpClient[Task] =
new ZIOHttpClient(
client = client,
baseUrl = URL(
kind =
URL.Location.Absolute(
Scheme.decode(endpoint.protocol).getOrElse(Scheme.HTTP),
endpoint.host,
Some(endpoint.port)
),
path = endpoint.prefix.map(Path(_)).getOrElse(Path.empty)
)
)
}

class ZIOHttpClient(client: Client, baseUrl: URL) extends elastic4s.HttpClient[Task] {
private def charsetFromString(s: String): Charset =
Charset.availableCharsets().getOrDefault(s, UTF_8)

private def makeBody(e: elastic4s.HttpEntity): UIO[Body] = e match {
case StringEntity(content, None) => ZIO.succeed(Body.fromString(content, UTF_8))
case StringEntity(content, Some(contentCharset)) =>
ZIO.succeed(Body.fromString(content, charsetFromString(contentCharset)))
case HttpEntity.InputStreamEntity(content, _) =>
ZIO.succeed(Body.fromStream(zio.stream.ZStream.fromInputStream(content)))
case FileEntity(content, _) => Body.fromFile(content)
case ByteArrayEntity(content, _) => ZIO.succeed(Body.fromArray(content))
}

private def getContentType(entity: elastic4s.HttpEntity) =
entity.contentCharset.flatMap(v =>
Header.ContentType.parse(v).map(h => Headers(h)).toOption
)

override def send(esRequest: elastic4s.ElasticRequest): Task[HttpResponse] = {
for {
body <- esRequest.entity.map(entity => makeBody(entity)).getOrElse(ZIO.succeed(Body.empty))
req = makeRequest(esRequest, body)
resp <- client.batched.request(req)
bytes <- resp.body.asChunk
} yield elastic4s.HttpResponse(
resp.status.code,
if (bytes.isEmpty) {
None
} else {
val contentCharsetOpt = resp.headers.get(ContentType).flatMap(_.charset)
Some(elastic4s.HttpEntity.StringEntity(
bytes.asString(contentCharsetOpt.getOrElse(UTF_8)),
contentCharsetOpt.map(_.name())
))
},
resp.headers.toSeq.map(h => h.headerName -> h.renderedValue).toMap
)
}

private def makeRequest(request: ElasticRequest, body: Body) = {
val contentType = request.entity.flatMap(getContentType).getOrElse(Headers.empty)
val path = request.endpoint.stripPrefix("/").split('/').foldLeft(Path.empty)((p, s) =>
p / java.net.URLDecoder.decode(s, UTF_8)
)
Request(
method = zio.http.Method.fromString(request.method),
headers = if (request.headers.nonEmpty) {
val seq = request.headers.toSeq
Headers(seq.head, seq.tail: _*) ++ contentType
} else {
contentType
},
url = baseUrl ++ URL(
path = path,
queryParams = if (request.params.isEmpty) {
QueryParams.empty
} else {
val paramsSeq = request.params.toSeq
QueryParams(paramsSeq.head, paramsSeq.tail: _*)
}
),
body = body
)
}

override def close(): Task[Unit] = ZIO.unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.sksamuel.elastic4s.ziohttp

import com.sksamuel.elastic4s.{CommonRequestOptions, ElasticClient, ElasticDsl, ElasticNodeEndpoint, Response}
import com.sksamuel.elastic4s.Authentication.UsernamePassword
import com.sksamuel.elastic4s.testkit.DockerTests.{elasticHost, elasticPort}
import com.sksamuel.elastic4s.zio.instances.taskFunctor
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import zio.http.Client
import zio.{Runtime, Task, TaskLayer, Unsafe, ZIO, ZLayer}

class ZIOHttpClientTest extends AnyFlatSpec with Matchers with ElasticDsl {
private val zioClientLayer: TaskLayer[ElasticClient[Task]] = Client.default >>> ZLayer.fromZIO {
ZIO.serviceWith[zio.http.Client](c =>
ElasticClient[Task](ZIOHttpClient(c, ElasticNodeEndpoint("http", elasticHost, elasticPort.toInt, None)))
)
}

private def runZIO[T](f: ElasticClient[Task] => Task[Response[T]]): Response[T] = {
Unsafe.unsafe { implicit unsafe =>
Runtime.default.unsafe.run(
ZIO.serviceWithZIO[ElasticClient[Task]](f).provideLayer(zioClientLayer)
).getOrThrowFiberFailure()
}
}

"ZIOHttpClient" should "be able to call elasticsearch" in {
runZIO(_.execute(serverInfo)).result.tagline shouldBe "You Know, for Search"
}

it should "be able to propagate auth headers if included" in {
implicit val options: CommonRequestOptions = CommonRequestOptions.defaults.copy(
authentication = UsernamePassword("user123", "pass123")
)

runZIO(_.execute(catHealth())).result.status shouldBe "401"
}

it should "be able index document with id properly" in {
val id = "id/:/test-id-1"
runZIO(_.execute(indexInto("testindex2").withId(id))).result.id shouldBe id
}

it should "support utf-8" in {
val id = "я-家"
runZIO(_.execute(
indexInto("testindex").withId(id).doc("""{ "text":"¡Hola я 家! ¿Qué tal?" }""")
)).result.result should (equal("created") or equal("updated"))
runZIO(_.execute(indexInto("testindex").withId(id))).result.id shouldBe id
}

it should "work with head methods" in {
runZIO(_.execute(
indexExists("unknown_index")
)).result.exists shouldBe false
}
}
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ object Dependencies {
val ZIO1Version = "1.0.18"
val ZIOVersion = "2.1.24"
val ZIOJsonVersion = "0.9.0"
val ZIOHttpVersion = "3.8.1"

lazy val commonDeps = Seq(
libraryDependencies ++= Seq(
Expand Down Expand Up @@ -77,6 +78,7 @@ object Dependencies {
lazy val sttp = "com.softwaremill.sttp.client3" %% "core" % SttpVersion
lazy val zioJson1 = "dev.zio" %% "zio-json" % ZIOJson1Version
lazy val zioJson = "dev.zio" %% "zio-json" % ZIOJsonVersion
lazy val zioHttp = "dev.zio" %% "zio-http" % ZIOHttpVersion
lazy val elasticsearchRestClientSniffer = "org.elasticsearch.client" % "elasticsearch-rest-client-sniffer" %
ElasticsearchVersion

Expand Down