Go Pipeline 패턴에 대해 알아보자
Go Pipeline Pattern이란?
Go 언어에서의 파이프라인 패턴은 동시성 프로그래밍에 있어 중요한 개념으로, 특히 복잡한 데이터 처리와 관련된 작업을 효율적으로 수행하기 위해 사용된다.
이 패턴은 여러 단계의 독립적인 작업 단위로 나누어진 작업의 흐름을 생성한다. 각 단계는 파이프라인의 다음 단계로 데이터를 전달하며, 이러한 방식으로 데이터는 파이프라인을 통해 흐르게 된다.
구성
- 단계 생성: 각 파이프라인 단계는 보통 별도의 고루틴에서 실행되는 함수로 구성된다. 이 함수는 입력 채널로부터 데이터를 읽고, 어떤 처리를 수행한 다음, 출력 채널로 결과를 전송한다.
- 채널 연결: 파이프라인의 연속적인 단계들은 채널을 통해 연결된다. 한 단계의 출력 채널은 다음 단계의 입력 채널이 된다.
- 동기화와 종료: 파이프라인의 마지막 단계가 완료되면, 종종 모든 고루틴이 완료될 때까지 기다려야 한다. 이는
sync.WaitGroup
을 사용하여 동기화할 수 있으며, 종료 신호를 전달하여 고루틴이 종료되도록 할 수 있다.
사례
파이프라인은 데이터 처리, 병렬 계산, 비동기 작업 흐름 등 다양한 컨텍스트에서 유용하다. 예를 들어, 네트워크 요청을 처리하거나, 큰 데이터 세트를 분석하거나, CPU 집약적인 작업을 병렬로 수행해야 하는 경우에 파이프라인을 사용할 수 있다.
파이프라인 패턴을 사용하면 복잡한 작업 흐름을 여러 단계로 나눌 수 있어 코드의 가독성과 유지 관리성이 향상되며, 시스템 자원을 효율적으로 활용할 수 있다.
예제
함수 작성 예제
1package main
2
3import (
4 "fmt"
5 "sync"
6)
7
8// 첫 번째 단계: 정수 생성
9func gen(nums ...int) <-chan int {
10 out := make(chan int)
11 go func() {
12 for _, n := range nums {
13 out <- n
14 }
15 close(out)
16 }()
17 return out
18}
19
20// 두 번째 단계: 정수 제곱
21func sq(in <-chan int) <-chan int {
22 out := make(chan int)
23 go func() {
24 for n := range in {
25 out <- n * n
26 }
27 close(out)
28 }()
29 return out
30}
31
32// 세 번째 단계: 결과 출력
33func print(ch <-chan int) {
34 for n := range ch {
35 fmt.Println(n)
36 }
37}
38
39func main() {
40 // 파이프라인 설정
41 nums := gen(2, 3, 4) // 정수 생성
42 squares := sq(nums) // 정수 제곱
43
44 // 파이프라인 실행 및 동기화
45 var wg sync.WaitGroup
46 wg.Add(1)
47 go func() {
48 print(squares) // 결과 출력
49 wg.Done()
50 }()
51 wg.Wait()
52}
파이프 인터페이스 작성 예제
1package main
2
3import (
4 "fmt"
5 "log"
6)
7
8func main() {
9 // 새 파이프라인 생성
10 p := New(func(out chan interface{}) {
11 // 초기 데이터 제공
12 for i := 1; i <= 5; i++ {
13 out <- i
14 }
15 close(out) // 중요: 채널을 닫아야 함
16 })
17
18 // 파이프라인에 단계 추가
19 p = p.Pipe(func(in interface{}) (interface{}, error) {
20 // 정수를 제곱합니다.
21 if num, ok := in.(int); ok {
22 return num * num, nil
23 }
24 return nil, fmt.Errorf("invalid input")
25 }).Pipe(func(in interface{}) (interface{}, error) {
26 // 제곱된 정수에 10을 더합니다.
27 if num, ok := in.(int); ok {
28 return num + 10, nil
29 }
30 return nil, fmt.Errorf("invalid input")
31 })
32
33 // 파이프라인 실행 및 결과 수집
34 out := p.Merge()
35
36 // 결과 출력
37 for o := range out {
38 fmt.Println(o)
39 }
40}
41
42type Executor func(interface{}) (interface{}, error)
43
44type Pipeline interface {
45 Pipe(executor Executor) Pipeline
46 Merge() <-chan interface{}
47}
48
49type pipeline struct {
50 dataC chan interface{}
51 errC chan error
52 executors []Executor
53}
54
55func New(f func(chan interface{})) Pipeline {
56 inC := make(chan interface{})
57 go f(inC)
58 return &pipeline{
59 dataC: inC,
60 errC: make(chan error),
61 executors: []Executor{},
62 }
63}
64
65func (p *pipeline) Pipe(executor Executor) Pipeline {
66 p.executors = append(p.executors, executor)
67 return p
68}
69
70func (p *pipeline) Merge() <-chan interface{} {
71 for i := 0; i < len(p.executors); i++ {
72 p.dataC, p.errC = run(p.dataC, p.executors[i])
73 }
74 return p.dataC
75}
76
77func run(
78 inC <-chan interface{},
79 f Executor,
80) (chan interface{}, chan error) {
81 outC := make(chan interface{})
82 errC := make(chan error)
83
84 go func() {
85 defer close(outC)
86 for v := range inC {
87 res, err := f(v)
88 if err != nil {
89 errC <- err
90 continue
91 }
92 outC <- res
93 }
94 }()
95 return outC, errC
96}