Skip to content

Commit 3775973

Browse files
committed
deprecatre req and add streaming
1 parent df186d4 commit 3775973

File tree

8 files changed

+70
-35
lines changed

8 files changed

+70
-35
lines changed

zio-http-cli/src/main/scala/zio/http/endpoint/cli/Retriever.scala

+9-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package zio.http.endpoint.cli
22

33
import java.nio.file.Path
44

5+
import scala.annotation.nowarn
6+
57
import zio._
68

79
import zio.http._
@@ -27,10 +29,13 @@ private[cli] object Retriever {
2729
final case class URL(name: String, url: String, mediaType: MediaType) extends Retriever {
2830

2931
val request = Request.get(http.URL(http.Path.decode(url)))
30-
override def retrieve(): ZIO[Client, Throwable, FormField] = for {
31-
client <- ZIO.serviceWith[Client](_.batched)
32-
chunk <- client.request(request).flatMap(_.body.asChunk)
33-
} yield FormField.binaryField(name, chunk, mediaType)
32+
@nowarn("cat=deprecation")
33+
override def retrieve(): ZIO[Client, Throwable, FormField] = ZIO.scoped {
34+
for {
35+
client <- ZIO.serviceWith[Client](_.batched)
36+
chunk <- client.request(request).flatMap(_.body.asChunk)
37+
} yield FormField.binaryField(name, chunk, mediaType)
38+
}
3439
}
3540

3641
final case class File(name: String, path: Path, mediaType: MediaType) extends Retriever {

zio-http/jvm/src/test/scala/zio/http/ClientStreamingSpec.scala

+10-10
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
6262
for {
6363
port <- server(streamingServer)
6464
client <- ZIO.service[Client]
65-
response <- client.request(
65+
response <- client.batched(
6666
Request.get(URL.decode(s"http://localhost:$port/simple-get").toOption.get),
6767
)
6868
body <- response.body.asString
@@ -72,7 +72,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
7272
for {
7373
port <- server(streamingServer)
7474
client <- ZIO.service[Client]
75-
response <- client.request(
75+
response <- client.streaming(
7676
Request.get(URL.decode(s"http://localhost:$port/streaming-get").toOption.get),
7777
)
7878
body <- response.body.asStream.chunks.map(chunk => new String(chunk.toArray)).runCollect
@@ -94,7 +94,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
9494
port <- server(streamingServer)
9595
client <- ZIO.service[Client]
9696
response <- client
97-
.request(
97+
.batched(
9898
Request.post(
9999
URL.decode(s"http://localhost:$port/simple-post").toOption.get,
100100
Body.fromStreamChunked(
@@ -110,7 +110,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
110110
port <- server(streamingServer)
111111
client <- ZIO.service[Client]
112112
response <- client
113-
.request(
113+
.streaming(
114114
Request.post(
115115
URL.decode(s"http://localhost:$port/streaming-echo").toOption.get,
116116
Body.fromStreamChunked(
@@ -146,7 +146,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
146146
for {
147147
boundary <- Boundary.randomUUID
148148
response <- client
149-
.request(
149+
.batched(
150150
Request
151151
.post(
152152
URL.decode(s"http://localhost:$port/form").toOption.get,
@@ -178,8 +178,8 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
178178
boundary <- Boundary.randomUUID
179179
stream = Form(fields.map(_._1): _*).multipartBytes(boundary)
180180
bytes <- stream.runCollect
181-
response <- client.batched
182-
.request(
181+
response <- client
182+
.batched(
183183
Request
184184
.post(
185185
URL.decode(s"http://localhost:$port/form").toOption.get,
@@ -219,7 +219,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
219219
boundary <- Boundary.randomUUID
220220
stream = form.multipartBytes(boundary).rechunk(chunkSize)
221221
response <- client
222-
.request(
222+
.streaming(
223223
Request
224224
.post(
225225
URL.decode(s"http://localhost:$port/form").toOption.get,
@@ -242,7 +242,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
242242
port <- server(streamingServer)
243243
client <- ZIO.service[Client]
244244
response <- client
245-
.request(
245+
.streaming(
246246
Request.post(
247247
URL.decode(s"http://localhost:$port/simple-post").toOption.get,
248248
Body.fromStreamChunked(ZStream.fail(new RuntimeException("Some error"))),
@@ -261,7 +261,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
261261
client <- ZIO.service[Client]
262262
sync <- Promise.make[Nothing, Unit]
263263
response <- client
264-
.request(
264+
.streaming(
265265
Request.post(
266266
URL.decode(s"http://localhost:$port/streaming-echo").toOption.get,
267267
Body.fromStreamChunked(

zio-http/jvm/src/test/scala/zio/http/RequestStreamingServerSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ object RequestStreamingServerSpec extends RoutesRunnableSpec {
7878
val host = req.headers.get(Header.Host).get
7979
val newRequest =
8080
req.copy(url = req.url.path("/2").host(host.hostAddress).port(host.port.getOrElse(80)))
81-
ZIO.serviceWithZIO[Client](_.request(newRequest))
81+
ZIO.serviceWithZIO[Client](_.streaming(newRequest))
8282
},
8383
Method.POST / "2" -> handler { (req: Request) =>
8484
req.body.asChunk.map { body =>

zio-http/jvm/src/test/scala/zio/http/ZClientAspectSpec.scala

+18-14
Original file line numberDiff line numberDiff line change
@@ -86,20 +86,24 @@ object ZClientAspectSpec extends ZIOHttpSpec {
8686
annotations.head.contains("duration_ms"),
8787
),
8888
),
89-
test("followRedirects")(
90-
for {
91-
port <- Server.install(redir ++ routes)
92-
baseClient <- ZIO.service[Client]
93-
client = baseClient
94-
.url(
95-
URL(Path.empty, Location.Absolute(Scheme.HTTP, "localhost", Some(port))),
96-
)
97-
.batched @@ ZClientAspect.followRedirects(2)((resp, message) => ZIO.logInfo(message).as(resp))
98-
response <- client.request(Request.get(URL.empty / "redirect"))
99-
} yield assertTrue(
100-
extractStatus(response) == Status.Ok,
101-
),
102-
),
89+
test("followRedirects") {
90+
@nowarn("cat=deprecation")
91+
def followRedirectsTest = {
92+
for {
93+
port <- Server.install(redir ++ routes)
94+
baseClient <- ZIO.service[Client]
95+
client = baseClient
96+
.url(
97+
URL(Path.empty, Location.Absolute(Scheme.HTTP, "localhost", Some(port))),
98+
)
99+
.batched @@ ZClientAspect.followRedirects(2)((resp, message) => ZIO.logInfo(message).as(resp))
100+
response <- client.request(Request.get(URL.empty / "redirect"))
101+
} yield assertTrue(
102+
extractStatus(response) == Status.Ok,
103+
)
104+
}
105+
followRedirectsTest
106+
},
103107
).provide(
104108
ZLayer.succeed(Server.Config.default.onAnyOpenPort),
105109
Server.customized,

zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ object NettyStreamBodySpec extends RoutesRunnableSpec {
5454
}
5555

5656
def makeRequest(client: Client, port: Int) = client
57-
.request(
57+
.streaming(
5858
Request.get(URL.decode(s"http://localhost:$port/with-content-length").toOption.get),
5959
)
6060

zio-http/jvm/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ object NettyConnectionPoolSpec extends RoutesRunnableSpec {
259259
.toRoutes
260260
.deployAndRequest { client =>
261261
ZIO.foreachParDiscard(0 to 10) { _ =>
262-
client.batched.request(Request()).flatMap(_.body.asArray).repeatN(200)
262+
client.batched(Request()).flatMap(_.body.asArray).repeatN(200)
263263
}
264264
}(Request())
265265
.as(assertCompletes)

zio-http/shared/src/main/scala/zio/http/ZClient.scala

+29-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package zio.http
1818

1919
import java.net.{InetSocketAddress, URI}
2020

21+
import scala.annotation.nowarn
22+
2123
import zio._
2224
import zio.stacktracer.TracingImplicits.disableAutoTrace
2325

@@ -37,6 +39,7 @@ final case class ZClient[-Env, ReqEnv, -In, +Err, +Out](
3739
bodyDecoder: ZClient.BodyDecoder[Env, Err, Out],
3840
driver: ZClient.Driver[Env, ReqEnv, Err],
3941
) extends HeaderOps[ZClient[Env, ReqEnv, In, Err, Out]] { self =>
42+
@nowarn("cat=deprecation")
4043
def apply(request: Request)(implicit ev: Body <:< In, trace: Trace): ZIO[Env & ReqEnv, Err, Out] =
4144
self.request(request)
4245

@@ -175,6 +178,7 @@ final case class ZClient[-Env, ReqEnv, -In, +Err, +Out](
175178
): ZClient[Env, ReqEnv, In, Err2, Out] =
176179
transform(bodyEncoder.refineOrDie(pf), bodyDecoder.refineOrDie(pf), driver.refineOrDie(pf))
177180

181+
@deprecated("Use `batched` or `streaming` instead", since = "3.0.0")
178182
def request(request: Request)(implicit ev: Body <:< In, trace: Trace): ZIO[Env & ReqEnv, Err, Out] = {
179183
def makeRequest(body: Body) = {
180184
driver.request(
@@ -233,11 +237,33 @@ final case class ZClient[-Env, ReqEnv, -In, +Err, +Out](
233237
ev2: ReqEnv =:= Scope,
234238
): ZStream[R & Env, E0, A] = ZStream.unwrapScoped[R & Env] {
235239
self
236-
.request(request)
240+
.streaming(request)
237241
.asInstanceOf[ZIO[R & Env & Scope, Err, Out]]
238242
.fold(ZStream.fail(_), f)
239243
}
240244

245+
def streaming(
246+
request: Request,
247+
)(implicit ev: Body <:< In, trace: Trace, ev1: ReqEnv =:= Scope): ZIO[Env & ReqEnv, Err, Out] = {
248+
def makeRequest(body: Body) = {
249+
driver.request(
250+
self.version ++ request.version,
251+
request.method,
252+
self.url ++ request.url,
253+
self.headers ++ request.headers,
254+
body,
255+
sslConfig,
256+
proxy,
257+
)
258+
}
259+
if (bodyEncoder == ZClient.BodyEncoder.identity)
260+
bodyDecoder.decodeZIO(makeRequest(request.body))
261+
else
262+
bodyEncoder
263+
.encode(ev(request.body))
264+
.flatMap(body => bodyDecoder.decodeZIO(makeRequest(body)))
265+
}
266+
241267
def ssl(ssl: ClientSSLConfig): ZClient[Env, ReqEnv, In, Err, Out] =
242268
copy(sslConfig = Some(ssl))
243269

@@ -279,7 +305,7 @@ object ZClient extends ZClientPlatformSpecific {
279305
* memory, allowing to stream response bodies
280306
*/
281307
def batched(request: Request)(implicit trace: Trace): ZIO[Client, Throwable, Response] =
282-
ZIO.serviceWithZIO[Client](_.batched.request(request))
308+
ZIO.serviceWithZIO[Client](_.batched(request))
283309

284310
def fromDriver[Env, ReqEnv, Err](driver: Driver[Env, ReqEnv, Err]): ZClient[Env, ReqEnv, Body, Err, Response] =
285311
ZClient(
@@ -312,7 +338,7 @@ object ZClient extends ZClientPlatformSpecific {
312338
* request's resources (i.e., `Scope`)
313339
*/
314340
def streaming(request: Request)(implicit trace: Trace): ZIO[Client & Scope, Throwable, Response] =
315-
ZIO.serviceWithZIO[Client](_.request(request))
341+
ZIO.serviceWithZIO[Client](_.batched(request))
316342

317343
/**
318344
* Executes an HTTP request, and transforms the response to a `ZStream` using

zio-http/shared/src/main/scala/zio/http/endpoint/internal/EndpointClient.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private[endpoint] final case class EndpointClient[P, I, E, O, A <: AuthType](
6262
for {
6363
authInput <- authProvider
6464
config <- CodecConfig.codecRef.get
65-
response <- client.request(withDefaultAcceptHeader(config, authInput)).orDie
65+
response <- client.batched(withDefaultAcceptHeader(config, authInput)).orDie
6666
} yield response
6767

6868
requested.flatMap { response =>

0 commit comments

Comments
 (0)