// Source file src/sync/waitgroup.go
1 // Copyright 2011 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package sync 6 7 import ( 8 "internal/race" 9 "internal/synctest" 10 "sync/atomic" 11 "unsafe" 12 ) 13 14 // A WaitGroup is a counting semaphore typically used to wait 15 // for a group of goroutines or tasks to finish. 16 // 17 // Typically, a main goroutine will start tasks, each in a new 18 // goroutine, by calling [WaitGroup.Go] and then wait for all tasks to 19 // complete by calling [WaitGroup.Wait]. For example: 20 // 21 // var wg sync.WaitGroup 22 // wg.Go(task1) 23 // wg.Go(task2) 24 // wg.Wait() 25 // 26 // A WaitGroup may also be used for tracking tasks without using Go to 27 // start new goroutines by using [WaitGroup.Add] and [WaitGroup.Done]. 28 // 29 // The previous example can be rewritten using explicitly created 30 // goroutines along with Add and Done: 31 // 32 // var wg sync.WaitGroup 33 // wg.Add(1) 34 // go func() { 35 // defer wg.Done() 36 // task1() 37 // }() 38 // wg.Add(1) 39 // go func() { 40 // defer wg.Done() 41 // task2() 42 // }() 43 // wg.Wait() 44 // 45 // This pattern is common in code that predates [WaitGroup.Go]. 46 // 47 // A WaitGroup must not be copied after first use. 48 type WaitGroup struct { 49 noCopy noCopy 50 51 // Bits (high to low): 52 // bits[0:32] counter 53 // bits[32] flag: synctest bubble membership 54 // bits[33:64] wait count 55 state atomic.Uint64 56 sema uint32 57 } 58 59 // waitGroupBubbleFlag indicates that a WaitGroup is associated with a synctest bubble. 60 const waitGroupBubbleFlag = 0x8000_0000 61 62 // Add adds delta, which may be negative, to the [WaitGroup] task counter. 63 // If the counter becomes zero, all goroutines blocked on [WaitGroup.Wait] are released. 64 // If the counter goes negative, Add panics. 65 // 66 // Callers should prefer [WaitGroup.Go]. 
67 // 68 // Note that calls with a positive delta that occur when the counter is zero 69 // must happen before a Wait. Calls with a negative delta, or calls with a 70 // positive delta that start when the counter is greater than zero, may happen 71 // at any time. 72 // Typically this means the calls to Add should execute before the statement 73 // creating the goroutine or other event to be waited for. 74 // If a WaitGroup is reused to wait for several independent sets of events, 75 // new Add calls must happen after all previous Wait calls have returned. 76 // See the WaitGroup example. 77 func (wg *WaitGroup) Add(delta int) { 78 if race.Enabled { 79 if delta < 0 { 80 // Synchronize decrements with Wait. 81 race.ReleaseMerge(unsafe.Pointer(wg)) 82 } 83 race.Disable() 84 defer race.Enable() 85 } 86 bubbled := false 87 if synctest.IsInBubble() { 88 // If Add is called from within a bubble, then all Add calls must be made 89 // from the same bubble. 90 switch synctest.Associate(wg) { 91 case synctest.Unbubbled: 92 case synctest.OtherBubble: 93 // wg is already associated with a different bubble. 94 fatal("sync: WaitGroup.Add called from multiple synctest bubbles") 95 case synctest.CurrentBubble: 96 bubbled = true 97 state := wg.state.Or(waitGroupBubbleFlag) 98 if state != 0 && state&waitGroupBubbleFlag == 0 { 99 // Add has been called from outside this bubble. 100 fatal("sync: WaitGroup.Add called from inside and outside synctest bubble") 101 } 102 } 103 } 104 state := wg.state.Add(uint64(delta) << 32) 105 if state&waitGroupBubbleFlag != 0 && !bubbled { 106 // Add has been called from within a synctest bubble (and we aren't in one). 107 fatal("sync: WaitGroup.Add called from inside and outside synctest bubble") 108 } 109 v := int32(state >> 32) 110 w := uint32(state & 0x7fffffff) 111 if race.Enabled && delta > 0 && v == int32(delta) { 112 // The first increment must be synchronized with Wait. 
113 // Need to model this as a read, because there can be 114 // several concurrent wg.counter transitions from 0. 115 race.Read(unsafe.Pointer(&wg.sema)) 116 } 117 if v < 0 { 118 panic("sync: negative WaitGroup counter") 119 } 120 if w != 0 && delta > 0 && v == int32(delta) { 121 panic("sync: WaitGroup misuse: Add called concurrently with Wait") 122 } 123 if v > 0 || w == 0 { 124 return 125 } 126 // This goroutine has set counter to 0 when waiters > 0. 127 // Now there can't be concurrent mutations of state: 128 // - Adds must not happen concurrently with Wait, 129 // - Wait does not increment waiters if it sees counter == 0. 130 // Still do a cheap sanity check to detect WaitGroup misuse. 131 if wg.state.Load() != state { 132 panic("sync: WaitGroup misuse: Add called concurrently with Wait") 133 } 134 // Reset waiters count to 0. 135 wg.state.Store(0) 136 if bubbled { 137 // Adds must not happen concurrently with wait when counter is 0, 138 // so we can safely disassociate wg from its current bubble. 139 synctest.Disassociate(wg) 140 } 141 for ; w != 0; w-- { 142 runtime_Semrelease(&wg.sema, false, 0) 143 } 144 } 145 146 // Done decrements the [WaitGroup] task counter by one. 147 // It is equivalent to Add(-1). 148 // 149 // Callers should prefer [WaitGroup.Go]. 150 // 151 // In the terminology of [the Go memory model], a call to Done 152 // "synchronizes before" the return of any Wait call that it unblocks. 153 // 154 // [the Go memory model]: https://go.dev/ref/mem 155 func (wg *WaitGroup) Done() { 156 wg.Add(-1) 157 } 158 159 // Wait blocks until the [WaitGroup] task counter is zero. 160 func (wg *WaitGroup) Wait() { 161 if race.Enabled { 162 race.Disable() 163 } 164 for { 165 state := wg.state.Load() 166 v := int32(state >> 32) 167 w := uint32(state & 0x7fffffff) 168 if v == 0 { 169 // Counter is 0, no need to wait. 
170 if race.Enabled { 171 race.Enable() 172 race.Acquire(unsafe.Pointer(wg)) 173 } 174 if w == 0 && state&waitGroupBubbleFlag != 0 && synctest.IsAssociated(wg) { 175 // Adds must not happen concurrently with wait when counter is 0, 176 // so we can disassociate wg from its current bubble. 177 if wg.state.CompareAndSwap(state, 0) { 178 synctest.Disassociate(wg) 179 } 180 } 181 return 182 } 183 // Increment waiters count. 184 if wg.state.CompareAndSwap(state, state+1) { 185 if race.Enabled && w == 0 { 186 // Wait must be synchronized with the first Add. 187 // Need to model this is as a write to race with the read in Add. 188 // As a consequence, can do the write only for the first waiter, 189 // otherwise concurrent Waits will race with each other. 190 race.Write(unsafe.Pointer(&wg.sema)) 191 } 192 synctestDurable := false 193 if state&waitGroupBubbleFlag != 0 && synctest.IsInBubble() { 194 if race.Enabled { 195 race.Enable() 196 } 197 if synctest.IsAssociated(wg) { 198 // Add was called within the current bubble, 199 // so this Wait is durably blocking. 200 synctestDurable = true 201 } 202 if race.Enabled { 203 race.Disable() 204 } 205 } 206 runtime_SemacquireWaitGroup(&wg.sema, synctestDurable) 207 if wg.state.Load() != 0 { 208 panic("sync: WaitGroup is reused before previous Wait has returned") 209 } 210 if race.Enabled { 211 race.Enable() 212 race.Acquire(unsafe.Pointer(wg)) 213 } 214 return 215 } 216 } 217 } 218 219 // Go calls f in a new goroutine and adds that task to the [WaitGroup]. 220 // When f returns, the task is removed from the WaitGroup. 221 // 222 // The function f must not panic. 223 // 224 // If the WaitGroup is empty, Go must happen before a [WaitGroup.Wait]. 225 // Typically, this simply means Go is called to start tasks before Wait is called. 226 // If the WaitGroup is not empty, Go may happen at any time. 227 // This means a goroutine started by Go may itself call Go. 
228 // If a WaitGroup is reused to wait for several independent sets of tasks, 229 // new Go calls must happen after all previous Wait calls have returned. 230 // 231 // In the terminology of [the Go memory model], the return from f 232 // "synchronizes before" the return of any Wait call that it unblocks. 233 // 234 // [the Go memory model]: https://go.dev/ref/mem 235 func (wg *WaitGroup) Go(f func()) { 236 wg.Add(1) 237 go func() { 238 defer wg.Done() 239 f() 240 }() 241 } 242