From cf8e3e26f4951c7bf5fbae83f12dff2e5b57f53a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20J=C3=A4rvinen?= <timo.jarvinen@yahoo.com> Date: Thu, 13 Mar 2025 17:01:45 +0200 Subject: [PATCH 1/7] Add logging --- .../pulsarpubtransconnect/PubtransConnector.java | 7 ++++++- .../pulsarpubtransconnect/PubtransTableHandler.java | 9 ++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransConnector.java b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransConnector.java index ec1c982..3034ed6 100644 --- a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransConnector.java +++ b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransConnector.java @@ -118,12 +118,17 @@ public void queryAndProcessResults() throws SQLException, PulsarClientException try { statement = connection.prepareStatement(queryString); - statement.setTimestamp(1, new java.sql.Timestamp(handler.getLastModifiedTimeStamp())); + Timestamp lastModifiedTimestamp = new Timestamp(handler.getLastModifiedTimeStamp()); + statement.setTimestamp(1, lastModifiedTimestamp); statement.setQueryTimeout(queryTimeoutSecs); + + log.info("Executing query. LastModifiedTimestamp: {}. QueryTimeoutSecs: {}", lastModifiedTimestamp, queryTimeoutSecs); + log.info("SQL: {}", queryString); resultSet = statement.executeQuery(); produceMessages(handler.handleResultSet(resultSet)); + log.info("RouteIds: {}", handler.getRouteIds()); } finally { if (resultSet != null) try { resultSet.close(); } catch (Exception e) { log.error("Exception while closing result set", e); } if (statement != null) try { statement.close(); } catch (Exception e) { log.error("Exception while closing statement", e); } diff --git a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java index 52985e7..2511e7b 100644 --- a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java +++ b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java @@ -24,6 +24,7 @@ public abstract class PubtransTableHandler { final TransitdataProperties.ProtobufSchema schema; private Jedis jedis; private final String timeZone; + private Set<String> routeIds = new HashSet<>(); public PubtransTableHandler(PulsarApplicationContext context, TransitdataProperties.ProtobufSchema handlerSchema) { lastModifiedTimeStamp = (System.currentTimeMillis() - 5000); @@ -32,6 +33,10 @@ public PubtransTableHandler(PulsarApplicationContext context, TransitdataPropert timeZone = context.getConfig().getString("pubtrans.timezone"); schema = handlerSchema; } + + public Set<String> getRouteIds() { + return routeIds; + } public long getLastModifiedTimeStamp() { return this.lastModifiedTimeStamp; @@ -92,7 +97,9 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS if (maybeTripInfo.isEmpty()) { log.warn("Could not find valid DOITripInfo from Redis for dvjId {}, timetabledJppId {}, targetedJppId {}. Ignoring this update ", dvjId, scheduledJppId, targetedJppId); } else { - final byte[] data = createPayload(resultSet, common, maybeTripInfo.get()); + PubtransTableProtos.DOITripInfo tripInfo = maybeTripInfo.get(); + routeIds.add(tripInfo.getRouteId()); + final byte[] data = createPayload(resultSet, common, tripInfo); TypedMessageBuilder<byte[]> msgBuilder = createMessage(key, eventTimestampUtcMs, dvjId, data, getSchema()); messageBuilderQueue.add(msgBuilder); } From ced026c1048603a63130a339355684e9bbcac0a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20J=C3=A4rvinen?= <timo.jarvinen@yahoo.com> Date: Fri, 14 Mar 2025 14:19:43 +0200 Subject: [PATCH 2/7] Revert "Add logging" This reverts commit cf8e3e26f4951c7bf5fbae83f12dff2e5b57f53a. --- .../pulsarpubtransconnect/PubtransConnector.java | 7 +------ .../pulsarpubtransconnect/PubtransTableHandler.java | 9 +-------- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransConnector.java b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransConnector.java index 3034ed6..ec1c982 100644 --- a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransConnector.java +++ b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransConnector.java @@ -118,17 +118,12 @@ public void queryAndProcessResults() throws SQLException, PulsarClientException try { statement = connection.prepareStatement(queryString); - Timestamp lastModifiedTimestamp = new Timestamp(handler.getLastModifiedTimeStamp()); - statement.setTimestamp(1, lastModifiedTimestamp); + statement.setTimestamp(1, new java.sql.Timestamp(handler.getLastModifiedTimeStamp())); statement.setQueryTimeout(queryTimeoutSecs); - - log.info("Executing query. LastModifiedTimestamp: {}. QueryTimeoutSecs: {}", lastModifiedTimestamp, queryTimeoutSecs); - log.info("SQL: {}", queryString); resultSet = statement.executeQuery(); produceMessages(handler.handleResultSet(resultSet)); - log.info("RouteIds: {}", handler.getRouteIds()); } finally { if (resultSet != null) try { resultSet.close(); } catch (Exception e) { log.error("Exception while closing result set", e); } if (statement != null) try { statement.close(); } catch (Exception e) { log.error("Exception while closing statement", e); } diff --git a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java index 2511e7b..52985e7 100644 --- a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java +++ b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java @@ -24,7 +24,6 @@ public abstract class PubtransTableHandler { final TransitdataProperties.ProtobufSchema schema; private Jedis jedis; private final String timeZone; - private Set<String> routeIds = new HashSet<>(); public PubtransTableHandler(PulsarApplicationContext context, TransitdataProperties.ProtobufSchema handlerSchema) { lastModifiedTimeStamp = (System.currentTimeMillis() - 5000); @@ -33,10 +32,6 @@ public PubtransTableHandler(PulsarApplicationContext context, TransitdataPropert timeZone = context.getConfig().getString("pubtrans.timezone"); schema = handlerSchema; } - - public Set<String> getRouteIds() { - return routeIds; - } public long getLastModifiedTimeStamp() { return this.lastModifiedTimeStamp; @@ -97,9 +92,7 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS if (maybeTripInfo.isEmpty()) { log.warn("Could not find valid DOITripInfo from Redis for dvjId {}, timetabledJppId {}, targetedJppId {}. Ignoring this update ", dvjId, scheduledJppId, targetedJppId); } else { - PubtransTableProtos.DOITripInfo tripInfo = maybeTripInfo.get(); - routeIds.add(tripInfo.getRouteId()); - final byte[] data = createPayload(resultSet, common, tripInfo); + final byte[] data = createPayload(resultSet, common, maybeTripInfo.get()); TypedMessageBuilder<byte[]> msgBuilder = createMessage(key, eventTimestampUtcMs, dvjId, data, getSchema()); messageBuilderQueue.add(msgBuilder); } From dbf49e89d06a43a0e976b38411cace2373dd2e06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20J=C3=A4rvinen?= <timo.jarvinen@yahoo.com> Date: Fri, 14 Mar 2025 14:39:09 +0200 Subject: [PATCH 3/7] AB#49889: Metro trips are skipped in result set handling --- .../pulsarpubtransconnect/PubtransTableHandler.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java index 52985e7..d51dbee 100644 --- a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java +++ b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java @@ -92,9 +92,15 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS if (maybeTripInfo.isEmpty()) { log.warn("Could not find valid DOITripInfo from Redis for dvjId {}, timetabledJppId {}, targetedJppId {}. Ignoring this update ", dvjId, scheduledJppId, targetedJppId); } else { - final byte[] data = createPayload(resultSet, common, maybeTripInfo.get()); - TypedMessageBuilder<byte[]> msgBuilder = createMessage(key, eventTimestampUtcMs, dvjId, data, getSchema()); - messageBuilderQueue.add(msgBuilder); + PubtransTableProtos.DOITripInfo tripInfo = maybeTripInfo.get(); + + if (tripInfo.getRouteId().startsWith("31M")) { + log.info("Metro trip skipped. RouteId: {}", tripInfo.getRouteId()); + } else { + final byte[] data = createPayload(resultSet, common, tripInfo); + TypedMessageBuilder<byte[]> msgBuilder = createMessage(key, eventTimestampUtcMs, dvjId, data, getSchema()); + messageBuilderQueue.add(msgBuilder); + } } //Update latest ts for next round From 4c509ac8194778d4665f3c1e4d430eb33912b601 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20J=C3=A4rvinen?= <timo.jarvinen@yahoo.com> Date: Fri, 14 Mar 2025 14:58:00 +0200 Subject: [PATCH 4/7] AB#49889: Reduce logging --- .../pulsarpubtransconnect/PubtransTableHandler.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java index d51dbee..ee6a32a 100644 --- a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java +++ b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java @@ -73,10 +73,10 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS long tempTimeStamp = getLastModifiedTimeStamp(); int count = 0; + int metroTripCount = 0; + Set<String> metroRouteIds = new HashSet<>(); while (resultSet.next()) { - count++; - PubtransTableProtos.Common common = parseCommon(resultSet); final long eventTimestampUtcMs = common.getLastModifiedUtcDateTimeMs(); @@ -95,8 +95,10 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS PubtransTableProtos.DOITripInfo tripInfo = maybeTripInfo.get(); if (tripInfo.getRouteId().startsWith("31M")) { - log.info("Metro trip skipped. RouteId: {}", tripInfo.getRouteId()); + metroTripCount++; + metroRouteIds.add(tripInfo.getRouteId()); } else { + count++; final byte[] data = createPayload(resultSet, common, tripInfo); TypedMessageBuilder<byte[]> msgBuilder = createMessage(key, eventTimestampUtcMs, dvjId, data, getSchema()); messageBuilderQueue.add(msgBuilder); @@ -109,7 +111,8 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS } } - log.info("{} rows processed from result set", count); + log.info("{} rows processed from result set. Skipped {} rows with metro trips (route ids: {})", + count, metroTripCount, metroRouteIds); setLastModifiedTimeStamp(tempTimeStamp); From c92b5339dceb150d1f3a0a3b6176422795637e76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20J=C3=A4rvinen?= <timo.jarvinen@yahoo.com> Date: Wed, 19 Mar 2025 15:24:11 +0200 Subject: [PATCH 5/7] AB#49889: Add environment variable excludeMetroTrips --- .../pulsarpubtransconnect/PubtransTableHandler.java | 4 +++- src/main/resources/arrival.conf | 2 ++ src/main/resources/departure.conf | 2 ++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java index ee6a32a..0da12d3 100644 --- a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java +++ b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java @@ -24,12 +24,14 @@ public abstract class PubtransTableHandler { final TransitdataProperties.ProtobufSchema schema; private Jedis jedis; private final String timeZone; + private final boolean excludeMetroTrips; public PubtransTableHandler(PulsarApplicationContext context, TransitdataProperties.ProtobufSchema handlerSchema) { lastModifiedTimeStamp = (System.currentTimeMillis() - 5000); jedis = context.getJedis(); producer = context.getSingleProducer(); timeZone = context.getConfig().getString("pubtrans.timezone"); + excludeMetroTrips = context.getConfig().getBoolean("pubtrans.excludeMetroTrips"); schema = handlerSchema; } @@ -94,7 +96,7 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS } else { PubtransTableProtos.DOITripInfo tripInfo = maybeTripInfo.get(); - if (tripInfo.getRouteId().startsWith("31M")) { + if (excludeMetroTrips && tripInfo.getRouteId().startsWith("31M")) { metroTripCount++; metroRouteIds.add(tripInfo.getRouteId()); } else { diff --git a/src/main/resources/arrival.conf b/src/main/resources/arrival.conf index 027498f..285191a 100644 --- a/src/main/resources/arrival.conf +++ b/src/main/resources/arrival.conf @@ -37,4 +37,6 @@ application { enableCacheTimestampCheck=${?ENABLE_CACHE_TIMESTAMP_CHECK} cacheMaxAgeInMinutes=180 cacheMaxAgeInMinutes=${?CACHE_MAX_AGE_IN_MINS} + excludeMetroTrips=true + excludeMetroTrips=${?EXCLUDE_METRO_TRIPS} } \ No newline at end of file diff --git a/src/main/resources/departure.conf b/src/main/resources/departure.conf index 711d8af..6b04caa 100644 --- a/src/main/resources/departure.conf +++ b/src/main/resources/departure.conf @@ -37,4 +37,6 @@ application { enableCacheTimestampCheck=${?ENABLE_CACHE_TIMESTAMP_CHECK} cacheMaxAgeInMinutes=180 cacheMaxAgeInMinutes=${?CACHE_MAX_AGE_IN_MINS} + excludeMetroTrips=true + excludeMetroTrips=${?EXCLUDE_METRO_TRIPS} } \ No newline at end of file From 548ea88aa51f57b76924c5dac0e69769e9999a27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20J=C3=A4rvinen?= <timo.jarvinen@yahoo.com> Date: Thu, 20 Mar 2025 14:45:17 +0200 Subject: [PATCH 6/7] AB#49889: Fix path to environment variable excludeMetroTrips --- .../transitdata/pulsarpubtransconnect/PubtransTableHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java index 0da12d3..007fe1e 100644 --- a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java +++ b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java @@ -31,7 +31,7 @@ public PubtransTableHandler(PulsarApplicationContext context, TransitdataPropert jedis = context.getJedis(); producer = context.getSingleProducer(); timeZone = context.getConfig().getString("pubtrans.timezone"); - excludeMetroTrips = context.getConfig().getBoolean("pubtrans.excludeMetroTrips"); + excludeMetroTrips = context.getConfig().getBoolean("application.excludeMetroTrips"); schema = handlerSchema; } From 7fa03960164c9afacbf16ee3cc749807d71a053b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20J=C3=A4rvinen?= <timo.jarvinen@yahoo.com> Date: Thu, 20 Mar 2025 15:03:24 +0200 Subject: [PATCH 7/7] AB#49889: Skipped metro trip count logged in separate line --- .../pulsarpubtransconnect/PubtransTableHandler.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java index 007fe1e..c145a08 100644 --- a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java +++ b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java @@ -113,9 +113,11 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS } } - log.info("{} rows processed from result set. Skipped {} rows with metro trips (route ids: {})", - count, metroTripCount, metroRouteIds); - + log.info("{} rows processed from result set.", count); + if (metroTripCount > 0) { + log.info("Skipped {} rows with metro trips (route ids: {})", metroTripCount, metroRouteIds); + } + setLastModifiedTimeStamp(tempTimeStamp); return messageBuilderQueue;