1010import org .slf4j .LoggerFactory ;
1111import redis .clients .jedis .Jedis ;
1212
13+ import java .sql .PreparedStatement ;
1314import java .sql .ResultSet ;
1415import java .sql .SQLException ;
1516import java .time .LocalDateTime ;
@@ -25,6 +26,8 @@ public abstract class PubtransTableHandler {
2526 private Jedis jedis ;
2627 private final String timeZone ;
2728 private final boolean excludeMetroTrips ;
29+ private record QueryResultItem (PubtransTableProtos .Common common , String key , long eventTimestampUtcMs ,
30+ Map <String , Long > columnToIdMap ) {};
2831
2932 public PubtransTableHandler (PulsarApplicationContext context , TransitdataProperties .ProtobufSchema handlerSchema ) {
3033 lastModifiedTimeStamp = (System .currentTimeMillis () - 5000 );
@@ -62,35 +65,54 @@ public static Optional<Long> toUtcEpochMs(String localTimestamp, String zoneId)
6265 return Optional .empty ();
6366 }
6467 }
68+
69+ abstract protected Map <String , Long > getTableColumnToIdMap (ResultSet resultSet ) throws SQLException ;
6570
66- abstract protected byte [] createPayload (ResultSet resultSet , PubtransTableProtos .Common common , PubtransTableProtos .DOITripInfo tripInfo ) throws SQLException ;
71+ abstract protected byte [] createPayload (
72+ PubtransTableProtos .Common common , Map <String , Long > columnToIdMap ,
73+ PubtransTableProtos .DOITripInfo tripInfo ) throws SQLException ;
6774
6875 abstract protected String getTimetabledDateTimeColumnName ();
6976
7077 abstract protected TransitdataSchema getSchema ();
7178
72- public Collection <TypedMessageBuilder <byte []>> handleResultSet (ResultSet resultSet ) throws SQLException {
79+ public Collection <TypedMessageBuilder <byte []>> handleResultSet (
80+ ResultSet resultSet , PreparedStatement statement , long queryStartTime )
81+ throws SQLException {
7382 List <TypedMessageBuilder <byte []>> messageBuilderQueue = new ArrayList <>();
7483
7584 long tempTimeStamp = getLastModifiedTimeStamp ();
7685
7786 int count = 0 ;
7887 int metroTripCount = 0 ;
7988 Set <String > metroRouteIds = new HashSet <>();
89+ List <QueryResultItem > queryResultItems = new ArrayList <>();
90+ long queryDuration = -1L ;
91+ long resultHandlerDuration = -1L ;
92+ long queryAndResultHandlerDuration = -1L ;
8093
8194 while (resultSet .next ()) {
8295 count ++;
83-
96+
8497 PubtransTableProtos .Common common = parseCommon (resultSet );
8598 final long eventTimestampUtcMs = common .getLastModifiedUtcDateTimeMs ();
86-
99+
87100 final long delay = System .currentTimeMillis () - eventTimestampUtcMs ;
88101 log .debug ("Delay between current time and estimate publish time is {} ms" , delay );
89-
102+
90103 final String key = resultSet .getString ("IsOnDatedVehicleJourneyId" ) + resultSet .getString ("JourneyPatternSequenceNumber" );
91- final long dvjId = common .getIsOnDatedVehicleJourneyId ();
92- final long scheduledJppId = common .getIsTimetabledAtJourneyPatternPointGid ();
93- final long targetedJppId = common .getIsTargetedAtJourneyPatternPointGid ();
104+ final Map <String , Long > columnToIdMap = getTableColumnToIdMap (resultSet );
105+ queryResultItems .add (new QueryResultItem (common , key , eventTimestampUtcMs , columnToIdMap ));
106+ }
107+
108+ PubtransConnector .closeQuery (resultSet , statement );
109+ queryDuration = System .currentTimeMillis () - queryStartTime ;
110+
111+ for (QueryResultItem queryResultItem : queryResultItems ) {
112+ final long resultHandlerStartTime = System .currentTimeMillis ();
113+ final long dvjId = queryResultItem .common .getIsOnDatedVehicleJourneyId ();
114+ final long scheduledJppId = queryResultItem .common .getIsTimetabledAtJourneyPatternPointGid ();
115+ final long targetedJppId = queryResultItem .common .getIsTargetedAtJourneyPatternPointGid ();
94116
95117 Optional <PubtransTableProtos .DOITripInfo > maybeTripInfo = getTripInfo (dvjId , scheduledJppId , targetedJppId );
96118 if (maybeTripInfo .isEmpty ()) {
@@ -102,21 +124,26 @@ public Collection<TypedMessageBuilder<byte[]>> handleResultSet(ResultSet resultS
102124 metroTripCount ++;
103125 metroRouteIds .add (tripInfo .getRouteId ());
104126 } else {
105- final byte [] data = createPayload (resultSet , common , tripInfo );
106- TypedMessageBuilder <byte []> msgBuilder = createMessage (key , eventTimestampUtcMs , dvjId , data , getSchema ());
127+ final byte [] data = createPayload (queryResultItem .common , queryResultItem .columnToIdMap , tripInfo );
128+ TypedMessageBuilder <byte []> msgBuilder = createMessage (queryResultItem .key ,
129+ queryResultItem .eventTimestampUtcMs , dvjId , data , getSchema ());
107130 messageBuilderQueue .add (msgBuilder );
108131 }
109132 }
110133
111134 //Update latest ts for next round
112- if (eventTimestampUtcMs > tempTimeStamp ) {
113- tempTimeStamp = eventTimestampUtcMs ;
135+ if (queryResultItem . eventTimestampUtcMs > tempTimeStamp ) {
136+ tempTimeStamp = queryResultItem . eventTimestampUtcMs ;
114137 }
138+ resultHandlerDuration = System .currentTimeMillis () - resultHandlerStartTime ;
139+ queryAndResultHandlerDuration = System .currentTimeMillis () - queryStartTime ;
115140 }
116-
117- log .info ("{} rows processed from the result set. {} rows skipped with metro trips (route ids: {})" ,
118- count , metroTripCount , metroRouteIds );
119-
141+
142+ log .info ("{} rows processed from the result set. {} rows skipped with metro trips (route ids: {}). "
143+ + "Operation took {} (db query took {}, handling results took {})" ,
144+ count , metroTripCount , metroRouteIds ,
145+ getMinSec (queryAndResultHandlerDuration ), getMinSec (queryDuration ), getMinSec (resultHandlerDuration ));
146+
120147 setLastModifiedTimeStamp (tempTimeStamp );
121148
122149 return messageBuilderQueue ;
@@ -155,6 +182,13 @@ protected PubtransTableProtos.Common parseCommon(ResultSet resultSet) throws SQL
155182 commonBuilder .setLastModifiedUtcDateTimeMs (eventTimestampUtcMs );
156183 return commonBuilder .build ();
157184 }
185+
186+ private String getMinSec (long durationMs ) {
187+ long seconds = durationMs / 1000 ;
188+ long minutes = seconds / 60 ;
189+ long remainingSeconds = seconds % 60 ;
190+ return String .format ("%d min %d sec" , minutes , remainingSeconds );
191+ }
158192
159193 private Optional <String > getStopId (long jppId ) {
160194 synchronized (jedis ) {
0 commit comments