1
2
3
4
5 package trace
6
7 import (
8 "bytes"
9 "encoding/binary"
10 "fmt"
11 "io"
12
13 "internal/trace/tracev2"
14 "internal/trace/version"
15 )
16
17
// timestamp is a raw time value as carried in a batch header: readBatch
// decodes it from a uvarint and stores it in batch.time without further
// processing. Its unit and epoch are not defined here.
type timestamp uint64
19
20
21
// batch is one batch read from a trace stream: the header fields decoded by
// readBatch plus the batch's payload, which is kept as raw bytes.
type batch struct {
	// m is the thread (M) ID from the batch header. readBatch sets it to
	// NoThread for an end-of-generation marker.
	m ThreadID
	// time is the timestamp from the batch header.
	time timestamp
	// data is the undecoded payload. Its first byte is interpreted as a
	// tracev2.EventType by the is*Batch methods to classify the batch.
	data []byte
	// exp is the experiment the batch belongs to; tracev2.NoExperiment
	// unless the batch was introduced by tracev2.EvExperimentalBatch.
	exp tracev2.Experiment
}
28
29 func (b *batch) isStringsBatch() bool {
30 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvStrings
31 }
32
33 func (b *batch) isStacksBatch() bool {
34 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvStacks
35 }
36
37 func (b *batch) isCPUSamplesBatch() bool {
38 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvCPUSamples
39 }
40
41 func (b *batch) isSyncBatch(ver version.Version) bool {
42 return (b.exp == tracev2.NoExperiment && len(b.data) > 0) &&
43 ((tracev2.EventType(b.data[0]) == tracev2.EvFrequency && ver < version.Go125) ||
44 (tracev2.EventType(b.data[0]) == tracev2.EvSync && ver >= version.Go125))
45 }
46
47 func (b *batch) isEndOfGeneration() bool {
48 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvEndOfGeneration
49 }
50
51
52 func readBatch(r interface {
53 io.Reader
54 io.ByteReader
55 }) (batch, uint64, error) {
56
57 b, err := r.ReadByte()
58 if err != nil {
59 return batch{}, 0, err
60 }
61 if typ := tracev2.EventType(b); typ == tracev2.EvEndOfGeneration {
62 return batch{m: NoThread, exp: tracev2.NoExperiment, data: []byte{b}}, 0, nil
63 }
64 if typ := tracev2.EventType(b); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
65 return batch{}, 0, fmt.Errorf("expected batch event, got event %d", typ)
66 }
67
68
69 exp := tracev2.NoExperiment
70 if tracev2.EventType(b) == tracev2.EvExperimentalBatch {
71 e, err := r.ReadByte()
72 if err != nil {
73 return batch{}, 0, err
74 }
75 exp = tracev2.Experiment(e)
76 }
77
78
79
80 gen, err := binary.ReadUvarint(r)
81 if err != nil {
82 return batch{}, gen, fmt.Errorf("error reading batch gen: %w", err)
83 }
84 m, err := binary.ReadUvarint(r)
85 if err != nil {
86 return batch{}, gen, fmt.Errorf("error reading batch M ID: %w", err)
87 }
88 ts, err := binary.ReadUvarint(r)
89 if err != nil {
90 return batch{}, gen, fmt.Errorf("error reading batch timestamp: %w", err)
91 }
92
93
94 size, err := binary.ReadUvarint(r)
95 if err != nil {
96 return batch{}, gen, fmt.Errorf("error reading batch size: %w", err)
97 }
98 if size > tracev2.MaxBatchSize {
99 return batch{}, gen, fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
100 }
101
102
103 var data bytes.Buffer
104 data.Grow(int(size))
105 n, err := io.CopyN(&data, r, int64(size))
106 if n != int64(size) {
107 return batch{}, gen, fmt.Errorf("failed to read full batch: read %d but wanted %d", n, size)
108 }
109 if err != nil {
110 return batch{}, gen, fmt.Errorf("copying batch data: %w", err)
111 }
112
113
114 return batch{
115 m: ThreadID(m),
116 time: timestamp(ts),
117 data: data.Bytes(),
118 exp: exp,
119 }, gen, nil
120 }
121
View as plain text