Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,12 @@ trait StandardAsyncExecutionActor
/**
  * Reads the content of a file as a String through the asynchronous IO layer.
  *
  * @param path     the file to read
  * @param maxBytes optional byte limit forwarded to the read
  * @return a Future with the content; the read is issued with `failOnOverflow = false`
  */
def readFile(path: Path, maxBytes: Option[Int]): Future[String] = {
  val overflowIsFailure = false
  asyncIo.contentAsStringAsync(path, maxBytes, failOnOverflow = overflowIsFailure)
}

/**
  * Reads at most `maxBytes` from the end of a file. If the tail read fails for any reason,
  * reads at most `maxBytes` from the start of the file instead.
  *
  * @param path     the file to read
  * @param maxBytes byte limit applied to whichever read is performed
  */
def readPartial(path: Path, maxBytes: Int): Future[String] = {
  // A def (not a val) so the head read is only issued when the tail read has actually failed.
  def headOfFile: Future[String] =
    asyncIo.contentAsStringAsync(path, Option(maxBytes), failOnOverflow = false)

  asyncIo.tailAsStringAsync(path, maxBytes).recoverWith { case _ => headOfFile }
}

def checkMemoryRetryRC(): Future[Boolean] =
readFile(jobPaths.memoryRetryRC, None) map { codeAsString =>
Try(codeAsString.trim.toInt) match {
Expand All @@ -1439,7 +1445,7 @@ trait StandardAsyncExecutionActor
}

/**
  * Checks whether the partially-read memory-retry stderr file mentions any of the given error keys.
  *
  * The file is read with `readPartial`, i.e. the tail of the file is preferred and the head is
  * used only when the tail cannot be read.
  *
  * @param memoryRetryError path of the stderr file to inspect
  * @param errorKeys        substrings to look for in the content
  * @param maxBytes         maximum number of bytes to read from the file
  * @return a Future that is true when at least one key occurs in the content that was read
  */
def checkMemoryRetryStderr(memoryRetryError: Path, errorKeys: List[String], maxBytes: Int): Future[Boolean] =
  readPartial(memoryRetryError, maxBytes) map { errorContent =>
    errorKeys.exists(errorContent.contains)
  }

Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/cromwell/core/io/AsyncIo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class AsyncIo(ioEndpoint: ActorRef, ioCommandBuilder: IoCommandBuilder) {
/**
* Builds a content-as-string command for `path` with `ioCommandBuilder` and submits it via `asyncCommand`.
* `maxBytes` and `failOnOverflow` are passed to the command builder unchanged.
*/
def contentAsStringAsync(path: Path, maxBytes: Option[Int], failOnOverflow: Boolean): Future[String] =
asyncCommand(ioCommandBuilder.contentAsStringCommand(path, maxBytes, failOnOverflow))

/**
* Builds a tail-as-string command for `path` with `ioCommandBuilder` and submits it via `asyncCommand`.
* `maxBytes` is passed to the command builder unchanged.
*/
def tailAsStringAsync(path: Path, maxBytes: Int): Future[String] =
asyncCommand(ioCommandBuilder.tailAsString(path, maxBytes))

/**
* Builds a write command for `path` with `ioCommandBuilder` and submits it via `asyncCommand`.
* `content`, `options` and `compressPayload` (default false) are passed to the command builder unchanged.
*/
def writeAsync(path: Path, content: String, options: OpenOptions, compressPayload: Boolean = false): Future[Unit] =
asyncCommand(ioCommandBuilder.writeCommand(path, content, options, compressPayload))

Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cromwell.core.io
import better.files.File.OpenOptions
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.io.IoTailAsStringCommand.IoTailOptions
import cromwell.core.path.Path

object DefaultIoCommand {
Expand All @@ -16,6 +17,11 @@ object DefaultIoCommand {
override def commandDescription: String = s"DefaultIoContentAsStringCommand file '$file' options '$options'"
}

/**
* Default concrete IoTailAsStringCommand: carries the target file and the tail options,
* and describes itself with both in commandDescription.
*/
case class DefaultIoTailAsStringCommand(override val file: Path, override val options: IoTailOptions)
extends IoTailAsStringCommand(file, options) {
override def commandDescription: String = s"DefaultIoTailAsStringCommand file '$file' options '$options'"
}

/**
* Default concrete IoSizeCommand: carries the target file and describes itself with it in commandDescription.
*/
case class DefaultIoSizeCommand(override val file: Path) extends IoSizeCommand(file) {
override def commandDescription: String = s"DefaultIoSizeCommand file '$file'"
}
Expand Down
18 changes: 18 additions & 0 deletions core/src/main/scala/cromwell/core/io/IoCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import common.util.Backoff
import common.util.StringUtil.EnhancedToStringable
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.io.IoTailAsStringCommand.IoTailOptions
import cromwell.core.path.Path
import cromwell.core.retry.SimpleExponentialBackoff
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -133,6 +134,23 @@ abstract class IoContentAsStringCommand(val file: Path,
override lazy val name = "read"
}

object IoTailAsStringCommand {

/**
* Options to customize reading the tail of a file.
* @param maxBytes upper bound, in bytes, on how much of the file's tail is read
*/
case class IoTailOptions(maxBytes: Int)
}

/**
* Read the tail of a file as a string. How much of the tail is read is governed by `options`
* (see IoTailOptions.maxBytes).
*
* @param file the file whose tail is read
* @param options tail-reading options
*/
abstract class IoTailAsStringCommand(val file: Path, val options: IoTailOptions) extends SingleFileIoCommand[String] {
override def toString = s"read tail of ${file.pathAsString}"
// Short command name; kept distinct from the "read" name used by the full-content read command.
override lazy val name = "tail"
}

/**
* Return the size of file
*/
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.DefaultIoCommand._
import cromwell.core.io.IoCommand.{noopMetricsCallback, IOMetricsCallback}
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.io.IoTailAsStringCommand.IoTailOptions
import cromwell.core.path.BetterFileMethods.OpenOptions
import cromwell.core.path.Path

Expand All @@ -16,6 +17,7 @@ import scala.util.Try
abstract class PartialIoCommandBuilder {
def contentAsStringCommand: PartialFunction[(Path, Option[Int], Boolean), Try[IoContentAsStringCommand]] =
PartialFunction.empty
def tailAsStringCommand: PartialFunction[(Path, Int), Try[IoTailAsStringCommand]] = PartialFunction.empty
def writeCommand: PartialFunction[(Path, String, OpenOptions, Boolean), Try[IoWriteCommand]] = PartialFunction.empty
def sizeCommand: PartialFunction[Path, Try[IoSizeCommand]] = PartialFunction.empty
def deleteCommand: PartialFunction[(Path, Boolean), Try[IoDeleteCommand]] = PartialFunction.empty
Expand Down Expand Up @@ -63,6 +65,9 @@ class IoCommandBuilder(partialBuilders: List[PartialIoCommandBuilder] = List.emp
DefaultIoContentAsStringCommand(path, IoReadOptions(maxBytes, failOnOverflow))
)

/**
* Builds a tail-as-string command for `path`. The partial builders' `tailAsStringCommand` functions are
* offered the `(path, maxBytes)` pair through `buildOrDefault`; a DefaultIoTailAsStringCommand is supplied
* as the default.
*/
def tailAsString(path: Path, maxBytes: Int): Try[IoTailAsStringCommand] =
buildOrDefault(_.tailAsStringCommand, (path, maxBytes), DefaultIoTailAsStringCommand(path, IoTailOptions(maxBytes)))

def writeCommand(path: Path,
content: String,
options: OpenOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import java.io.{BufferedInputStream, BufferedReader, ByteArrayOutputStream, Inpu
import java.nio.file.{FileAlreadyExistsException, Files}
import java.nio.file.attribute.{PosixFilePermission, PosixFilePermissions}
import java.util.zip.GZIPOutputStream

import better.files.File.OpenOptions
import cromwell.util.TryWithResource.tryWithResource

import java.nio.channels.Channels
import scala.jdk.CollectionConverters._
import scala.concurrent.ExecutionContext
import scala.io.Codec
Expand Down Expand Up @@ -86,12 +86,34 @@ trait EvenBetterPathMethods {

/** Creates a TailedWriter for this path, constructed with the given `tailedSize`. */
final def tailed(tailedSize: Int) = TailedWriter(this, tailedSize)

/**
* Similar to newInputStream, but is overridable in subclasses to provide a different InputStream.
*
* Naming comes from the original method used from Google Cloud Storage, executeMediaAsInputStream()
*
* @param ec not used by this default implementation; present for overriding implementations
* @return the stream returned by newInputStream
*/
def mediaInputStream(implicit ec: ExecutionContext): InputStream = {
// Use locally(ec) to avoid "parameter value ec in method mediaInputStream is never used"
// It is used in the overridden methods, but not here.
// See https://github.com/scala/bug/issues/10347 and https://github.com/scala/bug/issues/10790
locally(ec)
newInputStream
}

/**
  * A tail version of mediaInputStream that reads the last numBytes of the file.
  *
  * Opens a file channel, positions it numBytes before the end of the file (or at the start when the
  * file is shorter than numBytes) and exposes the remainder as an InputStream. The caller owns the
  * returned stream and must close it.
  *
  * @param numBytes number of bytes, counted from the end of the file, that the stream should yield
  * @param ec       not used by this default implementation; present for overriding implementations
  * @return an InputStream over the last numBytes bytes of the file
  */
def mediaInputStreamTail(numBytes: Long)(implicit ec: ExecutionContext): InputStream = {
  // Use locally(ec) to avoid "parameter value ec in method mediaInputStreamTail is never used"
  // It is used in the overridden methods, but not here.
  // See https://github.com/scala/bug/issues/10347 and https://github.com/scala/bug/issues/10790
  locally(ec)
  val channel = newFileChannel
  try {
    val startPosition = Math.max(0L, channel.size() - numBytes)
    channel.position(startPosition)
    Channels.newInputStream(channel)
  } catch {
    // size()/position() can fail after the channel has been opened. Nobody else holds a reference
    // to the channel at this point, so close it here instead of leaking the file handle.
    case scala.util.control.NonFatal(failure) =>
      try channel.close()
      catch { case scala.util.control.NonFatal(closeFailure) => failure.addSuppressed(closeFailure) }
      throw failure
  }
}

protected def gzipByteArray(byteArray: Array[Byte]): Array[Byte] = {
val byteStream = new ByteArrayOutputStream
tryWithResource(() => new GZIPOutputStream(byteStream)) {
Expand Down Expand Up @@ -132,6 +154,14 @@ trait EvenBetterPathMethods {
/**
  * Applies `f` to a BufferedInputStream wrapping mediaInputStream, managed by tryWithResource.
  * A failed attempt is passed through fileIoErrorPf and the final result is unwrapped with `.get`,
  * so any remaining failure is thrown to the caller.
  */
def withBufferedStream[A](f: BufferedInputStream => A)(implicit ec: ExecutionContext): A = {
  val attempt = tryWithResource(() => new BufferedInputStream(this.mediaInputStream))(f)
  attempt.recoverWith(fileIoErrorPf).get
}

/**
  * Tail counterpart of withBufferedStream: applies `f` to a BufferedInputStream wrapping
  * mediaInputStreamTail(numBytes), managed by tryWithResource. A failed attempt is passed through
  * fileIoErrorPf and the final result is unwrapped with `.get`, so any remaining failure is thrown.
  *
  * @param numBytes number of bytes from the end of the file that the stream should cover
  */
def withBufferedStreamTail[A](numBytes: Long)(f: BufferedInputStream => A)(implicit ec: ExecutionContext): A = {
  val attempt = tryWithResource(() => new BufferedInputStream(this.mediaInputStreamTail(numBytes)))(f)
  attempt.recoverWith(fileIoErrorPf).get
}

/**
* Returns an Array[Byte] from a Path. Limit the array size to "limit" byte if defined.
* @throws IOException if failOnOverflow is true and the file is larger than limit
Expand All @@ -151,9 +181,35 @@ trait EvenBetterPathMethods {
}

/**
  * Reads the end of a file and returns it as a UTF-8 String made of whole lines only.
  *
  * - If the file holds at most limitBytes bytes, the whole file is returned.
  * - Otherwise the (possibly partial) first line of the tail is discarded and the rest is returned.
  * - If the file is larger than limitBytes and the tail contains no newline, the result is empty.
  *
  * @param limitBytes maximum number of bytes of the file's tail to return
  */
def tailLines(limitBytes: Int)(implicit ec: ExecutionContext): String =
  // One extra byte is requested so that "file is larger than limitBytes" can be told apart from
  // "file is exactly limitBytes long".
  withBufferedStreamTail(limitBytes + 1L) { stream =>
    val tailBytes = stream.readAllBytes()
    val keptBytes =
      if (tailBytes.length <= limitBytes) tailBytes
      else
        tailBytes.indexOf('\n') match {
          case notFound if notFound < 0 => Array.emptyByteArray
          case lineBreak => tailBytes.drop(lineBreak + 1)
        }
    new String(keptBytes, Codec.UTF8.charSet)
  }

/**
  * Reads a limited number of bytes of a file and makes a String, prefixed with an annotation saying
  * which part of the file was read.
  *
  * The tail of the file is preferred ("[Last n bytes]:"). If reading the tail fails, the start of the
  * file is returned instead ("[First n bytes]:").
  *
  * @param limitBytes maximum number of bytes of file content to include
  */
def annotatedContentAsStringWithLimit(limitBytes: Int)(implicit ec: ExecutionContext): String =
  try
    s"[Last $limitBytes bytes]:" + tailLines(limitBytes)
  catch {
    // If anything goes wrong, such as we cannot seek to the end of the file, we will just return the first n bytes.
    // NonFatal rather than Exception so that thread interruption is not swallowed by the fallback.
    case scala.util.control.NonFatal(_) =>
      // NOTE(review): this decodes with the platform default charset whereas tailLines decodes as UTF-8 - confirm intended.
      s"[First $limitBytes bytes]:" + new String(limitFileContent(Option(limitBytes), failOnOverflow = false))
  }
}
3 changes: 3 additions & 0 deletions core/src/test/scala/cromwell/core/MockIoActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class MockIoActor(returnCode: String, stderrSize: Long) extends Actor {
case command: IoDeleteCommand => sender() ! IoSuccess(command, ())
case command: IoSizeCommand => sender() ! IoSuccess(command, 0L)
case command: IoContentAsStringCommand => sender() ! IoSuccess(command, "0")
case command: IoTailAsStringCommand => sender() ! IoSuccess(command, "")
case command: IoExistsCommand => sender() ! IoSuccess(command, false)

// With context
Expand All @@ -26,6 +27,8 @@ class MockIoActor(returnCode: String, stderrSize: Long) extends Actor {
case (requestContext: Any, command: IoSizeCommand) => sender() ! (requestContext -> IoSuccess(command, stderrSize))
case (requestContext: Any, command: IoContentAsStringCommand) =>
sender() ! (requestContext -> IoSuccess(command, returnCode))
case (requestContext: Any, command: IoTailAsStringCommand) =>
sender() ! (requestContext -> IoSuccess(command, returnCode))
case (requestContext: Any, command: IoExistsCommand) => sender() ! (requestContext -> IoSuccess(command, false))

case withPromise: IoCommandWithPromise[_] => self ! ((withPromise.promise, withPromise.ioCommand))
Expand Down
12 changes: 12 additions & 0 deletions core/src/test/scala/cromwell/core/SimpleIoActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ class SimpleIoActor extends Actor {
case Failure(failure) => sender() ! IoFailure(command, failure)
}

case command: IoTailAsStringCommand =>
Try(command.file.tailLines(command.options.maxBytes)(context.dispatcher)) match {
case Success(content) => sender() ! IoSuccess(command, content)
case Failure(failure) => sender() ! IoFailure(command, failure)
}

case command: IoHashCommand =>
Try(command.file.md5) match {
case Success(hash) => sender() ! IoSuccess(command, hash)
Expand Down Expand Up @@ -89,6 +95,12 @@ class SimpleIoActor extends Actor {
case Failure(failure) => sender() ! (requestContext -> IoFailure(command, failure))
}

case (requestContext: Any, command: IoTailAsStringCommand) =>
Try(command.file.tailLines(command.options.maxBytes)(context.dispatcher)) match {
case Success(content) => sender() ! (requestContext -> IoSuccess(command, content))
case Failure(failure) => sender() ! (requestContext -> IoFailure(command, failure))
}

case (requestContext: Any, command: IoHashCommand) =>
Try(command.file.md5) match {
case Success(hash) => sender() ! (requestContext -> IoSuccess(command, hash))
Expand Down
44 changes: 44 additions & 0 deletions core/src/test/scala/cromwell/core/io/AsyncIoSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,50 @@ class AsyncIoSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers {
}
}

it should "tail asynchronously" in {
  val ioTestActor = TestActorRef(new AsyncIoTestActor(simpleIoActor))

  // A 5-byte file with no newline at all.
  val tempFile = DefaultPathBuilder.createTempFile()
  tempFile.write("hello")

  // The file is larger than the 2-byte limit and the tail has no newline to resume from,
  // so no complete line is available and the tail is empty.
  ioTestActor.underlyingActor.asyncIo.tailAsStringAsync(tempFile, 2).map(_ shouldBe "")
}

it should "tail a multi-line unix file asynchronously" in {
  val ioTestActor = TestActorRef(new AsyncIoTestActor(simpleIoActor))

  // An 11-byte file with LF line endings.
  val tempFile = DefaultPathBuilder.createTempFile()
  tempFile.write("hello\nworld")

  // The 8-byte limit cuts into the first line; that partial line is dropped and only the last line remains.
  ioTestActor.underlyingActor.asyncIo.tailAsStringAsync(tempFile, 8).map(_ shouldBe "world")
}

it should "tail a multi-line windows file asynchronously" in {
  val ioTestActor = TestActorRef(new AsyncIoTestActor(simpleIoActor))

  // A 12-byte file with CRLF line endings.
  val tempFile = DefaultPathBuilder.createTempFile()
  tempFile.write("hello\r\nworld")

  // The 9-byte limit cuts into the first line; everything up to and including the LF (and so the CR
  // before it) is dropped.
  ioTestActor.underlyingActor.asyncIo.tailAsStringAsync(tempFile, 9).map(_ shouldBe "world")
}

it should "tail the file if it's under the byte limit asynchronously" in {
  val ioTestActor = TestActorRef(new AsyncIoTestActor(simpleIoActor))

  // A 5-byte file, smaller than the 6-byte limit used below.
  val tempFile = DefaultPathBuilder.createTempFile()
  tempFile.write("hello")

  // The whole file fits within the limit, so it is returned untouched.
  ioTestActor.underlyingActor.asyncIo.tailAsStringAsync(tempFile, 6).map(_ shouldBe "hello")
}

it should "get size asynchronously" in {
val testActor = TestActorRef(new AsyncIoTestActor(simpleIoActor))

Expand Down
6 changes: 6 additions & 0 deletions engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class NioFlow(parallelism: Int,
case sizeCommand: IoSizeCommand => size(sizeCommand) map sizeCommand.success
case readAsStringCommand: IoContentAsStringCommand =>
readAsString(readAsStringCommand) map readAsStringCommand.success
case tailAsStringCommand: IoTailAsStringCommand =>
tailAsString(tailAsStringCommand) map tailAsStringCommand.success
case hashCommand: IoHashCommand => hash(hashCommand) map hashCommand.success
case touchCommand: IoTouchCommand => touch(touchCommand) map touchCommand.success
case existsCommand: IoExistsCommand => exists(existsCommand) map existsCommand.success
Expand Down Expand Up @@ -111,6 +113,10 @@ class NioFlow(parallelism: Int,
).replaceAll("\\r\\n", "\\\n")
}

/**
  * Reads the tail of the command's file (bounded by the command's maxBytes option) and converts
  * Windows line endings (CR LF) to Unix line endings (LF).
  */
private def tailAsString(command: IoTailAsStringCommand): IO[String] = IO {
  val tail = command.file.tailLines(command.options.maxBytes)
  // Literal replacement; equivalent to the regex form replaceAll("\\r\\n", "\\\n").
  tail.replace("\r\n", "\n")
}

private def size(size: IoSizeCommand) =
size.file match {
case httpPath: HttpPath => IO.fromFuture(IO(httpPath.fetchSize))
Expand Down
Loading
Loading