Skip to content

Commit

Permalink
[extension/dbstorage] Add DB Transactions to dbstorage.Batch() method (
Browse files Browse the repository at this point in the history
…open-telemetry#37805)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
According to [Storage API
documentation](https://github.com/open-telemetry/opentelemetry-collector/blob/main/extension/xextension/storage/README.md)
`Batch` is expected to "execute several operations in a single
transaction"
Moreover, Persistent Queue in `exporterhelper` is actively using
`storage.Batch()` with read+write/write+delete operations in single
call, which is really needs to be in a single transaction to avoid
accidental data inconsistency in Persistent Queue in case of unexpected
service shutdown. For example
[here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterqueue/persistent_queue.go#L140)
or
[here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterqueue/persistent_queue.go#L257)
or
[here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterqueue/persistent_queue.go#L463)
As currently supported SQlite and PostgreSQL driver natively support
Transaction - this PR implements single transaction inside
`storage.Batch()` call
Also, I've added bunch of unit tests that were missing to ensure that
show Storage API works as expected in `dbstorage` extension

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Respective Unit Tests were added

---------

Co-authored-by: Sean Marciniak <[email protected]>
  • Loading branch information
Fiery-Fenix and MovieStoreGuy authored Feb 12, 2025
1 parent 589d24a commit ee2d165
Show file tree
Hide file tree
Showing 9 changed files with 405 additions and 30 deletions.
27 changes: 27 additions & 0 deletions .chloggen/feat_dbstorage-transaction-support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: dbstorageextension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add DB Transactions to dbstorage.Batch() method as it is expected by Storage API

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37805]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
93 changes: 67 additions & 26 deletions extension/storage/dbstorage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// SQLite driver
_ "github.com/mattn/go-sqlite3"
"go.opentelemetry.io/collector/extension/xextension/storage"
"go.uber.org/zap"
)

const (
Expand All @@ -25,15 +26,16 @@ const (
)

type dbStorageClient struct {
logger *zap.Logger
db *sql.DB
getQuery *sql.Stmt
setQuery *sql.Stmt
deleteQuery *sql.Stmt
}

func newClient(ctx context.Context, driverName string, db *sql.DB, tableName string) (*dbStorageClient, error) {
func newClient(ctx context.Context, logger *zap.Logger, db *sql.DB, driverName string, tableName string) (*dbStorageClient, error) {
createTableSQL := createTable
if driverName == "sqlite" {
if driverName == driverSQLite {
createTableSQL = createTableSqlite
}
var err error
Expand All @@ -54,50 +56,52 @@ func newClient(ctx context.Context, driverName string, db *sql.DB, tableName str
if err != nil {
return nil, err
}
return &dbStorageClient{db, selectQuery, setQuery, deleteQuery}, nil
return &dbStorageClient{logger, db, selectQuery, setQuery, deleteQuery}, nil
}

// Get will retrieve data from storage that corresponds to the specified key
func (c *dbStorageClient) Get(ctx context.Context, key string) ([]byte, error) {
rows, err := c.getQuery.QueryContext(ctx, key)
if err != nil {
return nil, err
}
if !rows.Next() {
return nil, nil
}
var result []byte
err = rows.Scan(&result)
if err != nil {
return result, err
}
err = rows.Close()
return result, err
return c.get(ctx, key, nil)
}

// Set will store data. The data can be retrieved using the same key
func (c *dbStorageClient) Set(ctx context.Context, key string, value []byte) error {
_, err := c.setQuery.ExecContext(ctx, key, value, value)
return err
return c.set(ctx, key, value, nil)
}

// Delete will delete data associated with the specified key
func (c *dbStorageClient) Delete(ctx context.Context, key string) error {
_, err := c.deleteQuery.ExecContext(ctx, key)
return err
return c.delete(ctx, key, nil)
}

// Batch executes the specified operations in order. Get operation results are updated in place
func (c *dbStorageClient) Batch(ctx context.Context, ops ...*storage.Operation) error {
var err error
// Start a new transaction
tx, err := c.db.BeginTx(ctx, nil)
if err != nil {
return err
}

// In case of any error we should roll back whole transaction to keep DB in consistent state
// In case of successful commit - tx.Rollback() will be a no-op here as tx is already closed
defer func() {
// We should ignore error related already finished transaction here
// It might happened, for example, if Context was canceled outside of Batch() function
// in this case whole transaction will be rolled back by sql package and we'll receive ErrTxDone here,
// which is actually not an issue because transaction was correctly closed with rollback
if rollbackErr := tx.Rollback(); !errors.Is(rollbackErr, sql.ErrTxDone) {
c.logger.Error("Failed to rollback Batch() transaction", zap.Error(rollbackErr))
}
}()

for _, op := range ops {
switch op.Type {
case storage.Get:
op.Value, err = c.Get(ctx, op.Key)
op.Value, err = c.get(ctx, op.Key, tx)
case storage.Set:
err = c.Set(ctx, op.Key, op.Value)
err = c.set(ctx, op.Key, op.Value, tx)
case storage.Delete:
err = c.Delete(ctx, op.Key)
err = c.delete(ctx, op.Key, tx)
default:
return errors.New("wrong operation type")
}
Expand All @@ -106,7 +110,8 @@ func (c *dbStorageClient) Batch(ctx context.Context, ops ...*storage.Operation)
return err
}
}
return err

return tx.Commit()
}

// Close will close the database
Expand All @@ -119,3 +124,39 @@ func (c *dbStorageClient) Close(_ context.Context) error {
}
return c.getQuery.Close()
}

func (c *dbStorageClient) get(ctx context.Context, key string, tx *sql.Tx) ([]byte, error) {
rows, err := c.wrapTx(c.getQuery, tx).QueryContext(ctx, key)
if err != nil {
return nil, err
}

if !rows.Next() {
return nil, nil
}

var result []byte
if err := rows.Scan(&result); err != nil {
return result, err
}

return result, rows.Close()
}

func (c *dbStorageClient) set(ctx context.Context, key string, value []byte, tx *sql.Tx) error {
_, err := c.wrapTx(c.setQuery, tx).ExecContext(ctx, key, value, value)
return err
}

func (c *dbStorageClient) delete(ctx context.Context, key string, tx *sql.Tx) error {
_, err := c.wrapTx(c.deleteQuery, tx).ExecContext(ctx, key)
return err
}

func (c *dbStorageClient) wrapTx(stmt *sql.Stmt, tx *sql.Tx) *sql.Stmt {
if tx != nil {
return tx.Stmt(stmt)
}

return stmt
}
Loading

0 comments on commit ee2d165

Please sign in to comment.