Skip to content

Commit 5244d8f

Browse files
committed
Structs for upsert and delete
1 parent 9f2a6c6 commit 5244d8f

File tree

2 files changed

+141
-89
lines changed

2 files changed

+141
-89
lines changed

database/db.go

Lines changed: 6 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -691,67 +691,25 @@ func (db *DB) CreateIgnoreStreamed(
691691
)
692692
}
693693

694-
func WithOnSuccessUpsert(onSuccess ...OnSuccess[Entity]) ExecOption {
695-
return func(options *ExecOptions) {
696-
options.onSuccess = onSuccess
697-
}
698-
}
699-
700-
func WithStatement(stmt string, placeholders int) ExecOption {
701-
return func(options *ExecOptions) {
702-
options.stmt = stmt
703-
options.placeholders = placeholders
704-
}
705-
}
706-
707-
type ExecOption func(options *ExecOptions)
708-
709-
type ExecOptions struct {
710-
onSuccess []OnSuccess[Entity]
711-
stmt string
712-
placeholders int
713-
}
714-
715-
func NewExecOptions(execOpts ...ExecOption) *ExecOptions {
716-
execOptions := &ExecOptions{}
717-
718-
for _, option := range execOpts {
719-
option(execOptions)
720-
}
721-
722-
return execOptions
723-
}
724-
725694
// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
726695
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
727696
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
728697
// concurrency is controlled via Options.MaxConnectionsPerTable.
729698
// Entities for which the query ran successfully will be passed to onSuccess.
730699
func (db *DB) UpsertStreamed(
731-
ctx context.Context, entities <-chan Entity, execOpts ...ExecOption,
700+
ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
732701
) error {
733-
734-
execOptions := NewExecOptions(execOpts...)
735-
736702
first, forward, err := com.CopyFirst(ctx, entities)
737703
if err != nil {
738704
return errors.Wrap(err, "can't copy first entity")
739705
}
740706

741707
sem := db.GetSemaphoreForTable(TableName(first))
742-
var stmt string
743-
var placeholders int
744-
745-
if execOptions.stmt != "" {
746-
stmt = execOptions.stmt
747-
placeholders = execOptions.placeholders
748-
} else {
749-
stmt, placeholders = db.BuildUpsertStmt(first)
750-
}
708+
stmt, placeholders := db.BuildUpsertStmt(first)
751709

752710
return db.NamedBulkExec(
753711
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
754-
forward, SplitOnDupId[Entity], execOptions.onSuccess...,
712+
forward, SplitOnDupId[Entity], onSuccess...,
755713
)
756714
}
757715

@@ -770,58 +728,17 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan Entity) error
770728
return db.NamedBulkExecTx(ctx, stmt, db.Options.MaxRowsPerTransaction, sem, forward)
771729
}
772730

773-
func WithOnSuccessDelete(onSuccess ...OnSuccess[any]) DeleteOption {
774-
return func(options *DeleteOptions) {
775-
options.onSuccess = onSuccess
776-
}
777-
}
778-
779-
func ByColumn(column string) DeleteOption {
780-
return func(options *DeleteOptions) {
781-
options.column = column
782-
}
783-
}
784-
785-
type DeleteOption func(options *DeleteOptions)
786-
787-
type DeleteOptions struct {
788-
onSuccess []OnSuccess[any]
789-
column string
790-
}
791-
792-
func NewDeleteOptions(execOpts ...DeleteOption) *DeleteOptions {
793-
deleteOptions := &DeleteOptions{}
794-
795-
for _, option := range execOpts {
796-
option(deleteOptions)
797-
}
798-
799-
return deleteOptions
800-
}
801-
802731
// DeleteStreamed bulk deletes the specified ids via BulkExec.
803732
// The delete statement is created using BuildDeleteStmt with the passed entityType.
804733
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
805734
// concurrency is controlled via Options.MaxConnectionsPerTable.
806735
// IDs for which the query ran successfully will be passed to onSuccess.
807736
func (db *DB) DeleteStreamed(
808-
ctx context.Context, entityType Entity, ids <-chan interface{}, deleteOpts ...DeleteOption,
737+
ctx context.Context, entityType Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any],
809738
) error {
810-
811-
deleteOptions := NewDeleteOptions(deleteOpts...)
812-
813739
sem := db.GetSemaphoreForTable(TableName(entityType))
814-
815-
var stmt string
816-
817-
if deleteOptions.column != "" {
818-
stmt = fmt.Sprintf("DELETE FROM %s WHERE %s IN (?)", TableName(entityType), deleteOptions.column)
819-
} else {
820-
stmt = db.BuildDeleteStmt(entityType)
821-
}
822-
823740
return db.BulkExec(
824-
ctx, stmt, db.Options.MaxPlaceholdersPerStatement, sem, ids, deleteOptions.onSuccess...,
741+
ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, onSuccess...,
825742
)
826743
}
827744

@@ -837,7 +754,7 @@ func (db *DB) Delete(
837754
}
838755
close(idsCh)
839756

840-
return db.DeleteStreamed(ctx, entityType, idsCh, WithOnSuccessDelete(onSuccess...))
757+
return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...)
841758
}
842759

843760
func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {

database/optionally.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package database
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/icinga/icinga-go-library/com"
7+
"github.com/pkg/errors"
8+
)
9+
10+
// Upsert inserts new rows into a table or updates rows of a table if the primary key already exists.
11+
type Upsert interface {
12+
// Stream bulk upserts the specified entities via NamedBulkExec.
13+
// If not explicitly specified, the upsert statement is created using
14+
// BuildUpsertStmt with the first entity from the entities stream.
15+
Stream(ctx context.Context, entities <-chan Entity) error
16+
}
17+
18+
// UpsertOption is a functional option for NewUpsert.
19+
type UpsertOption func(u *upsert)
20+
21+
// WithOnUpsert adds callback(s) to bulk upserts. Entities for which the
22+
// operation was performed successfully are passed to the callbacks.
23+
func WithOnUpsert(onUpsert ...OnSuccess[Entity]) UpsertOption {
24+
return func(u *upsert) {
25+
u.onUpsert = onUpsert
26+
}
27+
}
28+
29+
// WithStatement uses the specified statement for bulk upserts instead of automatically creating one.
30+
func WithStatement(stmt string, placeholders int) UpsertOption {
31+
return func(u *upsert) {
32+
u.stmt = stmt
33+
u.placeholders = placeholders
34+
}
35+
}
36+
37+
// NewUpsert creates a new Upsert initalized with a database.
38+
func NewUpsert(db *DB, options ...UpsertOption) Upsert {
39+
u := &upsert{db: db}
40+
41+
for _, option := range options {
42+
option(u)
43+
}
44+
45+
return u
46+
}
47+
48+
type upsert struct {
49+
db *DB
50+
onUpsert []OnSuccess[Entity]
51+
stmt string
52+
placeholders int
53+
}
54+
55+
func (u *upsert) Stream(ctx context.Context, entities <-chan Entity) error {
56+
first, forward, err := com.CopyFirst(ctx, entities)
57+
if err != nil {
58+
return errors.Wrap(err, "can't copy first entity")
59+
}
60+
61+
sem := u.db.GetSemaphoreForTable(TableName(first))
62+
var stmt string
63+
var placeholders int
64+
65+
if u.stmt != "" {
66+
stmt = u.stmt
67+
placeholders = u.placeholders
68+
} else {
69+
stmt, placeholders = u.db.BuildUpsertStmt(first)
70+
}
71+
72+
return u.db.NamedBulkExec(
73+
ctx, stmt, u.db.BatchSizeByPlaceholders(placeholders), sem,
74+
forward, SplitOnDupId[Entity], u.onUpsert...,
75+
)
76+
}
77+
78+
// Delete deletes rows of a table.
79+
type Delete interface {
80+
// Stream bulk deletes rows from the table specified in from using the given args stream via BulkExec.
81+
// Unless explicitly specified, the DELETE statement is created using BuildDeleteStmt.
82+
Stream(ctx context.Context, from any, args <-chan any) error
83+
}
84+
85+
// DeleteOption is a functional option for NewDelete.
86+
type DeleteOption func(options *delete)
87+
88+
// WithOnDelete adds callback(s) to bulk deletes. Arguments for which the
89+
// operation was performed successfully are passed to the callbacks.
90+
func WithOnDelete(onDelete ...OnSuccess[any]) DeleteOption {
91+
return func(d *delete) {
92+
d.onDelete = onDelete
93+
}
94+
}
95+
96+
// ByColumn uses the given column for the WHERE clause that the rows must
97+
// satisfy in order to be deleted, instead of automatically using ID.
98+
func ByColumn(column string) DeleteOption {
99+
return func(d *delete) {
100+
d.column = column
101+
}
102+
}
103+
104+
// NewDelete creates a new Delete initalized with a database.
105+
func NewDelete(db *DB, options ...DeleteOption) Delete {
106+
d := &delete{db: db}
107+
108+
for _, option := range options {
109+
option(d)
110+
}
111+
112+
return d
113+
}
114+
115+
type delete struct {
116+
db *DB
117+
column string
118+
onDelete []OnSuccess[any]
119+
}
120+
121+
func (d *delete) Stream(ctx context.Context, from any, args <-chan any) error {
122+
var stmt string
123+
124+
if d.column != "" {
125+
stmt = fmt.Sprintf(`DELETE FROM "%s" WHERE %s IN (?)`, TableName(from), d.column)
126+
} else {
127+
stmt = d.db.BuildDeleteStmt(from)
128+
}
129+
130+
sem := d.db.GetSemaphoreForTable(TableName(from))
131+
132+
return d.db.BulkExec(
133+
ctx, stmt, d.db.Options.MaxPlaceholdersPerStatement, sem, args, d.onDelete...,
134+
)
135+
}

0 commit comments

Comments
 (0)