Date: April 01, 2024
Goroutine and Concurrent Programming
Golang concurrency |
Table of Content
- Goroutine
- sync.WaitGroup
- sync.Once
- How Goroutine works
- Concurrent Programming Precautions
- Solving Concurrency Problems Using Mutexes
- Mutex to Ensure Atomic Access to a Shared Variable
- The Problem with Mutexes
- Another Resource Management Technique
- Channel
- Go Channel with Range and Close
- Channel Size
- Waiting for Data in a Channel
- SELECT Statement
- Implementing Producer-Consumer Pattern with Channel
- Buffered vs Unbuffered Channel
- Context
- Cancelable Context
- Setting a Timeout in Context
- Setting a Specific Value in Context
- Creating a Context with Both Cancellation and Value
Goroutine
- ์ค๋ ๋๋?
- ๊ณ ๋ฃจํด: ๊ฒฝ๋ ์ค๋ ๋๋ก ํจ์๋ ๋ช ๋ น์ ๋์์ ์คํ ์ ์ฌ์ฉ. main()๋ ๊ณ ๋ฃจํด์ ์ํด ์คํ ๋จ
- ํ๋ก์ธ์ค 1๊ฐ(์ฑ๊ธํ ์คํน) vs ๋ฉํฐํ๋ก์ธ์ค (๋ฉํฐํ์คํน).
- ํ๋ก์ธ์ค : ๋ฉ๋ชจ๋ฆฌ ๊ณต๊ฐ์ ๋ก๋ฉ๋์ด ๋์ํ๋ ํ๋ก๊ทธ๋จ. 1๊ฐ ์ด์์ ์ค๋ ๋๋ฅผ ๊ฐ์ง๊ณ ์์
- ์ค๋ ๋๋ ํ๋ก์ธ์ค์ ์ธ๋ถ์์ ๋จ์ ๋๋ ์คํ ํ๋ฆ. 1๊ฐ, 2๊ฐ, ๋ฉํฐ ์ค๋ ๋ ์์ ์ ์์
-
CPU๋ ํ๋์ ์ค๋ ๋ ์คํ๊ฐ๋ฅ, ์ฌ๋ฌ๊ฐ ์ค๋ ๋๋ฅผ ๋ฒ๊ฐ์๊ฐ๋ฉด์ ์ํํ์ฌ ๋์์คํ ์ฒ๋ผ ๋ณด์
-
์ปจํ ์คํธ ์ค์์นญ ๋น์ฉ
- ํ๋์ CPU๊ฐ ์ฌ๋ฌ๊ฐ thread ์ ํํ๋ฉด์ ์ํ์ ์ปจํ ์คํธ ์ค์์นญ ๋น์ฉ ๋ฐ์
- ์ค๋ ๋ ์ ํ์ ํ์ฌ ์ํ ๋ณด๊ดํด์ผ ํจ -> ์ค๋ ๋ ์ปจํ ์คํธ๋ฅผ ์ ์ฅ
- ์ค๋ ๋ ์ปจํ ์คํธ : ๋ช ๋ น ํฌ์ธํฐ, ์คํ๋ฉ๋ชจ๋ฆฌ ๋ฑ
- ์ปจํ ์คํธ ์ ์ฅ ๋ฐ ๋ณต์ ์ ๋น์ฉ ๋ฐ์
-
CPU ์ฝ์ด๋ง๋ค OS์ค๋ ๋๋ฅผ ํ๋๋ง ํ ๋นํด์ ์ฌ์ฉํ๊ธฐ ๋๋ฌธ์, Go์ธ์ด๋ ์ปจํ ์คํธ ์ค์์นญ ๋น์ฉ ๋ฐ์ ์์!
-
๊ณ ๋ฃจํด ์ฌ์ฉ
- ๋ชจ๋ ํ๋ก๊ทธ๋จ์ main()์ ๊ณ ๋ฃจํด์ผ๋ก ํ๋ ๊ฐ์ง๊ณ ์์
go ํจ์ํธ์ถ
๋ก ์๋ก์ด ๊ณ ๋ฃจํด ์ถ๊ฐ ๊ฐ๋ฅ.-
ํ์ฌ ๊ณ ๋ฃจํด์ด ์๋ ์๋ก์ด ๊ณ ๋ฃจํด์์ ํจ์๊ฐ ์ํ ๋จ
-
Goroutines
- Goroutines are lightweight concurrent functions or threads in Go.
- They allow you to perform multiple tasks concurrently, leveraging parallelism on multi-core systems.
- Unlike traditional threads, Goroutines are managed by the Go runtime, which dynamically allocates and manages their stack size.
- Goroutines communicate using channels, which prevent race conditions when accessing shared memory.
- Example: Fetching data from multiple APIs concurrently using Goroutines:
package main
import (
"context"
"fmt"
"net/http"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
urls := []string{
"https://api.example.com/users",
"https://api.example.com/products",
"https://api.example.com/orders",
}
results := make(chan string)
for _, url := range urls {
go fetchAPI(ctx, url, results)
}
for range urls {
fmt.Println(<-results)
}
}
func fetchAPI(ctx context.Context, url string, results chan<- string) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
results <- fmt.Sprintf("Error creating request for %s: %s", url, err.Error())
return
}
client := http.DefaultClient
resp, err := client.Do(req)
if err != nil {
results <- fmt.Sprintf("Error making request to %s: %s", url, err.Error())
return
}
defer resp.Body.Close()
results <- fmt.Sprintf("Response from %s: %d", url, resp.StatusCode)
}
- main ๊ณ ๋ฃจํด์ธ์ PrintHangul, PrintNumber ๊ณ ๋ฃจํด 2๊ฐ ์ถ๊ฐ์์ฑ
- 3๊ฐ ๊ณ ๋ฃจํด์ด ๋์์ ์คํ๋จ(CPU 3์ฝ์ด์ด์)
- 1-์ฝ์ด CPU์์๋ ๋์์ ์คํ๋๋ ๊ฒ ์ฒ๋ผ ๋ณด์
- main ๊ณ ๋ฃจํด์ ์ข
๋ฃํ๋ฉด ๋ชจ๋ ๊ณ ๋ฃจํด์ด ์ฆ์ ์ข
๋ฃ๋๊ณ , ํ๋ก๊ทธ๋จ์ด ์ข
๋ฃ๋จ
- main ๊ณ ๋ฃจํด์์ 3์ด๊ฐ ๊ธฐ๋ค๋ ค์ ๋๋จธ์ง 2๊ฐ ๊ณ ๋ฃจํด๋ ์คํ๋๋ ๋์ waitํจ
package main
import (
"fmt"
"time"
)
func PrintHangul() {
hanguls := []rune{'๊ฐ','๋','๋ค','๋ผ','๋ง','๋ฐ', '์ฌ'}
for _, v := range hanguls {
time.Sleep(300 * time.Millisecond)
fmt.Printf("%c", v)
}
}
func PrintNumbers(){
for i:=1; i<=5; i++ {
time.Sleep(400 * time.Millisecond)
fmt.Printf("%d ", i)
}
}
func main() {
// PrintHangul, PrintNumbers๊ฐ ๋์์ ์คํ
go PrintHangul()
go PrintNumbers()
// ๊ธฐ๋ค๋ฆฌ์ง ์์ผ๋ฉด ๋ฉ์ธํจ์ ์ข
๋ฃ๋๊ณ ๋ชจ๋ ๊ณ ๋ฃจํด๋ ์ข
๋ฃ๋๊ธฐ ๋๋ฌธ์ ๋๊ธฐ์๊ฐ ์ง์
// ํ์ง๋ง 3์ด ๋ผ๋ ์๊ฐ์ฒ๋ผ ํญ์ ์๊ฐ์ ๊ณ์ฐํ ํ์๋ ์์
// -> syncํจํค์ง WaitGroup ๊ฐ์ฒด ์ฌ์ฉ!
time.Sleep(3*time.Second)
}
sync.WaitGroup
-
์๋ธ ๊ณ ๋ฃจํด์ด ์ข ๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฌ๊ธฐ
- ํญ์ ๊ณ ๋ฃจํด์ ์ข ๋ฃ์๊ฐ์ ๋ง์ถฐ time.Sleep(์ข ๋ฃ๊น์ง๊ฑธ๋ฆฌ๋์๊ฐ) ํธ์ถํ ์ ์์
-
๊ณ ๋ฃจํด์ด ๋๋ ๋๊น์ง waitํ ์ ์์: sync.WaitGroup ๊ฐ์ฒด
-
A
WaitGroup
is a synchronization primitive in Go that allows you - to wait for a collection of goroutines to finish executing.
- It keeps track of the number of goroutines that are currently active.
Add(n int)
increases the internal counter by n, indicating that n goroutines will be added to the group.Done()
decreases the counter by 1 when a goroutine completes its work.Wait()
blocks until the counter becomes zero (i.e., all goroutines have finished).
// sync.WaitGroup ๊ฐ์ฒด ์ฌ์ฉ
var wg sync.WaitGroup
// Add()๋ก ์๋ฃํด์ผ ํ๋ ์์
๊ฐ์ ์ค์ ํ๊ณ , ๊ฐ ์์
์ด ์๋ฃ ๋ ๋๋ง๋ค Done() ํธ์ถํ์ฌ
// ๋จ์ ์์
๊ฐ์๋ฅผ ํ๋์ฉ ์ค์ฌ์ค. Wait()์ ์ ์ฒด ์์
์ด ๋ชจ๋ ์๋ฃ๋ ๋๊น์ง ๋๊ธฐํ๊ฒ ๋จ
wg.Add(3) // ์์
๊ฐ์ ์ค์
wg.Done() // ์์
์ด ์๋ฃ๋ ๋๋ง๋ค ํธ์ถ
wg.Wait() // ๋ชจ๋ ์์
์ด ์๋ฃ๋ ๋๊น์ง ๋๊ธฐ
package main
import (
"sync"
"fmt"
)
var wg sync.WaitGroup
func SumAtoB(a, b int) {
sum := 0
for i:=a; i<=b; i++ {
sum += i
}
fmt.Printf("%d๋ถํฐ %d๊น์ง ํฉ๊ณ๋ %d์
๋๋ค.\n", a,b,sum)
wg.Done() // wg์ ๋จ์ ์์
๊ฐ์๋ฅผ 1์ฉ ๊ฐ์์ํด
}
func main() {
// Set the total work count: 10 goroutines will be created, increasing CPU utilization
wg.Add(10) // ์ด ์์
๊ฐ์ ์ค์ : 10๊ฐ์ ๊ณ ๋ฃจํด ์์ฑํ์ฌ CPU ์ ์ ์จ ์ฆ๊ฐ
for i:=0; i<10; i++ {
go SumAtoB(1, 1000000)
}
wg.Wait() // ๋ชจ๋ ์์
์ด ์๋ฃ ๋ ๋๊น์ง (๋จ์์์
๊ฐ์ = 0) ์ข
๋ฃํ์ง ์๊ณ ๋๊ธฐ
fmt.Println("๋ชจ๋ ๊ณ์ฐ์ด ์๋ฃ๋์ต๋๋ค.")
}
- synchronization primitives in go
-
https://medium.com/better-programming/using-synchronization-primitives-in-go-mutex-waitgroup-once-2e50359cb0a7
-
wait-group.example.go
package main
import (
"fmt"
"os"
"sync"
)
func main() {
homeDir, err := os.UserHomeDir()
if err != nil {
panic(err)
}
filesInHomeDir, err := os.ReadDir(homeDir)
if err != nil {
panic(err)
}
var wg sync.WaitGroup
wg.Add(len(filesInHomeDir))
fmt.Println("Printing files in", homeDir)
for _, file := range filesInHomeDir {
// wg.Add(1) // this also works instead of `wg.Add(len(filesInHomeDir))`
// anon function with parameter type `os.DisEntry`
// The `(file)` at the end of the expression immediately invokes
// the anonymous function with the value of the file variable.
go func(f os.DirEntry) {
defer wg.Done()
fmt.Println(f.Name())
}(file)
}
wg.Wait()
fmt.Println("finished....")
}
sync.Once
once-example.go
- Say you're building a REST API using the Go net/http package and you want a piece of code to be executed
- only when the HTTP handler is called (e.g. a get a DB connection).
- You can wrap that code with once.Do and rest assured that
- it will be only run when the handler is invoked for the first time.
package main
import (
"fmt"
"log"
"net/http"
"sync"
"time"
)
var once sync.Once
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Println("http handler start")
once.Do(oneTimeOp)
fmt.Println("http handler end")
w.Write([]byte("done!"))
})
log.Fatal(http.ListenAndServe(":8080", nil))
}
func oneTimeOp() {
fmt.Println("one time op start")
time.Sleep(3 * time.Second)
fmt.Println("one time op end")
}
How Goroutine works
- ๊ณ ๋ฃจํด์ ๋ช ๋ น์ ์ํํ๋ ๋จ์ผ ํ๋ฆ์ผ๋ก, OS ์ค๋ ๋๋ฅผ ์ด์ฉํ๋ ๊ฒฝ๋ ์ค๋ ๋
- 2-Core ์ปดํจํฐ ๊ฐ์
- Goroutine 1๊ฐ ์ผ๋:
Core1-OsThread1-GoRoutine1
- Goroutine 2๊ฐ ์ผ๋:
Core1-OsThread1-GoRoutine1
Core2-OsThread2-GoRoutine2
- Goroutine 3๊ฐ ์ผ๋
Core1-OsThread1-GoRoutine1
Core2-OsThread2-GoRoutine2
- ์์ ๋๋ ๋ ๊น์ง ๋๊ธฐ ํ ๋๋ 2์ฝ์ด์ '๊ณ ๋ฃจํด_3' ๋ฐฐ์ ๋จ
Core1-OsThread1-GoRoutine1
Core2-OsThread2-GoRoutine3
- GoRoutine3๊ฐ ์์คํ ์ฝ ํธ์ถ ํ ๋คํธ์ํฌ ๋๊ธฐ์ํ ์ผ๋:
- GoRoutine3๊ฐ ๋๊ธฐ๋ชฉ๋ก์ผ๋ก ๋น ์ง๊ณ , GoRoutin4๊ฐ ์ค๋ ๋2๋ฅผ ์ด์ฉํ์ฌ ์คํ๋จ
Core1-OsThread1-GoRoutine1
Core2-OsThread2-GoRoutine4
- ์ปจํ ์คํธ ์ค์์นญ์ CPU๊ฐ Thread๋ฅผ ๋ณ๊ฒฝํ ๋ ๋ฐ์ํ๋๋ฐ, ์ฝ์ด์ ์ค๋ ๋๋ ๋ณ๊ฒฝ๋์ง์๊ณ , ์ค๋ก์ง ๊ณ ๋ฃจํด๋ง ์ฎ๊ฒจ ๋ค๋๊ธฐ ๋๋ฌธ์,
-
๊ณ ๋ฃจํด์ ์ฌ์ฉํ๋ฉด, ์ปจํ ์คํธ ์ค์์นญ ๋น์ฉ์ด ๋ฐ์ํ์ง์๋๋ค!
-
์์คํ ์ฝ ํธ์ถ ์ (์ด์์ฒด์ ๊ฐ ์ง์ํ๋ ์๋น์ค๋ฅผ ํธ์ถ)
- (๊ณ ๋ฃจํด์ผ๋ก ์์คํ ์ฝ ํธ์ถ์; e.g. ๋คํธ์ํฌ๋ก ๋ฐ์ดํฐ ์ฝ์ ๋ ๋ฐ์ดํฐ ๋ค์ด์ฌ๋ ๊น์ง ๊ณ ๋ฃจํด์ด ๋๊ธฐ์ํ ๋จ),
- ๋คํธ์ํฌ ์์ ๋๊ธฐ์ํ์ธ ๊ณ ๋ฃจํด์ด ๋๊ธฐ๋ชฉ๋ก์ผ๋ก ๋น ์ง๊ณ , ๋๊ธฐ์ค์ด๋ ๋ค๋ฅธ ๊ณ ๋ฃจํด์ด OS ์ค๋ ๋๋ฅผ ์ด์ฉํ์ฌ ์คํ ๋จ
- ์ฝ์ด์ ์ค๋ ๋ ๋ณ๊ฒฝ(์ปจํ ์คํธ ์ค์์นญ) ์์ด ๊ณ ๋ฃจํด์ด ์ฎ๊ฒจ๋ค๋๊ธฐ ๋๋ฌธ์ ํจ์จ์
- ์ฝ์ด๊ฐ ์ค๋ ๋ ์ฎ๊ฒจ๋ค๋๋ ์ปจํ ์คํธ ์ค์์นญ์ ํ์ง ์๊ณ , ๋์ ๊ณ ๋ฃจํด์ด ์ง์ ๋๊ธฐ์ํ <-> ์คํ์ํ ์ค์์นญ ์ฎ๊ฒจ๋ค๋ ์ ํจ์จ์
Concurrent programming precautions
- ๋ฌธ์ ์ :
ํ๋์/๋์ผํ ๋ฉ๋ชจ๋ฆฌ ์์
์์ฌ๋ฌ๊ฐ ๊ณ ๋ฃจํด
์ ๊ทผ! - e.g. ์ ๊ธ1000, ์ถ๊ธ1000 ์ 10๊ฐ์ ๊ณ ๋ฃจํด์ด ๋์ ์คํ ํ๋ ์ํฉ
- ๋๊ฐ ๊ณ ๋ฃจํด์ด ๊ฐ๊ฐ 1000์ ์ ๊ธํ๋๋ฐ 2000์ด ์๋ 1000์ด๋์ํ์์ ๋ค์ ๋๋ฒ ์ถ๊ธ์ < 0 : panic!
race condition
- ํด๊ฒฐ์ฑ
: ํ ๊ณ ๋ฃจํด์์ ๊ฐ์ ๋ณ๊ฒฝํ ๋ ๋ค๋ฅธ ๊ณ ๋ฃจํด์ด ์ ๊ทผํ์ง ๋ชปํ๋๋ก
mutex
ํ์ฉ -
mutual exclusion
-
A
race condition
occurs when two or more threads access shared data and attempt to modify it simultaneously. - To prevent race conditions, you typically use locks or synchronization mechanisms.
-
A lock ensures that only one thread can access the shared data at a time.
-
Example of
race condition
package main
import (
"fmt"
"sync"
"time"
)
type Account struct {
Balance int
}
func DepositAndWithdraw(account *Account) {
if account.Balance < 0 {
panic(fmt.Sprintf("Balance should not be negative value: %d", account.Balance))
}
account.Balance += 1000
time.Sleep(time.Millisecond)
account.Balance -= 1000
}
func main() {
var wg sync.WaitGroup
account := &Account{0}
wg.Add(10)
for i:=0; i< 10; i++ {
// ํ๋์ ์์์ ๋ค์์ ๊ณ ๋ฃจํด์ด ์ ๊ทผ ํจ
// ๋ฎคํ
์ค๋ฅผ ํตํด Lock ๊ฑธ์ด ํด๊ฒฐ
go func() {
for {
DepositAndWithdraw(account)
}
wg.Done()
}()
}
wg.Wait()
}
Solving concurrency problems using mutexes
- ํ ๊ณ ๋ฃจํด์์ ๊ฐ์ ๋ณ๊ฒฝํ ๋ ๋ค๋ฅธ ๊ณ ๋ฃจํด์ด ์ ๊ทผํ์ง ๋ชปํ๋๋ก
mutex
ํ์ฉ:mutual exclusion
mutex.Lock()
์ผ๋ก mutex ํ๋mutext.Unlock()
์ผ๋ก mutex ๋ฐ๋ฉ- ๋ค๋ฅธ ๊ณ ๋ฃจํด์ด ์ด๋ฏธ ๋ฎคํ
์ค๋ฅผ ํ๋ํ๋ค๋ฉด ํด๋น ๊ณ ๋ฃจํด์ด ๋ฎคํ
์ค๋ฅผ ๋์ ๋(
mutex.Unlock()
) ๊น์ง ๊ธฐ๋ค๋ฆผ - ํ์ง๋ง ์ค์ง ํ๋์ ๊ณ ๋ฃจํด๋ง ๊ณต์ ์์์ ์ ๊ทผํ๊ธฐ ๋๋ฌธ์ ๋์์ฑ ํ๋ก๊ทธ๋จ ์ฑ๋ฅ ํฅ์ ์๋ฏธ๊ฐ ์์ด์ง..
-
๋ํ
Deadlock
๋ฐ์ ๊ฐ๋ฅ! -
A mutex (short for โmutual exclusionโ) is a synchronization primitive used
- to protect shared resources in concurrent programs. It ensures that
- only one goroutine can access a critical section of code (a shared resource) at any given time.
- Mutexes prevent race conditions by allowing exclusive access to data.
package main
import (
"fmt"
"sync"
"time"
)
var mutex sync.Mutex
type Account struct {
Balance int
}
func DepositAndWithdraw(account *Account) {
// ๋ฎคํ
์ค ํ๋!
mutex.Lock()
// ํ๋ฒ ํ๋ํ ๋ฎคํ
์ค๋ ๋ฐ๋์ Unlock() ํธ์ถํ์ฌ ๋ฐ๋ฉ
// `defer`: ํจ์ ์ข
๋ฃ ์ ์ ๋ฎคํ
์ค Unlock() ๋ฉ์๋ ํธ์ถ
defer mutex.Unlock()
if account.Balance < 0 {
panic(fmt.Sprint("Balance cannot be negative: %d", account.Balance))
}
account.Balance += 1000
time.Sleep(time.Millisecond)
account.Balance -= 1000
}
func main() {
var wg sync.WaitGroup
account := &Account{0}
wg.Add(10)
for i:=0; i< 10; i++ {
go func() {
for {
DepositAndWithdraw(account)
}
wg.Done()
}()
}
wg.Wait()
}
mutex to ensure atomic access to a shared variable
A mutex helps achieve atomic access by allowing only one thread to hold the lock (mutex) at any given time. The following code achieve 1.concurrency and 2.preventing race conditions.
Using mutex and WaitGroup
package main
import (
"fmt"
"sync"
)
type counter struct {
i int64
wg sync.WaitGroup
mu sync.Mutex
}
func (c *counter) increment() {
defer c.wg.Done()
c.mu.Lock()
c.i += 1
c.mu.Unlock()
}
func main() {
c := counter{i: 0}
for i := 0; i < 1000; i++ {
c.wg.Add(1)
go c.increment()
}
c.wg.Wait()
fmt.Println("Final Counter Value:", c.i)
}
Using mutex and done channel
- The done channel is used without a sync.WaitGroup.
- Each worker goroutine sends a signal to the done channel when it completes its task.
- The main goroutine waits for all workers to finish by receiving from the done channel.
- This approach allows us to manage concurrency without using a sync.WaitGroup
package main
import (
"fmt"
"runtime"
"sync"
)
const initialValue = -500
type counter struct {
i int64
mu sync.Mutex // ๊ณต์ ๋ฐ์ดํฐ i๋ฅผ ๋ณดํธํ๊ธฐ ์ํ ๋ฎคํ
์ค
once sync.Once // ํ ๋ฒ๋ง ์ํํ ํจ์๋ฅผ ์ง์ ํ๊ธฐ ์ํ Once ๊ตฌ์กฐ์ฒด
}
// counter ๊ฐ์ 1์ฉ ์ฆ๊ฐ์ํด
func (c *counter) increment() {
// i ๊ฐ ์ด๊ธฐํ ์์
์ ํ ๋ฒ๋ง ์ํ๋๋๋ก once์ Do() ๋ฉ์๋๋ก ์คํ
c.once.Do(func() {
c.i = initialValue
})
c.mu.Lock() // i ๊ฐ์ ๋ณ๊ฒฝํ๋ ๋ถ๋ถ(์๊ณ ์์ญ)์ ๋ฎคํ
์ค๋ก ์ ๊ธ
c.i += 1 // ๊ณต์ ๋ฐ์ดํฐ ๋ณ๊ฒฝ
c.mu.Unlock() // i ๊ฐ์ ๋ณ๊ฒฝ ์๋ฃํ ํ ๋ฎคํ
์ค ์ ๊ธ ํด์
}
// counter์ ๊ฐ์ ์ถ๋ ฅ
func (c *counter) display() {
fmt.Println(c.i)
}
func main() {
// ๋ชจ๋ CPU๋ฅผ ์ฌ์ฉํ๊ฒ ํจ
runtime.GOMAXPROCS(runtime.NumCPU())
c := counter{i: 0} // ์นด์ดํฐ ์์ฑ
done := make(chan struct{}) // ์๋ฃ ์ ํธ ์์ ์ฉ ์ฑ๋
// c.increment()๋ฅผ ์คํํ๋ ๊ณ ๋ฃจํด 1000๊ฐ ์คํ
for i := 0; i < 1000; i++ {
go func() {
c.increment() // ์นด์ดํฐ ๊ฐ์ 1 ์ฆ๊ฐ์ํด
done <- struct{}{} // done ์ฑ๋์ ์๋ฃ ์ ํธ ์ ์ก
}()
}
// ๋ชจ๋ ๊ณ ๋ฃจํด์ด ์๋ฃ๋ ๋๊น์ง ๋๊ธฐ
for i := 0; i < 1000; i++ {
<-done
}
c.display() // c์ ๊ฐ ์ถ๋ ฅ
}
More done channel example
https://gobyexample.com/closing-channels
package main
import "fmt"
func main() {
jobs := make(chan int, 5)
done := make(chan bool)
go func() {
for {
j, more := <-jobs
if more {
fmt.Println("received job", j)
} else {
fmt.Println("received all jobs")
done <- true
return
}
}
}()
for j := 1; j <= 3; j++ {
jobs <- j
fmt.Println("sent job", j)
}
close(jobs)
fmt.Println("sent all jobs")
<-done
_, ok := <-jobs
fmt.Println("received more jobs:", ok)
}
The problem with mutexes
- ๋ฎคํ ์ค๋ ๋์์ฑ ํ๋ก๊ทธ๋๋ฐ ์ฑ๋ฅ์ด์ ๊ฐ์์ํด
๋ฐ๋๋ฝ
๋ฐ์ ๊ฐ๋ฅ- e.g. ์ํ์ A์ B๊ฐ ๊ฐ๊ฐ ์์ 1, ํฌํฌ1 ์ง๊ณ ์์.
- A,B๊ฐ ํฌํฌ1, ์์ 1 ์ง์ผ๋ ค ํ ๋, A,B ๋๊ตฌํ๋ ์๋ณดํ์ง ์์, ๋ฐฅ์ ๋จน์ ์ ์์: ๋๊ฐ mutex ๊ฐ๊ฐ ์ฐจ์ง
์ด๋ค ๊ณ ๋ฃจํด๋ ์ํ๋ ๋งํผ ๋ฎคํ ์ค๋ฅผ ํ๋ณดํ์ง ๋ชปํด์ ๋ฌดํํ ๋๊ธฐํ๋ ๊ฒฝ์ฐ
; ๋ฐ๋๋ฝ- ๋ฉํฐ์ฝ์ด ํ๊ฒฝ์์๋ ์ฌ๋ฌ ๊ณ ๋ฃจํด์ผ๋ก ์ฑ๋ฅ ํฅ์ ๊ฐ๋ฅ
- ๊ฐ์๋ฉ๋ชจ๋ฆฌ ์ ๊ทผ์ ๊ผฌ์ผ ์ ์์
- ๋ฎคํ ์ค๋ก ๊ณ ๋ฃจํด ํ๋๋ง ์ ๊ทผํ๋๋ก ํ์ฌ ๊ผฌ์ด๋ ๋ฌธ์ ํด๊ฒฐ ๊ฐ๋ฅ
- ํ์ง๋ง, ๋ฎคํ ์ค๋ฅผ ์๋ชป ์ฌ์ฉํ๋ฉด ์ฑ๋ฅํฅ์ ์์ด ๋ฐ๋๋ฝ ๋ฐ์๊ฐ๋ฅ
- ๋ฎคํ ์ค ์ฌ์ฉ์ ์ข์ ๋ฒ์์์ ์ฌ์ฉํ์ฌ ๋ฐ๋๋ฝ ๋ฐ์ ๋ฐฉ์ง
-
๋๋ ๋๋ค ์์ -> ํฌํฌ ์์๋ก ๋ฎคํ ์ค ๋ฝ ์ฌ์ฉํ๋ฉด ํด๊ฒฐ ๊ฐ๋ฅ
-
๋ฉํฐ์ฝ์ด ์ปดํจํฐ์์๋ ์ฌ๋ฌ ๊ณ ๋ฃจํด์ ์ฌ์ฉํ์ฌ ์ฑ๋ฅ ํฅ์
- ํ์ง๋ง ๊ฐ์ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์ฌ๋ฌ ๊ณ ๋ฃจํด์ด ์ ๊ทผํ๋ฉด ํ๋ก๊ทธ๋จ์ด ๊ผฌ์ผ ์ ์์
- ๋ฎคํ ์ค๋ฅผ ์ด์ฉํ๋ฉด ๋์์ ๊ณ ๋ฃจํด ํ๋๋ง ์ ๊ทผํ๋๋ก ์ ์ฅํด ๊ผฌ์ด๋ ๋ฌธ์ ๋ฅผ ๋ง์ ์ ์๋ค.
-
๊ทธ๋ฌ๋ ๋ฎคํ ์ค๋ฅผ ์๋ชป ์ฌ์ฉํ๋ฉด ์ฑ๋ฅ ํฅ์๋ ๋ชปํ๊ณ ๋ฐ๋๋ฝ์ด๋ผ๋ ์ฌ๊ฐํ ๋ฌธ์ ๊ฐ ์๊ธธ ์ ์๋ค.
-
Deadlock in goroutines (not in using mutex): different scenario than above
mutex
deadlock - A deadlock occurs when a group of goroutines are waiting for each other, and none of them can proceed.
-
Essentially, they're stuck in a circular dependency, unable to make progress.
-
Deadlock
์์
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
var wg sync.WaitGroup
func diningProblem(name string, first, second *sync.Mutex, firstName, secondName string) {
for i:= 0; i<100; i++ {
fmt.Printf("%s ๋ฐฅ์ ๋จน์ผ๋ ค ํฉ๋๋ค.\n", name)
first.Lock()
fmt.Printf("%s %s ํ๋\n", name, fisrtName)
second.Lock()
fmt.Printf("%s %s ํ๋\n", name, secondName)
fmt.Printf("%s ๋ฐฅ์ ๋จน์ต๋๋ค.\n", name)
time.Sleep(time.Duration(rand.Intn(1000))* time.Millisecond)
second.Unlock()
first.Unlock()
}
wg.Done()
}
func main() {
rand.Seed(time.Now().UnixNano())
wg.Add(2)
fork := &sync.Mutex{}
spoon := &sync.Mutex{}
go diningProblem("A", fork, spoon, "ํฌํฌ", "์์ ")
go diningProblem("B", spoon, fork, "์์ ", "ํฌํฌ")
wg.Wait()
}
Another Resource Management Technique
- ๊ฐ ๊ณ ๋ฃจํด์ด ์๋ก ๋ค๋ฅธ ์์์ ์ ๊ทผํ๊ฒ ๋ง๋๋ ๋๊ฐ์ง ๋ฐฉ๋ฒ
-
mutex์์ด ๋์์ฑ ํ๋ก๊ทธ๋๋ฐ ๊ฐ๋ฅ
-
์์ญ์ ๋๋๋ ๋ฐฉ๋ฒ
- ๊ฐ ๊ณ ๋ฃจํด์ ํ ๋น๋ ์์ ๋ง ํ๋ฏ๋ก ๊ณ ๋ฃจํด(์์ ์)๊ฐ ๊ฐ์ญ ์์
- ๊ณ ๋ฃจํด ๊ฐ ๊ฐ์ญ์ด ์์ด์ ๋ฎคํ ์ค๋ ํ์ ์์
-
์ญํ ์ ๋๋๋ ๋ฐฉ๋ฒ : Channel๊ณผ ํจ๊ป ์ค๋ช
-
๊ฐ ๊ณ ๋ฃจํด์ ํ ๋น๋ ์์ ๋ง ํ๋ฏ๋ก ๊ณ ๋ฃจํด๊ฐ ๊ฐ์ญ์ด ๋ฐ์ํ์ง ์์์, Mutex๊ฐ ํ์์์
package main
import (
"fmt"
"sync"
"time"
)
type Job interface {
Do()
}
type SquareJob struct {
index int
}
func (j *SquareJob) Do() {
fmt.Printf("%d ์์
์์\n", j.index)
time.Sleep(1 * time.Second)
fmt.Printf("%d ์์
์๋ฃ - ์์
๊ฒฐ๊ณผ: %d\n", j.index, j.index * j.index)
}
func main() {
jobList := [10]Job
for i:=0 ; i< len(jobList); i++ {
jobList[i] = &SquareJob{i}
}
var wg sync.WaitGroup
wg.Add(10)
for i:=0; i<10; i++ {
job := jobList[i]
go func() {
job.Do()
wg.Done()
}
}
wg.Wait()
}
Channel
- ์ฑ๋: ๊ณ ๋ฃจํด๋ผ๋ฆฌ ๋ฉ์์ง๋ฅผ ์ ๋ฌ ํ ์ ์๋ ๋ฉ์์ง ํ
- ๋ฉ์์งํ์ ๋ฉ์์ง๊ฐ ์์ด๊ฒ ๋๊ณ
-
๋ฉ์์ง๋ฅผ ์ฝ์ ๋๋ ์ฒ์์จ ๋ฉ์์ง๋ถํฐ ์ฐจ๋ก๋๋ก ์ฝ์
-
์ฑ๋ ์ธ์คํด์ค ์์ฑ
-
์ฑ๋์ ์ฌ์ฉํ๊ธฐ ์ํด์๋ ๋จผ์ ์ฑ๋ ์ธ์คํด์ค๋ฅผ ๋ง๋ค์ด์ผ ํจ
-
Channels are reference types, meaning they are already a reference to the underlying data structure.
square(ch chan int) {}
- no need to declare channel parameters as pointers since channel is already a reference type
// ์ฑ๋ํ์
: chan string
// chan: ์ฑ๋ํค์๋
// string: ๋ฉ์์ง ํ์
var messages chan string = make(chan string)
- ์ฑ๋์ ๋ฐ์ดํฐ ๋ฃ๊ธฐ
var messages chan string = make(chan string)
messages <- "This is a message"
- ์ฑ๋์์ ๋ฐ์ดํฐ ๋นผ๊ธฐ
// ์ฑ๋์์ ๋นผ๋ธ ๋ฐ์ดํฐ๋ฅผ ๋ด์ ๋ณ์
// "์ฑ๋ messages์ ๋ฐ์ดํฐ๊ฐ ์์ผ๋ฉด ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ฌ๋๊น์ง '๋๊ธฐํจ'"
var msg string = <- messages
- ์์ฑํ goroutine์์ ์ฑ๋์ ๋ฐ์ดํฐ ๋นผ๊ธฐ (consumer)
- main goroutine์์ ๋ฐ์ดํฐ ๋ฃ๊ธฐ (provider)
package main
import (
"fmt"
"sync"
"time"
)
func square(wg *sync.WaitGroup, ch chan int) {
// ๋ฐ์ดํฐ๋ฅผ ๋นผ์จ๋ค
// ๋ฐ์ดํฐ ๋ค์ด์ฌ๋๊น์ง ๋๊ธฐ
n := <- ch
time.Sleep(time.Second)
fmt.Printf("Square: %d\n", n*n)
wg.Done()
}
// main ๊ณ ๋ฃจํด๊ณผ square ๊ณ ๋ฃจํด์ด ๋์ ์คํ
// main ๋ฃจํด์์ ์ฑ๋์ 9๋ฅผ ๋ฃ์ด์ค๋๊น์ง square๋ฃจํด์ ๋๊ธฐ์ํ
// 1. main ๋ฃจํด
// 2. square() ๋ฃจํด
func main() {
var wg sync.WaitGroup
// ํฌ๊ธฐ 0์ธ ์ฑ๋ ์์ฑ : ๋ฐ๋์ ๋ค๋ฅธ ๊ณ ๋ฃจํด์ด ์ฑ๋์์ ๋ฐ์ดํฐ ๊บผ๋ด์ผ ์ ์ ์ข
๋ฃ
// ์ด๋ค ๊ณ ๋ฃจํด๋ ๋ฐ์ดํฐ ๋นผ์ง ์์ผ๋ฉด ๋ชจ๋ ๊ณ ๋ฃจํด์ด ๊ณ์๋๊ธฐ ํ๋ค๊ฐ deadlock ๋ฐ์
ch := make(chan int)
wg.Add(1)
// ๋ฐ์ดํฐ๋ฅผ ๋นผ์ ์ฒ๋ฆฌ
go square(&wg, ch)
// ๋ฐ์ดํฐ ๋ฃ๋๋ค.
ch <- 9
// square๋ด์์ main๋ฃจํด์์ ๋ฃ์ด์ค ์ฑ๋ ๋ฐ์ดํฐ ๋นผ๊ณ wg.Done() ์๋ฃ ๋ ๋๊น์ง ๋๊ธฐ
wg.Wait()
}
- ์์ฑํ goroutine์์ ์ฑ๋์ ๋ฐ์ดํฐ ๋ฃ๊ธฐ (provider)
- main goroutine์์ ๋ฐ์ดํฐ ๋บด๊ธฐ (consumer)
package main
import (
"fmt"
"sync"
)
func computeAndSendResult(wg *sync.WaitGroup, ch chan<- int) {
defer wg.Done()
// Perform some computation
result := 42
// Send the result through the channel
ch <- result
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
resultCh := make(chan int)
// produce
go computeAndSendResult(&wg, resultCh)
// consume
// Receive the result from the channel
result := <-resultCh
fmt.Println("Received result:", result)
wg.Wait()
}
go channel with range and close
- https://medium.com/better-programming/manging-go-channels-with-range-and-close-98f93f6e8c0c
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
func produce(c chan<- int) {
for i := 0; i < 5; i++ {
c <- i
fmt.Printf("producer put i=%d\n", i)
}
// Without closing channel, the consumer will wait indefinitely for channel
close(c)
fmt.Println("producer finished.\n")
}
func consume(c <-chan int) {
fmt.Println("consumer sleeps for 5 seconds...")
time.Sleep(5 * time.Second)
fmt.Println("consumer started")
for i := range c {
fmt.Printf("consumer gets i = %d\n", i)
}
fmt.Println("consumer finished. press ctrl+c to exit")
}
func main() {
// Both producer and consumer goroutine do not have to coexist
// i.e. even if the producer goroutine finishes (and closes the channel),
// the consumer goroutine range loop will receive all the values.
c := make(chan int, 5)
// producer
go produce(c)
// consumer
go consume(c)
e := make(chan os.Signal)
// `signal.Notify` registers a channel `e` to receive specific signals
// -> list of signals to capture i.e. `syscall.SIGINT`(Ctrl+c), `syscall.SIGTERM`(termination), etc...
signal.Notify(e, syscall.SIGINT, syscall.SIGTERM)
// blocks the main goroutine until one of these signals is received.
<-e
}
- Modify above so that it works without using
signal.Notify
-
- use
done
channel of type struct{}
- use
-
- use
sync.WaitGroup
- use
-
use
done
channel of type struct{}
package main
import (
"fmt"
"time"
)
func produce(c chan<- int, done chan<- struct{}) {
for i := 0; i < 5; i++ {
c <- i
fmt.Printf("producer put i=%d\n", i)
}
// Without closing channel, the consumer will wait indefinitely for channel
close(c)
fmt.Println("producer finished.\n")
done <- struct{}{}
}
func consume(c <-chan int, done chan<- struct{}) {
fmt.Println("consumer sleeps for 5 seconds...")
time.Sleep(5 * time.Second)
fmt.Println("consumer started")
for i := range c {
fmt.Printf("consumer gets i = %d\n", i)
}
// fmt.Println("consumer finished. press ctrl+c to exit")
fmt.Println("consumer finished.")
done <- struct{}{}
}
func main() {
// Both producer and consumer goroutine do not have to coexist
// i.e. even if the producer goroutine finishes (and closes the channel),
// the consumer goroutine range loop will receive all the values.
done := make(chan struct{})
c := make(chan int, 5)
// producer
go produce(c, done)
// consumer
go consume(c, done)
// in other cases use sync.WaitGroup to make main goroutine wait
// e := make(chan os.Signal)
// // `signal.Notify` registers a channel `e` to receive specific signals
// // -> list of signals to capture i.e. `syscall.SIGINT`(Ctrl+c), `syscall.SIGTERM`(termination), etc...
// signal.Notify(e, syscall.SIGINT, syscall.SIGTERM)
// // blocks the main goroutine until one of these signals is received.
// <-e
<-done
<-done
}
- use
sync.WaitGroup
package main
import (
"fmt"
"sync"
"time"
)
func produce(c chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
c <- i
fmt.Printf("producer put i=%d\n", i)
}
// Without closing channel, the consumer will wait indefinitely for channel
close(c)
fmt.Println("producer finished.\n")
}
func consume(c <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("consumer sleeps for 5 seconds...")
time.Sleep(5 * time.Second)
fmt.Println("consumer started")
for i := range c {
fmt.Printf("consumer gets i = %d\n", i)
}
// fmt.Println("consumer finished. press ctrl+c to exit")
fmt.Println("consumer finished.")
}
func main() {
var wg sync.WaitGroup
// Both producer and consumer goroutine do not have to coexist
// i.e. even if the producer goroutine finishes (and closes the channel),
// the consumer goroutine range loop will receive all the values.
c := make(chan int, 5)
wg.Add(2)
// producer
go produce(c, &wg)
// consumer
go consume(c, &wg)
// in other cases use sync.WaitGroup to make main goroutine wait
// e := make(chan os.Signal)
// // `signal.Notify` registers a channel `e` to receive specific signals
// // -> list of signals to capture i.e. `syscall.SIGINT`(Ctrl+c), `syscall.SIGTERM`(termination), etc...
// signal.Notify(e, syscall.SIGINT, syscall.SIGTERM)
// // blocks the main goroutine until one of these signals is received.
// <-e
wg.Wait()
}
- Both producer and consumer goroutine do not have to coexist
-
i.e. even if the producer goroutine finishes (and closes the channel),
- the consumer goroutine range loop will receive all the values.
-
We can simulate this scenario by using a combination of:
- A buffered channel in the producer
- Delaying the consumer goroutine by adding a time.Sleep()
Channel size
- ๊ธฐ๋ณธ ์ฑ๋ํฌ๊ธฐ 0
- ์ฑ๋ํฌ๊ธฐ0: ์ฑ๋์ ๋ฐ์ดํฐ ๋ณด๊ดํ ๊ณณ์ด ์์ผ๋ฏ๋ก ๋ฐ์ดํฐ ๋นผ๊ฐ๋๊น์ง ๋๊ธฐ
- ์ฑ๋ ๋ง๋ค๋ ๋ฒํผ ํฌ๊ธฐ ์ค์ ๊ฐ๋ฅ
package main
import (
"fmt"
)
func main() {
ch := make(chan int)
// **DEADLOCK ์ฑ๋์ ๋ฐ์ดํฐ ๋ฃ๊ธฐ๋ง ํ๊ณ ๋บด์ง ์์๋
// ์ฑ๋์ ๋ฐ์ดํฐ ๋ฃ์๋ ๊ธฐ๋ณธ์ฌ์ด์ฆ 0์ด๊ธฐ ๋๋ฌธ์
// ๋ณด๊ดํ ์ ์์ผ๋ฏ๋ก ์ฑ๋์์ ๋ฐ์ดํฐ ๋นผ๋ ์ฝ๋๊ฐ ์์ด์ผ ์งํ๊ฐ๋ฅ!: goroutine 1๊ฐ ์์ฑํด์ ํด๋น func()์์ <-ch ํด์ค์ผํจ
// ๋ฐ๋๋ฝ ๋ฐ์! ํ๋ฐฐ ๋ณด๊ด์ฅ์ ์์ผ๋ฉด ๋ฌธ์์์ ๊ธฐ๋ค๋ ค์ผ ํจ
// ๋ฐ์ดํฐ ๋ณด๊ดํ ์ ์๋ ๋ฉ๋ชจ๋ฆฌ์์ญ: ๋ฒํผ
ch <- 9
fmt.Println("Never print")
}
- ๋ฒํผ ๊ฐ์ง ์ฑ๋
var chan string messages = make(chan string, 2)
- ๋ฒํผ๊ฐ ๋ค ์ฐจ๋ฉด, ๋ฒํผ๊ฐ ์๋ ํฌ๊ธฐ 0 ์ฑ๋์ฒ๋ผ ๋น์๋ฆฌ ์๊ธธ๋ ๊น์ง ๋๊ธฐ
- ๋ฐ์ดํฐ๋ฅผ ๋นผ์ฃผ์ง ์์ผ๋ฉด ๋ฒํผ์์ ๋ ์ฒ๋ผ ๊ณ ๋ฃจํด์ด ๋ฉ์ถ๊ฒ๋จ
Waiting for Data in a Channel
- ๊ณ ๋ฃจํด์์ ๋ฐ์ดํฐ๋ฅผ ๊ณ์ ๊ธฐ๋ค๋ฆฌ๋ฉด์ ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ค๋ฉด ์์ ์ ์ํ
package main
import (
"fmt"
"sync"
"time"
)
func square(wg *sync.WaitGroup, ch chan int) {
// ์ฑ๋์ ๋ฐ์ดํฐ๊ฐ ๋ค์ด ์ฌ๋๊น์ง '๊ณ์' ๊ธฐ๋ค๋ฆผ
// ๋ฐ๋๋ฝ ๋ฐฉ์ง: square() ํธ์ถ ๋ฐ์์ close(์ฑ๋)๋ก ์ฑ๋์ด ๋ซํ๋ฉด
// for๋ฌธ์ ์ข
๋ฃํ์ฌ ํ๋ก๊ทธ๋จ ์ ์ ์ข
๋ฃํ๋๋ก ํจ
for n := range ch {
fmt.Printf("Square: %d\n", n*n)
time.Sleep(time.Second)
}
// ์์ for๋ฌธ์์ ๊ณ์ ์ฑ๋๋ก ๋ค์ด์ค๋ ๋ฐ์ดํฐ ๊ธฐ๋ค๋ฆฌ๋ ๋์์, ์คํ๋์ง ์์
// ๋ฐ์ดํฐ๋ฅผ ์ฑ๋ ch์ ๋ชจ๋ ๋ฃ์ ๋ค์์ close(ch)๋ก ์ฑ๋์ ๋ซ์ผ๋ฉด
// for eange์์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ณ ๋์ ์ฑ๋์ด ๋ซํ ์ํ๋ผ๋ฉด for๋ฌธ ์ข
๋ฃํจ -> wg.Done() ์คํ๋จ!
wg.Done()
}
func main() {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
go square(&wg, ch)
// 10๋ฒ๋ง ๋ฐ์ดํฐ๋ฅผ ๋ฃ์
// square ๋ด์์ ์ฑ๋ ๋ฐ์ดํฐ ๊ณ์ ๊ธฐ๋ค๋ฆผ
for i :=0; i< 10; i++ {
ch <- i * 2
}
// ์์
์๋ฃ๋ฅผ ๊ธฐ๋ค๋ฆฌ์ง๋ง,
// square() ๋ด์์ wg.Done()์ด ์คํ ๋์ง ์๊ณ deadlock๋ฐ์
// ํ์ง๋ง ์ฑ๋์ ๋ซ์์ ๋ฐ๋๋ฝ ๋ฐฉ์ง ๊ฐ๋ฅ
// ์ฑ๋์์ ๋ฐ์ดํฐ๋ฅผ ๋ชจ๋ ๋นผ๋ธ ์ํ์ด๊ณ , ์ฑ๋์ด ๋ซํ์ผ๋ฉด
// for range ๋ฌธ์ ๋น ์ ธ๋๊ฐ๊ฒ ๋จ -> wg.Done()์ด ์คํ๋จ!!
close(ch)
wg.Wait()
}
SELECT statement
- ์ฌ๋ฌ ์ฑ๋์ ๋์์ ๊ธฐ๋ค๋ฆด ์ ์์.
- ์ด๋ค ์ฑ๋์ด๋ผ๋ ํ๋์ ์ฑ๋์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์ค๋ฉด ํด๋น ๊ตฌ๋ฌธ์ ์คํํ๊ณ select๋ฌธ์ด ์ข ๋ฃ๋จ.
- ํ๋์ case๋ง ์ฒ๋ฆฌ๋๋ฉด ์ข
๋ฃ๋๊ธฐ ๋๋ฌธ์, ๋ฐ๋ณตํด์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ณ ์ถ๋ค๋ฉด
for
๋ฌธ๊ณผ ํจ๊ป ์ฌ์ฉ ํด์ผํจ - ์ฑ๋์ ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ค๊ธธ ๊ธฐ๋ค๋ฆฌ๋ ๋์ , ๋ค๋ฅธ์์ ์ํํ๊ฑฐ๋, ์ฌ๋ฌ์ฑ๋ ๋์๋๊ธฐ
- ์ฌ๋ฌ๊ฐ ์ฑ๋์ ๋์์ ๊ธฐ๋ค๋ฆผ. ํ๋์ ์ผ์ด์ค๋ง ์ฒ๋ฆฌ๋๋ฉด ์ข ๋ฃ๋จ
- ๋ฐ๋ณต๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ํ๋ ค๋ฉด for๋ฌธ๋ ๊ฐ์ด ์ฌ์ฉ
- ch์ฑ๋๊ณผ quit์ฑ๋์ ๋ชจ๋ ๊ธฐ๋ค๋ฆผ, ch์ฑ๋๋จผ์ ์๋ํ๊ธฐ ๋๋ฌธ์
- ch์ฑ๋ ๋จผ์ ์ฝ๋ค๊ฐ ๋ชจ๋ ์ฝ๊ณ ๋์ quit์ true๊ฐ ๋ค์ด์์ ์ฝ์ผ๋ฉด์ return ์คํ
package main
import (
"fmt"
"sync"
"time"
)
func square(wg *sync.WaitGroup, ch chan int, quit chan bool) {
for {
// ch์ quit ์์ชฝ์ ๋ชจ๋๊ธฐ๋ค๋ฆผ
// ch์ฑ๋์ ๋ฐ์ดํฐ๋ฅผ ๋ชจ๋ ์ฝ์ผ๋ฉด,
// quit ์ฑ๋๋ฐ์ดํฐ๋ฅผ ์ฝ๊ณ , square() ํจ์๊ฐ ์ข
๋ฃ๋จ
select {
case n := <-ch: // ch ์ฑ๋์์ ๋ฐ์ดํฐ๋ฅผ ๋นผ๋ผ ์ ์์ ๋ ์คํ
fmt.Printf("Squared: %d\n", n*n)
time.Sleep(time.Second)
case <- quit: // quit ์ฑ๋์์ ๋ฐ์ดํฐ๋ฅผ ๋นผ๋ผ ์ ์์ ๋ ์คํ
wg.Done()
return
}
}
}
func main() {
var wg sync.WaitGroup
ch := make(chan int)
quit := make(chan bool)
wg.Add(1)
go square(&wg, ch, quit)
for i:=0; i<10; i++ {
ch <- i
}
quit <- true
wg.Wait()
}
Execute at Regular Intervals
package main
import (
"fmt"
"sync"
"time"
)
func square(wg *sync.WaitGroup, ch chan int) {
// ์ํ๋ ์๊ฐ ๊ฐ๊ฒฉ์ผ๋ก ์ ํธ๋ฅผ ๋ณด๋ด์ฃผ๋ ์ฑ๋์ ๋ง๋ค ์ ์์
// 1์ด๊ฐ๊ฒฉ ์ฃผ๊ธฐ๋ก ์๊ทธ๋ ๋ณด๋ด์ฃผ๋ '์ฑ๋' ์์ฑํ์ฌ ๋ฐํ
// func Tick(d Duration) <-chan Time
// ์ด์ฑ๋์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์ค๋ฉด ์ผ์ ์๊ฐ๊ฐ๊ฒฉ์ผ๋ก ํ์ฌ ์๊ฐ์ ๋ํ๋ด๋ Timer ๊ฐ์ฒด๋ฅผ ๋ฐํ
tick := time.Tick(time.Second) // 1์ด ๊ฐ๊ฒฉ ์๊ทธ๋
// func After(d Duration) <-chan Time
// waits for the duration to elapse
// and then sends the current time on the returned channel.
// ์ด์ฑ๋์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ผ๋ฉด ์ผ์ ์๊ฐ ๊ฒฝ๊ณผ ํ์ ํ์ฌ์๊ฐ์ ๋ํ๋ด๋ Timer ๊ฐ์ฒด๋ฅผ ๋ฐํ
terminate := time.After(10*time.Second) // 10์ด ์ดํ ์๊ทธ๋
for {
select {
case <- tick:
fmt.Println("tick")
case <- terminate:
fmt.Println("terminated...")
wg.Done()
return
case n := <-ch:
fmt.Printf("Squared: %d\n", n*n)
time.Sleep(time.Second)
}
}
}
func main() {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
go square(&wg, ch)
for i:=0; i<10; i++ {
ch <-i
}
wg.Wait()
}
SELECT pattern
1-1. switch
func main() {
i:= "korea"
switch(i) {
case "korea":
fmt.Println("korea")
case "usa":
fmt.Println("usa")
case "japan":
fmt.Println("japan")
}
}
1-2. switch
func main() {
t:= time.Now()
switch i {
case t.Hour() < 12:
fmt.Println("It's before noon")
default:
fmt.Println("It's after noon")
}
}
1-3. switch
func WhiteSpace(c rune) bool {
switch c {
case ' ', '\t', '\n', '\f', '\r':
return true
}
return false
}
1-4. switch
func main() {
Loop:
for _, ch := range "a b\nc" {
switch ch {
case ' ':
break
case '\n':
break Loop
default:
fmt.Println("%c\n", ch)
}
}
}
1-1. select
- case๋ฌธ์ ์ฑ๋์ ๊ฐ์ด ๋ค์ด์ฌ ๋๊น์ง select๋ฌธ์์ block ๋จ
- c1 ์ฑ๋ ๊ฐ์ด ์์ผ๋ฉด c2 ํ๋ฆฐํธ
- c1, c2 ๋๋ค ์์ผ๋ฉด, default ํ๋ฆฐํธ
- default ์ผ์ด์ค ์ ์ ์ํ๋ฉด select๋ฌธ block๋จ
package main
import (
"fmt"
"time"
)
func main() {
c1 := make(chan string)
c2 := make(chan string)
go func() {
for {
time.Sleep(2 * time.Second)
c1 <- "one"
}
}()
go func() {
for {
time.Sleep(4 * time.Second)
c2 <- "two"
}
}()
for {
select {
case r1 := <-c1:
fmt.Printf("received: %s\n", r1)
case r2 := <-c2:
fmt.Printf("received: %s\n", r2)
default:
time.Sleep(1 * time.Second)
fmt.Printf("--default--\n")
}
}
}
1-2. select
- ์์ฐ์๊ฐ ๊ฒฐ๊ณผ๋ฅผ ์ค ๋ ๊น์ง ๊ธฐ๋ค๋ฆฌ๋ ๋ฐฉ์.
- ๊ฒฐ๊ณผ ๋ฐ์ผ๋ฉด return
package main
import (
"fmt"
"time"
)
func process(ch chan string) {
time.Sleep(10 * time.Second)
ch <- "process successful"
}
func scheduling() {
//do something
}
func main() {
ch := make(chan string)
go process(ch)
for {
time.Sleep(1 * time.Second)
select {
case v := <-ch:
fmt.Println("received value: ", v)
return
default:
fmt.Println("no value received")
}
scheduling()
}
}
1-3. select
- case s1 ์ด ์ ํ๋ ์ง, s2๊ฐ ์ ํ๋ ์ง๋ ๋ชจ๋ฆ ๋๋ค. ๋๋ค ์ ํ์ผ๋ก ์ฌ์ฉ ๋ ์ ์์ต๋๋ค.
package main
func server1(ch chan string) {
ch <- "from server1"
}
func server2(ch chan string) {
ch <- "from server2"
}
func main() {
output1 := make(chan string)
output2 := make(chan string)
go server1(output1)
go server2(output2)
time.Sleep(1 * time.Second)
select {
case s1 := <-output1:
fmt.Println(s1)
case s2 := <-output2:
fmt.Println(s2)
}
}
1-4. select
- 5์ด ์ด๋ด ์ด๋ฆ ์ ๋ ฅ๋ฐ๊ธฐ
package main
func main() {
}
Implementing Producer-Consumer Pattern with Channel
- ์ญํ ๋๋๋ ๋ฐฉ๋ฒ
- ์ปจ๋ฒ ์ด์ด๋ฒจํธ: ์ฐจ์ฒด์์ฐ->๋ฐํด์ค์น->๋์->์์ฑ
package main
import (
"fmt"
"sync"
"time"
)
// ๊ณต์ ์์:
// Body -> Tire -> Color
// ์์ฐ์ ์๋น์ ํจํด (Producer Consumer Pattern) ๋๋ pipeline pattern
// MakeBody()๋ฃจํด์ด ์์ฐ์, InstallTire()๋ฃจํด์ ์๋น์
// InstallTire()๋ PaintCar()๋ฃจํด์ ๋ํด์๋ ์์ฐ์
type Car struct {
Body string
Tire string
Color string
}
var wg sync.WaitGroup
var startTime = time.Now()
// 1์ด๊ฐ๊ฒฉ ์ฐจ์ฒด์์ฐํ์ฌ tireCh ์ฑ๋์ ๋ฐ์ดํฐ ๋ฃ์
// 10์ด ํ tireCh ์ฑ๋ ๋ซ๊ณ ๋ฃจํด์ข
๋ฃ
func MakeBody(tireCh chan *Car) { // ์ฐจ์ฒด์์ฐ
tick := time.Tick(time.Second)
after := time.After(10 * time.Second)
for {
select {
case <- tick:
// Make a body
car := &Car{}
car.Body = "Sports car"
tireCh <- car
case <-after:
close(tireCh)
wg.Done()
return
}
}
}
// tireCh์ฑ๋์์ ๋ฐ์ดํฐ ์ฝ์ด์ ๋ฐํด์ค์นํ๊ณ paintCh์ฑ๋์ ๋ฃ์ด์ค
// tireCh์ฑ๋ ๋ซํ๋ฉด ๋ฃจํด์ข
๋ฃํ๊ณ paintCh์ฑ๋ ๋ซ์์ค
func InstallTire(tireCh, paintCh chan *Car) { // ๋ฐํด์ค์น
for car := range tireCh {
// Make a body
time.Sleep(time.Second)
car.Tire = "Winter tire"
paintCh <- car
}
wg.Done()
close(paintCh)
}
// paintCh์ฑ๋์์ ๋ฐ์ดํฐ ์ฝ์ด์ ๋์์ ํ๊ณ , ์์ฑ๋ ์ฐจ ์ถ๋ ฅ
func PaintCar(paintCh chan *Car) { // ๋์
for car := range paintCh {
// Make a body
time.Sleep(time.Second)
car.Color = "Red"
duration := time.Now().Sub(startTime) // ๊ฒฝ๊ณผ ์๊ฐ ์ถ๋ ฅ
fmt.Printf("%.2f Complete Car: %s %s %s\n",duration.Seconds(), car.Body, car.Tire, car.Color)
}
wg.Done()
}
func main() {
tireCh := make(chan *Car)
paintCh := make(chan *Car)
fmt.Println("Start the factory")
wg.Add(3)
go MakeBody(tireCh)
go InstallTire(tireCh, paintCh)
go PaintCar(paintCh)
wg.Wait()
fmt.Println("Close the factory")
}
mutex to ensure atomic access to a shared variable
A mutex helps achieve atomic access by allowing only one thread to hold the lock (mutex) at any given time.
package main
import (
"fmt"
"sync"
)
type counter struct {
i int64
wg sync.WaitGroup
mu sync.Mutex
}
func main() {
c := counter{i: 0}
for i := 0; i < 1000; i++ {
c.wg.Add(1)
go func(num int) {
defer c.wg.Done()
c.mu.Lock()
c.i += 1
c.mu.Unlock()
}(i)
}
c.wg.Wait()
fmt.Println("Final Counter Value:", c.i)
}
buffered vs. unbuffered channel
If you don't explicitly close an unbuffered (non-buffered) channel in Go, it won't cause any immediate issues. However, there are important implications:
- Blocking Behavior:
- If a goroutine tries to receive from an unbuffered channel and no other goroutine is sending to it, the receiver will block indefinitely.
-
Similarly, if a goroutine tries to send to an unbuffered channel and no other goroutine is receiving from it, the sender will block until another goroutine starts receiving.
-
Resource Leaks:
- If you forget to close an unbuffered channel, it remains open indefinitely.
- This can lead to resource leaks, especially if the channel is used for synchronization or signaling.
-
Properly closing channels ensures that goroutines can exit gracefully when their work is done.
-
Signaling Completion:
- Closing an unbuffered channel is often used to signal the end of communication between goroutines.
- It allows the receiving goroutine to know that no more values will be sent.
- When the sender closes the channel, the receiver can detect this and exit gracefully.
In summary, while not explicitly closing an unbuffered channel won't cause immediate errors, it's good practice to close channels when they are no longer needed. This helps prevent blocking issues and ensures proper resource management. ๐
Context
-
https://go.dev/doc/database/cancel-operations
-
์ปจํ ์คํธ ์ฌ์ฉํ๊ธฐ
- ์ปจํ ์คํธ๋ ์์ ์ ์ง์ํ ๋ ์์ ๊ฐ๋ฅ ์๊ฐ, ์์ ์ทจ์ ๋ฑ์ ์กฐ๊ฑด์ ์ง์ํ ์ ์๋ ์์ ๋ช ์ธ์ ์ญํ
- ์๋ก์ด ๊ณ ๋ฃจํด์ผ๋ก ์์ ์์ํ ๋ ์ผ์ ์๊ฐ ๋์๋ง ์์ ์ง์ํ๊ฑฐ๋, ์ธ๋ถ์์ ์์ ์ทจ์์ ์ฌ์ฉ.
- ์์ ์ค์ ์ ๋ํ ๋ฐ์ดํฐ๋ ์ ๋ฌ ๊ฐ๋ฅ
- The
context
package provides tools for managing concurrent operations. - It allows you to control the lifecycle, cancellation, and propagation of requests across multiple Goroutines.
- Key features:
- Cancellation: Propagate cancellation signals across Goroutines.
- Deadlines: Set deadlines for operations.
- Values: Pass request-scoped data down the chain.
- Example: Managing timeouts for API requests using context.WithTimeout.
package main
import (
"context"
"fmt"
"net/http"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
urls := []string{
"https://api.example.com/users",
"https://api.example.com/products",
"https://api.example.com/orders",
}
results := make(chan string)
for _, url := range urls {
go fetchAPI(ctx, url, results)
}
for range urls {
fmt.Println(<-results)
}
}
func fetchAPI(ctx context.Context, url string, results chan<- string) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
results <- fmt.Sprintf("Error creating request for %s: %s", url, err.Error())
return
}
client := http.DefaultClient
resp, err := client.Do(req)
if err != nil {
results <- fmt.Sprintf("Error making request to %s: %s", url, err.Error())
return
}
defer resp.Body.Close()
results <- fmt.Sprintf("Response from %s: %d", url, resp.StatusCode)
}
Cancelable Context
- ์ด ์ปจํ ์คํธ๋ฅผ ๋ง๋ค์ด, ์์ ์์๊ฒ ์ ๋ฌํ๋ฉด ์์ ์ง์ํ ์ง์์๊ฐ ์ํ ๋ ์์ ์ทจ์ ์๋ฆด ์ ์์
package main
import (
"fmt"
"sync"
"time"
"context"
)
var wg sync.WaitGroup
// ์์
์ด ์ทจ์๋ ๋๊น์ง 1์ด๋ง๋ค ๋ฉ์์ง ์ถ๋ ฅํ๋ ๊ณ ๋ฃจํด
func PrintEverySecond(ctx context.Context) {
tick := time.Tick(time.Second)
for {
select {
case <-ctx.Done():
wg.Done()
return
case <-tick:
fmt.Println("tick")
}
}
}
func main() {
wg.Add(1)
// ์ทจ์ ๊ฐ๋ฅํ ์ปจํ
์คํธ ์์ฑ : ์ปจํ
์คํธ ๊ฐ์ฒด์ ์ทจ์ํจ์ ๋ฐํ
ctx, cancel := context.WithCancel(context.Background())
go PrintEverySecond(ctx)
time.Sleep(5 * time.Second)
// ์์
์ทจ์
// ์ปจํ
์คํธ์ Done()์ฑ๋์ ์๊ทธ๋์ ๋ณด๋ด, ์์
์๊ฐ ์์
์ทจ์ํ๋๋ก ์๋ฆผ
// <-ctx.Done() ์ฑ๋
cancel()
wg.Wait()
}
Setting a Timeout in Context
- ์ผ์ ์๊ฐ ๋์๋ง ์์ ์ ์ง์ํ ์ ์๋ ์ปจํ ์คํธ ์์ฑ
- WithTimeout() ๋๋ฒ์งธ ์ธ์๋ก ์๊ฐ์ ์ค์ ํ๋ฉด, ๊ทธ์๊ฐ์ด ์ง๋ ๋ค
- ์ปจํ ์คํธ์ Done()์ฑ๋์ ์๊ทธ๋์ ๋ณด๋ด์ ์์ ์ข ๋ฃ
- WithTimeout() ์ญ์ ๋๋ฒ์งธ ๋ฐํ๊ฐ์ผ๋ก cancelํจ์ ๋ฐํํ๊ธฐ ๋๋ฌธ์
- ์์ ์์์ ์ ์ํ๋ฉด ์ธ์ ๋ ์ง ์์ ์ทจ์ ๊ฐ๋ฅ
func main() {
wg.Add(1)
// 5์ด ํ ์ปจํ
์คํธ์ Done()์ฑ๋์ ์๊ทธ๋์ ๋ณด๋ด ์์
์ข
๋ฃ ์์ฒญ
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
go PrintEverySecond(ctx)
wg.Wait()
}
Setting a Specific Value in Context
- ์์ ์์๊ฒ ์ง์ํ ๋ ๋ณ๋์ ์ง์์ฌํญ ์ถ๊ฐ ๊ฐ๋ฅ
- ์ปจํ ์คํธ์ ํน์ ํค๋ก ๊ฐ์ ์ฝ์ด์ค๋๋ก ์ค์ ๊ฐ๋ฅ
- context.WithValue()๋ก ์ปจํ ์คํธ์ ๊ฐ ์ค์ ๊ฐ๋ฅ
package main
import (
"fmt"
"sync"
"context"
)
var wg sync.WaitGroup
func square(ctx context.Context) {
// ์ปจํ
์คํธ์์ ๊ฐ์ ์ฝ๊ธฐ
// Value๋ ๋น ์ธํฐํ์ด์ค ํ์
์ด๋ฏ๋ก (int)๋ก ๋ณํํ์ฌ n์ ํ ๋น
if v:= ctx.Value("number"); v != nil {
n := v.(int)
fmt.Printf("Square:%d\n", n*n)
}
wg.Done()
}
func main() {
wg.Add(1)
// "number"๋ฅผ ํค๋ก ๊ฐ์ 9๋ก ์ค์ ํ ์ปจํ
์คํธ๋ฅผ ๋ง๋ฆ
// square์ ์ธ์๋ก ๋๊ฒจ์ ๊ฐ์ ์ฌ์ฉํ ์ ์๋๋ก ํจ
ctx := context.WithValue(context.Background(), "number", 9)
go square(ctx)
wg.Wait()
}
Creating a Context with Both Cancellation and Value
- ์ปจํ ์คํธ๋ฅผ ๋ง๋ค๋ ํญ์ ์์ ์ปจํ ์คํธ ๊ฐ์ฒด๋ฅผ ์ธ์๋ก ๋ฃ์ด์ค์ผ ํ์
- ์ผ๋ฐ์ ์ผ๋ก context.Background()๋ฅผ ๋ฃ์ด์คฌ๋๋ฐ, ์ฌ๊ธฐ์ ์ด๋ฏธ ๋ง๋ค์ด์ง ์ปจํ ์คํธ ๊ฐ์ฒด ๋ฃ์ด๋ ๋จ
- ์ด๋ฅผํตํด ์ฌ๋ฌ ๊ฐ์ ์ค์ ํ๊ฑฐ๋, ๊ธฐ๋ฅ์ ์ค์ ํ ์ ์์
- ๊ตฌ๊ธ: "golang concurrency patterns"
ctx, cancel := context.WithCancel(context.Background())
ctx = context.WithValue(ctx, "number", 9)
ctx = context.WithValue(ctx, "keyword", "Lilly")