티스토리 뷰

요약 및 개요

groupedFlux에서 tracking 할 수 있는 키의 개수 제한은 256개 인 것처럼 동작

 

아래 테스트 코드는 트러블슈팅 중 원본 로직을 흉내내며 따라가는 터미 코드

import lombok.Data;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class GroupedFluxTest {

    @Data
    static class Dummy {
        private String key;
        private String value;

        public Dummy(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    @Data
    static class Dto {
        private String keyval;

        public Dto(String keyval) {
            this.keyval = keyval;
        }
    }

    @Test
    public void test() {
        List<Dummy> list = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            list.add(new Dummy("key"+i, "value"+i));
        }
        Flux<Dummy> dummyFlux = Flux.fromIterable(list).share();

        dummyFlux.subscribe(dummy -> {
            System.out.println("normal subscribe:" + dummy.toString());
        });

        Function<GroupedFlux<String, Dummy>, Flux> convert = (groupedFlux) -> {
            return groupedFlux.map(dummy -> {
                return new Dto(dummy.getKey() + ":" + dummy.getValue());
            });
        };

        //
        dummyFlux
                .groupBy(Dummy::getKey)
                .flatMap(convert::apply)
                .subscribe(o -> {
                    System.out.println("groupBy subscribe:" + o.toString());
                });

    }
}

groupBy 다음 flatMap 을 거친 subscriber에서 key가 999개까지 처리되길 기대하는데 실제로는 255(256개)에서 멈춤

XXXXXX [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
normal subscribe:GroupedFluxTest.Dummy(key=key0, value=value0)
normal subscribe:GroupedFluxTest.Dummy(key=key1, value=value1)
normal subscribe:GroupedFluxTest.Dummy(key=key2, value=value2)
normal subscribe:GroupedFluxTest.Dummy(key=key3, value=value3)
normal subscribe:GroupedFluxTest.Dummy(key=key4, value=value4)
... 
normal subscribe:GroupedFluxTest.Dummy(key=key994, value=value994)
normal subscribe:GroupedFluxTest.Dummy(key=key995, value=value995)
normal subscribe:GroupedFluxTest.Dummy(key=key996, value=value996)
normal subscribe:GroupedFluxTest.Dummy(key=key997, value=value997)
normal subscribe:GroupedFluxTest.Dummy(key=key998, value=value998)
normal subscribe:GroupedFluxTest.Dummy(key=key999, value=value999)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key0:value0)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key1:value1)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key2:value2)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key3:value3)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key4:value4)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key5:value5)
...
groupBy subscribe:GroupedFluxTest.Dto(keyval=key251:value251)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key252:value252)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key253:value253)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key254:value254)
groupBy subscribe:GroupedFluxTest.Dto(keyval=key255:value255)

가장 도움이 된 답변

stackoverflow.com/questions/57788750/limiting-concurrency-of-observable-groupby-merge-combination

 

Limiting concurrency of Observable GroupBy / Merge Combination

We're implementing some software components using C# and Reactive Extensions. It contains functionality that splits an observable using the GroupBy method, than performs some arithmetics on those s...

stackoverflow.com

 

짧게 보자면

stackoverflow.com/questions/53477757/reactor-flux-subscriber-stream-stopped-when-using-reduce-on-flatmap

 

Reactor Flux subscriber stream stopped when using reduce on flatMap

I want change my code for single subscriber. Now i have auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120)).subscribe( s -> s.groupBy(Auction::getItem).subscribe(

stackoverflow.com

stackoverflow.com/questions/48211297/is-there-a-limited-number-of-groupedfluxt-created-groupby-operator

 

Is there a limited number of GroupedFlux created groupBy operator

According to the document reference the groupBy operator splits a given Flux into multiple GroupedFlux depending on the keymapper function of the operator. If I execute the following code with a ra...

stackoverflow.com

 

조금 더 보자면

github.com/reactor/reactor-core/issues/596#issuecomment-386360221

 

groupBy followed by flatMap with "terminal operation" never finishes · Issue #596 · reactor/reactor-core

The following snippet never finishes, Flux source = Flux.push(sink -> { for (int i = 0; i < 6000; i++) { sink.next(i); } sink.complete(); }); StepVerifier.create(source.groupBy...

github.com

projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#groupBy-java.util.function.Function-

 

Flux (reactor-core 3.4.5)

 

projectreactor.io

공식 문서에 어느 정도 힌트가 있는 것 같다.

 

The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

 

(정확하게 이해한 것 같지는 않아서 코드를 좀 더 까 봐야 할 것 같다.)

groupBy 오퍼레이션은 업스트림의 publisher를 consume 하는 과정이 포함되어 있으며

많은 양의 데이터에 대해 다운스트림에서 적절하게 처리되지 않는다면 행에 빠질 수 있다.

(결과는 행은 아니었고 그냥 잘렸다.)

 

flatMap 연산은 업스트림에 대해 동시에 처리할 수 있는 개수 제한이 걸려있다.

이게 256이었다.

즉 flatMap 앞에 groupBy 가 있으면 flatMap 이 처리 가능한 동시 개수를 넘는다면 (즉 groupBy의 key 개수가 많아진다면) groupBy 연산에서 나머지들은 버려지는 게 아닐까 생각

 

//아래에서 사용하는 상수
/**
* A small default of available slots in a given container, compromise between intensive pipelines, small
* subscribers numbers and memory use.
*/
public static final int SMALL_BUFFER_SIZE = Math.max(16,
Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));

//아래에서 사용하는 상수
/**
* An allocation friendly default of available slots in a given container, e.g. slow publishers and or fast/few
* subscribers
*/
public static final int XS_BUFFER_SIZE    = Math.max(8,
Integer.parseInt(System.getProperty("reactor.bufferSize.x", "32")));

//우리가 보통 호출하는 flatMap -> 내부적으로 아래의 concurrency 에 256값을 넘기면서 호출된다.
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
		return flatMap(mapper, Queues.SMALL_BUFFER_SIZE, Queues
				.XS_BUFFER_SIZE);
	}
    
    
//실제 내부에서 추가로 호출되는 flatMap 
//동시에 처리할수 있는 시퀀스를 명시적으로 파라메터를 받는다.
* @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher}
* @param concurrency the maximum number of in-flight inner sequences
* @param prefetch the maximum in-flight elements from each inner {@link Publisher} sequence
* @param <V> the merged output sequence type
*/
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int
concurrency, int prefetch) {
return flatMap(mapper, false, concurrency, prefetch);
}

flatmap의 설명이다.

The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel. In turn, that argument shows the size of the first Subscription.request(long) to the upstream. The prefetch argument allows to give an arbitrary prefetch size to the merged Publisher (in other words prefetch size means the size of the first Subscription.request(long) to the merged Publisher).

 

  • Generation of inners and subscription: this operator is eagerly subscribing to its inners.

flatMap 은 내부적으로 publisher에 대한 subscribe를 수행하고 있고 그것도 동시에 수행하고 있다.

그리고 동시에? 처리할 수 있는 양에도 제한도 걸려있다.(이걸 동시성이라고 불러야할지 병렬성이라고 불러야할지...)

 

groupedFlux는 key 마다 publisher를 만들어 준다.

flatMap에서 재빠르게 처리하지 못한다면 결국 groupedFlux에서 만들어내는 key마다 publisher 를 따라가지 못할 때

groupedFlux 에서 초과되는 key 들에 대한 publisher 가 무시되는 것 같다.

 

그래서 해결 방법은?

1. groupBy와 flatMap 사이에 parallel() 연산자를 붙임 -> 이게 왜 동작하는지는 조금 더 봐야 할 듯...

.groupBy(Dummy::getKey)
.parallel()
.flatMap(convert::apply)

2. flatMap에서 concurrency를 무진장 늘림 -> 터질지도 모르는데?

.groupBy(Dummy::getKey)
.flatMap(convert::apply, Integer.MAX_VALUE)

3. groupBy를 쓰지 않고 외부에 위임 concurrent 자료구조 이용? -> 리액티브의 철학? 에 위배되는 방법이라는 생각.

4. 애초에 초기 publisher에서 무진장 데이터(시퀀스)를 많이 들고 오지 않도록 제어 -> 원본 로직은 저장소에서 들고 온 데이터들을 일괄로 처리하다 발생한 거니 (마이크로) 배치 성격에 맞게 잘라서 처리하는 방법이 아닐까 생각한다.

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함