Skip to content

Commit 00263ae

Browse files
committed
File store does not keep a gigantic map of every message offset in memory considering it is already in the header file
1 parent 5b44e1c commit 00263ae

File tree

1 file changed

+30
-48
lines changed

1 file changed

+30
-48
lines changed

store/file/file_store.go

+30-48
Original file line numberDiff line numberDiff line change
@@ -22,28 +22,20 @@ import (
2222
"path"
2323
"strconv"
2424
"strings"
25-
"sync"
2625
"time"
2726

2827
"github.com/pkg/errors"
29-
3028
"github.com/quickfixgo/quickfix"
3129
"github.com/quickfixgo/quickfix/config"
3230
)
3331

34-
type msgDef struct {
35-
offset int64
36-
size int
37-
}
38-
3932
type fileStoreFactory struct {
4033
settings *quickfix.Settings
4134
}
4235

4336
type fileStore struct {
4437
sessionID quickfix.SessionID
4538
cache quickfix.MessageStore
46-
offsets sync.Map
4739
bodyFname string
4840
headerFname string
4941
sessionFname string
@@ -107,7 +99,6 @@ func newFileStore(sessionID quickfix.SessionID, dirname string, fileSync bool) (
10799
store := &fileStore{
108100
sessionID: sessionID,
109101
cache: memStore,
110-
offsets: sync.Map{},
111102
bodyFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "body")),
112103
headerFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "header")),
113104
sessionFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "session")),
@@ -199,18 +190,6 @@ func (store *fileStore) Refresh() (err error) {
199190
}
200191

201192
func (store *fileStore) populateCache() (creationTimePopulated bool, err error) {
202-
if tmpHeaderFile, err := os.Open(store.headerFname); err == nil {
203-
defer tmpHeaderFile.Close()
204-
for {
205-
var seqNum, size int
206-
var offset int64
207-
if cnt, err := fmt.Fscanf(tmpHeaderFile, "%d,%d,%d\n", &seqNum, &offset, &size); err != nil || cnt != 3 {
208-
break
209-
}
210-
store.offsets.Store(seqNum, msgDef{offset: offset, size: size})
211-
}
212-
}
213-
214193
if timeBytes, err := os.ReadFile(store.sessionFname); err == nil {
215194
var ctime time.Time
216195
if err := ctime.UnmarshalText(timeBytes); err == nil {
@@ -348,7 +327,6 @@ func (store *fileStore) SaveMessage(seqNum int, msg []byte) error {
348327
}
349328
}
350329

351-
store.offsets.Store(seqNum, msgDef{offset: offset, size: len(msg)})
352330
return nil
353331
}
354332

@@ -360,34 +338,38 @@ func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []
360338
return store.IncrNextSenderMsgSeqNum()
361339
}
362340

363-
func (store *fileStore) getMessage(seqNum int) (msg []byte, found bool, err error) {
364-
msgInfoTemp, found := store.offsets.Load(seqNum)
365-
if !found {
366-
return
367-
}
368-
msgInfo, ok := msgInfoTemp.(msgDef)
369-
if !ok {
370-
return nil, true, fmt.Errorf("incorrect msgInfo type while reading file: %s", store.bodyFname)
371-
}
372-
373-
msg = make([]byte, msgInfo.size)
374-
if _, err = store.bodyFile.ReadAt(msg, msgInfo.offset); err != nil {
375-
return nil, true, fmt.Errorf("unable to read from file: %s: %s", store.bodyFname, err.Error())
376-
}
377-
378-
return msg, true, nil
379-
}
380-
381341
func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error {
382-
for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ {
383-
m, found, err := store.getMessage(seqNum)
384-
if err != nil {
385-
return err
386-
}
387-
if found {
388-
if err = cb(m); err != nil {
389-
return err
342+
// Sync files and seek to start of header file
343+
if err := store.bodyFile.Sync(); err != nil {
344+
return fmt.Errorf("unable to flush file: %s: %s", store.bodyFname, err.Error())
345+
} else if err = store.headerFile.Sync(); err != nil {
346+
return fmt.Errorf("unable to flush file: %s: %s", store.headerFname, err.Error())
347+
} else if _, err = store.headerFile.Seek(0, io.SeekStart); err != nil {
348+
return fmt.Errorf("unable to seek to start of file: %s: %s", store.headerFname, err.Error())
349+
}
350+
351+
// Iterate over the header file
352+
for {
353+
var seqNum, size int
354+
var offset int64
355+
if cnt, err := fmt.Fscanf(store.headerFile, "%d,%d,%d\n", &seqNum, &offset, &size); err != nil {
356+
if errors.Is(err, io.EOF) {
357+
break
390358
}
359+
return fmt.Errorf("unable to read from file: %s: %s", store.headerFname, err.Error())
360+
} else if cnt < 3 || seqNum > endSeqNum {
361+
// If we have reached the end of possible iteration then break
362+
break
363+
} else if seqNum < beginSeqNum {
364+
// If we have not yet reached the starting sequence number then continue
365+
continue
366+
}
367+
// Otherwise process the file
368+
msg := make([]byte, size)
369+
if _, err := store.bodyFile.ReadAt(msg, offset); err != nil {
370+
return fmt.Errorf("unable to read from file: %s: %s", store.bodyFname, err.Error())
371+
} else if err = cb(msg); err != nil {
372+
return err
391373
}
392374
}
393375
return nil

0 commit comments

Comments
 (0)