Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions bin/m-client
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ CLIENT_CONSOLECONSUMER_CLASS="com.inmobi.messaging.consumer.examples.ConsoleClie
CLIENT_BENCHMARK_CLASS="com.inmobi.messaging.consumer.examples.StreamingBenchmark"
CLIENT_COUNTER_CLASS="com.inmobi.messaging.consumer.examples.CounterClient"
CLIENT_MULTITOPIC_SEQGEN_CLASS="com.inmobi.messaging.publisher.examples.MultiTopicSeqGenerator"
CLIENT_CHECKPOINT_UTIL_CLASS="com.inmobi.messaging.consumer.util.CheckpointUtil"

################################
# functions
Expand Down Expand Up @@ -69,7 +70,7 @@ Usage: $0 file <topic> <file> --conf <confdir>
[-consumer <no-of-producers> <no-of-messages> [<timeoutSeconds> <msg-size> <hadoopconsumerflag> <timezone>]] --conf <confdir>
$0 counter <minutes-to-read-from> --conf <confdir>
$0 multitopicseqgen <topic1> <topic2> <maxSeq> --conf <confdir>

$0 checkpointutil <confFile>
EOF
}

Expand Down Expand Up @@ -125,6 +126,9 @@ case "$mode" in
multitopicseqgen)
opt_multitopicseqgen=1
;;
checkpointutil)
opt_checkpointutil=1
;;
*)
error "Unknown or unspecified command '$mode'"
echo
Expand Down Expand Up @@ -204,8 +208,9 @@ elif [ -n "$opt_counter" ] ; then
run_client $CLIENT_COUNTER_CLASS $args
elif [ -n "$opt_multitopicseqgen" ] ; then
run_client $CLIENT_MULTITOPIC_SEQGEN_CLASS $args
elif [ -n "$opt_checkpointutil" ] ; then
run_client $CLIENT_CHECKPOINT_UTIL_CLASS $args
else
error "This message should never appear" 1
fi

exit 0
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ public void init(String topicName, String consumerName, Date startTimestamp,
startTime.after(new Date(System.currentTimeMillis()))) {
throw new IllegalArgumentException("Future start time is not accepted");
}
init(config);
metrics = (BaseMessageConsumerStatsExposer) getMetricsImpl();
String emitterConfig = config
.getString(MessageConsumerFactory.EMITTER_CONF_FILE_KEY);
if (emitterConfig != null) {
statsEmitter.init(emitterConfig);
statsEmitter.add(metrics);
}
init(config);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ public FileStatus getFirstFile() {
return null;
}

public FileStatus getLastFile() {
Map.Entry<T, FileStatus> last = files.lastEntry();
if (last != null) {
return last.getValue();
}
return null;
}

private Map.Entry<T, FileStatus> getFirstEntry() {
return files.firstEntry();
}
Expand Down Expand Up @@ -156,5 +164,4 @@ public FileStatus getNext() {
public boolean hasNext() {
return fileNameIterator.hasNext();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Collection;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -13,34 +17,77 @@
import org.apache.hadoop.io.Text;

import com.inmobi.databus.readers.DatabusStreamWaitingReader;
import com.inmobi.messaging.consumer.databus.MessageCheckpoint;
import com.inmobi.messaging.metrics.PartitionReaderStatsExposer;

public class ClusterReader extends AbstractPartitionStreamReader {

private static final Log LOG = LogFactory.getLog(PartitionReader.class);

private final PartitionCheckpoint partitionCheckpoint;
private final PartitionCheckpointList partitionCheckpointList;
private final Date startTime;
private final Path streamDir;
private final boolean isDatabusData;

ClusterReader(PartitionId partitionId,
PartitionCheckpoint partitionCheckpoint, FileSystem fs,
PartitionCheckpointList partitionCheckpointList, FileSystem fs,
Path streamDir, Configuration conf, String inputFormatClass,
Date startTime, long waitTimeForFileCreate, boolean isDatabusData,
PartitionReaderStatsExposer metrics, boolean noNewFiles)
PartitionReaderStatsExposer metrics, boolean noNewFiles,
Set<Integer> partitionMinList)
throws IOException {
this.startTime = startTime;
this.streamDir = streamDir;
this.partitionCheckpoint = partitionCheckpoint;
this.partitionCheckpointList = partitionCheckpointList;
this.isDatabusData = isDatabusData;

reader = new DatabusStreamWaitingReader(partitionId, fs, streamDir,
inputFormatClass, conf, waitTimeForFileCreate, metrics, noNewFiles);
inputFormatClass, conf, waitTimeForFileCreate, metrics, noNewFiles,
partitionMinList, partitionCheckpointList);
}

/*
+ * this method is used to find the partition checkpoint which has least time stamp.
+ * So that reader starts build listing from this partition checkpoint time stamp).
+ */
public PartitionCheckpoint findLeastPartitionCheckPointTime(
PartitionCheckpointList partitionCheckpointList) {
PartitionCheckpoint partitioncheckpoint = null;

Map<Integer, PartitionCheckpoint> listOfCheckpoints =
partitionCheckpointList.getCheckpoints();

if (listOfCheckpoints != null) {
Collection<PartitionCheckpoint> listofPartitionCheckpoints =
listOfCheckpoints.values();
Iterator<PartitionCheckpoint> it = listofPartitionCheckpoints.iterator();
Date timeStamp = null;
if (it.hasNext()) {
partitioncheckpoint = it.next();
timeStamp = DatabusStreamWaitingReader.getDateFromStreamDir(streamDir,
new Path(partitioncheckpoint.getFileName()));
}
while (it.hasNext()) {
PartitionCheckpoint tmpPartitionCheckpoint = it.next();
Date date = DatabusStreamWaitingReader.getDateFromStreamDir(streamDir,
new Path(tmpPartitionCheckpoint.getFileName()));
if (timeStamp.compareTo(date) > 0) {
partitioncheckpoint = tmpPartitionCheckpoint;
timeStamp = date;
}
}
}
return partitioncheckpoint;
}


public void initializeCurrentFile() throws IOException, InterruptedException {
LOG.info("Initializing partition reader's current file");
PartitionCheckpoint partitionCheckpoint = null;
if (partitionCheckpointList != null) {
partitionCheckpoint = findLeastPartitionCheckPointTime(partitionCheckpointList);
}

if (startTime != null) {
((DatabusStreamWaitingReader)reader).build(startTime);
if (!reader.initializeCurrentFile(startTime)) {
Expand All @@ -52,9 +99,12 @@ public void initializeCurrentFile() throws IOException, InterruptedException {
DatabusStreamWaitingReader.getBuildTimestamp(streamDir,
partitionCheckpoint));
if (!reader.isEmpty()) {
if (!reader.initializeCurrentFile(partitionCheckpoint)) {
throw new IllegalArgumentException("Checkpoint file does not exist");
}
if (partitionCheckpoint.getLineNum() == -1) {
reader.initFromNextCheckPoint();
}
else if (!reader.initializeCurrentFile(partitionCheckpoint)) {
throw new IllegalArgumentException("Checkpoint file does not exist");
}
} else {
reader.startFromBegining();
}
Expand All @@ -75,4 +125,16 @@ public byte[] readLine() throws IOException, InterruptedException {
}
return line;
}

@Override
public MessageCheckpoint getMessageCheckpoint() {
if (reader instanceof DatabusStreamWaitingReader) {
DatabusStreamWaitingReader dataWaitingReader = (DatabusStreamWaitingReader) reader;
PartitionCheckpointList pChkLst = new PartitionCheckpointList(
dataWaitingReader.getPartitionCheckpointList().getCheckpoints());
return pChkLst;
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.inmobi.databus.files.DatabusStreamFile;
import com.inmobi.databus.readers.CollectorStreamReader;
import com.inmobi.databus.readers.LocalStreamCollectorReader;
import com.inmobi.messaging.consumer.databus.MessageCheckpoint;
import com.inmobi.messaging.metrics.CollectorReaderStatsExposer;

public class CollectorReader extends AbstractPartitionStreamReader {
Expand Down Expand Up @@ -173,5 +174,10 @@ public byte[] readLine() throws IOException, InterruptedException {
}
return line;
}


@Override
public MessageCheckpoint getMessageCheckpoint() {
return new PartitionCheckpoint(reader.getCurrentStreamFile(),
reader.getCurrentLineNum());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
import org.apache.hadoop.io.Writable;

import com.inmobi.databus.files.StreamFile;
import com.inmobi.messaging.consumer.databus.MessageCheckpoint;

public class PartitionCheckpoint implements Writable {
public class PartitionCheckpoint implements Writable, MessageCheckpoint {
private StreamFile streamFile;
private long lineNum;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.inmobi.databus.partition;

import java.util.Map;
import java.util.TreeMap;

import com.inmobi.databus.partition.PartitionCheckpoint;
import com.inmobi.messaging.consumer.databus.MessageCheckpoint;

/**
* Checkpoint for the segments of databus stream consumer.
*
*/
public class PartitionCheckpointList implements MessageCheckpoint {

// map of static id to its checkpoint
private Map<Integer, PartitionCheckpoint> pChkpoints =
new TreeMap<Integer, PartitionCheckpoint>();

public PartitionCheckpointList(Map<Integer, PartitionCheckpoint> chkpoints) {

this.pChkpoints = chkpoints;
}

public void setCheckpoint(Map<Integer, PartitionCheckpoint> chkpoints) {
this.pChkpoints = chkpoints;
}

public Map<Integer, PartitionCheckpoint> getCheckpoints() {
return pChkpoints;
}

public void set(int segmentId, PartitionCheckpoint pck) {
pChkpoints.put(segmentId, pck);
}

public String toString() {
StringBuffer buf = new StringBuffer();
for (Map.Entry<Integer, PartitionCheckpoint> entry : pChkpoints
.entrySet()) {
buf.append(entry.getKey().toString())
.append(":");
if (entry.getValue() != null) {
buf.append(entry.getValue().toString());
} else {
buf.append("null");
}
buf.append(", ");
}
return buf.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

import org.apache.commons.codec.binary.Base64;
Expand All @@ -15,6 +16,7 @@
import com.inmobi.databus.files.StreamFile;
import com.inmobi.messaging.Message;
import com.inmobi.messaging.consumer.databus.DataEncodingType;
import com.inmobi.messaging.consumer.databus.MessageCheckpoint;
import com.inmobi.messaging.consumer.databus.QueueEntry;
import com.inmobi.messaging.metrics.CollectorReaderStatsExposer;
import com.inmobi.messaging.metrics.PartitionReaderStatsExposer;
Expand Down Expand Up @@ -47,15 +49,16 @@ public PartitionReader(PartitionId partitionId,
}

public PartitionReader(PartitionId partitionId,
PartitionCheckpoint partitionCheckpoint, FileSystem fs,
BlockingQueue<QueueEntry> buffer, Path streamDir,
PartitionCheckpointList partitionCheckpointList, FileSystem fs,
BlockingQueue<QueueEntry> buffer, Path streamDir,
Configuration conf, String inputFormatClass,
Date startTime, long waitTimeForFileCreate, boolean isDatabusData,
DataEncodingType dataEncoding, PartitionReaderStatsExposer prMetrics)
DataEncodingType dataEncoding, PartitionReaderStatsExposer prMetrics,
Set<Integer> partitionMinList)
throws IOException {
this(partitionId, partitionCheckpoint, fs, buffer, streamDir,
this(partitionId, partitionCheckpointList, fs, buffer, streamDir,
conf, inputFormatClass, startTime, waitTimeForFileCreate, isDatabusData,
dataEncoding, prMetrics, false);
dataEncoding, prMetrics, false, partitionMinList);
}

PartitionReader(PartitionId partitionId,
Expand All @@ -82,21 +85,22 @@ public PartitionReader(PartitionId partitionId,
}

PartitionReader(PartitionId partitionId,
PartitionCheckpoint partitionCheckpoint, FileSystem fs,
PartitionCheckpointList partitionCheckpointList, FileSystem fs,
BlockingQueue<QueueEntry> buffer, Path streamDir,
Configuration conf, String inputFormatClass,
Date startTime, long waitTimeForFileCreate, boolean isDatabusData,
DataEncodingType dataEncoding, PartitionReaderStatsExposer prMetrics,
boolean noNewFiles)
boolean noNewFiles, Set<Integer> partitionMinList)
throws IOException {
this(partitionId, partitionCheckpoint, buffer, startTime, dataEncoding,
prMetrics);
reader = new ClusterReader(partitionId, partitionCheckpoint,
this(partitionId, partitionCheckpointList, buffer, startTime, dataEncoding,
prMetrics, partitionMinList);
reader = new ClusterReader(partitionId, partitionCheckpointList,
fs, streamDir, conf, inputFormatClass, startTime,
waitTimeForFileCreate, isDatabusData, prMetrics, noNewFiles);
waitTimeForFileCreate, isDatabusData, prMetrics, noNewFiles,
partitionMinList);
// initialize cluster and its directories
LOG.info("Partition reader initialized with partitionId:" + partitionId +
" checkPoint:" + partitionCheckpoint +
" checkPoint:" + partitionCheckpointList +
" startTime:" + startTime +
" currentReader:" + reader);
}
Expand All @@ -118,6 +122,24 @@ private PartitionReader(PartitionId partitionId,
this.dataEncoding = dataEncoding;
this.prMetrics = prMetrics;
}

private PartitionReader(PartitionId partitionId,
PartitionCheckpointList partitionCheckpointList,
BlockingQueue<QueueEntry> buffer, Date startTime,
DataEncodingType dataEncoding,
PartitionReaderStatsExposer prMetrics, Set<Integer> partitionMinList)
throws IOException {
if (startTime == null && partitionCheckpointList == null) {
String msg = "StartTime and checkpoint both" +
" cannot be null in PartitionReader";
LOG.warn(msg);
throw new IllegalArgumentException(msg);
}
this.partitionId = partitionId;
this.buffer = buffer;
this.dataEncoding = dataEncoding;
this.prMetrics = prMetrics;
}

public synchronized void start() {
Runnable runnable = new Runnable() {
Expand Down Expand Up @@ -210,10 +232,9 @@ void execute() {
} else {
data = line;
}
MessageCheckpoint checkpoint = reader.getMessageCheckpoint();
buffer.put(new QueueEntry(new Message(
ByteBuffer.wrap(data)), partitionId,
new PartitionCheckpoint(reader.getCurrentFile(),
reader.getCurrentLineNum())));
ByteBuffer.wrap(data)), partitionId, checkpoint));
prMetrics.incrementMessagesAddedToBuffer();
} else {
LOG.info("No stream to read");
Expand Down
Loading