1
2
3
4
5 package trace
6
7 import (
8 "bufio"
9 "bytes"
10 "cmp"
11 "encoding/binary"
12 "fmt"
13 "io"
14 "slices"
15 "strings"
16
17 "internal/trace/tracev2"
18 )
19
20
21
22
23
24 type generation struct {
25 gen uint64
26 batches map[ThreadID][]batch
27 batchMs []ThreadID
28 cpuSamples []cpuSample
29 minTs timestamp
30 *evTable
31 }
32
33
34
35
36 type spilledBatch struct {
37 gen uint64
38 *batch
39 }
40
41
42
43
44
45
46
47
48 func readGeneration(r *bufio.Reader, spill *spilledBatch) (*generation, *spilledBatch, error) {
49 g := &generation{
50 evTable: &evTable{
51 pcs: make(map[uint64]frame),
52 },
53 batches: make(map[ThreadID][]batch),
54 }
55
56 if spill != nil {
57 g.gen = spill.gen
58 if err := processBatch(g, *spill.batch); err != nil {
59 return nil, nil, err
60 }
61 spill = nil
62 }
63
64
65 var spillErr error
66 for {
67 b, gen, err := readBatch(r)
68 if err == io.EOF {
69 break
70 }
71 if err != nil {
72 if g.gen != 0 {
73
74
75
76 spillErr = err
77 break
78 }
79 return nil, nil, err
80 }
81 if gen == 0 {
82
83 return nil, nil, fmt.Errorf("invalid generation number %d", gen)
84 }
85 if g.gen == 0 {
86
87 g.gen = gen
88 }
89 if gen == g.gen+1 {
90 spill = &spilledBatch{gen: gen, batch: &b}
91 break
92 }
93 if gen != g.gen {
94
95
96
97
98
99
100
101 return nil, nil, fmt.Errorf("generations out of order")
102 }
103 if g.minTs == 0 || b.time < g.minTs {
104 g.minTs = b.time
105 }
106 if err := processBatch(g, b); err != nil {
107 return nil, nil, err
108 }
109 }
110
111
112 if g.freq == 0 {
113 return nil, nil, fmt.Errorf("no frequency event found")
114 }
115
116
117
118
119
120
121
122 g.stacks.compactify()
123 g.strings.compactify()
124
125
126 if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
127 return nil, nil, err
128 }
129
130
131 for i := range g.cpuSamples {
132 s := &g.cpuSamples[i]
133 s.time = g.freq.mul(timestamp(s.time))
134 }
135
136 slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
137 return cmp.Compare(a.time, b.time)
138 })
139 return g, spill, spillErr
140 }
141
142
143 func processBatch(g *generation, b batch) error {
144 switch {
145 case b.isStringsBatch():
146 if err := addStrings(&g.strings, b); err != nil {
147 return err
148 }
149 case b.isStacksBatch():
150 if err := addStacks(&g.stacks, g.pcs, b); err != nil {
151 return err
152 }
153 case b.isCPUSamplesBatch():
154 samples, err := addCPUSamples(g.cpuSamples, b)
155 if err != nil {
156 return err
157 }
158 g.cpuSamples = samples
159 case b.isFreqBatch():
160 freq, err := parseFreq(b)
161 if err != nil {
162 return err
163 }
164 if g.freq != 0 {
165 return fmt.Errorf("found multiple frequency events")
166 }
167 g.freq = freq
168 case b.exp != tracev2.NoExperiment:
169 if g.expBatches == nil {
170 g.expBatches = make(map[tracev2.Experiment][]ExperimentalBatch)
171 }
172 if err := addExperimentalBatch(g.expBatches, b); err != nil {
173 return err
174 }
175 default:
176 if _, ok := g.batches[b.m]; !ok {
177 g.batchMs = append(g.batchMs, b.m)
178 }
179 g.batches[b.m] = append(g.batches[b.m], b)
180 }
181 return nil
182 }
183
184
185
186 func validateStackStrings(
187 stacks *dataTable[stackID, stack],
188 strings *dataTable[stringID, string],
189 frames map[uint64]frame,
190 ) error {
191 var err error
192 stacks.forEach(func(id stackID, stk stack) bool {
193 for _, pc := range stk.pcs {
194 frame, ok := frames[pc]
195 if !ok {
196 err = fmt.Errorf("found unknown pc %x for stack %d", pc, id)
197 return false
198 }
199 _, ok = strings.get(frame.funcID)
200 if !ok {
201 err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
202 return false
203 }
204 _, ok = strings.get(frame.fileID)
205 if !ok {
206 err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
207 return false
208 }
209 }
210 return true
211 })
212 return err
213 }
214
215
216
217
218 func addStrings(stringTable *dataTable[stringID, string], b batch) error {
219 if !b.isStringsBatch() {
220 return fmt.Errorf("internal error: addStrings called on non-string batch")
221 }
222 r := bytes.NewReader(b.data)
223 hdr, err := r.ReadByte()
224 if err != nil || tracev2.EventType(hdr) != tracev2.EvStrings {
225 return fmt.Errorf("missing strings batch header")
226 }
227
228 var sb strings.Builder
229 for r.Len() != 0 {
230
231 ev, err := r.ReadByte()
232 if err != nil {
233 return err
234 }
235 if tracev2.EventType(ev) != tracev2.EvString {
236 return fmt.Errorf("expected string event, got %d", ev)
237 }
238
239
240 id, err := binary.ReadUvarint(r)
241 if err != nil {
242 return err
243 }
244
245
246 len, err := binary.ReadUvarint(r)
247 if err != nil {
248 return err
249 }
250 if len > tracev2.MaxEventTrailerDataSize {
251 return fmt.Errorf("invalid string size %d, maximum is %d", len, tracev2.MaxEventTrailerDataSize)
252 }
253
254
255 n, err := io.CopyN(&sb, r, int64(len))
256 if n != int64(len) {
257 return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
258 }
259 if err != nil {
260 return fmt.Errorf("copying string data: %w", err)
261 }
262
263
264 s := sb.String()
265 sb.Reset()
266 if err := stringTable.insert(stringID(id), s); err != nil {
267 return err
268 }
269 }
270 return nil
271 }
272
273
274
275
276 func addStacks(stackTable *dataTable[stackID, stack], pcs map[uint64]frame, b batch) error {
277 if !b.isStacksBatch() {
278 return fmt.Errorf("internal error: addStacks called on non-stacks batch")
279 }
280 r := bytes.NewReader(b.data)
281 hdr, err := r.ReadByte()
282 if err != nil || tracev2.EventType(hdr) != tracev2.EvStacks {
283 return fmt.Errorf("missing stacks batch header")
284 }
285
286 for r.Len() != 0 {
287
288 ev, err := r.ReadByte()
289 if err != nil {
290 return err
291 }
292 if tracev2.EventType(ev) != tracev2.EvStack {
293 return fmt.Errorf("expected stack event, got %d", ev)
294 }
295
296
297 id, err := binary.ReadUvarint(r)
298 if err != nil {
299 return err
300 }
301
302
303 nFrames, err := binary.ReadUvarint(r)
304 if err != nil {
305 return err
306 }
307 if nFrames > tracev2.MaxFramesPerStack {
308 return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, tracev2.MaxFramesPerStack)
309 }
310
311
312 frames := make([]uint64, 0, nFrames)
313 for i := uint64(0); i < nFrames; i++ {
314
315 pc, err := binary.ReadUvarint(r)
316 if err != nil {
317 return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
318 }
319 funcID, err := binary.ReadUvarint(r)
320 if err != nil {
321 return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
322 }
323 fileID, err := binary.ReadUvarint(r)
324 if err != nil {
325 return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
326 }
327 line, err := binary.ReadUvarint(r)
328 if err != nil {
329 return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
330 }
331 frames = append(frames, pc)
332
333 if _, ok := pcs[pc]; !ok {
334 pcs[pc] = frame{
335 pc: pc,
336 funcID: stringID(funcID),
337 fileID: stringID(fileID),
338 line: line,
339 }
340 }
341 }
342
343
344 if err := stackTable.insert(stackID(id), stack{pcs: frames}); err != nil {
345 return err
346 }
347 }
348 return nil
349 }
350
351
352
353
354 func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
355 if !b.isCPUSamplesBatch() {
356 return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-sample batch")
357 }
358 r := bytes.NewReader(b.data)
359 hdr, err := r.ReadByte()
360 if err != nil || tracev2.EventType(hdr) != tracev2.EvCPUSamples {
361 return nil, fmt.Errorf("missing CPU samples batch header")
362 }
363
364 for r.Len() != 0 {
365
366 ev, err := r.ReadByte()
367 if err != nil {
368 return nil, err
369 }
370 if tracev2.EventType(ev) != tracev2.EvCPUSample {
371 return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
372 }
373
374
375 ts, err := binary.ReadUvarint(r)
376 if err != nil {
377 return nil, err
378 }
379
380
381 m, err := binary.ReadUvarint(r)
382 if err != nil {
383 return nil, err
384 }
385 mid := ThreadID(m)
386
387
388 p, err := binary.ReadUvarint(r)
389 if err != nil {
390 return nil, err
391 }
392 pid := ProcID(p)
393
394
395 g, err := binary.ReadUvarint(r)
396 if err != nil {
397 return nil, err
398 }
399 goid := GoID(g)
400 if g == 0 {
401 goid = NoGoroutine
402 }
403
404
405 s, err := binary.ReadUvarint(r)
406 if err != nil {
407 return nil, err
408 }
409
410
411 samples = append(samples, cpuSample{
412 schedCtx: schedCtx{
413 M: mid,
414 P: pid,
415 G: goid,
416 },
417 time: Time(ts),
418 stack: stackID(s),
419 })
420 }
421 return samples, nil
422 }
423
424
425 func parseFreq(b batch) (frequency, error) {
426 if !b.isFreqBatch() {
427 return 0, fmt.Errorf("internal error: parseFreq called on non-frequency batch")
428 }
429 r := bytes.NewReader(b.data)
430 r.ReadByte()
431
432
433 f, err := binary.ReadUvarint(r)
434 if err != nil {
435 return 0, err
436 }
437
438 return frequency(1.0 / (float64(f) / 1e9)), nil
439 }
440
441
442
443 func addExperimentalBatch(expBatches map[tracev2.Experiment][]ExperimentalBatch, b batch) error {
444 if b.exp == tracev2.NoExperiment {
445 return fmt.Errorf("internal error: addExperimentalBatch called on non-experimental batch")
446 }
447 expBatches[b.exp] = append(expBatches[b.exp], ExperimentalBatch{
448 Thread: b.m,
449 Data: b.data,
450 })
451 return nil
452 }
453
View as plain text