2
2
3
3
import fi .hsl .common .pulsar .PulsarApplicationContext ;
4
4
import fi .hsl .common .transitdata .TransitdataProperties ;
5
+ import fi .hsl .common .transitdata .TransitdataSchema ;
5
6
import fi .hsl .common .transitdata .proto .PubtransTableProtos ;
6
7
import org .apache .pulsar .client .api .Producer ;
7
8
import org .apache .pulsar .client .api .TypedMessageBuilder ;
@@ -63,10 +64,12 @@ public static Optional<Long> toUtcEpochMs(String localTimestamp, String zoneId)
63
64
}
64
65
}
65
66
66
- abstract protected byte [] createPayload (ResultSet resultSet , PubtransTableProtos .Common common ) throws SQLException ;
67
+ abstract protected byte [] createPayload (ResultSet resultSet , PubtransTableProtos .Common common , PubtransTableProtos . DOITripInfo tripInfo ) throws SQLException ;
67
68
68
69
abstract protected String getTimetabledDateTimeColumnName ();
69
70
71
+ abstract protected TransitdataSchema getSchema ();
72
+
70
73
public Queue <TypedMessageBuilder <byte []>> handleResultSet (ResultSet resultSet ) throws SQLException {
71
74
72
75
Queue <TypedMessageBuilder <byte []>> messageBuilderQueue = new LinkedList <>();
@@ -83,10 +86,16 @@ public Queue<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultSet) t
83
86
final String key = resultSet .getString ("IsOnDatedVehicleJourneyId" ) + resultSet .getString ("JourneyPatternSequenceNumber" );
84
87
final long dvjId = common .getIsOnDatedVehicleJourneyId ();
85
88
final long jppId = common .getIsTargetedAtJourneyPatternPointGid ();
86
- final byte [] data = createPayload (resultSet , common );
87
89
88
- Optional <TypedMessageBuilder <byte []>> maybeBuilder = createMessage (key , eventTimestampUtcMs , dvjId , jppId , data );
89
- maybeBuilder .ifPresent (messageBuilderQueue ::add );
90
+ Optional <PubtransTableProtos .DOITripInfo > maybeTripInfo = getTripInfo (dvjId , jppId );
91
+ if (!maybeTripInfo .isPresent ()) {
92
+ log .warn ("Could not find valid DOITripInfo from Redis for dvjId {}, jppId {}. Ignoring this update " , dvjId , jppId );
93
+ }
94
+ else {
95
+ final byte [] data = createPayload (resultSet , common , maybeTripInfo .get ());
96
+ TypedMessageBuilder <byte []> msgBuilder = createMessage (key , eventTimestampUtcMs , dvjId , data , getSchema ());
97
+ messageBuilderQueue .add (msgBuilder );
98
+ }
90
99
91
100
//Update latest ts for next round
92
101
if (eventTimestampUtcMs > tempTimeStamp ) {
@@ -133,77 +142,55 @@ protected PubtransTableProtos.Common parseCommon(ResultSet resultSet) throws SQL
133
142
return commonBuilder .build ();
134
143
}
135
144
136
-
137
- class JourneyInfo {
138
- String direction ;
139
- String routeName ;
140
- String startTime ;
141
- String operatingDay ;
142
-
143
- JourneyInfo (Map <String , String > map ) {
144
- direction = map .get (TransitdataProperties .KEY_DIRECTION );
145
- routeName = map .get (TransitdataProperties .KEY_ROUTE_NAME );
146
- startTime = map .get (TransitdataProperties .KEY_START_TIME );
147
- operatingDay = map .get (TransitdataProperties .KEY_OPERATING_DAY );
148
- if (!isValid ()) {
149
- //Let's print more info for debugging purposes:
150
- log .warn ("JourneyInfo is missing some fields. Content: " + this .toString ());
151
- }
152
- }
153
-
154
- boolean isValid () {
155
- return direction != null && routeName != null &&
156
- startTime != null && operatingDay != null ;
157
- }
158
-
159
- @ Override
160
- public String toString () {
161
- return new StringBuilder ()
162
- .append ("direction: " ).append (direction )
163
- .append (" routeName: " ).append (routeName )
164
- .append (" startTime: " ).append (startTime )
165
- .append (" operatingDay: " ).append (operatingDay )
166
- .toString ();
167
- }
168
- }
169
-
170
- private Optional <JourneyInfo > getJourneyInfo (long dvjId ) {
171
- String key = TransitdataProperties .REDIS_PREFIX_DVJ + Long .toString (dvjId );
172
- Optional <Map <String , String >> maybeJourneyInfoMap = Optional .ofNullable (jedis .hgetAll (key ));
173
-
174
- return maybeJourneyInfoMap
175
- .map (JourneyInfo ::new )
176
- .filter (JourneyInfo ::isValid );
145
+ private Optional <String > getStopId (long jppId ) {
146
+ String stopIdKey = TransitdataProperties .REDIS_PREFIX_JPP + Long .toString (jppId );
147
+ return Optional .ofNullable (jedis .get (stopIdKey ));
177
148
}
178
149
179
- private Optional <String > getStopId (long jppId ) {
180
- String key = TransitdataProperties .REDIS_PREFIX_JPP + Long .toString (jppId );
181
- return Optional .ofNullable (jedis .get ( key ));
150
+ private Optional <Map < String , String >> getTripInfoFields (long dvjId ) {
151
+ String tripInfoKey = TransitdataProperties .REDIS_PREFIX_DVJ + Long .toString (dvjId );
152
+ return Optional .ofNullable (jedis .hgetAll ( tripInfoKey ));
182
153
}
183
154
184
- Optional <TypedMessageBuilder <byte []>> createMessage (String key , long eventTime , long dvjId , long jppId , byte [] data ) {
185
- Optional <JourneyInfo > maybeJourneyInfo = getJourneyInfo (dvjId );
186
- if (!maybeJourneyInfo .isPresent ()) {
187
- log .warn ("Could not find valid JourneyInfo from Redis for dvjId " + dvjId );
155
+ protected Optional <PubtransTableProtos .DOITripInfo > getTripInfo (long dvjId , long jppId ) {
156
+ try {
157
+ Optional <String > maybeStopId = getStopId (jppId );
158
+ Optional <Map <String , String >> maybeTripInfoMap = getTripInfoFields (dvjId );
159
+
160
+ if (maybeStopId .isPresent () && maybeTripInfoMap .isPresent ()) {
161
+ PubtransTableProtos .DOITripInfo .Builder builder = PubtransTableProtos .DOITripInfo .newBuilder ();
162
+ builder .setStopId (maybeStopId .get ());
163
+ maybeTripInfoMap .ifPresent (map -> {
164
+ if (map .containsKey (TransitdataProperties .KEY_DIRECTION ))
165
+ builder .setDirectionId (Integer .parseInt (map .get (TransitdataProperties .KEY_DIRECTION )));
166
+ if (map .containsKey (TransitdataProperties .KEY_ROUTE_NAME ))
167
+ builder .setRouteId (map .get (TransitdataProperties .KEY_ROUTE_NAME ));
168
+ if (map .containsKey (TransitdataProperties .KEY_START_TIME ))
169
+ builder .setStartTime (map .get (TransitdataProperties .KEY_START_TIME ));
170
+ if (map .containsKey (TransitdataProperties .KEY_OPERATING_DAY ))
171
+ builder .setOperatingDay (map .get (TransitdataProperties .KEY_OPERATING_DAY ));
172
+ });
173
+ builder .setDvjId (dvjId );
174
+ return Optional .of (builder .build ());
175
+ }
176
+ else {
177
+ log .error ("Failed to get data from Redis for dvjId {}, jppId {}" , dvjId , jppId );
178
+ return Optional .empty ();
179
+ }
188
180
}
189
- Optional < String > maybeStopId = getStopId ( jppId );
190
- if (! maybeStopId . isPresent ()) {
191
- log . warn ( "Could not find StopId from Redis for dvjId " + dvjId );
181
+ catch ( Exception e ) {
182
+ log . error ( "Failed to get Trip Info for dvj-id " + dvjId , e );
183
+ return Optional . empty ( );
192
184
}
185
+ }
193
186
194
- return maybeJourneyInfo .flatMap (journeyInfo ->
195
- maybeStopId .map (stopId ->
196
- producer .newMessage ()
197
- .key (key )
198
- .eventTime (eventTime )
199
- .property (TransitdataProperties .KEY_DVJ_ID , Long .toString (dvjId ))
200
- .property (TransitdataProperties .KEY_PROTOBUF_SCHEMA , schema .toString ())
201
- .property (TransitdataProperties .KEY_DIRECTION , journeyInfo .direction )
202
- .property (TransitdataProperties .KEY_ROUTE_NAME , journeyInfo .routeName )
203
- .property (TransitdataProperties .KEY_START_TIME , journeyInfo .startTime )
204
- .property (TransitdataProperties .KEY_OPERATING_DAY , journeyInfo .operatingDay )
205
- .property (TransitdataProperties .KEY_STOP_ID , stopId )
206
- .value (data ))
207
- );
187
+ protected TypedMessageBuilder <byte []> createMessage (String key , long eventTime , long dvjId , byte [] data , TransitdataSchema schema ) {
188
+ return producer .newMessage ()
189
+ .key (key )
190
+ .eventTime (eventTime )
191
+ .property (TransitdataProperties .KEY_DVJ_ID , Long .toString (dvjId ))
192
+ .property (TransitdataProperties .KEY_PROTOBUF_SCHEMA , schema .schema .toString ())
193
+ .property (TransitdataProperties .KEY_SCHEMA_VERSION , Integer .toString (schema .schemaVersion .get ()))
194
+ .value (data );
208
195
}
209
196
}
0 commit comments