package main import ( "context" "errors" "fmt" "math/rand" "sync" "time" ) // Более оптимальное решение задачи из этого видео: // https://youtu.be/wZCLVt_5-4c // Из сигнатуры функции processData убран кастомный тип + убран один селект // (он не был прям сильно лишним, но от него можно избавиться +- безболезненно) // условие задачи: // реализовать функцию processParallel // прокинуть контекст var errTimeout = errors.New("timed out") func processData(ctx context.Context, v int) (int, error) { ch := make(chan struct{}) go func() { time.Sleep(time.Duration(rand.Intn(10)) * time.Second) close(ch) }() select { case <-ch: case <-ctx.Done(): return 0, errTimeout } return v * 2, nil } func main() { in := make(chan int) out := make(chan int) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() go func() { defer close(in) for i := range 10 { select { case in <- i + 1: case <-ctx.Done(): return } } }() start := time.Now() processParallel(ctx, in, out, 5) for v := range out { fmt.Println("v =", v) } fmt.Println("main duration:", time.Since(start)) } func processParallel(ctx context.Context, in <-chan int, out chan<- int, numWorkers int) { wg := &sync.WaitGroup{} for range numWorkers { wg.Add(1) go worker(ctx, in, out, wg) } go func() { wg.Wait() close(out) }() } func worker(ctx context.Context, in <-chan int, out chan<- int, wg *sync.WaitGroup) { defer wg.Done() for { select { case v, ok := <-in: if !ok { return } val, err := processData(ctx, v) if errors.Is(err, errTimeout) { return } //...handle other error types select { case <-ctx.Done(): return case out <- val: } case <-ctx.Done(): return } } }