From 36d51af77477140b45481eb75ae49fbacbe0cc63 Mon Sep 17 00:00:00 2001 From: Khalid Shakir Date: Fri, 27 Jun 2025 23:17:07 -0400 Subject: [PATCH] Move memory error detection from head to tail --- .../StandardAsyncExecutionActor.scala | 8 +- .../main/scala/cromwell/core/io/AsyncIo.scala | 3 + .../cromwell/core/io/DefaultIoCommand.scala | 6 ++ .../scala/cromwell/core/io/IoCommand.scala | 18 ++++ .../cromwell/core/io/IoCommandBuilder.scala | 5 ++ .../core/path/EvenBetterPathMethods.scala | 64 ++++++++++++- .../scala/cromwell/core/MockIoActor.scala | 3 + .../scala/cromwell/core/SimpleIoActor.scala | 12 +++ .../scala/cromwell/core/io/AsyncIoSpec.scala | 44 +++++++++ .../cromwell/engine/io/nio/NioFlow.scala | 6 ++ .../cromwell/engine/io/IoActorSpec.scala | 89 +++++++++++++++++++ .../filesystems/gcs/GcsPathBuilder.scala | 33 ++++++- 12 files changed, 285 insertions(+), 6 deletions(-) diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala index c416d21a3c2..80d573376aa 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala @@ -1421,6 +1421,12 @@ trait StandardAsyncExecutionActor def readFile(path: Path, maxBytes: Option[Int]): Future[String] = asyncIo.contentAsStringAsync(path, maxBytes, failOnOverflow = false) + // Read the tail of a file, or fall back to reading the head. + def readPartial(path: Path, maxBytes: Int): Future[String] = + asyncIo.tailAsStringAsync(path, maxBytes) recoverWith { case _ => + asyncIo.contentAsStringAsync(path, Option(maxBytes), failOnOverflow = false) + } + def checkMemoryRetryRC(): Future[Boolean] = readFile(jobPaths.memoryRetryRC, None) map { codeAsString => Try(codeAsString.trim.toInt) match { @@ -1439,7 +1445,7 @@ trait StandardAsyncExecutionActor } def checkMemoryRetryStderr(memoryRetryError: Path, errorKeys: List[String], maxBytes: Int): Future[Boolean] = - readFile(memoryRetryError, Option(maxBytes)) map { errorContent => + readPartial(memoryRetryError, maxBytes) map { errorContent => errorKeys.exists(errorContent.contains) } diff --git a/core/src/main/scala/cromwell/core/io/AsyncIo.scala b/core/src/main/scala/cromwell/core/io/AsyncIo.scala index 91ef7ac0575..7cd260b7085 100644 --- a/core/src/main/scala/cromwell/core/io/AsyncIo.scala +++ b/core/src/main/scala/cromwell/core/io/AsyncIo.scala @@ -42,6 +42,9 @@ class AsyncIo(ioEndpoint: ActorRef, ioCommandBuilder: IoCommandBuilder) { def contentAsStringAsync(path: Path, maxBytes: Option[Int], failOnOverflow: Boolean): Future[String] = asyncCommand(ioCommandBuilder.contentAsStringCommand(path, maxBytes, failOnOverflow)) + def tailAsStringAsync(path: Path, maxBytes: Int): Future[String] = + asyncCommand(ioCommandBuilder.tailAsString(path, maxBytes)) + def writeAsync(path: Path, content: String, options: OpenOptions, compressPayload: Boolean = false): Future[Unit] = asyncCommand(ioCommandBuilder.writeCommand(path, content, options, compressPayload)) diff --git a/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala b/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala index 0bb57a81f06..cf5bae6b718 100644 --- a/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala +++ b/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala @@ -3,6 +3,7 @@ package cromwell.core.io import better.files.File.OpenOptions import cromwell.core.callcaching.FileHashStrategy import cromwell.core.io.IoContentAsStringCommand.IoReadOptions +import cromwell.core.io.IoTailAsStringCommand.IoTailOptions import cromwell.core.path.Path object DefaultIoCommand { @@ -16,6 +17,11 @@ object DefaultIoCommand { override def commandDescription: String = s"DefaultIoContentAsStringCommand file '$file' options '$options'" } + case class DefaultIoTailAsStringCommand(override val file: Path, override val options: IoTailOptions) + extends IoTailAsStringCommand(file, options) { + override def commandDescription: String = s"DefaultIoTailAsStringCommand file '$file' options '$options'" + } + case class DefaultIoSizeCommand(override val file: Path) extends IoSizeCommand(file) { override def commandDescription: String = s"DefaultIoSizeCommand file '$file'" } diff --git a/core/src/main/scala/cromwell/core/io/IoCommand.scala b/core/src/main/scala/cromwell/core/io/IoCommand.scala index 8aa1a7cda9a..2aeaa58e9f5 100644 --- a/core/src/main/scala/cromwell/core/io/IoCommand.scala +++ b/core/src/main/scala/cromwell/core/io/IoCommand.scala @@ -9,6 +9,7 @@ import common.util.Backoff import common.util.StringUtil.EnhancedToStringable import cromwell.core.callcaching.FileHashStrategy import cromwell.core.io.IoContentAsStringCommand.IoReadOptions +import cromwell.core.io.IoTailAsStringCommand.IoTailOptions import cromwell.core.path.Path import cromwell.core.retry.SimpleExponentialBackoff import org.slf4j.LoggerFactory @@ -133,6 +134,23 @@ abstract class IoContentAsStringCommand(val file: Path, override lazy val name = "read" } +object IoTailAsStringCommand { + + /** + * Options to customize reading the tail of a file. + * @param maxBytes Only reads up to maxBytes Bytes from the file + */ + case class IoTailOptions(maxBytes: Int) +} + +/** + * Read the tail of a file as a string (load the entire content in memory) + */ +abstract class IoTailAsStringCommand(val file: Path, val options: IoTailOptions) extends SingleFileIoCommand[String] { + override def toString = s"read tail of ${file.pathAsString}" + override lazy val name = "tail" +} + /** * Return the size of file */ diff --git a/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala b/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala index 471a026785a..5c0eca2fafd 100644 --- a/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala +++ b/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala @@ -4,6 +4,7 @@ import cromwell.core.callcaching.FileHashStrategy import cromwell.core.io.DefaultIoCommand._ import cromwell.core.io.IoCommand.{noopMetricsCallback, IOMetricsCallback} import cromwell.core.io.IoContentAsStringCommand.IoReadOptions +import cromwell.core.io.IoTailAsStringCommand.IoTailOptions import cromwell.core.path.BetterFileMethods.OpenOptions import cromwell.core.path.Path @@ -16,6 +17,7 @@ import scala.util.Try abstract class PartialIoCommandBuilder { def contentAsStringCommand: PartialFunction[(Path, Option[Int], Boolean), Try[IoContentAsStringCommand]] = PartialFunction.empty + def tailAsStringCommand: PartialFunction[(Path, Int), Try[IoTailAsStringCommand]] = PartialFunction.empty def writeCommand: PartialFunction[(Path, String, OpenOptions, Boolean), Try[IoWriteCommand]] = PartialFunction.empty def sizeCommand: PartialFunction[Path, Try[IoSizeCommand]] = PartialFunction.empty def deleteCommand: PartialFunction[(Path, Boolean), Try[IoDeleteCommand]] = PartialFunction.empty @@ -63,6 +65,9 @@ class IoCommandBuilder(partialBuilders: List[PartialIoCommandBuilder] = List.emp DefaultIoContentAsStringCommand(path, IoReadOptions(maxBytes, failOnOverflow)) ) + def tailAsString(path: Path, maxBytes: Int): Try[IoTailAsStringCommand] = + buildOrDefault(_.tailAsStringCommand, (path, maxBytes), DefaultIoTailAsStringCommand(path, IoTailOptions(maxBytes))) + def writeCommand(path: Path, content: String, options: OpenOptions, diff --git a/core/src/main/scala/cromwell/core/path/EvenBetterPathMethods.scala b/core/src/main/scala/cromwell/core/path/EvenBetterPathMethods.scala index e455867e5e6..84a6d74635f 100644 --- a/core/src/main/scala/cromwell/core/path/EvenBetterPathMethods.scala +++ b/core/src/main/scala/cromwell/core/path/EvenBetterPathMethods.scala @@ -4,10 +4,10 @@ import java.io.{BufferedInputStream, BufferedReader, ByteArrayOutputStream, Inpu import java.nio.file.{FileAlreadyExistsException, Files} import java.nio.file.attribute.{PosixFilePermission, PosixFilePermissions} import java.util.zip.GZIPOutputStream - import better.files.File.OpenOptions import cromwell.util.TryWithResource.tryWithResource +import java.nio.channels.Channels import scala.jdk.CollectionConverters._ import scala.concurrent.ExecutionContext import scala.io.Codec @@ -86,12 +86,34 @@ trait EvenBetterPathMethods { final def tailed(tailedSize: Int) = TailedWriter(this, tailedSize) + /** + * Similar to newInputStream, but is overridable in subclasses to provide a different InputStream. + * + * Naming comes from the original method used from Google Cloud Storage, executeMediaAsInputStream() + */ def mediaInputStream(implicit ec: ExecutionContext): InputStream = { + // Use locally(ec) to avoid "parameter value ec in method mediaInputStream is never used" + // It is used in the overridden methods, but not here. // See https://github.com/scala/bug/issues/10347 and https://github.com/scala/bug/issues/10790 locally(ec) newInputStream } + /** + * A tail version of mediaInputStream that reads the last numBytes of the file. + */ + def mediaInputStreamTail(numBytes: Long)(implicit ec: ExecutionContext): InputStream = { + // Use locally(ec) to avoid "parameter value ec in method mediaInputStreamTail is never used" + // It is used in the overridden methods, but not here. + // See https://github.com/scala/bug/issues/10347 and https://github.com/scala/bug/issues/10790 + locally(ec) + val channel = newFileChannel + val size = channel.size() + val startPosition = Math.max(0, size - numBytes) + channel.position(startPosition) + Channels.newInputStream(channel) + } + protected def gzipByteArray(byteArray: Array[Byte]): Array[Byte] = { val byteStream = new ByteArrayOutputStream tryWithResource(() => new GZIPOutputStream(byteStream)) { @@ -132,6 +154,14 @@ trait EvenBetterPathMethods { def withBufferedStream[A](f: BufferedInputStream => A)(implicit ec: ExecutionContext): A = tryWithResource(() => new BufferedInputStream(this.mediaInputStream))(f).recoverWith(fileIoErrorPf).get + /** + * Similar to withBufferedStream, but reads from the tail of the stream. + */ + def withBufferedStreamTail[A](numBytes: Long)(f: BufferedInputStream => A)(implicit ec: ExecutionContext): A = + tryWithResource(() => new BufferedInputStream(this.mediaInputStreamTail(numBytes)))(f) + .recoverWith(fileIoErrorPf) + .get + /** * Returns an Array[Byte] from a Path. Limit the array size to "limit" byte if defined. * @throws IOException if failOnOverflow is true and the file is larger than limit @@ -151,9 +181,35 @@ trait EvenBetterPathMethods { } /** - * Reads the first limitBytes of a file and makes a String. Prepend with an annotation at the start (to say that this is the - * first n bytes). + * Reads the last bytes of a file and returns a String. If the file is larger than limit, it will + * drop the first line and return the rest of the file as a String. + */ + def tailLines(limitBytes: Int)(implicit ec: ExecutionContext): String = + // Add one because we'll be dropping until the end of the first line if the file is larger than limitBytes + withBufferedStreamTail(limitBytes + 1L) { bufferedStream => + val bytes = bufferedStream.readAllBytes() + if (bytes.length <= limitBytes) { + new String(bytes, Codec.UTF8.charSet) + } else { + val newlineIndex = bytes.indexOf('\n') + if (newlineIndex < 0) { + "" + } else { + new String(bytes.drop(newlineIndex + 1), Codec.UTF8.charSet) + } + } + } + + /** + * Reads the limitBytes of a file and makes a String. Prepend with an annotation at the start (to say that this is a + * limited number of bytes). */ def annotatedContentAsStringWithLimit(limitBytes: Int)(implicit ec: ExecutionContext): String = - s"[First $limitBytes bytes]:" + new String(limitFileContent(Option(limitBytes), failOnOverflow = false)) + try + s"[Last $limitBytes bytes]:" + tailLines(limitBytes) + catch { + // If anything goes wrong, such as we cannot seek to the end of the file, we will just return the first n bytes. + case _: Exception => + s"[First $limitBytes bytes]:" + new String(limitFileContent(Option(limitBytes), failOnOverflow = false)) + } } diff --git a/core/src/test/scala/cromwell/core/MockIoActor.scala b/core/src/test/scala/cromwell/core/MockIoActor.scala index dd2e8d114ac..22a84d782b0 100644 --- a/core/src/test/scala/cromwell/core/MockIoActor.scala +++ b/core/src/test/scala/cromwell/core/MockIoActor.scala @@ -17,6 +17,7 @@ class MockIoActor(returnCode: String, stderrSize: Long) extends Actor { case command: IoDeleteCommand => sender() ! IoSuccess(command, ()) case command: IoSizeCommand => sender() ! IoSuccess(command, 0L) case command: IoContentAsStringCommand => sender() ! IoSuccess(command, "0") + case command: IoTailAsStringCommand => sender() ! IoSuccess(command, "") case command: IoExistsCommand => sender() ! IoSuccess(command, false) // With context @@ -26,6 +27,8 @@ class MockIoActor(returnCode: String, stderrSize: Long) extends Actor { case (requestContext: Any, command: IoSizeCommand) => sender() ! (requestContext -> IoSuccess(command, stderrSize)) case (requestContext: Any, command: IoContentAsStringCommand) => sender() ! (requestContext -> IoSuccess(command, returnCode)) + case (requestContext: Any, command: IoTailAsStringCommand) => + sender() ! (requestContext -> IoSuccess(command, returnCode)) case (requestContext: Any, command: IoExistsCommand) => sender() ! (requestContext -> IoSuccess(command, false)) case withPromise: IoCommandWithPromise[_] => self ! ((withPromise.promise, withPromise.ioCommand)) diff --git a/core/src/test/scala/cromwell/core/SimpleIoActor.scala b/core/src/test/scala/cromwell/core/SimpleIoActor.scala index 4ca3d3c1bdf..c4a17358be0 100644 --- a/core/src/test/scala/cromwell/core/SimpleIoActor.scala +++ b/core/src/test/scala/cromwell/core/SimpleIoActor.scala @@ -46,6 +46,12 @@ class SimpleIoActor extends Actor { case Failure(failure) => sender() ! IoFailure(command, failure) } + case command: IoTailAsStringCommand => + Try(command.file.tailLines(command.options.maxBytes)(context.dispatcher)) match { + case Success(content) => sender() ! IoSuccess(command, content) + case Failure(failure) => sender() ! IoFailure(command, failure) + } + case command: IoHashCommand => Try(command.file.md5) match { case Success(hash) => sender() ! IoSuccess(command, hash) @@ -89,6 +95,12 @@ class SimpleIoActor extends Actor { case Failure(failure) => sender() ! (requestContext -> IoFailure(command, failure)) } + case (requestContext: Any, command: IoTailAsStringCommand) => + Try(command.file.tailLines(command.options.maxBytes)(context.dispatcher)) match { + case Success(content) => sender() ! (requestContext -> IoSuccess(command, content)) + case Failure(failure) => sender() ! (requestContext -> IoFailure(command, failure)) + } + case (requestContext: Any, command: IoHashCommand) => Try(command.file.md5) match { case Success(hash) => sender() ! (requestContext -> IoSuccess(command, hash)) diff --git a/core/src/test/scala/cromwell/core/io/AsyncIoSpec.scala b/core/src/test/scala/cromwell/core/io/AsyncIoSpec.scala index 3c7d71cde75..480accdd132 100644 --- a/core/src/test/scala/cromwell/core/io/AsyncIoSpec.scala +++ b/core/src/test/scala/cromwell/core/io/AsyncIoSpec.scala @@ -40,6 +40,50 @@ class AsyncIoSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers { } } + it should "tail asynchronously" in { + val testActor = TestActorRef(new AsyncIoTestActor(simpleIoActor)) + + val testPath = DefaultPathBuilder.createTempFile() + testPath.write("hello") + + testActor.underlyingActor.asyncIo.tailAsStringAsync(testPath, 2) map { result => + assert(result == "") + } + } + + it should "tail a multi-line unix file asynchronously" in { + val testActor = TestActorRef(new AsyncIoTestActor(simpleIoActor)) + + val testPath = DefaultPathBuilder.createTempFile() + testPath.write("hello\nworld") + + testActor.underlyingActor.asyncIo.tailAsStringAsync(testPath, 8) map { result => + assert(result == "world") + } + } + + it should "tail a multi-line windows file asynchronously" in { + val testActor = TestActorRef(new AsyncIoTestActor(simpleIoActor)) + + val testPath = DefaultPathBuilder.createTempFile() + testPath.write("hello\r\nworld") + + testActor.underlyingActor.asyncIo.tailAsStringAsync(testPath, 9) map { result => + assert(result == "world") + } + } + + it should "tail the file if it's under the byte limit asynchronously" in { + val testActor = TestActorRef(new AsyncIoTestActor(simpleIoActor)) + + val testPath = DefaultPathBuilder.createTempFile() + testPath.write("hello") + + testActor.underlyingActor.asyncIo.tailAsStringAsync(testPath, 6) map { result => + assert(result == "hello") + } + } + it should "get size asynchronously" in { val testActor = TestActorRef(new AsyncIoTestActor(simpleIoActor)) diff --git a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala index b4ac2461032..3827b79b323 100644 --- a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala +++ b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala @@ -71,6 +71,8 @@ class NioFlow(parallelism: Int, case sizeCommand: IoSizeCommand => size(sizeCommand) map sizeCommand.success case readAsStringCommand: IoContentAsStringCommand => readAsString(readAsStringCommand) map readAsStringCommand.success + case tailAsStringCommand: IoTailAsStringCommand => + tailAsString(tailAsStringCommand) map tailAsStringCommand.success case hashCommand: IoHashCommand => hash(hashCommand) map hashCommand.success case touchCommand: IoTouchCommand => touch(touchCommand) map touchCommand.success case existsCommand: IoExistsCommand => exists(existsCommand) map existsCommand.success @@ -111,6 +113,10 @@ class NioFlow(parallelism: Int, ).replaceAll("\\r\\n", "\\\n") } + private def tailAsString(command: IoTailAsStringCommand): IO[String] = IO { + command.file.tailLines(command.options.maxBytes).replaceAll("\\r\\n", "\\\n") + } + private def size(size: IoSizeCommand) = size.file match { case httpPath: HttpPath => IO.fromFuture(IO(httpPath.fetchSize)) diff --git a/engine/src/test/scala/cromwell/engine/io/IoActorSpec.scala b/engine/src/test/scala/cromwell/engine/io/IoActorSpec.scala index 4a00632fed5..27f64637db7 100644 --- a/engine/src/test/scala/cromwell/engine/io/IoActorSpec.scala +++ b/engine/src/test/scala/cromwell/engine/io/IoActorSpec.scala @@ -8,6 +8,7 @@ import cromwell.core.TestKitSuite import cromwell.core.callcaching.FileHashStrategy import cromwell.core.io.DefaultIoCommand._ import cromwell.core.io.IoContentAsStringCommand.IoReadOptions +import cromwell.core.io.IoTailAsStringCommand.IoTailOptions import cromwell.core.io._ import cromwell.core.path.{DefaultPathBuilder, Path} import cromwell.engine.io.IoActor.IoConfig @@ -184,6 +185,94 @@ class IoActorSpec extends TestKitSuite with AnyFlatSpecLike with Matchers with I src.delete() } + it should "tail only the last bytes of file" in { + val testActor = TestActorRef( + factory = new IoActor(IoActorConfig, TestProbe("serviceRegistryActorFirstBytes").ref, "cromwell test"), + name = "testActorLastBytes" + ) + + val src = DefaultPathBuilder.createTempFile() + src.write("hello") + + val tailCommand = DefaultIoTailAsStringCommand(src, IoTailOptions(2)) + + testActor ! tailCommand + expectMsgPF(5 seconds) { + case response: IoSuccess[_] => + response.command.isInstanceOf[IoTailAsStringCommand] shouldBe true + response.result.asInstanceOf[String] shouldBe "" + case response: IoFailure[_] => fail("Expected an IoSuccess", response.failure) + } + + src.delete() + } + + it should "tail a multi-line unix file" in { + val testActor = TestActorRef( + factory = new IoActor(IoActorConfig, TestProbe("serviceRegistryActorRead").ref, "cromwell test"), + name = "testActorTailUnix" + ) + + val src = DefaultPathBuilder.createTempFile() + src.write("hello\nworld") + + val tailCommand = DefaultIoTailAsStringCommand(src, IoTailOptions(8)) + + testActor ! tailCommand + expectMsgPF(5 seconds) { + case response: IoSuccess[_] => + response.command.isInstanceOf[IoTailAsStringCommand] shouldBe true + response.result.asInstanceOf[String] shouldBe "world" + case response: IoFailure[_] => fail("Expected an IoSuccess", response.failure) + } + + src.delete() + } + + it should "tail a multi-line windows file" in { + val testActor = TestActorRef( + factory = new IoActor(IoActorConfig, TestProbe("serviceRegistryActorRead").ref, "cromwell test"), + name = "testActorTailWindows" + ) + + val src = DefaultPathBuilder.createTempFile() + src.write("hello\r\nworld") + + val tailCommand = DefaultIoTailAsStringCommand(src, IoTailOptions(9)) + + testActor ! tailCommand + expectMsgPF(5 seconds) { + case response: IoSuccess[_] => + response.command.isInstanceOf[IoTailAsStringCommand] shouldBe true + response.result.asInstanceOf[String] shouldBe "world" + case response: IoFailure[_] => fail("Expected an IoSuccess", response.failure) + } + + src.delete() + } + + it should "tail the file if it's under the byte limit" in { + val testActor = TestActorRef( + factory = new IoActor(IoActorConfig, TestProbe("serviceRegistryActorByteLimit").ref, "cromwell test"), + name = "testActorTailByteLimit" + ) + + val src = DefaultPathBuilder.createTempFile() + src.write("hello") + + val tailCommand = DefaultIoTailAsStringCommand(src, IoTailOptions(6)) + + testActor ! tailCommand + expectMsgPF(5 seconds) { + case response: IoSuccess[_] => + response.command.isInstanceOf[IoTailAsStringCommand] shouldBe true + response.result.asInstanceOf[String] shouldBe "hello" + case response: IoFailure[_] => fail("Expected an IoSuccess", response.failure) + } + + src.delete() + } + it should "return a file size" in { val testActor = TestActorRef( factory = new IoActor(IoActorConfig, TestProbe("serviceRegistryActorSize").ref, "cromwell test"), diff --git a/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/GcsPathBuilder.scala b/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/GcsPathBuilder.scala index c16e236c9d9..cc7fe889639 100644 --- a/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/GcsPathBuilder.scala +++ b/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/GcsPathBuilder.scala @@ -4,8 +4,9 @@ import akka.actor.ActorSystem import akka.http.scaladsl.model.ContentTypes import better.files.File.OpenOptions import cats.effect.IO +import com.google.api.client.http.HttpHeaders import com.google.api.gax.retrying.RetrySettings -import com.google.api.services.storage.StorageScopes +import com.google.api.services.storage.{Storage, StorageScopes} import com.google.api.services.storage.model.StorageObject import com.google.auth.Credentials import com.google.cloud.storage.Storage.BlobTargetOption @@ -253,6 +254,36 @@ case class GcsPath private[gcs] (nioPath: NioPath, runOnEc(recoverFromProjectNotProvided(this, request)).unsafeRunSync() } + override def mediaInputStreamTail(numBytes: Long)(implicit ec: ExecutionContext): InputStream = { + def request(withProject: Boolean): InputStream = { + val size = + apiStorage + .objects() + .get(objectBlobId.get.getBucket, objectBlobId.get.getName) + .setUserProject(withProject.option(projectId).orNull) + .execute() + .getSize + .longValueExact() + + val startPosition = Math.max(0, size - numBytes) + val rangeHeader = String.format("bytes=%d-", startPosition) + + // Use apiStorage here instead of cloudStorage, because apiStorage throws now if the bucket has requester pays, + // whereas cloudStorage creates the input stream anyway and only throws once `read` is called (which only happens in NioFlow) + apiStorage + .objects() + .get(objectBlobId.get.getBucket, objectBlobId.get.getName) + .setUserProject(withProject.option(projectId).orNull) + .setRequestHeaders(new HttpHeaders().setRange(rangeHeader)) + .asInstanceOf[Storage#Objects#Get] + .executeMediaAsInputStream() + } + + // Since the NIO interface is synchronous we need to run this synchronously here. It is however wrapped in a Future + // in the NioFlow so we don't need to worry about exceptions + runOnEc(recoverFromProjectNotProvided(this, request)).unsafeRunSync() + } + override def pathWithoutScheme: String = { val gcsPath = cloudStoragePath gcsPath.bucket + gcsPath.toAbsolutePath.toString