@@ -38,11 +38,11 @@ public class PubtransConnector {
3838 private RedisStore redisStore ;
3939 private Producer <byte []> producer ;
4040
41- private PubtransConnector () {}
41+ private PubtransConnector () {
42+ }
4243
43- public static PubtransConnector newInstance (Connection connection ,
44- PulsarApplicationContext context ,
45- PubtransTableType tableType ) throws RuntimeException {
44+ public static PubtransConnector newInstance (Connection connection , PulsarApplicationContext context ,
45+ PubtransTableType tableType ) throws RuntimeException {
4646 PubtransConnector connector = new PubtransConnector ();
4747
4848 connector .connection = connection ;
@@ -53,19 +53,20 @@ public static PubtransConnector newInstance(Connection connection,
5353 connector .queryString = queryString (config );
5454 connector .enableCacheCheck = config .getBoolean ("application.enableCacheTimestampCheck" );
5555 connector .cacheMaxAgeInMins = config .getInt ("application.cacheMaxAgeInMinutes" );
56- connector .queryTimeoutSecs = (int )config .getDuration ("pubtrans.queryTimeout" , TimeUnit .SECONDS );
56+ connector .queryTimeoutSecs = (int ) config .getDuration ("pubtrans.queryTimeout" , TimeUnit .SECONDS );
5757
58- log .info ("Cache pre-condition enabled: {} with max age {}" , connector .enableCacheCheck , connector .cacheMaxAgeInMins );
58+ log .info ("Cache pre-condition enabled: {} with max age {}" , connector .enableCacheCheck ,
59+ connector .cacheMaxAgeInMins );
5960
6061 log .info ("TableType: " + tableType );
6162 switch (tableType ) {
62- case ROI_ARRIVAL :
63+ case ROI_ARRIVAL :
6364 connector .handler = new ArrivalHandler (context );
6465 break ;
65- case ROI_DEPARTURE :
66+ case ROI_DEPARTURE :
6667 connector .handler = new DepartureHandler (context );
6768 break ;
68- default :
69+ default :
6970 throw new IllegalArgumentException ("Table type not supported" );
7071 }
7172 return connector ;
@@ -75,16 +76,9 @@ private static String queryString(Config config) {
7576 String longName = config .getString ("pubtrans.longName" );
7677 String shortName = config .getString ("pubtrans.shortName" );
7778
78- return "SELECT * FROM " +
79- longName +
80- " AS " +
81- shortName +
82- " WHERE " +
83- shortName + ".LastModifiedUTCDateTime > ? " +
84- " ORDER BY " +
85- shortName + ".LastModifiedUTCDateTime, " +
86- shortName + ".IsOnDatedVehicleJourneyId, " +
87- shortName + ".JourneyPatternSequenceNumber DESC" ;
79+ return "SELECT * FROM " + longName + " AS " + shortName + " WHERE " + shortName
80+ + ".LastModifiedUTCDateTime > ? " + " ORDER BY " + shortName + ".LastModifiedUTCDateTime, " + shortName
81+ + ".IsOnDatedVehicleJourneyId, " + shortName + ".JourneyPatternSequenceNumber DESC" ;
8882 }
8983
9084 public boolean checkPrecondition () {
@@ -95,10 +89,11 @@ public boolean checkPrecondition() {
9589 log .info ("Cache last known update: {}" , lastUpdate );
9690 if (lastUpdate .isPresent ()) {
9791 OffsetDateTime dt = OffsetDateTime .parse (lastUpdate .get (), DateTimeFormatter .ISO_DATE_TIME );
98- return isCacheValid (dt , cacheMaxAgeInMins );
99- } else {
100- log .error ("Could not find last cache update timestamp from redis" );
101- return false ;
92+ return isCacheValid (dt , cacheMaxAgeInMins );
93+ } else {
94+ log .error ("Could not find last cache update timestamp from redis" );
95+ return false ;
96+
10297 }
10398 }
10499
@@ -109,10 +104,11 @@ static boolean isCacheValid(OffsetDateTime lastCacheUpdate, final int cacheMaxAg
109104 final long secondsSinceUpdate = Duration .between (lastCacheUpdate , now ).get (ChronoUnit .SECONDS );
110105 final long minutesSinceUpdate = Math .floorDiv (secondsSinceUpdate , 60 );
111106 log .info ("Minutes since last cache update: {}" , minutesSinceUpdate );
112- log .info ("Current time {}, last update {}} => mins from prev update: {}" , now , lastCacheUpdate , minutesSinceUpdate );
107+ log .info ("Current time {}, last update {}} => mins from prev update: {}" , now , lastCacheUpdate ,
108+ minutesSinceUpdate );
113109 return minutesSinceUpdate <= cacheMaxAgeInMins ;
114110 }
115-
111+
116112 static void closeQuery (final ResultSet resultSet , final Statement statement ) {
117113 if (resultSet != null ) {
118114 try {
@@ -147,7 +143,7 @@ public void queryAndProcessResults() throws SQLException, PulsarClientException
147143 statement .setQueryTimeout (queryTimeoutSecs );
148144
149145 resultSet = statement .executeQuery ();
150-
146+
151147 produceMessages (handler .handleResultSet (resultSet , statement , queryStartTime ));
152148 } catch (PulsarClientException | SQLException e ) {
153149 closeQuery (resultSet , statement );
@@ -161,17 +157,16 @@ private void produceMessages(Collection<TypedMessageBuilder<byte[]>> messages) t
161157 }
162158
163159 for (TypedMessageBuilder <byte []> msg : messages ) {
164- msg .sendAsync ()
165- .exceptionally (throwable -> {
166- log .error ("Failed to send Pulsar message" , throwable );
167- return null ;
168- });
160+ msg .sendAsync ().exceptionally (throwable -> {
161+ log .error ("Failed to send Pulsar message" , throwable );
162+ return null ;
163+ });
169164
170165 }
171166 //If we want to get Pulsar Exceptions to bubble up into this thread we need to do a sync flush for all pending messages.
172167 producer .flush ();
173168
174- log .info ("{} messages written. Latest timestamp: {} Total query and processing time: {} ms" , messages .size (), handler .getLastModifiedTimeStamp (), System .currentTimeMillis () - this .queryStartTime );
169+ log .info ("{} messages written. Latest timestamp: {} Total query and processing time: {} ms" , messages .size (),
170+ handler .getLastModifiedTimeStamp (), System .currentTimeMillis () - this .queryStartTime );
175171 }
176172}
177-
0 commit comments