1
2
3
4
5
6
7
8
9
10 package errgroup
11
12 import (
13 "context"
14 "fmt"
15 "runtime"
16 "runtime/debug"
17 "sync"
18 )
19
20 type token struct{}
21
22
23
24
25
26
27 type Group struct {
28 cancel func(error)
29
30 wg sync.WaitGroup
31
32 sem chan token
33
34 errOnce sync.Once
35 err error
36
37 mu sync.Mutex
38 panicValue any
39 abnormal bool
40 }
41
42 func (g *Group) done() {
43 if g.sem != nil {
44 <-g.sem
45 }
46 g.wg.Done()
47 }
48
49
50
51
52
53
54 func WithContext(ctx context.Context) (*Group, context.Context) {
55 ctx, cancel := context.WithCancelCause(ctx)
56 return &Group{cancel: cancel}, ctx
57 }
58
59
60
61
62
63
64 func (g *Group) Wait() error {
65 g.wg.Wait()
66 if g.cancel != nil {
67 g.cancel(g.err)
68 }
69 if g.panicValue != nil {
70 panic(g.panicValue)
71 }
72 if g.abnormal {
73 runtime.Goexit()
74 }
75 return g.err
76 }
77
78
79
80
81
82
83
84
85
86 func (g *Group) Go(f func() error) {
87 if g.sem != nil {
88 g.sem <- token{}
89 }
90
91 g.add(f)
92 }
93
94 func (g *Group) add(f func() error) {
95 g.wg.Add(1)
96 go func() {
97 defer g.done()
98 normalReturn := false
99 defer func() {
100 if normalReturn {
101 return
102 }
103 v := recover()
104 g.mu.Lock()
105 defer g.mu.Unlock()
106 if !g.abnormal {
107 if g.cancel != nil {
108 g.cancel(g.err)
109 }
110 g.abnormal = true
111 }
112 if v != nil && g.panicValue == nil {
113 switch v := v.(type) {
114 case error:
115 g.panicValue = PanicError{
116 Recovered: v,
117 Stack: debug.Stack(),
118 }
119 default:
120 g.panicValue = PanicValue{
121 Recovered: v,
122 Stack: debug.Stack(),
123 }
124 }
125 }
126 }()
127
128 err := f()
129 normalReturn = true
130 if err != nil {
131 g.errOnce.Do(func() {
132 g.err = err
133 if g.cancel != nil {
134 g.cancel(g.err)
135 }
136 })
137 }
138 }()
139 }
140
141
142
143
144
145 func (g *Group) TryGo(f func() error) bool {
146 if g.sem != nil {
147 select {
148 case g.sem <- token{}:
149
150 default:
151 return false
152 }
153 }
154
155 g.add(f)
156 return true
157 }
158
159
160
161
162
163
164
165
166
167 func (g *Group) SetLimit(n int) {
168 if n < 0 {
169 g.sem = nil
170 return
171 }
172 if len(g.sem) != 0 {
173 panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
174 }
175 g.sem = make(chan token, n)
176 }
177
178
179
180 type PanicError struct {
181 Recovered error
182 Stack []byte
183 }
184
185 func (p PanicError) Error() string {
186 if len(p.Stack) > 0 {
187 return fmt.Sprintf("recovered from errgroup.Group: %v\n%s", p.Recovered, p.Stack)
188 }
189 return fmt.Sprintf("recovered from errgroup.Group: %v", p.Recovered)
190 }
191
192 func (p PanicError) Unwrap() error { return p.Recovered }
193
194
195
196
197 type PanicValue struct {
198 Recovered any
199 Stack []byte
200 }
201
202 func (p PanicValue) String() string {
203 if len(p.Stack) > 0 {
204 return fmt.Sprintf("recovered from errgroup.Group: %v\n%s", p.Recovered, p.Stack)
205 }
206 return fmt.Sprintf("recovered from errgroup.Group: %v", p.Recovered)
207 }
208
View as plain text