티스토리 뷰
요약 및 개요
groupedFlux에서 tracking 할 수 있는 키의 개수 제한은 256개 인 것처럼 동작
아래 테스트 코드는 트러블슈팅 중 원본 로직을 흉내내며 따라가는 터미 코드
import lombok.Data;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
public class GroupedFluxTest {
@Data
static class Dummy {
private String key;
private String value;
public Dummy(String key, String value) {
this.key = key;
this.value = value;
}
}
@Data
static class Dto {
private String keyval;
public Dto(String keyval) {
this.keyval = keyval;
}
}
@Test
public void test() {
List<Dummy> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add(new Dummy("key"+i, "value"+i));
}
Flux<Dummy> dummyFlux = Flux.fromIterable(list).share();
dummyFlux.subscribe(dummy -> {
System.out.println("normal subscribe:" + dummy.toString());
});
Function<GroupedFlux<String, Dummy>, Flux> convert = (groupedFlux) -> {
return groupedFlux.map(dummy -> {
return new Dto(dummy.getKey() + ":" + dummy.getValue());
});
};
//
dummyFlux
.groupBy(Dummy::getKey)
.flatMap(convert::apply)
.subscribe(o -> {
System.out.println("groupBy subscribe:" + o.toString());
});
}
}
groupBy 다음 flatMap 을 거친 subscriber에서 key가 999개까지 처리되길 기대하는데 실제로는 255(256개)에서 멈춤
XXXXXX [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
normal subscribe:GroupedFluxTest.Dummy(key=key0, value=value0)
normal subscribe:GroupedFluxTest.Dummy(key=key1, value=value1)
normal subscribe:GroupedFluxTest.Dummy(key=key2, value=value2)
normal subscribe:GroupedFluxTest.Dummy(key=key3, value=value3)
normal subscribe:GroupedFluxTest.Dummy(key=key4, value=value4)
...
normal subscribe:GroupedFluxTest.Dummy(key=key994, value=value994)
normal subscribe:GroupedFluxTest.Dummy(key=key995, value=value995)
normal subscribe:GroupedFluxTest.Dummy(key=key996, value=value996)
normal subscribe:GroupedFluxTest.Dummy(key=key997, value=value997)
normal subscribe:GroupedFluxTest.Dummy(key=key998, value=value998)
normal subscribe:GroupedFluxTest.Dummy(key=key999, value=value999)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key0:value0)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key1:value1)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key2:value2)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key3:value3)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key4:value4)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key5:value5)
...
groupBy subscribe:GroupedFluxTest.Dto(keyval=key251:value251)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key252:value252)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key253:value253)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key254:value254)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key255:value255)
가장 도움이 된 답변
stackoverflow.com/questions/57788750/limiting-concurrency-of-observable-groupby-merge-combination
짧게 보자면
조금 더 보자면
github.com/reactor/reactor-core/issues/596#issuecomment-386360221
공식 문서에 어느 정도 힌트가 있는 것 같다.
The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).
(정확하게 이해한 것 같지는 않아서 코드를 좀 더 까 봐야 할 것 같다.)
groupBy 오퍼레이션은 업스트림의 publisher를 consume 하는 과정이 포함되어 있으며
많은 양의 데이터에 대해 다운스트림에서 적절하게 처리되지 않는다면 행에 빠질 수 있다.
(결과는 행은 아니었고 그냥 잘렸다.)
flatMap 연산은 업스트림에 대해 동시에 처리할 수 있는 개수 제한이 걸려있다.
이게 256이었다.
즉 flatMap 앞에 groupBy 가 있으면 flatMap 이 처리 가능한 동시 개수를 넘는다면 (즉 groupBy의 key 개수가 많아진다면) groupBy 연산에서 나머지들은 버려지는 게 아닐까 생각
//아래에서 사용하는 상수
/**
* A small default of available slots in a given container, compromise between intensive pipelines, small
* subscribers numbers and memory use.
*/
public static final int SMALL_BUFFER_SIZE = Math.max(16,
Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));
//아래에서 사용하는 상수
/**
* An allocation friendly default of available slots in a given container, e.g. slow publishers and or fast/few
* subscribers
*/
public static final int XS_BUFFER_SIZE = Math.max(8,
Integer.parseInt(System.getProperty("reactor.bufferSize.x", "32")));
//우리가 보통 호출하는 flatMap -> 내부적으로 아래의 concurrency 에 256값을 넘기면서 호출된다.
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
return flatMap(mapper, Queues.SMALL_BUFFER_SIZE, Queues
.XS_BUFFER_SIZE);
}
//실제 내부에서 추가로 호출되는 flatMap
//동시에 처리할수 있는 시퀀스를 명시적으로 파라메터를 받는다.
* @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher}
* @param concurrency the maximum number of in-flight inner sequences
* @param prefetch the maximum in-flight elements from each inner {@link Publisher} sequence
* @param <V> the merged output sequence type
*/
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int
concurrency, int prefetch) {
return flatMap(mapper, false, concurrency, prefetch);
}
flatmap의 설명이다.
The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel. In turn, that argument shows the size of the first Subscription.request(long) to the upstream. The prefetch argument allows to give an arbitrary prefetch size to the merged Publisher (in other words prefetch size means the size of the first Subscription.request(long) to the merged Publisher).
- Generation of inners and subscription: this operator is eagerly subscribing to its inners.
flatMap 은 내부적으로 publisher에 대한 subscribe를 수행하고 있고 그것도 동시에 수행하고 있다.
그리고 동시에? 처리할 수 있는 양에도 제한도 걸려있다.(이걸 동시성이라고 불러야할지 병렬성이라고 불러야할지...)
groupedFlux는 key 마다 publisher를 만들어 준다.
flatMap에서 재빠르게 처리하지 못한다면 결국 groupedFlux에서 만들어내는 key마다 publisher 를 따라가지 못할 때
groupedFlux 에서 초과되는 key 들에 대한 publisher 가 무시되는 것 같다.
그래서 해결 방법은?
1. groupBy와 flatMap 사이에 parallel() 연산자를 붙임 -> 이게 왜 동작하는지는 조금 더 봐야 할 듯...
.groupBy(Dummy::getKey)
.parallel()
.flatMap(convert::apply)
2. flatMap에서 concurrency를 무진장 늘림 -> 터질지도 모르는데?
.groupBy(Dummy::getKey)
.flatMap(convert::apply, Integer.MAX_VALUE)
3. groupBy를 쓰지 않고 외부에 위임 concurrent 자료구조 이용? -> 리액티브의 철학? 에 위배되는 방법이라는 생각.
4. 애초에 초기 publisher에서 무진장 데이터(시퀀스)를 많이 들고 오지 않도록 제어 -> 원본 로직은 저장소에서 들고 온 데이터들을 일괄로 처리하다 발생한 거니 (마이크로) 배치 성격에 맞게 잘라서 처리하는 방법이 아닐까 생각한다.
'개발관련 > 뻘팁-작업노트' 카테고리의 다른 글
jackson 분석 - 작업 노트 (0) | 2021.08.08 |
---|---|
JHipster (맛보기 + 소견) (0) | 2021.05.11 |
java - java.util.regex.PatternSyntaxException: Dangling meta character '*' near index 0 (0) | 2021.05.10 |
CompletableFuture async (0) | 2021.04.06 |
accesslog awk (0) | 2021.04.04 |
- Total
- Today
- Yesterday
- 사기꾼증후군
- Dangling
- 에픽테토스
- 전설로떠나는월가의영웅
- AWS
- elasticsearch
- 클린 아키텍처
- WebSocket
- COMMIT
- Kafka
- 말의품격
- 개발자
- 기술블로그
- 만들면서 배우는 클린 아키텍처
- 기술센싱
- kafka 2.8.0
- Spring
- jhipster
- meta character
- completablefuture
- pecs
- Async
- opensearch
- Java
- flush
- PatternSyntaxException
- 기술사이트
- fsync
- percolate
- Generic
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |