From 88a3ef6d9bbc793272e6a16b53b8a38a3e652381 Mon Sep 17 00:00:00 2001 From: Annapurna V M S S Date: Wed, 28 Nov 2012 12:00:53 +0530 Subject: [PATCH 1/4] Changed split of filename method --- .../main/java/com/inmobi/databus/Cluster.java | 2 +- .../databus/distcp/DistcpBaseService.java | 24 ++++++++++++------- .../databus/distcp/MergedStreamService.java | 3 ++- .../databus/distcp/TestDistCPBaseService.java | 6 +++++ 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/databus-core/src/main/java/com/inmobi/databus/Cluster.java b/databus-core/src/main/java/com/inmobi/databus/Cluster.java index 6630457..a5fd5d2 100644 --- a/databus-core/src/main/java/com/inmobi/databus/Cluster.java +++ b/databus-core/src/main/java/com/inmobi/databus/Cluster.java @@ -36,7 +36,7 @@ public class Cluster { private final Map consumeStreams; private final Set sourceStreams; private final Configuration hadoopConf; - + public Cluster(Map clusterElementsMap, String rootDir, Map consumeStreams, Set sourceStreams) throws Exception { diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java index de868df..9528111 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java @@ -126,17 +126,25 @@ protected Boolean executeDistCp(DistCpOptions options) */ protected abstract Path getInputPath() throws IOException; - protected String getCategoryFromFileName(String fileName) { - LOG.debug("Splitting [" + fileName + "] on -"); - if (fileName != null && fileName.length() > 1 && fileName.contains("-")) { - StringTokenizer tokenizer = new StringTokenizer(fileName, "-"); - tokenizer.nextToken(); //skip collector name - String catgeory = tokenizer.nextToken(); - return catgeory; + protected String getCategoryFromFileName(String fileName, Set streamsSet) { + for( String streamName : streamsSet){ + String strs[] = fileName.split(streamName); + if(strs.length==2){ + LOG.info("on split "+strs[0]+" "+strs[1]); + if (checkCorrectDateFormat(strs[1])) + return streamName; + } } return null; } - + + protected boolean checkCorrectDateFormat(String timestamp){ + LOG.info("in checkCorrectDateFormat: "+timestamp); + return timestamp.matches("^.[0-9]{4}.[0-9]{2}.[0-9]{2}.[0-9]{2}.[0-9]{2}.[0-9]{5}.gz$"); + } + + + @Override public long getMSecondsTillNextRun(long currentTime) { long runIntervalInSec = (DEFAULT_RUN_INTERVAL/1000); diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java index ac49b93..a1a2aca 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java @@ -262,7 +262,8 @@ private Map> prepareForCommit(Path tmpOut) for (int i = 0; i < allFiles.length; i++) { String fileName = allFiles[i].getPath().getName(); if (fileName != null) { - String category = getCategoryFromFileName(fileName); + LOG.info("ClusterName: "+getDestCluster().getName()+" Merge stream: "+getDestCluster().getPrimaryDestinationStreams()); + String category = getCategoryFromFileName(fileName, getDestCluster().getPrimaryDestinationStreams()); if (category != null) { Path intermediatePath = new Path(tmpOut, category); if (!getDestFs().exists(intermediatePath)) diff --git a/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java b/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java index 970e99a..e93f90c 100644 --- a/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java +++ b/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java @@ -27,6 +27,7 @@ public class TestDistCPBaseService { FileSystem localFs; Cluster cluster; MirrorStreamService service = null; + MergedStreamService mergeService = null; String expectedFileName1 = "/tmp/com.inmobi.databus.distcp" + ".TestDistCPBaseService/data-file1"; String expectedFileName2 = "/tmp/com.inmobi.databus.distcp" + @@ -161,4 +162,9 @@ public void testNegative() throws Exception { } + @Test + public void testSplitFileName() throws Exception { + cleanUP(); + + } } From df157cc4a333a4a64aa69ed0d090645a46554565 Mon Sep 17 00:00:00 2001 From: Annapurna V M S S Date: Wed, 28 Nov 2012 12:50:41 +0530 Subject: [PATCH 2/4] test added for getting stream name from filename --- .../main/java/com/inmobi/databus/Cluster.java | 2 +- .../databus/distcp/DistcpBaseService.java | 10 ++---- .../databus/distcp/MergedStreamService.java | 4 +-- .../databus/distcp/TestDistCPBaseService.java | 34 +++++++++++++++++-- 4 files changed, 37 insertions(+), 13 deletions(-) diff --git a/databus-core/src/main/java/com/inmobi/databus/Cluster.java b/databus-core/src/main/java/com/inmobi/databus/Cluster.java index a5fd5d2..6630457 100644 --- a/databus-core/src/main/java/com/inmobi/databus/Cluster.java +++ b/databus-core/src/main/java/com/inmobi/databus/Cluster.java @@ -36,7 +36,7 @@ public class Cluster { private final Map consumeStreams; private final Set sourceStreams; private final Configuration hadoopConf; - + public Cluster(Map clusterElementsMap, String rootDir, Map consumeStreams, Set sourceStreams) throws Exception { diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java index 9528111..b3542ee 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java @@ -126,11 +126,10 @@ protected Boolean executeDistCp(DistCpOptions options) */ protected abstract Path getInputPath() throws IOException; - protected String getCategoryFromFileName(String fileName, Set streamsSet) { + protected static String getCategoryFromFileName(String fileName, Set streamsSet) { for( String streamName : streamsSet){ String strs[] = fileName.split(streamName); if(strs.length==2){ - LOG.info("on split "+strs[0]+" "+strs[1]); if (checkCorrectDateFormat(strs[1])) return streamName; } @@ -138,12 +137,9 @@ protected String getCategoryFromFileName(String fileName, Set streamsSet return null; } - protected boolean checkCorrectDateFormat(String timestamp){ - LOG.info("in checkCorrectDateFormat: "+timestamp); + protected static boolean checkCorrectDateFormat(String timestamp){ return timestamp.matches("^.[0-9]{4}.[0-9]{2}.[0-9]{2}.[0-9]{2}.[0-9]{2}.[0-9]{5}.gz$"); - } - - + } @Override public long getMSecondsTillNextRun(long currentTime) { diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java index a1a2aca..4f872db 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java @@ -262,8 +262,8 @@ private Map> prepareForCommit(Path tmpOut) for (int i = 0; i < allFiles.length; i++) { String fileName = allFiles[i].getPath().getName(); if (fileName != null) { - LOG.info("ClusterName: "+getDestCluster().getName()+" Merge stream: "+getDestCluster().getPrimaryDestinationStreams()); - String category = getCategoryFromFileName(fileName, getDestCluster().getPrimaryDestinationStreams()); + String category = getCategoryFromFileName(fileName, getDestCluster() + .getPrimaryDestinationStreams()); if (category != null) { Path intermediatePath = new Path(tmpOut, category); if (!getDestFs().exists(intermediatePath)) diff --git a/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java b/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java index e93f90c..4eac982 100644 --- a/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java +++ b/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java @@ -27,7 +27,6 @@ public class TestDistCPBaseService { FileSystem localFs; Cluster cluster; MirrorStreamService service = null; - MergedStreamService mergeService = null; String expectedFileName1 = "/tmp/com.inmobi.databus.distcp" + ".TestDistCPBaseService/data-file1"; String expectedFileName2 = "/tmp/com.inmobi.databus.distcp" + @@ -164,7 +163,36 @@ public void testNegative() throws Exception { @Test public void testSplitFileName() throws Exception { - cleanUP(); - + Set streamsSet = new HashSet(); + streamsSet.add("test-stream"); + streamsSet.add("test_stream"); + streamsSet.add("test_streams"); + streamsSet.add("test_stream_2"); + // file name in which collector name has hyphen + String fileName1 = "databus-test-test_stream-2012-11-27-21-20_00000.gz"; + // file name in which stream name has hyphen + String fileName2 = "databus_test-test-stream-2012-11-27-21-20_00000.gz"; + // file name in which stream name is subset of another stream name in the + // streamsSet + String fileName3 = "databus_test-test_streams-2012-11-27-21-20_00000.gz"; + String fileName4 = "databus_test-test_stream_2-2012-11-27-21-20_00000.gz"; + //file name in which stream name is not in streamsSet passed + String fileName5 = "databus_test-test_stream-2-2012-11-27-21-20_00000.gz"; + // get stream names from file name + String expectedStreamName1 = MergedStreamService.getCategoryFromFileName( + fileName1, streamsSet); + String expectedStreamName2 = MergedStreamService.getCategoryFromFileName( + fileName2, streamsSet); + String expectedStreamName3 = MergedStreamService.getCategoryFromFileName( + fileName3, streamsSet); + String expectedStreamName4 = MergedStreamService.getCategoryFromFileName( + fileName4, streamsSet); + String expectedStreamName5 = MergedStreamService.getCategoryFromFileName( + fileName5, streamsSet); + assert expectedStreamName1.compareTo("test_stream") == 0; + assert expectedStreamName2.compareTo("test-stream") == 0; + assert expectedStreamName3.compareTo("test_streams") == 0; + assert expectedStreamName4.compareTo("test_stream_2") == 0; + assert expectedStreamName5 == null; } } From 0a99e36d65c2fd5e56a95864ca2f4a72f4b0abd0 Mon Sep 17 00:00:00 2001 From: Annapurna V M S S Date: Wed, 28 Nov 2012 13:01:56 +0530 Subject: [PATCH 3/4] formatting corrected --- .../inmobi/databus/distcp/DistcpBaseService.java | 16 +++++++++------- .../databus/distcp/TestDistCPBaseService.java | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java index b3542ee..60efb82 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java @@ -126,20 +126,22 @@ protected Boolean executeDistCp(DistCpOptions options) */ protected abstract Path getInputPath() throws IOException; - protected static String getCategoryFromFileName(String fileName, Set streamsSet) { - for( String streamName : streamsSet){ + protected static String getCategoryFromFileName(String fileName, + Set streamsSet) { + for (String streamName : streamsSet) { String strs[] = fileName.split(streamName); - if(strs.length==2){ + if (strs.length == 2) { if (checkCorrectDateFormat(strs[1])) return streamName; } } return null; } - - protected static boolean checkCorrectDateFormat(String timestamp){ - return timestamp.matches("^.[0-9]{4}.[0-9]{2}.[0-9]{2}.[0-9]{2}.[0-9]{2}.[0-9]{5}.gz$"); - } + + protected static boolean checkCorrectDateFormat(String timestamp) { + return timestamp + .matches("^.[0-9]{4}.[0-9]{2}.[0-9]{2}.[0-9]{2}.[0-9]{2}.[0-9]{5}.gz$"); + } @Override public long getMSecondsTillNextRun(long currentTime) { diff --git a/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java b/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java index 4eac982..3a0011a 100644 --- a/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java +++ b/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java @@ -176,7 +176,7 @@ public void testSplitFileName() throws Exception { // streamsSet String fileName3 = "databus_test-test_streams-2012-11-27-21-20_00000.gz"; String fileName4 = "databus_test-test_stream_2-2012-11-27-21-20_00000.gz"; - //file name in which stream name is not in streamsSet passed + // file name in which stream name is not in streamsSet passed String fileName5 = "databus_test-test_stream-2-2012-11-27-21-20_00000.gz"; // get stream names from file name String expectedStreamName1 = MergedStreamService.getCategoryFromFileName( From 84c66b183e1f97de9061a3b8aa885d1e6aab7370 Mon Sep 17 00:00:00 2001 From: Annapurna V M S S Date: Thu, 29 Nov 2012 16:58:34 +0530 Subject: [PATCH 4/4] Classpath dependencies added in pom.xml and databus.sh --- pom.xml | 12 +++++++++++- tools/scripts/databus.sh | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 0f70bc8..deaf787 100644 --- a/pom.xml +++ b/pom.xml @@ -118,7 +118,7 @@ commons-httpclient commons-httpclient - 1.0 + 3.1 log4j @@ -155,6 +155,16 @@ testng 6.1.1 + + net.java.dev.jets3t + jets3t + 0.7.4 + + + commons-codec + commons-codec + 1.3 + diff --git a/tools/scripts/databus.sh b/tools/scripts/databus.sh index f9e0932..0c19ec0 100755 --- a/tools/scripts/databus.sh +++ b/tools/scripts/databus.sh @@ -73,7 +73,7 @@ fi #set classpath export CLASSPATH=`ls $DATABUS_DIR/lib/*jar | tr "\n" :`; -export CLASSPATH=$DATABUS_DIR/conf:$CLASSPATH:$HADOOP_CONF_DIR +export CLASSPATH=$DATABUS_DIR/conf:$CLASSPATH:$HADOOP_CONF_DIR:$DATABUS_DIR/bin #echo setting classPath to $CLASSPATH case $startStop in