Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/test-and-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ jobs:
distribution: 'temurin'
java-version: '11'
cache: 'maven'
- name: Run Spotless Apply
run: mvn spotless:apply
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Cache Maven packages
uses: actions/cache@v4
with:
Expand Down
18 changes: 18 additions & 0 deletions eclipse-java-formatter.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<profiles version="13">
<profile kind="CodeFormatterProfile" name="Custom-4spaces-NoCommentSplit" version="13">

<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="space"/>
<setting id="org.eclipse.jdt.core.formatter.tabulation.size" value="4"/>
<setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/>
<setting id="org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations" value="false"/>

<setting id="org.eclipse.jdt.core.formatter.lineSplit" value="120"/>

<setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="9999"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_line_comments" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_block_comments" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="false"/>

</profile>
</profiles>
30 changes: 29 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<common.version>2.0.3-RC6</common.version>
<common.version>2.0.5</common.version>
<spotlessMavenPlugin.version>2.43.0</spotlessMavenPlugin.version>
<googleJavaFormat.version>1.17.0</googleJavaFormat.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -106,6 +108,32 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.0</version>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>${spotlessMavenPlugin.version}</version>
<configuration>
<java>
<eclipse>
<file>${project.basedir}/eclipse-java-formatter.xml</file>
</eclipse>
</java>
</configuration>
<executions>
<execution>
<id>spotless-check</id>
<goals>
<goal>check</goal>
</goals>
</execution>
<execution>
<id>spotless-apply</id>
<goals>
<goal>apply</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

</build>
Expand Down
58 changes: 30 additions & 28 deletions src/main/java/fi/hsl/transitdata/stop/cancellations/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,29 @@ public static void main(String[] args) {

final DoiStopInfoSource doiStops = DoiStopInfoSource.newInstance(context, connString, useTestDoiQueries);

final ClosedStopHandler closedStopHandler = new ClosedStopHandler(context, connString, connString, useTestDoiQueries, useTestOmmQueries);
final DisruptionRouteHandler disruptionRouteHandler = new DisruptionRouteHandler(context, connString, connString, useTestDoiQueries, useTestOmmQueries);
final ClosedStopHandler closedStopHandler = new ClosedStopHandler(context, connString, connString,
useTestDoiQueries, useTestOmmQueries);
final DisruptionRouteHandler disruptionRouteHandler = new DisruptionRouteHandler(context, connString,
connString, useTestDoiQueries, useTestOmmQueries);

final StopCancellationPublisher publisher = new StopCancellationPublisher(context);

PulsarApplication finalApp = app;
scheduler.scheduleAtFixedRate(() -> {
try {
//Query closed stops, affected journey patterns and affected journeys
final Optional<InternalMessages.StopCancellations> stopCancellationsClosed = closedStopsEnabled ?
closedStopHandler.queryAndProcessResults(doiStops) :
Optional.empty();
final Optional<InternalMessages.StopCancellations> stopCancellationsClosed = closedStopsEnabled
? closedStopHandler.queryAndProcessResults(doiStops)
: Optional.empty();

//Query disruption routes and affected journeys
final Optional<InternalMessages.StopCancellations> stopCancellationsJourneyPatternDetour = disruptionRouteEnabled ?
disruptionRouteHandler.queryAndProcessResults(doiStops) :
Optional.empty();
final Optional<InternalMessages.StopCancellations> stopCancellationsJourneyPatternDetour = disruptionRouteEnabled
? disruptionRouteHandler.queryAndProcessResults(doiStops)
: Optional.empty();

//Stop cancellation message should be sent even if there are no cancellations so that cancellation-of-cancellation works in the processor
publisher.sendStopCancellations(mergeStopCancellations(unwrapOptionals(Arrays.asList(stopCancellationsClosed, stopCancellationsJourneyPatternDetour))));
publisher.sendStopCancellations(mergeStopCancellations(unwrapOptionals(
Arrays.asList(stopCancellationsClosed, stopCancellationsJourneyPatternDetour))));
} catch (PulsarClientException e) {
log.error("Pulsar connection error", e);
closeApplication(finalApp, scheduler);
Expand All @@ -92,40 +95,40 @@ private static <T> Collection<T> unwrapOptionals(List<Optional<T>> optionals) {
return optionals.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
}

private static InternalMessages.StopCancellations mergeStopCancellations(Collection<InternalMessages.StopCancellations> stopCancellationMessages) {
InternalMessages.StopCancellations.Builder stopCancellationsBuilder = InternalMessages.StopCancellations.newBuilder();
private static InternalMessages.StopCancellations mergeStopCancellations(
Collection<InternalMessages.StopCancellations> stopCancellationMessages) {
InternalMessages.StopCancellations.Builder stopCancellationsBuilder = InternalMessages.StopCancellations
.newBuilder();

//Merge journey patterns from all stop cancellation messages
final Collection<InternalMessages.JourneyPattern> combinedJourneyPatterns = combineTripsOfJourneyPatterns(stopCancellationMessages.stream()
.map(InternalMessages.StopCancellations::getAffectedJourneyPatternsList)
.flatMap(List::stream)
//There may be multiple journey patterns with same ID, but they all should include same trips
.collect(Collectors.groupingBy(InternalMessages.JourneyPattern::getJourneyPatternId)));
final Collection<InternalMessages.JourneyPattern> combinedJourneyPatterns = combineTripsOfJourneyPatterns(
stopCancellationMessages.stream()
.map(InternalMessages.StopCancellations::getAffectedJourneyPatternsList).flatMap(List::stream)
//There may be multiple journey patterns with same ID, but they all should include same trips
.collect(Collectors.groupingBy(InternalMessages.JourneyPattern::getJourneyPatternId)));

combinedJourneyPatterns.forEach(stopCancellationsBuilder::addAffectedJourneyPatterns);

//Collect stop cancellations from all stop cancellation messages
stopCancellationMessages.stream()
.map(InternalMessages.StopCancellations::getStopCancellationsList)
.flatMap(List::stream)
.forEach(stopCancellationsBuilder::addStopCancellations);
stopCancellationMessages.stream().map(InternalMessages.StopCancellations::getStopCancellationsList)
.flatMap(List::stream).forEach(stopCancellationsBuilder::addStopCancellations);

return stopCancellationsBuilder.build();
}

// combines affected trips of one or more journeyPattern(s) by adding their unique trips to a single returned journeyPattern
//TODO: simplify SQL queries so that this would not be necessary
private static Collection<InternalMessages.JourneyPattern> combineTripsOfJourneyPatterns(Map<String, List<InternalMessages.JourneyPattern>> journeyPatternById) {
private static Collection<InternalMessages.JourneyPattern> combineTripsOfJourneyPatterns(
Map<String, List<InternalMessages.JourneyPattern>> journeyPatternById) {
return journeyPatternById.entrySet().stream().map(journeyPatternsWithId -> {
InternalMessages.JourneyPattern.Builder journeyPatternBuilder = InternalMessages.JourneyPattern.newBuilder();
InternalMessages.JourneyPattern.Builder journeyPatternBuilder = InternalMessages.JourneyPattern
.newBuilder();
journeyPatternBuilder.setJourneyPatternId(journeyPatternsWithId.getKey());
//Add list of stops from the first journey pattern
journeyPatternBuilder.addAllStops(journeyPatternsWithId.getValue().get(0).getStopsList());
//Get set of trips from all journey patterns
journeyPatternBuilder.addAllTrips(journeyPatternsWithId.getValue().stream()
.flatMap(journeyPattern -> journeyPattern.getTripsList().stream())
.collect(Collectors.toSet())
);
.flatMap(journeyPattern -> journeyPattern.getTripsList().stream()).collect(Collectors.toSet()));

return journeyPatternBuilder.build();
}).collect(Collectors.toList());
Expand All @@ -143,8 +146,7 @@ private static void closeApplication(PulsarApplication app, ScheduledExecutorSer

private static String readConnString(String envVar, String secretName) throws Exception {
//Default path is what works with Docker out-of-the-box. Override with a local file if needed
final String secretFilePath = ConfigUtils.getEnv(envVar)
.orElse("/run/secrets/" + secretName);
final String secretFilePath = ConfigUtils.getEnv(envVar).orElse("/run/secrets/" + secretName);

String connectionString = "";
try (Scanner scanner = new Scanner(new File(secretFilePath)).useDelimiter("\\Z")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,23 @@ public void sendStopCancellations(InternalMessages.StopCancellations message) th
sendStopCancellations(message, currentTimestampUtcMs);
}

private void sendStopCancellations(InternalMessages.StopCancellations message, long timestamp) throws PulsarClientException {
private void sendStopCancellations(InternalMessages.StopCancellations message, long timestamp)
throws PulsarClientException {
List<String> stopIds = new ArrayList<>();
if (message.getStopCancellationsList() != null) {
stopIds = message.getStopCancellationsList().stream().map(x -> x.getStopId()).collect(Collectors.toList());
}
log.info("Sending {} stop cancellations with {} affected journey patterns. Stop ids: {}",
message.getStopCancellationsCount(), message.getAffectedJourneyPatternsCount(), stopIds);
try {
producer.newMessage().value(message.toByteArray())
.eventTime(timestamp)
.property(TransitdataProperties.KEY_PROTOBUF_SCHEMA, TransitdataProperties.ProtobufSchema.StopCancellations.toString())
producer.newMessage().value(message.toByteArray()).eventTime(timestamp)
.property(TransitdataProperties.KEY_PROTOBUF_SCHEMA,
TransitdataProperties.ProtobufSchema.StopCancellations.toString())
.send();
}
catch (PulsarClientException pe) {
} catch (PulsarClientException pe) {
log.error("Failed to send stop cancellation message to Pulsar", pe);
throw pe;
}
catch (Exception e) {
} catch (Exception e) {
log.error("Failed to handle handle stop cancellation message", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,35 @@ public class ClosedStopHandler {
final DoiAffectedJourneyPatternSource affectedJourneyPatternSource;
final DoiAffectedJourneySource affectedJourneySource;

public ClosedStopHandler(PulsarApplicationContext context, String ommConnString, String doiConnString, boolean useTestDoiQueries, boolean useTestOmmQueries) throws SQLException {
public ClosedStopHandler(PulsarApplicationContext context, String ommConnString, String doiConnString,
boolean useTestDoiQueries, boolean useTestOmmQueries) throws SQLException {
closedStopSource = OmmClosedStopSource.newInstance(context, ommConnString, useTestOmmQueries);
affectedJourneyPatternSource = DoiAffectedJourneyPatternSource.newInstance(context, doiConnString, useTestDoiQueries);
affectedJourneyPatternSource = DoiAffectedJourneyPatternSource.newInstance(context, doiConnString,
useTestDoiQueries);
affectedJourneySource = DoiAffectedJourneySource.newInstance(context, doiConnString, useTestDoiQueries);
}

public Optional<InternalMessages.StopCancellations> queryAndProcessResults(DoiStopInfoSource doiStops) throws SQLException{
public Optional<InternalMessages.StopCancellations> queryAndProcessResults(DoiStopInfoSource doiStops)
throws SQLException {
List<ClosedStop> closedStops = closedStopSource.queryAndProcessResults(doiStops.getDoiStopInfo());

Map<String, JourneyPattern> affectedJourneyPatternById = affectedJourneyPatternSource.queryByClosedStops(closedStops);
Map<String, List<Journey>> affectedJourneysByJourneyPatternId = affectedJourneySource.queryByJourneyPatternIds(affectedJourneyPatternById.keySet());
Map<String, JourneyPattern> affectedJourneyPatternById = affectedJourneyPatternSource
.queryByClosedStops(closedStops);
Map<String, List<Journey>> affectedJourneysByJourneyPatternId = affectedJourneySource
.queryByJourneyPatternIds(affectedJourneyPatternById.keySet());
addAffectedJourneysToJourneyPatterns(affectedJourneyPatternById, affectedJourneysByJourneyPatternId);
addAffectedJourneyPatternsToClosedStops(closedStops, affectedJourneyPatternById);
return createStopCancellationsMessage(closedStops, affectedJourneyPatternById.values());
}

public static void addAffectedJourneysToJourneyPatterns(
Map<String, JourneyPattern> affectedJourneyPatternMap,
public static void addAffectedJourneysToJourneyPatterns(Map<String, JourneyPattern> affectedJourneyPatternMap,
Map<String, List<Journey>> affectedJourneysMap) {
for (Map.Entry<String, List<Journey>> entry : affectedJourneysMap.entrySet()) {
affectedJourneyPatternMap.get(entry.getKey()).addAffectedJourneys(entry.getValue());
}
}

public static void addAffectedJourneyPatternsToClosedStops(
List<ClosedStop> closedStops,
public static void addAffectedJourneyPatternsToClosedStops(List<ClosedStop> closedStops,
Map<String, JourneyPattern> affectedJourneyPatternMap) {
for (ClosedStop closedStop : closedStops) {
for (JourneyPattern journeyPattern : affectedJourneyPatternMap.values()) {
Expand All @@ -57,11 +60,14 @@ public static void addAffectedJourneyPatternsToClosedStops(

}

public static Optional<InternalMessages.StopCancellations> createStopCancellationsMessage(Collection<ClosedStop> closedStops, Collection<JourneyPattern> journeyPatterns) {
public static Optional<InternalMessages.StopCancellations> createStopCancellationsMessage(
Collection<ClosedStop> closedStops, Collection<JourneyPattern> journeyPatterns) {
if (!closedStops.isEmpty()) {
InternalMessages.StopCancellations.Builder builder = InternalMessages.StopCancellations.newBuilder();
builder.addAllStopCancellations(closedStops.stream().map(ClosedStop::getAsProtoBuf).collect(Collectors.toList()));
builder.addAllAffectedJourneyPatterns(journeyPatterns.stream().map(JourneyPattern::getAsProtoBuf).collect(Collectors.toList()));
builder.addAllStopCancellations(
closedStops.stream().map(ClosedStop::getAsProtoBuf).collect(Collectors.toList()));
builder.addAllAffectedJourneyPatterns(
journeyPatterns.stream().map(JourneyPattern::getAsProtoBuf).collect(Collectors.toList()));
return Optional.of(builder.build());
} else {
return Optional.empty();
Expand Down
Loading