diff --git a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java index 52985e7..c145a08 100644 --- a/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java +++ b/src/main/java/fi/hsl/transitdata/pulsarpubtransconnect/PubtransTableHandler.java @@ -24,12 +24,14 @@ public abstract class PubtransTableHandler { final TransitdataProperties.ProtobufSchema schema; private Jedis jedis; private final String timeZone; + private final boolean excludeMetroTrips; public PubtransTableHandler(PulsarApplicationContext context, TransitdataProperties.ProtobufSchema handlerSchema) { lastModifiedTimeStamp = (System.currentTimeMillis() - 5000); jedis = context.getJedis(); producer = context.getSingleProducer(); timeZone = context.getConfig().getString("pubtrans.timezone"); + excludeMetroTrips = context.getConfig().getBoolean("application.excludeMetroTrips"); schema = handlerSchema; } @@ -73,10 +75,10 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS long tempTimeStamp = getLastModifiedTimeStamp(); int count = 0; + int metroTripCount = 0; + Set<String> metroRouteIds = new HashSet<>(); while (resultSet.next()) { - count++; - PubtransTableProtos.Common common = parseCommon(resultSet); final long eventTimestampUtcMs = common.getLastModifiedUtcDateTimeMs(); @@ -92,9 +94,17 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS if (maybeTripInfo.isEmpty()) { log.warn("Could not find valid DOITripInfo from Redis for dvjId {}, timetabledJppId {}, targetedJppId {}. Ignoring this update ", dvjId, scheduledJppId, targetedJppId); } else { - final byte[] data = createPayload(resultSet, common, maybeTripInfo.get()); - TypedMessageBuilder<byte[]> msgBuilder = createMessage(key, eventTimestampUtcMs, dvjId, data, getSchema()); - messageBuilderQueue.add(msgBuilder); + PubtransTableProtos.DOITripInfo tripInfo = maybeTripInfo.get(); + + if (excludeMetroTrips && tripInfo.getRouteId().startsWith("31M")) { + metroTripCount++; + metroRouteIds.add(tripInfo.getRouteId()); + } else { + count++; + final byte[] data = createPayload(resultSet, common, tripInfo); + TypedMessageBuilder<byte[]> msgBuilder = createMessage(key, eventTimestampUtcMs, dvjId, data, getSchema()); + messageBuilderQueue.add(msgBuilder); + } } //Update latest ts for next round @@ -103,8 +113,11 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS } } - log.info("{} rows processed from result set", count); - + log.info("{} rows processed from result set.", count); + if (metroTripCount > 0) { + log.info("Skipped {} rows with metro trips (route ids: {})", metroTripCount, metroRouteIds); + } + setLastModifiedTimeStamp(tempTimeStamp); return messageBuilderQueue; diff --git a/src/main/resources/arrival.conf b/src/main/resources/arrival.conf index 027498f..285191a 100644 --- a/src/main/resources/arrival.conf +++ b/src/main/resources/arrival.conf @@ -37,4 +37,6 @@ application { enableCacheTimestampCheck=${?ENABLE_CACHE_TIMESTAMP_CHECK} cacheMaxAgeInMinutes=180 cacheMaxAgeInMinutes=${?CACHE_MAX_AGE_IN_MINS} + excludeMetroTrips=true + excludeMetroTrips=${?EXCLUDE_METRO_TRIPS} } \ No newline at end of file diff --git a/src/main/resources/departure.conf b/src/main/resources/departure.conf index 711d8af..6b04caa 100644 --- a/src/main/resources/departure.conf +++ b/src/main/resources/departure.conf @@ -37,4 +37,6 @@ application { enableCacheTimestampCheck=${?ENABLE_CACHE_TIMESTAMP_CHECK} cacheMaxAgeInMinutes=180 cacheMaxAgeInMinutes=${?CACHE_MAX_AGE_IN_MINS} + excludeMetroTrips=true + excludeMetroTrips=${?EXCLUDE_METRO_TRIPS} } \ No newline at end of file