Skip to content

Commit 8b78faf

Browse files
committed
fix: N04_AIS_Stream DB & File examples: now both using expandable MEOS sequences correctly
1 parent c412ee1 commit 8b78faf

2 files changed

Lines changed: 176 additions & 76 deletions

File tree

src/main/java/examples/N04_AIS_Stream_DB.java

Lines changed: 90 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,25 @@
1616

1717
/**
1818
* A simple program that reads AIS data from a CSV file, accumulates the
19-
* observations in main memory and sends the temporal values to a MobilityDB
20-
* database when they reach a given number of instants in order to free
21-
* the memory and ingest the newest observations.
19+
* observations using MEOS expandable sequences, and sends the temporal values
20+
* to a MobilityDB database when they reach a given number of instants.
2221
*
23-
* This program illustrates streaming of temporal data to MobilityDB by:
24-
* 1. Accumulating instants in memory (up to NO_INSTS_BATCH per ship)
25-
* 2. Building a temporal sequence from accumulated instants
26-
* 3. Sending to database and keeping last instants for continuity
22+
* This program illustrates streaming of temporal data to MobilityDB using
23+
* the MEOS expandable sequences API:
24+
* 1. Create expandable sequence with first instant
25+
* 2. Append subsequent instants using temporal_append_tinstant()
26+
* 3. When batch size reached: send to database and restart sequence
2727
*
28-
* Note: Unlike the C version which uses MEOS expandable sequences API,
29-
* this Java version uses a simpler list-based accumulation since the
30-
* expandable API is not exposed in the Java bindings.
28+
* Key difference from AIS_Assemble:
29+
* - AIS_Assemble: Accumulate ALL data → Build once at end
30+
* - AIS_Stream_DB: Build incrementally → Send batches → Free memory
3131
*
32-
* SETUP: Same as AIS_Store.java - use mobilitydb/mobilitydb Docker image
32+
* This version is FAITHFUL to the C implementation using expandable sequences,
33+
* unlike earlier versions that used ArrayList accumulation.
34+
*
35+
* SETUP: Use mobilitydb/mobilitydb Docker image
36+
* docker run --name postgres-mobilitydb -e POSTGRES_PASSWORD=postgres \
37+
* -p 5432:5432 -d mobilitydb/mobilitydb
3338
*/
3439
public class N04_AIS_Stream_DB {
3540

@@ -49,11 +54,12 @@ static class AISRecord {
4954
}
5055

5156
static class TripRecord {
52-
long MMSI; /* Identifier of the trip */
53-
List<Pointer> instants; /* Accumulated instants */
57+
long MMSI; /* Identifier of the trip */
58+
Pointer trip; /* Expandable sequence of observations */
5459

55-
TripRecord() {
56-
instants = new ArrayList<>();
60+
TripRecord(long mmsi) {
61+
this.MMSI = mmsi;
62+
this.trip = null;
5763
}
5864
}
5965

@@ -66,6 +72,44 @@ private static void executeSQL(Connection conn, String sql) throws SQLException
6672
}
6773
}
6874

75+
/**
76+
* Get the number of instants in a temporal sequence.
77+
* Simulates accessing seq->count in C.
78+
*/
79+
private static int getSequenceCount(Pointer seq) {
80+
return temporal_num_instants(seq);
81+
}
82+
83+
/**
84+
* Restart sequence by keeping only the last NO_INSTS_KEEP instants.
85+
* Simulates tsequence_restart() from C.
86+
*/
87+
private static Pointer restartSequence(Pointer seq, int keepCount) {
88+
int totalCount = getSequenceCount(seq);
89+
90+
if (totalCount <= keepCount) {
91+
return seq; // Nothing to restart
92+
}
93+
94+
// Extract last keepCount instants
95+
Runtime runtime = Runtime.getSystemRuntime();
96+
Pointer[] keptInsts = new Pointer[keepCount];
97+
98+
for (int i = 0; i < keepCount; i++) {
99+
int idx = totalCount - keepCount + i + 1; // 1-indexed!
100+
keptInsts[i] = temporal_instant_n(seq, idx);
101+
}
102+
103+
// Create new sequence with kept instants
104+
Pointer instArray = Memory.allocate(runtime, keepCount * Long.BYTES);
105+
for (int i = 0; i < keepCount; i++) {
106+
instArray.putPointer(i * Long.BYTES, keptInsts[i]);
107+
}
108+
109+
return tsequence_make(instArray, keepCount,
110+
true, true, TInterpolation.LINEAR.getValue(), true);
111+
}
112+
69113
public static void main(String[] args) {
70114
error_handler_fn errorHandler = new error_handler();
71115

@@ -85,9 +129,11 @@ public static void main(String[] args) {
85129
int noWrites = 0;
86130

87131
// Map to store trips by MMSI
88-
Map<Long, TripRecord> trips = new HashMap<>();
132+
Map<Long, TripRecord> trips = new LinkedHashMap<>();
89133
int noShips = 0;
90134

135+
Runtime runtime = Runtime.getSystemRuntime();
136+
91137
try {
92138
/***************************************************************************
93139
* Section 1: Connection to the database
@@ -116,7 +162,7 @@ public static void main(String[] args) {
116162
new FileReader("src/main/java/examples/data/ais_instants.csv"));
117163

118164
/***************************************************************************
119-
* Section 3: Read input file and stream to database
165+
* Section 3: Read input file and stream to database using expandable sequences
120166
***************************************************************************/
121167

122168
System.out.printf("Accumulating %d instants before sending them to the database%n",
@@ -162,26 +208,15 @@ public static void main(String[] args) {
162208
meos_finalize();
163209
return;
164210
}
165-
trip = new TripRecord();
166-
trip.MMSI = rec.MMSI;
211+
trip = new TripRecord(rec.MMSI);
167212
trips.put(rec.MMSI, trip);
168213
noShips++;
169214
}
170215

171216
// Send to database when batch size is reached
172-
if (trip.instants.size() >= NO_INSTS_BATCH) {
173-
// Build sequence from accumulated instants
174-
Runtime runtime = Runtime.getSystemRuntime();
175-
Pointer array = Memory.allocate(runtime, trip.instants.size() * Long.BYTES);
176-
for (int i = 0; i < trip.instants.size(); i++) {
177-
array.putPointer(i * Long.BYTES, trip.instants.get(i));
178-
}
179-
180-
Pointer seqPtr = tsequence_make(array, trip.instants.size(),
181-
true, true, TInterpolation.LINEAR.getValue(), true);
182-
217+
if (trip.trip != null && getSequenceCount(trip.trip) == NO_INSTS_BATCH) {
183218
// Construct and execute query
184-
String tempOut = tspatial_out(seqPtr, 15);
219+
String tempOut = tspatial_out(trip.trip, 15);
185220
String query = String.format(
186221
"INSERT INTO public.AISTrips(MMSI, trip) " +
187222
"VALUES (%d, '%s') ON CONFLICT (MMSI) DO " +
@@ -193,19 +228,32 @@ public static void main(String[] args) {
193228
System.out.print("*");
194229
System.out.flush();
195230

196-
// Keep only the last instants for continuity
197-
List<Pointer> kept = new ArrayList<>();
198-
int startIdx = Math.max(0, trip.instants.size() - NO_INSTS_KEEP);
199-
for (int i = startIdx; i < trip.instants.size(); i++) {
200-
kept.add(trip.instants.get(i));
201-
}
202-
trip.instants = kept;
231+
// Restart the sequence by only keeping the last instants
232+
trip.trip = restartSequence(trip.trip, NO_INSTS_KEEP);
203233
}
204234

205-
// Add the new observation to the list
235+
// Append the new observation to the expandable sequence
206236
Pointer gs = geogpoint_make2d(4326, rec.Longitude, rec.Latitude);
207-
Pointer instPtr = tpointinst_make(gs, rec.T);
208-
trip.instants.add(instPtr);
237+
Pointer inst = tpointinst_make(gs, rec.T);
238+
239+
if (trip.trip == null) {
240+
// Create initial expandable sequence with first instant
241+
Pointer instArray = Memory.allocate(runtime, Long.BYTES);
242+
instArray.putPointer(0, inst);
243+
trip.trip = tsequence_make(instArray, 1,
244+
true, true, TInterpolation.LINEAR.getValue(), true);
245+
} else {
246+
// Append instant to existing sequence (expandable!)
247+
Pointer newSeq = temporal_append_tinstant(trip.trip, inst, TInterpolation.LINEAR.getValue(),
248+
0.0, null, true);
249+
250+
if (newSeq == null) {
251+
System.err.printf("\nError appending instant for MMSI %d\n", trip.MMSI);
252+
continue;
253+
}
254+
255+
trip.trip = newSeq;
256+
}
209257

210258
} catch (NumberFormatException e) {
211259
System.out.println("Record with invalid values ignored");
@@ -254,7 +302,7 @@ public static void main(String[] args) {
254302
// Calculate elapsed time
255303
long endTime = System.currentTimeMillis();
256304
double timeTaken = (endTime - startTime) / 1000.0;
257-
System.out.printf("The program took %f seconds to execute%n", timeTaken);
305+
System.out.printf("The program took %.3f seconds to execute%n", timeTaken);
258306

259307
// Finalize MEOS
260308
meos_finalize();

src/main/java/examples/N04_AIS_Stream_File.java

Lines changed: 86 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,21 @@
1515

1616
/**
1717
* A simple program that reads AIS data from a CSV file, accumulates the
18-
* observations in main memory and writes the temporal values to an output file
19-
* when they reach a given number of instants in order to free the memory and
20-
* ingest the newest observations.
18+
* observations using MEOS expandable sequences, and writes the temporal values
19+
* to an output file when they reach a given number of instants.
2120
*
2221
* This program is similar to AIS_Stream_DB but writes to a file instead of a database.
23-
* The accumulated temporal values are appended to the output file regularly.
22+
* It uses the MEOS expandable sequences API:
23+
* 1. Create expandable sequence with first instant
24+
* 2. Append subsequent instants using temporal_append_tinstant()
25+
* 3. When batch size reached: write to file and restart sequence
26+
*
27+
* Key difference from AIS_Assemble:
28+
* - AIS_Assemble: Accumulate ALL data → Build once at end
29+
* - AIS_Stream_File: Build incrementally → Write batches → Free memory
30+
*
31+
* This version is FAITHFUL to the C implementation using expandable sequences,
32+
* unlike earlier versions that used ArrayList accumulation.
2433
*/
2534
public class N04_AIS_Stream_File {
2635

@@ -40,14 +49,53 @@ static class AISRecord {
4049
}
4150

4251
static class TripRecord {
43-
long MMSI; /* Identifier of the trip */
44-
List<Pointer> instants; /* Accumulated instants */
52+
long MMSI; /* Identifier of the trip */
53+
Pointer trip; /* Expandable sequence of observations */
4554

46-
TripRecord() {
47-
instants = new ArrayList<>();
55+
TripRecord(long mmsi) {
56+
this.MMSI = mmsi;
57+
this.trip = null;
4858
}
4959
}
5060

61+
/**
62+
* Get the number of instants in a temporal sequence.
63+
* Simulates accessing seq->count in C.
64+
*/
65+
private static int getSequenceCount(Pointer seq) {
66+
return temporal_num_instants(seq);
67+
}
68+
69+
/**
70+
* Restart sequence by keeping only the last NO_INSTS_KEEP instants.
71+
* Simulates tsequence_restart() from C.
72+
*/
73+
private static Pointer restartSequence(Pointer seq, int keepCount) {
74+
int totalCount = getSequenceCount(seq);
75+
76+
if (totalCount <= keepCount) {
77+
return seq; // Nothing to restart
78+
}
79+
80+
// Extract last keepCount instants
81+
Runtime runtime = Runtime.getSystemRuntime();
82+
Pointer[] keptInsts = new Pointer[keepCount];
83+
84+
for (int i = 0; i < keepCount; i++) {
85+
int idx = totalCount - keepCount + i + 1; // 1-indexed!
86+
keptInsts[i] = temporal_instant_n(seq, idx);
87+
}
88+
89+
// Create new sequence with kept instants
90+
Pointer instArray = Memory.allocate(runtime, keepCount * Long.BYTES);
91+
for (int i = 0; i < keepCount; i++) {
92+
instArray.putPointer(i * Long.BYTES, keptInsts[i]);
93+
}
94+
95+
return tsequence_make(instArray, keepCount,
96+
true, true, TInterpolation.LINEAR.getValue(), true);
97+
}
98+
5199
public static void main(String[] args) {
52100
error_handler_fn errorHandler = new error_handler();
53101

@@ -60,12 +108,14 @@ public static void main(String[] args) {
60108
int noWrites = 0;
61109

62110
// Map to store trips by MMSI
63-
Map<Long, TripRecord> trips = new HashMap<>();
111+
Map<Long, TripRecord> trips = new LinkedHashMap<>();
64112
int noShips = 0;
65113

66114
BufferedWriter fileOut = null;
67115
BufferedReader fileIn = null;
68116

117+
Runtime runtime = Runtime.getSystemRuntime();
118+
69119
try {
70120
/***************************************************************************
71121
* Section 1: Open the output file
@@ -84,7 +134,7 @@ public static void main(String[] args) {
84134
new FileReader("src/main/java/examples/data/ais_instants.csv"));
85135

86136
/***************************************************************************
87-
* Section 3: Read input file and stream to output file
137+
* Section 3: Read input file and stream to output file using expandable sequences
88138
***************************************************************************/
89139

90140
System.out.printf("Accumulating %d instants before sending them to the output file%n",
@@ -126,46 +176,48 @@ public static void main(String[] args) {
126176
System.out.printf("Maximum number of ships exceeded: %d%n", MAX_TRIPS);
127177
return;
128178
}
129-
trip = new TripRecord();
130-
trip.MMSI = rec.MMSI;
179+
trip = new TripRecord(rec.MMSI);
131180
trips.put(rec.MMSI, trip);
132181
noShips++;
133182
}
134183

135184
// Write to file when batch size is reached
136-
if (trip.instants.size() >= NO_INSTS_BATCH) {
137-
// Build sequence from accumulated instants
138-
Runtime runtime = Runtime.getSystemRuntime();
139-
Pointer array = Memory.allocate(runtime, trip.instants.size() * Long.BYTES);
140-
for (int i = 0; i < trip.instants.size(); i++) {
141-
array.putPointer(i * Long.BYTES, trip.instants.get(i));
142-
}
143-
144-
Pointer seqPtr = tsequence_make(array, trip.instants.size(),
145-
true, true, TInterpolation.LINEAR.getValue(), true);
146-
185+
if (trip.trip != null && getSequenceCount(trip.trip) == NO_INSTS_BATCH) {
147186
// Write to output file
148-
String tempOut = tspatial_out(seqPtr, 15);
187+
String tempOut = tspatial_out(trip.trip, 15);
149188
fileOut.write(String.format("%d, %s%n", trip.MMSI, tempOut));
150189
fileOut.flush();
151190

152191
noWrites++;
153192
System.out.print("*");
154193
System.out.flush();
155194

156-
// Keep only the last instants for continuity
157-
List<Pointer> kept = new ArrayList<>();
158-
int startIdx = Math.max(0, trip.instants.size() - NO_INSTS_KEEP);
159-
for (int i = startIdx; i < trip.instants.size(); i++) {
160-
kept.add(trip.instants.get(i));
161-
}
162-
trip.instants = kept;
195+
// Restart the sequence by only keeping the last instants
196+
trip.trip = restartSequence(trip.trip, NO_INSTS_KEEP);
163197
}
164198

165-
// Add the new observation to the list
199+
// Append the new observation to the expandable sequence
166200
Pointer gs = geogpoint_make2d(4326, rec.Longitude, rec.Latitude);
167-
Pointer instPtr = tpointinst_make(gs, rec.T);
168-
trip.instants.add(instPtr);
201+
Pointer inst = tpointinst_make(gs, rec.T);
202+
203+
if (trip.trip == null) {
204+
// Create initial expandable sequence with first instant
205+
Pointer instArray = Memory.allocate(runtime, Long.BYTES);
206+
instArray.putPointer(0, inst);
207+
trip.trip = tsequence_make(instArray, 1,
208+
true, true, TInterpolation.LINEAR.getValue(), true);
209+
} else {
210+
// Append instant to existing sequence (expandable!)
211+
Pointer newSeq = temporal_append_tinstant(trip.trip, inst, TInterpolation.LINEAR.getValue(),
212+
0.0, null, true);
213+
214+
if (newSeq == null) {
215+
System.err.printf("\nError appending instant for MMSI %d\n", trip.MMSI);
216+
continue;
217+
}
218+
219+
trip.trip = newSeq;
220+
}
169221

170222
} catch (NumberFormatException e) {
171223
System.out.println("Record with invalid values ignored");

0 commit comments

Comments
 (0)