Skip to content

Commit

Permalink
Implement reduceIndexed
Browse files Browse the repository at this point in the history
  • Loading branch information
tginsberg committed Jan 25, 2025
1 parent ad764dd commit ab9ebdd
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
+ Implement `uniquelyOccurring()` to emit stream elements that occur a single time
+ Implement `takeUntil()` to take from a stream until a predicate is met, including the first element that matches the predicate
+ Implement `foldIndexed()` to perform a traditional fold along with the index of each element
+ Implement `reduceIndexed()` to perform a reduce along with the index of each element

### 0.7.0
+ Use greedy integrators where possible (Fixes #57)
Expand Down
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ implementation("com.ginsberg:gatherers4j:0.8.0")
| `last(n)` | Constrain the stream to the last `n` values |
| `orderByFrequencyAscending()` | Returns a stream where elements are ordered from least to most frequent as `WithCount<T>` wrapper objects. |
| `orderByFrequencyDescending()` | Returns a stream where elements are ordered from most to least frequent as `WithCount<T>` wrapper objects. |
| `reduceIndexed(fn)` | Performs a reduce on the input stream using the given function, and includes the index of the elements |
| `reverse()` | Reverse the order of the stream |
| `shuffle()` | Shuffle the stream into a random order using the platform default `RandomGenerator` |
| `shuffle(rg)` | Shuffle the stream into a random order using the specified `RandomGenerator` |
Expand Down Expand Up @@ -173,11 +174,22 @@ Stream.of("A", "B", "C", "D", "E")
```java
Stream.of("A", "B", "C", "D", "E", "F", "G")
.gather(Gatherers4j.everyNth(3))
.toList()
.toList();

// ["A", "D", "G"]
```

#### Reduce a stream with access to the index of the elements

```java
Stream.of("A", "B", "C")
.gather(Gatherers4j.reduceIndexed((index, a, b) -> a + b + index))
.toList()
.getFirst();

// "AB1C2"
```

#### Take from a stream until a predicate is met, inclusive

```java
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/ginsberg/gatherers4j/Gatherers4j.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,13 @@ public static <INPUT> LastGatherer<INPUT> last(final int count) {
return new FrequencyGatherer<>(FrequencyGatherer.Order.Descending);
}


public static <INPUT extends @Nullable Object> ReduceIndexedGatherer<INPUT> reduceIndexed(
final TriFunction<Long, INPUT, INPUT, INPUT> reduceFunction
) {
return new ReduceIndexedGatherer<>(reduceFunction);
}

/// Reverse the order of the input Stream.
///
/// Note: This consumes the entire stream and holds it in memory, so it will not work on infinite
Expand Down
66 changes: 66 additions & 0 deletions src/main/java/com/ginsberg/gatherers4j/ReduceIndexedGatherer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2025 Todd Ginsberg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ginsberg.gatherers4j;

import org.jspecify.annotations.Nullable;

import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

import static com.ginsberg.gatherers4j.GathererUtils.mustNotBeNull;

public class ReduceIndexedGatherer<INPUT extends @Nullable Object>
implements Gatherer<INPUT, ReduceIndexedGatherer.State<INPUT>, INPUT> {

private final TriFunction<Long, INPUT, INPUT, INPUT> reduceFunction;

ReduceIndexedGatherer(final TriFunction<Long, INPUT, INPUT, INPUT> reduceFunction) {
mustNotBeNull(reduceFunction, "Reduce function must not be null");
this.reduceFunction = reduceFunction;
}

@Override
public Supplier<State<INPUT>> initializer() {
return State::new;
}

@Override
public Integrator<State<INPUT>, INPUT, INPUT> integrator() {
return Integrator.ofGreedy((state, element, downstream) -> {
if (!state.hasFirstValue) {
state.value = element;
state.hasFirstValue = true;
} else {
state.value = reduceFunction.apply(state.index++, state.value, element);
}
return !downstream.isRejecting();
});
}

@Override
public BiConsumer<State<INPUT>, Downstream<? super INPUT>> finisher() {
return (inputState, downstream) -> downstream.push(inputState.value);
}

public static class State<INPUT> {
@Nullable
INPUT value;
long index = 1;
boolean hasFirstValue = false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2025 Todd Ginsberg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ginsberg.gatherers4j;

import org.junit.jupiter.api.Test;

import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

class ReduceIndexedGathererTest {

@Test
@SuppressWarnings("DataFlowIssue")
void reduceFunctionMustNotBeNull() {
assertThatThrownBy(() -> new ReduceIndexedGatherer<>(null)).isInstanceOf(IllegalArgumentException.class);
}

@Test
void reduceIndexed() {
// Arrange
final Stream<String> input = Stream.of("A", "B", "C");

// Act
final String output = input
.gather(Gatherers4j.reduceIndexed((index, a, b) -> a + b + index))
.toList()
.getFirst();

// Assert
assertThat(output).isEqualTo("AB1C2");
}

}

0 comments on commit ab9ebdd

Please sign in to comment.