11package fi .hsl .transitdata .pulsarpubtransconnect ;
22
33import fi .hsl .common .pulsar .PulsarApplicationContext ;
4+ import fi .hsl .common .redis .RedisStore ;
45import fi .hsl .common .transitdata .TransitdataProperties ;
56import fi .hsl .common .transitdata .TransitdataSchema ;
67import fi .hsl .common .transitdata .proto .PubtransTableProtos ;
78import org .apache .pulsar .client .api .Producer ;
89import org .apache .pulsar .client .api .TypedMessageBuilder ;
910import org .slf4j .Logger ;
1011import org .slf4j .LoggerFactory ;
11- import redis .clients .jedis .Jedis ;
1212
1313import java .sql .PreparedStatement ;
1414import java .sql .ResultSet ;
1515import java .sql .SQLException ;
1616import java .time .LocalDateTime ;
1717import java .time .ZoneId ;
18- import java .util .*;
18+ import java .util .ArrayList ;
19+ import java .util .Collection ;
20+ import java .util .HashSet ;
21+ import java .util .List ;
22+ import java .util .Map ;
23+ import java .util .Optional ;
24+ import java .util .Set ;
25+
26+ import static fi .hsl .common .transitdata .TransitdataProperties .REDIS_PREFIX_DVJ ;
27+ import static fi .hsl .common .transitdata .TransitdataProperties .REDIS_PREFIX_JPP ;
1928
2029public abstract class PubtransTableHandler {
2130 static final Logger log = LoggerFactory .getLogger (PubtransTableHandler .class );
2231
2332 private long lastModifiedTimeStamp ;
2433 Producer <byte []> producer ;
2534 final TransitdataProperties .ProtobufSchema schema ;
26- private Jedis jedis ;
35+ private RedisStore redisStore ;
2736 private final String timeZone ;
2837 private final boolean excludeMetroTrips ;
38+
2939 private record QueryResultItem (PubtransTableProtos .Common common , String key , long eventTimestampUtcMs ,
30- Map <String , Long > columnToIdMap ) {};
40+ Map <String , Long > columnToIdMap ) {
41+ }
42+
43+ ;
3144
3245 public PubtransTableHandler (PulsarApplicationContext context , TransitdataProperties .ProtobufSchema handlerSchema ) {
3346 lastModifiedTimeStamp = (System .currentTimeMillis () - 5000 );
34- jedis = context .getJedis ();
47+ redisStore = context .getRedisStore ();
3548 producer = context .getSingleProducer ();
3649 timeZone = context .getConfig ().getString ("pubtrans.timezone" );
3750 excludeMetroTrips = context .getConfig ().getBoolean ("application.excludeMetroTrips" );
@@ -59,17 +72,16 @@ public static Optional<Long> toUtcEpochMs(String localTimestamp, String zoneId)
5972 ZoneId zone = ZoneId .of (zoneId );
6073 long epochMs = dt .atZone (zone ).toInstant ().toEpochMilli ();
6174 return Optional .of (epochMs );
62- }
63- catch (Exception e ) {
75+ } catch (Exception e ) {
6476 log .error ("Failed to parse datetime from " + localTimestamp , e );
6577 return Optional .empty ();
6678 }
6779 }
68-
80+
6981 public static float getSeconds (long durationMs ) {
7082 return durationMs / 1000.0f ;
7183 }
72-
84+
7385 abstract protected Map <String , Long > getTableColumnToIdMap (ResultSet resultSet ) throws SQLException ;
7486
7587 abstract protected byte [] createPayload (
@@ -97,22 +109,22 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(
97109
98110 while (resultSet .next ()) {
99111 count ++;
100-
112+
101113 PubtransTableProtos .Common common = parseCommon (resultSet );
102114 final long eventTimestampUtcMs = common .getLastModifiedUtcDateTimeMs ();
103-
115+
104116 final long delay = System .currentTimeMillis () - eventTimestampUtcMs ;
105117 log .debug ("Delay between current time and estimate publish time is {} ms" , delay );
106-
118+
107119 final String key = resultSet .getString ("IsOnDatedVehicleJourneyId" ) + resultSet .getString ("JourneyPatternSequenceNumber" );
108120 final Map <String , Long > columnToIdMap = getTableColumnToIdMap (resultSet );
109121 queryResultItems .add (new QueryResultItem (common , key , eventTimestampUtcMs , columnToIdMap ));
110122 }
111-
123+
112124 PubtransConnector .closeQuery (resultSet , statement );
113125 long queryEndTime = System .currentTimeMillis ();
114126 queryDuration = queryEndTime - queryStartTime ;
115-
127+
116128 for (QueryResultItem queryResultItem : queryResultItems ) {
117129 final long dvjId = queryResultItem .common .getIsOnDatedVehicleJourneyId ();
118130 final long scheduledJppId = queryResultItem .common .getIsTimetabledAtJourneyPatternPointGid ();
@@ -123,7 +135,7 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(
123135 log .warn ("Could not find valid DOITripInfo from Redis for dvjId {}, timetabledJppId {}, targetedJppId {}. Ignoring this update " , dvjId , scheduledJppId , targetedJppId );
124136 } else {
125137 PubtransTableProtos .DOITripInfo tripInfo = maybeTripInfo .get ();
126-
138+
127139 if (excludeMetroTrips && tripInfo .getRouteId ().startsWith ("31M" )) {
128140 metroTripCount ++;
129141 metroRouteIds .add (tripInfo .getRouteId ());
@@ -140,16 +152,16 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(
140152 tempTimeStamp = queryResultItem .eventTimestampUtcMs ;
141153 }
142154 }
143-
155+
144156 long endTime = System .currentTimeMillis ();
145157 resultHandlerDuration = endTime - queryEndTime ;
146158 queryAndResultHandlerDuration = endTime - queryStartTime ;
147-
159+
148160 log .info ("{} rows processed from the result set. {} rows skipped with metro trips (route ids: {}). "
149161 + "Operation took {} s (db query took {} s, handling results took {} s)" ,
150162 count , metroTripCount , metroRouteIds ,
151163 getSeconds (queryAndResultHandlerDuration ), getSeconds (queryDuration ), getSeconds (resultHandlerDuration ));
152-
164+
153165 setLastModifiedTimeStamp (tempTimeStamp );
154166
155167 return messageBuilderQueue ;
@@ -190,17 +202,11 @@ protected PubtransTableProtos.Common parseCommon(ResultSet resultSet) throws SQL
190202 }
191203
192204 private Optional <String > getStopId (long jppId ) {
193- synchronized (jedis ) {
194- String stopIdKey = TransitdataProperties .REDIS_PREFIX_JPP + jppId ;
195- return Optional .ofNullable (jedis .get (stopIdKey ));
196- }
205+ return redisStore .getValue (REDIS_PREFIX_JPP + jppId );
197206 }
198207
199208 private Optional <Map <String , String >> getTripInfoFields (long dvjId ) {
200- synchronized (jedis ) {
201- String tripInfoKey = TransitdataProperties .REDIS_PREFIX_DVJ + dvjId ;
202- return Optional .ofNullable (jedis .hgetAll (tripInfoKey ));
203- }
209+ return redisStore .getValues (REDIS_PREFIX_DVJ + dvjId );
204210 }
205211
206212 protected Optional <PubtransTableProtos .DOITripInfo > getTripInfo (long dvjId , long scheduledJppId , long targetedJppId ) {
@@ -211,7 +217,7 @@ protected Optional<PubtransTableProtos.DOITripInfo> getTripInfo(long dvjId, long
211217
212218 if (maybeScheduledStopId .isPresent () && maybeTripInfoMap .isPresent ()) {
213219 PubtransTableProtos .DOITripInfo .Builder builder = PubtransTableProtos .DOITripInfo .newBuilder ();
214-
220+
215221 builder .setStopId (maybeScheduledStopId .get ());
216222 maybeTargetedStopId .ifPresent (builder ::setTargetedStopId );
217223
@@ -227,13 +233,11 @@ protected Optional<PubtransTableProtos.DOITripInfo> getTripInfo(long dvjId, long
227233 });
228234 builder .setDvjId (dvjId );
229235 return Optional .of (builder .build ());
230- }
231- else {
236+ } else {
232237 log .error ("Failed to get data from Redis for dvjId {}, timetabledJppId {}" , dvjId , scheduledJppId );
233238 return Optional .empty ();
234239 }
235- }
236- catch (Exception e ) {
240+ } catch (Exception e ) {
237241 log .warn ("Failed to get Trip Info for dvj-id " + dvjId , e );
238242 return Optional .empty ();
239243 }
0 commit comments