Java 8 스트림 분할
Java 8 Stream에서 "파티션"작업을 구현하는 방법은 무엇입니까? 분할이란, 스트림을 주어진 크기의 하위 스트림으로 나눕니다. 어떻게 든 그것은 Guava Iterators.partition () 메서드와 동일 할 것입니다 . 단지 파티션이 List가 아닌 지연 평가 된 스트림 인 것이 바람직합니다.
임의의 소스 스트림을 고정 크기 배치로 분할하는 것은 불가능합니다. 이는 병렬 처리를 망칠 수 있기 때문입니다. 병렬로 처리 할 때 분할 후 첫 번째 하위 작업의 요소 수를 모를 수 있으므로 첫 번째 작업이 완전히 처리 될 때까지 다음 하위 작업에 대한 파티션을 만들 수 없습니다.
그러나 임의 액세스에서 파티션 스트림을 만들 수 있습니다 List
. 이러한 기능은 예를 들어 내 StreamEx
라이브러리 에서 사용할 수 있습니다.
List<Type> input = Arrays.asList(...);
Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);
또는 스트림 스트림을 정말로 원한다면 :
Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);
타사 라이브러리에 의존하지 않으려면 이러한 ofSubLists
방법을 수동으로 구현할 수 있습니다.
public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
if (length <= 0)
throw new IllegalArgumentException("length = " + length);
int size = source.size();
if (size <= 0)
return Stream.empty();
int fullChunks = (size - 1) / length;
return IntStream.range(0, fullChunks + 1).mapToObj(
n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}
이 구현은 약간 길어 보이지만 MAX_VALUE에 가까운 목록 크기와 같은 일부 코너 사례를 고려합니다.
순서가 지정되지 않은 스트림에 대해 병렬 친화적 인 솔루션을 원하면 (따라서 어떤 스트림 요소가 단일 배치로 결합되는지 신경 쓰지 않음) 다음과 같은 수집기를 사용할 수 있습니다 (@sibnick에게 영감을 줌).
public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
Collector<List<T>, A, R> downstream) {
class Acc {
List<T> cur = new ArrayList<>();
A acc = downstream.supplier().get();
}
BiConsumer<Acc, T> accumulator = (acc, t) -> {
acc.cur.add(t);
if(acc.cur.size() == batchSize) {
downstream.accumulator().accept(acc.acc, acc.cur);
acc.cur = new ArrayList<>();
}
};
return Collector.of(Acc::new, accumulator,
(acc1, acc2) -> {
acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
for(T t : acc2.cur) accumulator.accept(acc1, t);
return acc1;
}, acc -> {
if(!acc.cur.isEmpty())
downstream.accumulator().accept(acc.acc, acc.cur);
return downstream.finisher().apply(acc.acc);
}, Collector.Characteristics.UNORDERED);
}
사용 예 :
List<List<Integer>> list = IntStream.range(0,20)
.boxed().parallel()
.collect(unorderedBatches(3, Collectors.toList()));
결과:
[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]
이러한 수집기는 스레드로부터 완벽하게 안전하며 순차적 스트림에 대해 정렬 된 배치를 생성합니다.
모든 배치에 대해 중간 변환을 적용하려면 다음 버전을 사용할 수 있습니다.
public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
Collector<T, AA, B> batchCollector,
Collector<B, A, R> downstream) {
return unorderedBatches(batchSize,
Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}
예를 들어, 이렇게하면 즉시 모든 배치의 숫자를 합산 할 수 있습니다.
List<Integer> list = IntStream.range(0,20)
.boxed().parallel()
.collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue),
Collectors.toList()));
스트림을 순차적으로 사용하고 싶다면 스트림을 분할 할 수 있습니다 (또한 윈도우 잉과 같은 관련 기능을 수행 할 수 있습니다.이 경우 실제로 원하는 것입니다). 표준 스트림에 대한 분할을 지원할 두 라이브러리는 cyclops-react (저는 저자입니다)와 cyclops-react가 확장하는 jOOλ (Windowing과 같은 기능 추가)입니다.
cyclops-streams에는 Java Streams에서 작동하기위한 정적 함수 StreamUtils 모음 과 분할을위한 splitAt, headAndTail, splitBy, partition과 같은 일련의 함수가 있습니다.
스트림을 크기가 30 인 중첩 스트림의 스트림으로 창을 지정하려면 window 메서드를 사용할 수 있습니다.
OP의 관점에서 스트리밍 용어로 스트림을 주어진 크기의 여러 스트림으로 분할하는 것은 분할 작업이 아니라 창 작업입니다.
Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30);
jool.Seq 를 확장 하고 Windowing 기능을 추가 하는 ReactiveSeq 라는 Stream 확장 클래스 가있어 코드를 좀 더 깔끔하게 만들 수 있습니다.
ReactiveSeq<Integer> seq;
ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30);
Tagir가 위에서 지적했듯이 이것은 병렬 스트림에 적합하지 않습니다. 다중 스레드 방식으로 실행하려는 Stream을 창 또는 일괄 처리하려면. cyclops-react의 LazyFutureStream 이 유용 할 수 있습니다 (윈도우 링은 할 일 목록에 있지만 일반 이전 일괄 처리를 사용할 수 있습니다).
이 경우 데이터는 스트림을 실행하는 다중 스레드에서 다중 생산자 / 단일 소비자 대기없는 큐로 전달되며 해당 큐의 순차 데이터는 스레드에 다시 분배되기 전에 창을 표시 할 수 있습니다.
Stream<List<Data>> batched = new LazyReact().range(0,1000)
.grouped(30)
.map(this::process);
Jon Skeet이 자신의 의견 에서 보여 주었 듯이 파티션을 지연시키는 것이 불가능한 것 같습니다 . 지연되지 않은 파티션의 경우 이미 다음 코드가 있습니다.
public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
final Iterator<T> it = source.iterator();
final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream);
final Iterable<Stream<T>> iterable = () -> partIt;
return StreamSupport.stream(iterable.spliterator(), false);
}
우아한 해결책을 찾았습니다. Iterable parts = Iterables.partition(stream::iterator, size)
이 문제에 대한 가장 우아하고 순수한 Java 8 솔루션은 다음과 같습니다.
public static <T> List<List<T>> partition(final List<T> list, int batchSize) {
return IntStream.range(0, getNumberOfPartitions(list, batchSize))
.mapToObj(i -> list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size())))
.collect(toList());
}
//https://stackoverflow.com/questions/23246983/get-the-next-higher-integer-value-in-java
private static <T> int getNumberOfPartitions(List<T> list, int batchSize) {
return (list.size() + batchSize- 1) / batchSize;
}
이것은 List를 사용하는 대신 느리게 평가되는 순수한 Java 솔루션입니다.
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable
currentBatch.add(new ArrayList<T>(batchSize));
return Stream.concat(stream
.sequential()
.map(new Function<T, List<T>>(){
public List<T> apply(T t){
currentBatch.get(0).add(t);
return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
}
}), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
.limit(1)
).filter(Objects::nonNull);
}
이 방법은 Stream<List<T>>
유연성을 위해 반환 됩니다. 로 Stream<Stream<T>>
쉽게 변환 할 수 있습니다 partition(something, 10).map(List::stream)
.
내부에서 일종의 해킹이 가능하다고 생각합니다.
배치를위한 유틸리티 클래스 생성 :
public static class ConcurrentBatch {
private AtomicLong id = new AtomicLong();
private int batchSize;
public ConcurrentBatch(int batchSize) {
this.batchSize = batchSize;
}
public long next() {
return (id.getAndIncrement()) / batchSize;
}
public int getBatchSize() {
return batchSize;
}
}
및 방법 :
public static <T> void applyConcurrentBatchToStream(Consumer<List<T>> batchFunc, Stream<T> stream, int batchSize){
ConcurrentBatch batch = new ConcurrentBatch(batchSize);
//hack java map: extends and override computeIfAbsent
Supplier<ConcurrentMap<Long, List<T>>> mapFactory = () -> new ConcurrentHashMap<Long, List<T>>() {
@Override
public List<T> computeIfAbsent(Long key, Function<? super Long, ? extends List<T>> mappingFunction) {
List<T> rs = super.computeIfAbsent(key, mappingFunction);
//apply batchFunc to old lists, when new batch list is created
if(rs.isEmpty()){
for(Entry<Long, List<T>> e : entrySet()) {
List<T> batchList = e.getValue();
//todo: need to improve
synchronized (batchList) {
if (batchList.size() == batch.getBatchSize()){
batchFunc.accept(batchList);
remove(e.getKey());
batchList.clear();
}
}
}
}
return rs;
}
};
stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s))
.collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList())))
.entrySet()
.stream()
//map contains only unprocessed lists (size<batchSize)
.forEach(e -> batchFunc.accept(e.getValue()));
}
다음은 AbacusUtil의 빠른 솔루션입니다 .
IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray()));
Disclaimer:I'm the developer of AbacusUtil.
ReferenceURL : https://stackoverflow.com/questions/32434592/partition-a-java-8-stream
'programing' 카테고리의 다른 글
백엔드 용 Webpack? (0) | 2021.01.15 |
---|---|
수동 잠금과 동기화 방법의 차이점 (0) | 2021.01.15 |
ref = 'string'이 "레거시"인 이유는 무엇입니까? (0) | 2021.01.15 |
Angular CLI에서 피어 종속성 설치를 어떻게 처리합니까? (0) | 2021.01.15 |
Spring 관리 컨텍스트 외부의 클래스에 Bean 주입 (0) | 2021.01.15 |