1
2
3
4
5 package cache
6
7 import (
8 "bufio"
9 "cmd/go/internal/base"
10 "cmd/go/internal/cacheprog"
11 "cmd/internal/quoted"
12 "context"
13 "crypto/sha256"
14 "encoding/base64"
15 "encoding/json"
16 "errors"
17 "fmt"
18 "internal/goexperiment"
19 "io"
20 "log"
21 "os"
22 "os/exec"
23 "sync"
24 "sync/atomic"
25 "time"
26 )
27
28
29
30
31
32
33 type ProgCache struct {
34 cmd *exec.Cmd
35 stdout io.ReadCloser
36 stdin io.WriteCloser
37 bw *bufio.Writer
38 jenc *json.Encoder
39
40
41
42 can map[cacheprog.Cmd]bool
43
44
45
46
47
48
49
50 fuzzDirCache Cache
51
52 closing atomic.Bool
53 ctx context.Context
54 ctxCancel context.CancelFunc
55 readLoopDone chan struct{}
56
57 mu sync.Mutex
58 nextID int64
59 inFlight map[int64]chan<- *cacheprog.Response
60 outputFile map[OutputID]string
61
62
63
64 writeMu sync.Mutex
65 }
66
67
68
69
70
71
72 func startCacheProg(progAndArgs string, fuzzDirCache Cache) Cache {
73 if fuzzDirCache == nil {
74 panic("missing fuzzDirCache")
75 }
76 args, err := quoted.Split(progAndArgs)
77 if err != nil {
78 base.Fatalf("GOCACHEPROG args: %v", err)
79 }
80 var prog string
81 if len(args) > 0 {
82 prog = args[0]
83 args = args[1:]
84 }
85
86 ctx, ctxCancel := context.WithCancel(context.Background())
87
88 cmd := exec.CommandContext(ctx, prog, args...)
89 out, err := cmd.StdoutPipe()
90 if err != nil {
91 base.Fatalf("StdoutPipe to GOCACHEPROG: %v", err)
92 }
93 in, err := cmd.StdinPipe()
94 if err != nil {
95 base.Fatalf("StdinPipe to GOCACHEPROG: %v", err)
96 }
97 cmd.Stderr = os.Stderr
98
99
100 cmd.Cancel = in.Close
101
102 if err := cmd.Start(); err != nil {
103 base.Fatalf("error starting GOCACHEPROG program %q: %v", prog, err)
104 }
105
106 pc := &ProgCache{
107 ctx: ctx,
108 ctxCancel: ctxCancel,
109 fuzzDirCache: fuzzDirCache,
110 cmd: cmd,
111 stdout: out,
112 stdin: in,
113 bw: bufio.NewWriter(in),
114 inFlight: make(map[int64]chan<- *cacheprog.Response),
115 outputFile: make(map[OutputID]string),
116 readLoopDone: make(chan struct{}),
117 }
118
119
120
121 capResc := make(chan *cacheprog.Response, 1)
122 pc.inFlight[0] = capResc
123
124 pc.jenc = json.NewEncoder(pc.bw)
125 go pc.readLoop(pc.readLoopDone)
126
127
128
129 timer := time.NewTicker(5 * time.Second)
130 defer timer.Stop()
131 for {
132 select {
133 case <-timer.C:
134 log.Printf("# still waiting for GOCACHEPROG %v ...", prog)
135 case capRes := <-capResc:
136 can := map[cacheprog.Cmd]bool{}
137 for _, cmd := range capRes.KnownCommands {
138 can[cmd] = true
139 }
140 if len(can) == 0 {
141 base.Fatalf("GOCACHEPROG %v declared no supported commands", prog)
142 }
143 pc.can = can
144 return pc
145 }
146 }
147 }
148
149 func (c *ProgCache) readLoop(readLoopDone chan<- struct{}) {
150 defer close(readLoopDone)
151 jd := json.NewDecoder(c.stdout)
152 for {
153 res := new(cacheprog.Response)
154 if err := jd.Decode(res); err != nil {
155 if c.closing.Load() {
156 c.mu.Lock()
157 for _, ch := range c.inFlight {
158 close(ch)
159 }
160 c.inFlight = nil
161 c.mu.Unlock()
162 return
163 }
164 if err == io.EOF {
165 c.mu.Lock()
166 inFlight := len(c.inFlight)
167 c.mu.Unlock()
168 base.Fatalf("GOCACHEPROG exited pre-Close with %v pending requests", inFlight)
169 }
170 base.Fatalf("error reading JSON from GOCACHEPROG: %v", err)
171 }
172 c.mu.Lock()
173 ch, ok := c.inFlight[res.ID]
174 delete(c.inFlight, res.ID)
175 c.mu.Unlock()
176 if ok {
177 ch <- res
178 } else {
179 base.Fatalf("GOCACHEPROG sent response for unknown request ID %v", res.ID)
180 }
181 }
182 }
183
184 var errCacheprogClosed = errors.New("GOCACHEPROG program closed unexpectedly")
185
186 func (c *ProgCache) send(ctx context.Context, req *cacheprog.Request) (*cacheprog.Response, error) {
187 resc := make(chan *cacheprog.Response, 1)
188 if err := c.writeToChild(req, resc); err != nil {
189 return nil, err
190 }
191 select {
192 case res := <-resc:
193 if res == nil {
194 return nil, errCacheprogClosed
195 }
196 if res.Err != "" {
197 return nil, errors.New(res.Err)
198 }
199 return res, nil
200 case <-ctx.Done():
201 return nil, ctx.Err()
202 }
203 }
204
205 func (c *ProgCache) writeToChild(req *cacheprog.Request, resc chan<- *cacheprog.Response) (err error) {
206 c.mu.Lock()
207 if c.inFlight == nil {
208 return errCacheprogClosed
209 }
210 c.nextID++
211 req.ID = c.nextID
212 c.inFlight[req.ID] = resc
213 c.mu.Unlock()
214
215 defer func() {
216 if err != nil {
217 c.mu.Lock()
218 if c.inFlight != nil {
219 delete(c.inFlight, req.ID)
220 }
221 c.mu.Unlock()
222 }
223 }()
224
225 c.writeMu.Lock()
226 defer c.writeMu.Unlock()
227
228 if err := c.jenc.Encode(req); err != nil {
229 return err
230 }
231 if err := c.bw.WriteByte('\n'); err != nil {
232 return err
233 }
234 if req.Body != nil && req.BodySize > 0 {
235 if err := c.bw.WriteByte('"'); err != nil {
236 return err
237 }
238 e := base64.NewEncoder(base64.StdEncoding, c.bw)
239 wrote, err := io.Copy(e, req.Body)
240 if err != nil {
241 return err
242 }
243 if err := e.Close(); err != nil {
244 return nil
245 }
246 if wrote != req.BodySize {
247 return fmt.Errorf("short write writing body to GOCACHEPROG for action %x, output %x: wrote %v; expected %v",
248 req.ActionID, req.OutputID, wrote, req.BodySize)
249 }
250 if _, err := c.bw.WriteString("\"\n"); err != nil {
251 return err
252 }
253 }
254 if err := c.bw.Flush(); err != nil {
255 return err
256 }
257 return nil
258 }
259
260 func (c *ProgCache) Get(a ActionID) (Entry, error) {
261 if !c.can[cacheprog.CmdGet] {
262
263
264
265
266
267
268
269 return Entry{}, &entryNotFoundError{}
270 }
271 res, err := c.send(c.ctx, &cacheprog.Request{
272 Command: cacheprog.CmdGet,
273 ActionID: a[:],
274 })
275 if err != nil {
276 return Entry{}, err
277 }
278 if res.Miss {
279 return Entry{}, &entryNotFoundError{}
280 }
281 e := Entry{
282 Size: res.Size,
283 }
284 if res.Time != nil {
285 e.Time = *res.Time
286 } else {
287 e.Time = time.Now()
288 }
289 if res.DiskPath == "" {
290 return Entry{}, &entryNotFoundError{errors.New("GOCACHEPROG didn't populate DiskPath on get hit")}
291 }
292 if copy(e.OutputID[:], res.OutputID) != len(res.OutputID) {
293 return Entry{}, &entryNotFoundError{errors.New("incomplete ProgResponse OutputID")}
294 }
295 c.noteOutputFile(e.OutputID, res.DiskPath)
296 return e, nil
297 }
298
299 func (c *ProgCache) noteOutputFile(o OutputID, diskPath string) {
300 c.mu.Lock()
301 defer c.mu.Unlock()
302 c.outputFile[o] = diskPath
303 }
304
305 func (c *ProgCache) OutputFile(o OutputID) string {
306 c.mu.Lock()
307 defer c.mu.Unlock()
308 return c.outputFile[o]
309 }
310
311 func (c *ProgCache) Put(a ActionID, file io.ReadSeeker) (_ OutputID, size int64, _ error) {
312
313 h := sha256.New()
314 if _, err := file.Seek(0, 0); err != nil {
315 return OutputID{}, 0, err
316 }
317 size, err := io.Copy(h, file)
318 if err != nil {
319 return OutputID{}, 0, err
320 }
321 var out OutputID
322 h.Sum(out[:0])
323
324 if _, err := file.Seek(0, 0); err != nil {
325 return OutputID{}, 0, err
326 }
327
328 if !c.can[cacheprog.CmdPut] {
329
330 return out, size, nil
331 }
332
333
334
335 var deprecatedValue []byte
336 if goexperiment.CacheProg {
337 deprecatedValue = out[:]
338 }
339
340 res, err := c.send(c.ctx, &cacheprog.Request{
341 Command: cacheprog.CmdPut,
342 ActionID: a[:],
343 OutputID: out[:],
344 ObjectID: deprecatedValue,
345 Body: file,
346 BodySize: size,
347 })
348 if err != nil {
349 return OutputID{}, 0, err
350 }
351 if res.DiskPath == "" {
352 return OutputID{}, 0, errors.New("GOCACHEPROG didn't return DiskPath in put response")
353 }
354 c.noteOutputFile(out, res.DiskPath)
355 return out, size, err
356 }
357
358 func (c *ProgCache) Close() error {
359 c.closing.Store(true)
360 var err error
361
362
363
364
365 if c.can[cacheprog.CmdClose] {
366 _, err = c.send(c.ctx, &cacheprog.Request{Command: cacheprog.CmdClose})
367 if errors.Is(err, errCacheprogClosed) {
368
369 err = nil
370 }
371 }
372
373 c.ctxCancel()
374
375 <-c.readLoopDone
376 return err
377 }
378
379 func (c *ProgCache) FuzzDir() string {
380
381
382 return c.fuzzDirCache.FuzzDir()
383 }
384
View as plain text