diff --git a/cmd/tsbs_generate_queries/databases/clickhouse/common.go b/cmd/tsbs_generate_queries/databases/clickhouse/common.go
index b4f29b9f0..0705a95a0 100644
--- a/cmd/tsbs_generate_queries/databases/clickhouse/common.go
+++ b/cmd/tsbs_generate_queries/databases/clickhouse/common.go
@@ -3,6 +3,7 @@ package clickhouse
 import (
 	"time"
 
+	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/dea"
 	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
 	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils"
 	"github.com/timescale/tsbs/pkg/query"
@@ -10,7 +11,8 @@ import (
 
 // BaseGenerator contains settings specific for ClickHouse.
 type BaseGenerator struct {
-	UseTags bool
+	UseTags            bool
+	PropertyAccessMode string
 }
 
 // GenerateEmptyQuery returns an empty query.ClickHouse.
@@ -42,3 +44,22 @@ func (g *BaseGenerator) NewDevops(start, end time.Time, scale int) (utils.QueryG
 
 	return devops, nil
 }
+
+// NewDEA creates a new DEA use case query generator.
+func (g *BaseGenerator) NewDEA(start, end time.Time, scale int) (utils.QueryGenerator, error) {
+	core, err := dea.NewCore(start, end, scale)
+	if err != nil {
+		return nil, err
+	}
+
+	generator := &DEA{
+		BaseGenerator: g,
+		Core:          core,
+	}
+
+	return generator, nil
+}
+
+// ClickHouse understands and can compare time presented as strings of this format
+const clickhouseTimeStringFormat = "2006-01-02 15:04:05"
diff --git a/cmd/tsbs_generate_queries/databases/clickhouse/dea.go b/cmd/tsbs_generate_queries/databases/clickhouse/dea.go
new file mode 100644
index 000000000..a82d58ce6
--- /dev/null
+++ b/cmd/tsbs_generate_queries/databases/clickhouse/dea.go
@@ -0,0 +1,324 @@
+package clickhouse
+
+import (
+	"fmt"
+	"math/rand"
+	"strings"
+	"time"
+
+	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/dea"
+	"github.com/timescale/tsbs/pkg/query"
+)
+
+// DEA produces ClickHouse-specific queries for all the DEA query types.
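+// For example, with PropertyAccessMode set to Map, getProperty("url") renders
+// propertiesMap['url'], while Json renders simpleJSONExtractRaw(propertiesJson, 'url');
+// getAliasedProperties then aliases either form as a_url so generated WHERE clauses can
+// reference the extracted value.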
+type DEA struct { + *dea.Core + *BaseGenerator +} + +const ( + Json string = "json" + Map string = "map" +) + +func Must[T any](v T, err error) T { + if err != nil { + panic(err) + } + return v +} + +func (d *DEA) getJSONProperty(key string) string { + return fmt.Sprintf("simpleJSONExtractRaw(propertiesJson, '%s')", key) +} + +func (d *DEA) getMapProperty(key string) string { + return fmt.Sprintf("propertiesMap['%s']", key) +} + +func (d *DEA) getProperty(key string) string { + switch d.PropertyAccessMode { + case Json: + return d.getJSONProperty(key) + case Map: + return d.getMapProperty(key) + default: + panic(fmt.Sprintf("unknown access mode %s", d.PropertyAccessMode)) + } +} + +func (d *DEA) getPropertyAlias(property string) string { + return fmt.Sprintf("a_%s", property) +} + +func (d *DEA) getAliasedProperties(keys []string) []string { + aliasedProps := make([]string, len(keys)) + + for i := range keys { + aliasedProps[i] = fmt.Sprintf("%s as %s", d.getProperty(keys[i]), d.getPropertyAlias(keys[i])) + } + + return aliasedProps +} + +func (d *DEA) getFunnelStepSelectStatements(nSteps int) []string { + var statements []string + + for i := range nSteps { + statements = append(statements, fmt.Sprintf("countIf(steps = %d) AS step_%d", i+1, i+1)) + } + + return statements +} + +/* +IF(name = 'navigation' AND simpleJSONExtractString(properties, 'url') LIKE 'app.smartlook.com/sign/up%', 1, 0) AS step_0, +IF(step_0 = 1, timestamp, NULL) AS latest_0, +IF(name = 'signup_step_1', 1, 0) AS step_1, +IF(step_1 = 1, timestamp, NULL) AS latest_1, +IF(name = 'signup_step_2', 1, 0) AS step_2, +IF(step_2 = 1, timestamp, NULL) AS latest_2 +*/ +func (d *DEA) getFunnelFiltersStatements(nSteps int) []string { + var statements []string + + for i := range nSteps { + stepEvent := d.GetRandomEvent() + eventProperty := Must(d.GetRandomProperties(1, dea.AvailableStrProperties))[0] + eventValue := Must(d.GetRandomPropertyValue(eventProperty)) + + statements = append(statements, fmt.Sprintf("IF (name = '%s' AND %s LIKE '%s', 1, 0)", + stepEvent, + eventProperty, + eventValue)) + statements = append(statements, fmt.Sprintf("IF (step_%d = 1, timestamp, NULL) as latest_%d", + i, i)) + } + + return statements +} + +/* +step_0 = 1 +OR step_1 = 1 +OR step_2 = 1 +*/ +func (d *DEA) getFunnelConversionStatement(nSteps int) string { + var statements []string + + for i := range nSteps - 1 { + statements = append(statements, fmt.Sprintf("step_%d = 1", i)) + } + + return strings.Join(statements, " OR ") +} + +func (d *DEA) getCustomStrictFunnelSQL(nSteps int) string { + interval := d.Interval.MustRandWindow(time.Hour * 24) // TODO: Update to some other interval + + sql := fmt.Sprintf(` + SELECT %s + FROM ( + SELECT user_id, + steps + FROM ( + SELECT user_id, + steps, + max(steps) OVER (PARTITION BY user_id) AS max_steps, + FROM ( + SELECT *, + IF( + latest_0 <= latest_1 + AND latest_1 <= latest_0 + INTERVAL 14 DAY + AND latest_1 <= latest_2 + AND latest_2 <= latest_1 + INTERVAL 14 DAY, + 3, + IF( + latest_0 <= latest_1 + AND latest_1 <= latest_0 + INTERVAL 14 DAY, + 2, + 1 + ) + ) AS steps, + IF( + isNotNull(latest_1) + AND latest_1 <= latest_0 + INTERVAL 14 DAY, + dateDiff( + 'second', + toDateTime(latest_0), + toDateTime(latest_1) + ), + NULL + ) AS step_1_conversion_time, + IF( + isNotNull(latest_2) + AND latest_2 <= latest_1 + INTERVAL 14 DAY, + dateDiff( + 'second', + toDateTime(latest_1), + toDateTime(latest_2) + ), + NULL + ) AS step_2_conversion_time + FROM ( + SELECT user_id, + eventDate, + step_0, + min(latest_0) 
OVER ( + PARTITION BY user_id + ORDER BY eventDate DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING + ) latest_0, + step_1, + min(latest_1) OVER ( + PARTITION BY user_id + ORDER BY eventDate DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING + ) latest_1, + step_2, + min(latest_2) OVER ( + PARTITION BY user_id + ORDER BY eventDate DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING + ) latest_2 + FROM ( + SELECT user_id, + timestamp AS eventDate, + name, + %s + FROM ( + SELECT user_id, id, properties, name, timestamp + FROM %s e + WHERE timestamp >= '%s' AND timestamp <= '%s' + ) WHERE ( + %s + ) + ) + ) + WHERE step_0 = 1 + ) + ) + GROUP BY user_id, steps + HAVING steps = max_steps + ) + `, strings.Join(d.getFunnelStepSelectStatements(nSteps), ",\n"), + strings.Join(d.getFunnelFiltersStatements(nSteps), ",\n"), + dea.TableName, + interval.Start().Format(clickhouseTimeStringFormat), + interval.End().Format(clickhouseTimeStringFormat), + d.getFunnelConversionStatement(nSteps)) + + return sql +} + +func (d *DEA) getInternalStrictFunnelSQL(nSteps int) string { + var steps []string + observedEvents := make(map[string]bool) + + for range nSteps { + event := d.GetRandomEvent() + observedEvents[event] = true + + property := Must(d.GetRandomProperties(1, dea.AvailableStrProperties))[0] + propertyValue := Must(d.GetRandomPropertyValue(property)) + + stepStatement := fmt.Sprintf("name = '%s' AND %s = '%s'", + event, property, propertyValue) + steps = append(steps, stepStatement) + } + + var observedEventsStatements []string + for event := range observedEvents { + observedEventsStatements = append(observedEventsStatements, fmt.Sprintf("'%s'", event)) + } + + sql := fmt.Sprintf(`SELECT + level, + count() AS c + FROM ( + SELECT + user_id, + windowFunnel(6048000000000000)(timestamp, + %s + ) AS level + FROM + %s + WHERE ( + name IN (%s) + ) + GROUP BY user_id + ) + GROUP BY level + ORDER BY level ASC;`, + strings.Join(steps, ",\n"), + dea.TableName, + strings.Join(observedEventsStatements, ", "), + ) + + return sql +} + +func (d *DEA) NestedWhere(qi query.Query) { + interval := d.Interval.MustRandWindow(time.Hour * 24) // TODO: Update to some other interval + randomProperties, err := d.GetRandomProperties(4, dea.AvailableStrProperties) + panicIfErr(err) + + selectClauses := strings.Join(d.getAliasedProperties(randomProperties), ", ") + + whereClauses := fmt.Sprintf("%s = '%s' AND %s != '' AND (%s LIKE '%%%s%%' OR %s LIKE '%%%s') "+ + "AND (createdAt >= '%s') AND (createdAt <= '%s')", + d.getPropertyAlias(randomProperties[0]), + Must(d.GetRandomPropertyValue(randomProperties[0])), + d.getPropertyAlias(randomProperties[1]), + d.getPropertyAlias(randomProperties[2]), + Must(d.GetRandomPropertyValue(randomProperties[2]))[2:6], + d.getPropertyAlias(randomProperties[3]), + Must(d.GetRandomPropertyValue(randomProperties[3]))[2:], + interval.Start().Format(clickhouseTimeStringFormat), + interval.End().Format(clickhouseTimeStringFormat)) + + sql := fmt.Sprintf(` + SELECT toStartOfHour(created_at) AS hour, %s FROM %s WHERE %s + `, selectClauses, dea.TableName, whereClauses) + + humanLabel := "ClickHouse nested and dynamic" + humanDesc := fmt.Sprintf("%s: nested where query with dynamic data access", humanLabel) + d.fillInQuery(qi, humanLabel, humanDesc, dea.TableName, sql) +} + +func (d *DEA) EventHistogram(qi query.Query) { + interval := d.Interval.MustRandWindow(time.Hour * 24) // TODO: Update to some other interval + + sql := fmt.Sprintf(` + SELECT + toDate(timestamp, 'Europe/Prague') as day, + 
uniq(user_id) as visitors, + count() as cnt + FROM %s + WHERE + name LIKE '%s' + AND timestamp >= now() - INTERVAL 30 DAY + GROUP BY + day + ORDER BY + day WITH FILL + FROM toDate(%s) + TO toDate(%s) + `, + dea.TableName, + d.GetRandomEvent(), + interval.Start().Format(clickhouseTimeStringFormat), + interval.End().Format(clickhouseTimeStringFormat)) + + d.fillInQuery(qi, "ClickHouse Event histogram", "Clickhouse Event histogram", dea.TableName, sql) +} + +func (d *DEA) Funnel(qi query.Query, inOrder bool, minSteps, maxSteps int) { + if maxSteps < minSteps { + panic(fmt.Errorf("maximum steps (%d) is less than minimum steps (%d)", maxSteps, minSteps)) + } + + nSteps := rand.Intn(maxSteps-minSteps) + minSteps + 1 + // selectStatement := + + sql := d.getInternalStrictFunnelSQL(nSteps) + + d.fillInQuery(qi, "ClickHouse Event histogram", "Clickhouse Event histogram", dea.TableName, sql) +} diff --git a/cmd/tsbs_generate_queries/databases/clickhouse/devops.go b/cmd/tsbs_generate_queries/databases/clickhouse/devops.go index a10144bde..dd989c595 100644 --- a/cmd/tsbs_generate_queries/databases/clickhouse/devops.go +++ b/cmd/tsbs_generate_queries/databases/clickhouse/devops.go @@ -67,18 +67,17 @@ func (d *Devops) getSelectClausesAggMetrics(aggregateFunction string, metrics [] return selectAggregateClauses } -// ClickHouse understands and can compare time presented as strings of this format -const clickhouseTimeStringFormat = "2006-01-02 15:04:05" - // MaxAllCPU selects the MAX of all metrics under 'cpu' per hour for nhosts hosts, // e.g. in pseudo-SQL: // // SELECT MAX(metric1), ..., MAX(metricN) // FROM cpu // WHERE -// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N') -// AND time >= '$HOUR_START' -// AND time < '$HOUR_END' +// +// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N') +// AND time >= '$HOUR_START' +// AND time < '$HOUR_END' +// // GROUP BY hour // ORDER BY hour // @@ -290,9 +289,11 @@ func (d *Devops) LastPointPerHost(qi query.Query) { // SELECT minute, max(metric1), ..., max(metricN) // FROM cpu // WHERE -// (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N') -// AND time >= '$HOUR_START' -// AND time < '$HOUR_END' +// +// (hostname = '$HOSTNAME_1' OR ... 
OR hostname = '$HOSTNAME_N') +// AND time >= '$HOUR_START' +// AND time < '$HOUR_END' +// // GROUP BY minute // ORDER BY minute ASC // @@ -305,8 +306,7 @@ func (d *Devops) LastPointPerHost(qi query.Query) { // single-groupby-5-8-1 func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { interval := d.Interval.MustRandWindow(timeRange) - metrics, err := devops.GetCPUMetricsSlice(numMetrics) - panicIfErr(err) + metrics := Must(devops.GetCPUMetricsSlice(numMetrics)) selectClauses := d.getSelectClausesAggMetrics("max", metrics) sql := fmt.Sprintf(` diff --git a/cmd/tsbs_generate_queries/main.go b/cmd/tsbs_generate_queries/main.go index b5970f42c..6c69cf32c 100644 --- a/cmd/tsbs_generate_queries/main.go +++ b/cmd/tsbs_generate_queries/main.go @@ -4,12 +4,14 @@ package main import ( "fmt" - "github.com/timescale/tsbs/pkg/query/config" "os" "time" + "github.com/timescale/tsbs/pkg/query/config" + "github.com/blagojts/viper" "github.com/spf13/pflag" + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/dea" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/iot" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils" @@ -18,6 +20,14 @@ import ( ) var useCaseMatrix = map[string]map[string]utils.QueryFillerMaker{ + "dea": { + dea.LabelNestedWhere: dea.NewNestedWhere(), + dea.LabelEventHistogram: dea.NewEventHistogram(), + dea.LabelFunnel + "-strict-long": dea.NewFunnel(true, 8, 12), + // dea.LabelFunnel + "-any-long": dea.NewFunnel(false, 8, 12), + dea.LabelFunnel + "-strict-short": dea.NewFunnel(true, 2, 4), + // dea.LabelFunnel + "-any-short": dea.NewFunnel(false, 2, 4), + }, "devops": { devops.LabelSingleGroupby + "-1-1-1": devops.NewSingleGroupby(1, 1, 1), devops.LabelSingleGroupby + "-1-1-12": devops.NewSingleGroupby(1, 1, 12), diff --git a/cmd/tsbs_generate_queries/uses/dea/common.go b/cmd/tsbs_generate_queries/uses/dea/common.go new file mode 100644 index 000000000..e57cd2bc0 --- /dev/null +++ b/cmd/tsbs_generate_queries/uses/dea/common.go @@ -0,0 +1,100 @@ +package dea + +import ( + "fmt" + "math/rand" + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/common" + "github.com/timescale/tsbs/pkg/query" +) + +const ( + TableName = "events" + + LabelNestedWhere = "nested-where" + LabelEventHistogram = "event-histogram" + LabelFunnel = "funnel" +) + +const ( + UrlProperty = "url" + EmailProperty = "email" + NameProperty = "name" + PhoneProperty = "phone" + FavoriteNumberProperty = "age" +) + +const ( + NavigationEvent = "navigation" + ClickEvent = "click" + CheckoutEvent = "cart_checkout" + TypingEvent = "typing" +) + +var ( + AvailableEvents = []string{NavigationEvent, ClickEvent, CheckoutEvent, TypingEvent} + AvailableStrProperties = []string{UrlProperty, EmailProperty, NameProperty, PhoneProperty} + AvailableStrPropertyValues = map[string][]string{ + UrlProperty: {"https://www.seznam.cz"}, + EmailProperty: {"hello@world.com"}, + NameProperty: {"john doe"}, + PhoneProperty: {"+420602303222"}, + } +) + +// Core is the common component of all generators for all systems. 
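+// For example, GetRandomProperties(2, AvailableStrProperties) could return
+// {"url", "email"}, GetRandomPropertyValue("url") returns "https://www.seznam.cz"
+// (the only value listed for that property), and GetRandomEvent picks one of
+// AvailableEvents such as "cart_checkout".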
+type Core struct { + *common.Core +} + +// NewCore returns a new Core for the given time range and cardinality +func NewCore(start, end time.Time, scale int) (*Core, error) { + c, err := common.NewCore(start, end, scale) + return &Core{Core: c}, err +} + +type NestedWhereFiller interface { + NestedWhere(qi query.Query) +} + +type EventHistogramFiller interface { + EventHistogram(query.Query) +} + +type FunnelFiller interface { + Funnel(qi query.Query, inOrder bool, minSteps int, maxSteps int) +} + +func (d *Core) GetRandomProperties(n int, properties []string) ([]string, error) { + if n < 1 { + return nil, fmt.Errorf("number of properties cannot be < 1; got %d", n) + } + if n > len(properties) { + return nil, fmt.Errorf("number of properties (%d) larger than total properties (%d)", n, len(properties)) + } + + randomNumbers, err := common.GetRandomSubsetPerm(n, len(properties)) + if err != nil { + return nil, err + } + + selectedProperties := make([]string, n) + for i, n := range randomNumbers { + selectedProperties[i] = properties[n] + } + + return selectedProperties, nil +} + +func (d *Core) GetRandomPropertyValue(property string) (string, error) { + if values, ok := AvailableStrPropertyValues[property]; ok { + return values[rand.Intn(len(values))], nil + } else { + return "", fmt.Errorf("no values for %s", property) + } +} + +func (d *Core) GetRandomEvent() string { + return AvailableEvents[rand.Intn(len(AvailableEvents))] +} diff --git a/cmd/tsbs_generate_queries/uses/dea/event_histogram.go b/cmd/tsbs_generate_queries/uses/dea/event_histogram.go new file mode 100644 index 000000000..69a682e8f --- /dev/null +++ b/cmd/tsbs_generate_queries/uses/dea/event_histogram.go @@ -0,0 +1,29 @@ +package dea + +import ( + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/common" + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils" + "github.com/timescale/tsbs/pkg/query" +) + +type EventHistogram struct { + core utils.QueryGenerator +} + +func NewEventHistogram() utils.QueryFillerMaker { + return func(core utils.QueryGenerator) utils.QueryFiller { + return &EventHistogram{ + core: core, + } + } +} + +func (i *EventHistogram) Fill(q query.Query) query.Query { + fc, ok := i.core.(EventHistogramFiller) + if !ok { + common.PanicUnimplementedQuery(i.core) + } + fc.EventHistogram(q) + + return q +} diff --git a/cmd/tsbs_generate_queries/uses/dea/funnel.go b/cmd/tsbs_generate_queries/uses/dea/funnel.go new file mode 100644 index 000000000..754ed350e --- /dev/null +++ b/cmd/tsbs_generate_queries/uses/dea/funnel.go @@ -0,0 +1,35 @@ +package dea + +import ( + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/common" + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils" + "github.com/timescale/tsbs/pkg/query" +) + +type Funnel struct { + core utils.QueryGenerator + + inOrder bool + minSteps, maxSteps int +} + +func NewFunnel(inOrder bool, minSteps, maxSteps int) utils.QueryFillerMaker { + return func(core utils.QueryGenerator) utils.QueryFiller { + return &Funnel{ + core: core, + inOrder: inOrder, + minSteps: minSteps, + maxSteps: maxSteps, + } + } +} + +func (f *Funnel) Fill(q query.Query) query.Query { + fc, ok := f.core.(FunnelFiller) + if !ok { + common.PanicUnimplementedQuery(f.core) + } + fc.Funnel(q, f.inOrder, f.minSteps, f.maxSteps) + + return q +} diff --git a/cmd/tsbs_generate_queries/uses/dea/nested_where.go b/cmd/tsbs_generate_queries/uses/dea/nested_where.go new file mode 100644 index 000000000..cd05d2f82 --- /dev/null +++ 
b/cmd/tsbs_generate_queries/uses/dea/nested_where.go
@@ -0,0 +1,31 @@
+package dea
+
+import (
+	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/common"
+	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils"
+	"github.com/timescale/tsbs/pkg/query"
+)
+
+// NestedWhere contains info for filling in nested where queries.
+type NestedWhere struct {
+	core utils.QueryGenerator
+}
+
+// NewNestedWhere creates a new nested where query filler.
+func NewNestedWhere() utils.QueryFillerMaker {
+	return func(core utils.QueryGenerator) utils.QueryFiller {
+		return &NestedWhere{
+			core: core,
+		}
+	}
+}
+
+// Fill fills in the query.Query with query details.
+func (i *NestedWhere) Fill(q query.Query) query.Query {
+	fc, ok := i.core.(NestedWhereFiller)
+	if !ok {
+		common.PanicUnimplementedQuery(i.core)
+	}
+	fc.NestedWhere(q)
+	return q
+}
diff --git a/cmd/tsbs_run_queries_pinot/main.go b/cmd/tsbs_run_queries_pinot/main.go
new file mode 100644
index 000000000..0058164bb
--- /dev/null
+++ b/cmd/tsbs_run_queries_pinot/main.go
@@ -0,0 +1,180 @@
+// tsbs_run_queries_pinot speed tests ClickHouse using requests from stdin or file.
+//
+// It reads encoded Query objects from stdin or file, and makes concurrent requests to the provided ClickHouse endpoint.
+// This program has no knowledge of the internals of the endpoint.
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/blagojts/viper"
+	"github.com/jmoiron/sqlx"
+	_ "github.com/kshvakov/clickhouse"
+	"github.com/spf13/pflag"
+	"github.com/timescale/tsbs/internal/utils"
+	"github.com/timescale/tsbs/pkg/query"
+)
+
+// Program option vars:
+var (
+	chConnect string
+	hostsList []string
+	user      string
+	password  string
+
+	showExplain bool
+)
+
+// Global vars:
+var (
+	runner *query.BenchmarkRunner
+)
+
+// Parse args:
+func init() {
+	var config query.BenchmarkRunnerConfig
+	config.AddToFlagSet(pflag.CommandLine)
+	var hosts string
+
+	pflag.String("additional-params", "sslmode=disable",
+		"String of additional ClickHouse connection parameters, e.g., 'sslmode=disable'.")
+	pflag.String("hosts", "localhost",
+		"Comma separated list of ClickHouse hosts (pass multiple values for sharding reads on a multi-node setup)")
+	pflag.String("user", "default", "User to connect to ClickHouse as")
+	pflag.String("password", "", "Password to connect to ClickHouse")
+
+	pflag.Parse()
+
+	err := utils.SetupConfigFile()
+	if err != nil {
+		panic(fmt.Errorf("fatal error config file: %s", err))
+	}
+
+	if err := viper.Unmarshal(&config); err != nil {
+		panic(fmt.Errorf("unable to decode config: %s", err))
+	}
+
+	chConnect = viper.GetString("additional-params")
+	hosts = viper.GetString("hosts")
+	user = viper.GetString("user")
+	password = viper.GetString("password")
+
+	// Parse comma separated string of hosts and put in a slice (for multi-node setups)
+	for _, host := range strings.Split(hosts, ",") {
+		hostsList = append(hostsList, host)
+	}
+
+	runner = query.NewBenchmarkRunner(config)
+}
+
+func main() {
+	runner.Run(&query.ClickHousePool, newProcessor)
+}
+
+// Get the connection string for a connection to ClickHouse.
+
+// If we're running queries against multiple nodes we need to balance the queries
+// across replicas.
Each worker is assigned a sequence number -- we'll use that +// to evenly distribute hosts to worker connections +func getConnectString(workerNumber int) string { + // Round robin the host/worker assignment by assigning a host based on workerNumber % totalNumberOfHosts + host := hostsList[workerNumber%len(hostsList)] + + return fmt.Sprintf("tcp://%s:9000?username=%s&password=%s&database=%s", host, user, password, runner.DatabaseName()) +} + +// prettyPrintResponse prints a Query and its response in JSON format with two +// keys: 'query' which has a value of the SQL used to generate the second key +// 'results' which is an array of each row in the return set. +func prettyPrintResponse(rows *sqlx.Rows, q *query.ClickHouse) { + resp := make(map[string]interface{}) + resp["query"] = string(q.SqlQuery) + + results := []map[string]interface{}{} + for rows.Next() { + r := make(map[string]interface{}) + if err := rows.MapScan(r); err != nil { + panic(err) + } + results = append(results, r) + resp["results"] = results + } + + line, err := json.MarshalIndent(resp, "", " ") + if err != nil { + panic(err) + } + + fmt.Println(string(line) + "\n") +} + +type queryExecutorOptions struct { + showExplain bool + debug bool + printResponse bool +} + +// query.Processor interface implementation +type processor struct { + db *sqlx.DB + opts *queryExecutorOptions +} + +// query.Processor interface implementation +func newProcessor() query.Processor { + return &processor{} +} + +// query.Processor interface implementation +func (p *processor) Init(workerNumber int) { + p.db = sqlx.MustConnect("clickhouse", getConnectString(workerNumber)) + p.opts = &queryExecutorOptions{ + // ClickHouse could not do EXPLAIN + showExplain: false, + debug: runner.DebugLevel() > 0, + printResponse: runner.DoPrintResponses(), + } +} + +// query.Processor interface implementation +func (p *processor) ProcessQuery(q query.Query, isWarm bool) ([]*query.Stat, error) { + // No need to run again for EXPLAIN + if isWarm && p.opts.showExplain { + return nil, nil + } + + // Ensure ClickHouse query + chQuery := q.(*query.ClickHouse) + + start := time.Now() + + // SqlQuery is []byte, so cast is needed + sql := string(chQuery.SqlQuery) + + // Main action - run the query + rows, err := p.db.Queryx(sql) + if err != nil { + return nil, err + } + + // Print some extra info if needed + if p.opts.debug { + fmt.Println(sql) + } + if p.opts.printResponse { + prettyPrintResponse(rows, chQuery) + } + + // Finalize the query + rows.Close() + took := float64(time.Since(start).Nanoseconds()) / 1e6 + + stat := query.GetStat() + stat.Init(q.HumanLabelName(), took) + + return []*query.Stat{stat}, err +} diff --git a/config.yaml b/config.yaml new file mode 100644 index 000000000..bde7cdaaf --- /dev/null +++ b/config.yaml @@ -0,0 +1,35 @@ +data-source: + simulator: + debug: 0 + initial-scale: "0" + log-interval: 10s + max-data-points: "0" + max-metric-count: "100" + scale: "1" + seed: 0 + timestamp-end: "2020-01-02T00:00:00Z" + timestamp-start: "2020-01-01T00:00:00Z" + use-case: iot + type: SIMULATOR +loader: + db-specific: + debug: 0 + host: localhost + log-batches: false + password: "smartlook-events" + user: default + runner: + batch-size: "10000" + channel-capacity: "0" + db-name: benchmark + do-abort-on-exist: false + do-create-db: true + do-load: true + flow-control: false + hash-workers: false + insert-intervals: "" + limit: "0" + reporting-period: 10s + seed: 0 + workers: "1" + target: clickhouse diff --git a/internal/inputs/generator_queries.go 
b/internal/inputs/generator_queries.go index 44a9ec58c..3221d3dcf 100644 --- a/internal/inputs/generator_queries.go +++ b/internal/inputs/generator_queries.go @@ -40,6 +40,10 @@ type IoTGeneratorMaker interface { NewIoT(start, end time.Time, scale int) (queryUtils.QueryGenerator, error) } +type DEAGeneratorMaker interface { + NewDEA(start, end time.Time, scale int) (queryUtils.QueryGenerator, error) +} + // QueryGenerator is a type of Generator for creating queries to test against a // database. The output is specific to the type of database (due to each using // different querying techniques, e.g. SQL or REST), but is consumed by TSBS @@ -196,6 +200,14 @@ func (g *QueryGenerator) getUseCaseGenerator(c *config.QueryGeneratorConfig) (qu } return devopsFactory.NewDevops(g.tsStart, g.tsEnd, scale) + case common.UseCaseDEA: + deaFactory, ok := factory.(DEAGeneratorMaker) + + if !ok { + return nil, fmt.Errorf(errUseCaseNotImplementedFmt, c.Use, c.Format) + } + + return deaFactory.NewDEA(g.tsStart, g.tsEnd, scale) default: return nil, fmt.Errorf(errUnknownUseCaseFmt, c.Use) } diff --git a/pkg/data/usecases/dea/event.go b/pkg/data/usecases/dea/event.go index 62faa2b17..f326f6c0c 100644 --- a/pkg/data/usecases/dea/event.go +++ b/pkg/data/usecases/dea/event.go @@ -12,39 +12,69 @@ import ( "github.com/timescale/tsbs/pkg/publicid" ) +const ( + UrlProperty = "url" + EmailProperty = "email" + NameProperty = "name" + PhoneProperty = "phone" + FavoriteNumberProperty = "age" +) + +const ( + NavigationEvent = "navigation" + ClickEvent = "click" + CheckoutEvent = "cart_checkout" + TypingEvent = "typing" +) + +var ( + AvailableEvents = []string{NavigationEvent, ClickEvent, CheckoutEvent, TypingEvent} + AvailableStrProperties = []string{UrlProperty, EmailProperty, NameProperty, PhoneProperty} + AvailableStrPropertyValues = map[string][]string{ + UrlProperty: {"https://www.seznam.cz"}, + EmailProperty: {"hello@world.com"}, + NameProperty: {"john doe"}, + PhoneProperty: {"+420602303222"}, + } +) + var ( OtelChoices = map[string][]string{ - "telemetry.auto.version": {"1.23.0", "1.23.1", "1.23.2", "1.23.3", "1.23.4"}, - "os.description": {"Linux 5.10.104-linuxkit", "Linux 5.10.105-linuxkit", "Linux 5.10.106-linuxkit", "Linux 5.10.107-linuxkit", "Linux 5.10.108-linuxkit"}, - "process.runtime.description": {"Eclipse Adoptium OpenJDK 64-Bit Server VM 17.0.6+10", "Eclipse Adoptium OpenJDK 64-Bit Server VM 17.0.7+10", "Eclipse Adoptium OpenJDK 64-Bit Server VM 17.0.8+10", "Eclipse Adoptium OpenJDK 64-Bit Server VM 17.0.9+10", "Eclipse Adoptium OpenJDK 64-Bit Server VM 17.0.10+10"}, - "service.name": {"adservice", "bbservice", "ccservice", "ddservice", "eeservice"}, - "service.namespace": {"opentelemetry-demo", "opentelemetry-demo", "opentelemetry-demo", "opentelemetry-demo", "opentelemetry-demo"}, - "telemetry.sdk.version": {"1.23.0", "1.23.1", "1.23.2", "1.23.3", "1.23.4"}, - "process.runtime.version": {"17.0.6+10", "17.0.7+10", "17.0.8+10", "17.0.9+10", "17.0.10+10"}, - "telemetry.sdk.name": {"opentelemetry", "opentelemetry", "opentelemetry", "opentelemetry", "opentelemetry"}, - "host.arch": {"aarch64", "aarch64", "aarch64", "aarch64", "aarch64"}, - "host.name": {"c97f4b793890", "c97f4b793891", "c97f4b793892", "c97f4b793893", "c97f4b793894"}, - "process.executable.path": { - "/opt/1/java/openjdk/bin/java", - "/opt/2/java/openjdk/bin/java", - "/opt/3/java/openjdk/bin/java", - "/opt/4/java/openjdk/bin/java", - "/opt/5/java/openjdk/bin/java", - }, - "process.pid": {"1", "2", "3", "4", "5"}, - "process.runtime.name": 
{"OpenJDK Runtime Environment", "Oracle JDK Runtime Environment", "Eclipse Adoptium OpenJDK Runtime Environment", "Amazon Corretto JDK Runtime Environment", "IBM JDK Runtime Environment"}, - "container.id": { - "c97f4b7938901101550efbda3c250414cee6ba9bfb4769dc7fe156cb2311735e", - "c97f4b7938911101550efbda3c250414cee6ba9bfb4769dc7fe156cb2311735e", - "c97f4b7938921101550efbda3c250414cee6ba9bfb4769dc7fe156cb2311735e", - }, - "os.type": {"linux", "macos", "windows"}, - "process.command_line": { - "/opt/1/java/openjdk/bin/java -javaagent:/usr/src/app/opentelemetry-javaagent.jar", - "/opt/2/java/openjdk/bin/java -javaagent:/usr/src/app/opentelemetry-javaagent.jar", - "/opt/3/java/openjdk/bin/java -javaagent:/usr/src/app/opentelemetry-javaagent.jar", - }, - "telemetry.sdk.language": {"java", "go", "python", "javascript", "csharp"}, + UrlProperty: {"https://www.seznam.cz"}, + EmailProperty: {"hello@world.com"}, + NameProperty: {"john doe"}, + PhoneProperty: {"+420602303222"}, + // "telemetry.auto.version": {"1.23.0", "1.23.1", "1.23.2", "1.23.3", "1.23.4"}, + // "os.description": {"Linux 5.10.104-linuxkit", "Linux 5.10.105-linuxkit", "Linux 5.10.106-linuxkit", "Linux 5.10.107-linuxkit", "Linux 5.10.108-linuxkit"}, + // "process.runtime.description": {"Eclipse Adoptium OpenJDK 64-Bit Server VM 17.0.6+10", "Eclipse Adoptium OpenJDK 64-Bit Server VM 17.0.7+10", "Eclipse Adoptium OpenJDK 64-Bit Server VM 17.0.8+10", "Eclipse Adoptium OpenJDK 64-Bit Server VM 17.0.9+10", "Eclipse Adoptium OpenJDK 64-Bit Server VM 17.0.10+10"}, + // "service.name": {"adservice", "bbservice", "ccservice", "ddservice", "eeservice"}, + // "service.namespace": {"opentelemetry-demo", "opentelemetry-demo", "opentelemetry-demo", "opentelemetry-demo", "opentelemetry-demo"}, + // "telemetry.sdk.version": {"1.23.0", "1.23.1", "1.23.2", "1.23.3", "1.23.4"}, + // "process.runtime.version": {"17.0.6+10", "17.0.7+10", "17.0.8+10", "17.0.9+10", "17.0.10+10"}, + // "telemetry.sdk.name": {"opentelemetry", "opentelemetry", "opentelemetry", "opentelemetry", "opentelemetry"}, + // "host.arch": {"aarch64", "aarch64", "aarch64", "aarch64", "aarch64"}, + // "host.name": {"c97f4b793890", "c97f4b793891", "c97f4b793892", "c97f4b793893", "c97f4b793894"}, + // "process.executable.path": { + // "/opt/1/java/openjdk/bin/java", + // "/opt/2/java/openjdk/bin/java", + // "/opt/3/java/openjdk/bin/java", + // "/opt/4/java/openjdk/bin/java", + // "/opt/5/java/openjdk/bin/java", + // }, + // "process.pid": {"1", "2", "3", "4", "5"}, + // "process.runtime.name": {"OpenJDK Runtime Environment", "Oracle JDK Runtime Environment", "Eclipse Adoptium OpenJDK Runtime Environment", "Amazon Corretto JDK Runtime Environment", "IBM JDK Runtime Environment"}, + // "container.id": { + // "c97f4b7938901101550efbda3c250414cee6ba9bfb4769dc7fe156cb2311735e", + // "c97f4b7938911101550efbda3c250414cee6ba9bfb4769dc7fe156cb2311735e", + // "c97f4b7938921101550efbda3c250414cee6ba9bfb4769dc7fe156cb2311735e", + // }, + // "os.type": {"linux", "macos", "windows"}, + // "process.command_line": { + // "/opt/1/java/openjdk/bin/java -javaagent:/usr/src/app/opentelemetry-javaagent.jar", + // "/opt/2/java/openjdk/bin/java -javaagent:/usr/src/app/opentelemetry-javaagent.jar", + // "/opt/3/java/openjdk/bin/java -javaagent:/usr/src/app/opentelemetry-javaagent.jar", + // }, + // "telemetry.sdk.language": {"java", "go", "python", "javascript", "csharp"}, } OtelChoicesKeys = func() []string { @@ -56,6 +86,19 @@ var ( }() ) +func generateArrayOfIds(n int, prefix string) []string { + var users = 
make([]string, n) + + for i := range n { + users[i] = publicid.MustWithPrefix(prefix) + } + + return users +} + +var knownUsers = generateArrayOfIds(200000, "u") +var knownTenants = generateArrayOfIds(100, "t") + type Event struct { simulatedMeasurements []common.SimulatedMeasurement tags []common.Tag @@ -132,7 +175,7 @@ func newEventWithMeasurementGenerator(_ int, start time.Time, generator func(tim { Key: []byte("user_id"), Value: func() interface{} { - return publicid.MustWithPrefix("u") + return gofakeit.RandomString(knownUsers) }, }, { @@ -144,7 +187,7 @@ func newEventWithMeasurementGenerator(_ int, start time.Time, generator func(tim { Key: []byte("tenant_id"), Value: func() interface{} { - return publicid.MustWithPrefix("t") + return gofakeit.RandomString(knownTenants) }, }, { @@ -156,7 +199,7 @@ func newEventWithMeasurementGenerator(_ int, start time.Time, generator func(tim { Key: []byte("name"), Value: func() interface{} { - return gofakeit.RandomString([]string{"click", "pageview", "submit", "network", "error"}) + return gofakeit.RandomString(AvailableEvents) }, }, { diff --git a/pkg/query/config/config.go b/pkg/query/config/config.go index 86459e93b..374f05d78 100644 --- a/pkg/query/config/config.go +++ b/pkg/query/config/config.go @@ -2,6 +2,7 @@ package config import ( "fmt" + "github.com/spf13/pflag" "github.com/timescale/tsbs/internal/utils" "github.com/timescale/tsbs/pkg/data/usecases/common" @@ -25,7 +26,8 @@ type QueryGeneratorConfig struct { TimescaleUseTags bool `mapstructure:"timescale-use-tags"` TimescaleUseTimeBucket bool `mapstructure:"timescale-use-time-bucket"` - ClickhouseUseTags bool `mapstructure:"clickhouse-use-tags"` + ClickhouseUseTags bool `mapstructure:"clickhouse-use-tags"` + ClickhousePropertyAccessMode string `mapstructure:"clickhouse-property-access-mode"` MongoUseNaive bool `mapstructure:"mongo-use-native"` DbName string `mapstructure:"db-name"` @@ -57,6 +59,7 @@ func (c *QueryGeneratorConfig) AddToFlagSet(fs *pflag.FlagSet) { "The number of round-robin serialization groups. 
Use this to scale up data generation to multiple processes.") fs.Bool("clickhouse-use-tags", true, "ClickHouse only: Use separate tags table when querying") + fs.String("clickhouse-property-access-mode", "json", "ClickHouse only (DEA): Use json or map") fs.Bool("mongo-use-naive", true, "MongoDB only: Generate queries for the 'naive' data storage format for Mongo") fs.Bool("timescale-use-json", false, "TimescaleDB only: Use separate JSON tags table when querying") fs.Bool("timescale-use-tags", true, "TimescaleDB only: Use separate tags table when querying") diff --git a/pkg/query/factories/init_factories.go b/pkg/query/factories/init_factories.go index ff3faf47d..f1b6f4730 100644 --- a/pkg/query/factories/init_factories.go +++ b/pkg/query/factories/init_factories.go @@ -20,7 +20,8 @@ func InitQueryFactories(config *config.QueryGeneratorConfig) map[string]interfac factories := make(map[string]interface{}) factories[constants.FormatCassandra] = &cassandra.BaseGenerator{} factories[constants.FormatClickhouse] = &clickhouse.BaseGenerator{ - UseTags: config.ClickhouseUseTags, + UseTags: config.ClickhouseUseTags, + PropertyAccessMode: config.ClickhousePropertyAccessMode, } factories[constants.FormatCrateDB] = &cratedb.BaseGenerator{} factories[constants.FormatInflux] = &influx.BaseGenerator{} diff --git a/pkg/targets/clickhouse/benchmark.go b/pkg/targets/clickhouse/benchmark.go index 3308c7004..fc79147d9 100644 --- a/pkg/targets/clickhouse/benchmark.go +++ b/pkg/targets/clickhouse/benchmark.go @@ -29,9 +29,9 @@ type insertData struct { fields string // 1451606400000000000,58,2,24,61,22,63,6,44,80,38 } -var tableCols map[string][]string +var tableCols = map[string][]string{"tags": {"event_id", "user_id", "session_id", "tenant_id", "timestamp", "name", "properties_map", "properties_json"}} -var tagColumnTypes []string +var tagColumnTypes = []string{"string", "string", "string", "string", "int64", "string", "string", "string"} // allows for testing var fatal = log.Fatalf diff --git a/pkg/targets/clickhouse/creator.go b/pkg/targets/clickhouse/creator.go index aebc5de57..1f79b5630 100644 --- a/pkg/targets/clickhouse/creator.go +++ b/pkg/targets/clickhouse/creator.go @@ -153,6 +153,8 @@ func (d *dbCreator) CreateDB(dbName string) error { tableCols["tags"] = d.headers.TagKeys tagColumnTypes = d.headers.TagTypes + // fmt.Println("\n\nTag column types:", tagColumnTypes) + for tableName, fieldColumns := range d.headers.FieldKeys { //tableName: cpu // fieldColumns content: diff --git a/pkg/targets/clickhouse/processor.go b/pkg/targets/clickhouse/processor.go index 5ef6c7477..ce4354730 100644 --- a/pkg/targets/clickhouse/processor.go +++ b/pkg/targets/clickhouse/processor.go @@ -93,6 +93,7 @@ func (p *processor) processCSI(tableName string, rows []*insertData) uint64 { tagRows := make([][]string, 0, len(rows)) dataRows := make([][]interface{}, 0, len(rows)) ret := uint64(0) + // fmt.Println("table cols:", tableCols["tags"]) commonTagsLen := len(tableCols["tags"]) colLen := len(tableCols[tableName]) + 2 @@ -330,7 +331,12 @@ func insertTags(_ *ClickhouseConfig, conn driver.Conn, startID int, rows [][]str var variadicArgs []interface{} = make([]interface{}, len(row)+1) // +1 here for additional 'id' column value // Place id at the beginning variadicArgs[0] = id + // And all the rest of column values afterwards + // fmt.Println("\n\nTag column types:", tagColumnTypes) + // [string string string string int64 string string string] + // [string string string string int64 string string string] + for i, value := 
range row { variadicArgs[i+1] = convertBasedOnType(tagColumnTypes[i], value) } @@ -347,6 +353,7 @@ func insertTags(_ *ClickhouseConfig, conn driver.Conn, startID int, rows [][]str variadicArgs[len(variadicArgs)-2] = map[string]string{} } + // fmt.Println(variadicArgs...) if err := batch.Append(variadicArgs...); err != nil { panic(err) } diff --git a/pkg/targets/pinot/benchmark.go b/pkg/targets/pinot/benchmark.go new file mode 100644 index 000000000..42cea103a --- /dev/null +++ b/pkg/targets/pinot/benchmark.go @@ -0,0 +1,135 @@ +package pinot + +import ( + "bufio" + "fmt" + "log" + + "github.com/timescale/tsbs/load" + "github.com/timescale/tsbs/pkg/data" + "github.com/timescale/tsbs/pkg/targets" +) + +const dbType = "clickhouse" + +type ClickhouseConfig struct { + Host string + User string + Password string + + LogBatches bool + InTableTag bool + Debug int + DbName string +} + +// String values of tags and fields to insert - string representation +type insertData struct { + tags string // hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,rack=67,os=Ubuntu16.10,arch=x86,team=NYC,service=7,service_version=0,service_environment=production + fields string // 1451606400000000000,58,2,24,61,22,63,6,44,80,38 +} + +var tableCols map[string][]string + +var tagColumnTypes []string + +// allows for testing +var fatal = log.Fatalf + +// getConnectString() builds connect string to ClickHouse +// db - whether database specification should be added to the connection string +func getConnectString(conf *ClickhouseConfig, db bool) string { + // connectString: tcp://127.0.0.1:9000?debug=true + // ClickHouse ex.: + // tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000 + if db { + return fmt.Sprintf("tcp://%s:9000?username=%s&password=%s&database=%s", conf.Host, conf.User, conf.Password, conf.DbName) + } + + return fmt.Sprintf("tcp://%s:9000?username=%s&password=%s", conf.Host, conf.User, conf.Password) +} + +// Point is a single row of data keyed by which table it belongs +// Ex.: +// tags,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,rack=67,os=Ubuntu16.10,arch=x86,team=NYC,service=7,service_version=0,service_environment=production +// cpu,1451606400000000000,58,2,24,61,22,63,6,44,80,38 +type point struct { + table string + row *insertData +} + +// scan.Batch interface implementation +type tableArr struct { + m map[string][]*insertData + cnt uint +} + +// scan.Batch interface implementation +func (ta *tableArr) Len() uint { + return ta.cnt +} + +// scan.Batch interface implementation +func (ta *tableArr) Append(item data.LoadedPoint) { + that := item.Data.(*point) + k := that.table + ta.m[k] = append(ta.m[k], that.row) + ta.cnt++ +} + +// scan.BatchFactory interface implementation +type factory struct{} + +// scan.BatchFactory interface implementation +func (f *factory) New() targets.Batch { + return &tableArr{ + m: map[string][]*insertData{}, + cnt: 0, + } +} + +const tagsPrefix = "tags" + +func NewBenchmark(file string, hashWorkers bool, conf *ClickhouseConfig) targets.Benchmark { + return &benchmark{ + ds: &fileDataSource{ + scanner: bufio.NewScanner(load.GetBufferedReader(file)), + }, + hashWorkers: hashWorkers, + conf: conf, + } +} + +// targets.Benchmark interface implementation +type benchmark struct { + ds targets.DataSource + hashWorkers bool + conf *ClickhouseConfig +} + +func (b *benchmark) GetDataSource() targets.DataSource { + return b.ds +} + +func (b *benchmark) GetBatchFactory() targets.BatchFactory { + 
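+	// factory.New hands back an empty tableArr; Append then groups each incoming *point
+	// by its table name in ta.m and increments cnt, so a batch carries per-table row slices.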
return &factory{} +} + +func (b *benchmark) GetPointIndexer(maxPartitions uint) targets.PointIndexer { + if b.hashWorkers { + return &hostnameIndexer{ + partitions: maxPartitions, + } + } + return &targets.ConstantIndexer{} +} + +// loader.Benchmark interface implementation +func (b *benchmark) GetProcessor() targets.Processor { + return &processor{conf: b.conf} +} + +// loader.Benchmark interface implementation +func (b *benchmark) GetDBCreator() targets.DBCreator { + return &dbCreator{ds: b.GetDataSource(), config: b.conf} +} diff --git a/pkg/targets/pinot/clickhouse_test.go b/pkg/targets/pinot/clickhouse_test.go new file mode 100644 index 000000000..9f5329b8a --- /dev/null +++ b/pkg/targets/pinot/clickhouse_test.go @@ -0,0 +1,237 @@ +package pinot + +import ( + "bufio" + "bytes" + "fmt" + "log" + "testing" + + "github.com/timescale/tsbs/pkg/data" +) + +func TestGetConnectString(t *testing.T) { + wantHost := "localhost" + wantUser := "default" + wantPassword := "" + wantDB := "benchmark" + want := fmt.Sprintf("tcp://%s:9000?username=%s&password=%s&database=%s", wantHost, wantUser, wantPassword, wantDB) + + connStr := getConnectString(&ClickhouseConfig{ + Host: wantHost, + User: wantUser, + Password: wantPassword, + DbName: wantDB, + }, + true) + if connStr != want { + t.Errorf("incorrect connect string: got %s want %s", connStr, want) + } +} + +func TestHypertableArr(t *testing.T) { + f := &factory{} + ha := f.New().(*tableArr) + if ha.Len() != 0 { + t.Errorf("tableArr not initialized with count 0") + } + p := data.LoadedPoint{ + Data: &point{ + table: "table1", + row: &insertData{ + tags: "t1,t2", + fields: "0,f1,f2", + }, + }, + } + ha.Append(p) + if ha.Len() != 1 { + t.Errorf("tableArr count is not 1 after first append") + } + p = data.LoadedPoint{ + Data: &point{ + table: "table2", + row: &insertData{ + tags: "t3,t4", + fields: "1,f3,f4", + }, + }, + } + ha.Append(p) + if ha.Len() != 2 { + t.Errorf("tableArr count is not 2 after 2nd append") + } + if len(ha.m) != 2 { + t.Errorf("tableArr does not have 2 different hypertables") + } +} + +func TestNextItem(t *testing.T) { + cases := []struct { + desc string + input string + wantPrefix string + wantFields string + wantTags string + shouldFatal bool + }{ + { + desc: "correct input", + input: "tags,tag1text,tag2text\ncpu,140,0.0,0.0\n", + wantPrefix: "cpu", + wantFields: "140,0.0,0.0", + wantTags: "tag1text,tag2text", + }, + { + desc: "incorrect tags prefix", + input: "foo,bar,baz\ncpu,140,0.0,0.0\n", + shouldFatal: true, + }, + { + desc: "missing values line", + input: "tags,tag1text,tag2text", + shouldFatal: true, + }, + } + for _, c := range cases { + br := bufio.NewReader(bytes.NewReader([]byte(c.input))) + dataSource := &fileDataSource{scanner: bufio.NewScanner(br)} + if c.shouldFatal { + fmt.Println(c.desc) + isCalled := false + fatal = func(fmt string, args ...interface{}) { + isCalled = true + log.Printf(fmt, args...) 
+ } + _ = dataSource.NextItem() + if !isCalled { + t.Errorf("%s: did not call fatal when it should", c.desc) + } + } else { + p := dataSource.NextItem() + data := p.Data.(*point) + if data.table != c.wantPrefix { + t.Errorf("%s: incorrect prefix: got %s want %s", c.desc, data.table, c.wantPrefix) + } + if data.row.fields != c.wantFields { + t.Errorf("%s: incorrect fields: got %s want %s", c.desc, data.row.fields, c.wantFields) + } + if data.row.tags != c.wantTags { + t.Errorf("%s: incorrect tags: got %s want %s", c.desc, data.row.tags, c.wantTags) + } + } + } +} + +func TestDecodeEOF(t *testing.T) { + input := []byte("tags,tag1text,tag2text\ncpu,140,0.0,0.0\n") + br := bufio.NewReader(bytes.NewReader([]byte(input))) + dataSource := &fileDataSource{scanner: bufio.NewScanner(br)} + _ = dataSource.NextItem() + // nothing left, should be EOF + p := dataSource.NextItem() + if p.Data != nil { + t.Errorf("expected p to be nil, got %v", p) + } +} + +func TestHeaders(t *testing.T) { + cases := []struct { + desc string + input string + wantTags []string + wantCols map[string][]string + wantTypes []string + shouldFatal bool + wantBuffered int + }{ + { + desc: "min case: exactly three lines", + input: "tags,tag1 string,tag2 float32\ncols,col1,col2\n\n", + wantTags: []string{"tag1", "tag2"}, + wantCols: map[string][]string{"cols": {"col1", "col2"}}, + wantTypes: []string{"string", "float32"}, + wantBuffered: 0, + }, + { + desc: "min case: more than the header 3 lines", + input: "tags,tag1 string,tag2 string\ncols,col1,col2\n\nrow1\nrow2\n", + wantTags: []string{"tag1", "tag2"}, + wantTypes: []string{"string", "string"}, + wantCols: map[string][]string{"cols": {"col1", "col2"}}, + wantBuffered: len([]byte("row1\nrow2\n")), + }, + { + desc: "multiple tables: more than 3 lines for header", + input: "tags,tag1 int32,tag2 int64\ncols,col1,col2\ncols2,col21,col22\n\n", + wantTags: []string{"tag1", "tag2"}, + wantTypes: []string{"int32", "int64"}, + wantCols: map[string][]string{"cols": {"col1", "col2"}, "cols2": {"col21", "col22"}}, + wantBuffered: 0, + }, + { + desc: "multiple tables: more than 3 lines for header w/ extra", + input: "tags,tag1 tag,tag2 tag2\ncols,col1,col2\ncols2,col21,col22\n\nrow1\nrow2\n", + wantTags: []string{"tag1", "tag2"}, + wantTypes: []string{"tag", "tag2"}, + wantCols: map[string][]string{"cols": {"col1", "col2"}, "cols2": {"col21", "col22"}}, + wantBuffered: len([]byte("row1\nrow2\n")), + }, + { + desc: "too few lines", + input: "tags\ncols\n", + shouldFatal: true, + }, + { + desc: "no line ender", + input: "tags", + shouldFatal: true, + }, + } + + for _, c := range cases { + br := bufio.NewReader(bytes.NewReader([]byte(c.input))) + dataSource := &fileDataSource{bufio.NewScanner(br), nil} + if c.shouldFatal { + isCalled := false + fatal = func(fmt string, args ...interface{}) { + isCalled = true + log.Printf(fmt, args...) 
+ } + dataSource.Headers() + if !isCalled { + t.Errorf("%s: did not call fatal when it should", c.desc) + } + } else { + headers := dataSource.Headers() + if !strArrEq(headers.TagKeys, c.wantTags) { + t.Errorf("%s: incorrect tags: got\n%v\nwant\n%v", c.desc, headers.TagKeys, c.wantTags) + } + if !strArrEq(headers.TagTypes, c.wantTypes) { + t.Errorf("%s: incorrect tag types: got\n%v\nwant\n%v", c.desc, headers.TagTypes, c.wantTypes) + } + + if len(headers.FieldKeys) != len(c.wantCols) { + t.Errorf("%s: incorrect cols len: got %d want %d", c.desc, len(headers.FieldKeys), len(c.wantCols)) + } + for key, got := range headers.FieldKeys { + want := c.wantCols[key] + if !strArrEq(got, want) { + t.Errorf("%s: cols row incorrect: got\n%v\nwant\n%v\n", c.desc, got, want) + } + } + } + } +} + +func strArrEq(a []string, b []string) bool { + if len(a) != len(b) { + return false + } + for i, aa := range a { + if aa != b[i] { + return false + } + } + return true +} diff --git a/pkg/targets/pinot/creator.go b/pkg/targets/pinot/creator.go new file mode 100644 index 000000000..f97032d7b --- /dev/null +++ b/pkg/targets/pinot/creator.go @@ -0,0 +1,203 @@ +package pinot + +import ( + "fmt" + "strings" + + "github.com/jmoiron/sqlx" + _ "github.com/kshvakov/clickhouse" + "github.com/timescale/tsbs/pkg/data/usecases/common" + "github.com/timescale/tsbs/pkg/targets" +) + +// loader.DBCreator interface implementation +type dbCreator struct { + ds targets.DataSource + headers *common.GeneratedDataHeaders + connStr string + config *ClickhouseConfig +} + +// loader.DBCreator interface implementation +func (d *dbCreator) Init() { + // fills dbCreator struct with data structure (tables description) + // specified at the beginning of the data file + d.headers = d.ds.Headers() +} + +// loader.DBCreator interface implementation +func (d *dbCreator) DBExists(dbName string) bool { + db := sqlx.MustConnect(dbType, getConnectString(d.config, false)) + defer db.Close() + + sql := fmt.Sprintf("SELECT name, engine FROM system.databases WHERE name = '%s'", dbName) + if d.config.Debug > 0 { + fmt.Printf(sql) + } + var rows []struct { + Name string `db:"name"` + Engine string `db:"engine"` + } + + err := db.Select(&rows, sql) + if err != nil { + panic(err) + } + for _, row := range rows { + if row.Name == dbName { + return true + } + } + + return false +} + +// loader.DBCreator interface implementation +func (d *dbCreator) RemoveOldDB(dbName string) error { + db := sqlx.MustConnect(dbType, getConnectString(d.config, false)) + defer db.Close() + + sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", dbName) + if _, err := db.Exec(sql); err != nil { + panic(err) + } + return nil +} + +// loader.DBCreator interface implementation +func (d *dbCreator) CreateDB(dbName string) error { + // Connect to ClickHouse in general and CREATE DATABASE + db := sqlx.MustConnect(dbType, getConnectString(d.config, false)) + sql := fmt.Sprintf("CREATE DATABASE %s", dbName) + _, err := db.Exec(sql) + if err != nil { + panic(err) + } + db.Close() + db = nil + + // Connect to specified database within ClickHouse + db = sqlx.MustConnect(dbType, getConnectString(d.config, true)) + defer db.Close() + + createTagsTable(d.config, db, d.headers.TagKeys, d.headers.TagTypes) + if tableCols == nil { + tableCols = make(map[string][]string) + } + tableCols["tags"] = d.headers.TagKeys + tagColumnTypes = d.headers.TagTypes + + for tableName, fieldColumns := range d.headers.FieldKeys { + //tableName: cpu + // fieldColumns content: + // 
usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice + createMetricsTable(d.config, db, tableName, fieldColumns) + } + + return nil +} + +// createTagsTable builds CREATE TABLE SQL statement and runs it +func createTagsTable(conf *ClickhouseConfig, db *sqlx.DB, tagNames, tagTypes []string) { + sql := generateTagsTableQuery(tagNames, tagTypes) + if conf.Debug > 0 { + fmt.Printf(sql) + } + _, err := db.Exec(sql) + if err != nil { + panic(err) + } +} + +// createMetricsTable builds CREATE TABLE SQL statement and runs it +func createMetricsTable(conf *ClickhouseConfig, db *sqlx.DB, tableName string, fieldColumns []string) { + tableCols[tableName] = fieldColumns + + // We'll have some service columns in table to be created and columnNames contains all column names to be created + var columnNames []string + + if conf.InTableTag { + // First column in the table - service column - partitioning field + partitioningColumn := tableCols["tags"][0] // would be 'hostname' + columnNames = append(columnNames, partitioningColumn) + } + + // Add all column names from fieldColumns into columnNames + columnNames = append(columnNames, fieldColumns...) + + // columnsWithType - column specifications with type. Ex.: "cpu_usage Float64" + var columnsWithType []string + for _, column := range columnNames { + if len(column) == 0 { + // Skip nameless columns + continue + } + columnsWithType = append(columnsWithType, fmt.Sprintf("%s Nullable(Float64)", column)) + } + + sql := fmt.Sprintf(` + CREATE TABLE %s ( + created_date Date DEFAULT today(), + created_at DateTime DEFAULT now(), + time String, + tags_id UInt32, + %s, + additional_tags String DEFAULT '' + ) ENGINE = MergeTree(created_date, (tags_id, created_at), 8192) + `, + tableName, + strings.Join(columnsWithType, ",")) + if conf.Debug > 0 { + fmt.Printf(sql) + } + _, err := db.Exec(sql) + if err != nil { + panic(err) + } +} + +func generateTagsTableQuery(tagNames, tagTypes []string) string { + // prepare COLUMNs specification for CREATE TABLE statement + // all columns would be of the type specified in the tags header + // e.g. tags, tag2 string,tag2 int32... 
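+	// For example, tagNames={"tag1"} with tagTypes={"string"} produces:
+	//   CREATE TABLE tags(
+	//   created_date Date DEFAULT today(),
+	//   created_at DateTime DEFAULT now(),
+	//   id UInt32,
+	//   tag1 Nullable(String)
+	//   ) ENGINE = MergeTree(created_date, (id), 8192)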
+ if len(tagNames) != len(tagTypes) { + panic("wrong number of tag names and tag types") + } + + tagColumnDefinitions := make([]string, len(tagNames)) + for i, tagName := range tagNames { + tagType := serializedTypeToClickHouseType(tagTypes[i]) + tagColumnDefinitions[i] = fmt.Sprintf("%s %s", tagName, tagType) + } + + cols := strings.Join(tagColumnDefinitions, ",\n") + + index := "id" + + return fmt.Sprintf( + "CREATE TABLE tags(\n"+ + "created_date Date DEFAULT today(),\n"+ + "created_at DateTime DEFAULT now(),\n"+ + "id UInt32,\n"+ + "%s"+ + ") ENGINE = MergeTree(created_date, (%s), 8192)", + cols, + index) +} + +func serializedTypeToClickHouseType(serializedType string) string { + switch serializedType { + case "string": + return "Nullable(String)" + case "float32": + return "Nullable(Float32)" + case "float64": + return "Nullable(Float64)" + case "int64": + return "Nullable(Int64)" + case "int32": + return "Nullable(Int32)" + default: + panic(fmt.Sprintf("unrecognized type %s", serializedType)) + } +} diff --git a/pkg/targets/pinot/creator_test.go b/pkg/targets/pinot/creator_test.go new file mode 100644 index 000000000..ed9a67cb4 --- /dev/null +++ b/pkg/targets/pinot/creator_test.go @@ -0,0 +1,68 @@ +package pinot + +import ( + "fmt" + "testing" +) + +func TestGenerateTagsTableQuery(t *testing.T) { + testCases := []struct { + inTagNames []string + inTagTypes []string + out string + }{{ + inTagNames: []string{"tag1"}, + inTagTypes: []string{"string"}, + out: "CREATE TABLE tags(\n" + + "created_date Date DEFAULT today(),\n" + + "created_at DateTime DEFAULT now(),\n" + + "id UInt32,\n" + + "tag1 Nullable(String)" + + ") ENGINE = MergeTree(created_date, (id), 8192)"}, { + inTagNames: []string{"tag1", "tag2", "tag3", "tag4"}, + inTagTypes: []string{"int32", "int64", "float32", "float64"}, + out: "CREATE TABLE tags(\n" + + "created_date Date DEFAULT today(),\n" + + "created_at DateTime DEFAULT now(),\n" + + "id UInt32,\n" + + "tag1 Nullable(Int32),\n" + + "tag2 Nullable(Int64),\n" + + "tag3 Nullable(Float32),\n" + + "tag4 Nullable(Float64)" + + ") ENGINE = MergeTree(created_date, (id), 8192)"}, + } + for _, tc := range testCases { + t.Run(fmt.Sprintf("tags table for %v", tc.inTagNames), func(t *testing.T) { + res := generateTagsTableQuery(tc.inTagNames, tc.inTagTypes) + if res != tc.out { + t.Errorf("unexpected result.\nexpected: %s\ngot: %s", tc.out, res) + } + }) + } +} + +func TestGenerateTagsTableQueryPanicOnWrongFormat(t *testing.T) { + defer func() { + r := recover() + if r == nil { + t.Errorf("did not panic when should") + } + }() + + generateTagsTableQuery([]string{"tag"}, []string{}) + + t.Fatalf("test should have stopped at this point") +} + +func TestGenerateTagsTableQueryPanicOnWrongType(t *testing.T) { + defer func() { + r := recover() + if r == nil { + t.Errorf("did not panic when should") + } + }() + + generateTagsTableQuery([]string{"unknownType"}, []string{"uint32"}) + + t.Fatalf("test should have stopped at this point") +} diff --git a/pkg/targets/pinot/file_data_source.go b/pkg/targets/pinot/file_data_source.go new file mode 100644 index 000000000..26e95ee3c --- /dev/null +++ b/pkg/targets/pinot/file_data_source.go @@ -0,0 +1,162 @@ +package pinot + +import ( + "bufio" + "strings" + + "github.com/timescale/tsbs/pkg/data" + "github.com/timescale/tsbs/pkg/data/usecases/common" +) + +// scan.PointDecoder interface implementation +type fileDataSource struct { + scanner *bufio.Scanner + //cached headers (should be read only at start of file) + headers 
*common.GeneratedDataHeaders +} + +// scan.PointDecoder interface implementation +func (d *fileDataSource) NextItem() data.LoadedPoint { + // Data Point Example + // tags,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,rack=67,os=Ubuntu16.10,arch=x86,team=NYC,service=7,service_version=0,service_environment=production + // cpu,1451606400000000000,58,2,24,61,22,63,6,44,80,38 + + newPoint := &insertData{} + ok := d.scanner.Scan() + if !ok && d.scanner.Err() == nil { + // nothing scanned & no error = EOF + return data.LoadedPoint{} + } else if !ok { + fatal("scan error: %v", d.scanner.Err()) + return data.LoadedPoint{} + } + + // The first line is a CSV line of tags with the first element being "tags" + // Ex.: + // tags,hostname=host_0,region=eu-west-1,datacenter=eu-west-1b,rack=67,os=Ubuntu16.10,arch=x86,team=NYC,service=7,service_version=0,service_environment=production + parts := strings.SplitN(d.scanner.Text(), ",", 2) // prefix & then rest of line + prefix := parts[0] + if prefix != tagsPrefix { + fatal("data file in invalid format; got %s expected %s", prefix, tagsPrefix) + return data.LoadedPoint{} + } + newPoint.tags = parts[1] + + // Scan again to get the data line + // cpu,1451606400000000000,58,2,24,61,22,63,6,44,80,38 + ok = d.scanner.Scan() + if !ok { + fatal("scan error: %v", d.scanner.Err()) + return data.LoadedPoint{} + } + parts = strings.SplitN(d.scanner.Text(), ",", 2) // prefix & then rest of line + prefix = parts[0] + newPoint.fields = parts[1] + + return data.NewLoadedPoint(&point{ + table: prefix, + row: newPoint, + }) +} + +func (d *fileDataSource) Headers() *common.GeneratedDataHeaders { + if d.headers != nil { + return d.headers + } + // First N lines are header, describing data structure. + // The first line containing tags table name ('tags') followed by list of tags, comma-separated. + // Ex.: tags,hostname,region,datacenter,rack,os,arch,team,service,service_version + // The second through N-1 line containing table name (ex.: 'cpu') followed by list of column names, + // comma-separated. 
Ex.: cpu,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq + // The last line being blank to separate from the data + // + // Header example: + // tags,hostname,region,datacenter,rack,os,arch,team,service,service_version,service_environment + // cpu,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice + // disk,total,free,used,used_percent,inodes_total,inodes_free,inodes_used + // nginx,accepts,active,handled,reading,requests,waiting,writing + var tags string + var cols []string + i := 0 + for { + var line string + ok := d.scanner.Scan() + if !ok && d.scanner.Err() == nil { // nothing scanned & no error = EOF + fatal("reached EOF, but not enough things scanned") + return nil + } else if !ok { + fatal("scan error: %v", d.scanner.Err()) + return nil + } + if i == 0 { + // read first line - list of tags + tags = d.scanner.Text() + tags = strings.TrimSpace(tags) + } else { + // read the second and further lines - metrics descriptions + line = d.scanner.Text() + line = strings.TrimSpace(line) + if len(line) == 0 { + // empty line - end of header + break + } + // append new table/columns set to the list of tables/columns set + cols = append(cols, line) + } + i++ + } + + // tags content: + //tags,hostname,region,datacenter,rack,os,arch,team,service,service_version,service_environment + // + // Parts would contain + // 0: tags - reserved word - tags mark + // 1: + // N: actual tags + // so we'll use tags[1:] for tags specification + parts := strings.Split(tags, ",") + if parts[0] != "tags" { + fatal("input header in wrong format. got '%s', expected 'tags'", parts[0]) + return nil + } + tagNames, tagTypes := extractTagNamesAndTypes(parts[1:]) + fieldKeys := make(map[string][]string) + // cols content are lines (metrics descriptions) as: + // cpu,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice + // disk,total,free,used,used_percent,inodes_total,inodes_free,inodes_used + // nginx,accepts,active,handled,reading,requests,waiting,writing + // generalised description: + // tableName,fieldName1,...,fieldNameX + for _, colsForMeasure := range cols { + tableSpec := strings.Split(colsForMeasure, ",") + // tableSpec contain + // 0: table name + // 1: table column name 1 + // N: table column name N + + // Ex.: cpu OR disk OR nginx + tableName := tableSpec[0] + fieldKeys[tableName] = tableSpec[1:] + } + d.headers = &common.GeneratedDataHeaders{ + TagKeys: tagNames, + TagTypes: tagTypes, + FieldKeys: fieldKeys, + } + return d.headers +} + +func extractTagNamesAndTypes(tags []string) ([]string, []string) { + tagNames := make([]string, len(tags)) + tagTypes := make([]string, len(tags)) + for i, tagWithType := range tags { + tagAndType := strings.Split(tagWithType, " ") + if len(tagAndType) != 2 { + panic("tag header has invalid format") + } + tagNames[i] = tagAndType[0] + tagTypes[i] = tagAndType[1] + } + + return tagNames, tagTypes +} diff --git a/pkg/targets/pinot/implemented_target.go b/pkg/targets/pinot/implemented_target.go new file mode 100644 index 000000000..5df910515 --- /dev/null +++ b/pkg/targets/pinot/implemented_target.go @@ -0,0 +1,37 @@ +package pinot + +import ( + "github.com/blagojts/viper" + "github.com/spf13/pflag" + "github.com/timescale/tsbs/pkg/data/serialize" + "github.com/timescale/tsbs/pkg/data/source" + "github.com/timescale/tsbs/pkg/targets" + "github.com/timescale/tsbs/pkg/targets/constants" + 
"github.com/timescale/tsbs/pkg/targets/timescaledb" +) + +func NewTarget() targets.ImplementedTarget { + return &clickhouseTarget{} +} + +type clickhouseTarget struct{} + +func (c clickhouseTarget) Benchmark(string, *source.DataSourceConfig, *viper.Viper) (targets.Benchmark, error) { + panic("implement me") +} + +func (c clickhouseTarget) Serializer() serialize.PointSerializer { + return ×caledb.Serializer{} +} + +func (c clickhouseTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.FlagSet) { + flagSet.String(flagPrefix+"host", "localhost", "Hostname of ClickHouse instance") + flagSet.String(flagPrefix+"user", "default", "User to connect to ClickHouse as") + flagSet.String(flagPrefix+"password", "", "Password to connect to ClickHouse") + flagSet.Bool(flagPrefix+"log-batches", false, "Whether to time individual batches.") + flagSet.Int(flagPrefix+"debug", 0, "Debug printing (choices: 0, 1, 2). (default 0)") +} + +func (c clickhouseTarget) TargetName() string { + return constants.FormatClickhouse +} diff --git a/pkg/targets/pinot/indexer.go b/pkg/targets/pinot/indexer.go new file mode 100644 index 000000000..d175665b2 --- /dev/null +++ b/pkg/targets/pinot/indexer.go @@ -0,0 +1,22 @@ +package pinot + +import ( + "hash/fnv" + "strings" + + "github.com/timescale/tsbs/pkg/data" +) + +// hostnameIndexer is used to consistently send the same hostnames to the same queue +type hostnameIndexer struct { + partitions uint +} + +// scan.PointIndexer interface implementation +func (i *hostnameIndexer) GetIndex(item data.LoadedPoint) uint { + p := item.Data.(*point) + hostname := strings.SplitN(p.row.tags, ",", 2)[0] + h := fnv.New32a() + h.Write([]byte(hostname)) + return uint(h.Sum32()) % i.partitions +} diff --git a/pkg/targets/pinot/processor.go b/pkg/targets/pinot/processor.go new file mode 100644 index 000000000..7df322d62 --- /dev/null +++ b/pkg/targets/pinot/processor.go @@ -0,0 +1,413 @@ +package pinot + +import ( + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/jmoiron/sqlx" + "github.com/timescale/tsbs/pkg/targets" +) + +// load.Processor interface implementation +type processor struct { + db *sqlx.DB + csi *syncCSI + conf *ClickhouseConfig +} + +// load.Processor interface implementation +func (p *processor) Init(workerNum int, doLoad, hashWorkers bool) { + if doLoad { + p.db = sqlx.MustConnect(dbType, getConnectString(p.conf, true)) + if hashWorkers { + p.csi = newSyncCSI() + } else { + p.csi = globalSyncCSI + } + } +} + +// load.ProcessorCloser interface implementation +func (p *processor) Close(doLoad bool) { + if doLoad { + p.db.Close() + } +} + +// load.Processor interface implementation +func (p *processor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint64) { + batches := b.(*tableArr) + rowCnt := 0 + metricCnt := uint64(0) + for tableName, rows := range batches.m { + rowCnt += len(rows) + if doLoad { + start := time.Now() + metricCnt += p.processCSI(tableName, rows) + + if p.conf.LogBatches { + now := time.Now() + took := now.Sub(start) + batchSize := len(rows) + fmt.Printf("BATCH: batchsize %d row rate %f/sec (took %v)\n", batchSize, float64(batchSize)/took.Seconds(), took) + } + } + } + batches.m = map[string][]*insertData{} + batches.cnt = 0 + + return metricCnt, uint64(rowCnt) +} + +func newSyncCSI() *syncCSI { + return &syncCSI{ + m: make(map[string]int64), + mutex: &sync.RWMutex{}, + } +} + +type syncCSI struct { + // Map hostname to tags.id for this host + m map[string]int64 + mutex *sync.RWMutex +} + +// globalSyncCSI is used when data is not 
hashed to a worker consistently, so
+// all workers need to share the same map from hostname -> tags_id
+var globalSyncCSI = newSyncCSI()
+
+// processCSI inserts one table's portion of the incoming batch
+func (p *processor) processCSI(tableName string, rows []*insertData) uint64 {
+	tagRows := make([][]string, 0, len(rows))
+	dataRows := make([][]interface{}, 0, len(rows))
+	ret := uint64(0)
+	commonTagsLen := len(tableCols["tags"])
+
+	colLen := len(tableCols[tableName]) + 2
+	if p.conf.InTableTag {
+		colLen++
+	}
+
+	tagsIdPosition := 0
+
+	for _, row := range rows {
+		// Split the tags into individual common tags and
+		// an extra bit leftover for non-common tags that need to be added separately.
+		// For each of the common tags, remove everything after = in the form