Skip to content

Commit

Permalink
Implemented file buffers for directory queue manager. (Velocidex#876)
Browse files Browse the repository at this point in the history
Previously, the queue manager was in memory only, pushing events
through memory channels to consumers. This meant that if a consumer
was too slow, events would accumulate in memory until the channel was
full. At that point, the back pressure on the senders would cause
server lockup, since queues are often read in the critical path.

This change creates an overflow buffer file for each queue
listener. If the listener is not available to read from the channel
immediately, the queue manager will write the data to a file, and
return immediately to the writer.

Therefore it is no longer possible to block on queue writes which
ensures stability under high load.
  • Loading branch information
scudette authored Jan 18, 2021
1 parent a1a52ce commit 8263fa7
Show file tree
Hide file tree
Showing 38 changed files with 1,011 additions and 132 deletions.
2 changes: 1 addition & 1 deletion api/csrf.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func csrfProtect(config_obj *config_proto.Config,
_, _ = hasher.Write([]byte(config_obj.Frontend.PrivateKey))
token := hasher.Sum(nil)

protectionFn := csrf.Protect(token, csrf.Path("/"))
protectionFn := csrf.Protect(token, csrf.Path("/"), csrf.MaxAge(7*24*60*60))

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
protectionFn(parent).ServeHTTP(w, r)
Expand Down
4 changes: 2 additions & 2 deletions artifacts/definitions/Windows/Forensics/Lnk.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,9 @@ sources:
column_types:
- name: Mtime
type: timestamp
- name: ATime
- name: Atime
type: timestamp
- name: CTime
- name: Ctime
type: timestamp
- name: HeaderCreationTime
type: timestamp
Expand Down
7 changes: 5 additions & 2 deletions executor/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,18 @@ func (self *PoolClientExecutor) ReadResponse() <-chan *crypto_proto.GrrMessage {

// Inspect the request and decide if we will cache it under a query.
func getQueryName(message *crypto_proto.GrrMessage) string {
query_name := ""
if message.VQLClientAction != nil {
for _, query := range message.VQLClientAction.Query {
if query.Name != "" {
return query.Name
query_name = query.Name
}
}
// Cache it under the query name and the
serialized, _ := json.Marshal(message.VQLClientAction.Env)
return fmt.Sprintf("%v: %v", query_name, string(serialized))

}

return ""
}

Expand Down
3 changes: 2 additions & 1 deletion file_store/api/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
// responsible for rotating the queue files as required.
type QueueManager interface {
PushEventRows(path_manager PathManager, rows []*ordereddict.Dict) error
Watch(queue_name string) (output <-chan *ordereddict.Dict, cancel func())
Watch(ctx context.Context, queue_name string) (
output <-chan *ordereddict.Dict, cancel func())
}

type ResultSetFileProperties struct {
Expand Down
3 changes: 2 additions & 1 deletion file_store/api/testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ func (self *QueueManagerTestSuite) TestPush() {
ordereddict.NewDict().Set("foo", 1),
ordereddict.NewDict().Set("foo", 2)}

output, cancel := self.manager.Watch(artifact_name)
ctx := context.Background()
output, cancel := self.manager.Watch(ctx, artifact_name)
defer cancel()

err := self.manager.PushEventRows(
Expand Down
239 changes: 239 additions & 0 deletions file_store/directory/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// A ring buffer to queue messages

// Similar to the client ring buffer but this one has no limit because
// we never want to block writers.

package directory

import (
"encoding/binary"
"encoding/json"
"errors"
"io"
"os"
"sync"

"github.com/Velocidex/ordereddict"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/constants"
logging "www.velocidex.com/golang/velociraptor/logging"
)

// The below is similar to http_comms.FileBasedRingBuffer except:
// * Size of the file is not limited.
// * Leasing a full number of messages at once (rather than combined size).

const (
    // Magic bytes identifying a valid buffer file.
    FileMagic = "VRB\x5e"

    // The header is padded to this size to leave room for future
    // extensions; the first record is written at this offset.
    FirstRecordOffset = 50
)

// Header is the fixed size on-disk header of the buffer file. It
// tracks the two file offsets delimiting the unread data.
type Header struct {
    ReadPointer  int64 // Leasing will start at this file offset.
    WritePointer int64 // Enqueue will write at this file position.
}

// MarshalBinary encodes the header into a FirstRecordOffset sized
// buffer: 4 magic bytes followed by the two little endian pointers.
// It never fails but keeps the error return to satisfy
// encoding.BinaryMarshaler.
func (self *Header) MarshalBinary() ([]byte, error) {
    data := make([]byte, FirstRecordOffset)
    copy(data, FileMagic)

    binary.LittleEndian.PutUint64(data[4:12], uint64(self.ReadPointer))
    binary.LittleEndian.PutUint64(data[12:20], uint64(self.WritePointer))

    return data, nil
}

// UnmarshalBinary decodes the header from data, validating both the
// length and the magic bytes. On error the receiver is left
// unmodified.
func (self *Header) UnmarshalBinary(data []byte) error {
    if len(data) < FirstRecordOffset {
        return errors.New("invalid header length")
    }

    if string(data[:4]) != FileMagic {
        return errors.New("invalid magic")
    }

    self.ReadPointer = int64(binary.LittleEndian.Uint64(data[4:12]))
    self.WritePointer = int64(binary.LittleEndian.Uint64(data[12:20]))

    return nil
}

// FileBasedRingBuffer is an unbounded overflow buffer backed by a
// single file. Writers append length-prefixed JSON records at the
// write pointer and readers lease them from the read pointer. Unlike
// the client ring buffer the file size is not limited, so writers
// never block.
type FileBasedRingBuffer struct {
    config_obj *config_proto.Config

    // mu protects fd, header and the scratch buffers below.
    mu sync.Mutex

    fd     *os.File
    header *Header

    // Scratch buffers for the 8 byte little endian length prefix,
    // allocated once to avoid per-record allocations.
    read_buf  []byte
    write_buf []byte

    log_ctx *logging.LogContext
}

// Enqueue serializes item to JSON and appends it to the end of the
// buffer file as an 8 byte little endian length followed by the
// payload, then persists the updated header. On any write error the
// file is assumed corrupt and is returned to a virgin state.
func (self *FileBasedRingBuffer) Enqueue(item interface{}) error {
    serialized, err := json.Marshal(item)
    if err != nil {
        return err
    }

    self.mu.Lock()
    defer self.mu.Unlock()

    // Write the new message to the end of the file at the WritePointer
    binary.LittleEndian.PutUint64(self.write_buf, uint64(len(serialized)))
    _, err = self.fd.WriteAt(self.write_buf, int64(self.header.WritePointer))
    if err != nil {
        // File is corrupt now, reset it. NOTE: We must call
        // _Truncate() here and not Reset() - Reset() acquires
        // self.mu which we already hold and would deadlock.
        self._Truncate()
        return err
    }

    n, err := self.fd.WriteAt(serialized, int64(self.header.WritePointer+8))
    if err != nil {
        self._Truncate()
        return err
    }

    self.header.WritePointer += 8 + int64(n)

    // Persist the advanced write pointer in the file header.
    serialized, err = self.header.MarshalBinary()
    if err != nil {
        return err
    }
    _, err = self.fd.WriteAt(serialized, 0)
    if err != nil {
        self._Truncate()
        return err
    }

    return nil
}

// Lease removes up to count messages from the front of the buffer
// and returns them. Any sign of corruption (short read, impossible
// record length) is logged and handled by truncating the entire file,
// discarding whatever was still buffered.
//
// NOTE(review): the advanced ReadPointer is only written back to disk
// when the buffer drains completely (via _Truncate). After a crash
// mid-lease, already delivered items would presumably be replayed -
// verify that consumers tolerate at-least-once delivery.
func (self *FileBasedRingBuffer) Lease(count int) []*ordereddict.Dict {
    self.mu.Lock()
    defer self.mu.Unlock()

    result := make([]*ordereddict.Dict, 0, count)

    // The file contains more data.
    for self.header.WritePointer > self.header.ReadPointer {

        // Read the 8 byte length prefix at the current read pointer.
        n, err := self.fd.ReadAt(self.read_buf, self.header.ReadPointer)
        if err != nil || n != len(self.read_buf) {
            self.log_ctx.Error("Possible corruption detected: file too short.")
            self._Truncate()
            return nil
        }

        length := int64(binary.LittleEndian.Uint64(self.read_buf))
        // File might be corrupt - just reset the
        // entire file.
        if length > constants.MAX_MEMORY*2 || length <= 0 {
            self.log_ctx.Error("Possible corruption detected - item length is too large.")
            self._Truncate()
            return nil
        }

        // Unmarshal one item at a time.
        serialized := make([]byte, length)
        n, _ = self.fd.ReadAt(serialized, self.header.ReadPointer+8)
        if int64(n) != length {
            self.log_ctx.Errorf(
                "Possible corruption detected - expected item of length %v received %v.",
                length, n)
            self._Truncate()
            return nil
        }

        item := ordereddict.NewDict()
        err = item.UnmarshalJSON(serialized)
        // Records that fail to parse are skipped silently; the read
        // pointer still advances past them below.
        if err == nil {
            result = append(result, item)
        }

        self.header.ReadPointer += 8 + int64(n)
        // We read up to the write pointer, we may truncate the file now.
        if self.header.ReadPointer == self.header.WritePointer {
            self._Truncate()
        }

        if len(result) >= count {
            break
        }
    }

    return result
}

// _Truncate discards all buffered data and rewrites a fresh header,
// returning the file to its initial empty state. Errors are ignored
// (best effort). Callers must already hold self.mu.
func (self *FileBasedRingBuffer) _Truncate() {
    // Drop all file content, then point both offsets back at the
    // first record slot.
    _ = self.fd.Truncate(0)
    self.header.ReadPointer = FirstRecordOffset
    self.header.WritePointer = FirstRecordOffset

    // Best effort: persist the clean header at the start of the file.
    if data, err := self.header.MarshalBinary(); err == nil {
        _, _ = self.fd.WriteAt(data, 0)
    }
}

// Reset returns the buffer file to a virgin state. This is the
// thread safe entry point - use _Truncate directly when the lock is
// already held.
func (self *FileBasedRingBuffer) Reset() {
    self.mu.Lock()
    defer self.mu.Unlock()

    self._Truncate()
}

// Close releases the underlying file handle. The buffer must not be
// used after Close returns.
func (self *FileBasedRingBuffer) Close() {
    // Best effort - the close error is deliberately ignored.
    _ = self.fd.Close()
}

// NewFileBasedRingBuffer opens a ring buffer over fd. If the file
// already contains a valid header, the read/write pointers are
// restored from it so buffered messages survive restarts. A short or
// invalid header is logged as possible corruption and the file is
// truncated so the buffer starts fresh with default pointers.
func NewFileBasedRingBuffer(
    config_obj *config_proto.Config, fd *os.File) (*FileBasedRingBuffer, error) {

    log_ctx := logging.GetLogger(config_obj, &logging.FrontendComponent)

    header := &Header{
        // Pad the header a bit to allow for extensions.
        WritePointer: FirstRecordOffset,
        ReadPointer:  FirstRecordOffset,
    }

    data := make([]byte, FirstRecordOffset)
    n, err := fd.ReadAt(data, 0)

    switch {
    case n > 0 && n < FirstRecordOffset && err == io.EOF:
        // A partial header can never be valid - truncate and start
        // over. Do not also attempt to parse the stale bytes (the
        // branches are mutually exclusive to avoid logging the same
        // corruption twice and truncating twice).
        log_ctx.Error("Possible corruption detected: file too short.")
        if err := fd.Truncate(0); err != nil {
            return nil, err
        }

    case n > 0 && (err == nil || err == io.EOF):
        // The full header was read - restore the pointers from it.
        if err := header.UnmarshalBinary(data[:n]); err != nil {
            // The header is not valid, truncate the file and
            // start again with the default pointers.
            log_ctx.Errorf("Possible corruption detected: %v.", err)
            if err := fd.Truncate(0); err != nil {
                return nil, err
            }
        }
    }

    result := &FileBasedRingBuffer{
        config_obj: config_obj,
        fd:         fd,
        header:     header,
        read_buf:   make([]byte, 8),
        write_buf:  make([]byte, 8),
        log_ctx:    log_ctx,
    }

    return result, nil
}
Loading

0 comments on commit 8263fa7

Please sign in to comment.