1  
     2  
     3  
     4  
     5  package trace
     6  
     7  import (
     8  	"bufio"
     9  	"bytes"
    10  	"cmp"
    11  	"encoding/binary"
    12  	"errors"
    13  	"fmt"
    14  	"io"
    15  	"slices"
    16  	"strings"
    17  	"time"
    18  
    19  	"internal/trace/tracev2"
    20  	"internal/trace/version"
    21  )
    22  
    23  
    24  
    25  
    26  
    // generation holds all of the buffered and decoded data for a single trace
// generation, as assembled by readGeneration / readGenerationWithSpill.
type generation struct {
	gen        uint64               // generation number shared by every batch in this generation
	batches    map[ThreadID][]batch // ordinary event batches, grouped by the thread (M) that emitted them
	batchMs    []ThreadID           // keys of batches, in the order each M was first seen
	cpuSamples []cpuSample          // CPU samples; times converted and sorted by fixUpCPUSamples
	minTs      timestamp            // smallest batch timestamp seen in this generation (0 until set)
	*evTable                        // per-generation tables (stacks, strings, PCs, sync data, experimental batches), used via promoted fields
}
    35  
    36  
    37  
    38  func readGeneration(r *bufio.Reader, ver version.Version) (*generation, error) {
    39  	if ver < version.Go126 {
    40  		return nil, errors.New("internal error: readGeneration called for <1.26 trace")
    41  	}
    42  	g := &generation{
    43  		evTable: &evTable{
    44  			pcs: make(map[uint64]frame),
    45  		},
    46  		batches: make(map[ThreadID][]batch),
    47  	}
    48  
    49  	
    50  	for {
    51  		b, gen, err := readBatch(r)
    52  		if err == io.EOF {
    53  			if len(g.batches) != 0 {
    54  				return nil, errors.New("incomplete generation found; trace likely truncated")
    55  			}
    56  			return nil, nil 
    57  		}
    58  		if err != nil {
    59  			return nil, err
    60  		}
    61  		if g.gen == 0 {
    62  			
    63  			g.gen = gen
    64  		}
    65  		if b.isEndOfGeneration() {
    66  			break
    67  		}
    68  		if gen == 0 {
    69  			
    70  			return nil, fmt.Errorf("invalid generation number %d", gen)
    71  		}
    72  		if gen != g.gen {
    73  			return nil, fmt.Errorf("broken trace: missing end-of-generation event, or generations are interleaved")
    74  		}
    75  		if g.minTs == 0 || b.time < g.minTs {
    76  			g.minTs = b.time
    77  		}
    78  		if err := processBatch(g, b, ver); err != nil {
    79  			return nil, err
    80  		}
    81  	}
    82  
    83  	
    84  	if g.freq == 0 {
    85  		return nil, fmt.Errorf("no frequency event found")
    86  	}
    87  	if !g.hasClockSnapshot {
    88  		return nil, fmt.Errorf("no clock snapshot event found")
    89  	}
    90  
    91  	
    92  	
    93  	
    94  	
    95  	
    96  
    97  	
    98  	g.stacks.compactify()
    99  	g.strings.compactify()
   100  
   101  	
   102  	if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
   103  		return nil, err
   104  	}
   105  
   106  	
   107  	fixUpCPUSamples(g.cpuSamples, g.freq)
   108  	return g, nil
   109  }
   110  
   111  
   112  
   113  
   114  
   115  
   // spilledBatch is a batch that readGenerationWithSpill read past the end of
// the generation it was assembling, because it already belongs to the next
// generation. It is returned to the caller so it can be passed back in and
// processed first on the following call.
type spilledBatch struct {
	gen uint64 // generation number the spilled batch belongs to
	*batch
}
   120  
   121  
   122  
   123  
   124  
   125  
   126  
   127  
   // readGenerationWithSpill reads one generation from a pre-Go 1.26 trace.
//
// These traces have no end-of-generation marker, so a generation is only
// known to be complete once the first batch of the next generation has been
// read (or EOF is hit). That extra batch is "spilled": it is returned to the
// caller, which must pass it back as spill on the next call so it is
// processed first.
//
// Returns the decoded generation, the spilled batch (nil when EOF ended the
// generation), and spillErr. If a read error happens after this generation
// has already started, the error is deferred: the generation read so far is
// returned together with that error as the third result.
func readGenerationWithSpill(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
	if ver >= version.Go126 {
		return nil, nil, errors.New("internal error: readGenerationWithSpill called for Go 1.26+ trace")
	}
	g := &generation{
		evTable: &evTable{
			pcs: make(map[uint64]frame),
		},
		batches: make(map[ThreadID][]batch),
	}
	// Process the batch spilled over from the previous call, if any.
	if spill != nil {
		// The spilled batch starts this generation: it fixes the generation
		// number and the initial minimum timestamp.
		g.gen = spill.gen
		g.minTs = spill.batch.time
		if err := processBatch(g, *spill.batch, ver); err != nil {
			return nil, nil, err
		}
		// Clear it so that only a newly spilled batch is returned below.
		spill = nil
	}
	// Read batches one at a time until we hit EOF or the first batch of the
	// next generation.
	var spillErr error
	for {
		b, gen, err := readBatch(r)
		if err == io.EOF {
			break
		}
		if err != nil {
			if g.gen != 0 {
				// This generation already has data. Defer the error and hand
				// back what was read so far, reporting the error alongside it.
				spillErr = err
				break
			}
			return nil, nil, err
		}
		if gen == 0 {
			// Generation 0 is never a valid generation number.
			return nil, nil, fmt.Errorf("invalid generation number %d", gen)
		}
		if g.gen == 0 {
			// First batch of this generation (no spill): adopt its number.
			g.gen = gen
		}
		if gen == g.gen+1 {
			// First batch of the next generation: spill it to the next call.
			spill = &spilledBatch{gen: gen, batch: &b}
			break
		}
		if gen != g.gen {
			// Any other generation number means batches from different
			// generations are mixed. Fail fast here instead of attempting to
			// parse a generation that cannot be assembled reliably.
			return nil, nil, fmt.Errorf("generations out of order")
		}
		if g.minTs == 0 || b.time < g.minTs {
			g.minTs = b.time
		}
		if err := processBatch(g, b, ver); err != nil {
			return nil, nil, err
		}
	}

	// Check invariants. A clock snapshot is only required from Go 1.25 on.
	if g.freq == 0 {
		return nil, nil, fmt.Errorf("no frequency event found")
	}
	if ver >= version.Go125 && !g.hasClockSnapshot {
		return nil, nil, fmt.Errorf("no clock snapshot event found")
	}

	// N.B. Batch order is trusted as read; it is not re-validated against
	// timestamps here.

	// Compactify stacks and strings before they are validated and looked up.
	g.stacks.compactify()
	g.strings.compactify()

	// Validate that every stack frame references known PCs and strings.
	if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
		return nil, nil, err
	}

	// Now that the frequency is known, convert and sort the CPU sample times.
	fixUpCPUSamples(g.cpuSamples, g.freq)
	return g, spill, spillErr
}
   223  
   224  
   225  func processBatch(g *generation, b batch, ver version.Version) error {
   226  	switch {
   227  	case b.isStringsBatch():
   228  		if err := addStrings(&g.strings, b); err != nil {
   229  			return err
   230  		}
   231  	case b.isStacksBatch():
   232  		if err := addStacks(&g.stacks, g.pcs, b); err != nil {
   233  			return err
   234  		}
   235  	case b.isCPUSamplesBatch():
   236  		samples, err := addCPUSamples(g.cpuSamples, b)
   237  		if err != nil {
   238  			return err
   239  		}
   240  		g.cpuSamples = samples
   241  	case b.isSyncBatch(ver):
   242  		if err := setSyncBatch(&g.sync, b, ver); err != nil {
   243  			return err
   244  		}
   245  	case b.exp != tracev2.NoExperiment:
   246  		if g.expBatches == nil {
   247  			g.expBatches = make(map[tracev2.Experiment][]ExperimentalBatch)
   248  		}
   249  		if err := addExperimentalBatch(g.expBatches, b); err != nil {
   250  			return err
   251  		}
   252  	case b.isEndOfGeneration():
   253  		return errors.New("internal error: unexpectedly processing EndOfGeneration; broken trace?")
   254  	default:
   255  		if _, ok := g.batches[b.m]; !ok {
   256  			g.batchMs = append(g.batchMs, b.m)
   257  		}
   258  		g.batches[b.m] = append(g.batches[b.m], b)
   259  	}
   260  	return nil
   261  }
   262  
   263  
   264  
   265  func validateStackStrings(
   266  	stacks *dataTable[stackID, stack],
   267  	strings *dataTable[stringID, string],
   268  	frames map[uint64]frame,
   269  ) error {
   270  	var err error
   271  	stacks.forEach(func(id stackID, stk stack) bool {
   272  		for _, pc := range stk.pcs {
   273  			frame, ok := frames[pc]
   274  			if !ok {
   275  				err = fmt.Errorf("found unknown pc %x for stack %d", pc, id)
   276  				return false
   277  			}
   278  			_, ok = strings.get(frame.funcID)
   279  			if !ok {
   280  				err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
   281  				return false
   282  			}
   283  			_, ok = strings.get(frame.fileID)
   284  			if !ok {
   285  				err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
   286  				return false
   287  			}
   288  		}
   289  		return true
   290  	})
   291  	return err
   292  }
   293  
   294  
   295  
   296  
   297  func addStrings(stringTable *dataTable[stringID, string], b batch) error {
   298  	if !b.isStringsBatch() {
   299  		return fmt.Errorf("internal error: addStrings called on non-string batch")
   300  	}
   301  	r := bytes.NewReader(b.data)
   302  	hdr, err := r.ReadByte() 
   303  	if err != nil || tracev2.EventType(hdr) != tracev2.EvStrings {
   304  		return fmt.Errorf("missing strings batch header")
   305  	}
   306  
   307  	var sb strings.Builder
   308  	for r.Len() != 0 {
   309  		
   310  		ev, err := r.ReadByte()
   311  		if err != nil {
   312  			return err
   313  		}
   314  		if tracev2.EventType(ev) != tracev2.EvString {
   315  			return fmt.Errorf("expected string event, got %d", ev)
   316  		}
   317  
   318  		
   319  		id, err := binary.ReadUvarint(r)
   320  		if err != nil {
   321  			return err
   322  		}
   323  
   324  		
   325  		len, err := binary.ReadUvarint(r)
   326  		if err != nil {
   327  			return err
   328  		}
   329  		if len > tracev2.MaxEventTrailerDataSize {
   330  			return fmt.Errorf("invalid string size %d, maximum is %d", len, tracev2.MaxEventTrailerDataSize)
   331  		}
   332  
   333  		
   334  		n, err := io.CopyN(&sb, r, int64(len))
   335  		if n != int64(len) {
   336  			return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
   337  		}
   338  		if err != nil {
   339  			return fmt.Errorf("copying string data: %w", err)
   340  		}
   341  
   342  		
   343  		s := sb.String()
   344  		sb.Reset()
   345  		if err := stringTable.insert(stringID(id), s); err != nil {
   346  			return err
   347  		}
   348  	}
   349  	return nil
   350  }
   351  
   352  
   353  
   354  
   355  func addStacks(stackTable *dataTable[stackID, stack], pcs map[uint64]frame, b batch) error {
   356  	if !b.isStacksBatch() {
   357  		return fmt.Errorf("internal error: addStacks called on non-stacks batch")
   358  	}
   359  	r := bytes.NewReader(b.data)
   360  	hdr, err := r.ReadByte() 
   361  	if err != nil || tracev2.EventType(hdr) != tracev2.EvStacks {
   362  		return fmt.Errorf("missing stacks batch header")
   363  	}
   364  
   365  	for r.Len() != 0 {
   366  		
   367  		ev, err := r.ReadByte()
   368  		if err != nil {
   369  			return err
   370  		}
   371  		if tracev2.EventType(ev) != tracev2.EvStack {
   372  			return fmt.Errorf("expected stack event, got %d", ev)
   373  		}
   374  
   375  		
   376  		id, err := binary.ReadUvarint(r)
   377  		if err != nil {
   378  			return err
   379  		}
   380  
   381  		
   382  		nFrames, err := binary.ReadUvarint(r)
   383  		if err != nil {
   384  			return err
   385  		}
   386  		if nFrames > tracev2.MaxFramesPerStack {
   387  			return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, tracev2.MaxFramesPerStack)
   388  		}
   389  
   390  		
   391  		frames := make([]uint64, 0, nFrames)
   392  		for i := uint64(0); i < nFrames; i++ {
   393  			
   394  			pc, err := binary.ReadUvarint(r)
   395  			if err != nil {
   396  				return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
   397  			}
   398  			funcID, err := binary.ReadUvarint(r)
   399  			if err != nil {
   400  				return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
   401  			}
   402  			fileID, err := binary.ReadUvarint(r)
   403  			if err != nil {
   404  				return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
   405  			}
   406  			line, err := binary.ReadUvarint(r)
   407  			if err != nil {
   408  				return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
   409  			}
   410  			frames = append(frames, pc)
   411  
   412  			if _, ok := pcs[pc]; !ok {
   413  				pcs[pc] = frame{
   414  					pc:     pc,
   415  					funcID: stringID(funcID),
   416  					fileID: stringID(fileID),
   417  					line:   line,
   418  				}
   419  			}
   420  		}
   421  
   422  		
   423  		if err := stackTable.insert(stackID(id), stack{pcs: frames}); err != nil {
   424  			return err
   425  		}
   426  	}
   427  	return nil
   428  }
   429  
   430  
   431  
   432  
   433  func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
   434  	if !b.isCPUSamplesBatch() {
   435  		return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-sample batch")
   436  	}
   437  	r := bytes.NewReader(b.data)
   438  	hdr, err := r.ReadByte() 
   439  	if err != nil || tracev2.EventType(hdr) != tracev2.EvCPUSamples {
   440  		return nil, fmt.Errorf("missing CPU samples batch header")
   441  	}
   442  
   443  	for r.Len() != 0 {
   444  		
   445  		ev, err := r.ReadByte()
   446  		if err != nil {
   447  			return nil, err
   448  		}
   449  		if tracev2.EventType(ev) != tracev2.EvCPUSample {
   450  			return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
   451  		}
   452  
   453  		
   454  		ts, err := binary.ReadUvarint(r)
   455  		if err != nil {
   456  			return nil, err
   457  		}
   458  
   459  		
   460  		m, err := binary.ReadUvarint(r)
   461  		if err != nil {
   462  			return nil, err
   463  		}
   464  		mid := ThreadID(m)
   465  
   466  		
   467  		p, err := binary.ReadUvarint(r)
   468  		if err != nil {
   469  			return nil, err
   470  		}
   471  		pid := ProcID(p)
   472  
   473  		
   474  		g, err := binary.ReadUvarint(r)
   475  		if err != nil {
   476  			return nil, err
   477  		}
   478  		goid := GoID(g)
   479  		if g == 0 {
   480  			goid = NoGoroutine
   481  		}
   482  
   483  		
   484  		s, err := binary.ReadUvarint(r)
   485  		if err != nil {
   486  			return nil, err
   487  		}
   488  
   489  		
   490  		samples = append(samples, cpuSample{
   491  			schedCtx: schedCtx{
   492  				M: mid,
   493  				P: pid,
   494  				G: goid,
   495  			},
   496  			time:  Time(ts), 
   497  			stack: stackID(s),
   498  		})
   499  	}
   500  	return samples, nil
   501  }
   502  
   503  
   // sync holds the data that setSyncBatch decodes from a generation's sync batch.
type sync struct {
	freq             frequency // factor converting timestamp units to nanoseconds (1e9/f per setSyncBatch); 0 until a frequency event is seen
	hasClockSnapshot bool      // whether a clock snapshot event was decoded (only accepted for Go 1.25+ traces)
	snapTime         timestamp // trace timestamp of the clock snapshot (batch time plus encoded delta)
	snapMono         uint64    // value recorded in the snapshot, presumably a monotonic clock reading
	snapWall         time.Time // wall-clock time built from the snapshot's sec/nsec via time.Unix
}
   511  
   512  func setSyncBatch(s *sync, b batch, ver version.Version) error {
   513  	if !b.isSyncBatch(ver) {
   514  		return fmt.Errorf("internal error: setSyncBatch called on non-sync batch")
   515  	}
   516  	r := bytes.NewReader(b.data)
   517  	if ver >= version.Go125 {
   518  		hdr, err := r.ReadByte() 
   519  		if err != nil || tracev2.EventType(hdr) != tracev2.EvSync {
   520  			return fmt.Errorf("missing sync batch header")
   521  		}
   522  	}
   523  
   524  	lastTs := b.time
   525  	for r.Len() != 0 {
   526  		
   527  		ev, err := r.ReadByte()
   528  		if err != nil {
   529  			return err
   530  		}
   531  		et := tracev2.EventType(ev)
   532  		switch {
   533  		case et == tracev2.EvFrequency:
   534  			if s.freq != 0 {
   535  				return fmt.Errorf("found multiple frequency events")
   536  			}
   537  			
   538  			f, err := binary.ReadUvarint(r)
   539  			if err != nil {
   540  				return err
   541  			}
   542  			
   543  			s.freq = frequency(1.0 / (float64(f) / 1e9))
   544  		case et == tracev2.EvClockSnapshot && ver >= version.Go125:
   545  			if s.hasClockSnapshot {
   546  				return fmt.Errorf("found multiple clock snapshot events")
   547  			}
   548  			s.hasClockSnapshot = true
   549  			
   550  			tdiff, err := binary.ReadUvarint(r)
   551  			if err != nil {
   552  				return err
   553  			}
   554  			lastTs += timestamp(tdiff)
   555  			s.snapTime = lastTs
   556  			mono, err := binary.ReadUvarint(r)
   557  			if err != nil {
   558  				return err
   559  			}
   560  			s.snapMono = mono
   561  			sec, err := binary.ReadUvarint(r)
   562  			if err != nil {
   563  				return err
   564  			}
   565  			nsec, err := binary.ReadUvarint(r)
   566  			if err != nil {
   567  				return err
   568  			}
   569  			
   570  			
   571  			
   572  			s.snapWall = time.Unix(int64(sec), int64(nsec))
   573  		default:
   574  			return fmt.Errorf("expected frequency or clock snapshot event, got %d", ev)
   575  		}
   576  	}
   577  	return nil
   578  }
   579  
   580  
   581  
   582  func addExperimentalBatch(expBatches map[tracev2.Experiment][]ExperimentalBatch, b batch) error {
   583  	if b.exp == tracev2.NoExperiment {
   584  		return fmt.Errorf("internal error: addExperimentalBatch called on non-experimental batch")
   585  	}
   586  	expBatches[b.exp] = append(expBatches[b.exp], ExperimentalBatch{
   587  		Thread: b.m,
   588  		Data:   b.data,
   589  	})
   590  	return nil
   591  }
   592  
   593  func fixUpCPUSamples(samples []cpuSample, freq frequency) {
   594  	
   595  	for i := range samples {
   596  		s := &samples[i]
   597  		s.time = freq.mul(timestamp(s.time))
   598  	}
   599  	
   600  	slices.SortFunc(samples, func(a, b cpuSample) int {
   601  		return cmp.Compare(a.time, b.time)
   602  	})
   603  }
   604  
View as plain text