Backend with Golang

[Concurrency-5] Fan-Out, Fan-In

아직개구리 2023. 9. 12. 15:12

Best Practices for constructing Pipelines(이어서) 

  • Generator: data set을 stream of data on a channel 로 만드는 것이다.
  • channel을 사용함으로써 함수를 사용해서 할 때와 다른 점은 value를 추출하기 위해서 range statement 를 사용할 수 있고, 이를 통해 안전하고 concurrent 하게 사용할 수 있다. 이렇게 각각의 value들뿐만 아니라 pipeline 단계 별로도 concurrent 하다. 단계가 독립적으로 실행된다고 보면 된다. 
  • 예시에서 나오는 stage들은 공통으로 사용하는 done channel과 다음 스테이지로 전달되는 channel 에 의해서 interconnected 되어 있다. 
  • preemptable 해야만 하는 두개의 부분이 있는데, discrete value를 생성하는 것과, 그 discrete value를 보내는 단계이다.  

Some Handy Generator

여러가지 경우에 쓸모있는 generator들이 많다. 

  1. repeat : values ... interface{}  로 argument 를 받아서 done 채널에 값이 올때까지 무한정 data stream을 만든다.
  2. take : value stream과 num int를 받아서, num 만큼의 아이템을 사용하고 exit하는 stage이다 .
  3. repeatFn 은 function을 반복적으로 부르는 것. (합치면 for num := range(done,repeatFn(done,rand), 10) {} 이런식으로 사용이 가능하다. )  

interface{}의 사용: 어떻게 보면 금지되는 것 중 하나인데, 이 책의 작가는 pipeline stage에서는 써도 된다고 생각한다. pipeline의 많은 utility는 reusable stage들에서 오고, 이는 stage가 operate될때의 specificity level을 적절하게 설정해야 성취할 수 있는 특성이다.  type에 대한 info를 사용해서 어떠한 operation을 하지 않고 parameter의 갯수만 중요한 예시에서는 interface{}를 쓸 수 있는 것이다.  

 

extra pipeline stage와 type assertion 단계를 추가할 때의 Performance overhead에 대해서 염려할 수도 있다. 하지만 이것이 가져오는 Performance overhaed는 무시할 만(negligible) 하다. 

이런 것보다는 일반적으로 아래 두가지가 pipeline의 limiting factor로써 작용한다. 

  • generator: generator가 만약에 memory에서  stream을 만드는 것이 아니라면, I/O bound일 것이다. disk나 network으로부터 read한다면 type assertion stage를 추가하는 것과 같은 적은 performance overhead는 별게 아니게 된다. (영어표현은 eclipse the meager performance overhead shown here) 
  • computationally intensive stage: 이걸 완화시키기 위해서 fanout fanin 테크닉을 알아본다. 

** I/O bound: 프로세스가 진행될 때 I/O waiting 시간이 많은 경우를 의미한다. 파일 쓰기, 디스크작업, 네트워크 통신을 할 때 주로 나타난다. 반면에 CPU bound 는 CPU 사용기간이 I/O waiting 보다 많은 경우를 의미한다. CPU 성능에 의해 작업 속도가 결정된다. 

 

Fan-Out, Fan-In 

Stage 들은 computationally expensive 할 떄가 있다. 어떤 stage가 computationally expensive하다면, 이걸 완화시키기 위해서 어떻게 해야할까? 전체 pipeline의 rate-limit 을 하지 않도록? 

 

Fan-out: the process of starting multiple goroutines to handle input from the pipeline 

Fan-in : the process of combining multiple results into one channel 

 

두가지 조건: 1) stage가 이전에 계산한 값에 대해 의존도가 없어야함. 2) takes long time to run. 이 두가지 조건을 만족하면 fanning out 을 고려해볼 만 하다. 

 

이 책에서는 소수를 구하는 예시를 들어 설명한다. random int를 받아서, 그것이 소수인지 판별하고, 아니라면 그 다음 random int를 받아서 하는 방식을 선택한다. naive하지만 소수를 걸러내는데 시간이 많이 드는 것을 의도했다. 숫자가 크면 클수록 나눠줘야하는 숫자의 갯수가 커지고, primeFinder 함수에서 많은 시간이 지체되게 된다. 

 

fan out을 적용하기 위해 조건이 만족하는지 살펴보자. 

 

1) 먼저 random integer generator는 order-independent하고, 많은 시간이 소요되지 않는다.

2) primeFinder또한 order-independent 하지만 이 단계에서는 실행하는 데 긴 시간이 소요된다.  

 

만족한다! fan out을 적용할 수 있다. CPU 갯수만큼 primeFinder stage 를 복사해보자. 하나의 primeStream을 사용하는 대신,아래와 같이 CPU 갯수만큼의 goroutine을 가지도록. 

numFinders := runtime.NumCPU()
finders := make([]<-chan int, numFinders)
for i:= 0; i<numFinders; i++ {
	finders[i] = primeFinder(done, randIntStream)
}

이렇게 만들었을 때, 이제 이 결과들을 하나의 채널로 받게 하고 싶은데, 이때 필요한 단계가 fan-in 이다. 

fanning in 은 multiple streams of data를 하나의 single stream으로 나가게 하기 때문에 multiplexing 이라고 할 수 있다. (참고로, mux는 여러개의 입력을 하나의 output으로 나가게 하는 것이다. ) 

fanIn := func(
		done <-chan interface{}, channels ...<-chan interface{},
	) <-chan interface{} {
		var wg sync.WaitGroup
		multiplexedStream := make(chan interface{})
		multiplex := func(c <-chan interface{}) {
			defer wg.Done()
			for i := range c {
				select {
				case <-done:
					return
				case multiplexedStream <- i:
				}
			}
		}
		// Select from all the channels
		wg.Add(len(channels))
		for _, c := range channels {
			go multiplex(c)
		}
		// Wait for all the reads to complete
		go func() {
			wg.Wait()
			close(multiplexedStream)
		}()
		return multiplexedStream
	}

여기서 중요한 내용들 

  • 해당 goroutine이 torn down되기 위한 done 채널을 전달받고, 
  • sync.WaitGroup을 사용해서 channel이 모두 drain될때까지 기다린다. 
  •  multiplex 라는 함수를 만들고, 그 안에서는 channel을 pass받아서 안의 값을 읽고, 그것을 multiplexedStream에 전달한다. 
  • 마지막으로 goroutine을 하나 더 만들어서 multiplexing하는 모든 채널이 drained될떄까지 기다리고, 그 뒤에 multiplexedStream을 close한다. 

fanin 함수를 사용하여 아래와 같이 실행시킬 수 있다. 소수를 찾는데 시간이 많이 단축되는 걸 확인할 수 있다. 

finders := make([]<-chan interface{}, numFinders)
for i := 0; i < numFinders; i++ {
    finders[i] = primeFinder(done, randIntStream)
}

for prime := range take(done, fanIn(done, finders...), 10) {
    fmt.Printf("\t%d\n", prime)
}

이렇게 fan-Out fan In 패턴을 알아보았다. 

 

'Backend with Golang' 카테고리의 다른 글

[MySQL] Update Data in Batches  (0) 2023.09.19
[Concurrency-6] Or-done channel  (0) 2023.09.14
Rate Limiting  (1) 2023.09.06
Effective Go - Part 4: Embedding & Concurrency  (0) 2023.08.16
Effective Go - Part 3  (0) 2023.08.13