Skip to content

Commit 53d7b10

Browse files
Copilotslachiewicz
andcommitted
Add StreamPollFeeder implementation and test
Co-authored-by: slachiewicz <[email protected]>
1 parent a92da83 commit 53d7b10

File tree

3 files changed

+192
-5
lines changed

3 files changed

+192
-5
lines changed

src/main/java/org/codehaus/plexus/util/cli/CommandLineUtils.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,13 +146,13 @@ public void run() {
146146

147147
@Override
148148
public Integer call() throws CommandLineException {
149-
StreamFeeder inputFeeder = null;
149+
StreamPollFeeder inputFeeder = null;
150150
StreamPumper outputPumper = null;
151151
StreamPumper errorPumper = null;
152152
boolean success = false;
153153
try {
154154
if (systemIn != null) {
155-
inputFeeder = new StreamFeeder(systemIn, p.getOutputStream());
155+
inputFeeder = new StreamPollFeeder(systemIn, p.getOutputStream());
156156
inputFeeder.start();
157157
}
158158

@@ -288,11 +288,11 @@ private static void handleException(final StreamPumper streamPumper, final Strin
288288
}
289289
}
290290

291-
private static void handleException(final StreamFeeder streamFeeder, final String streamName)
291+
private static void handleException(final StreamPollFeeder streamPollFeeder, final String streamName)
292292
throws CommandLineException {
293-
if (streamFeeder.getException() != null) {
293+
if (streamPollFeeder.getException() != null) {
294294
throw new CommandLineException(
295-
String.format("Failure processing %s.", streamName), streamFeeder.getException());
295+
String.format("Failure processing %s.", streamName), streamPollFeeder.getException());
296296
}
297297
}
298298

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package org.codehaus.plexus.util.cli;
2+
3+
/*
4+
* Copyright The Codehaus Foundation.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.io.OutputStream;
22+
23+
/**
24+
* Poll InputStream for available data and write the output to an OutputStream.
25+
*
26+
* @author <a href="mailto:[email protected]">Trygve Laugst&oslash;l</a>
27+
*/
28+
public class StreamPollFeeder extends AbstractStreamHandler {
29+
30+
public static final int BUF_LEN = 80;
31+
32+
private InputStream input;
33+
34+
private OutputStream output;
35+
36+
private volatile Throwable exception = null;
37+
38+
private final Object lock = new Object();
39+
40+
/**
41+
* Create a new StreamPollFeeder
42+
*
43+
* @param input Stream to read from
44+
* @param output Stream to write to
45+
*/
46+
public StreamPollFeeder(InputStream input, OutputStream output) {
47+
super();
48+
this.input = input;
49+
this.output = output;
50+
}
51+
52+
@Override
53+
public void run() {
54+
byte[] buf = new byte[BUF_LEN];
55+
56+
try {
57+
while (!isDone()) {
58+
if (input.available() > 0) {
59+
int i = input.read(buf);
60+
if (i > 0) {
61+
output.write(buf, 0, i);
62+
output.flush();
63+
} else {
64+
setDone();
65+
}
66+
} else {
67+
synchronized (lock) {
68+
if (!isDone()) {
69+
lock.wait(100);
70+
}
71+
}
72+
}
73+
}
74+
} catch (IOException e) {
75+
exception = e;
76+
} catch (InterruptedException e) {
77+
Thread.currentThread().interrupt();
78+
} finally {
79+
close();
80+
}
81+
}
82+
83+
public void close() {
84+
if (input != null) {
85+
synchronized (input) {
86+
try {
87+
input.close();
88+
} catch (IOException ex) {
89+
if (exception == null) {
90+
exception = ex;
91+
}
92+
}
93+
94+
input = null;
95+
}
96+
}
97+
98+
if (output != null) {
99+
synchronized (output) {
100+
try {
101+
output.close();
102+
} catch (IOException ex) {
103+
if (exception == null) {
104+
exception = ex;
105+
}
106+
}
107+
108+
output = null;
109+
}
110+
}
111+
}
112+
113+
/**
114+
* @since 3.1.0
115+
* @return the Exception
116+
*/
117+
public Throwable getException() {
118+
return exception;
119+
}
120+
121+
@Override
122+
public synchronized void waitUntilDone() throws InterruptedException {
123+
synchronized (lock) {
124+
setDone();
125+
lock.notifyAll();
126+
}
127+
128+
join();
129+
}
130+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package org.codehaus.plexus.util.cli;
2+
3+
/*
4+
* Copyright The Codehaus Foundation.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
import java.io.ByteArrayInputStream;
20+
import java.io.ByteArrayOutputStream;
21+
import java.io.IOException;
22+
23+
import org.junit.jupiter.api.Test;
24+
25+
import static org.junit.jupiter.api.Assertions.assertEquals;
26+
import static org.junit.jupiter.api.Assertions.assertNull;
27+
28+
public class StreamPollFeederTest {
29+
30+
@Test
31+
public void dataShouldBeCopied() throws InterruptedException, IOException {
32+
33+
StringBuilder TEST_DATA = new StringBuilder();
34+
for (int i = 0; i < 100; i++) {
35+
TEST_DATA.append("TestData");
36+
}
37+
38+
ByteArrayInputStream inputStream =
39+
new ByteArrayInputStream(TEST_DATA.toString().getBytes());
40+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
41+
42+
StreamPollFeeder streamPollFeeder = new StreamPollFeeder(inputStream, outputStream);
43+
44+
streamPollFeeder.start();
45+
46+
// wait until all data from steam will be read
47+
while (outputStream.size() < TEST_DATA.length()) {
48+
Thread.sleep(100);
49+
}
50+
51+
// wait until process finish
52+
streamPollFeeder.waitUntilDone();
53+
assertNull(streamPollFeeder.getException());
54+
55+
assertEquals(TEST_DATA.toString(), outputStream.toString());
56+
}
57+
}

0 commit comments

Comments
 (0)