Go Pipeline 패턴에 대해 알아보자

Go Pipeline Pattern이란?

Go 언어에서의 파이프라인 패턴은 동시성 프로그래밍에 있어 중요한 개념으로, 특히 복잡한 데이터 처리와 관련된 작업을 효율적으로 수행하기 위해 사용된다.
이 패턴은 여러 단계의 독립적인 작업 단위로 나누어진 작업의 흐름을 생성한다. 각 단계는 파이프라인의 다음 단계로 데이터를 전달하며, 이러한 방식으로 데이터는 파이프라인을 통해 흐르게 된다.

구성

image

  • 단계 생성: 각 파이프라인 단계는 보통 별도의 고루틴에서 실행되는 함수로 구성된다. 이 함수는 입력 채널로부터 데이터를 읽고, 어떤 처리를 수행한 다음, 출력 채널로 결과를 전송한다.
  • 채널 연결: 파이프라인의 연속적인 단계들은 채널을 통해 연결된다. 한 단계의 출력 채널은 다음 단계의 입력 채널이 된다.
  • 동기화와 종료: 파이프라인의 마지막 단계가 완료되면, 종종 모든 고루틴이 완료될 때까지 기다려야 한다. 이는 sync.WaitGroup을 사용하여 동기화할 수 있으며, 종료 신호를 전달하여 고루틴이 종료되도록 할 수 있다.

사례

파이프라인은 데이터 처리, 병렬 계산, 비동기 작업 흐름 등 다양한 컨텍스트에서 유용하다. 예를 들어, 네트워크 요청을 처리하거나, 큰 데이터 세트를 분석하거나, CPU 집약적인 작업을 병렬로 수행해야 하는 경우에 파이프라인을 사용할 수 있다.
파이프라인 패턴을 사용하면 복잡한 작업 흐름을 여러 단계로 나눌 수 있어 코드의 가독성과 유지 관리성이 향상되며, 시스템 자원을 효율적으로 활용할 수 있다.

예제

함수 작성 예제

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6)
 7
 8// 첫 번째 단계: 정수 생성
 9func gen(nums ...int) <-chan int {
10	out := make(chan int)
11	go func() {
12		for _, n := range nums {
13			out <- n
14		}
15		close(out)
16	}()
17	return out
18}
19
20// 두 번째 단계: 정수 제곱
21func sq(in <-chan int) <-chan int {
22	out := make(chan int)
23	go func() {
24		for n := range in {
25			out <- n * n
26		}
27		close(out)
28	}()
29	return out
30}
31
32// 세 번째 단계: 결과 출력
33func print(ch <-chan int) {
34	for n := range ch {
35		fmt.Println(n)
36	}
37}
38
39func main() {
40	// 파이프라인 설정
41	nums := gen(2, 3, 4) // 정수 생성
42	squares := sq(nums)  // 정수 제곱
43
44	// 파이프라인 실행 및 동기화
45	var wg sync.WaitGroup
46	wg.Add(1)
47	go func() {
48		print(squares) // 결과 출력
49		wg.Done()
50	}()
51	wg.Wait()
52}

파이프 인터페이스 작성 예제

 1package main
 2
 3import (
 4	"fmt"
 5	"log"
 6)
 7
 8func main() {
 9	// 새 파이프라인 생성
10	p := New(func(out chan interface{}) {
11		// 초기 데이터 제공
12		for i := 1; i <= 5; i++ {
13			out <- i
14		}
15		close(out) // 중요: 채널을 닫아야 함
16	})
17
18	// 파이프라인에 단계 추가
19	p = p.Pipe(func(in interface{}) (interface{}, error) {
20		// 정수를 제곱합니다.
21		if num, ok := in.(int); ok {
22			return num * num, nil
23		}
24		return nil, fmt.Errorf("invalid input")
25	}).Pipe(func(in interface{}) (interface{}, error) {
26		// 제곱된 정수에 10을 더합니다.
27		if num, ok := in.(int); ok {
28			return num + 10, nil
29		}
30		return nil, fmt.Errorf("invalid input")
31	})
32
33	// 파이프라인 실행 및 결과 수집
34	out := p.Merge()
35
36	// 결과 출력
37	for o := range out {
38		fmt.Println(o)
39	}
40}
41
42type Executor func(interface{}) (interface{}, error)
43
44type Pipeline interface {
45	Pipe(executor Executor) Pipeline
46	Merge() <-chan interface{}
47}
48
49type pipeline struct {
50	dataC     chan interface{}
51	errC      chan error
52	executors []Executor
53}
54
55func New(f func(chan interface{})) Pipeline {
56	inC := make(chan interface{})
57	go f(inC)
58	return &pipeline{
59		dataC:     inC,
60		errC:      make(chan error),
61		executors: []Executor{},
62	}
63}
64
65func (p *pipeline) Pipe(executor Executor) Pipeline {
66	p.executors = append(p.executors, executor)
67	return p
68}
69
70func (p *pipeline) Merge() <-chan interface{} {
71	for i := 0; i < len(p.executors); i++ {
72		p.dataC, p.errC = run(p.dataC, p.executors[i])
73	}
74	return p.dataC
75}
76
77func run(
78	inC <-chan interface{},
79	f Executor,
80) (chan interface{}, chan error) {
81	outC := make(chan interface{})
82	errC := make(chan error)
83
84	go func() {
85		defer close(outC)
86		for v := range inC {
87			res, err := f(v)
88			if err != nil {
89				errC <- err
90				continue
91			}
92			outC <- res
93		}
94	}()
95	return outC, errC
96}

ref

이 시리즈의 게시물

댓글