In today's data-driven era, efficient stream processing has become a core requirement for modern applications. Whether processing real-time sensor data, analysing financial time series, or building high-performance event processing pipelines, developers need a tool that is both powerful and flexible. Since its introduction in Java 8, the standard library's Stream API has significantly improved the collection processing experience, but its design still has limitations: strict single-use consumption, limited index control, and lack of native support for window operations.
Semantic-Java emerges to address these gaps. It is a zero-dependency, modern Maven Java stream processing framework that skilfully blends the fluent expression of Java Streams, the lazy evaluation philosophy of JavaScript generators, and intelligent index control mechanisms inspired by database indexing. Whether you are handling time series data, event streams, or building complex data transformation pipelines, Semantic-Java provides an elegant and powerful solution.
This article will delve into the core concepts, design philosophy, API usage, and best practices of Semantic-Java, helping you master this revolutionary stream processing tool.
-
Zero Dependency and Lightweight Semantic-Java has no external dependencies, ensuring extreme lightness and purity. You can easily integrate it into any Java project without worrying about dependency conflicts or bloated deployment packages.
-
Seamless Fusion of Multiple Paradigms
- Java Streams Fluency: Provides intuitive operators like
filter,map,flatMap, making data processing pipelines clear and readable. - JavaScript Generator Laziness: Stream elements are computed and consumed only when needed, maximising memory efficiency and supporting infinite or very large data streams.
- Database-Inspired Intelligent Index Control: Introduces unique index redirection mechanisms (
redirect,translate,reverse), allowing you to flexibly control the logical position of elements in a stream, much like manipulating database records. - Native Window Operation Support: The newly added
WindowCollectableprovides native sliding and tumbling window operations, greatly simplifying time series analysis and stream aggregation.
- Java Streams Fluency: Provides intuitive operators like
-
Tailored for Modern Scenarios The framework is particularly suited for processing time series data (e.g., stock quotes, sensor readings), event streams (e.g., user behaviour logs, message queues), and any high-performance, composable data transformation pipelines.
-
Rich Native Statistical Support Built-in specialised statistics classes (e.g.,
IntStatistics,DoubleStatistics) for various numeric types fromBytetoBigDecimal, offering out-of-the-box support for aggregation calculations like sum, average, and extrema, eliminating the need for boilerplate code.
To clearly demonstrate the unique value of Semantic-Java, the following table compares its key features with the Java standard Stream API:
| Feature Dimension | Java Stream API | Semantic-Java | Remarks |
|---|---|---|---|
| Core Abstraction | Stream<T>, a single-use consumption pipeline. |
Semantic<E>, a lazy stream based on Generator<E>, supporting rich state transformations. |
Semantic-Java's generator model provides a better foundation for infinite streams and complex control flows. |
| Element Index | No native support. Elements have no associated index; requires external counters or workarounds like IntStream.range. |
Core Feature. Each element carries a long type index, which can be dynamically controlled via redirect, translate, reverse. |
Index control is Semantic-Java's "killer feature" for scenarios like time series, pagination, and logical reordering. |
| Window Operations | No native support. Requires third-party libraries (e.g., Apache Commons Collections' ListUtils.partition) or manual implementation using collect and sublists, leading to verbose code. |
Native Support. Direct support for sliding and tumbling windows via toWindow().slide(size, step) and toWindow().tumble(size). |
Greatly simplifies code for streaming analysis tasks like moving averages and real-time aggregation. |
| Lazy Evaluation | Supported. Intermediate operations are lazy. | Supported. Inherits the same lazy evaluation model. | Both share the same core execution model, beneficial for large-scale data processing. |
| Parallel Processing | Mature. Based on the ForkJoin framework, toggled via parallel() and sequential(). |
Supported. Sets concurrency via parallel(); terminal Collector implements parallel collection. |
Java Stream's parallel implementation is more mature and transparent. Semantic-Java provides parallelisation capabilities but focuses more on advanced abstractions like indexing and windows. |
| Dependencies | Part of the Java Standard Library, zero additional dependencies. | Zero-dependency independent framework. | Both are easy to integrate; Semantic-Java maintains its lightweight nature. |
| Ease of Use | Concise syntax, stable API, a standard skill for Java developers. | Builds upon the familiar Stream-style API, introducing more powerful index and window operations. Requires learning new concepts but is more powerful in specific domains. | For developers familiar with the Stream API, Semantic-Java has a gentle learning curve and solves more complex problems. |
| Ideal Use Cases | General-purpose collection data processing, filtering, mapping, reduction. | General-purpose collection processing, especially proficient in time series analysis, event stream processing, scenarios requiring complex element positioning (pagination, reordering), and sliding/tumbling window computations. | Semantic-Java is a "superset" and "enhancement" of Java Stream, offering more powerful abstractions for specific domains. |
Summary: The Java Stream API is an excellent and versatile standard tool. Semantic-Java builds upon it, significantly enhancing the ability to handle ordered data streams and time/sequence-sensitive computations by introducing index control and native window operations as core abstractions. It is a specialised, modern framework for tackling such problems.
Before diving into the API, understanding the following core abstractions is crucial:
-
Generator<E>(Generator) This is the source of the entire framework. It is a functional interface responsible for "generating" elements in the stream on demand. Its core method isaccept, which receives a consumer (to process elements) and an interruption predicate (to decide whether to terminate early). AllSemanticstreams are built uponGenerator, laying the foundation for lazy evaluation and unbounded streams. -
Semantic<E>(Semantic Stream) This is the main entry point and primary object for operations. It encapsulates aGeneratorand provides all streaming operations (e.g.,filter,map,distinct). Each operation returns a newSemanticinstance, enabling immutability and fluent chaining. It internally holds aconcurrentproperty, allowing for potential parallel processing. -
Collector<E, A, R>(Collector) This is the terminal operation of a stream. It defines how to accumulate stream elements into a mutable result container (A) and how to transform the final container into a result (R). TheCollectorsutility class provides numerous static factory methods (e.g.,toList(),useGroup(),useJoin()) to create common collectors.Collectorsupports short-circuiting operations (e.g.,findFirst) and parallel collection. -
IndexedControl(Index Control) This is Semantic-Java's "secret weapon". Each stream element carries alongtype index. Most operations (e.g.,filter,map) preserve or pass along the original index. Operations likeredirect,translate, andreversecan explicitly modify this index. The accumulator functionIndexedAccumulatorand interruption predicateIndexedInterruptof aCollectoralso receive the index parameter, allowing operations to be based on element position, greatly enhancing expressive power. -
Collectable<E>and its Subclasses This is the collectable state of a stream.OrderedCollectableandUnorderedCollectablerepresent terminal states requiring order preservation and not requiring order preservation, respectively. They provide acollectmethod that accepts aCollectorto perform the final aggregation. VariousStatisticsclasses (e.g.,IntStatistics) inherit fromOrderedCollectableand are specialised for numeric statistics. -
WindowCollectable<E>(Window Collectable Stream) This is an important new feature in Semantic-Java, inheriting fromOrderedCollectable. It is specifically designed for window operations, offering two window modes: sliding windows (slide) and tumbling windows (tumble). Window operations are crucial in time series analysis, stream aggregation, and pattern detection.
Add the following dependency to your pom.xml file:
<dependency>
<groupId>pers.eloyhere</groupId>
<artifactId>semantic-java</artifactId>
<version>1.0.0</version> <!-- Please use the latest version -->
</dependency>Let's start with a simple example to understand how to create and consume a stream.
import pers.eloyhere.semantic.Semantic;
import pers.eloyhere.semantic.Collectors;
import java.util.List;
public class QuickStart {
public static void main(String[] args) {
// 1. Create a stream from an array
List<String> names = Semantic.useFrom(new String[]{"Alice", "Bob", "Charlie"})
.toUnordered()
.collect(Collectors.toList());
System.out.println(names); // Output: [Alice, Bob, Charlie]
// 2. Create a stream from a collection
List<Integer> numbers = Semantic.useFrom(List.of(1, 2, 3, 4, 5))
.toUnordered()
.collect(Collectors.toList());
// 3. Create a numeric range stream
List<Long> range = Semantic.useRange(5, 10) // Generates 5,6,7,8,9
.toUnordered()
.collect(Collectors.toList());
System.out.println(range); // Output: [5, 6, 7, 8, 9]
}
}A typical data processing pipeline consists of several intermediate operations and one terminal operation.
import pers.eloyhere.semantic.Semantic;
import pers.eloyhere.semantic.Collectors;
import java.util.List;
public class BasicPipeline {
public static void main(String[] args) {
List<String> result = Semantic.useFrom(new String[]{"apple", "banana", "apricot", "cherry", "avocado"})
.filter(fruit -> fruit.startsWith("a")) // 1. Filter: keep only fruits starting with 'a'
.map(String::toUpperCase) // 2. Map: convert to uppercase
.toUnordered()
.collect(Collectors.toList()); // 3. Collect: convert to List
System.out.println(result); // Output: [APPLE, APRICOT, AVOCADO]
}
}The Semantic class provides various static methods to create streams:
useFrom(Iterable<E>)/useFrom(E[]): Create a stream from a collection or array.useRange(long start, long end): Create a consecutiveLongstream fromstart(inclusive) toend(exclusive). Ideal for generating indices or sequences.useBlob(InputStream): Create aBytestream by reading bytes from an input stream. For handling binary data.useBlob(InputStream, Charset): Create aCharacterstream by reading characters from an input stream with a specified charset.useText(String): Create a stream with the entire string as a single element.useText(String, String delimiter): Create a stream by splitting a string with a delimiter.useCodePoint(String): Create a stream by splitting a string by Unicode code points, correctly handling surrogate pairs.
Intermediate operations are the core of stream processing. They transform, filter, sort elements, etc., and return a new stream.
filter(Predicate<E>)/filter(BiPredicate<E, Long>): Decide whether to retain an element based on its value or its value + index.distinct(): Remove duplicate elements (based onequalsandhashCode).distinct(Comparator<E>): Deduplicate based on a custom comparator.limit(long n): Limit the stream to at mostnelements.skip(long n): Skip the firstnelements.sub(long start, long end): Take elements whose indices are in the range(start, end).takeWhile(Predicate/BP): Lazily take elements from the beginning until the condition is no longer met. This is a key difference fromfilter;filterchecks all elements, whereastakeWhilestops generating after the first element that fails the condition.dropWhile(Predicate/BP): Lazily skip elements from the beginning until the condition is no longer met, then keep all subsequent elements.
map(Function<E, R>)/map(BiFunction<E, Long, R>): Transform each element into another type. One of the most commonly used operations.flatMap(Function<E, Semantic<R>>): Map each element to a stream, then "flatten" all these streams into a single stream. For example, mapping a stream of sentences to a stream of words.flat(Function<E, Semantic<E>>): A specialised version offlatMapwhere the input and output element types are the same.
redirect(BiFunction<E, Long, Long>): The most powerful index operation. Allows you to dynamically compute a new index for each element. This is the foundation for implementing complex windows and grouped offsets.translate(long delta): Shift the index of all elements by a fixed value (newIndex = oldIndex + delta).reverse(): Reverse the index of all elements (newIndex = -oldIndex). Combined with sorting, it can achieve reverse order.
Important Note: Index control operations (especially redirect) should typically be invoked last in the operation chain because operations like distinct, filter, sorted may alter or rely on the original index order.
peek(Consumer<E>)/peek(BiConsumer<E, Long>): Perform an action (e.g., logging) on each element without modifying the stream. Primarily used for debugging.
concatenate(Semantic/Iterable/Array): Concatenate the current stream with another data source.sorted(): Sort by the natural order of elements (elements must implementComparable). This returns anOrderedCollectable.sorted(Comparator<E>): Sort by a custom comparator, returning anOrderedCollectable.
parallel(): Increase the stream's concurrency level by 1.parallel(long concurrent): Set the stream's concurrency level. This provides a hint for downstreamCollectorparallel collection.
A stream must be triggered by a terminal operation to perform computation and produce a result. In Semantic-Java, this is primarily achieved by converting the stream to a Collectable or Statistics and then calling the collect method.
toOrdered(): Convert to a collectable stream that preserves element order. Internally, it may use aTreeMapfor sorting based on indices, suitable for scenarios requiring ordered results. Note: May incur O(n log n) time and O(n) space overhead.toUnordered(): Convert to a collectable stream that does not guarantee order. Internally uses aHashMap, typically offering better performance thantoOrdered. Prefer this when result order is unimportant.toIntStatistics()/toDoubleStatistics(), etc.: Convert to a statistics stream for a specific numeric type. These classes inherit fromOrderedCollectableand additionally provide convenient statistical methods likesum(),average(),max(),min(),count().toWindow(): New method, convert to a window collectable stream. Returns aWindowCollectable<E>instance, supporting window operations.
Window operations are central to stream processing for handling time series and sliding aggregations. WindowCollectable inherits from OrderedCollectable, meaning it maintains an ordered buffer internally (based on TreeMap<Long, E>), enabling efficient window operations.
-
slide(long size, long step): Create a sliding window.size: Window size, i.e., the number of elements in each window.step: Slide step, i.e., the number of elements the window moves each time.- Returns a
Semantic<Semantic<E>>, i.e., a stream of streams, where each inner stream represents a window.
-
tumble(long size): Create a tumbling window (a special case of a sliding window where step equals size).size: Window size.- Equivalent to
slide(size, size). - Windows do not overlap; each element belongs to exactly one window.
Window Operation Example:
// Calculate the moving average for a sliding window of 3 elements (step size 1)
List<Double> prices = Arrays.asList(100.0, 101.0, 102.0, 103.0, 104.0);
List<Double> movingAverages = Semantic.useFrom(prices)
.toWindow() // Convert to a window collectable stream
.slide(3, 1) // Create a sliding window of size 3, step 1
.map(window -> window.collect(Collectors.useReduce(0.0, Double::sum)) / 3)
.toUnordered()
.collect(Collectors.toList());
System.out.println(movingAverages);
// Output: [101.0, 102.0, 103.0]
// Explanation:
// Window 1: [100.0, 101.0, 102.0] -> average 101.0
// Window 2: [101.0, 102.0, 103.0] -> average 102.0
// Window 3: [102.0, 103.0, 104.0] -> average 103.0Important Note: Window operations require knowing the position of all elements, so toWindow() triggers full consumption of the stream and stores it in an internal buffer. Use window operations with caution for infinite or very large streams.
The Collectors class contains numerous static factory methods for creating various Collector instances.
Reduction and Aggregation
toList()/toHashSet()/useToTreeSet(): Collect into standard collections.useToHashMap(Function<E,K>): Collect into aHashMapbased on a key extractor function.useToTreeMap(Function<E,K>, Function<E,V>): Collect into a sortedTreeMapbased on key and value extractor functions.useReduce(identity, operator): Reduce using the given identity value and associative operator.useCount(): Count the number of elements.
Search and Match
useFindFirst()/useFindLast()/useFindAny(): Find the first/last/any element.useFindAt(long index): Find the element at the specified (non-negative) index.useFindNegativeAt(long index): Find the element at the specified (negative) index (supports negative indices from the end, e.g., -1 for the last).useFindMaximum()/useFindMinimum(): Find the maximum/minimum element.useAnyMatch()/useAllMatch()/useNoneMatch(): Determine if any/all/none of the elements match the condition.
Grouping and Partitioning
useGroup(Function<E,K>): Group by a key extractor function, resulting inMap<K, List<E>>.useGroupBy(Function<E,K>, Function<E,V>): Group by key and value extractor functions, resulting inMap<K, List<V>>.usePartition(long n): Partition elements evenly intonlists based on index modulon.usePartitionBy(Function<E,Long>): Partition based on a partition key computed from the element value.
String Joining
useJoin(): Join with", "and wrap with[].useJoin(delimiter)useJoin(prefix, delimiter, suffix)
Frequency and Mode
useFrequency(): Calculate the frequency of each element, resulting inMap<E, Long>.useMode(): Find the most frequently occurring element (the mode).
Iteration
useForEach(Consumer): Perform an action on each element and return the number of processed elements.
For numeric streams, using the corresponding Statistics class is the most convenient approach. They internally optimise statistical calculations.
DoubleStatistics<Double> stats = Semantic.useFrom(new Double[]{1.5, 2.5, 3.5, 4.5})
.filter(x -> x > 2.0)
.toDoubleStatistics(); // Convert to Double statistics stream
System.out.println("Count: " + stats.count());
System.out.println("Sum: " + stats.sum());
System.out.println("Average: " + stats.average());
System.out.println("Min: " + stats.min());
System.out.println("Max: " + stats.max());
// Of course, you can also use the collect terminal operation
Double sum = stats.collect(Collectors.useReduce(0.0, Double::sum));Assume we have a sorted stream of stock prices by timestamp, and we want to calculate a moving average with a window of 3 data points. Using the new window API, this becomes very straightforward:
// Simulate stock price data
List<Double> prices = Arrays.asList(100.0, 101.0, 102.0, 103.0, 104.0);
// Calculate moving average with window size 3
List<Double> movingAverages = Semantic.useFrom(prices)
.toWindow() // Convert to window collectable stream
.slide(3, 1) // Create sliding window of size 3, step 1
.map(window -> window // Calculate average for each window
.toUnordered()
.collect(Collectors.useReduce(0.0, Double::sum)) / 3
)
.toUnordered()
.collect(Collectors.toList());
System.out.println(movingAverages); // Output: [101.0, 102.0, 103.0]Calculate the sum for tumbling windows of 2 elements each:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
List<Integer> windowSums = Semantic.useFrom(numbers)
.toWindow()
.tumble(2) // Tumbling window, size 2
.map(window -> window.collect(Collectors.useReduce(0, Integer::sum)))
.toUnordered()
.collect(Collectors.toList());
System.out.println(windowSums); // Output: [3, 7, 11]
// Explanation: [1+2=3, 3+4=7, 5+6=11]Fetch data for page 2 from a stream (10 items per page).
List<String> allItems = infinity(); // Assume a very large list
List<String> page2 = Semantic.useFrom(allItems)
.skip(10) // Skip the first 10 items (page 1)
.limit(10) // Take the next 10 items (page 2)
.toUnordered()
.collect(Collectors.toList());Assume we have a series of events, but some events need to be inserted at the front (e.g., higher priority events).
List<Event> events = getEvents(); // In chronological order
List<Event> highPriorityEvents = getHighPriorityEvents();
// "Insert" high-priority events at the very front of the stream
List<Event> processedStream = Semantic.useFrom(highPriorityEvents)
.translate(-1000000) // Give high-priority events a very small index to ensure they sort first
.concatenate(Semantic.useFrom(events))
.toOrdered() // Sort based on index
.collect(Collectors.toList());Like Java Streams, Semantic-Java's intermediate operations are lazy. This means data is not processed until a terminal operation is invoked. This offers significant optimisation potential:
- Infinite Streams: Can represent and process infinite sequences (e.g., a generator producing a random number sequence) because the next element is only computed when needed.
- Performance Optimisation: If the terminal operation is short-circuiting (e.g.,
findFirst,anyMatch) and intermediate operations likefilterandtakeWhilecan determine the result early, the stream may not process all elements.
Best Practice: Place operations that can filter out many elements (e.g., filter, distinct, takeWhile, limit, skip, sub) as early as possible in the operation chain to reduce unnecessary subsequent computations.
Understanding the rules of index propagation and overriding is crucial for correct framework usage:
- Index Origin: When a stream is created, elements are assigned initial indices (typically starting from 0 and incrementing).
- Index Propagation: Most operations (e.g.,
filter,map,peek) preserve the original index of elements. Whilefilterremoves elements, the remaining elements retain their original indices. - Index Override: Some operations recompute or override indices:
- Operations like
distinct,flatMap,sortedproduce new index sequences. Afterdistinct, element indices are a new sequence starting from 1.sorteddisrupts the original physical order, butOrderedCollectablereassigns logical indices based on the sorted order for collection.
- Operations like
- Index Redirection:
redirect,translate,reverseare operations that explicitly modify indices. Because they operate directly on indices, they should typically be placed last in the operation chain (after operations like sorting and deduplication that may depend on or alter indices) to ensure the intended index logic.
Rule Summary: Filter and transform first, then sort, and finally perform index redirection.
Window operations are a powerful feature, but their usage timing should be considered:
- Materialisation Requirement: Window operations need to know the position of all elements, so
toWindow()triggers full consumption of the stream. Ensure sufficient memory for very large datasets. - Performance Consideration: Sliding windows create multiple overlapping windows, each being a view of the original data. This can incur significant overhead if windows are large and have high overlap.
- Suitable Scenarios: Window operations are best suited for medium-sized datasets or data streams that can be processed in batches. For infinite streams, consider using
limit()to restrict the amount of data processed.
While the the Collector's collect method includes parallel collection logic based on ThreadPoolExecutor, and the Semantic class has a concurrent property and a parallel() method, the intermediate operations of Semantic itself are not implemented to execute in parallel. Parallelism is mainly manifested in the terminal collector (Collector.collect(generator, concurrent > 1)). The collector can split the workload across multiple threads and then merge the results.
Usage Suggestion: For computationally intensive terminal operations (e.g., complex reductions, grouping of large lists) and with sufficiently large data volumes, you can try using .parallel().toUnordered() in conjunction with a parallel-supporting Collector for potential performance gains. For simple operations or small data volumes, sequential execution is often more efficient.
Semantic-Java is a well-designed, forward-thinking Java stream processing framework. By introducing core abstractions like index control and native window operations, it significantly expands the capabilities of stream processing, enabling elegant solutions for scenarios involving time series, event streams, and other situations requiring complex positional logic. Its zero-dependency nature and deep integration with Java functional programming make integration and use exceptionally lightweight.
Core Value Recap:
- Powerful and Intuitive API: Combines familiar Stream operations with innovative index operations and native window operations.
- Unparalleled Flexibility: Operations like
redirectgive developers complete control over the logical view of the data stream. - Native Window Support:
WindowCollectableprovides out-of-the-box sliding and tumbling window operations, greatly simplifying time series analysis. - Excellent Performance Foundation: Lazy evaluation and short-circuiting optimisations ensure efficiency when handling large-scale or even infinite streams.
- Out-of-the-Box Toolset: A rich set of
Collectorsand specialisedStatisticsclasses cover the vast majority of terminal processing needs.
Advantages Compared to Java Stream: Compared to the standard Java Stream API, Semantic-Java provides two key enhancements—element indexing and native window operations—while maintaining similar ease of use. This results in cleaner, more intentional, and more powerful code for handling ordered data, time series analysis, pagination, data reordering, and similar scenarios.
Semantic-Java is not just a utility library; it represents a new way of thinking about stream processing. It encourages developers to think about and build data pipelines in a more declarative, semantic-rich manner. As functional programming becomes increasingly prevalent in the Java community, frameworks such as Semantic-Java will undoubtedly play an increasingly important role in big data processing, real-time computation, and system integration.
Start using Semantic-Java and redefine your Java stream processing experience.