https://qiita.com/hogedigo/items/15af273176599307a2b2
1. 요청 / 응답
채널을 이중으로 설정해서 응답을 받을 수 있다. error를 받고 싶은 경우, chan chan error를 사용한다.
요청을 보낼 때 channel of channel 에 error channel을 넣어서 보내면 그 채널에 처리된 결과를 넣어주게 되면, 요청을 처리한 후의 결과를 받을 수 있게 된다.
2. 처리 대기
Request / Response와 유사하지만 처리 완료를 기다리는 데 사용할 수 있다.
reqc := make(chan chan struct{})
요청을 보내는 쪽은 chan struct{} 를 만들고 chan chan에게 보낸다.
요청을 처리하는 쪽은 요청을 처리하고, 해당 채널을 close한다.
for {
select {
case ch := <-reqc:
doSomething()
close(ch) // 처리 완료를 통지하는 close
}
}
3. Subscriber 등록
Publisher/Subscriber에서 Subscriber를 등록하는 데 사용한다.
subscriber들을 등록하고, 삭제하고, 메시지를 publisch하는 struct이다.
type PubSub struct {
subscribe chan chan string
unsubscribe chan chan string
publish chan string
}
ps.subscribe <- sub을 사용해서 subscriber를 전달하면,
func (ps *PubSub) Start() {
go func() {
subscribers := make(map[chan string]struct{})
for {
select {
case ch := <-ps.subscribe:
subscribers[ch] = struct{}{}
case ch := <-ps.unsubscribe:
delete(subscribers, ch)
case msg := <-ps.publish:
for sub := range subscribers {
select {
case sub <- msg:
default:
}
}
}
}
}()
}
for-select 문에서 루프를 돌며 subscribe chan chan 으로 전달된 channel을 등록한다. 이렇게 등록된 채널을 통해서 ps.publisch <- msg 이렇게 message를 퍼블리시 하는 채널에 스트링을 전달하면 subscribers 를 통해서 모든 subscriber들에게 msg를 전달한다.
4. 순서 보장
goroutine으로 처리를 가속화 하면서 순서대로 입력 되는 것을 보장 하고자 할 때 사용한다. 예시로는 큰 파일을 읽으면서 한 row마다 어떤 fetch를 통해서 가공하는 파이프라인 처리를 생각해보면 된다.
한줄씩 읽어오고, 그 읽어온 데이터를 각 줄마다 어떤 처리과정을 거친다고 하면 직렬 처리를 한다면 시간이 많이 걸릴 수 밖에 없다.
그것을 아래와 같이 semaphore를 사용해서 시간을 단축할 수 있다.
// fetch 및 가공
func fetchSomething(inCh <-chan string) <-chan string {
outCh := make(chan string)
c := context.Background()
go func() {
defer close(outCh)
var wg sync.WaitGroup
sem := semaphore.NewWeighted(10)
for line := range inCh {
wg.Add(1)
sem.Acquire(c, 1)
go func(line string) {
defer wg.Done()
defer sem.Release(1)
// 이 예시에서는 fetch하는 대신 sleep으로 시간을 늘렸다.
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
outCh <- fmt.Sprintf("%s ... fetched!", line)
}(line)
}
wg.Wait()
}()
return outCh
}
결과를 보면, outCh에 들어가는 값이 순서가 보장되지 않음을 알 수 있다.
이때 순서를 지키며 outCh에서 데이터를 읽기 위해서는 chan chan을 사용하면 된다.
// fetch 및 가공
func fetchSomething(inCh <-chan string) <-chan string {
outChCh := make(chan chan string, 10)
go func() {
defer close(outChCh)
for line := range inCh {
outCh := make(chan string)
outChCh <- outCh
go func(line string) {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
outCh <- fmt.Sprintf("%s ... fetched!", line)
}(line)
}
}()
// chan chan을 그대로 건네줄 수 있지만,여기서는 chan으로 변환해서 준다.
outCh := make(chan string)
go func() {
defer close(outCh)
for ch := range outChCh {
outCh <- <-ch
}
}()
return outCh
}
fetch 처리 출력 용 channel을 작성해서 그것을 chan chan에 송신한다. 여기서는 처리 되는 순서에 상관 없이, inCh 에서 들어온 순서대로 chan chan에 추가 된다. 추가된 뒤에 고루틴에서 fetch 처리를 실시하고, 처리 출력 용 channel에 송신한다. 처리 출력 용 channel에 보내는 것은 순서가 보장되지 않지만, 읽을 때 저장된 순서대로 읽기 때문에, 순서가 보장된다. (chan chan 의 버퍼 사이즈가 세마포어의 역할을 담당한다. )
'Backend with Golang' 카테고리의 다른 글
[Docker] Storage: volumes vs bind mounts (0) | 2024.03.21 |
---|---|
[번역] Init() function (0) | 2024.01.19 |
CORS (Cross-Origin Resource Sharing) (1) | 2023.11.18 |
[Go] 재선언과 재할당 (0) | 2023.11.11 |
[Concurrency in Go] 비정상 고루틴의 치료 (steward & ward) (0) | 2023.11.10 |