1
2
3
4
5 package trace
6
7 import (
8 "bufio"
9 "bytes"
10 "cmp"
11 "encoding/binary"
12 "errors"
13 "fmt"
14 "io"
15 "slices"
16 "strings"
17 "time"
18
19 "internal/trace/tracev2"
20 "internal/trace/version"
21 )
22
23
24
25
26
// generation holds everything read from the trace for one generation: the
// ordinary event batches grouped by thread, the CPU samples, and (through the
// embedded evTable) the tables needed to interpret them.
type generation struct {
	// gen is the generation number, taken from the first batch read for
	// this generation (or from the spilled batch that started it).
	gen uint64
	// batches holds the batches that processBatch did not recognise as one
	// of the special batch kinds, keyed by the batch's thread and kept in
	// the order they were read.
	batches map[ThreadID][]batch
	// batchMs lists the keys of batches in the order each thread was first
	// seen.
	batchMs []ThreadID
	// cpuSamples accumulates the samples decoded from CPU-sample batches.
	// Once the generation is fully read, fixUpCPUSamples converts their
	// times and sorts them.
	cpuSamples []cpuSample
	// minTs is the smallest batch time seen among the batches processed
	// for this generation.
	minTs timestamp
	// The embedded evTable (declared elsewhere) supplies the pcs, strings,
	// stacks, sync and expBatches state that this file fills in.
	*evTable
}
35
36
37
38 func readGeneration(r *bufio.Reader, ver version.Version) (*generation, error) {
39 if ver < version.Go126 {
40 return nil, errors.New("internal error: readGeneration called for <1.26 trace")
41 }
42 g := &generation{
43 evTable: &evTable{
44 pcs: make(map[uint64]frame),
45 },
46 batches: make(map[ThreadID][]batch),
47 }
48
49
50 for {
51 b, gen, err := readBatch(r)
52 if err == io.EOF {
53 if len(g.batches) != 0 {
54 return nil, errors.New("incomplete generation found; trace likely truncated")
55 }
56 return nil, nil
57 }
58 if err != nil {
59 return nil, err
60 }
61 if g.gen == 0 {
62
63 g.gen = gen
64 }
65 if b.isEndOfGeneration() {
66 break
67 }
68 if gen == 0 {
69
70 return nil, fmt.Errorf("invalid generation number %d", gen)
71 }
72 if gen != g.gen {
73 return nil, fmt.Errorf("broken trace: missing end-of-generation event, or generations are interleaved")
74 }
75 if g.minTs == 0 || b.time < g.minTs {
76 g.minTs = b.time
77 }
78 if err := processBatch(g, b, ver); err != nil {
79 return nil, err
80 }
81 }
82
83
84 if g.freq == 0 {
85 return nil, fmt.Errorf("no frequency event found")
86 }
87 if !g.hasClockSnapshot {
88 return nil, fmt.Errorf("no clock snapshot event found")
89 }
90
91
92
93
94
95
96
97
98 g.stacks.compactify()
99 g.strings.compactify()
100
101
102 if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
103 return nil, err
104 }
105
106
107 fixUpCPUSamples(g.cpuSamples, g.freq)
108 return g, nil
109 }
110
111
112
113
114
115
// spilledBatch is a batch that readGenerationWithSpill read while collecting
// one generation but that carries the next generation's number. It is
// returned to the caller and passed back in on the following call, so the
// next generation starts from it.
type spilledBatch struct {
	// gen is the generation number the batch was read with.
	gen uint64
	// The embedded batch is the batch itself.
	*batch
}
120
121
122
123
124
125
126
127
128 func readGenerationWithSpill(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
129 if ver >= version.Go126 {
130 return nil, nil, errors.New("internal error: readGenerationWithSpill called for Go 1.26+ trace")
131 }
132 g := &generation{
133 evTable: &evTable{
134 pcs: make(map[uint64]frame),
135 },
136 batches: make(map[ThreadID][]batch),
137 }
138
139 if spill != nil {
140
141 g.gen = spill.gen
142 g.minTs = spill.batch.time
143 if err := processBatch(g, *spill.batch, ver); err != nil {
144 return nil, nil, err
145 }
146 spill = nil
147 }
148
149 var spillErr error
150 for {
151 b, gen, err := readBatch(r)
152 if err == io.EOF {
153 break
154 }
155 if err != nil {
156 if g.gen != 0 {
157
158
159
160 spillErr = err
161 break
162 }
163 return nil, nil, err
164 }
165 if gen == 0 {
166
167 return nil, nil, fmt.Errorf("invalid generation number %d", gen)
168 }
169 if g.gen == 0 {
170
171 g.gen = gen
172 }
173 if gen == g.gen+1 {
174
175 spill = &spilledBatch{gen: gen, batch: &b}
176 break
177 }
178 if gen != g.gen {
179
180
181
182
183
184
185
186 return nil, nil, fmt.Errorf("generations out of order")
187 }
188 if g.minTs == 0 || b.time < g.minTs {
189 g.minTs = b.time
190 }
191 if err := processBatch(g, b, ver); err != nil {
192 return nil, nil, err
193 }
194 }
195
196
197 if g.freq == 0 {
198 return nil, nil, fmt.Errorf("no frequency event found")
199 }
200 if ver >= version.Go125 && !g.hasClockSnapshot {
201 return nil, nil, fmt.Errorf("no clock snapshot event found")
202 }
203
204
205
206
207
208
209
210
211 g.stacks.compactify()
212 g.strings.compactify()
213
214
215 if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
216 return nil, nil, err
217 }
218
219
220 fixUpCPUSamples(g.cpuSamples, g.freq)
221 return g, spill, spillErr
222 }
223
224
225 func processBatch(g *generation, b batch, ver version.Version) error {
226 switch {
227 case b.isStringsBatch():
228 if err := addStrings(&g.strings, b); err != nil {
229 return err
230 }
231 case b.isStacksBatch():
232 if err := addStacks(&g.stacks, g.pcs, b); err != nil {
233 return err
234 }
235 case b.isCPUSamplesBatch():
236 samples, err := addCPUSamples(g.cpuSamples, b)
237 if err != nil {
238 return err
239 }
240 g.cpuSamples = samples
241 case b.isSyncBatch(ver):
242 if err := setSyncBatch(&g.sync, b, ver); err != nil {
243 return err
244 }
245 case b.exp != tracev2.NoExperiment:
246 if g.expBatches == nil {
247 g.expBatches = make(map[tracev2.Experiment][]ExperimentalBatch)
248 }
249 if err := addExperimentalBatch(g.expBatches, b); err != nil {
250 return err
251 }
252 case b.isEndOfGeneration():
253 return errors.New("internal error: unexpectedly processing EndOfGeneration; broken trace?")
254 default:
255 if _, ok := g.batches[b.m]; !ok {
256 g.batchMs = append(g.batchMs, b.m)
257 }
258 g.batches[b.m] = append(g.batches[b.m], b)
259 }
260 return nil
261 }
262
263
264
265 func validateStackStrings(
266 stacks *dataTable[stackID, stack],
267 strings *dataTable[stringID, string],
268 frames map[uint64]frame,
269 ) error {
270 var err error
271 stacks.forEach(func(id stackID, stk stack) bool {
272 for _, pc := range stk.pcs {
273 frame, ok := frames[pc]
274 if !ok {
275 err = fmt.Errorf("found unknown pc %x for stack %d", pc, id)
276 return false
277 }
278 _, ok = strings.get(frame.funcID)
279 if !ok {
280 err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
281 return false
282 }
283 _, ok = strings.get(frame.fileID)
284 if !ok {
285 err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
286 return false
287 }
288 }
289 return true
290 })
291 return err
292 }
293
294
295
296
297 func addStrings(stringTable *dataTable[stringID, string], b batch) error {
298 if !b.isStringsBatch() {
299 return fmt.Errorf("internal error: addStrings called on non-string batch")
300 }
301 r := bytes.NewReader(b.data)
302 hdr, err := r.ReadByte()
303 if err != nil || tracev2.EventType(hdr) != tracev2.EvStrings {
304 return fmt.Errorf("missing strings batch header")
305 }
306
307 var sb strings.Builder
308 for r.Len() != 0 {
309
310 ev, err := r.ReadByte()
311 if err != nil {
312 return err
313 }
314 if tracev2.EventType(ev) != tracev2.EvString {
315 return fmt.Errorf("expected string event, got %d", ev)
316 }
317
318
319 id, err := binary.ReadUvarint(r)
320 if err != nil {
321 return err
322 }
323
324
325 len, err := binary.ReadUvarint(r)
326 if err != nil {
327 return err
328 }
329 if len > tracev2.MaxEventTrailerDataSize {
330 return fmt.Errorf("invalid string size %d, maximum is %d", len, tracev2.MaxEventTrailerDataSize)
331 }
332
333
334 n, err := io.CopyN(&sb, r, int64(len))
335 if n != int64(len) {
336 return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
337 }
338 if err != nil {
339 return fmt.Errorf("copying string data: %w", err)
340 }
341
342
343 s := sb.String()
344 sb.Reset()
345 if err := stringTable.insert(stringID(id), s); err != nil {
346 return err
347 }
348 }
349 return nil
350 }
351
352
353
354
355 func addStacks(stackTable *dataTable[stackID, stack], pcs map[uint64]frame, b batch) error {
356 if !b.isStacksBatch() {
357 return fmt.Errorf("internal error: addStacks called on non-stacks batch")
358 }
359 r := bytes.NewReader(b.data)
360 hdr, err := r.ReadByte()
361 if err != nil || tracev2.EventType(hdr) != tracev2.EvStacks {
362 return fmt.Errorf("missing stacks batch header")
363 }
364
365 for r.Len() != 0 {
366
367 ev, err := r.ReadByte()
368 if err != nil {
369 return err
370 }
371 if tracev2.EventType(ev) != tracev2.EvStack {
372 return fmt.Errorf("expected stack event, got %d", ev)
373 }
374
375
376 id, err := binary.ReadUvarint(r)
377 if err != nil {
378 return err
379 }
380
381
382 nFrames, err := binary.ReadUvarint(r)
383 if err != nil {
384 return err
385 }
386 if nFrames > tracev2.MaxFramesPerStack {
387 return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, tracev2.MaxFramesPerStack)
388 }
389
390
391 frames := make([]uint64, 0, nFrames)
392 for i := uint64(0); i < nFrames; i++ {
393
394 pc, err := binary.ReadUvarint(r)
395 if err != nil {
396 return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
397 }
398 funcID, err := binary.ReadUvarint(r)
399 if err != nil {
400 return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
401 }
402 fileID, err := binary.ReadUvarint(r)
403 if err != nil {
404 return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
405 }
406 line, err := binary.ReadUvarint(r)
407 if err != nil {
408 return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
409 }
410 frames = append(frames, pc)
411
412 if _, ok := pcs[pc]; !ok {
413 pcs[pc] = frame{
414 pc: pc,
415 funcID: stringID(funcID),
416 fileID: stringID(fileID),
417 line: line,
418 }
419 }
420 }
421
422
423 if err := stackTable.insert(stackID(id), stack{pcs: frames}); err != nil {
424 return err
425 }
426 }
427 return nil
428 }
429
430
431
432
433 func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
434 if !b.isCPUSamplesBatch() {
435 return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-sample batch")
436 }
437 r := bytes.NewReader(b.data)
438 hdr, err := r.ReadByte()
439 if err != nil || tracev2.EventType(hdr) != tracev2.EvCPUSamples {
440 return nil, fmt.Errorf("missing CPU samples batch header")
441 }
442
443 for r.Len() != 0 {
444
445 ev, err := r.ReadByte()
446 if err != nil {
447 return nil, err
448 }
449 if tracev2.EventType(ev) != tracev2.EvCPUSample {
450 return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
451 }
452
453
454 ts, err := binary.ReadUvarint(r)
455 if err != nil {
456 return nil, err
457 }
458
459
460 m, err := binary.ReadUvarint(r)
461 if err != nil {
462 return nil, err
463 }
464 mid := ThreadID(m)
465
466
467 p, err := binary.ReadUvarint(r)
468 if err != nil {
469 return nil, err
470 }
471 pid := ProcID(p)
472
473
474 g, err := binary.ReadUvarint(r)
475 if err != nil {
476 return nil, err
477 }
478 goid := GoID(g)
479 if g == 0 {
480 goid = NoGoroutine
481 }
482
483
484 s, err := binary.ReadUvarint(r)
485 if err != nil {
486 return nil, err
487 }
488
489
490 samples = append(samples, cpuSample{
491 schedCtx: schedCtx{
492 M: mid,
493 P: pid,
494 G: goid,
495 },
496 time: Time(ts),
497 stack: stackID(s),
498 })
499 }
500 return samples, nil
501 }
502
503
// sync holds the clock information that setSyncBatch decodes from a sync
// batch.
type sync struct {
	// freq is derived from the frequency event as 1e9 divided by the value
	// in the event, and is used through freq.mul to convert raw timestamps.
	// Zero means no frequency event has been seen yet.
	freq frequency
	// hasClockSnapshot reports whether a clock snapshot event was seen; the
	// snap* fields below are only set when it is true.
	hasClockSnapshot bool
	// snapTime is the trace timestamp of the snapshot: the batch time plus
	// the delta stored in the event.
	snapTime timestamp
	// snapMono is the second value stored in the clock snapshot event,
	// presumably a monotonic clock reading (confirm against the writer).
	snapMono uint64
	// snapWall is the wall-clock time built from the seconds and
	// nanoseconds stored in the clock snapshot event.
	snapWall time.Time
}
511
512 func setSyncBatch(s *sync, b batch, ver version.Version) error {
513 if !b.isSyncBatch(ver) {
514 return fmt.Errorf("internal error: setSyncBatch called on non-sync batch")
515 }
516 r := bytes.NewReader(b.data)
517 if ver >= version.Go125 {
518 hdr, err := r.ReadByte()
519 if err != nil || tracev2.EventType(hdr) != tracev2.EvSync {
520 return fmt.Errorf("missing sync batch header")
521 }
522 }
523
524 lastTs := b.time
525 for r.Len() != 0 {
526
527 ev, err := r.ReadByte()
528 if err != nil {
529 return err
530 }
531 et := tracev2.EventType(ev)
532 switch {
533 case et == tracev2.EvFrequency:
534 if s.freq != 0 {
535 return fmt.Errorf("found multiple frequency events")
536 }
537
538 f, err := binary.ReadUvarint(r)
539 if err != nil {
540 return err
541 }
542
543 s.freq = frequency(1.0 / (float64(f) / 1e9))
544 case et == tracev2.EvClockSnapshot && ver >= version.Go125:
545 if s.hasClockSnapshot {
546 return fmt.Errorf("found multiple clock snapshot events")
547 }
548 s.hasClockSnapshot = true
549
550 tdiff, err := binary.ReadUvarint(r)
551 if err != nil {
552 return err
553 }
554 lastTs += timestamp(tdiff)
555 s.snapTime = lastTs
556 mono, err := binary.ReadUvarint(r)
557 if err != nil {
558 return err
559 }
560 s.snapMono = mono
561 sec, err := binary.ReadUvarint(r)
562 if err != nil {
563 return err
564 }
565 nsec, err := binary.ReadUvarint(r)
566 if err != nil {
567 return err
568 }
569
570
571
572 s.snapWall = time.Unix(int64(sec), int64(nsec))
573 default:
574 return fmt.Errorf("expected frequency or clock snapshot event, got %d", ev)
575 }
576 }
577 return nil
578 }
579
580
581
582 func addExperimentalBatch(expBatches map[tracev2.Experiment][]ExperimentalBatch, b batch) error {
583 if b.exp == tracev2.NoExperiment {
584 return fmt.Errorf("internal error: addExperimentalBatch called on non-experimental batch")
585 }
586 expBatches[b.exp] = append(expBatches[b.exp], ExperimentalBatch{
587 Thread: b.m,
588 Data: b.data,
589 })
590 return nil
591 }
592
593 func fixUpCPUSamples(samples []cpuSample, freq frequency) {
594
595 for i := range samples {
596 s := &samples[i]
597 s.time = freq.mul(timestamp(s.time))
598 }
599
600 slices.SortFunc(samples, func(a, b cpuSample) int {
601 return cmp.Compare(a.time, b.time)
602 })
603 }
604
View as plain text