Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,23 @@ protected Boolean executeDistCp(DistCpOptions options)
*/
protected abstract Path getInputPath() throws IOException;

protected String getCategoryFromFileName(String fileName) {
LOG.debug("Splitting [" + fileName + "] on -");
if (fileName != null && fileName.length() > 1 && fileName.contains("-")) {
StringTokenizer tokenizer = new StringTokenizer(fileName, "-");
tokenizer.nextToken(); //skip collector name
String catgeory = tokenizer.nextToken();
return catgeory;
protected static String getCategoryFromFileName(String fileName,
Set<String> streamsSet) {
for (String streamName : streamsSet) {
String strs[] = fileName.split(streamName);
if (strs.length == 2) {
if (checkCorrectDateFormat(strs[1]))
return streamName;
}
}
return null;
}

protected static boolean checkCorrectDateFormat(String timestamp) {
return timestamp
.matches("^.[0-9]{4}.[0-9]{2}.[0-9]{2}.[0-9]{2}.[0-9]{2}.[0-9]{5}.gz$");
}

@Override
public long getMSecondsTillNextRun(long currentTime) {
long runIntervalInSec = (DEFAULT_RUN_INTERVAL/1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ private Map<String, List<Path>> prepareForCommit(Path tmpOut)
for (int i = 0; i < allFiles.length; i++) {
String fileName = allFiles[i].getPath().getName();
if (fileName != null) {
String category = getCategoryFromFileName(fileName);
String category = getCategoryFromFileName(fileName, getDestCluster()
.getPrimaryDestinationStreams());
if (category != null) {
Path intermediatePath = new Path(tmpOut, category);
if (!getDestFs().exists(intermediatePath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,38 @@ public void testNegative() throws Exception {

}

@Test
public void testSplitFileName() throws Exception {
Set<String> streamsSet = new HashSet<String>();
streamsSet.add("test-stream");
streamsSet.add("test_stream");
streamsSet.add("test_streams");
streamsSet.add("test_stream_2");
// file name in which collector name has hyphen
String fileName1 = "databus-test-test_stream-2012-11-27-21-20_00000.gz";
// file name in which stream name has hyphen
String fileName2 = "databus_test-test-stream-2012-11-27-21-20_00000.gz";
// file name in which stream name is subset of another stream name in the
// streamsSet
String fileName3 = "databus_test-test_streams-2012-11-27-21-20_00000.gz";
String fileName4 = "databus_test-test_stream_2-2012-11-27-21-20_00000.gz";
// file name in which stream name is not in streamsSet passed
String fileName5 = "databus_test-test_stream-2-2012-11-27-21-20_00000.gz";
// get stream names from file name
String expectedStreamName1 = MergedStreamService.getCategoryFromFileName(
fileName1, streamsSet);
String expectedStreamName2 = MergedStreamService.getCategoryFromFileName(
fileName2, streamsSet);
String expectedStreamName3 = MergedStreamService.getCategoryFromFileName(
fileName3, streamsSet);
String expectedStreamName4 = MergedStreamService.getCategoryFromFileName(
fileName4, streamsSet);
String expectedStreamName5 = MergedStreamService.getCategoryFromFileName(
fileName5, streamsSet);
assert expectedStreamName1.compareTo("test_stream") == 0;
assert expectedStreamName2.compareTo("test-stream") == 0;
assert expectedStreamName3.compareTo("test_streams") == 0;
assert expectedStreamName4.compareTo("test_stream_2") == 0;
assert expectedStreamName5 == null;
}
}
12 changes: 11 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>1.0</version>
<version>3.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
Expand Down Expand Up @@ -155,6 +155,16 @@
<artifactId>testng</artifactId>
<version>6.1.1</version>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<version>0.7.4</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.3</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
2 changes: 1 addition & 1 deletion tools/scripts/databus.sh
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fi

#set classpath
export CLASSPATH=`ls $DATABUS_DIR/lib/*jar | tr "\n" :`;
export CLASSPATH=$DATABUS_DIR/conf:$CLASSPATH:$HADOOP_CONF_DIR
export CLASSPATH=$DATABUS_DIR/conf:$CLASSPATH:$HADOOP_CONF_DIR:$DATABUS_DIR/bin
#echo setting classPath to $CLASSPATH

case $startStop in
Expand Down