From b8219273438649b80aa71d1a82b4614eb0ce2cb9 Mon Sep 17 00:00:00 2001 From: Ilya Sadykov Date: Fri, 20 Nov 2015 19:05:17 +0300 Subject: [PATCH 1/2] Migrate to newer pessimistic mongo --- camelot-ext/camelot-mongodb/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/camelot-ext/camelot-mongodb/pom.xml b/camelot-ext/camelot-mongodb/pom.xml index 750f2e2..8c376b9 100644 --- a/camelot-ext/camelot-mongodb/pom.xml +++ b/camelot-ext/camelot-mongodb/pom.xml @@ -10,7 +10,7 @@ Camelot MongoDB Extension camelot-mongodb - 1.2 + 1.3 From 368b0d10c85e608e2ee3489d0ec85634b41faa86 Mon Sep 17 00:00:00 2001 From: Ilya Sadykov Date: Fri, 20 Nov 2015 19:24:59 +0300 Subject: [PATCH 2/2] Trying to solve odd messages issue --- .../camelot/test/core/CamelotTestListener.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/camelot-test/src/main/java/ru/yandex/qatools/camelot/test/core/CamelotTestListener.java b/camelot-test/src/main/java/ru/yandex/qatools/camelot/test/core/CamelotTestListener.java index b312616..6eb9a58 100644 --- a/camelot-test/src/main/java/ru/yandex/qatools/camelot/test/core/CamelotTestListener.java +++ b/camelot-test/src/main/java/ru/yandex/qatools/camelot/test/core/CamelotTestListener.java @@ -27,6 +27,7 @@ import static java.lang.String.format; import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; import static java.util.stream.Collectors.toList; import static org.apache.camel.util.CamelContextHelper.getEndpointInjection; import static org.apache.camel.util.ObjectHelper.isEmpty; @@ -163,9 +164,14 @@ private void clearContext(TestContext testContext) throws Exception { //NOSONAR while (!camelContext.getInflightRepository().browse().isEmpty() && currentTimeMillis() - waitStartedTime < MAX_INFLIGHT_WAIT_MS) { camelContext.getInflightRepository().browse().stream().collect(toList()).forEach(e -> { - camelContext.getInflightRepository().remove(e.getExchange()); - logger.warn("Removing inflight exchange {} for route {}", - e.getExchange().getExchangeId(), e.getRouteId()); + try { + camelContext.getInflightRepository().remove(e.getExchange()); + logger.warn("Removing inflight exchange {} for route {}", + e.getExchange().getExchangeId(), e.getRouteId()); + sleep(20); // Giving some time for camel to fetch new inflights + } catch (InterruptedException ignored) { + throw new RuntimeException(ignored); + } }); } try {