Source file src/cmd/go/internal/cache/prog.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package cache
     6  
     7  import (
     8  	"bufio"
     9  	"cmd/go/internal/base"
    10  	"cmd/go/internal/cacheprog"
    11  	"cmd/internal/quoted"
    12  	"context"
    13  	"crypto/sha256"
    14  	"encoding/base64"
    15  	"encoding/json"
    16  	"errors"
    17  	"fmt"
    18  	"internal/goexperiment"
    19  	"io"
    20  	"log"
    21  	"os"
    22  	"os/exec"
    23  	"sync"
    24  	"sync/atomic"
    25  	"time"
    26  )
    27  
// ProgCache implements Cache via JSON messages over stdin/stdout to a child
// helper process which can then implement whatever caching policy/mechanism it
// wants.
//
// See https://github.com/golang/go/issues/59719
type ProgCache struct {
	cmd    *exec.Cmd      // the running helper process
	stdout io.ReadCloser  // from the child process
	stdin  io.WriteCloser // to the child process
	bw     *bufio.Writer  // to stdin
	jenc   *json.Encoder  // to bw

	// can are the commands that the child process declared that it supports.
	// This is effectively the versioning mechanism.
	can map[cacheprog.Cmd]bool

	// fuzzDirCache is another Cache implementation to use for the FuzzDir
	// method. In practice this is the default GOCACHE disk-based
	// implementation.
	//
	// TODO(bradfitz): maybe this isn't ideal. But we'd need to extend the Cache
	// interface and the fuzzing callers to be less disk-y to do more here.
	fuzzDirCache Cache

	// closing is set at the start of Close. It tells readLoop that a decode
	// error is the expected end of the stream rather than a fatal failure.
	closing      atomic.Bool
	ctx          context.Context    // valid until Close cancels it via ctxCancel
	ctxCancel    context.CancelFunc // called on Close
	readLoopDone chan struct{}      // closed when readLoop returns

	mu sync.Mutex // guards nextID, inFlight, and outputFile

	// nextID is the most recently assigned request ID. It is incremented
	// before each use, so ID 0 stays reserved for the helper's initial
	// capabilities message.
	nextID int64
	// inFlight maps each pending request ID to the channel its response is
	// delivered on. readLoop closes every channel and sets the map to nil when
	// it shuts down during Close; writeToChild then refuses new requests.
	inFlight map[int64]chan<- *cacheprog.Response
	// outputFile maps an OutputID to the abs path on disk of its body, as
	// reported by the helper in Get and Put responses.
	outputFile map[OutputID]string

	// writeMu serializes writing to the child process.
	// It must never be held at the same time as mu.
	writeMu sync.Mutex
}
    66  
    67  // startCacheProg starts the prog binary (with optional space-separated flags)
    68  // and returns a Cache implementation that talks to it.
    69  //
    70  // It blocks a few seconds to wait for the child process to successfully start
    71  // and advertise its capabilities.
    72  func startCacheProg(progAndArgs string, fuzzDirCache Cache) Cache {
    73  	if fuzzDirCache == nil {
    74  		panic("missing fuzzDirCache")
    75  	}
    76  	args, err := quoted.Split(progAndArgs)
    77  	if err != nil {
    78  		base.Fatalf("GOCACHEPROG args: %v", err)
    79  	}
    80  	var prog string
    81  	if len(args) > 0 {
    82  		prog = args[0]
    83  		args = args[1:]
    84  	}
    85  
    86  	ctx, ctxCancel := context.WithCancel(context.Background())
    87  
    88  	cmd := exec.CommandContext(ctx, prog, args...)
    89  	out, err := cmd.StdoutPipe()
    90  	if err != nil {
    91  		base.Fatalf("StdoutPipe to GOCACHEPROG: %v", err)
    92  	}
    93  	in, err := cmd.StdinPipe()
    94  	if err != nil {
    95  		base.Fatalf("StdinPipe to GOCACHEPROG: %v", err)
    96  	}
    97  	cmd.Stderr = os.Stderr
    98  	// On close, we cancel the context. Rather than killing the helper,
    99  	// close its stdin.
   100  	cmd.Cancel = in.Close
   101  
   102  	if err := cmd.Start(); err != nil {
   103  		base.Fatalf("error starting GOCACHEPROG program %q: %v", prog, err)
   104  	}
   105  
   106  	pc := &ProgCache{
   107  		ctx:          ctx,
   108  		ctxCancel:    ctxCancel,
   109  		fuzzDirCache: fuzzDirCache,
   110  		cmd:          cmd,
   111  		stdout:       out,
   112  		stdin:        in,
   113  		bw:           bufio.NewWriter(in),
   114  		inFlight:     make(map[int64]chan<- *cacheprog.Response),
   115  		outputFile:   make(map[OutputID]string),
   116  		readLoopDone: make(chan struct{}),
   117  	}
   118  
   119  	// Register our interest in the initial protocol message from the child to
   120  	// us, saying what it can do.
   121  	capResc := make(chan *cacheprog.Response, 1)
   122  	pc.inFlight[0] = capResc
   123  
   124  	pc.jenc = json.NewEncoder(pc.bw)
   125  	go pc.readLoop(pc.readLoopDone)
   126  
   127  	// Give the child process a few seconds to report its capabilities. This
   128  	// should be instant and not require any slow work by the program.
   129  	timer := time.NewTicker(5 * time.Second)
   130  	defer timer.Stop()
   131  	for {
   132  		select {
   133  		case <-timer.C:
   134  			log.Printf("# still waiting for GOCACHEPROG %v ...", prog)
   135  		case capRes := <-capResc:
   136  			can := map[cacheprog.Cmd]bool{}
   137  			for _, cmd := range capRes.KnownCommands {
   138  				can[cmd] = true
   139  			}
   140  			if len(can) == 0 {
   141  				base.Fatalf("GOCACHEPROG %v declared no supported commands", prog)
   142  			}
   143  			pc.can = can
   144  			return pc
   145  		}
   146  	}
   147  }
   148  
   149  func (c *ProgCache) readLoop(readLoopDone chan<- struct{}) {
   150  	defer close(readLoopDone)
   151  	jd := json.NewDecoder(c.stdout)
   152  	for {
   153  		res := new(cacheprog.Response)
   154  		if err := jd.Decode(res); err != nil {
   155  			if c.closing.Load() {
   156  				c.mu.Lock()
   157  				for _, ch := range c.inFlight {
   158  					close(ch)
   159  				}
   160  				c.inFlight = nil
   161  				c.mu.Unlock()
   162  				return // quietly
   163  			}
   164  			if err == io.EOF {
   165  				c.mu.Lock()
   166  				inFlight := len(c.inFlight)
   167  				c.mu.Unlock()
   168  				base.Fatalf("GOCACHEPROG exited pre-Close with %v pending requests", inFlight)
   169  			}
   170  			base.Fatalf("error reading JSON from GOCACHEPROG: %v", err)
   171  		}
   172  		c.mu.Lock()
   173  		ch, ok := c.inFlight[res.ID]
   174  		delete(c.inFlight, res.ID)
   175  		c.mu.Unlock()
   176  		if ok {
   177  			ch <- res
   178  		} else {
   179  			base.Fatalf("GOCACHEPROG sent response for unknown request ID %v", res.ID)
   180  		}
   181  	}
   182  }
   183  
   184  var errCacheprogClosed = errors.New("GOCACHEPROG program closed unexpectedly")
   185  
   186  func (c *ProgCache) send(ctx context.Context, req *cacheprog.Request) (*cacheprog.Response, error) {
   187  	resc := make(chan *cacheprog.Response, 1)
   188  	if err := c.writeToChild(req, resc); err != nil {
   189  		return nil, err
   190  	}
   191  	select {
   192  	case res := <-resc:
   193  		if res == nil {
   194  			return nil, errCacheprogClosed
   195  		}
   196  		if res.Err != "" {
   197  			return nil, errors.New(res.Err)
   198  		}
   199  		return res, nil
   200  	case <-ctx.Done():
   201  		return nil, ctx.Err()
   202  	}
   203  }
   204  
   205  func (c *ProgCache) writeToChild(req *cacheprog.Request, resc chan<- *cacheprog.Response) (err error) {
   206  	c.mu.Lock()
   207  	if c.inFlight == nil {
   208  		return errCacheprogClosed
   209  	}
   210  	c.nextID++
   211  	req.ID = c.nextID
   212  	c.inFlight[req.ID] = resc
   213  	c.mu.Unlock()
   214  
   215  	defer func() {
   216  		if err != nil {
   217  			c.mu.Lock()
   218  			if c.inFlight != nil {
   219  				delete(c.inFlight, req.ID)
   220  			}
   221  			c.mu.Unlock()
   222  		}
   223  	}()
   224  
   225  	c.writeMu.Lock()
   226  	defer c.writeMu.Unlock()
   227  
   228  	if err := c.jenc.Encode(req); err != nil {
   229  		return err
   230  	}
   231  	if err := c.bw.WriteByte('\n'); err != nil {
   232  		return err
   233  	}
   234  	if req.Body != nil && req.BodySize > 0 {
   235  		if err := c.bw.WriteByte('"'); err != nil {
   236  			return err
   237  		}
   238  		e := base64.NewEncoder(base64.StdEncoding, c.bw)
   239  		wrote, err := io.Copy(e, req.Body)
   240  		if err != nil {
   241  			return err
   242  		}
   243  		if err := e.Close(); err != nil {
   244  			return nil
   245  		}
   246  		if wrote != req.BodySize {
   247  			return fmt.Errorf("short write writing body to GOCACHEPROG for action %x, output %x: wrote %v; expected %v",
   248  				req.ActionID, req.OutputID, wrote, req.BodySize)
   249  		}
   250  		if _, err := c.bw.WriteString("\"\n"); err != nil {
   251  			return err
   252  		}
   253  	}
   254  	if err := c.bw.Flush(); err != nil {
   255  		return err
   256  	}
   257  	return nil
   258  }
   259  
   260  func (c *ProgCache) Get(a ActionID) (Entry, error) {
   261  	if !c.can[cacheprog.CmdGet] {
   262  		// They can't do a "get". Maybe they're a write-only cache.
   263  		//
   264  		// TODO(bradfitz,bcmills): figure out the proper error type here. Maybe
   265  		// errors.ErrUnsupported? Is entryNotFoundError even appropriate? There
   266  		// might be places where we rely on the fact that a recent Put can be
   267  		// read through a corresponding Get. Audit callers and check, and document
   268  		// error types on the Cache interface.
   269  		return Entry{}, &entryNotFoundError{}
   270  	}
   271  	res, err := c.send(c.ctx, &cacheprog.Request{
   272  		Command:  cacheprog.CmdGet,
   273  		ActionID: a[:],
   274  	})
   275  	if err != nil {
   276  		return Entry{}, err // TODO(bradfitz): or entryNotFoundError? Audit callers.
   277  	}
   278  	if res.Miss {
   279  		return Entry{}, &entryNotFoundError{}
   280  	}
   281  	e := Entry{
   282  		Size: res.Size,
   283  	}
   284  	if res.Time != nil {
   285  		e.Time = *res.Time
   286  	} else {
   287  		e.Time = time.Now()
   288  	}
   289  	if res.DiskPath == "" {
   290  		return Entry{}, &entryNotFoundError{errors.New("GOCACHEPROG didn't populate DiskPath on get hit")}
   291  	}
   292  	if copy(e.OutputID[:], res.OutputID) != len(res.OutputID) {
   293  		return Entry{}, &entryNotFoundError{errors.New("incomplete ProgResponse OutputID")}
   294  	}
   295  	c.noteOutputFile(e.OutputID, res.DiskPath)
   296  	return e, nil
   297  }
   298  
   299  func (c *ProgCache) noteOutputFile(o OutputID, diskPath string) {
   300  	c.mu.Lock()
   301  	defer c.mu.Unlock()
   302  	c.outputFile[o] = diskPath
   303  }
   304  
   305  func (c *ProgCache) OutputFile(o OutputID) string {
   306  	c.mu.Lock()
   307  	defer c.mu.Unlock()
   308  	return c.outputFile[o]
   309  }
   310  
   311  func (c *ProgCache) Put(a ActionID, file io.ReadSeeker) (_ OutputID, size int64, _ error) {
   312  	// Compute output ID.
   313  	h := sha256.New()
   314  	if _, err := file.Seek(0, 0); err != nil {
   315  		return OutputID{}, 0, err
   316  	}
   317  	size, err := io.Copy(h, file)
   318  	if err != nil {
   319  		return OutputID{}, 0, err
   320  	}
   321  	var out OutputID
   322  	h.Sum(out[:0])
   323  
   324  	if _, err := file.Seek(0, 0); err != nil {
   325  		return OutputID{}, 0, err
   326  	}
   327  
   328  	if !c.can[cacheprog.CmdPut] {
   329  		// Child is a read-only cache. Do nothing.
   330  		return out, size, nil
   331  	}
   332  
   333  	// For compatibility with Go 1.23/1.24 GOEXPERIMENT=gocacheprog users, also
   334  	// populate the deprecated ObjectID field. This will be removed in Go 1.25.
   335  	var deprecatedValue []byte
   336  	if goexperiment.CacheProg {
   337  		deprecatedValue = out[:]
   338  	}
   339  
   340  	res, err := c.send(c.ctx, &cacheprog.Request{
   341  		Command:  cacheprog.CmdPut,
   342  		ActionID: a[:],
   343  		OutputID: out[:],
   344  		ObjectID: deprecatedValue, // TODO(bradfitz): remove in Go 1.25
   345  		Body:     file,
   346  		BodySize: size,
   347  	})
   348  	if err != nil {
   349  		return OutputID{}, 0, err
   350  	}
   351  	if res.DiskPath == "" {
   352  		return OutputID{}, 0, errors.New("GOCACHEPROG didn't return DiskPath in put response")
   353  	}
   354  	c.noteOutputFile(out, res.DiskPath)
   355  	return out, size, err
   356  }
   357  
   358  func (c *ProgCache) Close() error {
   359  	c.closing.Store(true)
   360  	var err error
   361  
   362  	// First write a "close" message to the child so it can exit nicely
   363  	// and clean up if it wants. Only after that exchange do we cancel
   364  	// the context that kills the process.
   365  	if c.can[cacheprog.CmdClose] {
   366  		_, err = c.send(c.ctx, &cacheprog.Request{Command: cacheprog.CmdClose})
   367  		if errors.Is(err, errCacheprogClosed) {
   368  			// Allow the child to quit without responding to close.
   369  			err = nil
   370  		}
   371  	}
   372  	// Cancel the context, which will close the helper's stdin.
   373  	c.ctxCancel()
   374  	// Wait until the helper closes its stdout.
   375  	<-c.readLoopDone
   376  	return err
   377  }
   378  
   379  func (c *ProgCache) FuzzDir() string {
   380  	// TODO(bradfitz): figure out what to do here. For now just use the
   381  	// disk-based default.
   382  	return c.fuzzDirCache.FuzzDir()
   383  }
   384  

View as plain text