Source file src/internal/trace/generation.go

// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package trace

import (
	"bufio"
	"bytes"
	"cmp"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"slices"
	"strings"
	"time"

	"internal/trace/tracev2"
	"internal/trace/version"
)

// generation contains all the trace data for a single
// trace generation. It is purely data: it does not
// track any parse state nor does it contain a cursor
// into the generation.
type generation struct {
	gen        uint64
	batches    map[ThreadID][]batch
	batchMs    []ThreadID
	cpuSamples []cpuSample
	minTs      timestamp
	*evTable
}

// readGeneration buffers and decodes the structural elements of a trace generation
// out of r.
func readGeneration(r *bufio.Reader, ver version.Version) (*generation, error) {
	if ver < version.Go126 {
		return nil, errors.New("internal error: readGeneration called for <1.26 trace")
	}
	g := &generation{
		evTable: &evTable{
			pcs: make(map[uint64]frame),
		},
		batches: make(map[ThreadID][]batch),
	}

	// Read batches one at a time until we either hit EOF or the end of the generation.
	for {
		b, gen, err := readBatch(r)
		if err == io.EOF {
			if len(g.batches) != 0 {
				return nil, errors.New("incomplete generation found; trace likely truncated")
			}
			return nil, nil // All done.
		}
		if err != nil {
			return nil, err
		}
		if b.isEndOfGeneration() {
			break
		}
		if gen == 0 {
			// 0 is a sentinel used by the runtime, so we'll never see it.
			return nil, fmt.Errorf("invalid generation number %d", gen)
		}
		if g.gen == 0 {
			// Initialize gen.
			g.gen = gen
		}
		if gen != g.gen {
			return nil, fmt.Errorf("broken trace: missing end-of-generation event, or generations are interleaved")
		}
		if g.minTs == 0 || b.time < g.minTs {
			g.minTs = b.time
		}
		if err := processBatch(g, b, ver); err != nil {
			return nil, err
		}
	}

	// Check some invariants.
	if g.freq == 0 {
		return nil, fmt.Errorf("no frequency event found")
	}
	if !g.hasClockSnapshot {
		return nil, fmt.Errorf("no clock snapshot event found")
	}

	// N.B. Trust that the batch order is correct. We can't validate the batch order
	// by timestamp because the timestamps could just be plain wrong. The source of
	// truth is the order things appear in the trace and the partial order sequence
	// numbers on certain events. If it turns out the batch order is actually incorrect
	// we'll very likely fail to advance a partial order from the frontier.

	// Compactify stacks and strings for better lookup performance later.
	g.stacks.compactify()
	g.strings.compactify()

	// Validate stacks.
	if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
		return nil, err
	}

	// Now that we have the frequency, fix up CPU samples.
	fixUpCPUSamples(g.cpuSamples, g.freq)
	return g, nil
}

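// readAllGenerations is an illustrative sketch, not part of this package's
// real reader, showing how a caller might drive readGeneration for a Go
// 1.26+ trace: call it repeatedly until it returns a nil generation, which
// signals a clean end of the trace.
func readAllGenerations(r *bufio.Reader) ([]*generation, error) {
	var gens []*generation
	for {
		g, err := readGeneration(r, version.Go126)
		if err != nil {
			return nil, err
		}
		if g == nil {
			// Clean EOF: no more generations in the trace.
			return gens, nil
		}
		gens = append(gens, g)
	}
}
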
// spilledBatch represents a batch that was read out for the next generation,
// while reading the previous one. It's passed on when parsing the next
// generation.
//
// Used only for trace versions < Go126.
type spilledBatch struct {
	gen uint64
	*batch
}

// readGenerationWithSpill buffers and decodes the structural elements of a trace generation
// out of r. spill is the first batch of the new generation (already buffered and
// parsed from reading the last generation). Returns the generation and the first
// batch read of the next generation, if any.
//
// If the returned generation is non-nil, it is valid and must be processed before
// handling the returned error.
func readGenerationWithSpill(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
	if ver >= version.Go126 {
		return nil, nil, errors.New("internal error: readGenerationWithSpill called for Go 1.26+ trace")
	}
	g := &generation{
		evTable: &evTable{
			pcs: make(map[uint64]frame),
		},
		batches: make(map[ThreadID][]batch),
	}
	// Process the spilled batch, which contains real data.
	if spill != nil {
		g.gen = spill.gen
		g.minTs = spill.batch.time
		if err := processBatch(g, *spill.batch, ver); err != nil {
			return nil, nil, err
		}
		spill = nil
	}
	// Read batches one at a time until we either hit EOF or the next generation.
	var spillErr error
	for {
		b, gen, err := readBatch(r)
		if err == io.EOF {
			break
		}
		if err != nil {
			if g.gen != 0 {
				// This may be an error reading the first batch of the next generation.
				// This is fine. Let's forge ahead assuming that what we've got so
				// far is fine.
				spillErr = err
				break
			}
			return nil, nil, err
		}
		if gen == 0 {
			// 0 is a sentinel used by the runtime, so we'll never see it.
			return nil, nil, fmt.Errorf("invalid generation number %d", gen)
		}
		if g.gen == 0 {
			// Initialize gen.
			g.gen = gen
		}
		if gen == g.gen+1 {
			// TODO: Increment the generation with wraparound the same way the runtime does.
			spill = &spilledBatch{gen: gen, batch: &b}
			break
		}
		if gen != g.gen {
			// N.B. Fail as fast as possible if we see this. At first it
			// may seem prudent to be fault-tolerant and assume we have a
			// complete generation, parsing and returning that first. However,
			// if the batches are mixed across generations then it's likely
			// we won't be able to parse this generation correctly at all.
			// Rather than return a cryptic error in that case, indicate the
			// problem as soon as we see it.
			return nil, nil, fmt.Errorf("generations out of order")
		}
		if g.minTs == 0 || b.time < g.minTs {
			g.minTs = b.time
		}
		if err := processBatch(g, b, ver); err != nil {
			return nil, nil, err
		}
	}

	// Check some invariants.
	if g.freq == 0 {
		return nil, nil, fmt.Errorf("no frequency event found")
	}
	if ver >= version.Go125 && !g.hasClockSnapshot {
		return nil, nil, fmt.Errorf("no clock snapshot event found")
	}

	// N.B. Trust that the batch order is correct. We can't validate the batch order
	// by timestamp because the timestamps could just be plain wrong. The source of
	// truth is the order things appear in the trace and the partial order sequence
	// numbers on certain events. If it turns out the batch order is actually incorrect
	// we'll very likely fail to advance a partial order from the frontier.

	// Compactify stacks and strings for better lookup performance later.
	g.stacks.compactify()
	g.strings.compactify()

	// Validate stacks.
	if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
		return nil, nil, err
	}

	// Now that we have the frequency, fix up CPU samples.
	fixUpCPUSamples(g.cpuSamples, g.freq)
	return g, spill, spillErr
}

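// readAllGenerationsWithSpill is an illustrative sketch, not this package's
// real reader, showing how pre-Go 1.26 traces thread the spilled batch from
// one readGenerationWithSpill call into the next. It assumes the trace
// contains at least one generation.
func readAllGenerationsWithSpill(r *bufio.Reader, ver version.Version) ([]*generation, error) {
	var (
		gens  []*generation
		spill *spilledBatch
	)
	for {
		g, s, err := readGenerationWithSpill(r, spill, ver)
		if g != nil {
			// Per the doc comment above: a non-nil generation is valid
			// and must be consumed even when err is also non-nil.
			gens = append(gens, g)
		}
		if err != nil {
			return gens, err
		}
		if s == nil {
			// Nothing spilled from a following generation, so the trace
			// has no more generations to read.
			return gens, nil
		}
		spill = s
	}
}
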
// processBatch adds the batch to the generation.
func processBatch(g *generation, b batch, ver version.Version) error {
	switch {
	case b.isStringsBatch():
		if err := addStrings(&g.strings, b); err != nil {
			return err
		}
	case b.isStacksBatch():
		if err := addStacks(&g.stacks, g.pcs, b); err != nil {
			return err
		}
	case b.isCPUSamplesBatch():
		samples, err := addCPUSamples(g.cpuSamples, b)
		if err != nil {
			return err
		}
		g.cpuSamples = samples
	case b.isSyncBatch(ver):
		if err := setSyncBatch(&g.sync, b, ver); err != nil {
			return err
		}
	case b.exp != tracev2.NoExperiment:
		if g.expBatches == nil {
			g.expBatches = make(map[tracev2.Experiment][]ExperimentalBatch)
		}
		if err := addExperimentalBatch(g.expBatches, b); err != nil {
			return err
		}
	case b.isEndOfGeneration():
		return errors.New("internal error: unexpectedly processing EndOfGeneration; broken trace?")
	default:
		if _, ok := g.batches[b.m]; !ok {
			g.batchMs = append(g.batchMs, b.m)
		}
		g.batches[b.m] = append(g.batches[b.m], b)
	}
	return nil
}

// validateStackStrings makes sure all the string references in
// the stack table are present in the string table.
func validateStackStrings(
	stacks *dataTable[stackID, stack],
	strings *dataTable[stringID, string],
	frames map[uint64]frame,
) error {
	var err error
	stacks.forEach(func(id stackID, stk stack) bool {
		for _, pc := range stk.pcs {
			frame, ok := frames[pc]
			if !ok {
				err = fmt.Errorf("found unknown pc %x for stack %d", pc, id)
				return false
			}
			_, ok = strings.get(frame.funcID)
			if !ok {
				err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
				return false
			}
			_, ok = strings.get(frame.fileID)
			if !ok {
				err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
				return false
			}
		}
		return true
	})
	return err
}

// addStrings takes a batch whose first byte is an EvStrings event
// (indicating that the batch contains only strings) and adds each
// string contained therein to the provided strings map.
func addStrings(stringTable *dataTable[stringID, string], b batch) error {
	if !b.isStringsBatch() {
		return fmt.Errorf("internal error: addStrings called on non-string batch")
	}
	r := bytes.NewReader(b.data)
	hdr, err := r.ReadByte() // Consume the EvStrings byte.
	if err != nil || tracev2.EventType(hdr) != tracev2.EvStrings {
		return fmt.Errorf("missing strings batch header")
	}

	var sb strings.Builder
	for r.Len() != 0 {
		// Read the header.
		ev, err := r.ReadByte()
		if err != nil {
			return err
		}
		if tracev2.EventType(ev) != tracev2.EvString {
			return fmt.Errorf("expected string event, got %d", ev)
		}

		// Read the string's ID.
		id, err := binary.ReadUvarint(r)
		if err != nil {
			return err
		}

		// Read the string's length.
		len, err := binary.ReadUvarint(r)
		if err != nil {
			return err
		}
		if len > tracev2.MaxEventTrailerDataSize {
			return fmt.Errorf("invalid string size %d, maximum is %d", len, tracev2.MaxEventTrailerDataSize)
		}

		// Copy out the string.
		n, err := io.CopyN(&sb, r, int64(len))
		if n != int64(len) {
			return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
		}
		if err != nil {
			return fmt.Errorf("copying string data: %w", err)
		}

		// Add the string to the map.
		s := sb.String()
		sb.Reset()
		if err := stringTable.insert(stringID(id), s); err != nil {
			return err
		}
	}
	return nil
}

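// stringsBatchData is a hypothetical helper, shown only to make concrete the
// wire layout that addStrings decodes: an EvStrings byte, then for each
// string an EvString byte, a uvarint ID, a uvarint length, and the raw
// bytes. It is an illustrative sketch, not an encoder the runtime uses.
func stringsBatchData(strs map[stringID]string) []byte {
	data := []byte{byte(tracev2.EvStrings)}
	for id, s := range strs {
		data = append(data, byte(tracev2.EvString))
		data = binary.AppendUvarint(data, uint64(id))
		data = binary.AppendUvarint(data, uint64(len(s)))
		data = append(data, s...)
	}
	return data
}
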
// addStacks takes a batch whose first byte is an EvStacks event
// (indicating that the batch contains only stacks) and adds each
// stack contained therein to the provided stacks map.
func addStacks(stackTable *dataTable[stackID, stack], pcs map[uint64]frame, b batch) error {
	if !b.isStacksBatch() {
		return fmt.Errorf("internal error: addStacks called on non-stacks batch")
	}
	r := bytes.NewReader(b.data)
	hdr, err := r.ReadByte() // Consume the EvStacks byte.
	if err != nil || tracev2.EventType(hdr) != tracev2.EvStacks {
		return fmt.Errorf("missing stacks batch header")
	}

	for r.Len() != 0 {
		// Read the header.
		ev, err := r.ReadByte()
		if err != nil {
			return err
		}
		if tracev2.EventType(ev) != tracev2.EvStack {
			return fmt.Errorf("expected stack event, got %d", ev)
		}

		// Read the stack's ID.
		id, err := binary.ReadUvarint(r)
		if err != nil {
			return err
		}

		// Read how many frames are in each stack.
		nFrames, err := binary.ReadUvarint(r)
		if err != nil {
			return err
		}
		if nFrames > tracev2.MaxFramesPerStack {
			return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, tracev2.MaxFramesPerStack)
		}

		// Each frame consists of 4 fields: pc, funcID (string), fileID (string), line.
		frames := make([]uint64, 0, nFrames)
		for i := uint64(0); i < nFrames; i++ {
			// Read the frame data.
			pc, err := binary.ReadUvarint(r)
			if err != nil {
				return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
			}
			funcID, err := binary.ReadUvarint(r)
			if err != nil {
				return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
			}
			fileID, err := binary.ReadUvarint(r)
			if err != nil {
				return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
			}
			line, err := binary.ReadUvarint(r)
			if err != nil {
				return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
			}
			frames = append(frames, pc)

			if _, ok := pcs[pc]; !ok {
				pcs[pc] = frame{
					pc:     pc,
					funcID: stringID(funcID),
					fileID: stringID(fileID),
					line:   line,
				}
			}
		}

		// Add the stack to the map.
		if err := stackTable.insert(stackID(id), stack{pcs: frames}); err != nil {
			return err
		}
	}
	return nil
}

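// stackEventData is a hypothetical helper illustrating the per-stack layout
// that addStacks decodes: an EvStack byte, a uvarint stack ID, a uvarint
// frame count, then four uvarints per frame (pc, funcID, fileID, line). An
// illustrative sketch only; the runtime's encoder lives elsewhere.
func stackEventData(id stackID, frames []frame) []byte {
	data := []byte{byte(tracev2.EvStack)}
	data = binary.AppendUvarint(data, uint64(id))
	data = binary.AppendUvarint(data, uint64(len(frames)))
	for _, f := range frames {
		data = binary.AppendUvarint(data, f.pc)
		data = binary.AppendUvarint(data, uint64(f.funcID))
		data = binary.AppendUvarint(data, uint64(f.fileID))
		data = binary.AppendUvarint(data, f.line)
	}
	return data
}
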
// addCPUSamples takes a batch whose first byte is an EvCPUSamples event
// (indicating that the batch contains only CPU samples) and adds each
// sample contained therein to the provided samples list.
func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
	if !b.isCPUSamplesBatch() {
		return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-sample batch")
	}
	r := bytes.NewReader(b.data)
	hdr, err := r.ReadByte() // Consume the EvCPUSamples byte.
	if err != nil || tracev2.EventType(hdr) != tracev2.EvCPUSamples {
		return nil, fmt.Errorf("missing CPU samples batch header")
	}

	for r.Len() != 0 {
		// Read the header.
		ev, err := r.ReadByte()
		if err != nil {
			return nil, err
		}
		if tracev2.EventType(ev) != tracev2.EvCPUSample {
			return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
		}

		// Read the sample's timestamp.
		ts, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}

		// Read the sample's M.
		m, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}
		mid := ThreadID(m)

		// Read the sample's P.
		p, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}
		pid := ProcID(p)

		// Read the sample's G.
		g, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}
		goid := GoID(g)
		if g == 0 {
			goid = NoGoroutine
		}

		// Read the sample's stack.
		s, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}

		// Add the sample to the slice.
		samples = append(samples, cpuSample{
			schedCtx: schedCtx{
				M: mid,
				P: pid,
				G: goid,
			},
			time:  Time(ts), // N.B. this is really a "timestamp," not a Time.
			stack: stackID(s),
		})
	}
	return samples, nil
}

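// cpuSampleData is a hypothetical helper illustrating the per-sample layout
// that addCPUSamples decodes: an EvCPUSample byte followed by five uvarints
// (timestamp, M, P, G, stack ID). An illustrative sketch only.
func cpuSampleData(ts uint64, m ThreadID, p ProcID, g GoID, stk stackID) []byte {
	data := []byte{byte(tracev2.EvCPUSample)}
	data = binary.AppendUvarint(data, ts)
	data = binary.AppendUvarint(data, uint64(m))
	data = binary.AppendUvarint(data, uint64(p))
	data = binary.AppendUvarint(data, uint64(g))
	data = binary.AppendUvarint(data, uint64(stk))
	return data
}
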
// sync holds the per-generation sync data.
type sync struct {
	freq             frequency
	hasClockSnapshot bool
	snapTime         timestamp
	snapMono         uint64
	snapWall         time.Time
}

func setSyncBatch(s *sync, b batch, ver version.Version) error {
	if !b.isSyncBatch(ver) {
		return fmt.Errorf("internal error: setSyncBatch called on non-sync batch")
	}
	r := bytes.NewReader(b.data)
	if ver >= version.Go125 {
		hdr, err := r.ReadByte() // Consume the EvSync byte.
		if err != nil || tracev2.EventType(hdr) != tracev2.EvSync {
			return fmt.Errorf("missing sync batch header")
		}
	}

	lastTs := b.time
	for r.Len() != 0 {
		// Read the header.
		ev, err := r.ReadByte()
		if err != nil {
			return err
		}
		et := tracev2.EventType(ev)
		switch {
		case et == tracev2.EvFrequency:
			if s.freq != 0 {
				return fmt.Errorf("found multiple frequency events")
			}
			// Read the frequency. It'll come out as timestamp units per second.
			f, err := binary.ReadUvarint(r)
			if err != nil {
				return err
			}
			// Convert to nanoseconds per timestamp unit.
			s.freq = frequency(1.0 / (float64(f) / 1e9))
		case et == tracev2.EvClockSnapshot && ver >= version.Go125:
			if s.hasClockSnapshot {
				return fmt.Errorf("found multiple clock snapshot events")
			}
			s.hasClockSnapshot = true
			// Read the EvClockSnapshot arguments.
			tdiff, err := binary.ReadUvarint(r)
			if err != nil {
				return err
			}
			lastTs += timestamp(tdiff)
			s.snapTime = lastTs
			mono, err := binary.ReadUvarint(r)
			if err != nil {
				return err
			}
			s.snapMono = mono
			sec, err := binary.ReadUvarint(r)
			if err != nil {
				return err
			}
			nsec, err := binary.ReadUvarint(r)
			if err != nil {
				return err
			}
			// TODO(felixge): In theory we could inject s.snapMono into the time
			// value below to make it comparable. But there is no API for this
			// in the time package right now.
			s.snapWall = time.Unix(int64(sec), int64(nsec))
		default:
			return fmt.Errorf("expected frequency or clock snapshot event, got %d", ev)
		}
	}
	return nil
}

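// nsPerUnit is an illustrative sketch of the frequency conversion in
// setSyncBatch: the frequency event carries timestamp units per second, and
// the parser stores nanoseconds per unit. For example, 500,000,000 units per
// second means each timestamp unit is 2ns. Hypothetical helper; the real
// conversion happens inline above.
func nsPerUnit(unitsPerSecond uint64) frequency {
	return frequency(1.0 / (float64(unitsPerSecond) / 1e9))
}
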
// addExperimentalBatch takes an experimental batch and adds it to the list of experimental
// batches for the experiment it's a part of.
func addExperimentalBatch(expBatches map[tracev2.Experiment][]ExperimentalBatch, b batch) error {
	if b.exp == tracev2.NoExperiment {
		return fmt.Errorf("internal error: addExperimentalBatch called on non-experimental batch")
	}
	expBatches[b.exp] = append(expBatches[b.exp], ExperimentalBatch{
		Thread: b.m,
		Data:   b.data,
	})
	return nil
}

func fixUpCPUSamples(samples []cpuSample, freq frequency) {
	// Fix up the CPU sample timestamps.
	for i := range samples {
		s := &samples[i]
		s.time = freq.mul(timestamp(s.time))
	}
	// Sort the CPU samples.
	slices.SortFunc(samples, func(a, b cpuSample) int {
		return cmp.Compare(a.time, b.time)
	})
}
