-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathMain.java
More file actions
157 lines (137 loc) · 6.83 KB
/
Main.java
File metadata and controls
157 lines (137 loc) · 6.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package fi.hsl.transitdata.pulsarpubtransconnect;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Scanner;
import java.io.File;
import java.sql.*;
import java.util.TimeZone;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.microsoft.sqlserver.jdbc.*;
import com.typesafe.config.Config;
import fi.hsl.common.config.ConfigParser;
import fi.hsl.common.config.ConfigUtils;
import fi.hsl.common.pulsar.PulsarApplication;
import fi.hsl.common.pulsar.PulsarApplicationContext;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisException;
public class Main {
private static final Logger log = LoggerFactory.getLogger(Main.class);
private static final int SQL_SERVER_ERROR_DEADLOCK_VICTIM = 1205;
/**
* Pubtrans stores modified timestamps in UTC format.
*
* We're creating java.sql.Timestamps(long epoch) which are also by definition in UTC format.
* We inject the query parameter via prepared statements into the SQL Server queries.
*
* However for some reason JDBC Driver seems to convert this epoch to local datetime before running the query.
* Using Calendar instances and java.time.* classes don't seem to work, since in our case
* the ResultSet is always empty because of the conversion.
*
* So far the only working solution has been to set the whole JVM to use UTC timezone, which is a hack.
* However it seems that we have to live with it until we find a better solution..
*
* See More:
* https://github.com/Microsoft/mssql-jdbc/issues/339
* https://social.msdn.microsoft.com/Forums/sqlserver/en-US/5b4fb9bf-fb9b-44eb-a7a2-222b92c31f5a/sql-server-jdbc-driver-datetimezone-related-query?forum=sqldataaccess
* */
static {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
}
//JDBC connection needs executor for network timeout
private static final Executor SQL_TIMEOUT_EXECUTOR = Executors.newSingleThreadExecutor(runnable -> {
final Thread t = new Thread(runnable);
t.setDaemon(true);
return t;
});
public static void main(String[] args) {
log.info("Starting Pubtrans Source Application");
try {
String table = ConfigUtils.getEnvOrThrow("PT_TABLE");
PubtransTableType type = PubtransTableType.fromString(table);
Config config = null;
if (type == PubtransTableType.ROI_ARRIVAL) {
config = ConfigParser.createConfig("arrival.conf");
} else if (type == PubtransTableType.ROI_DEPARTURE) {
config = ConfigParser.createConfig("departure.conf");
} else {
log.error("Failed to get table name from PT_TABLE-env variable, exiting application");
System.exit(1);
}
final Duration networkTimeout = config.getDuration("pubtrans.networkTimeout");
Connection connection = createPubtransConnection();
connection.setNetworkTimeout(SQL_TIMEOUT_EXECUTOR, (int) networkTimeout.toMillis());
final PulsarApplication app = PulsarApplication.newInstance(config);
PulsarApplicationContext context = app.getContext();
final PubtransConnector connector = PubtransConnector.newInstance(connection, context, type);
final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
log.info("Starting scheduler");
scheduler.scheduleAtFixedRate(() -> {
try {
if (connector.checkPrecondition()) {
connector.queryAndProcessResults();
} else {
log.error("Pubtrans poller precondition failed, skipping the current poll cycle.");
}
} catch (SQLServerException sqlServerException) {
// Occasionally (once every 2 hours?) the driver throws us out as a deadlock victim.
// There's no easy way to fix the root problem so lets just convert it to warning.
// More info: https://stackoverflow.com/questions/8390322/cause-of-a-process-being-a-deadlock-victim
if (sqlServerException.getErrorCode() == SQL_SERVER_ERROR_DEADLOCK_VICTIM) {
log.warn("SQL Server evicted us as deadlock victim. ignoring this for now...",
sqlServerException);
} else {
log.error("SQL Server Unexpected error code, shutting down", sqlServerException);
log.warn("Driver Error code: {}", sqlServerException.getErrorCode());
closeApplication(app, scheduler);
}
} catch (JedisException | SQLException | PulsarClientException connectionException) {
log.error("Connection problem, cannot recover so shutting down", connectionException);
closeApplication(app, scheduler);
} catch (Exception e) {
log.error("Unknown error at Pubtrans scheduler, shutting down", e);
closeApplication(app, scheduler);
}
}, 0, 1, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("Exception at Main", e);
}
}
private static void closeApplication(PulsarApplication app, ScheduledExecutorService scheduler) {
log.warn("Closing application");
scheduler.shutdown();
app.close();
}
private static Connection createPubtransConnection() throws Exception {
String connectionString = "";
try {
//Default path is what works with Docker out-of-the-box. Override with a local file if needed
connectionString = System.getenv("TRANSITDATA_PUBTRANS_CONN_STRING");
} catch (Exception e) {
log.error("Failed to read Pubtrans connection string from secrets", e);
throw e;
}
if (connectionString.isEmpty()) {
throw new Exception("Failed to find Pubtrans connection string, exiting application");
}
Connection connection = null;
try {
if (connectionString.contains("postgresql")) {
//Load Postgres driver
Class.forName("org.postgresql.Driver");
}
connection = DriverManager.getConnection(connectionString);
log.info("Database connection created");
} catch (SQLException e) {
log.error("Failed to connect to Pubtrans database", e);
throw e;
}
return connection;
}
}