Skip to content

Commit

Permalink
Merge pull request #4564 from parca-dev/ephemeral-metastore
Browse files Browse the repository at this point in the history
Make symbolizer read-time
  • Loading branch information
brancz authored Apr 29, 2024
2 parents b9faf0a + 3301b57 commit 6f56ac9
Show file tree
Hide file tree
Showing 60 changed files with 22,112 additions and 24,204 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ Flags:
--debuginfod-http-request-timeout=5m
Timeout duration for HTTP request to upstream
debuginfod server. Defaults to 5m
--metastore="badger" Which metastore implementation to use
--profile-share-server="api.pprof.me:443"
gRPC address to send share profile requests to.
--store-address=STRING gRPC address to send profiles and symbols to.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/common-nighthawk/go-figure v0.0.0-20210622060536-734e95fb86be
github.com/dennwc/varint v1.0.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/felixge/fgprof v0.9.4
github.com/fsnotify/fsnotify v1.7.0
Expand Down Expand Up @@ -103,7 +104,6 @@ require (
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/pkg v0.0.0-20230601102743-20bbbf26f4d8 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 // indirect
Expand Down
41 changes: 41 additions & 0 deletions pkg/badgerlogger/badgerlogger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2024 The Parca Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package badgerlogger

import (
"fmt"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
)

type BadgerLogger struct {
Logger log.Logger
}

func (l *BadgerLogger) Errorf(f string, v ...interface{}) {
level.Error(l.Logger).Log("msg", fmt.Sprintf(f, v...))
}

func (l *BadgerLogger) Warningf(f string, v ...interface{}) {
level.Warn(l.Logger).Log("msg", fmt.Sprintf(f, v...))
}

func (l *BadgerLogger) Infof(f string, v ...interface{}) {
level.Info(l.Logger).Log("msg", fmt.Sprintf(f, v...))
}

func (l *BadgerLogger) Debugf(f string, v ...interface{}) {
level.Debug(l.Logger).Log("msg", fmt.Sprintf(f, v...))
}
108 changes: 108 additions & 0 deletions pkg/demangle/demangle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2024 The Parca Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package demangle

import (
"fmt"
"unsafe"

"github.com/ianlancetaylor/demangle"
)

// Demangler demangles GCC/LLVM C++ and Rust symbol names.
//
// Demangling is the inverse process of mangling (encoding of each unique
// function and parameter list combination into a unique name for the linker).
// With mangling the linker can tell the difference between overloaded functions
// (they have the same name in the source code but different parameter lists).
type Demangler struct {
options []demangle.Option
}

var (
DemangleOptions = []string{
"no_params",
"no_template_params",
"no_clones",
"no_rust",
"verbose",
"llvm_style",
}
demangleOptionMappings = map[string]demangle.Option{
DemangleOptions[0]: demangle.NoParams,
DemangleOptions[1]: demangle.NoTemplateParams,
DemangleOptions[2]: demangle.NoClones,
DemangleOptions[3]: demangle.NoRust,
DemangleOptions[4]: demangle.Verbose,
DemangleOptions[5]: demangle.LLVMStyle,
}
)

func stringsToDemanglerOptions(stringOptions []string) ([]demangle.Option, error) {
res := []demangle.Option{}

for _, str := range stringOptions {
opt, ok := demangleOptionMappings[str]
if !ok {
return nil, fmt.Errorf("unknown demangle option %q", opt)
}
res = append(res, opt)
}

return res, nil
}

func MustNewDefaultDemangler() Demangler {
d, err := NewDemangler()
if err != nil {
panic(err)
}
return d
}

func NewDefaultDemangler() (Demangler, error) {
return NewDemangler("no_params", "no_template_params")
}

// NewDemangler creates a new Demangler with a given demangler options.
func NewDemangler(options ...string) (Demangler, error) {
demanglerOptions, err := stringsToDemanglerOptions(options)
if err != nil {
return Demangler{}, err
}

return newDemangler(demanglerOptions), nil
}

// newDemangler creates a new Demangler with a given demangler options.
func newDemangler(options []demangle.Option) Demangler {
return Demangler{
options: options,
}
}

// Demangle updates the function names in a profile demangling C++ and
// Rust names, simplified according to demanglerMode. If force is set,
// overwrite any names that appear already demangled.
// A modified version of pprof demangler.
func (d Demangler) Demangle(fn []byte) string {
if demangled := demangle.Filter(unsafeString(fn), d.options...); demangled != unsafeString(fn) {
return demangled
}
return string(fn)
}

func unsafeString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
66 changes: 66 additions & 0 deletions pkg/demangle/rust_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2024 The Parca Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package demangle

import (
"testing"

"github.com/ianlancetaylor/demangle"
"github.com/stretchr/testify/require"
)

func TestDemangleRemoveRustTypeParameters(t *testing.T) {
t.Parallel()

cases := []struct {
name string
mangledName string
expected string
}{
{
name: "first",
mangledName: "_RNvXs0_NtNtNtNtCscFZsgO8y1ob_6timely8dataflow9operators7generic11builder_rawINtB5_12OperatorCoreNtNtCs3rN31ftiYwT_7mz_repr9timestamp9TimestampNCINvMNtB7_10builder_rcINtB2m_15OperatorBuilderINtNtNtBb_6scopes5child5ChildIB32_IB32_IB32_INtNtBd_6worker6WorkerNtNtNtCsaCQYdX5e1rn_20timely_communication9allocator7generic7GenericEB1w_EB1w_EB1w_EB1w_EE16build_rescheduleNCINvB2l_5buildNCINvXNtB7_8operatorINtNtBb_6stream10StreamCoreB31_INtNtCs9Pxm3sdlyVG_5alloc3vec3VecTTTNtNtB1A_3row3RowyEIB6U_B7t_EEB1w_xEEEINtB6a_8OperatorB31_B6T_E14unary_frontierIB6U_INtNtB6Y_2rc2RcINtNtNtNtCs3pxblAp50HC_21differential_dataflow5trace15implementations3ord11OrdValBatchB7s_B7L_B1w_xjINtNtCs2MRLV2eX2T0_16timely_container11columnation11TimelyStackB7s_EIBaG_B7L_EEEENCINvXs1_NtNtNtB9c_9operators7arrange11arrangementINtNtB9c_10collection10CollectionB31_B7r_xEINtBc7_7ArrangeB31_B7s_B7L_xE12arrange_coreINtNtNtBb_8channels4pact12ExchangeCoreB6T_B7q_NCINvBc3_13arrange_namedINtNtB98_12spine_fueled5SpineB8O_EE0EBfi_E0NCNCBc0_00Bea_E0NCNCB66_00E0NCNCB5R_00E0ENtNtBd_10scheduling8Schedule8scheduleCs4ogIrgXwtlZ_8clusterd",
expected: "<timely::dataflow::operators::generic::builder_raw::OperatorCore<> as timely::scheduling::Schedule>::schedule",
},
{
name: "second",
mangledName: "_RNCINvMNtNtNtNtCs5myfTy8mnaF_6timely8dataflow9operators7generic10builder_rcINtB5_15OperatorBuilderINtNtNtBb_6scopes5child5ChildIB1z_INtNtBd_6worker6WorkerNtNtNtCsbo5udLplCaV_20timely_communication9allocator7generic7GenericENtNtCslnPiKci8RgF_7mz_repr9timestamp9TimestampEB3z_EE16build_rescheduleNCINvB4_5buildNCINvXNtB7_8operatorINtNtBb_6stream10StreamCoreB1y_INtNtCsfohDMHpnFpV_5alloc3vec3VecTTNtNtB3D_3row3RowB6k_EB3z_xEEEINtB52_8OperatorB1y_B5L_E14unary_frontierIB5M_INtNtB5Q_2rc2RcINtNtNtNtCsaEm0OTy3LfN_21differential_dataflow5trace15implementations3ord11OrdValBatchB6k_B6k_B3z_xjINtNtCsicJTUUNBAMQ_16timely_container11columnation11TimelyStackB6k_EB9o_EEENCINvXs1_NtNtNtB7V_9operators7arrange11arrangementINtNtB7V_10collection10CollectionB1y_B6j_xEINtBaK_7ArrangeB1y_B6k_B6k_xE12arrange_coreINtNtNtBb_8channels4pact12ExchangeCoreB5L_B6i_NCINvBaG_13arrange_namedINtNtB7R_12spine_fueled5SpineB7x_EE0EBdV_E0NCNCBaD_00BcN_E0NCNCB4Y_00E0NCNCB4K_00E0Cse28fqe15ASj_8clusterd",
expected: "<timely::dataflow::operators::generic::builder_rc::OperatorBuilder<>>::build_reschedule::<>::{closure#0}",
},
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
t.Parallel()

d := newDemangler([]demangle.Option{demangle.NoParams, demangle.NoTemplateParams})
demangledName := d.Demangle([]byte(c.mangledName))

require.Equal(t, c.expected, demangledName)
})
}
}

func BenchmarkDemangleRemoveRustTypeParameters(b *testing.B) {
mangledName := "_RNvXs0_NtNtNtNtCscFZsgO8y1ob_6timely8dataflow9operators7generic11builder_rawINtB5_12OperatorCoreNtNtCs3rN31ftiYwT_7mz_repr9timestamp9TimestampNCINvMNtB7_10builder_rcINtB2m_15OperatorBuilderINtNtNtBb_6scopes5child5ChildIB32_IB32_IB32_INtNtBd_6worker6WorkerNtNtNtCsaCQYdX5e1rn_20timely_communication9allocator7generic7GenericEB1w_EB1w_EB1w_EB1w_EE16build_rescheduleNCINvB2l_5buildNCINvXNtB7_8operatorINtNtBb_6stream10StreamCoreB31_INtNtCs9Pxm3sdlyVG_5alloc3vec3VecTTTNtNtB1A_3row3RowyEIB6U_B7t_EEB1w_xEEEINtB6a_8OperatorB31_B6T_E14unary_frontierIB6U_INtNtB6Y_2rc2RcINtNtNtNtCs3pxblAp50HC_21differential_dataflow5trace15implementations3ord11OrdValBatchB7s_B7L_B1w_xjINtNtCs2MRLV2eX2T0_16timely_container11columnation11TimelyStackB7s_EIBaG_B7L_EEEENCINvXs1_NtNtNtB9c_9operators7arrange11arrangementINtNtB9c_10collection10CollectionB31_B7r_xEINtBc7_7ArrangeB31_B7s_B7L_xE12arrange_coreINtNtNtBb_8channels4pact12ExchangeCoreB6T_B7q_NCINvBc3_13arrange_namedINtNtB98_12spine_fueled5SpineB8O_EE0EBfi_E0NCNCBc0_00Bea_E0NCNCB66_00E0NCNCB5R_00E0ENtNtBd_10scheduling8Schedule8scheduleCs4ogIrgXwtlZ_8clusterd"

d := newDemangler([]demangle.Option{demangle.NoParams, demangle.NoTemplateParams})

var dontOptimize string
for i := 0; i < b.N; i++ {
dontOptimize = d.Demangle([]byte(mangledName))
}
_ = dontOptimize
}
77 changes: 77 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@
package ingester

import (
"bytes"
"context"
"sync"

"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/go-kit/log"
"github.com/polarsignals/frostdb/dynparquet"

"github.com/parca-dev/parca/pkg/normalizer"
)
Expand All @@ -23,3 +31,72 @@ type Ingester interface {
Ingest(ctx context.Context, req normalizer.NormalizedWriteRawRequest) error
Close() error
}

type Table interface {
Schema() *dynparquet.Schema
InsertRecord(context.Context, arrow.Record) (tx uint64, err error)
}

type TableIngester struct {
logger log.Logger
mem memory.Allocator
table Table
schema *dynparquet.Schema
bufferPool *sync.Pool
}

func NewIngester(
logger log.Logger,
mem memory.Allocator,
table Table,
schema *dynparquet.Schema,
) Ingester {
return TableIngester{
logger: logger,
mem: mem,
table: table,
schema: schema,
bufferPool: &sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
},
}
}

func (ing TableIngester) Close() error { return nil }

func (ing TableIngester) Ingest(ctx context.Context, req normalizer.NormalizedWriteRawRequest) error {
// Read sorted rows into an arrow record
record, err := normalizer.ParquetBufToArrowRecord(ctx, ing.mem, ing.schema, req)
if err != nil {
return err
}
if record == nil {
return nil
}
defer record.Release()

if record.NumRows() == 0 {
return nil
}

for _, col := range record.Columns() {
switch col := col.(type) {
case *array.Dictionary:
// Dictionaries are lazily initialized, we need to do this here
// to make them concurrency safe. This should be solved
// upstream, but for now this is a fix to avoid data races with
// what we have.
col.Dictionary()
default:
// Do nothing
}
}

if _, err := ing.table.InsertRecord(ctx, record); err != nil {
return err
}

return nil
}
Loading

0 comments on commit 6f56ac9

Please sign in to comment.