Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Commit b05455e

Browse files
kuba--ajnavarro
kuba--
authored andcommitted
No holder.
Signed-off-by: kuba-- <[email protected]> (cherry picked from commit 3d2c375)
1 parent 2f288e1 commit b05455e

File tree

3 files changed

+258
-19
lines changed

3 files changed

+258
-19
lines changed

sql/index/pilosa/driver.go

+31-15
Original file line numberDiff line numberDiff line change
@@ -69,19 +69,15 @@ type (
6969

7070
// Driver implements sql.IndexDriver interface.
7171
Driver struct {
72-
root string
73-
holder *pilosa.Holder
72+
root string
7473
}
7574
)
7675

7776
// NewDriver returns a new instance of pilosa.Driver
7877
// which satisfies sql.IndexDriver interface
7978
func NewDriver(root string) *Driver {
80-
h := pilosa.NewHolder()
81-
h.Path = filepath.Join(root, "."+DriverID)
8279
return &Driver{
83-
root: root,
84-
holder: h,
80+
root: root,
8581
}
8682
}
8783

@@ -116,16 +112,12 @@ func (d *Driver) Create(
116112
return nil, err
117113
}
118114

119-
idx, err := d.holder.CreateIndexIfNotExists(
120-
indexName(db, table),
121-
pilosa.IndexOptions{},
122-
)
115+
idx, err := d.newPilosaIndex(db, table)
123116
if err != nil {
124117
return nil, err
125118
}
126119

127120
mapping := newMapping(d.mappingFilePath(db, table, id))
128-
129121
processingFile := d.processingFilePath(db, table, id)
130122
if err := index.WriteProcessingFile(
131123
processingFile,
@@ -173,11 +165,14 @@ func (d *Driver) LoadAll(db, table string) ([]sql.Index, error) {
173165
}
174166

175167
func (d *Driver) loadIndex(db, table, id string) (*pilosaIndex, error) {
176-
name := indexName(db, table)
177-
idx := d.holder.Index(name)
178-
if idx == nil {
179-
return nil, errLoadingIndex.New(name)
168+
idx, err := d.newPilosaIndex(db, table)
169+
if err != nil {
170+
return nil, err
180171
}
172+
if err := idx.Open(); err != nil {
173+
return nil, err
174+
}
175+
defer idx.Close()
181176

182177
dir := filepath.Join(d.root, db, table, id)
183178
config := d.configFilePath(db, table, id)
@@ -328,6 +323,12 @@ func (d *Driver) Save(
328323
if !ok {
329324
return errInvalidIndexType.New(i)
330325
}
326+
327+
if err := idx.index.Open(); err != nil {
328+
return err
329+
}
330+
defer idx.index.Close()
331+
331332
idx.wg.Add(1)
332333
defer idx.wg.Done()
333334

@@ -386,6 +387,11 @@ func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error {
386387
idx.wg.Wait()
387388
}
388389

390+
if err := idx.index.Open(); err != nil {
391+
return err
392+
}
393+
defer idx.index.Close()
394+
389395
if err := os.RemoveAll(filepath.Join(d.root, i.Database(), i.Table(), i.ID())); err != nil {
390396
return err
391397
}
@@ -534,3 +540,13 @@ func (d *Driver) processingFilePath(db, table, id string) string {
534540
func (d *Driver) mappingFilePath(db, table, id string) string {
535541
return filepath.Join(d.root, db, table, id, MappingFileName)
536542
}
543+
544+
func (d *Driver) newPilosaIndex(db, table string) (*pilosa.Index, error) {
545+
name := indexName(db, table)
546+
path := filepath.Join(d.root, "."+DriverID, name)
547+
idx, err := pilosa.NewIndex(path, name)
548+
if err != nil {
549+
return nil, err
550+
}
551+
return idx, nil
552+
}

sql/index/pilosa/driver_test.go

+221
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,66 @@ func TestLoadAll(t *testing.T) {
9999
require.NoError(err)
100100
}
101101

102+
func TestLoadAllWithMultipleDrivers(t *testing.T) {
103+
require := require.New(t)
104+
setup(t)
105+
defer cleanup(t)
106+
107+
d1 := NewDriver(tmpDir)
108+
idx1, err := d1.Create("db", "table", "id1", makeExpressions("table", "hash1"), nil)
109+
require.NoError(err)
110+
it1 := &partitionKeyValueIter{
111+
partitions: 2,
112+
offset: 0,
113+
total: 64,
114+
expressions: idx1.Expressions(),
115+
location: randLocation,
116+
}
117+
require.NoError(d1.Save(sql.NewEmptyContext(), idx1, it1))
118+
119+
d2 := NewDriver(tmpDir)
120+
idx2, err := d2.Create("db", "table", "id2", makeExpressions("table", "hash1"), nil)
121+
require.NoError(err)
122+
it2 := &partitionKeyValueIter{
123+
partitions: 2,
124+
offset: 0,
125+
total: 64,
126+
expressions: idx2.Expressions(),
127+
location: randLocation,
128+
}
129+
require.NoError(d2.Save(sql.NewEmptyContext(), idx2, it2))
130+
131+
d := NewDriver(tmpDir)
132+
indexes, err := d.LoadAll("db", "table")
133+
require.NoError(err)
134+
135+
require.Equal(2, len(indexes))
136+
i1, ok := idx1.(*pilosaIndex)
137+
require.True(ok)
138+
i2, ok := idx2.(*pilosaIndex)
139+
require.True(ok)
140+
141+
require.Equal(i1.index.Name(), i2.index.Name())
142+
143+
// Load index from another table. Previously this panicked as the same
144+
// pilosa.Holder was used for all indexes.
145+
146+
d3 := NewDriver(tmpDir)
147+
idx3, err := d3.Create("db", "table2", "id1", makeExpressions("table2", "hash1"), nil)
148+
require.NoError(err)
149+
it3 := &partitionKeyValueIter{
150+
partitions: 2,
151+
offset: 0,
152+
total: 64,
153+
expressions: idx3.Expressions(),
154+
location: randLocation,
155+
}
156+
require.NoError(d3.Save(sql.NewEmptyContext(), idx3, it3))
157+
158+
indexes, err = d.LoadAll("db", "table2")
159+
require.NoError(err)
160+
}
161+
102162
type logLoc struct {
103163
loc []byte
104164
err error
@@ -214,6 +274,39 @@ func TestSaveAndGetAll(t *testing.T) {
214274
require.True(errInvalidKeys.Is(err))
215275
}
216276

277+
func TestSaveAndGetAllWithMultipleDrivers(t *testing.T) {
278+
require := require.New(t)
279+
setup(t)
280+
defer cleanup(t)
281+
282+
db, table, id := "db_name", "table_name", "index_id"
283+
expressions := makeExpressions(table, "lang", "hash")
284+
285+
d1 := NewDriver(tmpDir)
286+
sqlIdx, err := d1.Create(db, table, id, expressions, nil)
287+
require.NoError(err)
288+
289+
it := &partitionKeyValueIter{
290+
partitions: 2,
291+
offset: 0,
292+
total: 64,
293+
expressions: sqlIdx.Expressions(),
294+
location: randLocation,
295+
}
296+
297+
err = d1.Save(sql.NewEmptyContext(), sqlIdx, it)
298+
require.NoError(err)
299+
300+
d2 := NewDriver(tmpDir)
301+
indexes, err := d2.LoadAll(db, table)
302+
require.NoError(err)
303+
require.Equal(1, len(indexes))
304+
305+
_, err = sqlIdx.Get()
306+
require.Error(err)
307+
require.True(errInvalidKeys.Is(err))
308+
}
309+
217310
func TestLoadCorruptedIndex(t *testing.T) {
218311
require := require.New(t)
219312
setup(t)
@@ -254,6 +347,58 @@ func TestDelete(t *testing.T) {
254347
require.NoError(err)
255348
}
256349

350+
func TestDeleteWithMultipleDrivers(t *testing.T) {
351+
require := require.New(t)
352+
setup(t)
353+
defer cleanup(t)
354+
355+
db, table, id := "db_name", "table_name", "index_id"
356+
357+
expressions := []sql.Expression{
358+
expression.NewGetFieldWithTable(0, sql.Int64, table, "lang", true),
359+
expression.NewGetFieldWithTable(1, sql.Int64, table, "field", true),
360+
}
361+
362+
d := NewDriver(tmpDir)
363+
sqlIdx, err := d.Create(db, table, id, expressions, nil)
364+
require.NoError(err)
365+
366+
d = NewDriver(tmpDir)
367+
err = d.Delete(sqlIdx, new(partitionIter))
368+
require.NoError(err)
369+
}
370+
371+
func TestDeleteAndLoadAll(t *testing.T) {
372+
require := require.New(t)
373+
setup(t)
374+
defer cleanup(t)
375+
376+
db, table, id := "db_name", "table_name", "index_id"
377+
expressions := makeExpressions(table, "lang", "hash")
378+
379+
d := NewDriver(tmpDir)
380+
sqlIdx, err := d.Create(db, table, id, expressions, nil)
381+
require.NoError(err)
382+
383+
it := &partitionKeyValueIter{
384+
partitions: 2,
385+
offset: 0,
386+
total: 64,
387+
expressions: sqlIdx.Expressions(),
388+
location: randLocation,
389+
}
390+
391+
err = d.Save(sql.NewEmptyContext(), sqlIdx, it)
392+
require.NoError(err)
393+
394+
err = d.Delete(sqlIdx, new(partitionIter))
395+
require.NoError(err)
396+
397+
indexes, err := d.LoadAll(db, table)
398+
require.NoError(err)
399+
require.Equal(0, len(indexes))
400+
}
401+
257402
func TestDeleteInProgress(t *testing.T) {
258403
require := require.New(t)
259404
setup(t)
@@ -449,6 +594,82 @@ func TestIntersection(t *testing.T) {
449594
require.NoError(interIt.Close())
450595
}
451596

597+
func TestIntersectionWithMultipleDrivers(t *testing.T) {
598+
ctx := sql.NewContext(context.Background())
599+
require := require.New(t)
600+
setup(t)
601+
defer cleanup(t)
602+
603+
db, table := "db_name", "table_name"
604+
idxLang, expLang := "idx_lang", makeExpressions(table, "lang")
605+
idxPath, expPath := "idx_path", makeExpressions(table, "path")
606+
607+
d1 := NewDriver(tmpDir)
608+
sqlIdxLang, err := d1.Create(db, table, idxLang, expLang, nil)
609+
require.NoError(err)
610+
611+
d2 := NewDriver(tmpDir)
612+
sqlIdxPath, err := d2.Create(db, table, idxPath, expPath, nil)
613+
require.NoError(err)
614+
615+
itLang := &partitionKeyValueIter{
616+
partitions: 2,
617+
offset: 0,
618+
total: 10,
619+
expressions: sqlIdxLang.Expressions(),
620+
location: offsetLocation,
621+
}
622+
623+
itPath := &partitionKeyValueIter{
624+
partitions: 2,
625+
offset: 0,
626+
total: 10,
627+
expressions: sqlIdxPath.Expressions(),
628+
location: offsetLocation,
629+
}
630+
631+
err = d1.Save(ctx, sqlIdxLang, itLang)
632+
require.NoError(err)
633+
634+
err = d2.Save(ctx, sqlIdxPath, itPath)
635+
require.NoError(err)
636+
637+
lookupLang, err := sqlIdxLang.Get(itLang.records[0][0].values...)
638+
require.NoError(err)
639+
lookupPath, err := sqlIdxPath.Get(itPath.records[0][itPath.total-1].values...)
640+
require.NoError(err)
641+
642+
m, ok := lookupLang.(sql.Mergeable)
643+
require.True(ok)
644+
require.True(m.IsMergeable(lookupPath))
645+
646+
interLookup, ok := lookupLang.(sql.SetOperations)
647+
require.True(ok)
648+
interIt, err := interLookup.Intersection(lookupPath).Values(testPartition(0))
649+
require.NoError(err)
650+
loc, err := interIt.Next()
651+
652+
require.True(err == io.EOF)
653+
require.NoError(interIt.Close())
654+
655+
lookupLang, err = sqlIdxLang.Get(itLang.records[0][0].values...)
656+
require.NoError(err)
657+
lookupPath, err = sqlIdxPath.Get(itPath.records[0][0].values...)
658+
require.NoError(err)
659+
660+
interLookup, ok = lookupPath.(sql.SetOperations)
661+
require.True(ok)
662+
interIt, err = interLookup.Intersection(lookupLang).Values(testPartition(0))
663+
require.NoError(err)
664+
loc, err = interIt.Next()
665+
require.NoError(err)
666+
require.Equal(loc, itPath.records[0][0].location)
667+
_, err = interIt.Next()
668+
require.True(err == io.EOF)
669+
670+
require.NoError(interIt.Close())
671+
}
672+
452673
func TestUnion(t *testing.T) {
453674
require := require.New(t)
454675
setup(t)

sql/index/pilosa/lookup.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,11 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) {
8484
if err := l.mapping.open(); err != nil {
8585
return nil, err
8686
}
87-
8887
defer l.mapping.close()
8988

89+
if err := l.index.Open(); err != nil {
90+
return nil, err
91+
}
9092
var row *pilosa.Row
9193
for i, expr := range l.expressions {
9294
field := l.index.Field(fieldName(l.id, expr, p))
@@ -105,6 +107,9 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) {
105107

106108
row = intersect(row, r)
107109
}
110+
if err := l.index.Close(); err != nil {
111+
return nil, err
112+
}
108113

109114
// evaluate composition of operations
110115
for _, op := range l.operations {
@@ -131,9 +136,6 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) {
131136

132137
// Values implements sql.IndexLookup.Values
133138
func (l *indexLookup) Values(p sql.Partition) (sql.IndexValueIter, error) {
134-
l.index.Open()
135-
defer l.index.Close()
136-
137139
row, err := l.values(p)
138140
if err != nil {
139141
return nil, err

0 commit comments

Comments
 (0)