Skip to content
This repository was archived by the owner on Aug 18, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ FROM openjdk:8-jre-slim
#Install curl for health check
RUN apt-get update && apt-get install -y --no-install-recommends curl

ADD target/transitdata-rail-tripupdate-source.jar /usr/app/transitdata-rail-tripupdate-source.jar
ADD target/transitdata-generic-gtfsrt-source.jar /usr/app/transitdata-generic-gtfsrt-source.jar

ENTRYPOINT ["java", "-jar", "/usr/app/transitdata-rail-tripupdate-source.jar"]
ENTRYPOINT ["java", "-jar", "/usr/app/transitdata-generic-gtfsrt-source.jar"]
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>fi.hsl.transitdata</groupId>
<artifactId>transitdata-rail-tripupdate-source</artifactId>
<artifactId>transitdata-generic-gtfsrt-source</artifactId>
<version>1.0.3</version>
<packaging>jar</packaging>

Expand Down Expand Up @@ -106,7 +106,7 @@
</dependencies>

<build>
<finalName>transitdata-rail-tripupdate-source</finalName>
<finalName>transitdata-generic-gtfsrt-source</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
Expand All @@ -130,7 +130,7 @@
<transformers>
<!-- add Main-Class to manifest file -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>fi.hsl.transitdata.railsource.HSLRailSourceMain</mainClass> </transformer>
<mainClass>fi.hsl.transitdata.gtfsrtsource.HSLGtfsRtSourceMain</mainClass> </transformer>
</transformers>
<filters>
<filter>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package fi.hsl.transitdata.gtfsrtsource;

import com.google.transit.realtime.GtfsRealtime;

import java.util.Optional;

/**
* Interface to process feed entities, used for e.g. transforming entity ids
*/
public interface FeedEntityProcessor {
/**
* Processes feed entity
* @param feedEntity Feed entity
* @return Optional feed entity. Empty if feed entity should not be published.
*/
Optional<GtfsRealtime.FeedEntity> process(GtfsRealtime.FeedEntity feedEntity);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package fi.hsl.transitdata.gtfsrtsource;

import com.google.transit.realtime.GtfsRealtime;
import fi.hsl.common.gtfsrt.FeedMessageFactory;
import fi.hsl.common.transitdata.TransitdataProperties;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;

import java.util.Optional;

/**
* Sends feed entities individually from feed message to Pulsar topic
*/
@Slf4j
public class FeedEntityPublisher {
private Producer<byte[]> producer;
private FeedEntityProcessor feedEntityProcessor;

public FeedEntityPublisher(Producer<byte[]> producer) {
this(producer, Optional::ofNullable);
}

public FeedEntityPublisher(Producer<byte[]> producer, FeedEntityProcessor feedEntityProcessor) {
this.producer = producer;
this.feedEntityProcessor = feedEntityProcessor;
}

public int publishFeedMessage(GtfsRealtime.FeedMessage feedMessage) {
log.info("Found {} feed entities", feedMessage.getEntityList().size());
int published = feedMessage.getEntityList().stream().map(feedEntityProcessor::process).mapToInt(feedEntity -> {
if (feedEntity.isPresent()) {
return sendFeedEntity(feedEntity.get()) ? 1 : 0;
} else {
return 0;
}
}).sum();
return published;
}

private boolean sendFeedEntity(GtfsRealtime.FeedEntity feedEntity) {
long now = System.currentTimeMillis();

String entityId = feedEntity.getId();

GtfsRealtime.FeedMessage feedMessage;
if (feedEntity.hasTripUpdate()) {
feedMessage = FeedMessageFactory.createDifferentialFeedMessage(entityId, feedEntity.getTripUpdate(), now);
} else if (feedEntity.hasVehicle()) {
feedMessage = FeedMessageFactory.createDifferentialFeedMessage(entityId, feedEntity.getVehicle(), now);
} else {
log.warn("Unsupported feed entity (id: {}, has trip update: {}, has vehicle: {}, has alert: {})", entityId, feedEntity.hasTripUpdate(), feedEntity.hasVehicle(), feedEntity.hasAlert());
return false;
}

producer.newMessage()
.key(entityId)
.value(feedMessage.toByteArray())
.eventTime(now)
.property(TransitdataProperties.KEY_PROTOBUF_SCHEMA, TransitdataProperties.ProtobufSchema.GTFS_TripUpdate.toString())
.sendAsync()
.whenComplete((messageId, throwable) -> {
if (throwable != null) {
if (throwable instanceof PulsarClientException) {
log.error("Failed to send message to Pulsar", throwable);
} else {
log.error("Unexpected error", throwable);
}
}

if (messageId != null) {
if (feedMessage.getEntity(0).hasTripUpdate()) {
GtfsRealtime.TripUpdate tripUpdate = feedMessage.getEntity(0).getTripUpdate();
log.debug("Sending TripUpdate for entity {} with {} StopTimeUpdates and status {}",
entityId, tripUpdate.getStopTimeUpdateCount(), tripUpdate.getTrip().getScheduleRelationship());
} else if (feedMessage.getEntity(0).hasVehicle()) {
GtfsRealtime.VehiclePosition vehiclePosition = feedMessage.getEntity(0).getVehicle();
log.debug("Sending VehiclePosition for entity {}", entityId);
}
}
});
return true;
}
}
Original file line number Diff line number Diff line change
@@ -1,36 +1,47 @@
package fi.hsl.transitdata.railsource;
package fi.hsl.transitdata.gtfsrtsource;

import com.google.protobuf.InvalidProtocolBufferException;
import com.typesafe.config.Config;
import fi.hsl.common.config.ConfigParser;
import fi.hsl.common.pulsar.PulsarApplication;
import fi.hsl.common.pulsar.PulsarApplicationContext;
import fi.hsl.transitdata.gtfsrtsource.raildigitraffic.RailDigitrafficFeedEntityProcessor;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* Rail source service sends tripupdates and alerts from trains
* to corresponding tripupdate and alert topics.
* Gtfs Rt source polls feed messages from specified URL and publishes them individually to Pulsar
*/
public class HSLRailSourceMain {
public class HSLGtfsRtSourceMain {

private static final Logger log = LoggerFactory.getLogger(HSLRailSourceMain.class);
private static final Logger log = LoggerFactory.getLogger(HSLGtfsRtSourceMain.class);

public static void main(String[] args) {

try {
final Config config = ConfigParser.createConfig();
final PulsarApplication app = PulsarApplication.newInstance(config);
final PulsarApplicationContext context = app.getContext();
final HslRailPoller poller = new HslRailPoller(context.getProducer(), context.getJedis(), config,
new RailTripUpdateService(context.getProducer()));

final FeedEntityProcessor feedEntityProcessor;
switch (config.getString("poller.processor")) {
case "raildigitraffic":
feedEntityProcessor = new RailDigitrafficFeedEntityProcessor();
break;
default:
feedEntityProcessor = Optional::ofNullable;
break;
}

final HslGtfsRtPoller poller = new HslGtfsRtPoller(config, new FeedEntityPublisher(context.getProducer(), feedEntityProcessor));

final int pollIntervalInSeconds = config.getInt("poller.interval");
final long maxTimeAfterSending = config.getDuration("poller.unhealthyAfterNotSending", TimeUnit.NANOSECONDS);
Expand Down
42 changes: 42 additions & 0 deletions src/main/java/fi/hsl/transitdata/gtfsrtsource/HslGtfsRtPoller.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package fi.hsl.transitdata.gtfsrtsource;

import com.google.transit.realtime.GtfsRealtime;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;

@Slf4j
class HslGtfsRtPoller {
private final String gtfsRtUrl;
private final FeedEntityPublisher feedEntityPublisher;

HslGtfsRtPoller(Config config, FeedEntityPublisher feedEntityPublisher) {
this.gtfsRtUrl = config.getString("poller.gtfsrturl");
this.feedEntityPublisher = feedEntityPublisher;
}

void poll() throws IOException {
GtfsRealtime.FeedMessage feedMessage = readFeedMessage(gtfsRtUrl);
handleFeedMessage(feedMessage);
}

static GtfsRealtime.FeedMessage readFeedMessage(String url) throws IOException {
return readFeedMessage(new URL(url));
}

static GtfsRealtime.FeedMessage readFeedMessage(URL url) throws IOException {
log.info("Reading GTFS RT feed messages from " + url);

try (InputStream inputStream = url.openStream()) {
return GtfsRealtime.FeedMessage.parseFrom(inputStream);
}
}

private void handleFeedMessage(GtfsRealtime.FeedMessage feedMessage) {
feedEntityPublisher.publishFeedMessage(feedMessage);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package fi.hsl.transitdata.gtfsrtsource.raildigitraffic;

import com.google.transit.realtime.GtfsRealtime;
import fi.hsl.transitdata.gtfsrtsource.FeedEntityProcessor;

import java.util.Optional;

import static fi.hsl.transitdata.gtfsrtsource.raildigitraffic.RailSpecific.*;

public class RailDigitrafficFeedEntityProcessor implements FeedEntityProcessor {
@Override
public Optional<GtfsRealtime.FeedEntity> process(GtfsRealtime.FeedEntity feedEntity) {
if (feedEntity.hasTripUpdate()) {
GtfsRealtime.TripUpdate tripUpdate = feedEntity.getTripUpdate();
//Remove 'delay' field from trip update as stop time updates should be used to provide timing information
tripUpdate = fixInvalidTripUpdateDelayUsage(tripUpdate);
//Remove 'delay' field from stop time updates as raildigitraffic2gtfsrt API only provides inaccurate values
tripUpdate = removeDelayFieldFromStopTimeUpdates(tripUpdate);
//Remove 'trip_id' field from trip descriptor as we don't know if trip id provided by raildigitraffic2gtfsrt API is the same as in static GTFS feed used by Google and others
tripUpdate = removeTripIdField(tripUpdate);

return Optional.of(feedEntity.toBuilder().setId(generateEntityId(tripUpdate)).setTripUpdate(tripUpdate).build());
} else {
return Optional.empty();
}
}

private static String generateEntityId(GtfsRealtime.TripUpdate tripUpdate) {
return "rail_" + String.join("-", tripUpdate.getTrip().getRouteId(), tripUpdate.getTrip().getStartDate(), tripUpdate.getTrip().getStartTime(), String.valueOf(tripUpdate.getTrip().getDirectionId()));
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package fi.hsl.transitdata.railsource;
package fi.hsl.transitdata.gtfsrtsource.raildigitraffic;

import com.google.transit.realtime.GtfsRealtime;
import lombok.extern.slf4j.Slf4j;
Expand Down
51 changes: 0 additions & 51 deletions src/main/java/fi/hsl/transitdata/railsource/HslRailPoller.java

This file was deleted.

Loading