1+ package examples ;
2+
3+ import functions .error_handler ;
4+ import functions .error_handler_fn ;
5+ import jnr .ffi .Memory ;
6+ import jnr .ffi .Pointer ;
7+ import jnr .ffi .Runtime ;
8+ import types .temporal .TInterpolation ;
9+
10+ import java .io .*;
11+ import java .time .LocalDateTime ;
12+ import java .time .OffsetDateTime ;
13+ import java .time .ZoneOffset ;
14+ import java .time .format .DateTimeFormatter ;
15+ import java .util .*;
16+
17+ import static functions .functions .*;
18+
19+ /**
20+ * A program that reads AIS data from a CSV file and constructs temporal values
21+ * using EXPANDABLE SEQUENCES instead of accumulating all instants in memory.
22+ *
23+ * Key difference from N10_AIS_Assemble_Full:
24+ * - N10: Accumulates ALL instants → Builds sequence at the END
25+ * - N11: Builds sequence INCREMENTALLY using temporal_append_tinstant()
26+ *
27+ * Advantages:
28+ * - Lower memory footprint (no ArrayList of instants)
29+ * - Sequence always available (no need to wait for end)
30+ * - Automatic capacity expansion
31+ * - Better for streaming scenarios
32+ *
33+ * This uses the MEOS expandable sequence API:
34+ * - Create initial sequence with one instant
35+ * - Append subsequent instants using temporal_append_tinstant()
36+ * - MEOS handles memory expansion automatically
37+ *
38+ * Input: data/aisdk-2026-02-13.csv (full day of AIS data)
39+ */
40+ public class N11_AIS_Expand_Full {
41+
42+ /* Configuration constants */
43+ static final int MAX_NO_RECS = 20_000_000 ; // Max records to read
44+ static final int MAX_NO_SHIPS = 6500 ; // Max ships to track
45+ static final int NO_RECS_BATCH = 100_000 ; // Marker frequency
46+
47+ /* Validation ranges for Denmark region */
48+ static final double LAT_MIN = 40.18 ;
49+ static final double LAT_MAX = 84.17 ;
50+ static final double LON_MIN = -16.1 ;
51+ static final double LON_MAX = 32.88 ;
52+ static final double SOG_MIN = 0.0 ;
53+ static final double SOG_MAX = 1022.0 ; // Speed in 1/10 knot steps
54+
55+ /* Date format parser for European format (DD/MM/YYYY HH:MM:SS) */
56+ static final DateTimeFormatter EUROPEAN_DATETIME_FORMATTER =
57+ DateTimeFormatter .ofPattern ("dd/MM/yyyy HH:mm:ss" );
58+
59+ static class AISRecord {
60+ OffsetDateTime T ;
61+ long MMSI ;
62+ double Latitude ;
63+ double Longitude ;
64+ double SOG ;
65+ }
66+
67+ static class TripRecord {
68+ long MMSI ; /* Ship identifier */
69+ int noRecords ; /* Number of input records */
70+ int noTripInstants ; /* Number of trip instants */
71+ int noSOGInstants ; /* Number of SOG instants */
72+ Pointer trip ; /* Expandable trip sequence */
73+ Pointer sog ; /* Expandable SOG sequence */
74+ OffsetDateTime lastTripTimestamp ; /* Last trip timestamp (duplicate detection) */
75+ OffsetDateTime lastSOGTimestamp ; /* Last SOG timestamp (duplicate detection) */
76+
77+ TripRecord (long mmsi ) {
78+ this .MMSI = mmsi ;
79+ this .noRecords = 0 ;
80+ this .noTripInstants = 0 ;
81+ this .noSOGInstants = 0 ;
82+ this .trip = null ;
83+ this .sog = null ;
84+ this .lastTripTimestamp = null ;
85+ this .lastSOGTimestamp = null ;
86+ }
87+ }
88+
89+ /**
90+ * Parse European date format (DD/MM/YYYY HH:MM:SS) to OffsetDateTime in UTC
91+ */
92+ private static OffsetDateTime parseEuropeanDateTime (String dateStr ) {
93+ try {
94+ LocalDateTime localDateTime = LocalDateTime .parse (dateStr .trim (), EUROPEAN_DATETIME_FORMATTER );
95+ return OffsetDateTime .of (localDateTime , ZoneOffset .UTC );
96+ } catch (Exception e ) {
97+ return null ;
98+ }
99+ }
100+
101+ public static void main (String [] args ) {
102+ error_handler_fn errorHandler = new error_handler ();
103+
104+ // Initialize MEOS
105+ meos_initialize_timezone ("UTC" );
106+ meos_initialize_error_handler (errorHandler );
107+
108+ long startTime = System .currentTimeMillis ();
109+
110+ // Map to store trips by MMSI
111+ Map <Long , TripRecord > trips = new LinkedHashMap <>();
112+
113+ int noRecords = 0 ;
114+ int noErrRecords = 0 ;
115+
116+ try {
117+ System .out .println ("Opening input file: data/aisdk-2026-02-13.csv" );
118+ BufferedReader reader = new BufferedReader (
119+ new FileReader ("src/main/java/examples/data/aisdk-2026-02-13.csv" ));
120+
121+ // Read header line
122+ String headerLine = reader .readLine ();
123+ if (headerLine == null ) {
124+ System .out .println ("Empty file" );
125+ reader .close ();
126+ meos_finalize ();
127+ return ;
128+ }
129+
130+ System .out .println ("Processing records" );
131+ System .out .println (" one '*' marker every " + NO_RECS_BATCH + " records" );
132+
133+ Runtime runtime = Runtime .getSystemRuntime ();
134+
135+ // Process each line
136+ String line ;
137+ while ((line = reader .readLine ()) != null && noRecords < MAX_NO_RECS ) {
138+ noRecords ++;
139+
140+ // Print progress marker
141+ if (noRecords % NO_RECS_BATCH == 0 ) {
142+ System .out .print ("*" );
143+ System .out .flush ();
144+ }
145+
146+ // Parse the line
147+ AISRecord rec = parseLine (line );
148+
149+ // Validate record
150+ if (rec == null ) {
151+ noErrRecords ++;
152+ continue ;
153+ }
154+
155+ // Check max ships limit
156+ if (trips .size () >= MAX_NO_SHIPS && !trips .containsKey (rec .MMSI )) {
157+ continue ;
158+ }
159+
160+ // Get or create trip record
161+ TripRecord trip = trips .computeIfAbsent (rec .MMSI , TripRecord ::new );
162+ trip .noRecords ++;
163+
164+ // Process trip instant (if lat/lon present)
165+ if (!Double .isNaN (rec .Latitude ) && !Double .isNaN (rec .Longitude )) {
166+ try {
167+ // Check for duplicate timestamp
168+ if (trip .lastTripTimestamp != null && rec .T .equals (trip .lastTripTimestamp )) {
169+ continue ; // Skip duplicate
170+ }
171+
172+ Pointer gs = geogpoint_make2d (4326 , rec .Longitude , rec .Latitude );
173+ Pointer inst = tpointinst_make (gs , rec .T );
174+
175+ if (trip .trip == null ) {
176+ // Create initial sequence with first instant
177+ Pointer instArray = Memory .allocate (runtime , Long .BYTES );
178+ instArray .putPointer (0 , inst );
179+ trip .trip = tsequence_make (instArray , 1 ,
180+ true , true , TInterpolation .LINEAR .getValue (), true );
181+
182+ if (trip .trip == null ) {
183+ System .err .printf ("\n MMSI: %d, error creating trip sequence\n " , trip .MMSI );
184+ continue ;
185+ }
186+ } else {
187+ // Append instant to existing sequence
188+ Pointer newSeq = temporal_append_tinstant (trip .trip , inst , TInterpolation .LINEAR .getValue (),
189+ 0.0 , null , true );
190+
191+ if (newSeq == null ) {
192+ System .err .printf ("\n MMSI: %d, error appending to trip\n " , trip .MMSI );
193+ continue ;
194+ }
195+
196+ trip .trip = newSeq ;
197+ }
198+
199+ trip .noTripInstants ++;
200+ trip .lastTripTimestamp = rec .T ;
201+
202+ } catch (Exception e ) {
203+ System .err .printf ("\n Error processing trip instant for MMSI %d: %s\n " ,
204+ trip .MMSI , e .getMessage ());
205+ }
206+ }
207+
208+ // Process SOG instant (if SOG present)
209+ if (!Double .isNaN (rec .SOG )) {
210+ try {
211+ // Check for duplicate timestamp
212+ if (trip .lastSOGTimestamp != null && rec .T .equals (trip .lastSOGTimestamp )) {
213+ continue ; // Skip duplicate
214+ }
215+
216+ Pointer inst = tfloatinst_make (rec .SOG , rec .T );
217+
218+ if (trip .sog == null ) {
219+ // Create initial sequence with first instant
220+ Pointer instArray = Memory .allocate (runtime , Long .BYTES );
221+ instArray .putPointer (0 , inst );
222+ trip .sog = tsequence_make (instArray , 1 ,
223+ true , true , TInterpolation .LINEAR .getValue (), true );
224+
225+ if (trip .sog == null ) {
226+ System .err .printf ("\n MMSI: %d, error creating SOG sequence\n " , trip .MMSI );
227+ continue ;
228+ }
229+ } else {
230+ // Append instant to existing sequence
231+ Pointer newSeq = temporal_append_tinstant (trip .sog , inst , TInterpolation .LINEAR .getValue (),
232+ 0.0 , null , true );
233+
234+ if (newSeq == null ) {
235+ System .err .printf ("\n MMSI: %d, error appending to SOG\n " , trip .MMSI );
236+ continue ;
237+ }
238+
239+ trip .sog = newSeq ;
240+ }
241+
242+ trip .noSOGInstants ++;
243+ trip .lastSOGTimestamp = rec .T ;
244+
245+ } catch (Exception e ) {
246+ System .err .printf ("\n Error processing SOG instant for MMSI %d: %s\n " ,
247+ trip .MMSI , e .getMessage ());
248+ }
249+ }
250+ }
251+
252+ reader .close ();
253+
254+ // Print results table
255+ System .out .println ("\n -----------------------------------------------------------------------------" );
256+ System .out .println ("| MMSI | #Rec | #TrInst | #SInst | Distance | Speed |" );
257+ System .out .println ("-----------------------------------------------------------------------------" );
258+
259+ for (TripRecord trip : trips .values ()) {
260+ System .out .printf ("| %9d | %5d | %5d | %5d |" ,
261+ trip .MMSI , trip .noRecords ,
262+ trip .noTripInstants , trip .noSOGInstants );
263+
264+ // Calculate trip distance
265+ if (trip .trip != null ) {
266+ try {
267+ double distance = tpoint_length (trip .trip );
268+ System .out .printf (" %15.6f |" , distance );
269+ } catch (Exception e ) {
270+ System .out .print (" --- |" );
271+ }
272+ } else {
273+ System .out .print (" --- |" );
274+ }
275+
276+ // Calculate time-weighted average SOG
277+ if (trip .sog != null ) {
278+ try {
279+ double avgSOG = tnumber_twavg (trip .sog );
280+ System .out .printf (" %13.6f |\n " , avgSOG );
281+ } catch (Exception e ) {
282+ System .out .println (" --- |" );
283+ }
284+ } else {
285+ System .out .println (" --- |" );
286+ }
287+ }
288+
289+ System .out .println ("-----------------------------------------------------------------------------" );
290+ System .out .printf ("\n %d records read.\n " , noRecords );
291+ System .out .printf ("%d erroneous records ignored.\n " , noErrRecords );
292+ System .out .printf ("%d trips read.\n " , trips .size ());
293+
294+ } catch (IOException e ) {
295+ System .err .println ("Error reading file: " + e .getMessage ());
296+ e .printStackTrace ();
297+ }
298+
299+ // Calculate elapsed time
300+ long endTime = System .currentTimeMillis ();
301+ double timeTaken = (endTime - startTime ) / 1000.0 ;
302+ System .out .printf ("The program took %.3f seconds to execute\n " , timeTaken );
303+
304+ // Finalize MEOS
305+ meos_finalize ();
306+ }
307+
308+ /**
309+ * Parse a CSV line into an AISRecord.
310+ * Returns null if the line is invalid or incomplete.
311+ */
312+ private static AISRecord parseLine (String line ) {
313+ String [] fields = line .split ("," , -1 ); // -1 to keep empty fields
314+
315+ if (fields .length < 8 ) {
316+ return null ;
317+ }
318+
319+ AISRecord rec = new AISRecord ();
320+ boolean hasT = false , hasMMSI = false , hasLatLon = false , hasSOG = false ;
321+
322+ try {
323+ // Field 0: Timestamp (European format: DD/MM/YYYY HH:MM:SS)
324+ if (!fields [0 ].isEmpty () && !fields [0 ].equals ("Unknown" )) {
325+ rec .T = parseEuropeanDateTime (fields [0 ]);
326+ hasT = (rec .T != null );
327+ }
328+
329+ // Field 2: MMSI
330+ if (!fields [2 ].isEmpty () && !fields [2 ].equals ("Unknown" )) {
331+ rec .MMSI = Long .parseLong (fields [2 ].trim ());
332+ hasMMSI = (rec .MMSI != 0 );
333+ }
334+
335+ // Field 3: Latitude
336+ rec .Latitude = Double .NaN ;
337+ if (!fields [3 ].isEmpty () && !fields [3 ].equals ("Unknown" )) {
338+ double lat = Double .parseDouble (fields [3 ].trim ());
339+ if (lat >= LAT_MIN && lat <= LAT_MAX ) {
340+ rec .Latitude = lat ;
341+ hasLatLon = true ;
342+ }
343+ }
344+
345+ // Field 4: Longitude
346+ rec .Longitude = Double .NaN ;
347+ if (!fields [4 ].isEmpty () && !fields [4 ].equals ("Unknown" )) {
348+ double lon = Double .parseDouble (fields [4 ].trim ());
349+ if (lon >= LON_MIN && lon <= LON_MAX ) {
350+ rec .Longitude = lon ;
351+ hasLatLon = hasLatLon && !Double .isNaN (rec .Latitude );
352+ } else {
353+ hasLatLon = false ;
354+ }
355+ }
356+
357+ // Field 7: SOG
358+ rec .SOG = Double .NaN ;
359+ if (!fields [7 ].isEmpty () && !fields [7 ].equals ("Unknown" )) {
360+ double sog = Double .parseDouble (fields [7 ].trim ());
361+ if (sog >= SOG_MIN && sog <= SOG_MAX ) {
362+ rec .SOG = sog ;
363+ hasSOG = true ;
364+ }
365+ }
366+
367+ } catch (NumberFormatException e ) {
368+ return null ;
369+ }
370+
371+ // Record is valid if it has timestamp, MMSI, and at least one of (lat/lon or SOG)
372+ if (hasT && hasMMSI && (hasLatLon || hasSOG )) {
373+ return rec ;
374+ }
375+
376+ return null ;
377+ }
378+ }
0 commit comments