From 4c410204975ecb3c21fe59f9f6a5b11e53a81651 Mon Sep 17 00:00:00 2001
From: Strongest Number 9 <16169054+StrongestNumber9@users.noreply.github.com>
Date: Thu, 27 Jul 2023 17:04:54 +0300
Subject: [PATCH] stop threads (#74)
* Interrupt threads before shutdown
* Set logging to debug level on dev.watch()
* Use new version of rlo_12 and rlo_13 with differnt way of handling DEW threads to enable graceful shutdown
* Removes unnecessary logging messages
---
example/combined.yaml | 13 ++-
example/pods/first-pod.json | 2 +
example/pods/k8s_01.json | 2 +-
example/pods/receiver.json | 13 ++-
pom.xml | 2 +-
.../teragrep/k8s_01/KubernetesLogReader.java | 81 ++++++++-----------
.../java/com/teragrep/k8s_01/RelpOutput.java | 4 +-
7 files changed, 60 insertions(+), 57 deletions(-)
diff --git a/example/combined.yaml b/example/combined.yaml
index 5ae4582..12465b5 100644
--- a/example/combined.yaml
+++ b/example/combined.yaml
@@ -181,6 +181,8 @@ metadata:
app: first-pod-appname
app-stderr: first-pod-appname-stderr
env: dev
+ hostname: first-pod
+ hostname-stderr: first-pod-stderr
name: first-pod
namespace: default
spec:
@@ -268,7 +270,7 @@ spec:
restartPolicy: Always
serviceAccount: kubelogreader
serviceAccountName: kubelogreader
- terminationGracePeriodSeconds: 0
+ terminationGracePeriodSeconds: 30
volumes:
- configMap:
name: app-config-8dtf4m92md
@@ -308,6 +310,13 @@ spec:
- /usr/bin/java
image: ghcr.io/teragrep/rlp_07/app:latest
imagePullPolicy: IfNotPresent
+ lifecycle:
+ preStop:
+ exec:
+ command:
+ - sh
+ - -c
+ - echo presleep; sleep 10;
name: first-pod
ports:
- containerPort: 1601
@@ -321,7 +330,7 @@ spec:
imagePullSecrets:
- name: ghcr.io
restartPolicy: Always
- terminationGracePeriodSeconds: 0
+ terminationGracePeriodSeconds: 30
volumes:
- configMap:
name: receiver-config-cf8g6bh7tg
diff --git a/example/pods/first-pod.json b/example/pods/first-pod.json
index 98cff30..4e0109d 100644
--- a/example/pods/first-pod.json
+++ b/example/pods/first-pod.json
@@ -3,6 +3,8 @@
"name": "first-pod",
"namespace": "default",
"labels": {
+ "hostname": "first-pod",
+ "hostname-stderr": "first-pod-stderr",
"app": "first-pod-appname",
"app-stderr": "first-pod-appname-stderr",
"env": "dev"
diff --git a/example/pods/k8s_01.json b/example/pods/k8s_01.json
index 35842a5..4da7604 100644
--- a/example/pods/k8s_01.json
+++ b/example/pods/k8s_01.json
@@ -15,7 +15,7 @@
"serviceAccount": "kubelogreader",
"serviceAccountName": "kubelogreader",
"dnsPolicy": "ClusterFirst",
- "terminationGracePeriodSeconds": 0,
+ "terminationGracePeriodSeconds": 30,
"volumes": [
{
"name": "app-config",
diff --git a/example/pods/receiver.json b/example/pods/receiver.json
index 7387927..982a7f6 100644
--- a/example/pods/receiver.json
+++ b/example/pods/receiver.json
@@ -12,7 +12,7 @@
"spec": {
"hostname": "receiver",
"dnsPolicy": "ClusterFirst",
- "terminationGracePeriodSeconds": 0,
+ "terminationGracePeriodSeconds": 30,
"volumes": [
{
"name": "receiver-config",
@@ -39,6 +39,17 @@
"name": "receiver-config"
}
],
+ "lifecycle": {
+ "preStop": {
+ "exec": {
+ "command": [
+ "sh",
+ "-c",
+ "echo presleep; sleep 10;"
+ ]
+ }
+ }
+ },
"command": [
"/usr/bin/java"
],
diff --git a/pom.xml b/pom.xml
index 3a034d1..eca38ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@
com.teragrep
rlo_13
- 1.2.0
+ 1.2.1
diff --git a/src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java b/src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java
index 790b075..1d9e2aa 100644
--- a/src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java
+++ b/src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java
@@ -41,7 +41,7 @@ public class KubernetesLogReader {
private static final Logger LOGGER = LoggerFactory.getLogger(KubernetesLogReader.class);
private static final MetricRegistry metricRegistry = new MetricRegistry();
static Gson gson = new Gson();
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws IOException, InterruptedException {
AppConfig appConfig;
try {
try(InputStreamReader isr = new InputStreamReader(Files.newInputStream(Paths.get("etc/config.json")), StandardCharsets.UTF_8)) {
@@ -127,7 +127,7 @@ public static void main(String[] args) throws IOException {
Arrays.toString(logfiles)
);
- List threads = new ArrayList<>();
+ List dews = new ArrayList<>();
String statesStore = System.getProperty("user.dir") + "/var";
LOGGER.debug(
"Using {} as statestore",
@@ -143,8 +143,20 @@ public static void main(String[] args) throws IOException {
// Graceful shutdown so Relp sessions are gracefully terminated
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOGGER.info("Shutting down.");
+ for(DirectoryEventWatcher dew : dews) {
+ LOGGER.debug("Shutting down dew " + dew);
+ try {
+ dew.stop();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ LOGGER.info(
+ "Disconnecting {} relp threads",
+ outputThreads
+ );
for(int i=1; i <= outputThreads; i++) {
- LOGGER.info(
+ LOGGER.debug(
"Disconnecting relp thread #{}/{}",
i,
outputThreads
@@ -164,52 +176,23 @@ public static void main(String[] args) throws IOException {
// Start a new thread for all logfile watchers
for (String logfile : logfiles) {
- Thread thread = new Thread(() -> {
- LOGGER.debug(
- "Starting new DirectoryEventWatcher thread on directory '{}' with pattern '{}'",
- appConfig.getKubernetes().getLogdir(),
- logfile
- );
- try {
- DirectoryEventWatcher dew = new DirectoryEventWatcher(
- Paths.get(appConfig.getKubernetes().getLogdir()),
- false,
- Pattern.compile(logfile),
- statefulFileReader,
- 500,
- TimeUnit.MILLISECONDS,
- appConfig.getKubernetes().getMaxLogReadingThreads()
- );
- dew.watch();
- } catch (IOException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- });
- thread.setName("DEW-" + threads.size());
- thread.start();
- threads.add(thread);
- }
-
- // FIXME: Is this necessary
- for (Thread thread : threads) {
- if(LOGGER.isTraceEnabled()) {
- LOGGER.trace(
- "Waiting for thread {}#{} to finish",
- thread.getName(),
- thread.getId()
- );
- }
- try {
- thread.join();
- } catch (InterruptedException e) {
- LOGGER.error(
- "Failed to stop thread {}#{}:",
- thread.getName(),
- thread.getId(),
- e
- );
- throw new RuntimeException(e);
- }
+ LOGGER.debug(
+ "Starting new DirectoryEventWatcher on directory '{}' with pattern '{}'",
+ appConfig.getKubernetes().getLogdir(),
+ logfile
+ );
+ DirectoryEventWatcher dew = new DirectoryEventWatcher(
+ Paths.get(appConfig.getKubernetes().getLogdir()),
+ false,
+ Pattern.compile(logfile),
+ statefulFileReader,
+ 500,
+ TimeUnit.MILLISECONDS,
+ appConfig.getKubernetes().getMaxLogReadingThreads()
+ );
+ dew.start();
+ dews.add(dew);
}
+ Thread.sleep(Long.MAX_VALUE);
}
}
diff --git a/src/main/java/com/teragrep/k8s_01/RelpOutput.java b/src/main/java/com/teragrep/k8s_01/RelpOutput.java
index 5f16ab0..943768f 100644
--- a/src/main/java/com/teragrep/k8s_01/RelpOutput.java
+++ b/src/main/java/com/teragrep/k8s_01/RelpOutput.java
@@ -122,13 +122,11 @@ public void disconnect() {
totalConnections.dec();
relpConnection.disconnect();
} catch (IOException | TimeoutException e) {
- LOGGER.debug(
+ LOGGER.info(
"[#{}] Had to teardown connection",
getId()
);
relpConnection.tearDown();
- throughputErrors.mark();
- throw new RuntimeException(e);
}
}