Skip to content

Commit

Permalink
7.0.0: Update fs2 and http4s
Browse files Browse the repository at this point in the history
Not assigning yet since this depends on slamdata/quasar#3874 and on the corresponding `sdbe` updates.

[ch2728]
  • Loading branch information
djspiewak committed Oct 25, 2018
2 parents 32491b8 + 358386c commit 752a49e
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package quasar.physical.s3


import quasar.Disposable
import quasar.api.datasource.DatasourceError
import quasar.api.datasource.DatasourceError.InitializationError
Expand All @@ -26,10 +25,12 @@ import quasar.connector.Datasource
import quasar.connector.LightweightDatasourceModule
import quasar.connector.MonadResourceErr

import scala.concurrent.ExecutionContext

import argonaut.{EncodeJson, Json}
import cats.effect.{ConcurrentEffect, Timer}
import cats.effect.{ConcurrentEffect, ContextShift, Timer}
import fs2.Stream
import org.http4s.client.blaze.Http1Client
import org.http4s.client.blaze.BlazeClientBuilder
import scalaz.{\/, NonEmptyList}
import scalaz.syntax.either._
import cats.syntax.applicative._
Expand All @@ -41,16 +42,20 @@ import slamdata.Predef.{Stream => _, _}
object S3DatasourceModule extends LightweightDatasourceModule {
def kind: DatasourceType = s3.datasourceKind

def lightweightDatasource[F[_]: ConcurrentEffect: MonadResourceErr: Timer](config: Json)
: F[InitializationError[Json] \/ Disposable[F, Datasource[F, Stream[F, ?], ResourcePath]]] = {
def lightweightDatasource[F[_]: ConcurrentEffect: ContextShift: MonadResourceErr: Timer](
config: Json)(implicit ec: ExecutionContext)
: F[InitializationError[Json] \/ Disposable[F, Datasource[F, Stream[F, ?], ResourcePath]]] =
config.as[S3Config].result match {
case Right(s3Config) => {
Http1Client[F]() flatMap { client =>
val s3Ds = new S3Datasource[F](client, s3Config)
case Right(s3Config) =>
val clientResource = BlazeClientBuilder[F](ec).resource
val c = s3.resourceToDisposable(clientResource)

c.flatMap { client =>
val s3Ds = new S3Datasource[F](client.unsafeValue, s3Config)
val ds: Datasource[F, Stream[F, ?], ResourcePath] = s3Ds

s3Ds.isLive.ifM({
Disposable(ds, client.shutdown).right.pure[F]
Disposable(ds, client.dispose).right.pure[F]
},
{
val msg = "Unable to ListObjects at the root of the bucket"
Expand All @@ -60,14 +65,12 @@ object S3DatasourceModule extends LightweightDatasourceModule {
.left.pure[F]
})
}
}

case Left((msg, _)) =>
DatasourceError
.invalidConfiguration[Json, InitializationError[Json]](kind, config, NonEmptyList(msg))
.left.pure[F]
}
}

def sanitizeConfig(config: Json): Json = {
val redactedCreds =
Expand All @@ -77,6 +80,7 @@ object S3DatasourceModule extends LightweightDatasourceModule {
Region("<REDACTED>"))

config.as[S3Config].result.toOption.map((c: S3Config) =>
// ignore the existing credentials and replace them with redactedCreds
c.credentials.fold(c)(_ => c.copy(credentials = redactedCreds.some)))
.fold(config)(rc => EncodeJson.of[S3Config].encode(rc))
}
Expand Down
32 changes: 14 additions & 18 deletions datasource/src/main/scala/quasar/physical/s3/impl/evaluate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import quasar.connector.{MonadResourceErr, ResourceError}
import quasar.contrib.pathy._
import quasar.physical.s3.S3JsonParsing


import cats.effect.{Effect, Sync}
import cats.syntax.applicative._
import cats.syntax.functor._
import cats.syntax.flatMap._
import cats.ApplicativeError

import fs2.{Pipe, Stream}
import jawn.{Facade, ParseException}
Expand Down Expand Up @@ -66,7 +66,7 @@ object evaluate {

////

private def parse[F[_], R: Facade](jsonParsing: S3JsonParsing)
private def parse[F[_]: ApplicativeError[?[_], Throwable], R: Facade](jsonParsing: S3JsonParsing)
: Pipe[F, ByteBuffer, R] =
jsonParsing match {
case S3JsonParsing.JsonArray => unwrapJsonArray[F, ByteBuffer, R]
Expand All @@ -89,23 +89,19 @@ object evaluate {
msg)
}

// There is no method in http4s 0.16.6a that does what we want here, so
// we have to implement it ourselves. What we want specifically is to
// make an HTTP request, take the response, if it's a 404 raise
// ResourceError.PathNotFound. If the request succeeds we
// compute an fs2 stream from it using `f` and then call `dispose` on
// that response once we've finished streaming.
private def streamRequest[F[_]: Sync: MonadResourceErr, A](
client: Client[F], req: Request[F], file: AFile)(
f: Response[F] => Stream[F, A])
(implicit MR: MonadResourceErr[F])
client: Client[F], req: Request[F], file: AFile)(
f: Response[F] => Stream[F, A])
(implicit MR: MonadResourceErr[F])
: F[Stream[F, A]] =
client.open(req).flatMap {
case DisposableResponse(response, dispose) =>
response.status match {
case Status.NotFound => MR.raiseError(ResourceError.pathNotFound(ResourcePath.Leaf(file)))
case Status.Ok => f(response).onFinalize(dispose).pure[F]
case s => Sync[F].raiseError(new Exception(s"Unexpected status $s"))
}
s3.resourceToDisposable(client.run(req)).flatMap { disposable =>
val response = disposable.unsafeValue
val dispose = disposable.dispose

response.status match {
case Status.NotFound => MR.raiseError(ResourceError.pathNotFound(ResourcePath.Leaf(file)))
case Status.Ok => f(response).onFinalize(dispose).pure[F]
case s => Sync[F].raiseError(new Exception(s"Unexpected status $s"))
}
}
}
33 changes: 31 additions & 2 deletions datasource/src/main/scala/quasar/physical/s3/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@

package quasar.physical.s3

import scala.Predef._
import quasar.Disposable
import quasar.api.datasource.DatasourceType
import eu.timepit.refined.auto._
import quasar.contrib.scalaz.MonadError_

import slamdata.Predef._

import cats.Show
import cats.effect.{ExitCase, Resource}
import eu.timepit.refined.auto._
import scalaz.Monad
import scalaz.syntax.monad._
import shims._

sealed trait S3Error

Expand All @@ -45,4 +53,25 @@ object S3JsonParsing {

package object s3 {
  val datasourceKind: DatasourceType = DatasourceType("s3", 1L)

  /**
   * Interprets a cats-effect `Resource` into a `Disposable`: the returned
   * effect acquires the resource and yields the acquired value paired with
   * the action that releases it.
   *
   * Every finalizer is invoked with `ExitCase.Completed`, regardless of how
   * the consumer of the `Disposable` actually terminated.
   *
   * @param r the resource to acquire
   * @return an effect that, when run, acquires `r` exactly once and produces
   *         a `Disposable` whose `dispose` releases what was acquired
   */
  @SuppressWarnings(Array("org.wartremover.warts.Recursion"))
  def resourceToDisposable[F[_]: Monad: MonadError_[?[_], Throwable], A](r: Resource[F, A])
      : F[Disposable[F, A]] =
    r match {
      case Resource.Allocate(a) =>
        a map {
          case (res, release) => Disposable(res, release(ExitCase.Completed))
        }

      case Resource.Bind(src, fs) =>
        // Acquire the source exactly once and reuse that same Disposable for
        // both the dependent acquisition and the final clean up. Sequencing
        // the acquisition effect a second time just to obtain its finalizer
        // would re-acquire the source at dispose time and leak the first
        // acquisition.
        //
        // NOTE(review): if acquiring the dependent resource fails, the source
        // finalizer is not run here; confirm whether callers need that.
        resourceToDisposable(src) flatMap { srcDisposable =>
          resourceToDisposable(fs(srcDisposable.unsafeValue))
            .map(_.onDispose(srcDisposable.dispose))
        }

      case Resource.Suspend(res) =>
        res.flatMap(resourceToDisposable(_))
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@ import quasar.api.datasource.DatasourceError.AccessDenied
import quasar.connector.ResourceError
import quasar.contrib.scalaz.MonadError_

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext

import argonaut.Json
import cats.effect.IO
import cats.effect.{ContextShift, IO, Timer}
import org.specs2.mutable.Specification
import shims._

class S3DatasourceModuleSpec extends Specification {
import S3DatasourceModuleSpec._

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
implicit val ec: ExecutionContext = ExecutionContext.global

"rejects invalid credentials" >> {
// slamdata-private-test is a bucket that requires credentials to access
val conf = Json.obj(
Expand All @@ -57,32 +61,34 @@ class S3DatasourceModuleSpec extends Specification {
}
}

"removes AccessKey, SecretKey and Region from credentials" >> {
val conf = Json.obj(
"bucket" -> Json.jString("https://some.bucket.uri"),
"jsonParsing" -> Json.jString("array"),
"credentials" -> Json.obj(
"accessKey" -> Json.jString("some access key"),
"secretKey" -> Json.jString("super secret key"),
"region" -> Json.jString("us-east-1")))

val redactedConf = Json.obj(
"bucket" -> Json.jString("https://some.bucket.uri"),
"jsonParsing" -> Json.jString("array"),
"credentials" -> Json.obj(
"accessKey" -> Json.jString("<REDACTED>"),
"secretKey" -> Json.jString("<REDACTED>"),
"region" -> Json.jString("<REDACTED>")))

S3DatasourceModule.sanitizeConfig(conf) must_== redactedConf
}
"sanitizeConfig" in {
"removes AccessKey, SecretKey and Region from credentials" >> {
val conf = Json.obj(
"bucket" -> Json.jString("https://some.bucket.uri"),
"jsonParsing" -> Json.jString("array"),
"credentials" -> Json.obj(
"accessKey" -> Json.jString("some access key"),
"secretKey" -> Json.jString("super secret key"),
"region" -> Json.jString("us-east-1")))

val redactedConf = Json.obj(
"bucket" -> Json.jString("https://some.bucket.uri"),
"jsonParsing" -> Json.jString("array"),
"credentials" -> Json.obj(
"accessKey" -> Json.jString("<REDACTED>"),
"secretKey" -> Json.jString("<REDACTED>"),
"region" -> Json.jString("<REDACTED>")))

S3DatasourceModule.sanitizeConfig(conf) must_== redactedConf
}

"does nothing when there are no credentials to redact" >> {
val conf = Json.obj(
"bucket" -> Json.jString("https://some.bucket.uri"),
"jsonParsing" -> Json.jString("array"))
"does nothing when there are no credentials to redact" >> {
val conf = Json.obj(
"bucket" -> Json.jString("https://some.bucket.uri"),
"jsonParsing" -> Json.jString("array"))

S3DatasourceModule.sanitizeConfig(conf) must_== conf
S3DatasourceModule.sanitizeConfig(conf) must_== conf
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import quasar.connector.{Datasource, DatasourceSpec, MonadResourceErr, ResourceE
import quasar.connector.ResourceError
import quasar.contrib.scalaz.MonadError_

import scala.concurrent.ExecutionContext

import cats.data.{EitherT, OptionT}
import cats.effect.{Effect, IO}
import cats.effect.{ConcurrentEffect, Effect, IO, Resource}
import cats.syntax.applicative._
import cats.syntax.functor._
import fs2.Stream
import org.http4s.client.blaze.BlazeClientBuilder
import org.http4s.Uri
import org.http4s.client.blaze.Http1Client
import scalaz.{Id, ~>}, Id.Id
import shims._

Expand Down Expand Up @@ -188,16 +190,26 @@ class S3DatasourceSpec extends DatasourceSpec[IO, Stream[IO, ?]] {

val run = λ[IO ~> Id](_.unsafeRunSync)

def mkDatasource[F[_]: Effect: MonadResourceErr](
def mkDatasource[F[_]: ConcurrentEffect: MonadResourceErr](
parsing: S3JsonParsing,
bucket: Uri,
creds: Option[S3Credentials])
: F[Datasource[F, Stream[F, ?], ResourcePath]] =
Http1Client[F]().map(client =>
new S3Datasource[F](client, S3Config(bucket, parsing, creds)))
: F[Datasource[F, Stream[F, ?], ResourcePath]] = {

val ec = ExecutionContext.Implicits.global
val builder = BlazeClientBuilder[F](ec)
val client = unsafeResource(builder.resource)

client map (new S3Datasource[F](_, S3Config(bucket, parsing, creds)))
}

val datasourceLD = run(mkDatasource[IO](S3JsonParsing.LineDelimited, testBucket, None))
val datasource = run(mkDatasource[IO](S3JsonParsing.JsonArray, testBucket, None))

// FIXME: stop inheriting from DatasourceSpec and sequence the resource properly,
// rather than dropping its clean-up action as is done here.
//
// Acquires `r` and yields only the acquired value; the accompanying `dispose`
// action is discarded, so nothing in this helper ever releases the resource.
private def unsafeResource[F[_]: Effect, A](r: Resource[F, A]): F[A] = {
  val acquired = s3.resourceToDisposable(r)
  acquired.map(disposable => disposable.unsafeValue)
}
}

object S3DatasourceSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.io.{Source, Codec}
import java.io.File

import argonaut.{Parse, DecodeJson}
import cats.effect.{IO, Sync}
import cats.effect.IO
import cats.syntax.flatMap._
import cats.syntax.applicative._
import cats.syntax.option._
Expand All @@ -38,22 +38,31 @@ import SecureS3DatasourceSpec._
final class SecureS3DatasourceSpec extends S3DatasourceSpec {
override val testBucket = Uri.uri("https://s3.amazonaws.com/slamdata-private-test")

// FIXME: close the file once we update to cats-effect 1.0.0 and
// Bracket is available
override val credentials: IO[Option[S3Credentials]] = {
val file = Sync[IO].catchNonFatal(new File("testCredentials.json"))
val msg = "Failed to read testCredentials.json"
val read = IO {
val file = new File(credsFile)
val src = Source.fromFile(file)(Codec.UTF8)

val src = (file >>= (f => IO(Source.fromFile(f)(Codec.UTF8)))).map(_.getLines.mkString)
(src.getLines.mkString, src)
}

val jsonConfig = src >>= (p =>
Parse.parse(p).toOption.map(_.pure[IO]).getOrElse(IO.raiseError(new Exception(msg))))
read.bracket({
case (p, _) => {
val msg = "Failed to read testCredentials.json"
val jsonConfig =
Parse.parse(p).toOption.map(_.pure[IO]).getOrElse(IO.raiseError(new Exception(msg)))

jsonConfig
.map(DecodeJson.of[S3Credentials].decodeJson(_))
.map(_.toOption) >>= (_.fold[IO[Option[S3Credentials]]](IO.raiseError(new Exception(msg)))(c => c.some.pure[IO]))
jsonConfig
.map(DecodeJson.of[S3Credentials].decodeJson(_))
.map(_.toOption) >>= (_.fold[IO[Option[S3Credentials]]](IO.raiseError(new Exception(msg)))(c => c.some.pure[IO]))
}
})({
case (_, src) => IO(src.close)
})
}

private val credsFile = "testCredentials.json"

override val datasourceLD =
run(credentials >>= (creds => mkDatasource[IO](S3JsonParsing.LineDelimited, testBucket, creds)))
override val datasource =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,28 @@ package quasar.physical.s3

import slamdata.Predef._

import scala.concurrent.ExecutionContext

import cats.effect.IO
import cats.data.OptionT
import cats.syntax.applicative._
import org.http4s.{Uri, Request}
import org.http4s.client.blaze.Http1Client
import org.http4s.client.blaze.BlazeClientBuilder
import org.specs2.mutable.Specification
import pathy.Path

final class ChildrenSpec extends Specification {
"lists all resources at the root of the bucket, one per request" >> {
val client = Http1Client[IO]()
implicit val cs = IO.contextShift(ExecutionContext.global)
// Force S3 to return a single element per page in ListObjects,
// to ensure pagination works correctly
val bucket = Uri.uri("https://s3.amazonaws.com/slamdata-public-test/").withQueryParam("max-keys", "1")

val dir = Path.rootDir
val sign: Request[IO] => IO[Request[IO]] = _.pure[IO]
val client = BlazeClientBuilder[IO](ExecutionContext.global).resource

OptionT(client.flatMap(impl.children(_, bucket, dir, sign)))
OptionT(client.use(impl.children(_, bucket, dir, sign)))
.getOrElseF(IO.raiseError(new Exception("Could not list children under the root")))
.flatMap(_.compile.toList).map { children =>
children.length must_== 4
Expand Down
Loading

0 comments on commit 752a49e

Please sign in to comment.