From 8263fa75940b5da63873a95acc86dbad59f8428c Mon Sep 17 00:00:00 2001 From: Mike Cohen Date: Tue, 19 Jan 2021 02:12:07 +1000 Subject: [PATCH] Implemented file buffers for directory queue manager. (#876) Previously, the queue manager was in memory only, pushing events through memory channels to consumers. This means if a consumer was too slow, events were accumulating in memory until the channel was full. At this point, the back pressure on the senders would cause server lockup since queues are often read in the critical path. This change creates an overflow buffer file for each queue listener. If the listener is not available to read from the channel immediately, the queue manager will write the data to a file, and return immediately to the writer. Therefore it is no longer possible to block on queue writes which ensures stability under high load. --- api/csrf.go | 2 +- .../definitions/Windows/Forensics/Lnk.yaml | 4 +- executor/pool.go | 7 +- file_store/api/queues.go | 3 +- file_store/api/testsuite.go | 3 +- file_store/directory/buffer.go | 239 +++++++++++++++ file_store/directory/queue.go | 282 +++++++++++++++++- file_store/directory/queue_test.go | 138 ++++++++- file_store/memory/queue.go | 67 ++++- file_store/mysql/queue.go | 4 +- file_store/result_sets/events_test.go | 7 +- flows/artifacts.go | 16 +- flows/foreman.go | 4 + flows/hunts.go | 80 ++--- .../src/components/core/api-service.js | 7 + .../src/components/forms/validated_int.js | 6 +- .../src/components/vfs/file-list.js | 12 +- .../src/components/vfs/file-text-view.js | 4 +- .../src/components/vfs/file-tree.js | 1 + .../client_monitoring/client_monitoring.go | 2 +- services/hunt_dispatcher/hunt_dispatcher.go | 38 ++- services/hunt_manager/hunt_manager.go | 24 +- services/interrogation/interrogation.go | 2 +- services/interrogation/utils.go | 2 +- services/journal.go | 4 +- services/journal/journal.go | 5 +- services/labels/labels.go | 2 +- services/notifications/notifications.go | 5 +- .../server_monitoring/server_monitoring.go | 6 +- services/services.go | 2 + services/test_utils.go | 4 +- services/vfs_service/utils.go | 5 +- services/vfs_service/vfs_service.go | 5 + utils/dict.go | 18 +- vql/common/for.go | 50 ++++ vql/functions/lists.go | 71 +++++ vql/server/hunts/create.go | 10 +- vql/server/monitoring.go | 2 +- 38 files changed, 1011 insertions(+), 132 deletions(-) create mode 100644 file_store/directory/buffer.go diff --git a/api/csrf.go b/api/csrf.go index 1ae8c16476f..a1e59851a92 100644 --- a/api/csrf.go +++ b/api/csrf.go @@ -27,7 +27,7 @@ func csrfProtect(config_obj *config_proto.Config, _, _ = hasher.Write([]byte(config_obj.Frontend.PrivateKey)) token := hasher.Sum(nil) - protectionFn := csrf.Protect(token, csrf.Path("/")) + protectionFn := csrf.Protect(token, csrf.Path("/"), csrf.MaxAge(7*24*60*60)) return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { protectionFn(parent).ServeHTTP(w, r) diff --git a/artifacts/definitions/Windows/Forensics/Lnk.yaml b/artifacts/definitions/Windows/Forensics/Lnk.yaml index c269e321ce8..c42e50ab415 100644 --- a/artifacts/definitions/Windows/Forensics/Lnk.yaml +++ b/artifacts/definitions/Windows/Forensics/Lnk.yaml @@ -395,9 +395,9 @@ sources: column_types: - name: Mtime type: timestamp - - name: ATime + - name: Atime type: timestamp - - name: CTime + - name: Ctime type: timestamp - name: HeaderCreationTime type: timestamp diff --git a/executor/pool.go b/executor/pool.go index b860f73ab4e..9c46f45fbb2 100644 --- a/executor/pool.go +++ b/executor/pool.go @@ -71,15 +71,18 
@@ func (self *PoolClientExecutor) ReadResponse() <-chan *crypto_proto.GrrMessage { // Inspect the request and decide if we will cache it under a query. func getQueryName(message *crypto_proto.GrrMessage) string { + query_name := "" if message.VQLClientAction != nil { for _, query := range message.VQLClientAction.Query { if query.Name != "" { - return query.Name + query_name = query.Name } } + // Cache it under the query name and the + serialized, _ := json.Marshal(message.VQLClientAction.Env) + return fmt.Sprintf("%v: %v", query_name, string(serialized)) } - return "" } diff --git a/file_store/api/queues.go b/file_store/api/queues.go index 989ef91c273..42c8e39998f 100644 --- a/file_store/api/queues.go +++ b/file_store/api/queues.go @@ -10,7 +10,8 @@ import ( // responsible for rotating the queue files as required. type QueueManager interface { PushEventRows(path_manager PathManager, rows []*ordereddict.Dict) error - Watch(queue_name string) (output <-chan *ordereddict.Dict, cancel func()) + Watch(ctx context.Context, queue_name string) ( + output <-chan *ordereddict.Dict, cancel func()) } type ResultSetFileProperties struct { diff --git a/file_store/api/testsuite.go b/file_store/api/testsuite.go index 7b9b5e7c4c0..bb134af32ee 100644 --- a/file_store/api/testsuite.go +++ b/file_store/api/testsuite.go @@ -241,7 +241,8 @@ func (self *QueueManagerTestSuite) TestPush() { ordereddict.NewDict().Set("foo", 1), ordereddict.NewDict().Set("foo", 2)} - output, cancel := self.manager.Watch(artifact_name) + ctx := context.Background() + output, cancel := self.manager.Watch(ctx, artifact_name) defer cancel() err := self.manager.PushEventRows( diff --git a/file_store/directory/buffer.go b/file_store/directory/buffer.go new file mode 100644 index 00000000000..2975589500a --- /dev/null +++ b/file_store/directory/buffer.go @@ -0,0 +1,239 @@ +// A ring buffer to queue messages + +// Similar to the client ring buffer but this one has no limit because +// we never want to block writers. + +package directory + +import ( + "encoding/binary" + "encoding/json" + "errors" + "io" + "os" + "sync" + + "github.com/Velocidex/ordereddict" + config_proto "www.velocidex.com/golang/velociraptor/config/proto" + "www.velocidex.com/golang/velociraptor/constants" + logging "www.velocidex.com/golang/velociraptor/logging" +) + +// The below is similar to http_comms.FileBasedRingBuffer except: +// * Size of the file is not limited. +// * Leasing a full number of messages at once (rather than combined size). + +const ( + FileMagic = "VRB\x5e" + FirstRecordOffset = 50 +) + +type Header struct { + ReadPointer int64 // Leasing will start at this file offset. + WritePointer int64 // Enqueue will write at this file position. 
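+ // Note: the remaining bytes of the fixed-size header (FirstRecordOffset
+ // bytes in total) are currently unused and reserved for future extensions.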
+}
+
+func (self *Header) MarshalBinary() ([]byte, error) {
+ data := make([]byte, FirstRecordOffset)
+ copy(data, FileMagic)
+
+ binary.LittleEndian.PutUint64(data[4:12], uint64(self.ReadPointer))
+ binary.LittleEndian.PutUint64(data[12:20], uint64(self.WritePointer))
+
+ return data, nil
+}
+
+func (self *Header) UnmarshalBinary(data []byte) error {
+ if len(data) < FirstRecordOffset {
+ return errors.New("Invalid header length")
+ }
+
+ if string(data[:4]) != FileMagic {
+ return errors.New("Invalid Magic")
+ }
+
+ self.ReadPointer = int64(binary.LittleEndian.Uint64(data[4:12]))
+ self.WritePointer = int64(binary.LittleEndian.Uint64(data[12:20]))
+
+ return nil
+}
+
+type FileBasedRingBuffer struct {
+ config_obj *config_proto.Config
+
+ mu sync.Mutex
+
+ fd *os.File
+ header *Header
+
+ read_buf []byte
+ write_buf []byte
+
+ log_ctx *logging.LogContext
+}
+
+// Enqueue the item into the ring buffer and append to the end.
+func (self *FileBasedRingBuffer) Enqueue(item interface{}) error {
+ serialized, err := json.Marshal(item)
+ if err != nil {
+ return err
+ }
+
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
+ // Write the new message to the end of the file at the WritePointer
+ binary.LittleEndian.PutUint64(self.write_buf, uint64(len(serialized)))
+ _, err = self.fd.WriteAt(self.write_buf, int64(self.header.WritePointer))
+ if err != nil {
+ // File is corrupt now, reset it.
+ self.Reset()
+ return err
+ }
+
+ n, err := self.fd.WriteAt(serialized, int64(self.header.WritePointer+8))
+ if err != nil {
+ self.Reset()
+ return err
+ }
+
+ self.header.WritePointer += 8 + int64(n)
+
+ // Update the header
+ serialized, err = self.header.MarshalBinary()
+ if err != nil {
+ return err
+ }
+ _, err = self.fd.WriteAt(serialized, 0)
+ if err != nil {
+ self.Reset()
+ return err
+ }
+
+ return nil
+}
+
+// Returns up to count messages from the file.
+func (self *FileBasedRingBuffer) Lease(count int) []*ordereddict.Dict {
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
+ result := make([]*ordereddict.Dict, 0, count)
+
+ // The file contains more data.
+ for self.header.WritePointer > self.header.ReadPointer {
+
+ // Read the next chunk (length+value) from the current read pointer.
+ n, err := self.fd.ReadAt(self.read_buf, self.header.ReadPointer)
+ if err != nil || n != len(self.read_buf) {
+ self.log_ctx.Error("Possible corruption detected: file too short.")
+ self._Truncate()
+ return nil
+ }
+
+ length := int64(binary.LittleEndian.Uint64(self.read_buf))
+ // File might be corrupt - just reset the
+ // entire file.
+ if length > constants.MAX_MEMORY*2 || length <= 0 {
+ self.log_ctx.Error("Possible corruption detected - item length is too large.")
+ self._Truncate()
+ return nil
+ }
+
+ // Unmarshal one item at a time.
+ serialized := make([]byte, length)
+ n, _ = self.fd.ReadAt(serialized, self.header.ReadPointer+8)
+ if int64(n) != length {
+ self.log_ctx.Errorf(
+ "Possible corruption detected - expected item of length %v received %v.",
+ length, n)
+ self._Truncate()
+ return nil
+ }
+
+ item := ordereddict.NewDict()
+ err = item.UnmarshalJSON(serialized)
+ if err == nil {
+ result = append(result, item)
+ }
+
+ self.header.ReadPointer += 8 + int64(n)
+ // We read up to the write pointer, so we may truncate the file now.
+ if self.header.ReadPointer == self.header.WritePointer {
+ self._Truncate()
+ }
+
+ if len(result) >= count {
+ break
+ }
+ }
+
+ return result
+}
+
+// _Truncate returns the file to a virgin state. Assumes
+// FileBasedRingBuffer is already under lock.
+func (self *FileBasedRingBuffer) _Truncate() {
+ _ = self.fd.Truncate(0)
+ self.header.ReadPointer = FirstRecordOffset
+ self.header.WritePointer = FirstRecordOffset
+ serialized, _ := self.header.MarshalBinary()
+ _, _ = self.fd.WriteAt(serialized, 0)
+}
+
+func (self *FileBasedRingBuffer) Reset() {
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
+ self._Truncate()
+}
+
+// Closes the underlying file and shuts down the readers.
+func (self *FileBasedRingBuffer) Close() {
+ self.fd.Close()
+}
+
+func NewFileBasedRingBuffer(
+ config_obj *config_proto.Config, fd *os.File) (*FileBasedRingBuffer, error) {
+
+ log_ctx := logging.GetLogger(config_obj, &logging.FrontendComponent)
+
+ header := &Header{
+ // Pad the header a bit to allow for extensions.
+ WritePointer: FirstRecordOffset,
+ ReadPointer: FirstRecordOffset,
+ }
+ data := make([]byte, FirstRecordOffset)
+ n, err := fd.ReadAt(data, 0)
+ if n > 0 && n < FirstRecordOffset && err == io.EOF {
+ log_ctx.Error("Possible corruption detected: file too short.")
+ err = fd.Truncate(0)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ if n > 0 && (err == nil || err == io.EOF) {
+ err := header.UnmarshalBinary(data[:n])
+ // The header is not valid, truncate the file and
+ // start again.
+ if err != nil {
+ log_ctx.Errorf("Possible corruption detected: %v.", err)
+ err = fd.Truncate(0)
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+
+ result := &FileBasedRingBuffer{
+ config_obj: config_obj,
+ fd: fd,
+ header: header,
+ read_buf: make([]byte, 8),
+ write_buf: make([]byte, 8),
+ log_ctx: log_ctx,
+ }
+
+ return result, nil
+}
diff --git a/file_store/directory/queue.go b/file_store/directory/queue.go
index 00fee83806a..2b7628ec623 100644
--- a/file_store/directory/queue.go
+++ b/file_store/directory/queue.go
@@ -1,21 +1,279 @@
+// A Queue manager that uses files on disk.
+
+// The queue manager is a broker between writers and readers. Writers
+// want to emit a message to a queue with minimal delay, and have
+// the message dispatched to all readers with minimal latency.
+
+// A memory queue simply pushes the message to all readers via a
+// buffered channel. As long as the channel buffer remains available
+// this works well with very minimal latency in broadcasting to
+// readers. However, when the channel becomes full the writers may be
+// blocked while readers are working their way through the channel.
+
+// This queue manager uses a combination of a channel and a disk file
+// to buffer messages for readers. When a writer writes to the queue
+// manager, the manager attempts to write on the channel but if it is
+// not available, then the writer switches to a ring buffer file on disk.
+// A separate goroutine drains the disk file into the channel
+// periodically. Therefore, we never block the writer - either the
+// message is delivered immediately to the buffered channel, or it is
+// written to disk and later delivered.
+
+// This low latency property is critical because queue managers are
+// used to deliver messages in critical code paths and cannot be
+// delayed.
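+//
+// For example, a single event roughly follows this path:
+//
+//   PushEventRows() -> QueuePool.Broadcast() -> Listener.Send()
+//     -> consumer output channel (fast path), or
+//     -> FileBasedRingBuffer.Enqueue() ... FileBasedRingBuffer.Lease()
+//        -> consumer output channel (drained by a background goroutine)
+//
+// (Illustrative sketch only - see the Listener implementation below
+// for the exact select logic.)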
+ package directory import ( + "context" + "fmt" + "io/ioutil" + "os" + "sync" + "time" + "github.com/Velocidex/ordereddict" config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/file_store/api" - "www.velocidex.com/golang/velociraptor/file_store/memory" "www.velocidex.com/golang/velociraptor/file_store/result_sets" "www.velocidex.com/golang/velociraptor/utils" ) -var ( - pool = memory.NewQueuePool() -) +// A listener wraps a channel that our client will listen on. We send +// the message to each listener that is subscribed to the queue. +type Listener struct { + id int64 + mu sync.Mutex + + // The consumer interested in these events. The consumer may + // block arbitrarily. + output chan *ordereddict.Dict + + // We receive events on this channel - we guarantee this does + // not block for long. + input chan *ordereddict.Dict + + // A backup file to store extra messages. + file_buffer *FileBasedRingBuffer + + // Name of the file_buffer + tmpfile string + cancel func() +} + +// Should not block - very fast. +func (self *Listener) Send(item *ordereddict.Dict) { + self.input <- item +} + +func (self *Listener) Close() { + self.cancel() + self.file_buffer.Close() + + // Close the output channel so our listener will exit. + close(self.output) + + os.Remove(self.tmpfile) // clean up file buffer +} + +func (self *Listener) Debug() *ordereddict.Dict { + result := ordereddict.NewDict().Set("BackingFile", self.tmpfile) + st, _ := os.Stat(self.tmpfile) + result.Set("Size", int64(st.Size())) + + return result +} + +func NewListener(config_obj *config_proto.Config, ctx context.Context, + output chan *ordereddict.Dict) (*Listener, error) { + + tmpfile, err := ioutil.TempFile("", "journal") + if err != nil { + return nil, err + } + + file_buffer, err := NewFileBasedRingBuffer(config_obj, tmpfile) + if err != nil { + return nil, err + } + + subctx, cancel := context.WithCancel(ctx) + self := &Listener{ + id: time.Now().UnixNano(), + cancel: cancel, + input: make(chan *ordereddict.Dict), + output: output, + file_buffer: file_buffer, + tmpfile: tmpfile.Name(), + } + + // Pump messages from input channel and distribute to + // output. If output is busy we divert to the file buffer. + go func() { + defer cancel() + + for { + select { + case <-subctx.Done(): + return + + case item, ok := <-self.input: + if !ok { + return + } + select { + case <-subctx.Done(): + return + + // If we can immediately push + // to the output, do so + case self.output <- item: + + // Otherwise push to the file. + default: + self.file_buffer.Enqueue(item) + } + } + } + + }() + + // Pump messages from the file_buffer to our listeners. + go func() { + for { + // Wait here until the file has some data in it. + select { + case <-subctx.Done(): + return + + case <-time.After(time.Second): + // Get some messages from the file. + for _, item := range self.file_buffer.Lease(100) { + select { + case <-subctx.Done(): + return + case self.output <- item: + } + } + } + } + }() + + return self, nil +} + +// A Queue manages a set of registrations at a specific queue name +// (artifact name). 
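+// Each registration is backed by its own Listener, and therefore its
+// own overflow file, so a slow consumer spills to disk instead of
+// applying back pressure to writers or to the other listeners.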
+type QueuePool struct { + mu sync.Mutex + + config_obj *config_proto.Config + + registrations map[string][]*Listener +} + +func (self *QueuePool) Register( + ctx context.Context, vfs_path string) (<-chan *ordereddict.Dict, func()) { + self.mu.Lock() + defer self.mu.Unlock() + + output_chan := make(chan *ordereddict.Dict) + + registrations := self.registrations[vfs_path] + new_registration, err := NewListener(self.config_obj, ctx, output_chan) + if err != nil { + close(output_chan) + return output_chan, func() {} + } + + registrations = append(registrations, new_registration) + + self.registrations[vfs_path] = registrations + + return output_chan, func() { + self.unregister(vfs_path, new_registration.id) + } +} + +// This holds a lock on the entire pool and it is used when the system +// shuts down so not very often. +func (self *QueuePool) unregister(vfs_path string, id int64) { + self.mu.Lock() + defer self.mu.Unlock() + + registrations, pres := self.registrations[vfs_path] + if pres { + new_registrations := make([]*Listener, 0, len(registrations)) + for _, item := range registrations { + if id == item.id { + item.Close() + } else { + new_registrations = append(new_registrations, + item) + } + } + + self.registrations[vfs_path] = new_registrations + } +} + +// Make a copy of the registrations under lock and then we can take +// our time to send them later. +func (self *QueuePool) getRegistrations(vfs_path string) []*Listener { + self.mu.Lock() + defer self.mu.Unlock() + + registrations, ok := self.registrations[vfs_path] + if ok { + // Make a copy of the registrations for sending this + // message. + return append([]*Listener{}, registrations...) + } + + return nil +} + +func (self *QueuePool) Broadcast(vfs_path string, row *ordereddict.Dict) { + // Ensure we do not hold the lock for very long here. + for _, item := range self.getRegistrations(vfs_path) { + item.Send(row) + } +} + +func (self *QueuePool) Debug() *ordereddict.Dict { + self.mu.Lock() + defer self.mu.Unlock() + + result := ordereddict.NewDict() + for k, v := range self.registrations { + listeners := ordereddict.NewDict() + for idx, l := range v { + listeners.Set(fmt.Sprintf("%v", idx), l.Debug()) + } + result.Set(k, listeners) + } + return result +} + +func NewQueuePool(config_obj *config_proto.Config) *QueuePool { + return &QueuePool{ + config_obj: config_obj, + registrations: make(map[string][]*Listener), + } +} type DirectoryQueueManager struct { - FileStore api.FileStore - Clock utils.Clock + mu sync.Mutex + + queue_pool *QueuePool + FileStore api.FileStore + config_obj *config_proto.Config + Clock utils.Clock +} + +func (self *DirectoryQueueManager) Debug() *ordereddict.Dict { + return self.queue_pool.Debug() } func (self *DirectoryQueueManager) PushEventRows( @@ -32,20 +290,22 @@ func (self *DirectoryQueueManager) PushEventRows( // Set a timestamp per event for easier querying. 
row.Set("_ts", int(self.Clock.Now().Unix())) rs_writer.Write(row) - pool.Broadcast(path_manager.GetQueueName(), row) + self.queue_pool.Broadcast(path_manager.GetQueueName(), row) } return nil } -func (self *DirectoryQueueManager) Watch( +func (self *DirectoryQueueManager) Watch(ctx context.Context, queue_name string) (output <-chan *ordereddict.Dict, cancel func()) { - return pool.Register(queue_name) + return self.queue_pool.Register(ctx, queue_name) } func NewDirectoryQueueManager(config_obj *config_proto.Config, file_store api.FileStore) api.QueueManager { return &DirectoryQueueManager{ - FileStore: file_store, - Clock: utils.RealClock{}, + FileStore: file_store, + config_obj: config_obj, + queue_pool: NewQueuePool(config_obj), + Clock: utils.RealClock{}, } } diff --git a/file_store/directory/queue_test.go b/file_store/directory/queue_test.go index e73a8131ebc..b1f57431786 100644 --- a/file_store/directory/queue_test.go +++ b/file_store/directory/queue_test.go @@ -1,15 +1,34 @@ -package directory +package directory_test import ( + "context" "io/ioutil" "os" "testing" + "time" + "github.com/Velocidex/ordereddict" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "www.velocidex.com/golang/velociraptor/config" + config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/file_store/api" + "www.velocidex.com/golang/velociraptor/file_store/directory" "www.velocidex.com/golang/velociraptor/file_store/memory" + "www.velocidex.com/golang/velociraptor/file_store/test_utils" + "www.velocidex.com/golang/velociraptor/paths/artifacts" + "www.velocidex.com/golang/velociraptor/services" + "www.velocidex.com/golang/velociraptor/services/journal" + "www.velocidex.com/golang/velociraptor/services/repository" + "www.velocidex.com/golang/velociraptor/utils" +) + +var ( + monitoringArtifact = ` +name: TestQueue +type: SERVER_EVENT +` ) func TestDirectoryQueueManager(t *testing.T) { @@ -23,6 +42,121 @@ func TestDirectoryQueueManager(t *testing.T) { config_obj.Datastore.FilestoreDirectory = dir config_obj.Datastore.Location = dir - manager := NewDirectoryQueueManager(config_obj, memory.Test_memory_file_store) + manager := directory.NewDirectoryQueueManager(config_obj, memory.Test_memory_file_store) suite.Run(t, api.NewQueueManagerTestSuite(config_obj, manager, memory.Test_memory_file_store)) } + +type TestSuite struct { + suite.Suite + config_obj *config_proto.Config + client_id string + sm *services.Service + dir string +} + +func (self *TestSuite) SetupTest() { + dir, err := ioutil.TempDir("", "file_store_test") + assert.NoError(self.T(), err) + self.dir = dir + + os.Setenv("temp", dir) + + self.config_obj, err = new(config.Loader).WithFileLoader( + "../../http_comms/test_data/server.config.yaml"). + WithRequiredFrontend().WithWriteback(). + LoadAndValidate() + require.NoError(self.T(), err) + + // Start essential services. 
+ ctx, _ := context.WithTimeout(context.Background(), time.Second*60) + self.sm = services.NewServiceManager(ctx, self.config_obj) + + require.NoError(self.T(), self.sm.Start(journal.StartJournalService)) + require.NoError(self.T(), self.sm.Start(repository.StartRepositoryManager)) + + self.client_id = "C.12312" +} + +func (self *TestSuite) TearDownTest() { + self.sm.Close() + test_utils.GetMemoryFileStore(self.T(), self.config_obj).Clear() + test_utils.GetMemoryDataStore(self.T(), self.config_obj).Clear() + os.RemoveAll(self.dir) // clean up + +} + +func (self *TestSuite) TestQueueManager() { + repo_manager, err := services.GetRepositoryManager() + assert.NoError(self.T(), err) + + repository, err := repo_manager.GetGlobalRepository(self.config_obj) + assert.NoError(self.T(), err) + + _, err = repository.LoadYaml(monitoringArtifact, true) + assert.NoError(self.T(), err) + + file_store := test_utils.GetMemoryFileStore(self.T(), self.config_obj) + manager := directory.NewDirectoryQueueManager( + self.config_obj, file_store).(*directory.DirectoryQueueManager) + + // Push some rows to the queue manager + ctx := context.Background() + + reader, cancel := manager.Watch(ctx, "TestQueue") + + path_manager := artifacts.NewMonitoringArtifactLogPathManager(self.config_obj, + "C.123", "TestQueue") + + // Query the state of the manager for testing. + dbg := manager.Debug() + // The initial size is zero + assert.Equal(self.T(), int64(0), utils.GetInt64(dbg, "TestQueue.0.Size")) + + // Push some rows without reading - this should write to the + // file buffer and not block. + for i := 0; i < 10; i++ { + err = manager.PushEventRows(path_manager, []*ordereddict.Dict{ + ordereddict.NewDict(). + Set("Foo", "Bar"), + }) + assert.NoError(self.T(), err) + } + + // The file should contain all the rows now. + dbg = manager.Debug() + + // File size is not accurate due to timestamps + assert.Greater(self.T(), utils.GetInt64(dbg, "TestQueue.0.Size"), int64(300)) + + // Now read all the rows from the file. + count := 0 + for row := range reader { + count++ + assert.Equal(self.T(), "Bar", utils.GetString(row, "Foo")) + + // Break on the 10th row + if count >= 10 { + break + } + } + + // Now check the file - it should be truncated since we read all messages. + dbg = manager.Debug() + assert.Equal(self.T(), int64(50), utils.GetInt64(dbg, "TestQueue.0.Size")) + + // Now cancel the watcher - further reads from the channel + // should not block - the channel is closed. + cancel() + + for range reader { + } + + // Now make sure the tempfile is removed. 
+ tempfile := utils.GetString(dbg, "TestQueue.0.BackingFile") + _, err = os.Stat(tempfile) + assert.Error(self.T(), err) +} + +func TestFileBasedQueueManager(t *testing.T) { + suite.Run(t, &TestSuite{}) +} diff --git a/file_store/memory/queue.go b/file_store/memory/queue.go index 801337acca8..a6f1fa8a08d 100644 --- a/file_store/memory/queue.go +++ b/file_store/memory/queue.go @@ -8,28 +8,46 @@ package memory import ( + "context" "sync" "time" "github.com/Velocidex/ordereddict" config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/file_store/api" + "www.velocidex.com/golang/velociraptor/logging" "www.velocidex.com/golang/velociraptor/utils" ) var ( - pool = NewQueuePool() + mu sync.Mutex + pool *QueuePool ) +func GlobalQueuePool(config_obj *config_proto.Config) *QueuePool { + mu.Lock() + defer mu.Unlock() + + if pool != nil { + return pool + } + + pool = NewQueuePool(config_obj) + return pool +} + // A queue pool is an in-process listener for events. type Listener struct { id int64 Channel chan *ordereddict.Dict + name string } type QueuePool struct { mu sync.Mutex + config_obj *config_proto.Config + registrations map[string][]*Listener } @@ -41,6 +59,7 @@ func (self *QueuePool) Register(vfs_path string) (<-chan *ordereddict.Dict, func new_registration := &Listener{ Channel: make(chan *ordereddict.Dict, 1000), id: time.Now().UnixNano(), + name: vfs_path, } registrations = append(registrations, new_registration) @@ -51,6 +70,8 @@ func (self *QueuePool) Register(vfs_path string) (<-chan *ordereddict.Dict, func } } +// This holds a lock on the entire pool and it is used when the system +// shuts down so not very often. func (self *QueuePool) unregister(vfs_path string, id int64) { self.mu.Lock() defer self.mu.Unlock() @@ -71,28 +92,47 @@ func (self *QueuePool) unregister(vfs_path string, id int64) { } } -func (self *QueuePool) Broadcast(vfs_path string, row *ordereddict.Dict) { +// Make a copy of the registrations under lock and then we can take +// our time to send them later. +func (self *QueuePool) getRegistrations(vfs_path string) []*Listener { self.mu.Lock() defer self.mu.Unlock() registrations, ok := self.registrations[vfs_path] if ok { - for _, item := range registrations { - item.Channel <- row + // Make a copy of the registrations for sending this + // message. + return append([]*Listener{}, registrations...) + } + + return nil +} + +func (self *QueuePool) Broadcast(vfs_path string, row *ordereddict.Dict) { + // Ensure we do not hold the lock for very long here. 
+ for _, item := range self.getRegistrations(vfs_path) { + select { + case item.Channel <- row: + case <-time.After(2 * time.Second): + logger := logging.GetLogger( + self.config_obj, &logging.FrontendComponent) + logger.Error("QueuePool: Dropping message to queue %v", + item.name) } } } -func NewQueuePool() *QueuePool { +func NewQueuePool(config_obj *config_proto.Config) *QueuePool { return &QueuePool{ + config_obj: config_obj, registrations: make(map[string][]*Listener), } } type MemoryQueueManager struct { - FileStore api.FileStore - - Clock utils.Clock + FileStore api.FileStore + config_obj *config_proto.Config + Clock utils.Clock } func (self *MemoryQueueManager) Debug() { @@ -106,7 +146,7 @@ func (self *MemoryQueueManager) PushEventRows( path_manager api.PathManager, dict_rows []*ordereddict.Dict) error { for _, row := range dict_rows { - pool.Broadcast(path_manager.GetQueueName(), + GlobalQueuePool(self.config_obj).Broadcast(path_manager.GetQueueName(), row.Set("_ts", int(self.Clock.Now().Unix()))) } @@ -131,14 +171,15 @@ func (self *MemoryQueueManager) PushEventRows( } func (self *MemoryQueueManager) Watch( - queue_name string) (output <-chan *ordereddict.Dict, cancel func()) { - return pool.Register(queue_name) + ctx context.Context, queue_name string) (output <-chan *ordereddict.Dict, cancel func()) { + return GlobalQueuePool(self.config_obj).Register(queue_name) } func NewMemoryQueueManager(config_obj *config_proto.Config, file_store api.FileStore) api.QueueManager { return &MemoryQueueManager{ - FileStore: file_store, - Clock: utils.RealClock{}, + FileStore: file_store, + config_obj: config_obj, + Clock: utils.RealClock{}, } } diff --git a/file_store/mysql/queue.go b/file_store/mysql/queue.go index 943dc75fdb8..4b935387759 100644 --- a/file_store/mysql/queue.go +++ b/file_store/mysql/queue.go @@ -30,6 +30,7 @@ package mysql import ( + "context" "database/sql" "fmt" "sync" @@ -242,7 +243,8 @@ func (self *MysqlQueueManager) PushEventRows( return err } -func (self *MysqlQueueManager) Watch(queue_name string) (<-chan *ordereddict.Dict, func()) { +func (self *MysqlQueueManager) Watch( + ctx context.Context, queue_name string) (<-chan *ordereddict.Dict, func()) { return pool.Register(queue_name) } diff --git a/file_store/result_sets/events_test.go b/file_store/result_sets/events_test.go index d266990a6a9..8167d0d9221 100644 --- a/file_store/result_sets/events_test.go +++ b/file_store/result_sets/events_test.go @@ -177,10 +177,9 @@ func (self *TimedResultSetTestSuite) TestTimedResultSets() { clock := &utils.MockClock{MockNow: now} // Start off by writing some events on a queue. - qm := &directory.DirectoryQueueManager{ - FileStore: self.file_store, - Clock: clock, - } + qm := directory.NewDirectoryQueueManager( + self.config_obj, self.file_store).(*directory.DirectoryQueueManager) + qm.Clock = clock path_manager := artifacts.NewArtifactPathManager( self.config_obj, diff --git a/flows/artifacts.go b/flows/artifacts.go index 5292afb2c0d..c4305df65ea 100644 --- a/flows/artifacts.go +++ b/flows/artifacts.go @@ -1,5 +1,5 @@ /* - Velociraptor - Hunting Evil +o Velociraptor - Hunting Evil Copyright (C) 2019 Velocidex Innovations. This program is free software: you can redistribute it and/or modify @@ -55,6 +55,8 @@ var ( Name: "uploaded_bytes", Help: "Total bytes of Uploaded Files.", }) + + notModified = errors.New("Not modified") ) // closeContext is called after all messages from the clients are @@ -420,15 +422,21 @@ func IsRequestComplete( // Update any hunts if needed. 
if constants.HuntIdRegex.MatchString(collection_context.Request.Creator) { - err := services.GetHuntDispatcher().ModifyHunt( + dispatcher := services.GetHuntDispatcher() + if dispatcher == nil { + return false, errors.New("Hunt dispatcher not valid") + } + + err := dispatcher.ModifyHunt( collection_context.Request.Creator, func(hunt *api_proto.Hunt) error { if hunt != nil && hunt.Stats != nil { hunt.Stats.TotalClientsWithResults++ + return nil } - return nil + return notModified }) - if err != nil { + if err != nil && err != notModified { return true, err } } diff --git a/flows/foreman.go b/flows/foreman.go index f6a6f090248..7aae9a85861 100644 --- a/flows/foreman.go +++ b/flows/foreman.go @@ -134,6 +134,10 @@ func ForemanProcessMessage( return err } + notifier := services.GetNotifier() + if notifier == nil { + return errors.New("Notifier not configured") + } return services.GetNotifier().NotifyListener(config_obj, client_id) }) } diff --git a/flows/hunts.go b/flows/hunts.go index 9e6f0d0a6e3..006a1c2534e 100644 --- a/flows/hunts.go +++ b/flows/hunts.go @@ -51,6 +51,8 @@ func GetNewHuntId() string { return constants.HUNT_PREFIX + string(result) } +// Backwards compatibility: Figure out the list of collected hunts +// from the hunt object's request func FindCollectedArtifacts( config_obj *config_proto.Config, hunt *api_proto.Hunt) { @@ -59,6 +61,11 @@ func FindCollectedArtifacts( return } + // Hunt already has artifacts list. + if len(hunt.Artifacts) > 0 { + return + } + hunt.Artifacts = hunt.StartRequest.Artifacts hunt.ArtifactSources = []string{} for _, artifact := range hunt.StartRequest.Artifacts { @@ -124,6 +131,17 @@ func CreateHunt( return "", errors.New("Hunt expiry is in the past!") } + // Set the artifacts information in the hunt object itself. + hunt.Artifacts = hunt.StartRequest.Artifacts + hunt.ArtifactSources = []string{} + for _, artifact := range hunt.StartRequest.Artifacts { + for _, source := range GetArtifactSources( + config_obj, artifact) { + hunt.ArtifactSources = append( + hunt.ArtifactSources, path.Join(artifact, source)) + } + } + manager, err := services.GetRepositoryManager() if err != nil { return "", err @@ -194,7 +212,12 @@ func ListHunts(config_obj *config_proto.Config, in *api_proto.ListHuntsRequest) result := &api_proto.ListHuntsResponse{} - err := services.GetHuntDispatcher().ApplyFuncOnHunts( + dispatcher := services.GetHuntDispatcher() + if dispatcher == nil { + return nil, errors.New("Hunt dispatcher not initialized") + } + + err := dispatcher.ApplyFuncOnHunts( func(hunt *api_proto.Hunt) error { if uint64(len(result.Items)) < in.Offset { return nil @@ -205,10 +228,6 @@ func ListHunts(config_obj *config_proto.Config, in *api_proto.ListHuntsRequest) } if in.IncludeArchived || hunt.State != api_proto.Hunt_ARCHIVED { - - // FIXME: Backwards compatibility. - hunt.HuntId = path.Base(hunt.HuntId) - result.Items = append(result.Items, hunt) } return nil @@ -230,23 +249,24 @@ func GetHunt(config_obj *config_proto.Config, in *api_proto.GetHuntRequest) ( var result *api_proto.Hunt - err = services.GetHuntDispatcher().ModifyHunt( - in.HuntId, + dispatcher := services.GetHuntDispatcher() + if dispatcher == nil { + return nil, errors.New("Hunt dispatcher not valid") + } + + err = dispatcher.ModifyHunt(in.HuntId, func(hunt_obj *api_proto.Hunt) error { - // Make a copy + // Make a copy of the hunt result = proto.Clone(hunt_obj).(*api_proto.Hunt) - // HACK: Velociraptor only knows how to - // collect artifacts now. 
Eventually the whole - // concept of a flow will go away but for now - // we need to figure out which artifacts we - // are actually collecting - there are not - // many possibilities since we have reduced - // the number of possible flows significantly. - FindCollectedArtifacts(config_obj, result) - - return nil + // We do not modify the hunt so it is not dirty. + return notModified }) + if err != notModified { + return nil, err + } + + FindCollectedArtifacts(config_obj, result) if result == nil || result.Stats == nil { return result, errors.New("Not found") @@ -254,7 +274,7 @@ func GetHunt(config_obj *config_proto.Config, in *api_proto.GetHuntRequest) ( result.Stats.AvailableDownloads, _ = availableHuntDownloadFiles(config_obj, in.HuntId) - return result, err + return result, nil } // availableHuntDownloadFiles returns the prepared zip downloads available to @@ -275,12 +295,6 @@ func availableHuntDownloadFiles(config_obj *config_proto.Config, // will update the StartTime. // 2. A hunt in the running state can go to the Stop state // 3. A hunt's description can be modified. - -// It is not possible to restart a stopped hunt. This is because the -// hunt manager watches the hunt participation events for all hunts at -// the same time, and just ignores clients that want to participate in -// stopped hunts. It is not possible to go back and re-examine the -// queue. func ModifyHunt( ctx context.Context, config_obj *config_proto.Config, @@ -335,19 +349,9 @@ func ModifyHunt( hunt.State = api_proto.Hunt_STOPPED } - // Write the new hunt object to the datastore. - db, err := datastore.GetDB(config_obj) - if err != nil { - return err - } - - hunt_path_manager := paths.NewHuntPathManager(hunt.HuntId) - err = db.SetSubject( - config_obj, hunt_path_manager.Path(), hunt) - if err != nil { - return err - } - + // Returning nil indicates to the hunt manager + // that the hunt was successfully modified. It + // will then flush it to disk. return nil }) diff --git a/gui/velociraptor/src/components/core/api-service.js b/gui/velociraptor/src/components/core/api-service.js index 606686f16b8..d502e2dd32a 100644 --- a/gui/velociraptor/src/components/core/api-service.js +++ b/gui/velociraptor/src/components/core/api-service.js @@ -38,6 +38,13 @@ const get = function(url, params, cancel_token) { url: api_handlers + url, params: params, cancelToken: cancel_token, + }).then(response=>{ + // Update the csrf token. + let token = response.headers["x-csrf-token"]; + if (token && token.length > 0) { + window.CsrfToken = token; + } + return response; }).catch(handle_error); }; diff --git a/gui/velociraptor/src/components/forms/validated_int.js b/gui/velociraptor/src/components/forms/validated_int.js index 3014dae2a78..69664914fd6 100644 --- a/gui/velociraptor/src/components/forms/validated_int.js +++ b/gui/velociraptor/src/components/forms/validated_int.js @@ -21,9 +21,13 @@ export default class ValidatedInteger extends React.Component { render() { let value = this.props.value; + + // Need to set the initial value to '' to tell React this is a + // controlled component. if (_.isUndefined(value)) { - value = 0; + value = ''; } + return ( <> self.GetLastTimestamp() { + atomic.StoreUint64(&self.last_timestamp, hunt_obj.StartTime) + } - // The hunts start time could have been modified - we need to - // update ours then. 
- if hunt_obj.StartTime > self.GetLastTimestamp() { - atomic.StoreUint64(&self.last_timestamp, hunt_obj.StartTime) + self.dirty = true } - - self.dirty = true - return err } diff --git a/services/hunt_manager/hunt_manager.go b/services/hunt_manager/hunt_manager.go index a4d16ec9f03..98b4040c01f 100644 --- a/services/hunt_manager/hunt_manager.go +++ b/services/hunt_manager/hunt_manager.go @@ -113,7 +113,7 @@ func (self *HuntManager) StartParticipation( if err != nil { return err } - qm_chan, cancel := journal.Watch("System.Hunt.Participation") + qm_chan, cancel := journal.Watch(ctx, "System.Hunt.Participation") wg.Add(1) go func() { @@ -149,7 +149,7 @@ func (self *HuntManager) StartFlowCompletion( return err } - qm_chan, cancel := journal.Watch("System.Flow.Completion") + qm_chan, cancel := journal.Watch(ctx, "System.Flow.Completion") wg.Add(1) go func() { @@ -173,6 +173,8 @@ func (self *HuntManager) StartFlowCompletion( return nil } +// Watch for all flows created by a hunt and maintain the list of hunt +// completions. func (self *HuntManager) ProcessFlowCompletion( ctx context.Context, config_obj *config_proto.Config, @@ -271,14 +273,17 @@ func (self *HuntManager) ProcessRow( return } - request := &flows_proto.ArtifactCollectorArgs{ - ClientId: participation_row.ClientId, - Creator: participation_row.HuntId, - } + // Prepare a collection request + request := &flows_proto.ArtifactCollectorArgs{} // Get hunt information about this hunt. now := uint64(time.Now().UnixNano() / 1000) - err = services.GetHuntDispatcher().ModifyHunt( + dispatcher := services.GetHuntDispatcher() + if dispatcher == nil { + return + } + + err = dispatcher.ModifyHunt( participation_row.HuntId, func(hunt_obj *api_proto.Hunt) error { if hunt_obj.Stats == nil { @@ -352,6 +357,11 @@ func (self *HuntManager) ProcessRow( // Direct the request against our client and schedule it. request.ClientId = participation_row.ClientId + + // Make sure the flow is created by the hunt - this is how we + // track it. + request.Creator = participation_row.HuntId + flow_id, err := launcher.ScheduleArtifactCollection( ctx, config_obj, vql_subsystem.NullACLManager{}, repository, request) if err != nil { diff --git a/services/interrogation/interrogation.go b/services/interrogation/interrogation.go index 2d1732a9bbc..c5a9c46c811 100644 --- a/services/interrogation/interrogation.go +++ b/services/interrogation/interrogation.go @@ -49,7 +49,7 @@ func (self *EnrollmentService) Start( return err } - events, cancel := journal.Watch("Server.Internal.Enrollment") + events, cancel := journal.Watch(ctx, "Server.Internal.Enrollment") wg.Add(1) go func() { diff --git a/services/interrogation/utils.go b/services/interrogation/utils.go index ed0d6f30d0c..c311b188bed 100644 --- a/services/interrogation/utils.go +++ b/services/interrogation/utils.go @@ -30,7 +30,7 @@ func watchForFlowCompletion( return err } - events, cancel := journal.Watch("System.Flow.Completion") + events, cancel := journal.Watch(ctx, "System.Flow.Completion") manager, err := services.GetRepositoryManager() if err != nil { return err diff --git a/services/journal.go b/services/journal.go index 74109751c09..7b79dcd38db 100644 --- a/services/journal.go +++ b/services/journal.go @@ -15,6 +15,7 @@ package services // in real time from client event artifacts. import ( + "context" "errors" "sync" @@ -52,7 +53,8 @@ type JournalService interface { // Watch the artifact named by queue_name for new rows. 
This // only makes sense for artifacts of type CLIENT_EVENT and // SERVER_EVENT - Watch(queue_name string) (output <-chan *ordereddict.Dict, cancel func()) + Watch(ctx context.Context, + queue_name string) (output <-chan *ordereddict.Dict, cancel func()) // Push the rows into the datastore in the location give by // the path manager. diff --git a/services/journal/journal.go b/services/journal/journal.go index f50877d6c96..c3d9bf821db 100644 --- a/services/journal/journal.go +++ b/services/journal/journal.go @@ -27,7 +27,8 @@ type JournalService struct { qm api.QueueManager } -func (self *JournalService) Watch(queue_name string) ( +func (self *JournalService) Watch( + ctx context.Context, queue_name string) ( output <-chan *ordereddict.Dict, cancel func()) { if self == nil || self.qm == nil { @@ -35,7 +36,7 @@ func (self *JournalService) Watch(queue_name string) ( return nil, func() {} } - return self.qm.Watch(queue_name) + return self.qm.Watch(ctx, queue_name) } func (self *JournalService) PushRowsToArtifact( diff --git a/services/labels/labels.go b/services/labels/labels.go index 6adbdc0b362..a37f4f2975e 100644 --- a/services/labels/labels.go +++ b/services/labels/labels.go @@ -370,7 +370,7 @@ func (self *Labeler) Start(ctx context.Context, return err } - events, cancel := journal.Watch("Server.Internal.Label") + events, cancel := journal.Watch(ctx, "Server.Internal.Label") wg.Add(1) go func() { diff --git a/services/notifications/notifications.go b/services/notifications/notifications.go index 896fa01926d..5617dc0d1f5 100644 --- a/services/notifications/notifications.go +++ b/services/notifications/notifications.go @@ -60,7 +60,7 @@ func StartNotificationService( if err != nil { return err } - events, cancel := journal.Watch("Server.Internal.Notifications") + events, cancel := journal.Watch(ctx, "Server.Internal.Notifications") wg.Add(1) go func() { @@ -74,6 +74,7 @@ func StartNotificationService( self.notification_pool.Shutdown() self.notification_pool = nil }() + defer logger.Info("Exiting notification service!") for { select { @@ -90,7 +91,6 @@ func StartNotificationService( continue } - self.pool_mu.Lock() if target == "Regex" { regex_str, ok := event.GetString("Regex") if ok { @@ -109,7 +109,6 @@ func StartNotificationService( } else { self.notification_pool.Notify(target) } - self.pool_mu.Unlock() } } }() diff --git a/services/server_monitoring/server_monitoring.go b/services/server_monitoring/server_monitoring.go index cbcb320414e..1b08ec818ec 100644 --- a/services/server_monitoring/server_monitoring.go +++ b/services/server_monitoring/server_monitoring.go @@ -47,6 +47,9 @@ func (self *EventTable) Close() { self.mu.Lock() defer self.mu.Unlock() + logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent) + logger.Info("Closing Server Monitoring Event table") + // Close the old table. if self.cancel != nil { self.cancel() @@ -117,6 +120,7 @@ func (self *EventTable) Update( // Run each collection separately in parallel. for _, vql_request := range vql_requests { + err = self.RunQuery(ctx, config_obj, self.wg, vql_request) if err != nil { return err @@ -197,7 +201,6 @@ func (self *EventTable) RunQuery( for _, query := range vql_request.Query { vql, err := vfilter.Parse(query.VQL) if err != nil { - scope.Log("server_monitoring: %v", err) return } @@ -217,7 +220,6 @@ func (self *EventTable) RunQuery( rs_writer.Write(vfilter.RowToDict(ctx, scope, row). 
Set("_ts", self.Clock.Now().Unix())) rs_writer.Flush() - } } } diff --git a/services/services.go b/services/services.go index 8b7ad12e9de..592b53a3666 100644 --- a/services/services.go +++ b/services/services.go @@ -19,6 +19,7 @@ package services import ( "context" + "fmt" "sync" config_proto "www.velocidex.com/golang/velociraptor/config/proto" @@ -52,6 +53,7 @@ type Service struct { } func (self *Service) Close() { + fmt.Printf("Closing services\n") self.cancel() // Wait for services to exit. diff --git a/services/test_utils.go b/services/test_utils.go index d4f5a5b3eba..5bb9b140dc7 100644 --- a/services/test_utils.go +++ b/services/test_utils.go @@ -1,6 +1,7 @@ package services import ( + "context" "sync" "github.com/Velocidex/ordereddict" @@ -28,7 +29,8 @@ func GetPublishedEvents( if err != nil { return } - events, cancel := journal.Watch(artifact) + ctx := context.Background() + events, cancel := journal.Watch(ctx, artifact) defer cancel() // Wait here until we are set up. diff --git a/services/vfs_service/utils.go b/services/vfs_service/utils.go index d160a16545e..1e2d31ed870 100644 --- a/services/vfs_service/utils.go +++ b/services/vfs_service/utils.go @@ -31,12 +31,15 @@ func watchForFlowCompletion( return err } - events, cancel := journal.Watch("System.Flow.Completion") + events, cancel := journal.Watch(ctx, "System.Flow.Completion") + + logger := logging.GetLogger(config_obj, &logging.FrontendComponent) wg.Add(1) go func() { defer wg.Done() defer cancel() + defer logger.Info("Stopping watch for %v", artifact_name) builder := services.ScopeBuilder{ Config: config_obj, diff --git a/services/vfs_service/vfs_service.go b/services/vfs_service/vfs_service.go index fc2de10a64c..8c3c820c727 100644 --- a/services/vfs_service/vfs_service.go +++ b/services/vfs_service/vfs_service.go @@ -64,6 +64,8 @@ func (self *VFSService) ProcessDownloadFile( ts, _ := row.GetInt64("_ts") logger := logging.GetLogger(config_obj, &logging.FrontendComponent) + logger.Info("VFSService: Processing System.VFS.DownloadFile from %v", client_id) + flow_path_manager := paths.NewFlowPathManager(client_id, flow_id) path_manager := artifacts.NewArtifactPathManager(config_obj, @@ -127,6 +129,8 @@ func (self *VFSService) ProcessListDirectory( ts, _ := row.GetInt64("_ts") logger := logging.GetLogger(config_obj, &logging.FrontendComponent) + logger.Info("VFSService: Processing System.VFS.ListDirectory from %v", client_id) + path_manager := artifacts.NewArtifactPathManager(config_obj, client_id, flow_id, "System.VFS.ListDirectory") @@ -141,6 +145,7 @@ func (self *VFSService) ProcessListDirectory( var current_vfs_components []string = nil for row := range row_chan { + utils.Debug(row) full_path, _ := row.GetString("_FullPath") accessor, _ := row.GetString("_Accessor") name, _ := row.GetString("Name") diff --git a/utils/dict.go b/utils/dict.go index a8c6b9df870..1a5cfe90430 100644 --- a/utils/dict.go +++ b/utils/dict.go @@ -8,11 +8,11 @@ import ( // Returns the containing dict for a nested dict. This allows fetching // a key using dot notation. -func _get(dict *ordereddict.Dict, key string) *ordereddict.Dict { +func _get(dict *ordereddict.Dict, key string) (*ordereddict.Dict, string) { components := strings.Split(key, ".") // Only a single component, return the dict. 
if len(components) == 1 { - return dict + return dict, components[0] } // Iterate over all but the last component fetching the nested @@ -21,26 +21,26 @@ func _get(dict *ordereddict.Dict, key string) *ordereddict.Dict { for _, member := range components[:len(components)-1] { result, pres := dict.Get(member) if !pres { - return ordereddict.NewDict() + return ordereddict.NewDict(), "" } nested, ok := result.(*ordereddict.Dict) if !ok { - return ordereddict.NewDict() + return ordereddict.NewDict(), "" } dict = nested } - return dict + return dict, components[len(components)-1] } func GetString(dict *ordereddict.Dict, key string) string { - dict = _get(dict, key) - res, _ := dict.GetString(key) + subdict, last := _get(dict, key) + res, _ := subdict.GetString(last) return res } func GetInt64(dict *ordereddict.Dict, key string) int64 { - dict = _get(dict, key) - res, _ := dict.GetInt64(key) + subdict, last := _get(dict, key) + res, _ := subdict.GetInt64(last) return res } diff --git a/vql/common/for.go b/vql/common/for.go index a2dc872993f..234913201ac 100644 --- a/vql/common/for.go +++ b/vql/common/for.go @@ -64,6 +64,56 @@ func (self ForPlugin) Info(scope vfilter.Scope, type_map *vfilter.TypeMap) *vfil } } +type RangePluginArgs struct { + Start int64 `vfilter:"required,field=start,doc=Start index (0 based)"` + End int64 `vfilter:"required,field=end,doc=End index (0 based)"` + Step int64 `vfilter:"required,field=step,doc=End index (0 based)"` +} + +type RangePlugin struct{} + +func (self RangePlugin) Call( + ctx context.Context, + scope vfilter.Scope, + args *ordereddict.Dict) <-chan vfilter.Row { + output_chan := make(chan vfilter.Row) + + go func() { + defer close(output_chan) + + arg := &RangePluginArgs{} + err := vfilter.ExtractArgs(scope, args, arg) + if err != nil { + scope.Log("range: %v", err) + return + } + + if arg.Step == 0 { + arg.Step = 1 + } + + for i := arg.Start; i < arg.End; i += arg.Step { + select { + case <-ctx.Done(): + return + + case output_chan <- ordereddict.NewDict().Set("_value", i): + } + } + }() + + return output_chan +} + +func (self RangePlugin) Info(scope vfilter.Scope, type_map *vfilter.TypeMap) *vfilter.PluginInfo { + return &vfilter.PluginInfo{ + Name: "range", + Doc: "Iterate over range.", + ArgType: type_map.AddType(scope, &RangePluginArgs{}), + } +} + func init() { + vql_subsystem.RegisterPlugin(&RangePlugin{}) vql_subsystem.RegisterPlugin(&ForPlugin{}) } diff --git a/vql/functions/lists.go b/vql/functions/lists.go index f87e9a6b334..777c67f3f0d 100644 --- a/vql/functions/lists.go +++ b/vql/functions/lists.go @@ -218,7 +218,78 @@ func (self LenFunction) Info(scope vfilter.Scope, type_map *vfilter.TypeMap) *vf } } +type SliceFunctionArgs struct { + List vfilter.Any `vfilter:"required,field=list,doc=A list of items to slice"` + Start uint64 `vfilter:"required,field=start,doc=Start index (0 based)"` + End uint64 `vfilter:"required,field=end,doc=End index (0 based)"` +} +type SliceFunction struct{} + +func (self *SliceFunction) Call(ctx context.Context, + scope vfilter.Scope, + args *ordereddict.Dict) vfilter.Any { + arg := &SliceFunctionArgs{} + err := vfilter.ExtractArgs(scope, args, arg) + if err != nil { + scope.Log("len: %s", err.Error()) + return &vfilter.Null{} + } + + slice := reflect.ValueOf(arg.List) + // A slice of strings. 
Only the following are supported + // https://golang.org/pkg/reflect/#Value.Len + if slice.Type().Kind() == reflect.Slice || + slice.Type().Kind() == reflect.Map || + slice.Type().Kind() == reflect.Array || + slice.Type().Kind() == reflect.String { + + if arg.End > uint64(slice.Len()) { + arg.End = uint64(slice.Len()) + } + + if arg.Start > arg.End { + arg.Start = arg.End + } + + result := make([]interface{}, 0, arg.End-arg.Start) + for i := arg.Start; i < arg.End; i++ { + result = append(result, slice.Index(int(i)).Interface()) + } + + return result + } + + dict, ok := arg.List.(*ordereddict.Dict) + if ok { + keys := dict.Keys() + if arg.End > uint64(len(keys)) { + arg.End = uint64(len(keys)) + } + + if arg.Start > arg.End { + arg.Start = arg.End + } + + result := make([]interface{}, 0, arg.End-arg.Start) + for i := arg.Start; i < arg.End; i++ { + result = append(result, keys[int(i)]) + } + return result + } + + return []vfilter.Any{} +} + +func (self SliceFunction) Info(scope vfilter.Scope, type_map *vfilter.TypeMap) *vfilter.FunctionInfo { + return &vfilter.FunctionInfo{ + Name: "slice", + Doc: "Slice an array.", + ArgType: type_map.AddType(scope, &SliceFunctionArgs{}), + } +} + func init() { + vql_subsystem.RegisterFunction(&SliceFunction{}) vql_subsystem.RegisterFunction(&FilterFunction{}) vql_subsystem.RegisterFunction(&ArrayFunction{}) vql_subsystem.RegisterFunction(&JoinFunction{}) diff --git a/vql/server/hunts/create.go b/vql/server/hunts/create.go index 0a17fb2fdb7..d98f57b2425 100644 --- a/vql/server/hunts/create.go +++ b/vql/server/hunts/create.go @@ -43,6 +43,7 @@ type ScheduleHuntFunctionArg struct { OpsPerSecond float64 `vfilter:"optional,field=ops_per_sec,doc=Set query ops_per_sec value"` MaxRows uint64 `vfilter:"optional,field=max_rows,doc=Max number of rows to fetch"` MaxBytes uint64 `vfilter:"optional,field=max_bytes,doc=Max number of bytes to upload"` + Pause bool `vfilter:"optional,field=pause,doc=If specified the new hunt will be in the paused state"` } type ScheduleHuntFunction struct{} @@ -53,7 +54,7 @@ func (self *ScheduleHuntFunction) Call(ctx context.Context, err := vql_subsystem.CheckAccess(scope, acls.COLLECT_CLIENT) if err != nil { - scope.Log("flows: %s", err) + scope.Log("hunt: %s", err) return vfilter.Null{} } @@ -97,12 +98,17 @@ func (self *ScheduleHuntFunction) Call(ctx context.Context, return vfilter.Null{} } + state := api_proto.Hunt_RUNNING + if arg.Pause { + state = api_proto.Hunt_PAUSED + } + hunt_request := &api_proto.Hunt{ HuntDescription: arg.Description, Creator: vql_subsystem.GetPrincipal(scope), StartRequest: request, Expires: arg.Expires, - State: api_proto.Hunt_RUNNING, + State: state, } // Run the hunt in the ACL context of the caller. diff --git a/vql/server/monitoring.go b/vql/server/monitoring.go index 69081b4100d..ead7c532f45 100644 --- a/vql/server/monitoring.go +++ b/vql/server/monitoring.go @@ -155,7 +155,7 @@ func (self WatchMonitoringPlugin) Call( } // Ask the journal service to watch the event queue for us. - qm_chan, cancel := journal.Watch(arg.Artifact) + qm_chan, cancel := journal.Watch(ctx, arg.Artifact) defer cancel() for row := range qm_chan {