Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions src/main/java/io/fabric8/maven/docker/StartMojo.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.google.common.util.concurrent.MoreExecutors;
import io.fabric8.maven.docker.access.DockerAccessException;
Expand All @@ -42,6 +44,10 @@
import io.fabric8.maven.docker.service.helper.StartContainerExecutor;
import io.fabric8.maven.docker.util.ContainerNamingUtil;
import io.fabric8.maven.docker.util.StartOrderResolver;
import io.fabric8.maven.docker.wait.ExitChecker;
import io.fabric8.maven.docker.wait.PreconditionFailedException;
import io.fabric8.maven.docker.wait.WaitTimeoutException;
import io.fabric8.maven.docker.wait.WaitUtil;
import org.apache.maven.plugin.MojoExecutionException;
import org.apache.maven.plugins.annotations.LifecyclePhase;
import org.apache.maven.plugins.annotations.Mojo;
Expand Down Expand Up @@ -141,7 +147,7 @@ public synchronized void executeInternal(final ServiceHub hub) throws DockerAcce
// All aliases which are provided in the image configuration:
final Set<String> imageAliases = new HashSet<>();
// Remember all aliases which has been started
final Set<String> startedContainerAliases = new HashSet<>();
final Set<StartedContainer> startedContainers = new HashSet<>();

// All images to to start
Queue<ImageConfiguration> imagesWaitingToStart = prepareStart(hub, queryService, runService, imageAliases);
Expand All @@ -152,14 +158,15 @@ public synchronized void executeInternal(final ServiceHub hub) throws DockerAcce
// Prepare the shutdown hook for stopping containers if we are going to follow them. Add the hook before starting any
// of the containers so that partial or aborted starts will behave the same as fully-successful ones.
if (follow) {
log.info("Shutdown-hook for expected stoppings added");
runService.addShutdownHookForStoppingContainers(keepContainer, removeVolumes, autoCreateCustomNetworks);
}

// Loop until every image has been started and the start of all images has been completed
while (!hasBeenAllImagesStarted(imagesWaitingToStart, imagesStarting)) {

final List<ImageConfiguration> imagesReadyToStart =
getImagesWhoseDependenciesHasStarted(imagesWaitingToStart, startedContainerAliases, imageAliases);
getImagesWhoseDependenciesHasStarted(imagesWaitingToStart, startedContainers, imageAliases);

for (final ImageConfiguration image : imagesReadyToStart) {

Expand All @@ -170,18 +177,40 @@ public synchronized void executeInternal(final ServiceHub hub) throws DockerAcce
imagesWaitingToStart.remove(image);

if (!startParallel) {
waitForStartedContainer(containerStartupService, startedContainerAliases, imagesStarting);
waitForStartedContainer(containerStartupService, startedContainers, imagesStarting);
}
}

if (startParallel) {
waitForStartedContainer(containerStartupService, startedContainerAliases, imagesStarting);
waitForStartedContainer(containerStartupService, startedContainers, imagesStarting);
}
}
log.info("All containers started!");
portMappingPropertyWriteHelper.write();

if (follow) {
wait();
log.info("Wait for containers finish - %s!", startedContainers.stream().map(sc->sc.imageConfig.getAlias() +"[" + sc.containerId + "]").collect(Collectors.joining(", ")));
try {
WaitUtil.wait(
new WaitUtil.Precondition() {
@Override
public boolean isOk() {
return true;
}

@Override
public void cleanup() {

}
},
Integer.MAX_VALUE,
startedContainers.stream().map(cid -> new ExitChecker(queryService, cid.containerId)).collect(Collectors.toSet()));
} catch (WaitTimeoutException e) {
log.warn("Infinite timeout timeouted!");
} catch (PreconditionFailedException e) {
log.error("Null precondition fails!");
}
log.info("All container stopped - finalized ...");
}

success = true;
Expand All @@ -204,14 +233,14 @@ public synchronized void executeInternal(final ServiceHub hub) throws DockerAcce

private void waitForStartedContainer(
final ExecutorCompletionService<StartedContainer> containerStartupService,
final Set<String> startedContainerAliases, final Queue<ImageConfiguration> imagesStarting)
final Set<StartedContainer> startedContainerAliases, final Queue<ImageConfiguration> imagesStarting)
throws InterruptedException, IOException, ExecException {
final Future<StartedContainer> startedContainerFuture = containerStartupService.take();
try {
final StartedContainer startedContainer = startedContainerFuture.get();
final ImageConfiguration imageConfig = startedContainer.imageConfig;

updateAliasesSet(startedContainerAliases, imageConfig.getAlias());
log.info("sc: %s,%s", startedContainer.containerId, imageConfig.getAlias());
updateAliasesSet(startedContainerAliases, startedContainer);

// All done with this image
imagesStarting.remove(imageConfig);
Expand Down Expand Up @@ -257,7 +286,7 @@ private void rethrowCause(ExecutionException e) throws IOException, InterruptedE
}
}

private void updateAliasesSet(Set<String> aliasesSet, String alias) {
private <T> void updateAliasesSet(Set<T> aliasesSet, T alias) {
// Add the alias to the set only when it is set. When it's
// not set it cant be used in the dependency resolution anyway, so we are ignoring
// it hence.
Expand Down Expand Up @@ -307,15 +336,15 @@ private void startImage(final ImageConfiguration imageConfig,

// Pick out all images who can be started right now because all their dependencies has been started
private List<ImageConfiguration> getImagesWhoseDependenciesHasStarted(Queue<ImageConfiguration> imagesRemaining,
Set<String> containersStarted,
Set<StartedContainer> containersStarted,
Set<String> aliases) {
final List<ImageConfiguration> ret = new ArrayList<>();

// Check for all images which can be already started
for (ImageConfiguration imageWaitingToStart : imagesRemaining) {
List<String> allDependencies = imageWaitingToStart.getDependencies();
List<String> aliasDependencies = filterOutNonAliases(aliases, allDependencies);
if (containersStarted.containsAll(aliasDependencies)) {
if (aliasDependencies.stream().allMatch(ad -> containersStarted.stream().map(sc -> sc.imageConfig.getAlias()).anyMatch(Predicate.isEqual(ad)))) {
ret.add(imageWaitingToStart);
}
}
Expand Down
17 changes: 14 additions & 3 deletions src/main/java/io/fabric8/maven/docker/access/log/LogRequestor.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class LogRequestor extends Thread implements LogGetHandle {

private final UrlBuilder urlBuilder;

InputStream is = null;
private boolean finished = true;

/**
* Create a helper object for requesting log entries synchronously ({@link #fetchLogs()}) or asynchronously ({@link #start()}.
*
Expand Down Expand Up @@ -98,14 +101,17 @@ public void fetchLogs() {
// Fetch log asynchronously as stream and follow stream
public void run() {
try {
finished = false;
callback.open();
this.request = getLogRequest(true);
final HttpResponse response = client.execute(request);
parseResponse(response);
} catch (LogCallback.DoneException e) {
// Signifies we're finished with the log stream.
} catch (IOException e) {
callback.error("IO Error while requesting logs: " + e + " " + Thread.currentThread().getName());
if (!finished) {
callback.error("IO Error while requesting logs: " + e + " " + Thread.currentThread().getName());
}
} finally {
callback.close();
}
Expand Down Expand Up @@ -181,12 +187,15 @@ private void parseResponse(HttpResponse response) throws LogCallback.DoneExcepti
throw new LogCallback.DoneException();
}

try (InputStream is = response.getEntity().getContent()) {
is = response.getEntity().getContent();
try {
while (true) {
if (!readStreamFrame(is)) {
return;
}
}
} finally {
is.close();
}
}

Expand All @@ -209,7 +218,9 @@ private HttpUriRequest getLogRequest(boolean follow) {
@Override
public void finish() {
if (request != null) {
request.abort();
finished = true;
final HttpUriRequest req = request;
new Thread(req::abort).start();
request = null;
}
}
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/io/fabric8/maven/docker/wait/ExitChecker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.fabric8.maven.docker.wait;

import io.fabric8.maven.docker.access.DockerAccessException;
import io.fabric8.maven.docker.service.QueryService;
import io.fabric8.maven.docker.util.Logger;

public class ExitChecker implements WaitChecker {

private final String containerId;
private final QueryService queryService;

public ExitChecker(QueryService queryService, String containerId) {
this.containerId = containerId;
this.queryService = queryService;
}

@Override
public boolean check() {
try {
Integer exitCodeActual = queryService.getMandatoryContainer(containerId).getExitCode();
// container still running
return exitCodeActual != null;
} catch (DockerAccessException e) {
return true;
}
}

@Override
public void cleanUp() {
// No cleanup required
}

@Override
public String getLogLabel() {
return "on exit code";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ public boolean check() {
@Override
public void cleanUp() {
if (logHandle != null) {
logHandle.finish();
try {
logHandle.finish();
} catch (UnsupportedOperationException e) {
log.warn("abort not supported - continue!");
}
}
}

Expand Down