Skip to content
This repository has been archived by the owner on Dec 27, 2023. It is now read-only.

Commit

Permalink
feat: process storage item as input stream (#334)
Browse files Browse the repository at this point in the history
Added class to process storage item as input stream, this class will be useful in cases when it is necessary to obtain metadata from repository package, usually such operation requires unpacking the archive and reading/parsing some file from it.

Ticket: #333
  • Loading branch information
olenagerasimova authored Sep 9, 2021
1 parent 889a7da commit 3a20317
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 0 deletions.
64 changes: 64 additions & 0 deletions src/main/java/com/artipie/asto/streams/ContentAsStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* The MIT License (MIT) Copyright (c) 2020-2021 artipie.com
* https://github.com/artipie/asto/LICENSE.txt
*/
package com.artipie.asto.streams;

import com.artipie.asto.ArtipieIOException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.cqfn.rio.WriteGreed;
import org.cqfn.rio.stream.ReactiveOutputStream;
import org.reactivestreams.Publisher;

/**
* Process content as input stream.
* This class allows to treat storage item as input stream and
* perform some action with this stream (read/uncompress/parse etc).
* @param <T> Result type
* @since 1.4
*/
public final class ContentAsStream<T> {

/**
* Publisher to process.
*/
private final Publisher<ByteBuffer> content;

/**
* Ctor.
* @param content Content
*/
public ContentAsStream(final Publisher<ByteBuffer> content) {
this.content = content;
}

/**
* Process storage item as input stream by performing provided action on it.
* @param action Action to perform
* @return Completion action with the result
*/
public CompletionStage<T> process(final Function<InputStream, T> action) {
return CompletableFuture.supplyAsync(
() -> {
try (
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in)
) {
final CompletionStage<Void> ros =
new ReactiveOutputStream(out).write(this.content, WriteGreed.SYSTEM);
final T result = action.apply(in);
return ros.thenApply(nothing -> result);
} catch (final IOException err) {
throw new ArtipieIOException(err);
}
}
).thenCompose(Function.identity());
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/artipie/asto/streams/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* The MIT License (MIT) Copyright (c) 2020-2021 artipie.com
* https://github.com/artipie/asto/LICENSE.txt
*/

/**
* Storage items as IO streams.
*
* @since 1.4
*/
package com.artipie.asto.streams;
34 changes: 34 additions & 0 deletions src/test/java/com/artipie/asto/streams/ContentAsStreamTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* The MIT License (MIT) Copyright (c) 2020-2021 artipie.com
* https://github.com/artipie/asto/LICENSE.txt
*/
package com.artipie.asto.streams;

import com.artipie.asto.Content;
import com.artipie.asto.misc.UncheckedIOFunc;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

/**
* Test for {@link ContentAsStream}.
* @since 1.4
*/
class ContentAsStreamTest {

@Test
void processesItem() {
final Charset charset = StandardCharsets.UTF_8;
MatcherAssert.assertThat(
new ContentAsStream<List<String>>(new Content.From("one\ntwo\nthree".getBytes(charset)))
.process(new UncheckedIOFunc<>(input -> IOUtils.readLines(input, charset)))
.toCompletableFuture().join(),
Matchers.contains("one", "two", "three")
);
}

}
11 changes: 11 additions & 0 deletions src/test/java/com/artipie/asto/streams/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* The MIT License (MIT) Copyright (c) 2020-2021 artipie.com
* https://github.com/artipie/asto/LICENSE.txt
*/

/**
* Storage items as IO streams tests.
*
* @since 1.4
*/
package com.artipie.asto.streams;

0 comments on commit 3a20317

Please sign in to comment.