Backend with Golang

Channel of Channel [번역]

아직개구리 2023. 11. 21. 16:22

https://qiita.com/hogedigo/items/15af273176599307a2b2 

 

chan chan は意外と美味しい - Qiita

すっかり寒くなってきてチャンチャン焼きが美味しい今日この頃ですね(^_^)ところで、Go言語でchannelをchannelで受け渡し出来ること、ご存知でしょうか。自分の周囲では使っている人少な…

qiita.com

 

 

1. 요청 / 응답 

채널을 이중으로 설정해서 응답을 받을 수 있다. error를 받고 싶은 경우, chan chan error를 사용한다. 

요청을 보낼 때 channel of channel 에 error channel을 넣어서 보내면 그 채널에 처리된 결과를 넣어주게 되면, 요청을 처리한 후의 결과를 받을 수 있게 된다. 

 

2. 처리 대기 

Request / Response와 유사하지만 처리 완료를 기다리는 데 사용할 수 있다. 

reqc := make(chan chan struct{})

요청을 보내는 쪽은 chan struct{} 를 만들고 chan chan에게 보낸다. 

요청을 처리하는 쪽은 요청을 처리하고, 해당 채널을 close한다. 

for {
    select {
    case ch := <-reqc:
        doSomething()
        close(ch) // 처리 완료를 통지하는 close 
    }
}

 

3. Subscriber 등록 

Publisher/Subscriber에서 Subscriber를 등록하는 데 사용한다. 

subscriber들을 등록하고, 삭제하고, 메시지를 publisch하는 struct이다. 

type PubSub struct {
    subscribe   chan chan string
    unsubscribe chan chan string
    publish     chan string
}

 

ps.subscribe <- sub을 사용해서 subscriber를 전달하면, 

func (ps *PubSub) Start() {
    go func() {
        subscribers := make(map[chan string]struct{})

        for {
            select {
            case ch := <-ps.subscribe:
                subscribers[ch] = struct{}{}
            case ch := <-ps.unsubscribe:
                delete(subscribers, ch)
            case msg := <-ps.publish:
                for sub := range subscribers {
                    select {
                    case sub <- msg:
                    default:
                    }
                }
            }
        }
    }()
}

for-select 문에서 루프를 돌며 subscribe chan chan 으로 전달된 channel을 등록한다. 이렇게 등록된 채널을 통해서 ps.publisch <- msg 이렇게 message를 퍼블리시 하는 채널에 스트링을 전달하면 subscribers 를 통해서 모든 subscriber들에게 msg를 전달한다. 

 

4. 순서 보장 

goroutine으로 처리를 가속화 하면서 순서대로 입력 되는 것을 보장 하고자 할 때 사용한다. 예시로는 큰 파일을 읽으면서 한 row마다 어떤 fetch를 통해서 가공하는 파이프라인 처리를 생각해보면 된다. 

 

한줄씩 읽어오고, 그 읽어온 데이터를 각 줄마다 어떤 처리과정을 거친다고 하면 직렬 처리를 한다면 시간이 많이 걸릴 수 밖에 없다. 

그것을 아래와 같이 semaphore를 사용해서 시간을 단축할 수 있다. 

// fetch 및 가공  
func fetchSomething(inCh <-chan string) <-chan string {
    outCh := make(chan string)

    c := context.Background()

    go func() {
        defer close(outCh)

        var wg sync.WaitGroup

        sem := semaphore.NewWeighted(10)

        for line := range inCh {
            wg.Add(1)
            sem.Acquire(c, 1)

            go func(line string) {
                defer wg.Done()
                defer sem.Release(1)

                // 이 예시에서는 fetch하는 대신 sleep으로 시간을 늘렸다. 
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                outCh <- fmt.Sprintf("%s ... fetched!", line)
            }(line)
        }

        wg.Wait()
    }()

    return outCh
}

 

결과를 보면, outCh에 들어가는 값이 순서가 보장되지 않음을 알 수 있다. 

 

이때 순서를 지키며 outCh에서 데이터를 읽기 위해서는 chan chan을 사용하면 된다. 

// fetch 및 가공 
func fetchSomething(inCh <-chan string) <-chan string {
    outChCh := make(chan chan string, 10)

    go func() {
        defer close(outChCh)

        for line := range inCh {
            outCh := make(chan string)
            outChCh <- outCh

            go func(line string) {
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                outCh <- fmt.Sprintf("%s ... fetched!", line)
            }(line)
        }
    }()

    // chan chan을 그대로 건네줄 수 있지만,여기서는 chan으로 변환해서 준다. 
    outCh := make(chan string)
    go func() {
        defer close(outCh)

        for ch := range outChCh {
            outCh <- <-ch
        }
    }()

    return outCh
}

fetch 처리 출력 용 channel을 작성해서 그것을 chan chan에 송신한다. 여기서는 처리 되는 순서에 상관 없이, inCh 에서 들어온 순서대로 chan chan에 추가 된다. 추가된 뒤에 고루틴에서 fetch 처리를 실시하고, 처리 출력 용 channel에 송신한다. 처리 출력 용 channel에 보내는 것은 순서가 보장되지 않지만, 읽을 때 저장된 순서대로 읽기 때문에, 순서가 보장된다. (chan chan 의 버퍼 사이즈가 세마포어의 역할을 담당한다. )