Skip to content

Metro trips are skipped in result set handling #58

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 28, 2025
Merged
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>fi.hsl.transitdata</groupId>
<artifactId>transitdata-pubtrans-source</artifactId>
<version>2.0.0</version>
<version>2.0.1</version>
<packaging>jar</packaging>

<repositories>
Expand All @@ -16,7 +16,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<common.version>2.0.1</common.version>
<common.version>2.0.2</common.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ public abstract class PubtransTableHandler {
final TransitdataProperties.ProtobufSchema schema;
private Jedis jedis;
private final String timeZone;
private final boolean excludeMetroTrips;

public PubtransTableHandler(PulsarApplicationContext context, TransitdataProperties.ProtobufSchema handlerSchema) {
lastModifiedTimeStamp = (System.currentTimeMillis() - 5000);
jedis = context.getJedis();
producer = context.getSingleProducer();
timeZone = context.getConfig().getString("pubtrans.timezone");
excludeMetroTrips = context.getConfig().getBoolean("application.excludeMetroTrips");
schema = handlerSchema;
}

Expand Down Expand Up @@ -73,6 +75,8 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS
long tempTimeStamp = getLastModifiedTimeStamp();

int count = 0;
int metroTripCount = 0;
Set<String> metroRouteIds = new HashSet<>();

while (resultSet.next()) {
count++;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand why the meaning of count was changed to the count of included rows only. Is the log message used in monitoring?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the log message is not used in monitoring. To know that the filtering works, I want see how many metro trips where excluded, if any. Some other options to implement this:

A) Bring the line count++ back to it's original position and add new variable: int includedCount

B) Modify the log message and the name of the count variable => int includedCount
From:
log.info("{} rows processed from result set.", count);
To:
log.info("{} rows included from result set.", includedCount);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metroTripCount already answers to: "To know that the filtering works, I want see how many metro trips where excluded, if any." Do you mean that you wish to know the included count?

How about one log line with both the original meaning of count and metroTripCount (even if zero): "{} rows processed from the result set. {} rows skipped with metro trips (route ids: {})"? If you wish, you could also calculate includedCount = count - metroTripCount into the same log line.

I'd like us to be able to tell that some rows were received even if for some reason everything was excluded, e.g. if there are only metro rows in the database for some reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see PR: #59

Expand All @@ -92,9 +96,16 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS
if (maybeTripInfo.isEmpty()) {
log.warn("Could not find valid DOITripInfo from Redis for dvjId {}, timetabledJppId {}, targetedJppId {}. Ignoring this update ", dvjId, scheduledJppId, targetedJppId);
} else {
final byte[] data = createPayload(resultSet, common, maybeTripInfo.get());
TypedMessageBuilder<byte[]> msgBuilder = createMessage(key, eventTimestampUtcMs, dvjId, data, getSchema());
messageBuilderQueue.add(msgBuilder);
PubtransTableProtos.DOITripInfo tripInfo = maybeTripInfo.get();

if (excludeMetroTrips && tripInfo.getRouteId().startsWith("31M")) {
metroTripCount++;
metroRouteIds.add(tripInfo.getRouteId());
} else {
final byte[] data = createPayload(resultSet, common, tripInfo);
TypedMessageBuilder<byte[]> msgBuilder = createMessage(key, eventTimestampUtcMs, dvjId, data, getSchema());
messageBuilderQueue.add(msgBuilder);
}
}

//Update latest ts for next round
Expand All @@ -103,7 +114,8 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS
}
}

log.info("{} rows processed from result set", count);
log.info("{} rows processed from the result set. {} rows skipped with metro trips (route ids: {})",
count, metroTripCount, metroRouteIds);

setLastModifiedTimeStamp(tempTimeStamp);

Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/arrival.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ application {
enableCacheTimestampCheck=${?ENABLE_CACHE_TIMESTAMP_CHECK}
cacheMaxAgeInMinutes=180
cacheMaxAgeInMinutes=${?CACHE_MAX_AGE_IN_MINS}
excludeMetroTrips=true
excludeMetroTrips=${?EXCLUDE_METRO_TRIPS}
}
2 changes: 2 additions & 0 deletions src/main/resources/departure.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ application {
enableCacheTimestampCheck=${?ENABLE_CACHE_TIMESTAMP_CHECK}
cacheMaxAgeInMinutes=180
cacheMaxAgeInMinutes=${?CACHE_MAX_AGE_IN_MINS}
excludeMetroTrips=true
excludeMetroTrips=${?EXCLUDE_METRO_TRIPS}
}
Loading