diff --git a/README.md b/README.md index e70bc20..74846e9 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ This application crawls from [Maven Central Incremental Index Repository](https: Running this application will follow this repository and outputs the __unique artifacts__ released on Maven central. Currently, Maven Central releases a new (incremental) index __every week__. -Several outputs exist including Kafka. REST API support will be added soon. Moreover, a checkpointing mechanism is added to support persistence across restarts. +Several outputs exist including Kafka and HTTP support. Moreover, a checkpointing mechanism is added to support persistence across restarts. More specifically, the `checkpointDir` stores an `INDEX.index` file where the `INDEX` is the _next_ index to crawl. E.g. when `800.index` is stored, the crawler will start crawling _including_ index 800. ## Usage @@ -20,8 +20,13 @@ usage: IncrementalMavenCrawler crawled index. Used for recovery on crash or restart. Optional. -kb,--kafka_brokers Kafka brokers to connect with. I.e. - broker1:port,broker2:port,... Optional. + broker1:port,broker2:port,... + Required for Kafka output. -kt,--kafka_topic Kafka topic to produce to. + Required for Kafka output. + -re,--rest_endpoint HTTP endpoint to post crawled batches to. + Required for Rest output. + ``` ### Outputs diff --git a/pom.xml b/pom.xml index 035e81f..48cb5a4 100644 --- a/pom.xml +++ b/pom.xml @@ -119,7 +119,11 @@ 1.10.19 test - + + org.apache.httpcomponents + httpclient + 4.5.13 + diff --git a/src/main/java/eu/fasten/crawler/CrawlIndex.java b/src/main/java/eu/fasten/crawler/CrawlIndex.java index 17afa0e..ca012ec 100644 --- a/src/main/java/eu/fasten/crawler/CrawlIndex.java +++ b/src/main/java/eu/fasten/crawler/CrawlIndex.java @@ -99,33 +99,28 @@ public List getIndexCreators() * @param output the class to output to (E.g. Kafka). * @param batchSize the batch size to send to the output. */ - public void crawlAndSend(Output output, int batchSize) { + public boolean crawlAndSend(Output output, int batchSize) { nonUnique = 0; - Set artifactSet = new HashSet<>(); + HashSet artifactSet = new HashSet<>(); // Setup output. output.open(); - IndexDataReader.IndexDataReadVisitor visitor = (doc) -> { - MavenArtifact artifact = MavenArtifact.fromDocument(doc, context); - if (artifact == null) { - logger.warn("Couldn't construct artifact info for document: " + doc.toString() + ". We will skip it."); - return; - } - - if (artifactSet.contains(artifact)) { - nonUnique += 1; - } else { - artifactSet.add(artifact); - } - }; + IndexDataReader.IndexDataReadVisitor visitor = setupUniqueVisitor(artifactSet); try { IndexDataReader.IndexDataReadResult result = reader.readIndex(visitor, context); // Send to output. final List> batchedLists = Lists.partition(Lists.newArrayList(artifactSet), batchSize); - batchedLists.forEach((l) -> output.send(l)); + for (List artifacts : batchedLists) { + boolean res = output.send(artifacts); + + if (!res) { + logger.error("Failed sending batch to ouput for index " + index + ". Exiting current crawl session."); + return false; + } + } // Flush and close output. output.flush(); @@ -137,11 +132,36 @@ public void crawlAndSend(Output output, int batchSize) { logger.info("Unique documents: " + artifactSet.size()); logger.info("Total documents: " + result.getDocumentCount()); } catch (IOException e) { - logger.error("IOException while reading from the index", e); - throw new RuntimeException("Now exiting due to IOExcepton."); + logger.error("IOException while reading from the index. " + index + ". Exiting current crawl session.", e); + return false; } finally { nonUnique = 0; } + + return true; + } + + /** + * Setup DataReadVisitor which fills a set with unique artifacts. + * @param artifactSet reference to the set which will be filled with unique artifacts. + * @return the visitor. + */ + public IndexDataReader.IndexDataReadVisitor setupUniqueVisitor(HashSet artifactSet) { + IndexDataReader.IndexDataReadVisitor visitor = (doc) -> { + MavenArtifact artifact = MavenArtifact.fromDocument(doc, context); + if (artifact == null) { + logger.warn("Couldn't construct artifact info for document: " + doc.toString() + ". We will skip it."); + return; + } + + if (artifactSet.contains(artifact)) { + nonUnique += 1; + } else { + artifactSet.add(artifact); + } + }; + + return visitor; } } diff --git a/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java b/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java index ba487f5..c971b9c 100644 --- a/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java +++ b/src/main/java/eu/fasten/crawler/IncrementalMavenCrawler.java @@ -1,8 +1,6 @@ package eu.fasten.crawler; -import eu.fasten.crawler.output.KafkaOutput; -import eu.fasten.crawler.output.Output; -import eu.fasten.crawler.output.StdOutput; +import eu.fasten.crawler.output.*; import org.apache.commons.cli.*; import org.codehaus.plexus.util.FileUtils; import org.slf4j.Logger; @@ -76,15 +74,15 @@ public class IncrementalMavenCrawler implements Runnable { .desc("Kafka brokers to connect with. I.e. broker1:port,broker2:port,... Optional.") .build(); - public static void main(String[] args) { - options.addOption(optStartIndex); - options.addOption(optBatchSize); - options.addOption(optOutputType); - options.addOption(optCrawlInterval); - options.addOption(optCheckpointDir); - options.addOption(optKafkaTopic); - options.addOption(optKafkaBrokers); + static Option optRestEndpoint = Option.builder("re") + .longOpt("rest_endpoint") + .hasArg() + .argName("url") + .desc("HTTP endpoint to post crawled batches to.") + .build(); + public static void main(String[] args) { + addOptions(); CommandLineParser parser = new DefaultParser(); Properties properties; @@ -96,16 +94,11 @@ public static void main(String[] args) { } // Setup arguments for crawler. - Output output = new StdOutput(); int batchSize = Integer.parseInt(properties.getProperty("batch_size")); int startIndex = Integer.parseInt(properties.getProperty("index")); int interval = Integer.parseInt(properties.getProperty("interval")); String checkpointDir = properties.getProperty("checkpoint_dir"); - - // Setup Kafka. - if (properties.get("output").equals("kafka")) { - output = new KafkaOutput(properties.getProperty("kafka_topic"), properties.getProperty("kafka_brokers"), batchSize); - } + Output output = OutputFactory.getOutput(properties.getProperty("output"), properties); // Start cralwer and execute it with an interval. IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(startIndex, batchSize, output, checkpointDir); @@ -113,6 +106,17 @@ public static void main(String[] args) { service.scheduleAtFixedRate(crawler, 0, interval, TimeUnit.HOURS); } + public static void addOptions() { + options.addOption(optStartIndex); + options.addOption(optBatchSize); + options.addOption(optOutputType); + options.addOption(optCrawlInterval); + options.addOption(optCheckpointDir); + options.addOption(optKafkaTopic); + options.addOption(optKafkaBrokers); + options.addOption(optRestEndpoint); + } + /** * Verify and stores arguments in properties instance. * @param cmd the parsed command line arguments. @@ -126,6 +130,10 @@ public static Properties verifyAndParseArguments(CommandLine cmd) throws ParseEx throw new ParseException("Configured output to be Kafka, but no `kafka_topic` or `kafka_brokers` have been configured."); } + if (cmd.getOptionValue("output").equals("rest") && !(cmd.hasOption("rest_endpoint"))) { + throw new ParseException("Configured output to be Rest, but no `rest_endpoint` has been configured."); + } + props.setProperty("index", cmd.getOptionValue("start_index", "0")); props.setProperty("batch_size", cmd.getOptionValue("batch_size", "50")); props.setProperty("output", cmd.getOptionValue("output", "std")); @@ -133,6 +141,7 @@ public static Properties verifyAndParseArguments(CommandLine cmd) throws ParseEx props.setProperty("checkpoint_dir", cmd.getOptionValue("checkpoint_dir", "")); props.setProperty("kafka_topic", cmd.getOptionValue("kafka_topic", "")); props.setProperty("kafka_brokers", cmd.getOptionValue("kafka_brokers", "")); + props.setProperty("rest_endpoint", cmd.getOptionValue("rest_endpoint", "")); return props; } @@ -163,6 +172,8 @@ public IncrementalMavenCrawler(int startIndex, int batchSize, Output output, Str if (this.index > startIndex) { logger.info("Found (checkpointed) index in " + checkpointDir + ". Will start crawling from index " + this.index); } + + logger.info("Starting IncrementalMavenCrawler with index: " + this.index + ", batch size: " + batchSize + " and output " + output.getClass().getSimpleName() + "."); } /** @@ -211,15 +222,18 @@ public void run() { // Setup crawler. CrawlIndex crawlIndex = new CrawlIndex(index, indexFile); - crawlIndex.crawlAndSend(output, batchSize); + boolean success = crawlIndex.crawlAndSend(output, batchSize); // Delete the index file. indexFile.delete(); - logger.info("Index " + index + " successfully crawled."); - - // Update (and increment) the index. - updateIndex(); + if (success) { + logger.info("Index " + index + " successfully crawled."); + // Update (and increment) the index. + updateIndex(); + } else { + logger.warn("Failed crawling index " + index + ". Will retry on next interval."); + } } /** diff --git a/src/main/java/eu/fasten/crawler/output/KafkaOutput.java b/src/main/java/eu/fasten/crawler/output/KafkaOutput.java index 93a641d..2607be1 100644 --- a/src/main/java/eu/fasten/crawler/output/KafkaOutput.java +++ b/src/main/java/eu/fasten/crawler/output/KafkaOutput.java @@ -53,20 +53,25 @@ public void flush() { * @param artifact the artifacts (we expect it to be of size batch size). */ @Override - public void send(List artifact) { + public boolean send(List artifact) { List> records = artifact .stream() .map((x) -> new ProducerRecord(topic, null, x.getTimestamp(), null, x.toString())) .collect(Collectors.toList()); - records.stream().map((r) -> producer.send(r)).parallel().forEach((f) -> { + boolean result = records.stream().map((r) -> producer.send(r)).parallel().map((f) -> { try { f.get(); + return true; } catch (InterruptedException e) { e.printStackTrace(); + return false; } catch (ExecutionException e) { e.printStackTrace(); + return false; } - }); + }).noneMatch((x) -> x == false); + + return result; } } diff --git a/src/main/java/eu/fasten/crawler/output/Output.java b/src/main/java/eu/fasten/crawler/output/Output.java index 9cb2060..71aa1ed 100644 --- a/src/main/java/eu/fasten/crawler/output/Output.java +++ b/src/main/java/eu/fasten/crawler/output/Output.java @@ -8,13 +8,14 @@ public interface Output { /** Helper methods for constructing and cleaning up the output instance. **/ - void open(); - void close(); - void flush(); + default void open() {} + default void close() {} + default void flush() {} /** Send records to output. **/ - default void send(MavenArtifact artifact) { - send(Arrays.asList(artifact)); + default boolean send(MavenArtifact artifact) { + return send(Arrays.asList(artifact)); } - void send(List artifact); + + boolean send(List artifact); } diff --git a/src/main/java/eu/fasten/crawler/output/OutputFactory.java b/src/main/java/eu/fasten/crawler/output/OutputFactory.java new file mode 100644 index 0000000..155358d --- /dev/null +++ b/src/main/java/eu/fasten/crawler/output/OutputFactory.java @@ -0,0 +1,17 @@ +package eu.fasten.crawler.output; + +import java.util.Properties; + +public class OutputFactory { + + public static Output getOutput(String outputName, Properties properties) { + switch (outputName) { + case "kafka": + return new KafkaOutput(properties.getProperty("kafka_topic"), properties.getProperty("kafka_brokers"), Integer.parseInt(properties.getProperty("batch_size"))); + case "rest": + return new RestOutput(properties.getProperty("rest_endpoint")); + default: + return new StdOutput(); + } + } +} diff --git a/src/main/java/eu/fasten/crawler/output/RestOutput.java b/src/main/java/eu/fasten/crawler/output/RestOutput.java new file mode 100644 index 0000000..79086e4 --- /dev/null +++ b/src/main/java/eu/fasten/crawler/output/RestOutput.java @@ -0,0 +1,90 @@ +package eu.fasten.crawler.output; + +import eu.fasten.crawler.data.MavenArtifact; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClients; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.List; + +public class RestOutput implements Output { + + private Logger logger = LoggerFactory.getLogger(this.getClass()); + + private String endpoint; + + /** + * Setup RestOutput. + * @param endpoint the http url to POST to. + */ + public RestOutput(String endpoint) { + this.endpoint = endpoint; + } + + @Override + public boolean send(List artifacts) { + // Setup connections. + HttpClient httpClient = constructHttpClient(); + HttpPost httpPost = constructPostRequest(); + + try { + // Send batch. + StringEntity jsonList = new StringEntity(buildJsonList(artifacts)); + httpPost.setEntity(jsonList); + int responseCode = httpClient.execute(httpPost).getStatusLine().getStatusCode(); + + // If we don't get a 200, return false. + if (responseCode == 200) { + return true; + } else { + logger.error("Expected response 200, but got " + responseCode); + return false; + } + } catch (IOException e) { + logger.error("Failed sending to rest endpoint. ", e); + return false; + } + } + + /** + * Builds a json list of all artifacts. + * @param artifacts all artifacts. + * @return a stringified list of all artifacts. + */ + public String buildJsonList(List artifacts) { + // Build array of JSON objects. + StringBuffer json = new StringBuffer(); + json.append("["); + + for (MavenArtifact af : artifacts) { + json.append(af.toString() + ","); + } + + json.deleteCharAt(json.length() - 1); + json.append("]"); + + return json.toString(); + } + + /** + * Constructs a PostRequest. + * @return a HTTPPost. + */ + public HttpPost constructPostRequest() { + return new HttpPost(this.endpoint); + } + + /** + * Constructs a HTTPClient. + * @return a HttpClient. + */ + public HttpClient constructHttpClient() { + return HttpClients.createDefault(); + } +} diff --git a/src/main/java/eu/fasten/crawler/output/StdOutput.java b/src/main/java/eu/fasten/crawler/output/StdOutput.java index 029a67e..5644c49 100644 --- a/src/main/java/eu/fasten/crawler/output/StdOutput.java +++ b/src/main/java/eu/fasten/crawler/output/StdOutput.java @@ -6,21 +6,13 @@ public class StdOutput implements Output { - @Override - public void open() {} - - @Override - public void close() {} - - @Override - public void flush() {} - /** * Prints the artifacts to the screen. * @param artifact list of artifacts. */ @Override - public void send(List artifact) { + public boolean send(List artifact) { artifact.stream().map((a) -> a.toString()).forEach((a) -> {System.out.println(a);}); + return true; } } diff --git a/src/test/java/eu/fasten/crawler/CrawlIndexTest.java b/src/test/java/eu/fasten/crawler/CrawlIndexTest.java index c5f6028..e19e13c 100644 --- a/src/test/java/eu/fasten/crawler/CrawlIndexTest.java +++ b/src/test/java/eu/fasten/crawler/CrawlIndexTest.java @@ -12,7 +12,7 @@ import java.util.List; import java.util.concurrent.Future; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; import static org.mockito.Mockito.*; @@ -38,12 +38,30 @@ public void testIndexSetupFullRun() { CrawlIndex index = new CrawlIndex(600, f); StdOutput mockStd = mock(StdOutput.class); - index.crawlAndSend(mockStd, 50); + when(mockStd.send(anyList())).thenReturn(true); + + boolean res = index.crawlAndSend(mockStd, 50); + + verify(mockStd, atLeastOnce()).send(anyList()); + assertTrue(res); + f.delete(); + } + + @Test + public void testIndexSetupFullRunFailure() { + File f = DownloadIndex.download(600); + CrawlIndex index = new CrawlIndex(600, f); + StdOutput mockStd = mock(StdOutput.class); + + when(mockStd.send(anyList())).thenReturn(false); + boolean res = index.crawlAndSend(mockStd, 50); verify(mockStd, atLeastOnce()).send(anyList()); + assertFalse(res); f.delete(); } + @Test public void testIndexSetupFullRunKafka() throws IllegalAccessException { File f = DownloadIndex.download(600); @@ -58,9 +76,10 @@ public void testIndexSetupFullRunKafka() throws IllegalAccessException { FieldUtils.writeField(kafkaOutput, "producer", prod, true); - index.crawlAndSend(kafkaOutput, 50); + boolean res = index.crawlAndSend(kafkaOutput, 50); verify(kafkaOutput, atLeastOnce()).send(anyList()); + assertTrue(res); f.delete(); } } diff --git a/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java b/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java index 9d5e3ae..7070cee 100644 --- a/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java +++ b/src/test/java/eu/fasten/crawler/IncrementalMavenCrawlerTest.java @@ -22,7 +22,7 @@ public static void beforeAll() { @Test public void testCheckpointDisabled() { int index = 0; - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, ""); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), ""); assertEquals(index, crawler.getIndex()); } @@ -34,7 +34,7 @@ public void testCheckpointEnabledNoOverride() throws IOException { file.mkdirs(); file.createNewFile(); - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, "src/test/resources/"); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), "src/test/resources/"); assertEquals(index, crawler.getIndex()); file.delete(); } @@ -47,7 +47,7 @@ public void testCheckpointEnabledOverride() throws IOException { file.mkdirs(); file.createNewFile(); - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, "src/test/resources/"); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), "src/test/resources/"); assertNotEquals(index, crawler.getIndex()); assertEquals(1, crawler.getIndex()); file.delete(); @@ -65,7 +65,7 @@ public void testCheckpointEnabledMultipleOverride() throws IOException { file2.mkdirs(); file2.createNewFile(); - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, "src/test/resources/"); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), "src/test/resources/"); assertNotEquals(index, crawler.getIndex()); assertEquals(5, crawler.getIndex()); @@ -79,7 +79,7 @@ public void testUpdateIndexNoCheckpoint() throws IOException { File file = new File("src/test/resources/1.index"); - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, ""); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), ""); crawler.updateIndex(); assertFalse(file.exists()); @@ -91,7 +91,7 @@ public void testUpdateIndexCheckpoint() throws IOException { int index = 0; - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, "src/test/resources/"); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), "src/test/resources/"); crawler.updateIndex(); File file = new File("src/test/resources/1.index"); @@ -113,7 +113,7 @@ public void testUpdateIndexCheckpointEnabledMultipleOverride() throws IOExceptio file2.mkdirs(); file2.createNewFile(); - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, "src/test/resources/"); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), "src/test/resources/"); crawler.updateIndex(); assertFalse(file.exists()); assertFalse(file2.exists()); @@ -126,7 +126,7 @@ public void testUpdateIndexCheckpointEnabledMultipleOverride() throws IOExceptio @Test public void testNonExistentIndex() { int index = 9999999; - IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, null, "src/test/resources/"); + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 0, new StdOutput(), "src/test/resources/"); crawler.run(); assertEquals(index, crawler.getIndex()); } @@ -138,8 +138,7 @@ public void testSuccessfulCrawl() { IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 50, stdOutput, "src/test/resources/"); - doNothing().when(stdOutput).send(anyList()); - + when(stdOutput.send(anyList())).thenReturn(true); crawler.run(); assertTrue(new File("src/test/resources/" + (index + 1) + ".index").exists()); @@ -148,4 +147,18 @@ public void testSuccessfulCrawl() { new File("src/test/resources/" + (index + 1) + ".index").delete(); } + + @Test + public void testFailedCrawl() { + int index = 680; + StdOutput stdOutput = spy(new StdOutput()); + + IncrementalMavenCrawler crawler = new IncrementalMavenCrawler(index, 50, stdOutput, "src/test/resources/"); + + when(stdOutput.send(anyList())).thenReturn(false); + crawler.run(); + + verify(stdOutput, atLeastOnce()).send(anyList()); + assertEquals(index, crawler.getIndex()); + } } diff --git a/src/test/java/eu/fasten/crawler/output/KafkaOutputTest.java b/src/test/java/eu/fasten/crawler/output/KafkaOutputTest.java new file mode 100644 index 0000000..2d90df1 --- /dev/null +++ b/src/test/java/eu/fasten/crawler/output/KafkaOutputTest.java @@ -0,0 +1,63 @@ +package eu.fasten.crawler.output; + +import eu.fasten.crawler.data.MavenArtifact; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.junit.Test; +import org.mockito.Spy; + +import java.util.List; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doReturn; + +public class KafkaOutputTest { + + @Test + public void testKafkaOutputSuccessfulSend() throws IllegalAccessException { + MavenArtifact artifactOne = new MavenArtifact("a", "g", "1", 0L); + MavenArtifact artifactTwo = new MavenArtifact("a", "g", "2", 0L); + + KafkaOutput kafkaOutput = spy(new KafkaOutput("", "", 50)); + + KafkaProducer prod = mock(KafkaProducer.class); + Future fut = mock(Future.class); + + doNothing().when(kafkaOutput).open(); + doReturn(fut).when(prod).send(any()); + + FieldUtils.writeField(kafkaOutput, "producer", prod, true); + + boolean res = kafkaOutput.send(List.of(artifactOne, artifactTwo)); + assertTrue(res); + } + + @Test + public void testKafkaOutputFailedSend() throws Exception { + MavenArtifact artifactOne = new MavenArtifact("a", "g", "1", 0L); + MavenArtifact artifactTwo = new MavenArtifact("a", "g", "2", 0L); + + KafkaOutput kafkaOutput = spy(new KafkaOutput("", "", 50)); + + KafkaProducer prod = mock(KafkaProducer.class); + Future fut = mock(Future.class); + Future futTwo = mock(Future.class); + + doNothing().when(kafkaOutput).open(); + doReturn(fut).when(prod).send(new ProducerRecord("", null, artifactOne.getTimestamp(), null, artifactOne.toString())); + doReturn(futTwo).when(prod).send(new ProducerRecord("", null, artifactTwo.getTimestamp(), null, artifactTwo.toString())); + + when(futTwo.get()).thenThrow(InterruptedException.class); + + FieldUtils.writeField(kafkaOutput, "producer", prod, true); + + boolean res = kafkaOutput.send(List.of(artifactOne, artifactTwo)); + assertFalse(res); + } +} diff --git a/src/test/java/eu/fasten/crawler/output/RestOutputTest.java b/src/test/java/eu/fasten/crawler/output/RestOutputTest.java new file mode 100644 index 0000000..33b3e03 --- /dev/null +++ b/src/test/java/eu/fasten/crawler/output/RestOutputTest.java @@ -0,0 +1,98 @@ +package eu.fasten.crawler.output; + +import eu.fasten.crawler.data.MavenArtifact; +import org.apache.http.HttpResponse; +import org.apache.http.StatusLine; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.HttpClients; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class RestOutputTest { + + @Test + public void testRestOutputFailed() throws Exception { + RestOutput output = spy(new RestOutput("")); + + HttpClient client = mock(HttpClient.class); + HttpPost post = mock(HttpPost.class); + + when(output.constructHttpClient()).thenReturn(client); + when(output.constructPostRequest()).thenReturn(post); + + when(client.execute(post)).thenThrow(IOException.class); + boolean res = output.send(new MavenArtifact("a", "b", "c", 0L)); + + assertFalse(res); + } + + @Test + public void testRestOutputFailedStatus() throws Exception { + RestOutput output = spy(new RestOutput("")); + + HttpClient client = mock(HttpClient.class); + HttpPost post = mock(HttpPost.class); + HttpResponse response = mock(HttpResponse.class); + StatusLine line = mock(StatusLine.class); + + when(output.constructHttpClient()).thenReturn(client); + when(output.constructPostRequest()).thenReturn(post); + when(client.execute(post)).thenReturn(response); + when(response.getStatusLine()).thenReturn(line); + when(line.getStatusCode()).thenReturn(201); + + boolean res = output.send(new MavenArtifact("a", "b", "c", 0L)); + + assertFalse(res); + } + + @Test + public void testRestOutputSuccessStatus() throws Exception { + RestOutput output = spy(new RestOutput("")); + + HttpClient client = mock(HttpClient.class); + HttpPost post = mock(HttpPost.class); + HttpResponse response = mock(HttpResponse.class); + StatusLine line = mock(StatusLine.class); + + when(output.constructHttpClient()).thenReturn(client); + when(output.constructPostRequest()).thenReturn(post); + when(client.execute(post)).thenReturn(response); + when(response.getStatusLine()).thenReturn(line); + when(line.getStatusCode()).thenReturn(200); + + boolean res = output.send(new MavenArtifact("a", "b", "c", 0L)); + + assertTrue(res); + } + + @Test + public void testVerifyJSON() throws Exception { + RestOutput output = spy(new RestOutput("")); + MavenArtifact af = new MavenArtifact("a", "b", "c", 0L); + String list = output.buildJsonList(List.of(af)); + + + assertEquals("[" + af.toString() + "]", list); + } + + @Test + public void testVerifyJSONList() throws Exception { + RestOutput output = spy(new RestOutput("")); + MavenArtifact af = new MavenArtifact("a", "b", "c", 0L); + MavenArtifact af2 = new MavenArtifact("a", "b", "d", 0L); + String list = output.buildJsonList(List.of(af, af2)); + + + System.out.println(list); + + assertEquals("[" + af.toString() + "," + af2 + "]", list); + } +}