diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java index de868df..60efb82 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/DistcpBaseService.java @@ -126,17 +126,23 @@ protected Boolean executeDistCp(DistCpOptions options) */ protected abstract Path getInputPath() throws IOException; - protected String getCategoryFromFileName(String fileName) { - LOG.debug("Splitting [" + fileName + "] on -"); - if (fileName != null && fileName.length() > 1 && fileName.contains("-")) { - StringTokenizer tokenizer = new StringTokenizer(fileName, "-"); - tokenizer.nextToken(); //skip collector name - String catgeory = tokenizer.nextToken(); - return catgeory; + protected static String getCategoryFromFileName(String fileName, + Set streamsSet) { + for (String streamName : streamsSet) { + String strs[] = fileName.split(streamName); + if (strs.length == 2) { + if (checkCorrectDateFormat(strs[1])) + return streamName; + } } return null; } + protected static boolean checkCorrectDateFormat(String timestamp) { + return timestamp + .matches("^.[0-9]{4}.[0-9]{2}.[0-9]{2}.[0-9]{2}.[0-9]{2}.[0-9]{5}.gz$"); + } + @Override public long getMSecondsTillNextRun(long currentTime) { long runIntervalInSec = (DEFAULT_RUN_INTERVAL/1000); diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java index ac49b93..4f872db 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java @@ -262,7 +262,8 @@ private Map> prepareForCommit(Path tmpOut) for (int i = 0; i < allFiles.length; i++) { String fileName = allFiles[i].getPath().getName(); if (fileName != null) { - String category = getCategoryFromFileName(fileName); + String category = getCategoryFromFileName(fileName, getDestCluster() + .getPrimaryDestinationStreams()); if (category != null) { Path intermediatePath = new Path(tmpOut, category); if (!getDestFs().exists(intermediatePath)) diff --git a/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java b/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java index 970e99a..3a0011a 100644 --- a/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java +++ b/databus-worker/src/test/java/com/inmobi/databus/distcp/TestDistCPBaseService.java @@ -161,4 +161,38 @@ public void testNegative() throws Exception { } + @Test + public void testSplitFileName() throws Exception { + Set streamsSet = new HashSet(); + streamsSet.add("test-stream"); + streamsSet.add("test_stream"); + streamsSet.add("test_streams"); + streamsSet.add("test_stream_2"); + // file name in which collector name has hyphen + String fileName1 = "databus-test-test_stream-2012-11-27-21-20_00000.gz"; + // file name in which stream name has hyphen + String fileName2 = "databus_test-test-stream-2012-11-27-21-20_00000.gz"; + // file name in which stream name is subset of another stream name in the + // streamsSet + String fileName3 = "databus_test-test_streams-2012-11-27-21-20_00000.gz"; + String fileName4 = "databus_test-test_stream_2-2012-11-27-21-20_00000.gz"; + // file name in which stream name is not in streamsSet passed + String fileName5 = "databus_test-test_stream-2-2012-11-27-21-20_00000.gz"; + // get stream names from file name + String expectedStreamName1 = MergedStreamService.getCategoryFromFileName( + fileName1, streamsSet); + String expectedStreamName2 = MergedStreamService.getCategoryFromFileName( + fileName2, streamsSet); + String expectedStreamName3 = MergedStreamService.getCategoryFromFileName( + fileName3, streamsSet); + String expectedStreamName4 = MergedStreamService.getCategoryFromFileName( + fileName4, streamsSet); + String expectedStreamName5 = MergedStreamService.getCategoryFromFileName( + fileName5, streamsSet); + assert expectedStreamName1.compareTo("test_stream") == 0; + assert expectedStreamName2.compareTo("test-stream") == 0; + assert expectedStreamName3.compareTo("test_streams") == 0; + assert expectedStreamName4.compareTo("test_stream_2") == 0; + assert expectedStreamName5 == null; + } } diff --git a/pom.xml b/pom.xml index 0f70bc8..deaf787 100644 --- a/pom.xml +++ b/pom.xml @@ -118,7 +118,7 @@ commons-httpclient commons-httpclient - 1.0 + 3.1 log4j @@ -155,6 +155,16 @@ testng 6.1.1 + + net.java.dev.jets3t + jets3t + 0.7.4 + + + commons-codec + commons-codec + 1.3 + diff --git a/tools/scripts/databus.sh b/tools/scripts/databus.sh index f9e0932..0c19ec0 100755 --- a/tools/scripts/databus.sh +++ b/tools/scripts/databus.sh @@ -73,7 +73,7 @@ fi #set classpath export CLASSPATH=`ls $DATABUS_DIR/lib/*jar | tr "\n" :`; -export CLASSPATH=$DATABUS_DIR/conf:$CLASSPATH:$HADOOP_CONF_DIR +export CLASSPATH=$DATABUS_DIR/conf:$CLASSPATH:$HADOOP_CONF_DIR:$DATABUS_DIR/bin #echo setting classPath to $CLASSPATH case $startStop in