diff --git a/README.md b/README.md
index f0b15b1..3ab4f44 100644
--- a/README.md
+++ b/README.md
@@ -39,6 +39,9 @@ In your `logback.xml`:
false
100
+ true
+ data.
+
host
@@ -110,6 +113,8 @@ Configuration Reference
* `includeMdc` (optional, default false): If set to `true`, then all [MDC](http://www.slf4j.org/api/org/slf4j/MDC.html) values will be mapped to properties on the JSON payload.
* `maxMessageSize` (optional, default -1): If set to a number greater than 0, truncate messages larger than this length, then append "`..`" to denote that the message was truncated
* `authentication` (optional): Add the ability to send authentication headers (see below)
+ * `objectSerialization` (optional): specifies whether to use POJO to JSON serialization
+ * `keyPrefix` (optional): objects logged within a message will also be logged separately with this prefix added
The fields `@timestamp` and `message` are always sent and can not currently be configured. Additional fields can be sent by adding `` elements to the `` set.
diff --git a/pom.xml b/pom.xml
index 27f073b..a2040a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -67,6 +67,11 @@
jackson-core
2.8.0
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.8.0
+
com.amazonaws
aws-java-sdk-core
@@ -85,6 +90,12 @@
1.10.19
test
+
+
+ net.logstash.logback
+ logstash-logback-encoder
+ 6.3
+
diff --git a/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchAppender.java b/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchAppender.java
index b0e7cb1..bec6566 100644
--- a/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchAppender.java
+++ b/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchAppender.java
@@ -22,12 +22,28 @@ public abstract class AbstractElasticsearchAppender extends UnsynchronizedApp
public AbstractElasticsearchAppender() {
this.settings = new Settings();
this.headers = new HttpRequestHeaders();
+ registerShutdownHook();
}
public AbstractElasticsearchAppender(Settings settings) {
this.settings = settings;
+ registerShutdownHook();
}
+ private void registerShutdownHook() {
+ Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+ }
+
+ private class ShutdownHook implements Runnable {
+ @Override
+ public void run() {
+ stop();
+ if(publisher != null) {
+ publisher.close();
+ }
+ }
+ }
+
@Override
public void start() {
super.start();
@@ -138,4 +154,13 @@ public void setAuthentication(Authentication auth) {
public void setMaxMessageSize(int maxMessageSize) {
settings.setMaxMessageSize(maxMessageSize);
}
+
+ public void setKeyPrefix(String keyPrefix) {
+ settings.setKeyPrefix(keyPrefix);
+ }
+
+ public void setObjectSerialization(boolean objectSerialization) {
+ settings.setObjectSerialization(objectSerialization);
+ }
+
}
diff --git a/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchPublisher.java b/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchPublisher.java
index 98d7034..816fc7e 100644
--- a/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchPublisher.java
+++ b/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchPublisher.java
@@ -3,6 +3,7 @@
import ch.qos.logback.core.Context;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.internetitem.logback.elasticsearch.config.ElasticsearchProperties;
import com.internetitem.logback.elasticsearch.config.HttpRequestHeaders;
import com.internetitem.logback.elasticsearch.config.Property;
@@ -51,6 +52,8 @@ protected DateFormat initialValue() {
private final PropertySerializer propertySerializer;
+ private Thread thread;
+
public AbstractElasticsearchPublisher(Context context, ErrorReporter errorReporter, Settings settings, ElasticsearchProperties properties, HttpRequestHeaders headers) throws IOException {
this.errorReporter = errorReporter;
this.events = new ArrayList();
@@ -59,7 +62,8 @@ public AbstractElasticsearchPublisher(Context context, ErrorReporter errorReport
this.outputAggregator = configureOutputAggregator(settings, errorReporter, headers);
- this.jf = new JsonFactory();
+ this.jf = buildJsonFactory(settings);
+
this.jf.setRootValueSeparator(null);
this.jsonGenerator = jf.createGenerator(outputAggregator);
@@ -67,6 +71,20 @@ public AbstractElasticsearchPublisher(Context context, ErrorReporter errorReport
this.propertyList = generatePropertyList(context, properties);
this.propertySerializer = new PropertySerializer();
+ }
+
+ public void close() {
+ if(thread != null) {
+ thread.interrupt();
+ }
+ }
+
+ private JsonFactory buildJsonFactory(Settings settings) {
+ if(settings.isObjectSerialization()) {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.getFactory();
+ }
+ return new JsonFactory();
}
private static ElasticsearchOutputAggregator configureOutputAggregator(Settings settings, ErrorReporter errorReporter, HttpRequestHeaders httpRequestHeaders) {
@@ -108,7 +126,7 @@ public void addEvent(T event) {
events.add(event);
if (!working) {
working = true;
- Thread thread = new Thread(this, THREAD_NAME_PREFIX + THREAD_COUNTER.getAndIncrement());
+ thread = new Thread(this, THREAD_NAME_PREFIX + THREAD_COUNTER.getAndIncrement());
thread.start();
}
}
@@ -119,7 +137,11 @@ public void run() {
int maxRetries = settings.getMaxRetries();
while (true) {
try {
- Thread.sleep(settings.getSleepTime());
+ try {
+ Thread.sleep(settings.getSleepTime());
+ } catch(InterruptedException e) {
+ // we are waking up the thread
+ }
List eventsCopy = null;
synchronized (lock) {
@@ -152,6 +174,7 @@ public void run() {
if (!outputAggregator.sendData()) {
currentTry++;
}
+
} catch (Exception e) {
errorReporter.logError("Internal error handling log data: " + e.getMessage(), e);
currentTry++;
diff --git a/src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchAppender.java b/src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchAppender.java
new file mode 100644
index 0000000..c1bf2b8
--- /dev/null
+++ b/src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchAppender.java
@@ -0,0 +1,21 @@
+package com.internetitem.logback.elasticsearch;
+
+import com.internetitem.logback.elasticsearch.config.Settings;
+
+import java.io.IOException;
+
+public class StructuredArgsElasticsearchAppender extends ElasticsearchAppender {
+
+ public StructuredArgsElasticsearchAppender() {
+ }
+
+ public StructuredArgsElasticsearchAppender(Settings settings) {
+ super(settings);
+ }
+
+ protected StructuredArgsElasticsearchPublisher buildElasticsearchPublisher() throws IOException {
+ return new StructuredArgsElasticsearchPublisher(this.getContext(), this.errorReporter, this.settings,
+ this.elasticsearchProperties, this.headers);
+ }
+
+}
diff --git a/src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchPublisher.java b/src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchPublisher.java
new file mode 100644
index 0000000..428f5bd
--- /dev/null
+++ b/src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchPublisher.java
@@ -0,0 +1,67 @@
+package com.internetitem.logback.elasticsearch;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.Context;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.internetitem.logback.elasticsearch.config.ElasticsearchProperties;
+import com.internetitem.logback.elasticsearch.config.HttpRequestHeaders;
+import com.internetitem.logback.elasticsearch.config.Settings;
+import com.internetitem.logback.elasticsearch.util.ErrorReporter;
+import net.logstash.logback.marker.ObjectAppendingMarker;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+public class StructuredArgsElasticsearchPublisher extends ClassicElasticsearchPublisher {
+ private String keyPrefix;
+ private Field field;
+ private ErrorReporter errorReporter;
+
+ public StructuredArgsElasticsearchPublisher(Context context, ErrorReporter errorReporter, Settings settings, ElasticsearchProperties properties,
+ HttpRequestHeaders headers) throws IOException {
+ super(context, errorReporter, settings, properties, headers);
+
+ this.errorReporter = errorReporter;
+
+ keyPrefix = "";
+ if(settings != null && settings.getKeyPrefix() != null) {
+ keyPrefix = settings.getKeyPrefix();
+ }
+
+ try {
+ field = ObjectAppendingMarker.class.getDeclaredField("object");
+ field.setAccessible(true);
+ } catch (NoSuchFieldException e) {
+ // message will be logged without object
+ errorReporter.logError("error in logging with object serialization", e);
+ }
+ }
+
+ protected void serializeCommonFields(JsonGenerator gen, ILoggingEvent event) throws IOException {
+ super.serializeCommonFields(gen, event);
+
+ if(event.getArgumentArray() != null) {
+ Object[] eventArgs = event.getArgumentArray();
+ for(Object eventArg:eventArgs) {
+ if(eventArg instanceof ObjectAppendingMarker) {
+ ObjectAppendingMarker marker = (ObjectAppendingMarker) eventArg;
+ if(field != null && settings != null && settings.isObjectSerialization() &&
+ marker.getFieldValue().toString().contains("@")) {
+ try {
+ Object obj = field.get(marker);
+ if(obj != null) {
+ gen.writeObjectField(keyPrefix + marker.getFieldName(), obj);
+ }
+ } catch (IllegalAccessException e) {
+ // message will be logged without object
+ errorReporter.logError("error in logging with object serialization", e);
+ }
+ }
+ else
+ gen.writeObjectField(keyPrefix + marker.getFieldName(), marker.getFieldValue());
+ }
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/com/internetitem/logback/elasticsearch/config/Settings.java b/src/main/java/com/internetitem/logback/elasticsearch/config/Settings.java
index e349b47..ac56973 100644
--- a/src/main/java/com/internetitem/logback/elasticsearch/config/Settings.java
+++ b/src/main/java/com/internetitem/logback/elasticsearch/config/Settings.java
@@ -23,6 +23,8 @@ public class Settings {
private int maxQueueSize = 100 * 1024 * 1024;
private Authentication authentication;
private int maxMessageSize = -1;
+ private String keyPrefix;
+ private boolean objectSerialization;
public String getIndex() {
return index;
@@ -162,4 +164,21 @@ public int getMaxMessageSize() {
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
+
+ public String getKeyPrefix() {
+ return this.keyPrefix;
+ }
+
+ public void setKeyPrefix(String keyPrefix) {
+ this.keyPrefix = keyPrefix;
+ }
+
+ public boolean isObjectSerialization() {
+ return objectSerialization;
+ }
+
+ public void setObjectSerialization(boolean objectSerialization) {
+ this.objectSerialization = objectSerialization;
+ }
+
}
diff --git a/src/main/java/com/internetitem/logback/elasticsearch/writer/ElasticsearchWriter.java b/src/main/java/com/internetitem/logback/elasticsearch/writer/ElasticsearchWriter.java
index 93def3c..0018ffd 100644
--- a/src/main/java/com/internetitem/logback/elasticsearch/writer/ElasticsearchWriter.java
+++ b/src/main/java/com/internetitem/logback/elasticsearch/writer/ElasticsearchWriter.java
@@ -8,11 +8,13 @@
import java.net.HttpURLConnection;
import java.util.Collection;
import java.util.Collections;
+import java.util.zip.GZIPOutputStream;
import com.internetitem.logback.elasticsearch.config.HttpRequestHeader;
import com.internetitem.logback.elasticsearch.config.HttpRequestHeaders;
import com.internetitem.logback.elasticsearch.config.Settings;
import com.internetitem.logback.elasticsearch.util.ErrorReporter;
+import org.apache.http.HttpHeaders;
public class ElasticsearchWriter implements SafeWriter {
@@ -23,6 +25,7 @@ public class ElasticsearchWriter implements SafeWriter {
private Collection headerList;
private boolean bufferExceeded;
+ private boolean compressedTransfer;
public ElasticsearchWriter(ErrorReporter errorReporter, Settings settings, HttpRequestHeaders headers) {
this.errorReporter = errorReporter;
@@ -32,6 +35,13 @@ public ElasticsearchWriter(ErrorReporter errorReporter, Settings settings, HttpR
: Collections.emptyList();
this.sendBuffer = new StringBuilder();
+ compressedTransfer = false;
+ for(HttpRequestHeader header : this.headerList) {
+ if(header.getName().toLowerCase().equals(HttpHeaders.CONTENT_ENCODING.toLowerCase()) && header.getValue().equals("gzip")) {
+ compressedTransfer = true;
+ break;
+ }
+ }
}
public void write(char[] cbuf, int off, int len) {
@@ -72,10 +82,7 @@ public void sendData() throws IOException {
settings.getAuthentication().addAuth(urlConnection, body);
}
- Writer writer = new OutputStreamWriter(urlConnection.getOutputStream(), "UTF-8");
- writer.write(body);
- writer.flush();
- writer.close();
+ writeData(urlConnection, body);
int rc = urlConnection.getResponseCode();
if (rc != 200) {
@@ -98,18 +105,18 @@ public boolean hasPendingData() {
}
private static String slurpErrors(HttpURLConnection urlConnection) {
- try {
- InputStream stream = urlConnection.getErrorStream();
+ try (InputStream stream = urlConnection.getErrorStream()) {
if (stream == null) {
return "";
}
StringBuilder builder = new StringBuilder();
- InputStreamReader reader = new InputStreamReader(stream, "UTF-8");
- char[] buf = new char[2048];
- int numRead;
- while ((numRead = reader.read(buf)) > 0) {
- builder.append(buf, 0, numRead);
+ try(InputStreamReader reader = new InputStreamReader(stream, "UTF-8")) {
+ char[] buf = new char[2048];
+ int numRead;
+ while ((numRead = reader.read(buf)) > 0) {
+ builder.append(buf, 0, numRead);
+ }
}
return builder.toString();
} catch (Exception e) {
@@ -117,4 +124,18 @@ private static String slurpErrors(HttpURLConnection urlConnection) {
}
}
+ private void writeData(HttpURLConnection urlConnection, String body) throws IOException {
+ if(this.compressedTransfer) {
+ try(Writer writer = new OutputStreamWriter(new GZIPOutputStream(urlConnection.getOutputStream()), "UTF-8")) {
+ writer.write(body);
+ writer.flush();
+ }
+ } else {
+ try(Writer writer = new OutputStreamWriter(urlConnection.getOutputStream(), "UTF-8")) {
+ writer.write(body);
+ writer.flush();
+ }
+ }
+ }
+
}