Source file src/internal/trace/generation.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package trace
     6  
     7  import (
     8  	"bufio"
     9  	"bytes"
    10  	"cmp"
    11  	"encoding/binary"
    12  	"fmt"
    13  	"io"
    14  	"slices"
    15  	"strings"
    16  
    17  	"internal/trace/tracev2"
    18  )
    19  
// generation contains all the trace data for a single
// trace generation. It is purely data: it does not
// track any parse state nor does it contain a cursor
// into the generation.
//
// A generation is assembled by readGeneration, which routes every batch it
// sees through processBatch.
type generation struct {
	// gen is the generation number shared by every batch in this
	// generation. 0 is a runtime sentinel and never a valid value here.
	gen uint64
	// batches holds the ordinary event batches, keyed by the thread (M)
	// that wrote them, in the order they were processed.
	batches map[ThreadID][]batch
	// batchMs lists each key of batches exactly once, in the order in which
	// that thread's first batch was processed.
	batchMs []ThreadID
	// cpuSamples holds the samples from all CPU sample batches. Before
	// readGeneration returns successfully, their times are converted using
	// the generation's frequency and the slice is sorted by time.
	cpuSamples []cpuSample
	// minTs is the smallest batch timestamp recorded by readGeneration;
	// 0 means no timestamp has been recorded yet.
	minTs timestamp
	// evTable holds the generation's frequency, string and stack tables,
	// PC-to-frame map, and experimental batches.
	*evTable
}
    32  
// spilledBatch represents a batch that was read out for the next generation
// while reading the previous one. readGeneration cannot know a generation
// has ended until it reads a batch from the following one, so that batch is
// handed back to the caller and passed in when parsing the next generation.
type spilledBatch struct {
	// gen is the generation number the spilled batch belongs to.
	gen uint64
	// batch is the already-decoded batch itself; readGeneration processes it
	// before reading any further batches.
	*batch
}
    40  
    41  // readGeneration buffers and decodes the structural elements of a trace generation
    42  // out of r. spill is the first batch of the new generation (already buffered and
    43  // parsed from reading the last generation). Returns the generation and the first
    44  // batch read of the next generation, if any.
    45  //
    46  // If gen is non-nil, it is valid and must be processed before handling the returned
    47  // error.
    48  func readGeneration(r *bufio.Reader, spill *spilledBatch) (*generation, *spilledBatch, error) {
    49  	g := &generation{
    50  		evTable: &evTable{
    51  			pcs: make(map[uint64]frame),
    52  		},
    53  		batches: make(map[ThreadID][]batch),
    54  	}
    55  	// Process the spilled batch.
    56  	if spill != nil {
    57  		g.gen = spill.gen
    58  		if err := processBatch(g, *spill.batch); err != nil {
    59  			return nil, nil, err
    60  		}
    61  		spill = nil
    62  	}
    63  	// Read batches one at a time until we either hit EOF or
    64  	// the next generation.
    65  	var spillErr error
    66  	for {
    67  		b, gen, err := readBatch(r)
    68  		if err == io.EOF {
    69  			break
    70  		}
    71  		if err != nil {
    72  			if g.gen != 0 {
    73  				// This is an error reading the first batch of the next generation.
    74  				// This is fine. Let's forge ahead assuming that what we've got so
    75  				// far is fine.
    76  				spillErr = err
    77  				break
    78  			}
    79  			return nil, nil, err
    80  		}
    81  		if gen == 0 {
    82  			// 0 is a sentinel used by the runtime, so we'll never see it.
    83  			return nil, nil, fmt.Errorf("invalid generation number %d", gen)
    84  		}
    85  		if g.gen == 0 {
    86  			// Initialize gen.
    87  			g.gen = gen
    88  		}
    89  		if gen == g.gen+1 { // TODO: advance this the same way the runtime does.
    90  			spill = &spilledBatch{gen: gen, batch: &b}
    91  			break
    92  		}
    93  		if gen != g.gen {
    94  			// N.B. Fail as fast as possible if we see this. At first it
    95  			// may seem prudent to be fault-tolerant and assume we have a
    96  			// complete generation, parsing and returning that first. However,
    97  			// if the batches are mixed across generations then it's likely
    98  			// we won't be able to parse this generation correctly at all.
    99  			// Rather than return a cryptic error in that case, indicate the
   100  			// problem as soon as we see it.
   101  			return nil, nil, fmt.Errorf("generations out of order")
   102  		}
   103  		if g.minTs == 0 || b.time < g.minTs {
   104  			g.minTs = b.time
   105  		}
   106  		if err := processBatch(g, b); err != nil {
   107  			return nil, nil, err
   108  		}
   109  	}
   110  
   111  	// Check some invariants.
   112  	if g.freq == 0 {
   113  		return nil, nil, fmt.Errorf("no frequency event found")
   114  	}
   115  	// N.B. Trust that the batch order is correct. We can't validate the batch order
   116  	// by timestamp because the timestamps could just be plain wrong. The source of
   117  	// truth is the order things appear in the trace and the partial order sequence
   118  	// numbers on certain events. If it turns out the batch order is actually incorrect
   119  	// we'll very likely fail to advance a partial order from the frontier.
   120  
   121  	// Compactify stacks and strings for better lookup performance later.
   122  	g.stacks.compactify()
   123  	g.strings.compactify()
   124  
   125  	// Validate stacks.
   126  	if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
   127  		return nil, nil, err
   128  	}
   129  
   130  	// Fix up the CPU sample timestamps, now that we have freq.
   131  	for i := range g.cpuSamples {
   132  		s := &g.cpuSamples[i]
   133  		s.time = g.freq.mul(timestamp(s.time))
   134  	}
   135  	// Sort the CPU samples.
   136  	slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
   137  		return cmp.Compare(a.time, b.time)
   138  	})
   139  	return g, spill, spillErr
   140  }
   141  
   142  // processBatch adds the batch to the generation.
   143  func processBatch(g *generation, b batch) error {
   144  	switch {
   145  	case b.isStringsBatch():
   146  		if err := addStrings(&g.strings, b); err != nil {
   147  			return err
   148  		}
   149  	case b.isStacksBatch():
   150  		if err := addStacks(&g.stacks, g.pcs, b); err != nil {
   151  			return err
   152  		}
   153  	case b.isCPUSamplesBatch():
   154  		samples, err := addCPUSamples(g.cpuSamples, b)
   155  		if err != nil {
   156  			return err
   157  		}
   158  		g.cpuSamples = samples
   159  	case b.isFreqBatch():
   160  		freq, err := parseFreq(b)
   161  		if err != nil {
   162  			return err
   163  		}
   164  		if g.freq != 0 {
   165  			return fmt.Errorf("found multiple frequency events")
   166  		}
   167  		g.freq = freq
   168  	case b.exp != tracev2.NoExperiment:
   169  		if g.expBatches == nil {
   170  			g.expBatches = make(map[tracev2.Experiment][]ExperimentalBatch)
   171  		}
   172  		if err := addExperimentalBatch(g.expBatches, b); err != nil {
   173  			return err
   174  		}
   175  	default:
   176  		if _, ok := g.batches[b.m]; !ok {
   177  			g.batchMs = append(g.batchMs, b.m)
   178  		}
   179  		g.batches[b.m] = append(g.batches[b.m], b)
   180  	}
   181  	return nil
   182  }
   183  
   184  // validateStackStrings makes sure all the string references in
   185  // the stack table are present in the string table.
   186  func validateStackStrings(
   187  	stacks *dataTable[stackID, stack],
   188  	strings *dataTable[stringID, string],
   189  	frames map[uint64]frame,
   190  ) error {
   191  	var err error
   192  	stacks.forEach(func(id stackID, stk stack) bool {
   193  		for _, pc := range stk.pcs {
   194  			frame, ok := frames[pc]
   195  			if !ok {
   196  				err = fmt.Errorf("found unknown pc %x for stack %d", pc, id)
   197  				return false
   198  			}
   199  			_, ok = strings.get(frame.funcID)
   200  			if !ok {
   201  				err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
   202  				return false
   203  			}
   204  			_, ok = strings.get(frame.fileID)
   205  			if !ok {
   206  				err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
   207  				return false
   208  			}
   209  		}
   210  		return true
   211  	})
   212  	return err
   213  }
   214  
   215  // addStrings takes a batch whose first byte is an EvStrings event
   216  // (indicating that the batch contains only strings) and adds each
   217  // string contained therein to the provided strings map.
   218  func addStrings(stringTable *dataTable[stringID, string], b batch) error {
   219  	if !b.isStringsBatch() {
   220  		return fmt.Errorf("internal error: addStrings called on non-string batch")
   221  	}
   222  	r := bytes.NewReader(b.data)
   223  	hdr, err := r.ReadByte() // Consume the EvStrings byte.
   224  	if err != nil || tracev2.EventType(hdr) != tracev2.EvStrings {
   225  		return fmt.Errorf("missing strings batch header")
   226  	}
   227  
   228  	var sb strings.Builder
   229  	for r.Len() != 0 {
   230  		// Read the header.
   231  		ev, err := r.ReadByte()
   232  		if err != nil {
   233  			return err
   234  		}
   235  		if tracev2.EventType(ev) != tracev2.EvString {
   236  			return fmt.Errorf("expected string event, got %d", ev)
   237  		}
   238  
   239  		// Read the string's ID.
   240  		id, err := binary.ReadUvarint(r)
   241  		if err != nil {
   242  			return err
   243  		}
   244  
   245  		// Read the string's length.
   246  		len, err := binary.ReadUvarint(r)
   247  		if err != nil {
   248  			return err
   249  		}
   250  		if len > tracev2.MaxEventTrailerDataSize {
   251  			return fmt.Errorf("invalid string size %d, maximum is %d", len, tracev2.MaxEventTrailerDataSize)
   252  		}
   253  
   254  		// Copy out the string.
   255  		n, err := io.CopyN(&sb, r, int64(len))
   256  		if n != int64(len) {
   257  			return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
   258  		}
   259  		if err != nil {
   260  			return fmt.Errorf("copying string data: %w", err)
   261  		}
   262  
   263  		// Add the string to the map.
   264  		s := sb.String()
   265  		sb.Reset()
   266  		if err := stringTable.insert(stringID(id), s); err != nil {
   267  			return err
   268  		}
   269  	}
   270  	return nil
   271  }
   272  
   273  // addStacks takes a batch whose first byte is an EvStacks event
   274  // (indicating that the batch contains only stacks) and adds each
   275  // string contained therein to the provided stacks map.
   276  func addStacks(stackTable *dataTable[stackID, stack], pcs map[uint64]frame, b batch) error {
   277  	if !b.isStacksBatch() {
   278  		return fmt.Errorf("internal error: addStacks called on non-stacks batch")
   279  	}
   280  	r := bytes.NewReader(b.data)
   281  	hdr, err := r.ReadByte() // Consume the EvStacks byte.
   282  	if err != nil || tracev2.EventType(hdr) != tracev2.EvStacks {
   283  		return fmt.Errorf("missing stacks batch header")
   284  	}
   285  
   286  	for r.Len() != 0 {
   287  		// Read the header.
   288  		ev, err := r.ReadByte()
   289  		if err != nil {
   290  			return err
   291  		}
   292  		if tracev2.EventType(ev) != tracev2.EvStack {
   293  			return fmt.Errorf("expected stack event, got %d", ev)
   294  		}
   295  
   296  		// Read the stack's ID.
   297  		id, err := binary.ReadUvarint(r)
   298  		if err != nil {
   299  			return err
   300  		}
   301  
   302  		// Read how many frames are in each stack.
   303  		nFrames, err := binary.ReadUvarint(r)
   304  		if err != nil {
   305  			return err
   306  		}
   307  		if nFrames > tracev2.MaxFramesPerStack {
   308  			return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, tracev2.MaxFramesPerStack)
   309  		}
   310  
   311  		// Each frame consists of 4 fields: pc, funcID (string), fileID (string), line.
   312  		frames := make([]uint64, 0, nFrames)
   313  		for i := uint64(0); i < nFrames; i++ {
   314  			// Read the frame data.
   315  			pc, err := binary.ReadUvarint(r)
   316  			if err != nil {
   317  				return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
   318  			}
   319  			funcID, err := binary.ReadUvarint(r)
   320  			if err != nil {
   321  				return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
   322  			}
   323  			fileID, err := binary.ReadUvarint(r)
   324  			if err != nil {
   325  				return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
   326  			}
   327  			line, err := binary.ReadUvarint(r)
   328  			if err != nil {
   329  				return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
   330  			}
   331  			frames = append(frames, pc)
   332  
   333  			if _, ok := pcs[pc]; !ok {
   334  				pcs[pc] = frame{
   335  					pc:     pc,
   336  					funcID: stringID(funcID),
   337  					fileID: stringID(fileID),
   338  					line:   line,
   339  				}
   340  			}
   341  		}
   342  
   343  		// Add the stack to the map.
   344  		if err := stackTable.insert(stackID(id), stack{pcs: frames}); err != nil {
   345  			return err
   346  		}
   347  	}
   348  	return nil
   349  }
   350  
   351  // addCPUSamples takes a batch whose first byte is an EvCPUSamples event
   352  // (indicating that the batch contains only CPU samples) and adds each
   353  // sample contained therein to the provided samples list.
   354  func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
   355  	if !b.isCPUSamplesBatch() {
   356  		return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-sample batch")
   357  	}
   358  	r := bytes.NewReader(b.data)
   359  	hdr, err := r.ReadByte() // Consume the EvCPUSamples byte.
   360  	if err != nil || tracev2.EventType(hdr) != tracev2.EvCPUSamples {
   361  		return nil, fmt.Errorf("missing CPU samples batch header")
   362  	}
   363  
   364  	for r.Len() != 0 {
   365  		// Read the header.
   366  		ev, err := r.ReadByte()
   367  		if err != nil {
   368  			return nil, err
   369  		}
   370  		if tracev2.EventType(ev) != tracev2.EvCPUSample {
   371  			return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
   372  		}
   373  
   374  		// Read the sample's timestamp.
   375  		ts, err := binary.ReadUvarint(r)
   376  		if err != nil {
   377  			return nil, err
   378  		}
   379  
   380  		// Read the sample's M.
   381  		m, err := binary.ReadUvarint(r)
   382  		if err != nil {
   383  			return nil, err
   384  		}
   385  		mid := ThreadID(m)
   386  
   387  		// Read the sample's P.
   388  		p, err := binary.ReadUvarint(r)
   389  		if err != nil {
   390  			return nil, err
   391  		}
   392  		pid := ProcID(p)
   393  
   394  		// Read the sample's G.
   395  		g, err := binary.ReadUvarint(r)
   396  		if err != nil {
   397  			return nil, err
   398  		}
   399  		goid := GoID(g)
   400  		if g == 0 {
   401  			goid = NoGoroutine
   402  		}
   403  
   404  		// Read the sample's stack.
   405  		s, err := binary.ReadUvarint(r)
   406  		if err != nil {
   407  			return nil, err
   408  		}
   409  
   410  		// Add the sample to the slice.
   411  		samples = append(samples, cpuSample{
   412  			schedCtx: schedCtx{
   413  				M: mid,
   414  				P: pid,
   415  				G: goid,
   416  			},
   417  			time:  Time(ts), // N.B. this is really a "timestamp," not a Time.
   418  			stack: stackID(s),
   419  		})
   420  	}
   421  	return samples, nil
   422  }
   423  
   424  // parseFreq parses out a lone EvFrequency from a batch.
   425  func parseFreq(b batch) (frequency, error) {
   426  	if !b.isFreqBatch() {
   427  		return 0, fmt.Errorf("internal error: parseFreq called on non-frequency batch")
   428  	}
   429  	r := bytes.NewReader(b.data)
   430  	r.ReadByte() // Consume the EvFrequency byte.
   431  
   432  	// Read the frequency. It'll come out as timestamp units per second.
   433  	f, err := binary.ReadUvarint(r)
   434  	if err != nil {
   435  		return 0, err
   436  	}
   437  	// Convert to nanoseconds per timestamp unit.
   438  	return frequency(1.0 / (float64(f) / 1e9)), nil
   439  }
   440  
   441  // addExperimentalBatch takes an experimental batch and adds it to the list of experimental
   442  // batches for the experiment its a part of.
   443  func addExperimentalBatch(expBatches map[tracev2.Experiment][]ExperimentalBatch, b batch) error {
   444  	if b.exp == tracev2.NoExperiment {
   445  		return fmt.Errorf("internal error: addExperimentalBatch called on non-experimental batch")
   446  	}
   447  	expBatches[b.exp] = append(expBatches[b.exp], ExperimentalBatch{
   448  		Thread: b.m,
   449  		Data:   b.data,
   450  	})
   451  	return nil
   452  }
   453  

View as plain text