Перейти к основному содержимому

Главная задача всех собеседований Golang

· 13 мин. чтения

Сегодня мы разберём одно из самых популярных заданий на собеседованиях по Go — задачу на многопоточность, в которой нужно отправить тысячи запросов на разные сайты и собрать ответы, используя паттерны worker pool и fan-out. Автор на собственном примере показывает, как легко ошибиться даже в знакомой задаче — забыть закрыть канал, неправильно расставить WaitGroup или упустить нюансы синхронизации — и почему эту задачу стоит прорешивать десятки раз перед каждым собеседованием.

Вопрос 1. Как реализовать параллельную обработку большого количества URL-адресов с использованием многопоточности в Go (паттерн Worker Pool с Fan-out и сбором результатов)?.

Таймкод: 00:00:00

Ответ собеседника: Правильный. Для решения задачи используется паттерн Worker Pool с Fan-out. Создаются три горутины-воркера, которые читают URL из входного канала. Для синхронизации используется WaitGroup: при запуске воркера добавляется единица, при завершении — Done. Входной канал закрывается сразу после записи всех URL. Для сбора результатов создаётся отдельный канал collect. Воркеры имитируют работу через time.Sleep(2 * time.Second), после чего записывают обработанный URL в канал collect. Закрытие канала collect происходит в отдельной горутине после завершения WaitGroup (wg.Wait()), что предотвращает deadlock и panic при записи в закрытый канал. Результаты собираются через цикл for range из канала collect.

Правильный ответ:

Ответ собеседника полностью корректен и демонстрирует глубокое понимание паттерна Worker Pool. Рассмотрим реализацию более детально с примерами кода и важными нюансами.

1. Базовая реализация Worker Pool

package main

import (
"fmt"
"sync"
"time"
)

// Job представляет задачу для обработки
type Job struct {
ID int
URL string
}

// Result представляет результат обработки
type Result struct {
Job Job
Status string
Error error
}

// worker функция, которая обрабатывает задачи
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()

for job := range jobs {
fmt.Printf("Worker %d processing job %d: %s\n", id, job.ID, job.URL)

// Имитация работы (HTTP запрос, парсинг и т.д.)
time.Sleep(2 * time.Second)

results <- Result{
Job: job,
Status: "completed",
Error: nil,
}
}
}

func main() {
const numWorkers = 3
const numJobs = 10

jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)

var wg sync.WaitGroup

// Запуск воркеров (Fan-out)
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}

// Отправка задач
go func() {
for j := 1; j <= numJobs; j++ {
jobs <- Job{
ID: j,
URL: fmt.Sprintf("https://example.com/page/%d", j),
}
}
close(jobs)
}()

// Закрытие канала результатов после завершения всех воркеров
go func() {
wg.Wait()
close(results)
}()

// Сбор результатов (Fan-in)
for result := range results {
fmt.Printf("Result: Job %d - %s\n", result.Job.ID, result.Status)
}
}

2. Ключевые принципы реализации

Fan-out (распределение работы): Несколько горутин-воркеров читают из одного канала задач. Go runtime автоматически распределяет задачи между воркерами.

Fan-in (сбор результатов): Все воркеры пишут результаты в один канал, из которого основная горутина собирает результаты.

Синхронизация: WaitGroup используется для отслеживания завершения всех воркеров. Закрытие канала результатов происходит только после завершения всех воркеров, что предотвращает panic.

3. Продвинутая реализация с контекстом и обработкой ошибок

package main

import (
"context"
"fmt"
"net/http"
"sync"
"time"
)

type Job struct {
ID int
URL string
}

type Result struct {
Job Job
StatusCode int
BodySize int
Duration time.Duration
Error error
}

func worker(ctx context.Context, id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()

client := &http.Client{
Timeout: 10 * time.Second,
}

for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: shutting down\n", id)
return
case job, ok := <-jobs:
if !ok {
return
}

start := time.Now()
result := processURL(client, job)
result.Duration = time.Since(start)

select {
case results <- result:
case <-ctx.Done():
return
}
}
}
}

func processURL(client *http.Client, job Job) Result {
resp, err := client.Get(job.URL)
if err != nil {
return Result{Job: job, Error: err}
}
defer resp.Body.Close()

// Подсчёт размера тела ответа
buf := make([]byte, 8192)
size := 0
for {
n, err := resp.Body.Read(buf)
size += n
if err != nil {
break
}
}

return Result{
Job: job,
StatusCode: resp.StatusCode,
BodySize: size,
}
}

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

urls := []string{
"https://google.com",
"https://github.com",
"https://stackoverflow.com",
// ... много других URL
}

const numWorkers = 5

jobs := make(chan Job, len(urls))
results := make(chan Result, len(urls))

var wg sync.WaitGroup

// Запуск воркеров
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(ctx, w, jobs, results, &wg)
}

// Отправка задач
go func() {
for i, url := range urls {
select {
case jobs <- Job{ID: i + 1, URL: url}:
case <-ctx.Done():
break
}
}
close(jobs)
}()

// Закрытие канала результатов
go func() {
wg.Wait()
close(results)
}()

// Сбор результатов
var successCount, errorCount int
for result := range results {
if result.Error != nil {
errorCount++
fmt.Printf("Error: Job %d (%s) - %v\n", result.Job.ID, result.Job.URL, result.Error)
} else {
successCount++
fmt.Printf("Success: Job %d (%s) - Status: %d, Size: %d bytes, Duration: %v\n",
result.Job.ID, result.Job.URL, result.StatusCode, result.BodySize, result.Duration)
}
}

fmt.Printf("\nTotal: %d success, %d errors\n", successCount, errorCount)
}

4. Важные паттерны и лучшие практики

Graceful shutdown: Использование context.Context позволяет корректно завершить воркеры при отмене или таймауте.

Безопасное закрытие каналов: Только отправитель должен закрывать канал. Канал задач закрывается после отправки всех задач. Канал результатов закрывается после завершения всех воркеров.

Буферизированные каналы: Использование буферизированных каналов улучшает производительность, позволяя отправителям не блокироваться.

Ограничение параллелизма: Количество воркеров должно быть ограничено для предотвращения исчерпания ресурсов (файловых дескрипторов, памяти, соединений).

5. Оптимизация производительности

// Динамическое управление количеством воркеров
func adaptiveWorkerPool(jobs <-chan Job, results chan<- Result, maxWorkers int) {
var wg sync.WaitGroup
semaphore := make(chan struct{}, maxWorkers)

for job := range jobs {
semaphore <- struct{}{} // Получаем слот
wg.Add(1)

go func(j Job) {
defer wg.Done()
defer func() { <-semaphore }() // Освобождаем слот

result := processJob(j)
results <- result
}(job)
}

wg.Wait()
}

6. Паттерн Pipeline для сложной обработки

// Многоступенчатая обработка
func pipeline(urls []string) {
// Этап 1: Генерация URL
gen := func(urls ...string) <-chan string {
out := make(chan string)
go func() {
for _, u := range urls {
out <- u
}
close(out)
}()
return out
}

// Этап 2: Загрузка (Fan-out)
fetch := func(in <-chan string) <-chan Result {
out := make(chan Result)
var wg sync.WaitGroup

const workers = 5
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for url := range in {
result := download(url)
out <- result
}
}()
}

go func() {
wg.Wait()
close(out)
}()

return out
}

// Этап 3: Обработка (Fan-out)
process := func(in <-chan Result) <-chan Result {
out := make(chan Result)
var wg sync.WaitGroup

const workers = 3
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for result := range in {
processed := transform(result)
out <- processed
}
}()
}

go func() {
wg.Wait()
close(out)
}()

return out
}

// Сборка pipeline
results := process(fetch(gen(urls...)))

for result := range results {
handleResult(result)
}
}

Итог: Паттерн Worker Pool с Fan-out/Fan-in является одним из наиболее эффективных подходов для параллельной обработки в Go. Ключевые моменты — правильная синхронизация через WaitGroup, безопасное закрытие каналов, использование контекста для управления временем жизни горутин и ограничение параллелизма для предотвращения исчерпания ресурсов.

Вопрос 2. Почему необходимо закрывать канал сразу после записи в него всех данных и что произойдёт, если этого не сделать?.

Таймкод: 00:07:30

Ответ собеседника: Правильный. Если не закрыть канал после записи всех данных, возникнет deadlock — горутины будут бесконечно ждать новые данные, что приведёт к панике 'all goroutines are asleep — deadlock'. Правило в Go: канал нужно закрывать сразу после записи всех данных в том же месте, где происходит запись. Если несколько горутин пишут в один канал, закрытие должно происходить после завершения всех писателей, например, через WaitGroup в отдельной горутине.

Правильный ответ:

Ответ собеседника полностью корректен. Рассмотрим тему более глубоко с детальными примерами.

1. Механика закрытия канала

Закрытие канала — это сигнал для получателей, что больше данных не будет. Когда получатель читает из закрытого канала, он получает zero value типа и флаг ok равный false:

ch := make(chan int, 3)
ch <- 1
ch <- 2
close(ch)

// Чтение из закрытого канала
for {
val, ok := <-ch
if !ok {
fmt.Println("Канал закрыт, данных больше нет")
break
}
fmt.Println("Получено:", val)
}

// Или более идиоматично:
for val := range ch {
fmt.Println("Получено:", val)
}

2. Что происходит без закрытия канала

Сценарий 1: Deadlock при чтении

func deadlockExample() {
ch := make(chan int)

go func() {
ch <- 1
ch <- 2
// Забыли close(ch)
}()

// Горутина заблокируется навсегда, ожидая третьего значения
for val := range ch {
fmt.Println(val)
}
// fatal error: all goroutines are asleep - deadlock!
}

Сценарий 2: Утечка горутин (goroutine leak)

func goroutineLeak() {
ch := make(chan int)

go func() {
// Эта горутина будет висеть вечно
for val := range ch {
process(val)
}
}()

ch <- 1
ch <- 2
// close(ch) — забыто!
// Горутина никогда не завершится, произойдёт утечка
}

3. Правила закрытия каналов

Правило 1: Закрывает только отправитель

// НЕПРАВИЛЬНО — получатель не должен закрывать канал
func badReceiver() {
ch := make(chan int, 1)
ch <- 1
close(ch) // panic: close of closed channel (если канал уже закрыт)
}

// ПРАВИЛЬНО
func goodSender() {
ch := make(chan int, 1)
ch <- 1
close(ch) // Отправитель закрывает канал
}

Правило 2: Закрывайте канал только один раз

func doubleClose() {
ch := make(chan int)
close(ch)
close(ch) // panic: close of closed channel
}

Правило 3: Не пишите в закрытый канал

func writeToClosed() {
ch := make(chan int)
close(ch)
ch <- 1 // panic: send on closed channel
}

4. Паттерн закрытия с несколькими отправителями

Когда несколько горутин пишут в один канал, нужно дождаться завершения всех:

func multipleSenders() {
ch := make(chan int)
var wg sync.WaitGroup

// Запускаем 5 отправителей
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 3; j++ {
ch <- id*10 + j
}
}(i)
}

// Закрываем канал после завершения всех отправителей
go func() {
wg.Wait()
close(ch)
}()

// Читаем все данные
for val := range ch {
fmt.Println(val)
}
}

5. Паттерн для канала результатов в Worker Pool

func workerPoolWithResults() {
jobs := make(chan Job, 100)
results := make(chan Result, 100)
var wg sync.WaitGroup

// Запуск воркеров
for w := 0; w < 5; w++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for job := range jobs {
result := process(job)
results <- result
}
}(w)
}

// Отправка задач
go func() {
for _, job := range allJobs {
jobs <- job
}
close(jobs) // Закрываем канал задач — воркеры завершатся
}()

// Закрытие канала результатов после завершения воркеров
go func() {
wg.Wait()
close(results) // Сигнализируем что результатов больше не будет
}()

// Сбор результатов
for result := range results {
handleResult(result)
}
}

6. Использование context вместо закрытия канала

Иногда вместо закрытия канала удобнее использовать контекст для сигнализации об отмене:

func contextCancellation(ctx context.Context) {
ch := make(chan int)

go func() {
defer close(ch)
for i := 0; ; i++ {
select {
case <-ctx.Done():
fmt.Println("Получен сигнал отмены")
return
case ch <- i:
}
}
}()

for val := range ch {
if val >= 10 {
cancel() // Отменяем контекст
break
}
fmt.Println(val)
}
}

7. Проверка состояния канала

func checkChannelState(ch chan int) {
// Чтение с проверкой — безопасный способ
val, ok := <-ch
if !ok {
fmt.Println("Канал закрыт")
return
}
fmt.Println("Получено:", val)

// Использование select для неблокирующего чтения
select {
case val, ok := <-ch:
if !ok {
fmt.Println("Канал закрыт")
} else {
fmt.Println("Получено:", val)
}
default:
fmt.Println("Канал пуст, но не закрыт")
}
}

Итог: Закрытие канала — это не просто рекомендация, а необходимость для корректной работы горутин. Незакрытый канал приводит к deadlock или утечке горутин. Ключевое правило: отправитель закрывает канал сразу после отправки всех данных. При множественных отправителях используйте WaitGroup для синхронизации закрытия. Закрытие канала — это сигнал для получателей о завершении передачи данных.

Вопрос 3. Почему нельзя закрывать канал непосредственно в воркерах и как правильно организовать закрытие канала результатов?.

Таймкод: 00:13:08

Ответ собеседника: Правильный. Нельзя закрывать канал в воркерах, потому что несколько горутин конкурируют за закрытие одного канала, и одна из них может закрыть его раньше, чем другие закончат запись, что вызовет panic при попытке записи в закрытый канал. Правильный способ — запустить отдельную горутину, которая ждёт завершения WaitGroup (wg.Wait()), а затем закрывает канал. Это гарантирует, что канал закроется только после того, как все воркеры завершат запись.

Правильный ответ:

Ответ собеседника полностью верен. Разберём проблему детальнее с примерами ошибок и правильных решений.

1. Проблема закрытия канала в воркерах

Ошибка 1: Преждевременное закрытие

func badCloseInWorker() {
results := make(chan int)
var wg sync.WaitGroup

for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

result := id * 10
results <- result

// НЕПРАВИЛЬНО: каждый воркер пытается закрыть канал
// Первый завершившийся закроет канал, остальные получат panic
close(results) // ❌ panic: close of closed channel

}(i)
}

go func() {
wg.Wait()
// К этому моменту канал уже закрыт одним из воркеров
}()

for r := range results {
fmt.Println(r)
}
}

Ошибка 2: Запись в закрытый канал

func writeToClosedChannel() {
results := make(chan int)
var wg sync.WaitGroup
var once sync.Once

for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

// Имитация разного времени выполнения
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

// Попытка записи
results <- id // ❌ Может быть panic, если канал уже закрыт

}(i)
}

// Закрытие через sync.Once — тоже не решает проблему
go func() {
wg.Wait()
once.Do(func() { close(results) })
}()
}

2. Правильные паттерны закрытия

Паттерн 1: Отдельная горутина с WaitGroup (рекомендуемый)

func correctCloseWithWaitGroup() {
results := make(chan int, 100)
var wg sync.WaitGroup

// Запуск воркеров
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

result := process(id)
results <- result // Безопасная запись

}(i)
}

// ПРАВИЛЬНО: отдельная горутина закрывает канал после завершения всех воркеров
go func() {
wg.Wait()
close(results) // ✅ Закрытие только после завершения всех отправителей
}()

// Чтение результатов
for result := range results {
fmt.Println("Result:", result)
}
}

Паттерн 2: Использование sync.Once для гарантии однократного закрытия

func closeWithSyncOnce() {
results := make(chan int, 100)
var wg sync.WaitGroup
var closeOnce sync.Once

for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

result := process(id)
results <- result

}(i)
}

// sync.Once гарантирует, что close будет вызван только один раз
go func() {
wg.Wait()
closeOnce.Do(func() {
close(results) // ✅ Гарантированно однократное закрытие
})
}()

for result := range results {
fmt.Println("Result:", result)
}
}

3. Продвинутый паттерн: функция-обёртка

// Result представляет результат обработки
type Result struct {
Value int
Error error
}

// ProcessWithPool обрабатывает задачи с использованием пула воркеров
func ProcessWithPool(items []int, numWorkers int) <-chan Result {
results := make(chan Result, len(items))
var wg sync.WaitGroup

// Канал для распределения задач
jobs := make(chan int, len(items))

// Запуск воркеров
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for item := range jobs {
result, err := heavyProcessing(item)
results <- Result{Value: result, Error: err}
}
}()
}

// Отправка задач
go func() {
for _, item := range items {
jobs <- item
}
close(jobs)
}()

// Закрытие результатов после завершения воркеров
go func() {
wg.Wait()
close(results)
}()

return results
}

// Использование
func main() {
items := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

results := ProcessWithPool(items, 3)

for result := range results {
if result.Error != nil {
fmt.Println("Error:", result.Error)
} else {
fmt.Println("Result:", result.Value)
}
}
}

4. Паттерн с контекстом для graceful shutdown

func processWithContext(ctx context.Context, items []int, numWorkers int) <-chan Result {
results := make(chan Result, len(items))
var wg sync.WaitGroup

jobs := make(chan int, len(items))

// Запуск воркеров
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()

for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: shutting down\n", workerID)
return
case item, ok := <-jobs:
if !ok {
return
}
result, err := heavyProcessing(item)
select {
case results <- Result{Value: result, Error: err}:
case <-ctx.Done():
return
}
}
}
}(w)
}

// Отправка задач
go func() {
defer close(jobs)
for _, item := range items {
select {
case jobs <- item:
case <-ctx.Done():
return
}
}
}()

// Закрытие результатов
go func() {
wg.Wait()
close(results)
}()

return results
}

5. Визуализация проблемы

Время ──────────────────────────────────────────────►

Воркер 1: [====работа====] запись → results wg.Done()
Воркер 2: [======работа======] запись → results wg.Done()
Воркер 3: [==работа==] запись → results wg.Done()

БЕЗ WaitGroup (ОШИБКА):
Воркер 3 завершился первым → close(results)
Воркер 1 пытается записать → ❌ panic: send on closed channel

С WaitGroup (ПРАВИЛЬНО):
Все воркеры завершились → wg.Wait() возвращает управление → close(results)

6. Антипаттерны, которых следует избегать

// Антипаттерн 1: Закрытие в первом завершившемся воркере
func antiPattern1() {
results := make(chan int)
var wg sync.WaitGroup

for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
results <- id

// НЕПРАВИЛЬНО: закрытие в воркере
if id == 0 { // Попытка закрыть только одним воркером
close(results) // ❌ Другие воркеры могут ещё писать
}
}(i)
}
}

// Антипаттерн 2: Использование счётчика вместо WaitGroup
func antiPattern2() {
results := make(chan int)
var counter int32

for i := 0; i < 5; i++ {
atomic.AddInt32(&counter, 1)
go func(id int) {
defer func() {
if atomic.AddInt32(&counter, -1) == 0 {
close(results) // ❌ Race condition возможен
}
}()
results <- id
}(i)
}
}

Итог: Закрытие канала в воркерах приводит к race condition и panic. Правильный подход — использовать отдельную горутину, которая вызывает wg.Wait() и затем close(results). Это гарантирует, что канал закроется только после завершения всех отправителей. Для дополнительной безопасности можно использовать sync.Once или контекст для graceful shutdown.