Source file src/cmd/vendor/golang.org/x/sync/errgroup/errgroup.go

     1  // Copyright 2016 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Package errgroup provides synchronization, error propagation, and Context
     6  // cancelation for groups of goroutines working on subtasks of a common task.
     7  //
     8  // [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks
     9  // returning errors.
    10  package errgroup
    11  
    12  import (
    13  	"context"
    14  	"fmt"
    15  	"runtime"
    16  	"runtime/debug"
    17  	"sync"
    18  )
    19  
    20  type token struct{}
    21  
    22  // A Group is a collection of goroutines working on subtasks that are part of
    23  // the same overall task. A Group should not be reused for different tasks.
    24  //
    25  // A zero Group is valid, has no limit on the number of active goroutines,
    26  // and does not cancel on error.
    27  type Group struct {
    28  	cancel func(error)
    29  
    30  	wg sync.WaitGroup
    31  
    32  	sem chan token
    33  
    34  	errOnce sync.Once
    35  	err     error
    36  
    37  	mu         sync.Mutex
    38  	panicValue any  // = PanicError | PanicValue; non-nil if some Group.Go goroutine panicked.
    39  	abnormal   bool // some Group.Go goroutine terminated abnormally (panic or goexit).
    40  }
    41  
    42  func (g *Group) done() {
    43  	if g.sem != nil {
    44  		<-g.sem
    45  	}
    46  	g.wg.Done()
    47  }
    48  
    49  // WithContext returns a new Group and an associated Context derived from ctx.
    50  //
    51  // The derived Context is canceled the first time a function passed to Go
    52  // returns a non-nil error or the first time Wait returns, whichever occurs
    53  // first.
    54  func WithContext(ctx context.Context) (*Group, context.Context) {
    55  	ctx, cancel := context.WithCancelCause(ctx)
    56  	return &Group{cancel: cancel}, ctx
    57  }
    58  
    59  // Wait blocks until all function calls from the Go method have returned
    60  // normally, then returns the first non-nil error (if any) from them.
    61  //
    62  // If any of the calls panics, Wait panics with a [PanicValue];
    63  // and if any of them calls [runtime.Goexit], Wait calls runtime.Goexit.
    64  func (g *Group) Wait() error {
    65  	g.wg.Wait()
    66  	if g.cancel != nil {
    67  		g.cancel(g.err)
    68  	}
    69  	if g.panicValue != nil {
    70  		panic(g.panicValue)
    71  	}
    72  	if g.abnormal {
    73  		runtime.Goexit()
    74  	}
    75  	return g.err
    76  }
    77  
    78  // Go calls the given function in a new goroutine.
    79  //
    80  // The first call to Go must happen before a Wait.
    81  // It blocks until the new goroutine can be added without the number of
    82  // goroutines in the group exceeding the configured limit.
    83  //
    84  // The first goroutine in the group that returns a non-nil error, panics, or
    85  // invokes [runtime.Goexit] will cancel the associated Context, if any.
    86  func (g *Group) Go(f func() error) {
    87  	if g.sem != nil {
    88  		g.sem <- token{}
    89  	}
    90  
    91  	g.add(f)
    92  }
    93  
    94  func (g *Group) add(f func() error) {
    95  	g.wg.Add(1)
    96  	go func() {
    97  		defer g.done()
    98  		normalReturn := false
    99  		defer func() {
   100  			if normalReturn {
   101  				return
   102  			}
   103  			v := recover()
   104  			g.mu.Lock()
   105  			defer g.mu.Unlock()
   106  			if !g.abnormal {
   107  				if g.cancel != nil {
   108  					g.cancel(g.err)
   109  				}
   110  				g.abnormal = true
   111  			}
   112  			if v != nil && g.panicValue == nil {
   113  				switch v := v.(type) {
   114  				case error:
   115  					g.panicValue = PanicError{
   116  						Recovered: v,
   117  						Stack:     debug.Stack(),
   118  					}
   119  				default:
   120  					g.panicValue = PanicValue{
   121  						Recovered: v,
   122  						Stack:     debug.Stack(),
   123  					}
   124  				}
   125  			}
   126  		}()
   127  
   128  		err := f()
   129  		normalReturn = true
   130  		if err != nil {
   131  			g.errOnce.Do(func() {
   132  				g.err = err
   133  				if g.cancel != nil {
   134  					g.cancel(g.err)
   135  				}
   136  			})
   137  		}
   138  	}()
   139  }
   140  
   141  // TryGo calls the given function in a new goroutine only if the number of
   142  // active goroutines in the group is currently below the configured limit.
   143  //
   144  // The return value reports whether the goroutine was started.
   145  func (g *Group) TryGo(f func() error) bool {
   146  	if g.sem != nil {
   147  		select {
   148  		case g.sem <- token{}:
   149  			// Note: this allows barging iff channels in general allow barging.
   150  		default:
   151  			return false
   152  		}
   153  	}
   154  
   155  	g.add(f)
   156  	return true
   157  }
   158  
   159  // SetLimit limits the number of active goroutines in this group to at most n.
   160  // A negative value indicates no limit.
   161  // A limit of zero will prevent any new goroutines from being added.
   162  //
   163  // Any subsequent call to the Go method will block until it can add an active
   164  // goroutine without exceeding the configured limit.
   165  //
   166  // The limit must not be modified while any goroutines in the group are active.
   167  func (g *Group) SetLimit(n int) {
   168  	if n < 0 {
   169  		g.sem = nil
   170  		return
   171  	}
   172  	if len(g.sem) != 0 {
   173  		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
   174  	}
   175  	g.sem = make(chan token, n)
   176  }
   177  
   178  // PanicError wraps an error recovered from an unhandled panic
   179  // when calling a function passed to Go or TryGo.
   180  type PanicError struct {
   181  	Recovered error
   182  	Stack     []byte // result of call to [debug.Stack]
   183  }
   184  
   185  func (p PanicError) Error() string {
   186  	if len(p.Stack) > 0 {
   187  		return fmt.Sprintf("recovered from errgroup.Group: %v\n%s", p.Recovered, p.Stack)
   188  	}
   189  	return fmt.Sprintf("recovered from errgroup.Group: %v", p.Recovered)
   190  }
   191  
   192  func (p PanicError) Unwrap() error { return p.Recovered }
   193  
   194  // PanicValue wraps a value that does not implement the error interface,
   195  // recovered from an unhandled panic when calling a function passed to Go or
   196  // TryGo.
   197  type PanicValue struct {
   198  	Recovered any
   199  	Stack     []byte // result of call to [debug.Stack]
   200  }
   201  
   202  func (p PanicValue) String() string {
   203  	if len(p.Stack) > 0 {
   204  		return fmt.Sprintf("recovered from errgroup.Group: %v\n%s", p.Recovered, p.Stack)
   205  	}
   206  	return fmt.Sprintf("recovered from errgroup.Group: %v", p.Recovered)
   207  }
   208  

View as plain text