Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for non-transactional write. #35

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ public class FileConfig {

public final long minTimeInMillisToUpdateFile;

public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String routingKey, String streamName, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles, boolean exactlyOnce, double transactionTimeoutMinutes, long minTimeInMillisToUpdateFile, String fileType) {
public final boolean enableLargeEvent;


public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String routingKey,
String streamName, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles,
boolean exactlyOnce, double transactionTimeoutMinutes, long minTimeInMillisToUpdateFile, String fileType,
boolean enableLargeEvent) {
this.stateDatabaseFileName = stateDatabaseFileName;
this.fileSpec = fileSpec;
this.fileExtension = fileExtension;
Expand All @@ -44,6 +50,7 @@ public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExte
this.transactionTimeoutMinutes = transactionTimeoutMinutes;
this.minTimeInMillisToUpdateFile = minTimeInMillisToUpdateFile;
this.fileType = fileType;
this.enableLargeEvent = enableLargeEvent;
}

@Override
Expand All @@ -61,6 +68,7 @@ public String toString() {
", exactlyOnce=" + exactlyOnce +
", transactionTimeoutMinutes=" + transactionTimeoutMinutes +
", minTimeInMillisToUpdateFile=" + minTimeInMillisToUpdateFile +
", enableLargeEvent=" + enableLargeEvent +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public abstract class FileIngestService extends DeviceDriver {
private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE";
private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES";
private static final String MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY = "MIN_TIME_IN_MILLIS_TO_UPDATE_FILE";
private static final String ENABLE_LARGE_EVENT = "ENABLE_LARGE_EVENT";

private final FileProcessor processor;
private final ScheduledExecutorService executor;
Expand All @@ -63,7 +64,8 @@ public FileIngestService(DeviceDriverConfig config) {
getExactlyOnce(),
getTransactionTimeoutMinutes(),
getMinTimeInMillisToUpdateFile(),
config.getClassName());
config.getClassName(),
getLargeEventEnable());
log.info("File Ingest Config: {}", fileSequenceConfig);
final String scopeName = getScopeName();
log.info("Scope: {}", scopeName);
Expand Down Expand Up @@ -129,6 +131,10 @@ long getMinTimeInMillisToUpdateFile() {
return Long.parseLong(getProperty(MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY, "5000"));
}

boolean getLargeEventEnable() {
return Boolean.parseBoolean(getProperty(ENABLE_LARGE_EVENT, Boolean.toString(false)));
}

protected void watchFiles() {
log.trace("watchFiles: BEGIN");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public static FileProcessor create(
EventWriterConfig.builder()
.enableConnectionPooling(true)
.transactionTimeoutTime((long) (config.transactionTimeoutMinutes * 60.0 * 1000.0))
.enableLargeEvents(config.enableLargeEvent)
.build(),
config.exactlyOnce);

Expand Down Expand Up @@ -176,9 +177,14 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN
/* Check if transactions can be aborted.
* Will fail with {@link TxnFailedException} if the transaction has already been committed or aborted.
*/
log.debug("processFile: Transaction status {} ", writer.getTransactionStatus());
if(writer.getTransactionStatus() == Transaction.Status.OPEN){
writer.abort();
if (config.exactlyOnce)
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong indentation

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

log.debug("processFile: Transaction status {} ", writer.getTransactionStatus());
if(writer.getTransactionStatus() == Transaction.Status.OPEN){
writer.abort();
}
} else {
log.debug("processFile: No need to check transaction status for non-transactional write.");
}

File pendingFile = new File(fileNameWithBeginOffset.fileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public abstract class SimpleDeviceDriver<R, S extends Samples> extends DeviceDri
private static final String DELAY_BETWEEN_WRITE_BATCHES_MS_KEY = "DELAY_BETWEEN_WRITE_BATCHES_MS";
private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE";
private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES";
private static final String ENABLE_LARGE_EVENT = "ENABLE_LARGE_EVENT";

private final String routingKey;
private final DataCollectorService<R, S> dataCollectorService;
Expand Down Expand Up @@ -101,6 +102,7 @@ public SimpleDeviceDriver(DeviceDriverConfig config) {
.enableConnectionPooling(true)
.retryAttempts(Integer.MAX_VALUE)
.transactionTimeoutTime((long) (transactionTimeoutMinutes * 60.0 * 1000.0))
.enableLargeEvents(getLargeEventEnable())
.build(),
exactlyOnce);

Expand Down Expand Up @@ -155,6 +157,10 @@ boolean getExactlyOnce() {
return Boolean.parseBoolean(getProperty(EXACTLY_ONCE_KEY, Boolean.toString(true)));
}

boolean getLargeEventEnable() {
return Boolean.parseBoolean(getProperty(ENABLE_LARGE_EVENT, Boolean.toString(false)));
}

/**
* This time duration must not exceed the controller property controller.transaction.maxLeaseValue (milliseconds).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public abstract class SimpleMemorylessDriver<R> extends DeviceDriver {
private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES";
private static final String ROUTING_KEY_KEY = "ROUTING_KEY";
private static final String SENSOR_POLL_PERIODICITY_MS = "POLL_PERIODICITY_MS";
private static final String ENABLE_LARGE_EVENT = "ENABLE_LARGE_EVENT";

private final DataCollectorService<R> dataCollectorService;
private final EventStreamClientFactory clientFactory;
Expand Down Expand Up @@ -58,6 +59,7 @@ public SimpleMemorylessDriver(DeviceDriverConfig config) {
.enableConnectionPooling(true)
.retryAttempts(Integer.MAX_VALUE)
.transactionTimeoutTime((long) (transactionTimeoutMinutes * 60.0 * 1000.0))
.enableLargeEvents(getLargeEventEnable())
.build(),
exactlyOnce);
dataCollectorService = new DataCollectorService<>(config.getInstanceName(), this, writer, readPeriodicityMs);
Expand Down Expand Up @@ -134,6 +136,11 @@ private long getReadPeriodicityMs() {
return Long.parseLong(getProperty(SENSOR_POLL_PERIODICITY_MS, Integer.toString(10)));
}

boolean getLargeEventEnable() {
return Boolean.parseBoolean(getProperty(ENABLE_LARGE_EVENT, Boolean.toString(false)));
}


public String getRoutingKey() {
return routingKey;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ abstract public class StatefulSensorDeviceDriver<S> extends DeviceDriver {
private static final String DELAY_BETWEEN_WRITE_BATCHES_MS_KEY = "DELAY_BETWEEN_WRITE_BATCHES_MS";
private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE";
private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES";
private static final String ENABLE_LARGE_EVENT = "ENABLE_LARGE_EVENT";

private final String routingKey;
private final DataCollectorService<S> dataCollectorService;
Expand Down Expand Up @@ -85,6 +86,7 @@ public StatefulSensorDeviceDriver(DeviceDriverConfig config) {
.enableConnectionPooling(true)
.retryAttempts(Integer.MAX_VALUE)
.transactionTimeoutTime((long) (transactionTimeoutMinutes * 60.0 * 1000.0))
.enableLargeEvents(getLargeEventEnable())
.build(),
exactlyOnce);

Expand Down Expand Up @@ -141,6 +143,10 @@ String getStreamName() {
return getProperty(STREAM_KEY);
}

boolean getLargeEventEnable() {
return Boolean.parseBoolean(getProperty(ENABLE_LARGE_EVENT, Boolean.toString(false)));
}

@Override
protected void doStart() {
persistentQueueToPravegaService.startAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void createRAWFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000l,"RawFileIngestService");
true,20.0, 5000l,"RawFileIngestService", true);
FileProcessor rawFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(rawFileProcessor instanceof RawFileProcessor);
Expand All @@ -52,7 +52,7 @@ public void createCSVFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000L,"CsvFileIngestService");
true,20.0, 5000L,"CsvFileIngestService", false);
FileProcessor csvFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(csvFileProcessor instanceof CsvFileSequenceProcessor);
Expand All @@ -67,7 +67,7 @@ public void createParquetFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000L,"ParquetFileIngestService");
true,20.0, 5000L,"ParquetFileIngestService", false);
FileProcessor parquetFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(parquetFileProcessor instanceof ParquetFileProcessor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void setup(){
String stateDatabaseFileName = ":memory:";
config = new FileConfig("./psc.db","/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000,"RawFileIngestService");
true,20.0, 5000,"RawFileIngestService", true);
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion scripts/run-with-gradle-raw-file.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ export PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES=false
export PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=2.0
export PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=false
export PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000

export PRAVEGA_SENSOR_COLLECTOR_RAW1_ENABLE_LARGE_EVENT=true
./gradlew --no-daemon run
1 change: 1 addition & 0 deletions windows-service/PravegaSensorCollectorApp.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<env name="PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES" value="2.0" />
<env name="PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE" value="false" />
<env name="PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE" value="5000" />
<env name="PRAVEGA_SENSOR_COLLECTOR_RAW1_ENABLE_LARGE_EVENT" value="true" />

<onfailure action="restart"/>

Expand Down
Loading