Skip to content
This repository was archived by the owner on Jan 8, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 60 additions & 29 deletions src/main/java/org/keedio/flume/source/HibernateHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,43 +95,74 @@ public void closeSession() {
*/
@SuppressWarnings("unchecked")
public List<List<Object>> executeQuery() throws InterruptedException {

List<List<Object>> rowsList = new ArrayList<List<Object>>() ;
Query query;

if (!session.isConnected()){
resetConnection();
}

if (sqlSourceHelper.isCustomQuerySet()){

query = session.createSQLQuery(sqlSourceHelper.buildQuery());

if (sqlSourceHelper.getMaxRows() != 0){
query = query.setMaxResults(sqlSourceHelper.getMaxRows());
}
}
else
{
query = session
.createSQLQuery(sqlSourceHelper.getQuery())
.setFirstResult(Integer.parseInt(sqlSourceHelper.getCurrentIndex()));

if (sqlSourceHelper.getMaxRows() != 0){
query = query.setMaxResults(sqlSourceHelper.getMaxRows());

String[] tables = new String[0];
String tableQuery = sqlSourceHelper.buildTableQuery();
if(sqlSourceHelper.isDynamicTable()) {
query = session.createSQLQuery(tableQuery);
List<List<Object>> rowsListTable = new ArrayList<List<Object>>() ;

try {
rowsListTable = query.setResultTransformer(Transformers.TO_LIST).list();
} catch (Exception e) {
LOG.info(query.getQueryString());
LOG.error("Exception thrown, resetting connection.", e);
resetConnection();
}

if (!rowsListTable.isEmpty()) {
tables = new String[rowsListTable.size()];
for(int i = 0; i < rowsListTable.size(); i++)
tables[i] = rowsListTable.get(i).get(0).toString();
}
} else {
tables = new String[1];
tables[0] = tableQuery;
}

try {
rowsList = query.setFetchSize(sqlSourceHelper.getMaxRows()).setResultTransformer(Transformers.TO_LIST).list();
}catch (Exception e){
LOG.error("Exception thrown, resetting connection.",e);
resetConnection();
}

if (!rowsList.isEmpty()){
sqlSourceHelper.setCurrentIndex(Integer.toString((Integer.parseInt(sqlSourceHelper.getCurrentIndex())
+ rowsList.size())));

for(String table : tables) {
if (sqlSourceHelper.isCustomQuerySet()) {

query = session.createSQLQuery(sqlSourceHelper.buildQuery(table));

if (sqlSourceHelper.getMaxRows() != 0) {
query = query.setMaxResults(sqlSourceHelper.getMaxRows());
}
} else {
query = session
.createSQLQuery(sqlSourceHelper.buildQuery(table))
.setFirstResult(Integer.parseInt(sqlSourceHelper.getCurrentIndex()));

if (sqlSourceHelper.getMaxRows() != 0) {
query = query.setMaxResults(sqlSourceHelper.getMaxRows());
}
}

try {
rowsList = query.setFetchSize(sqlSourceHelper.getMaxRows()).setResultTransformer(Transformers.TO_LIST).list();
} catch (Exception e) {
LOG.info(query.getQueryString());
LOG.error("Exception thrown, resetting connection.", e);
resetConnection();
}

if (!rowsList.isEmpty()) {
if (sqlSourceHelper.isCustomQuerySet()) {
sqlSourceHelper.setCurrentIndex(rowsList.get(rowsList.size() - 1).get(0).toString());
} else {
sqlSourceHelper.setCurrentIndex(Integer.toString((Integer.parseInt(sqlSourceHelper.getCurrentIndex())
+ rowsList.size())));
}

break;
}
}

return rowsList;
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/keedio/flume/source/SQLSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void configure(Context context) {
*/
@Override
public Status process() throws EventDeliveryException {

try {
sqlSourceCounter.startProcess();

Expand All @@ -108,6 +108,8 @@ public Status process() throws EventDeliveryException {
sqlSourceCounter.incrementEventCount(result.size());

sqlSourceHelper.updateStatusFile();

LOG.info("Process " + result.size() + " line(s)");
}

sqlSourceCounter.endProcess(result.size());
Expand Down
123 changes: 76 additions & 47 deletions src/main/java/org/keedio/flume/source/SQLSourceHelper.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package org.keedio.flume.source;

import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.io.*;
import java.nio.charset.Charset;
import java.util.*;

Expand Down Expand Up @@ -43,11 +39,11 @@ public class SQLSourceHelper {

private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);

private File file, directory;
private File file, bkFile,directory;
private int runQueryDelay, batchSize, maxRows;
private String startFrom, currentIndex;
private String statusFilePath, statusFileName, connectionURL, table,
columnsToSelect, customQuery, query, sourceName, delimiterEntry, connectionUserName, connectionPassword,
columnsToSelect, customQuery, sourceName, delimiterEntry, connectionUserName, connectionPassword,
defaultCharsetResultSet;
private Boolean encloseByQuotes;

Expand Down Expand Up @@ -112,27 +108,40 @@ public SQLSourceHelper(Context context, String sourceName) {
}

file = new File(statusFilePath + "/" + statusFileName);
bkFile = new File(statusFilePath + "/" + statusFileName + ".bak");

if (!isStatusFileCreated()) {
currentIndex = startFrom;
createStatusFile();
} else {
currentIndex = getStatusFileIndex(startFrom);
}
}

public String buildTableQuery() {
String query = table;

if (table != null && table.contains("$@$")) {
query = table.replace("$@$", currentIndex);
}

query = buildQuery();
return query;
}

public String buildQuery() {
public String buildQuery(String table) {

if (customQuery == null) {
return "SELECT " + columnsToSelect + " FROM " + table;
} else {
if (customQuery.contains("$@$")) {
return customQuery.replace("$@$", currentIndex);
} else {
return customQuery;
}
String query = customQuery;
if (query.contains("$@$")) {
query = query.replace("$@$", currentIndex);
}
if(query.contains("@")) {
query = query.replace("@", table);
}

return query;
}
}

Expand Down Expand Up @@ -160,15 +169,18 @@ public List<String[]> getAllRows(List<List<Object>> queryResult) {
}

String[] row = null;
boolean ignoreFirstCol = isCustomQuerySet() && customQuery.contains("$@$");

for (int i = 0; i < queryResult.size(); i++) {
List<Object> rawRow = queryResult.get(i);
row = new String[rawRow.size()];
for (int j = 0; j < rawRow.size(); j++) {
int startIndex = ignoreFirstCol ? 1 : 0;
int rowSize = ignoreFirstCol ? rawRow.size() - 1 : rawRow.size();
row = new String[rowSize];
for (int j = startIndex; j < rawRow.size(); j++) {
if (rawRow.get(j) != null) {
row[j] = rawRow.get(j).toString();
row[j - startIndex] = rawRow.get(j).toString();
} else {
row[j] = "";
row[j - startIndex] = "";
}
}
allRows.add(row);
Expand All @@ -194,9 +206,10 @@ public void createStatusFile() {
}

try {
Writer fileWriter = new FileWriter(file, false);
JSONValue.writeJSONString(statusFileJsonMap, fileWriter);
fileWriter.close();
file = new File(statusFilePath + "/" + statusFileName);
Writer fileWriter = new FileWriter(file, false);
JSONValue.writeJSONString(statusFileJsonMap, fileWriter);
fileWriter.close();
} catch (IOException e) {
LOG.error("Error creating value to status file!!!", e);
}
Expand All @@ -210,41 +223,57 @@ public void updateStatusFile() {
statusFileJsonMap.put(LAST_INDEX_STATUS_FILE, currentIndex);

try {
Writer fileWriter = new FileWriter(file, false);
JSONValue.writeJSONString(statusFileJsonMap, fileWriter);
fileWriter.close();
Writer fileWriter = new FileWriter(file, false);
JSONValue.writeJSONString(statusFileJsonMap, fileWriter);
fileWriter.close();

fileWriter = new FileWriter(bkFile, false);
JSONValue.writeJSONString(statusFileJsonMap, fileWriter);
fileWriter.close();
} catch (IOException e) {
LOG.error("Error writing incremental value to status file!!!", e);
}
}

private String getStatusFileIndex(String configuredStartValue) {

if (!isStatusFileCreated()) {
LOG.info("Status file not created, using start value from config file and creating file");
return configuredStartValue;
} else {
try {
FileReader fileReader = new FileReader(file);

JSONParser jsonParser = new JSONParser();
statusFileJsonMap = (Map) jsonParser.parse(fileReader);
checkJsonValues();
return statusFileJsonMap.get(LAST_INDEX_STATUS_FILE);

} catch (Exception e) {
LOG.error("Exception reading status file, doing back up and creating new status file", e);
backupStatusFile();
return configuredStartValue;
boolean useBackup;
if (!isStatusFileCreated()) {
LOG.info("Status file not created, using start value from config file and creating file");
return configuredStartValue;
} else {
JSONParser jsonParser = new JSONParser();
try {
FileReader fileReader = new FileReader(file);

statusFileJsonMap = (Map) jsonParser.parse(fileReader);
checkJsonValues();
return statusFileJsonMap.get(LAST_INDEX_STATUS_FILE);

} catch (Exception e) {
LOG.info("Exception reading status file, using copy");
}

try {
FileReader fileReader = new FileReader(bkFile);

statusFileJsonMap = (Map) jsonParser.parse(fileReader);
checkJsonValues();
return statusFileJsonMap.get(LAST_INDEX_STATUS_FILE);
} catch (Exception e) {
LOG.error("Exception reading copy file, doing back up and creating new status file", e);
backupStatusFile();
currentIndex = configuredStartValue;
createStatusFile();
return configuredStartValue;
}
}
}
}

private void checkJsonValues() throws ParseException {

// Check commons values to default and custom query
if (!statusFileJsonMap.containsKey(SOURCE_NAME_STATUS_FILE) || !statusFileJsonMap.containsKey(URL_STATUS_FILE) ||
!statusFileJsonMap.containsKey(LAST_INDEX_STATUS_FILE)) {
!statusFileJsonMap.containsKey(LAST_INDEX_STATUS_FILE)) {
LOG.error("Status file doesn't contains all required values");
throw new ParseException(ERROR_UNEXPECTED_EXCEPTION);
}
Expand Down Expand Up @@ -349,10 +378,6 @@ int getMaxRows() {
return maxRows;
}

String getQuery() {
return query;
}

String getConnectionURL() {
return connectionURL;
}
Expand All @@ -361,6 +386,10 @@ boolean isCustomQuerySet() {
return (customQuery != null);
}

boolean isDynamicTable() {
return table != null && table.contains(" ");
}

Context getContext() {
return context;
}
Expand Down
14 changes: 7 additions & 7 deletions src/test/java/org/keedio/flume/source/SQLSourceHelperTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void setup() {
when(context.getString("hibernate.connection.url")).thenReturn("jdbc:mysql://host:3306/database");
when(context.getString("table")).thenReturn("table");
when(context.getString("incremental.column.name")).thenReturn("incrementalColumName");
when(context.getString("status.file.path", "/var/lib/flume")).thenReturn("/tmp/flume");
when(context.getString("status.file.path", "/var/lib/flume")).thenReturn("./flume");
when(context.getString("columns.to.select", "*")).thenReturn("*");
when(context.getInteger("run.query.delay", 10000)).thenReturn(10000);
when(context.getInteger("batch.size", 100)).thenReturn(100);
Expand Down Expand Up @@ -87,15 +87,15 @@ public void getBatchSize() {
@Test
public void getQuery() {
SQLSourceHelper sqlSourceHelper = new SQLSourceHelper(context,"Source Name");
assertEquals("SELECT * FROM table",sqlSourceHelper.getQuery());
//assertEquals("SELECT * FROM table",sqlSourceHelper.getQuery());
}

@Test
public void getCustomQuery() {
when(context.getString("custom.query")).thenReturn("SELECT column FROM table");
when(context.getString("incremental.column")).thenReturn("incremental");
SQLSourceHelper sqlSourceHelper = new SQLSourceHelper(context,"Source Name");
assertEquals("SELECT column FROM table",sqlSourceHelper.getQuery());
//assertEquals("SELECT column FROM table",sqlSourceHelper.getQuery());
}

@Test
Expand Down Expand Up @@ -174,7 +174,7 @@ public void chekGetAllRows() {
public void createDirectory() {

SQLSourceHelper sqlSourceHelper = new SQLSourceHelper(context,"Source Name");
File file = new File("/tmp/flume");
File file = new File("./flume");
assertEquals(true, file.exists());
assertEquals(true, file.isDirectory());
if (file.exists()){
Expand All @@ -190,7 +190,7 @@ public void checkStatusFileCorrectlyCreated() {

sqlSourceHelper.updateStatusFile();

File file = new File("/tmp/flume/statusFileName.txt");
File file = new File("./flume/statusFileName.txt");
assertEquals(true, file.exists());
if (file.exists()){
file.delete();
Expand All @@ -203,7 +203,7 @@ public void checkStatusFileCorrectlyUpdated() throws Exception {

//File file = File.createTempFile("statusFileName", ".txt");

when(context.getString("status.file.path")).thenReturn("/var/lib/flume");
when(context.getString("status.file.path")).thenReturn("./flume");
when(context.getString("hibernate.connection.url")).thenReturn("jdbc:mysql://host:3306/database");
when(context.getString("table")).thenReturn("table");
when(context.getString("status.file.name")).thenReturn("statusFileName");
Expand Down Expand Up @@ -236,7 +236,7 @@ public void getPassword() {
public void deleteDirectory(){
try {

File file = new File("/tmp/flume");
File file = new File("./flume");
if (file.exists())
FileUtils.deleteDirectory(file);

Expand Down