Available as a preview feature in JDK 23 and due to be finalized in the upcoming JDK 24, the new Gatherer interface lets you add custom intermediate operations to Java’s Stream API. Java has had the Collector API for custom terminal operations for a while now, but there was no equivalent for the middle of a pipeline. The Gatherer interface fills that gap and addresses a limitation of the existing intermediate operations: there was no way to tell them that processing was complete. If a stateful operation was still holding some data when the stream ended, this data was lost. You will see an example of this later.

Gatherers can be stateful or stateless. They can be sequential or parallel. The JDK provides some built-in implementations in the java.util.stream.Gatherers class, just as it does for Collectors. Gatherers are also meant to be composable, so you can link them up into a chain providing higher-level capabilities.

Rust

In Rust, many operations resemble what’s available in Java’s Stream API, such as map or filter. But Rust’s standard library provides a far wider variety of operations than Java offers for Streams. As examples of the Gatherer interface, I will implement two methods available on Rust slices: split and splitn. These methods take a predicate function and break up the input into sub-slices, ending each one at an element that matches the predicate. The matching element itself is not included in the output. The difference between the two methods is that splitn will return at most n slices. For example, given

let s = [ 1, 2, 0, 3, 4, 0, 5, 6];

then if you had

let res = s.split(|n| *n == 0);

you would end up with 3 slices:

[1, 2],
[3, 4],
[5, 6]

however if you had

let res = s.splitn(2, |n| *n == 0);

you would end up with 2 slices:

[1, 2],
[3, 4, 0, 5, 6]

There are some interesting “edge” cases as well in the Rust split methods. For example, if the predicate matches more than once “in a row” an empty slice is emitted. Using a slightly tweaked example from above, if we have

let s = [ 1, 2, 0, 0, 5, 6];
let res = s.split(|n| *n == 0);

you’d have

[1, 2]
[]
[5, 6]
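To pin down these semantics before moving to streams, here is a minimal plain-Java sketch of the same behavior over a List. The class name SliceSplit and its methods are hypothetical and exist only for illustration; this is not how Rust implements these methods, and rejecting n < 1 is a choice made here to match the Java code later in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper that mimics Rust's slice split/splitn over a List.
final class SliceSplit {

    // Like split: every matching element ends the current piece and is dropped.
    static <T> List<List<T>> split(List<T> input, Predicate<? super T> predicate) {
        return splitn(Integer.MAX_VALUE, input, predicate);
    }

    // Like splitn: at most n pieces; the last piece keeps any later matches.
    static <T> List<List<T>> splitn(int n, List<T> input, Predicate<? super T> predicate) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be greater than 0");
        }
        List<List<T>> pieces = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T element : input) {
            // Only split while there is still room for one more piece after this one.
            if (pieces.size() < n - 1 && predicate.test(element)) {
                pieces.add(List.copyOf(current));
                current = new ArrayList<>();
            } else {
                current.add(element);
            }
        }
        pieces.add(List.copyOf(current)); // the final piece, possibly empty
        return pieces;
    }

    public static void main(String[] args) {
        // prints [[1, 2], [], [5, 6]]
        System.out.println(split(List.of(1, 2, 0, 0, 5, 6), x -> x == 0));
    }
}
```

Consecutive matches produce an empty piece because the current list is emitted even when nothing has been added to it since the previous match.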

Java

So let’s see how we would do that in Java.

Note

As I mentioned, the Gatherer interface is still a preview feature in JDK 23. To use it you will need to pass --enable-preview both to the compiler and to the JVM when you run these examples. You can also pass the flag via your build tool, like Gradle or Maven. I will show how to do it with Gradle.

Constructing a Gatherer

The Gatherer interface has two static factory methods, ofSequential for sequential gatherers and of for gatherers that can run in parallel, each with a few overloads depending on what your use case is. I will let you read the documentation for the details. A Gatherer is built from up to four functions: a supplier, an integrator, a combiner and a finalizer. (The JDK documentation calls the supplier the initializer and the finalizer the finisher.)

  • supplier: if you have a stateful gatherer, this is a function to initialize the state
  • integrator: a function that does the main work of your gatherer; it is called for each input element
  • combiner: a merging function taking two states and producing a combined state; only needed for parallel gatherers
  • finalizer: called when there are no more input elements, allowing your gatherer to deal with any pending work
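As a rough illustration of how these pieces fit together, here is a sketch of a sequential, stateful gatherer that groups elements into fixed-size chunks. The names ChunkDemo and chunk are hypothetical, and the JDK already ships Gatherers.windowFixed for this job; the point is only to show the finalizer flushing a partial chunk that would otherwise be lost. It assumes JDK 24, or JDK 23 with --enable-preview.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class ChunkDemo {

    // Hypothetical example gatherer: groups the input into lists of `size` elements.
    static <T> Gatherer<T, ?, List<T>> chunk(int size) {
        if (size < 1) {
            throw new IllegalArgumentException("size must be greater than 0");
        }
        class State {
            final List<T> buffer = new ArrayList<>();
        }
        return Gatherer.ofSequential(
                State::new,                        // supplier: creates the state
                (state, element, downstream) -> {  // integrator: called per element
                    state.buffer.add(element);
                    if (state.buffer.size() < size) {
                        return true;               // keep accumulating
                    }
                    List<T> full = List.copyOf(state.buffer);
                    state.buffer.clear();
                    return downstream.push(full);  // false if downstream wants no more
                },
                (state, downstream) -> {           // finalizer: flush the partial chunk
                    if (!state.buffer.isEmpty()) {
                        downstream.push(List.copyOf(state.buffer));
                    }
                });
    }

    public static void main(String[] args) {
        // prints [[1, 2], [3, 4], [5]]
        System.out.println(Stream.of(1, 2, 3, 4, 5).gather(ChunkDemo.<Integer>chunk(2)).toList());
    }
}
```

Without the finalizer, the trailing [5] would never reach the downstream, because the integrator only pushes full chunks.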

For our split and splitn functions, we will have a sequential, stateful gatherer. The state is the intermediate “slices” while we wait for the predicate to match. I collect these gatherers into a single static utility class I’ve ingeniously named Gatherers.

Note

For decades now I’ve adopted a pattern I first saw in Spring Framework code, to have static utility classes be public abstract classes. This avoids some “boilerplate” where you would otherwise need to create a private constructor so that class can’t be instantiated.

In the Gatherers class I have static methods to create our splitters.

public abstract class Gatherers {
    public static <T> Gatherer<T, ?, List<T>> split(Predicate<? super T> predicate) {
        return Gatherer.ofSequential(
                () -> null, // TODO: supplier
                (state, element, downstream) -> true, // TODO: integrator
                (state, downstream) -> { }); // TODO: finalizer
    }

    public static <T> Gatherer<T, ?, List<T>> splitn(int n, Predicate<? super T> predicate) {
        return Gatherer.ofSequential(
                () -> null, // TODO: supplier
                (state, element, downstream) -> true, // TODO: integrator
                (state, downstream) -> { }); // TODO: finalizer
    }
}

I define two static methods creating instances of Gatherer for our two split methods. At this stage they are almost identical except for the method names and the extra n argument. I’ve elided the details for now, to show the overall structure of the implementation. As I mentioned earlier, the supplier is called once at the start of the stream processing, the integrator is called for each element in the stream, and the finalizer is called when processing is complete. The instance of state returned by the supplier is passed to the integrator and finalizer. downstream is the next stage of the stream pipeline, and the gatherer emits its results by pushing them to it.
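To make that call order concrete, here is a small sketch of a pass-through gatherer that records when each function runs. The names LifecycleDemo and traced are hypothetical and not part of the code for this post; the sketch assumes a sequential stream on JDK 24, or JDK 23 with --enable-preview.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class LifecycleDemo {

    // Hypothetical gatherer: forwards every element unchanged and logs each call.
    static <T> Gatherer<T, ?, T> traced(List<String> log) {
        class State {
            int seen;
        }
        return Gatherer.ofSequential(
                () -> {
                    log.add("supplier");
                    return new State();
                },
                (state, element, downstream) -> {
                    state.seen++;
                    log.add("integrator(" + element + ")");
                    return downstream.push(element);
                },
                (state, downstream) -> log.add("finalizer after " + state.seen + " elements"));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Stream.of("a", "b").gather(LifecycleDemo.<String>traced(log)).toList();
        // prints [supplier, integrator(a), integrator(b), finalizer after 2 elements]
        System.out.println(log);
    }
}
```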

To use the returned Gatherer instance, you create a stream like:

var split = Stream.of(10, 40, 30, 20, 60, 50)
    .gather(Gatherers.splitn(2, (el) -> el % 3 == 0))
    .iterator();

In this example, splitn splits the input into at most two lists, breaking at the first element that is divisible by 3, so iterating over the output gives:

10, 40
20, 60, 50

On the other hand, if we used split instead of splitn, the output would be:

10, 40
20
50

Implementing split

Let’s fill in the details of the split method.

public static <T> Gatherer<T, ?, List<T>> split(Predicate<? super T> predicate) {
    class SplitState {
        final List<T> list = new ArrayList<>();
        boolean did_push;
    }
 
    return Gatherer.ofSequential(
            SplitState::new,
            (state, element, downstream) -> {
                if (predicate.test(element)) {
                    List<T> copy = List.copyOf(state.list);
                    state.list.clear();
                    downstream.push(copy);
                    state.did_push = true;
                }
                else {
                    state.list.add(element);
                    state.did_push = false;
                }
 
                return true;
            },
            (state, downstream) -> {
                if (state.did_push || !state.list.isEmpty()) {
                    downstream.push(List.copyOf(state.list));
                }
            });
}

I create a local class to hold the state of the splitter. It has a list that accumulates elements until the predicate matches. To match the semantics of the Rust equivalent, it also has a boolean flag that tells the finalizer whether the last thing the integrator did was push a list downstream. Simply checking whether the state list is empty is not enough: if the last element of the stream matched the predicate, the integrator has just pushed and cleared the list, so the list is empty, yet the Rust semantics require a trailing empty slice. The flag, which is set in the predicate match branch as we’ll see next, lets the finalizer recognize that case. I initialize the state by passing SplitState::new as the supplier.

The integrator is obviously more complex, though not terribly so. If the predicate matches, I make an (immutable) copy of the state list, and it’s the copy that is sent downstream. I clear the state list so it starts fresh, and set the flag to record that I just did a push, for the reason mentioned in the previous paragraph. Note that the current (i.e. matching) element is discarded. Rust has a variant of split, split_inclusive, that includes the matching element in the output. Implementing that is a simple tweak. If the predicate does not match, I add the element to the state list and clear the push flag. In both cases I return true to indicate stream processing can continue. Java’s Stream API allows short-circuiting, in which case you’d return false from the integrator.

Lastly, the finalizer is straightforward: it sends whatever is left in the state list, including an empty list when the last element of the stream matched the predicate.
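The split_inclusive tweak mentioned above could look roughly like the following sketch. The names SplitInclusiveDemo and splitInclusive are hypothetical and this is not the code from my repository. It also returns the result of downstream.push instead of always returning true, which is one way to honor short-circuiting. It assumes JDK 24, or JDK 23 with --enable-preview.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class SplitInclusiveDemo {

    // Hypothetical variant modeled on Rust's split_inclusive: the matching
    // element stays at the end of the piece it terminates.
    static <T> Gatherer<T, ?, List<T>> splitInclusive(Predicate<? super T> predicate) {
        class State {
            final List<T> list = new ArrayList<>();
        }
        return Gatherer.ofSequential(
                State::new,
                (state, element, downstream) -> {
                    state.list.add(element);
                    if (!predicate.test(element)) {
                        return true;
                    }
                    List<T> copy = List.copyOf(state.list);
                    state.list.clear();
                    // Pass on the downstream's answer so short-circuiting
                    // operations such as findFirst() can stop the stream early.
                    return downstream.push(copy);
                },
                (state, downstream) -> {
                    // No trailing empty list: a final match already closed its piece.
                    if (!state.list.isEmpty()) {
                        downstream.push(List.copyOf(state.list));
                    }
                });
    }

    public static void main(String[] args) {
        // prints [[1, 2, 0], [3, 4, 0], [5, 6]]
        System.out.println(Stream.of(1, 2, 0, 3, 4, 0, 5, 6)
                .gather(SplitInclusiveDemo.<Integer>splitInclusive(n -> n == 0))
                .toList());
    }
}
```

Because the matching element closes its own piece, this variant does not need the extra boolean flag that split uses.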

Implementing splitn

The splitn method is almost identical to the split method. The main difference is that the state also keeps track of how many splits have been done, and the integrator checks that count to decide whether it should keep splitting.

public static <T> Gatherer<T, ?, List<T>> splitn(int n, Predicate<? super T> predicate) {
    class SplitState {
        final List<T> list = new ArrayList<>();
        boolean did_push;
        int splits = 1;
    }
    if (n < 1) {
        throw new IllegalArgumentException("n must be greater than 0");
    }
 
    return Gatherer.ofSequential(
            SplitState::new,
            (state, element, downstream) -> {
                if (state.splits < n && predicate.test(element)) {
                    List<T> copy = List.copyOf(state.list);
                    state.list.clear();
                    downstream.push(copy);
                    state.did_push = true;
                    state.splits++;
                }

The rest of the implementation is the same so I omit it here. I start the number of splits at 1 because there is always at least one split. If n is 1 then it doesn’t matter what the predicate is, you just get all the elements of the stream in the result.
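For readers who want to run it, here is a sketch of the whole method under the assumption that the omitted remainder really is a verbatim copy of the else branch, return value and finalizer from split. The wrapper class SplitnSketch is hypothetical; the version in my repository may differ in detail. It assumes JDK 24, or JDK 23 with --enable-preview.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class SplitnSketch {

    // Assumed completion of splitn: everything after the if-branch mirrors split.
    static <T> Gatherer<T, ?, List<T>> splitn(int n, Predicate<? super T> predicate) {
        class SplitState {
            final List<T> list = new ArrayList<>();
            boolean did_push;
            int splits = 1;
        }
        if (n < 1) {
            throw new IllegalArgumentException("n must be greater than 0");
        }
        return Gatherer.ofSequential(
                SplitState::new,
                (state, element, downstream) -> {
                    // Stop splitting once n - 1 lists have been pushed.
                    if (state.splits < n && predicate.test(element)) {
                        List<T> copy = List.copyOf(state.list);
                        state.list.clear();
                        downstream.push(copy);
                        state.did_push = true;
                        state.splits++;
                    } else {
                        state.list.add(element);
                        state.did_push = false;
                    }
                    return true;
                },
                (state, downstream) -> {
                    if (state.did_push || !state.list.isEmpty()) {
                        downstream.push(List.copyOf(state.list));
                    }
                });
    }

    public static void main(String[] args) {
        // prints [[10, 40], [20, 60, 50]]
        System.out.println(Stream.of(10, 40, 30, 20, 60, 50)
                .gather(SplitnSketch.<Integer>splitn(2, el -> el % 3 == 0))
                .toList());
    }
}
```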

Building with Gradle

Until JDK 24 is released, you will need to pass the --enable-preview flag to use the Gatherer interface, as I mentioned. To do this with Gradle, you can do the following:

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(23)
    }
}
 
compileJava {
    options.compilerArgs += ["--enable-preview"]
}
 
compileTestJava {
    options.compilerArgs += ["--enable-preview"]
}
 
tasks.named('test') {
    // Use JUnit Platform for unit tests.
    useJUnitPlatform()
    jvmArgs '--enable-preview'
}

Conclusion

This overview just scratches the surface of what you can do with the new Gatherer interface. I didn’t cover parallel gatherers, or how to compose gatherers. I also didn’t show anything about using the combiner function. But I feel this is enough to get a sense of what you can do with the new interface. The code and some unit tests for this blog post are available on my GitHub.