diff --git a/api/csrf.go b/api/csrf.go index 1ae8c16476f..a1e59851a92 100644 --- a/api/csrf.go +++ b/api/csrf.go @@ -27,7 +27,7 @@ func csrfProtect(config_obj *config_proto.Config, _, _ = hasher.Write([]byte(config_obj.Frontend.PrivateKey)) token := hasher.Sum(nil) - protectionFn := csrf.Protect(token, csrf.Path("/")) + protectionFn := csrf.Protect(token, csrf.Path("/"), csrf.MaxAge(7*24*60*60)) return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { protectionFn(parent).ServeHTTP(w, r) diff --git a/artifacts/definitions/Windows/Forensics/Lnk.yaml b/artifacts/definitions/Windows/Forensics/Lnk.yaml index c269e321ce8..c42e50ab415 100644 --- a/artifacts/definitions/Windows/Forensics/Lnk.yaml +++ b/artifacts/definitions/Windows/Forensics/Lnk.yaml @@ -395,9 +395,9 @@ sources: column_types: - name: Mtime type: timestamp - - name: ATime + - name: Atime type: timestamp - - name: CTime + - name: Ctime type: timestamp - name: HeaderCreationTime type: timestamp diff --git a/executor/pool.go b/executor/pool.go index b860f73ab4e..9c46f45fbb2 100644 --- a/executor/pool.go +++ b/executor/pool.go @@ -71,15 +71,18 @@ func (self *PoolClientExecutor) ReadResponse() <-chan *crypto_proto.GrrMessage { // Inspect the request and decide if we will cache it under a query. func getQueryName(message *crypto_proto.GrrMessage) string { + query_name := "" if message.VQLClientAction != nil { for _, query := range message.VQLClientAction.Query { if query.Name != "" { - return query.Name + query_name = query.Name } } + // Cache it under the query name and the + serialized, _ := json.Marshal(message.VQLClientAction.Env) + return fmt.Sprintf("%v: %v", query_name, string(serialized)) } - return "" } diff --git a/file_store/api/queues.go b/file_store/api/queues.go index 989ef91c273..42c8e39998f 100644 --- a/file_store/api/queues.go +++ b/file_store/api/queues.go @@ -10,7 +10,8 @@ import ( // responsible for rotating the queue files as required. type QueueManager interface { PushEventRows(path_manager PathManager, rows []*ordereddict.Dict) error - Watch(queue_name string) (output <-chan *ordereddict.Dict, cancel func()) + Watch(ctx context.Context, queue_name string) ( + output <-chan *ordereddict.Dict, cancel func()) } type ResultSetFileProperties struct { diff --git a/file_store/api/testsuite.go b/file_store/api/testsuite.go index 7b9b5e7c4c0..bb134af32ee 100644 --- a/file_store/api/testsuite.go +++ b/file_store/api/testsuite.go @@ -241,7 +241,8 @@ func (self *QueueManagerTestSuite) TestPush() { ordereddict.NewDict().Set("foo", 1), ordereddict.NewDict().Set("foo", 2)} - output, cancel := self.manager.Watch(artifact_name) + ctx := context.Background() + output, cancel := self.manager.Watch(ctx, artifact_name) defer cancel() err := self.manager.PushEventRows( diff --git a/file_store/directory/buffer.go b/file_store/directory/buffer.go new file mode 100644 index 00000000000..2975589500a --- /dev/null +++ b/file_store/directory/buffer.go @@ -0,0 +1,239 @@ +// A ring buffer to queue messages + +// Similar to the client ring buffer but this one has no limit because +// we never want to block writers. + +package directory + +import ( + "encoding/binary" + "encoding/json" + "errors" + "io" + "os" + "sync" + + "github.com/Velocidex/ordereddict" + config_proto "www.velocidex.com/golang/velociraptor/config/proto" + "www.velocidex.com/golang/velociraptor/constants" + logging "www.velocidex.com/golang/velociraptor/logging" +) + +// The below is similar to http_comms.FileBasedRingBuffer except: +// * Size of the file is not limited. +// * Leasing a full number of messages at once (rather than combined size). + +const ( + FileMagic = "VRB\x5e" + FirstRecordOffset = 50 +) + +type Header struct { + ReadPointer int64 // Leasing will start at this file offset. + WritePointer int64 // Enqueue will write at this file position. +} + +func (self *Header) MarshalBinary() ([]byte, error) { + data := make([]byte, FirstRecordOffset) + copy(data, FileMagic) + + binary.LittleEndian.PutUint64(data[4:12], uint64(self.ReadPointer)) + binary.LittleEndian.PutUint64(data[12:20], uint64(self.WritePointer)) + + return data, nil +} + +func (self *Header) UnmarshalBinary(data []byte) error { + if len(data) < FirstRecordOffset { + return errors.New("Invalid header length") + } + + if string(data[:4]) != FileMagic { + return errors.New("Invalid Magic") + } + + self.ReadPointer = int64(binary.LittleEndian.Uint64(data[4:12])) + self.WritePointer = int64(binary.LittleEndian.Uint64(data[12:20])) + + return nil +} + +type FileBasedRingBuffer struct { + config_obj *config_proto.Config + + mu sync.Mutex + + fd *os.File + header *Header + + read_buf []byte + write_buf []byte + + log_ctx *logging.LogContext +} + +// Enqueue the item into the ring buffer and append to the end. +func (self *FileBasedRingBuffer) Enqueue(item interface{}) error { + serialized, err := json.Marshal(item) + if err != nil { + return err + } + + self.mu.Lock() + defer self.mu.Unlock() + + // Write the new message to the end of the file at the WritePointer + binary.LittleEndian.PutUint64(self.write_buf, uint64(len(serialized))) + _, err = self.fd.WriteAt(self.write_buf, int64(self.header.WritePointer)) + if err != nil { + // File is corrupt now, reset it. + self.Reset() + return err + } + + n, err := self.fd.WriteAt(serialized, int64(self.header.WritePointer+8)) + if err != nil { + self.Reset() + return err + } + + self.header.WritePointer += 8 + int64(n) + + // Update the header + serialized, err = self.header.MarshalBinary() + if err != nil { + return err + } + _, err = self.fd.WriteAt(serialized, 0) + if err != nil { + self.Reset() + return err + } + + return nil +} + +// Returns some messages message from the file. +func (self *FileBasedRingBuffer) Lease(count int) []*ordereddict.Dict { + self.mu.Lock() + defer self.mu.Unlock() + + result := make([]*ordereddict.Dict, 0, count) + + // The file contains more data. + for self.header.WritePointer > self.header.ReadPointer { + + // Read the next chunk (length+value) from the current leased pointer. + n, err := self.fd.ReadAt(self.read_buf, self.header.ReadPointer) + if err != nil || n != len(self.read_buf) { + self.log_ctx.Error("Possible corruption detected: file too short.") + self._Truncate() + return nil + } + + length := int64(binary.LittleEndian.Uint64(self.read_buf)) + // File might be corrupt - just reset the + // entire file. + if length > constants.MAX_MEMORY*2 || length <= 0 { + self.log_ctx.Error("Possible corruption detected - item length is too large.") + self._Truncate() + return nil + } + + // Unmarshal one item at a time. + serialized := make([]byte, length) + n, _ = self.fd.ReadAt(serialized, self.header.ReadPointer+8) + if int64(n) != length { + self.log_ctx.Errorf( + "Possible corruption detected - expected item of length %v received %v.", + length, n) + self._Truncate() + return nil + } + + item := ordereddict.NewDict() + err = item.UnmarshalJSON(serialized) + if err == nil { + result = append(result, item) + } + + self.header.ReadPointer += 8 + int64(n) + // We read up to the write pointer, we may truncate the file now. + if self.header.ReadPointer == self.header.WritePointer { + self._Truncate() + } + + if len(result) >= count { + break + } + } + + return result +} + +// _Truncate returns the file to a virgin state. Assumes +// FileBasedRingBuffer is already under lock. +func (self *FileBasedRingBuffer) _Truncate() { + _ = self.fd.Truncate(0) + self.header.ReadPointer = FirstRecordOffset + self.header.WritePointer = FirstRecordOffset + serialized, _ := self.header.MarshalBinary() + _, _ = self.fd.WriteAt(serialized, 0) +} + +func (self *FileBasedRingBuffer) Reset() { + self.mu.Lock() + defer self.mu.Unlock() + + self._Truncate() +} + +// Closes the underlying file and shut down the readers. +func (self *FileBasedRingBuffer) Close() { + self.fd.Close() +} + +func NewFileBasedRingBuffer( + config_obj *config_proto.Config, fd *os.File) (*FileBasedRingBuffer, error) { + + log_ctx := logging.GetLogger(config_obj, &logging.FrontendComponent) + + header := &Header{ + // Pad the header a bit to allow for extensions. + WritePointer: FirstRecordOffset, + ReadPointer: FirstRecordOffset, + } + data := make([]byte, FirstRecordOffset) + n, err := fd.ReadAt(data, 0) + if n > 0 && n < FirstRecordOffset && err == io.EOF { + log_ctx.Error("Possible corruption detected: file too short.") + err = fd.Truncate(0) + if err != nil { + return nil, err + } + } + + if n > 0 && (err == nil || err == io.EOF) { + err := header.UnmarshalBinary(data[:n]) + // The header is not valid, truncate the file and + // start again. + if err != nil { + log_ctx.Errorf("Possible corruption detected: %v.", err) + err = fd.Truncate(0) + if err != nil { + return nil, err + } + } + } + + result := &FileBasedRingBuffer{ + config_obj: config_obj, + fd: fd, + header: header, + read_buf: make([]byte, 8), + write_buf: make([]byte, 8), + log_ctx: log_ctx, + } + + return result, nil +} diff --git a/file_store/directory/queue.go b/file_store/directory/queue.go index 00fee83806a..2b7628ec623 100644 --- a/file_store/directory/queue.go +++ b/file_store/directory/queue.go @@ -1,21 +1,279 @@ +// A Queue manager that uses files on disk. + +// The queue manager is a broken between writers and readers. Writers +// want to emit a message to a queue with minimumal delay, and have +// the message dispatched to all readers with minimal latency. + +// A memory queue simply pushes the message to all reader's via a +// buffered channel. As long as the channel buffer remains available +// this works well with very minimal latency in broadcasting to +// readers. However, when the channel becomes full the writers may be +// blocked while readers are working their way through the channel. + +// This queue manager uses a combination of a channel and a disk file +// to buffer messages for readers. When a writer writes to the queue +// manager, the manager attempts to write on the channel but if it is +// not available, then writer switches to a ring buffer file on disk. +// A separate go routine drains the disk file into the channel +// periodically. Therefore, we never block the writer - either the +// message is delivered immediately to the buffered channel, or it is +// written to disk and later delivered. + +// This low latency property is critical because queue managers are +// used to deliver messages in critical code paths and can not be +// delayed. + package directory import ( + "context" + "fmt" + "io/ioutil" + "os" + "sync" + "time" + "github.com/Velocidex/ordereddict" config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/file_store/api" - "www.velocidex.com/golang/velociraptor/file_store/memory" "www.velocidex.com/golang/velociraptor/file_store/result_sets" "www.velocidex.com/golang/velociraptor/utils" ) -var ( - pool = memory.NewQueuePool() -) +// A listener wraps a channel that our client will listen on. We send +// the message to each listener that is subscribed to the queue. +type Listener struct { + id int64 + mu sync.Mutex + + // The consumer interested in these events. The consumer may + // block arbitrarily. + output chan *ordereddict.Dict + + // We receive events on this channel - we guarantee this does + // not block for long. + input chan *ordereddict.Dict + + // A backup file to store extra messages. + file_buffer *FileBasedRingBuffer + + // Name of the file_buffer + tmpfile string + cancel func() +} + +// Should not block - very fast. +func (self *Listener) Send(item *ordereddict.Dict) { + self.input <- item +} + +func (self *Listener) Close() { + self.cancel() + self.file_buffer.Close() + + // Close the output channel so our listener will exit. + close(self.output) + + os.Remove(self.tmpfile) // clean up file buffer +} + +func (self *Listener) Debug() *ordereddict.Dict { + result := ordereddict.NewDict().Set("BackingFile", self.tmpfile) + st, _ := os.Stat(self.tmpfile) + result.Set("Size", int64(st.Size())) + + return result +} + +func NewListener(config_obj *config_proto.Config, ctx context.Context, + output chan *ordereddict.Dict) (*Listener, error) { + + tmpfile, err := ioutil.TempFile("", "journal") + if err != nil { + return nil, err + } + + file_buffer, err := NewFileBasedRingBuffer(config_obj, tmpfile) + if err != nil { + return nil, err + } + + subctx, cancel := context.WithCancel(ctx) + self := &Listener{ + id: time.Now().UnixNano(), + cancel: cancel, + input: make(chan *ordereddict.Dict), + output: output, + file_buffer: file_buffer, + tmpfile: tmpfile.Name(), + } + + // Pump messages from input channel and distribute to + // output. If output is busy we divert to the file buffer. + go func() { + defer cancel() + + for { + select { + case <-subctx.Done(): + return + + case item, ok := <-self.input: + if !ok { + return + } + select { + case <-subctx.Done(): + return + + // If we can immediately push + // to the output, do so + case self.output <- item: + + // Otherwise push to the file. + default: + self.file_buffer.Enqueue(item) + } + } + } + + }() + + // Pump messages from the file_buffer to our listeners. + go func() { + for { + // Wait here until the file has some data in it. + select { + case <-subctx.Done(): + return + + case <-time.After(time.Second): + // Get some messages from the file. + for _, item := range self.file_buffer.Lease(100) { + select { + case <-subctx.Done(): + return + case self.output <- item: + } + } + } + } + }() + + return self, nil +} + +// A Queue manages a set of registrations at a specific queue name +// (artifact name). +type QueuePool struct { + mu sync.Mutex + + config_obj *config_proto.Config + + registrations map[string][]*Listener +} + +func (self *QueuePool) Register( + ctx context.Context, vfs_path string) (<-chan *ordereddict.Dict, func()) { + self.mu.Lock() + defer self.mu.Unlock() + + output_chan := make(chan *ordereddict.Dict) + + registrations := self.registrations[vfs_path] + new_registration, err := NewListener(self.config_obj, ctx, output_chan) + if err != nil { + close(output_chan) + return output_chan, func() {} + } + + registrations = append(registrations, new_registration) + + self.registrations[vfs_path] = registrations + + return output_chan, func() { + self.unregister(vfs_path, new_registration.id) + } +} + +// This holds a lock on the entire pool and it is used when the system +// shuts down so not very often. +func (self *QueuePool) unregister(vfs_path string, id int64) { + self.mu.Lock() + defer self.mu.Unlock() + + registrations, pres := self.registrations[vfs_path] + if pres { + new_registrations := make([]*Listener, 0, len(registrations)) + for _, item := range registrations { + if id == item.id { + item.Close() + } else { + new_registrations = append(new_registrations, + item) + } + } + + self.registrations[vfs_path] = new_registrations + } +} + +// Make a copy of the registrations under lock and then we can take +// our time to send them later. +func (self *QueuePool) getRegistrations(vfs_path string) []*Listener { + self.mu.Lock() + defer self.mu.Unlock() + + registrations, ok := self.registrations[vfs_path] + if ok { + // Make a copy of the registrations for sending this + // message. + return append([]*Listener{}, registrations...) + } + + return nil +} + +func (self *QueuePool) Broadcast(vfs_path string, row *ordereddict.Dict) { + // Ensure we do not hold the lock for very long here. + for _, item := range self.getRegistrations(vfs_path) { + item.Send(row) + } +} + +func (self *QueuePool) Debug() *ordereddict.Dict { + self.mu.Lock() + defer self.mu.Unlock() + + result := ordereddict.NewDict() + for k, v := range self.registrations { + listeners := ordereddict.NewDict() + for idx, l := range v { + listeners.Set(fmt.Sprintf("%v", idx), l.Debug()) + } + result.Set(k, listeners) + } + return result +} + +func NewQueuePool(config_obj *config_proto.Config) *QueuePool { + return &QueuePool{ + config_obj: config_obj, + registrations: make(map[string][]*Listener), + } +} type DirectoryQueueManager struct { - FileStore api.FileStore - Clock utils.Clock + mu sync.Mutex + + queue_pool *QueuePool + FileStore api.FileStore + config_obj *config_proto.Config + Clock utils.Clock +} + +func (self *DirectoryQueueManager) Debug() *ordereddict.Dict { + return self.queue_pool.Debug() } func (self *DirectoryQueueManager) PushEventRows( @@ -32,20 +290,22 @@ func (self *DirectoryQueueManager) PushEventRows( // Set a timestamp per event for easier querying. row.Set("_ts", int(self.Clock.Now().Unix())) rs_writer.Write(row) - pool.Broadcast(path_manager.GetQueueName(), row) + self.queue_pool.Broadcast(path_manager.GetQueueName(), row) } return nil } -func (self *DirectoryQueueManager) Watch( +func (self *DirectoryQueueManager) Watch(ctx context.Context, queue_name string) (output <-chan *ordereddict.Dict, cancel func()) { - return pool.Register(queue_name) + return self.queue_pool.Register(ctx, queue_name) } func NewDirectoryQueueManager(config_obj *config_proto.Config, file_store api.FileStore) api.QueueManager { return &DirectoryQueueManager{ - FileStore: file_store, - Clock: utils.RealClock{}, + FileStore: file_store, + config_obj: config_obj, + queue_pool: NewQueuePool(config_obj), + Clock: utils.RealClock{}, } } diff --git a/file_store/directory/queue_test.go b/file_store/directory/queue_test.go index e73a8131ebc..b1f57431786 100644 --- a/file_store/directory/queue_test.go +++ b/file_store/directory/queue_test.go @@ -1,15 +1,34 @@ -package directory +package directory_test import ( + "context" "io/ioutil" "os" "testing" + "time" + "github.com/Velocidex/ordereddict" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "www.velocidex.com/golang/velociraptor/config" + config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/file_store/api" + "www.velocidex.com/golang/velociraptor/file_store/directory" "www.velocidex.com/golang/velociraptor/file_store/memory" + "www.velocidex.com/golang/velociraptor/file_store/test_utils" + "www.velocidex.com/golang/velociraptor/paths/artifacts" + "www.velocidex.com/golang/velociraptor/services" + "www.velocidex.com/golang/velociraptor/services/journal" + "www.velocidex.com/golang/velociraptor/services/repository" + "www.velocidex.com/golang/velociraptor/utils" +) + +var ( + monitoringArtifact = ` +name: TestQueue +type: SERVER_EVENT +` ) func TestDirectoryQueueManager(t *testing.T) { @@ -23,6 +42,121 @@ func TestDirectoryQueueManager(t *testing.T) { config_obj.Datastore.FilestoreDirectory = dir config_obj.Datastore.Location = dir - manager := NewDirectoryQueueManager(config_obj, memory.Test_memory_file_store) + manager := directory.NewDirectoryQueueManager(config_obj, memory.Test_memory_file_store) suite.Run(t, api.NewQueueManagerTestSuite(config_obj, manager, memory.Test_memory_file_store)) } + +type TestSuite struct { + suite.Suite + config_obj *config_proto.Config + client_id string + sm *services.Service + dir string +} + +func (self *TestSuite) SetupTest() { + dir, err := ioutil.TempDir("", "file_store_test") + assert.NoError(self.T(), err) + self.dir = dir + + os.Setenv("temp", dir) + + self.config_obj, err = new(config.Loader).WithFileLoader( + "../../http_comms/test_data/server.config.yaml"). + WithRequiredFrontend().WithWriteback(). + LoadAndValidate() + require.NoError(self.T(), err) + + // Start essential services. + ctx, _ := context.WithTimeout(context.Background(), time.Second*60) + self.sm = services.NewServiceManager(ctx, self.config_obj) + + require.NoError(self.T(), self.sm.Start(journal.StartJournalService)) + require.NoError(self.T(), self.sm.Start(repository.StartRepositoryManager)) + + self.client_id = "C.12312" +} + +func (self *TestSuite) TearDownTest() { + self.sm.Close() + test_utils.GetMemoryFileStore(self.T(), self.config_obj).Clear() + test_utils.GetMemoryDataStore(self.T(), self.config_obj).Clear() + os.RemoveAll(self.dir) // clean up + +} + +func (self *TestSuite) TestQueueManager() { + repo_manager, err := services.GetRepositoryManager() + assert.NoError(self.T(), err) + + repository, err := repo_manager.GetGlobalRepository(self.config_obj) + assert.NoError(self.T(), err) + + _, err = repository.LoadYaml(monitoringArtifact, true) + assert.NoError(self.T(), err) + + file_store := test_utils.GetMemoryFileStore(self.T(), self.config_obj) + manager := directory.NewDirectoryQueueManager( + self.config_obj, file_store).(*directory.DirectoryQueueManager) + + // Push some rows to the queue manager + ctx := context.Background() + + reader, cancel := manager.Watch(ctx, "TestQueue") + + path_manager := artifacts.NewMonitoringArtifactLogPathManager(self.config_obj, + "C.123", "TestQueue") + + // Query the state of the manager for testing. + dbg := manager.Debug() + // The initial size is zero + assert.Equal(self.T(), int64(0), utils.GetInt64(dbg, "TestQueue.0.Size")) + + // Push some rows without reading - this should write to the + // file buffer and not block. + for i := 0; i < 10; i++ { + err = manager.PushEventRows(path_manager, []*ordereddict.Dict{ + ordereddict.NewDict(). + Set("Foo", "Bar"), + }) + assert.NoError(self.T(), err) + } + + // The file should contain all the rows now. + dbg = manager.Debug() + + // File size is not accurate due to timestamps + assert.Greater(self.T(), utils.GetInt64(dbg, "TestQueue.0.Size"), int64(300)) + + // Now read all the rows from the file. + count := 0 + for row := range reader { + count++ + assert.Equal(self.T(), "Bar", utils.GetString(row, "Foo")) + + // Break on the 10th row + if count >= 10 { + break + } + } + + // Now check the file - it should be truncated since we read all messages. + dbg = manager.Debug() + assert.Equal(self.T(), int64(50), utils.GetInt64(dbg, "TestQueue.0.Size")) + + // Now cancel the watcher - further reads from the channel + // should not block - the channel is closed. + cancel() + + for range reader { + } + + // Now make sure the tempfile is removed. + tempfile := utils.GetString(dbg, "TestQueue.0.BackingFile") + _, err = os.Stat(tempfile) + assert.Error(self.T(), err) +} + +func TestFileBasedQueueManager(t *testing.T) { + suite.Run(t, &TestSuite{}) +} diff --git a/file_store/memory/queue.go b/file_store/memory/queue.go index 801337acca8..a6f1fa8a08d 100644 --- a/file_store/memory/queue.go +++ b/file_store/memory/queue.go @@ -8,28 +8,46 @@ package memory import ( + "context" "sync" "time" "github.com/Velocidex/ordereddict" config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/file_store/api" + "www.velocidex.com/golang/velociraptor/logging" "www.velocidex.com/golang/velociraptor/utils" ) var ( - pool = NewQueuePool() + mu sync.Mutex + pool *QueuePool ) +func GlobalQueuePool(config_obj *config_proto.Config) *QueuePool { + mu.Lock() + defer mu.Unlock() + + if pool != nil { + return pool + } + + pool = NewQueuePool(config_obj) + return pool +} + // A queue pool is an in-process listener for events. type Listener struct { id int64 Channel chan *ordereddict.Dict + name string } type QueuePool struct { mu sync.Mutex + config_obj *config_proto.Config + registrations map[string][]*Listener } @@ -41,6 +59,7 @@ func (self *QueuePool) Register(vfs_path string) (<-chan *ordereddict.Dict, func new_registration := &Listener{ Channel: make(chan *ordereddict.Dict, 1000), id: time.Now().UnixNano(), + name: vfs_path, } registrations = append(registrations, new_registration) @@ -51,6 +70,8 @@ func (self *QueuePool) Register(vfs_path string) (<-chan *ordereddict.Dict, func } } +// This holds a lock on the entire pool and it is used when the system +// shuts down so not very often. func (self *QueuePool) unregister(vfs_path string, id int64) { self.mu.Lock() defer self.mu.Unlock() @@ -71,28 +92,47 @@ func (self *QueuePool) unregister(vfs_path string, id int64) { } } -func (self *QueuePool) Broadcast(vfs_path string, row *ordereddict.Dict) { +// Make a copy of the registrations under lock and then we can take +// our time to send them later. +func (self *QueuePool) getRegistrations(vfs_path string) []*Listener { self.mu.Lock() defer self.mu.Unlock() registrations, ok := self.registrations[vfs_path] if ok { - for _, item := range registrations { - item.Channel <- row + // Make a copy of the registrations for sending this + // message. + return append([]*Listener{}, registrations...) + } + + return nil +} + +func (self *QueuePool) Broadcast(vfs_path string, row *ordereddict.Dict) { + // Ensure we do not hold the lock for very long here. + for _, item := range self.getRegistrations(vfs_path) { + select { + case item.Channel <- row: + case <-time.After(2 * time.Second): + logger := logging.GetLogger( + self.config_obj, &logging.FrontendComponent) + logger.Error("QueuePool: Dropping message to queue %v", + item.name) } } } -func NewQueuePool() *QueuePool { +func NewQueuePool(config_obj *config_proto.Config) *QueuePool { return &QueuePool{ + config_obj: config_obj, registrations: make(map[string][]*Listener), } } type MemoryQueueManager struct { - FileStore api.FileStore - - Clock utils.Clock + FileStore api.FileStore + config_obj *config_proto.Config + Clock utils.Clock } func (self *MemoryQueueManager) Debug() { @@ -106,7 +146,7 @@ func (self *MemoryQueueManager) PushEventRows( path_manager api.PathManager, dict_rows []*ordereddict.Dict) error { for _, row := range dict_rows { - pool.Broadcast(path_manager.GetQueueName(), + GlobalQueuePool(self.config_obj).Broadcast(path_manager.GetQueueName(), row.Set("_ts", int(self.Clock.Now().Unix()))) } @@ -131,14 +171,15 @@ func (self *MemoryQueueManager) PushEventRows( } func (self *MemoryQueueManager) Watch( - queue_name string) (output <-chan *ordereddict.Dict, cancel func()) { - return pool.Register(queue_name) + ctx context.Context, queue_name string) (output <-chan *ordereddict.Dict, cancel func()) { + return GlobalQueuePool(self.config_obj).Register(queue_name) } func NewMemoryQueueManager(config_obj *config_proto.Config, file_store api.FileStore) api.QueueManager { return &MemoryQueueManager{ - FileStore: file_store, - Clock: utils.RealClock{}, + FileStore: file_store, + config_obj: config_obj, + Clock: utils.RealClock{}, } } diff --git a/file_store/mysql/queue.go b/file_store/mysql/queue.go index 943dc75fdb8..4b935387759 100644 --- a/file_store/mysql/queue.go +++ b/file_store/mysql/queue.go @@ -30,6 +30,7 @@ package mysql import ( + "context" "database/sql" "fmt" "sync" @@ -242,7 +243,8 @@ func (self *MysqlQueueManager) PushEventRows( return err } -func (self *MysqlQueueManager) Watch(queue_name string) (<-chan *ordereddict.Dict, func()) { +func (self *MysqlQueueManager) Watch( + ctx context.Context, queue_name string) (<-chan *ordereddict.Dict, func()) { return pool.Register(queue_name) } diff --git a/file_store/result_sets/events_test.go b/file_store/result_sets/events_test.go index d266990a6a9..8167d0d9221 100644 --- a/file_store/result_sets/events_test.go +++ b/file_store/result_sets/events_test.go @@ -177,10 +177,9 @@ func (self *TimedResultSetTestSuite) TestTimedResultSets() { clock := &utils.MockClock{MockNow: now} // Start off by writing some events on a queue. - qm := &directory.DirectoryQueueManager{ - FileStore: self.file_store, - Clock: clock, - } + qm := directory.NewDirectoryQueueManager( + self.config_obj, self.file_store).(*directory.DirectoryQueueManager) + qm.Clock = clock path_manager := artifacts.NewArtifactPathManager( self.config_obj, diff --git a/flows/artifacts.go b/flows/artifacts.go index 5292afb2c0d..c4305df65ea 100644 --- a/flows/artifacts.go +++ b/flows/artifacts.go @@ -1,5 +1,5 @@ /* - Velociraptor - Hunting Evil +o Velociraptor - Hunting Evil Copyright (C) 2019 Velocidex Innovations. This program is free software: you can redistribute it and/or modify @@ -55,6 +55,8 @@ var ( Name: "uploaded_bytes", Help: "Total bytes of Uploaded Files.", }) + + notModified = errors.New("Not modified") ) // closeContext is called after all messages from the clients are @@ -420,15 +422,21 @@ func IsRequestComplete( // Update any hunts if needed. if constants.HuntIdRegex.MatchString(collection_context.Request.Creator) { - err := services.GetHuntDispatcher().ModifyHunt( + dispatcher := services.GetHuntDispatcher() + if dispatcher == nil { + return false, errors.New("Hunt dispatcher not valid") + } + + err := dispatcher.ModifyHunt( collection_context.Request.Creator, func(hunt *api_proto.Hunt) error { if hunt != nil && hunt.Stats != nil { hunt.Stats.TotalClientsWithResults++ + return nil } - return nil + return notModified }) - if err != nil { + if err != nil && err != notModified { return true, err } } diff --git a/flows/foreman.go b/flows/foreman.go index f6a6f090248..7aae9a85861 100644 --- a/flows/foreman.go +++ b/flows/foreman.go @@ -134,6 +134,10 @@ func ForemanProcessMessage( return err } + notifier := services.GetNotifier() + if notifier == nil { + return errors.New("Notifier not configured") + } return services.GetNotifier().NotifyListener(config_obj, client_id) }) } diff --git a/flows/hunts.go b/flows/hunts.go index 9e6f0d0a6e3..006a1c2534e 100644 --- a/flows/hunts.go +++ b/flows/hunts.go @@ -51,6 +51,8 @@ func GetNewHuntId() string { return constants.HUNT_PREFIX + string(result) } +// Backwards compatibility: Figure out the list of collected hunts +// from the hunt object's request func FindCollectedArtifacts( config_obj *config_proto.Config, hunt *api_proto.Hunt) { @@ -59,6 +61,11 @@ func FindCollectedArtifacts( return } + // Hunt already has artifacts list. + if len(hunt.Artifacts) > 0 { + return + } + hunt.Artifacts = hunt.StartRequest.Artifacts hunt.ArtifactSources = []string{} for _, artifact := range hunt.StartRequest.Artifacts { @@ -124,6 +131,17 @@ func CreateHunt( return "", errors.New("Hunt expiry is in the past!") } + // Set the artifacts information in the hunt object itself. + hunt.Artifacts = hunt.StartRequest.Artifacts + hunt.ArtifactSources = []string{} + for _, artifact := range hunt.StartRequest.Artifacts { + for _, source := range GetArtifactSources( + config_obj, artifact) { + hunt.ArtifactSources = append( + hunt.ArtifactSources, path.Join(artifact, source)) + } + } + manager, err := services.GetRepositoryManager() if err != nil { return "", err @@ -194,7 +212,12 @@ func ListHunts(config_obj *config_proto.Config, in *api_proto.ListHuntsRequest) result := &api_proto.ListHuntsResponse{} - err := services.GetHuntDispatcher().ApplyFuncOnHunts( + dispatcher := services.GetHuntDispatcher() + if dispatcher == nil { + return nil, errors.New("Hunt dispatcher not initialized") + } + + err := dispatcher.ApplyFuncOnHunts( func(hunt *api_proto.Hunt) error { if uint64(len(result.Items)) < in.Offset { return nil @@ -205,10 +228,6 @@ func ListHunts(config_obj *config_proto.Config, in *api_proto.ListHuntsRequest) } if in.IncludeArchived || hunt.State != api_proto.Hunt_ARCHIVED { - - // FIXME: Backwards compatibility. - hunt.HuntId = path.Base(hunt.HuntId) - result.Items = append(result.Items, hunt) } return nil @@ -230,23 +249,24 @@ func GetHunt(config_obj *config_proto.Config, in *api_proto.GetHuntRequest) ( var result *api_proto.Hunt - err = services.GetHuntDispatcher().ModifyHunt( - in.HuntId, + dispatcher := services.GetHuntDispatcher() + if dispatcher == nil { + return nil, errors.New("Hunt dispatcher not valid") + } + + err = dispatcher.ModifyHunt(in.HuntId, func(hunt_obj *api_proto.Hunt) error { - // Make a copy + // Make a copy of the hunt result = proto.Clone(hunt_obj).(*api_proto.Hunt) - // HACK: Velociraptor only knows how to - // collect artifacts now. Eventually the whole - // concept of a flow will go away but for now - // we need to figure out which artifacts we - // are actually collecting - there are not - // many possibilities since we have reduced - // the number of possible flows significantly. - FindCollectedArtifacts(config_obj, result) - - return nil + // We do not modify the hunt so it is not dirty. + return notModified }) + if err != notModified { + return nil, err + } + + FindCollectedArtifacts(config_obj, result) if result == nil || result.Stats == nil { return result, errors.New("Not found") @@ -254,7 +274,7 @@ func GetHunt(config_obj *config_proto.Config, in *api_proto.GetHuntRequest) ( result.Stats.AvailableDownloads, _ = availableHuntDownloadFiles(config_obj, in.HuntId) - return result, err + return result, nil } // availableHuntDownloadFiles returns the prepared zip downloads available to @@ -275,12 +295,6 @@ func availableHuntDownloadFiles(config_obj *config_proto.Config, // will update the StartTime. // 2. A hunt in the running state can go to the Stop state // 3. A hunt's description can be modified. - -// It is not possible to restart a stopped hunt. This is because the -// hunt manager watches the hunt participation events for all hunts at -// the same time, and just ignores clients that want to participate in -// stopped hunts. It is not possible to go back and re-examine the -// queue. func ModifyHunt( ctx context.Context, config_obj *config_proto.Config, @@ -335,19 +349,9 @@ func ModifyHunt( hunt.State = api_proto.Hunt_STOPPED } - // Write the new hunt object to the datastore. - db, err := datastore.GetDB(config_obj) - if err != nil { - return err - } - - hunt_path_manager := paths.NewHuntPathManager(hunt.HuntId) - err = db.SetSubject( - config_obj, hunt_path_manager.Path(), hunt) - if err != nil { - return err - } - + // Returning nil indicates to the hunt manager + // that the hunt was successfully modified. It + // will then flush it to disk. return nil }) diff --git a/gui/velociraptor/src/components/core/api-service.js b/gui/velociraptor/src/components/core/api-service.js index 606686f16b8..d502e2dd32a 100644 --- a/gui/velociraptor/src/components/core/api-service.js +++ b/gui/velociraptor/src/components/core/api-service.js @@ -38,6 +38,13 @@ const get = function(url, params, cancel_token) { url: api_handlers + url, params: params, cancelToken: cancel_token, + }).then(response=>{ + // Update the csrf token. + let token = response.headers["x-csrf-token"]; + if (token && token.length > 0) { + window.CsrfToken = token; + } + return response; }).catch(handle_error); }; diff --git a/gui/velociraptor/src/components/forms/validated_int.js b/gui/velociraptor/src/components/forms/validated_int.js index 3014dae2a78..69664914fd6 100644 --- a/gui/velociraptor/src/components/forms/validated_int.js +++ b/gui/velociraptor/src/components/forms/validated_int.js @@ -21,9 +21,13 @@ export default class ValidatedInteger extends React.Component { render() { let value = this.props.value; + + // Need to set the initial value to '' to tell React this is a + // controlled component. if (_.isUndefined(value)) { - value = 0; + value = ''; } + return ( <> self.GetLastTimestamp() { + atomic.StoreUint64(&self.last_timestamp, hunt_obj.StartTime) + } - // The hunts start time could have been modified - we need to - // update ours then. - if hunt_obj.StartTime > self.GetLastTimestamp() { - atomic.StoreUint64(&self.last_timestamp, hunt_obj.StartTime) + self.dirty = true } - - self.dirty = true - return err } diff --git a/services/hunt_manager/hunt_manager.go b/services/hunt_manager/hunt_manager.go index a4d16ec9f03..98b4040c01f 100644 --- a/services/hunt_manager/hunt_manager.go +++ b/services/hunt_manager/hunt_manager.go @@ -113,7 +113,7 @@ func (self *HuntManager) StartParticipation( if err != nil { return err } - qm_chan, cancel := journal.Watch("System.Hunt.Participation") + qm_chan, cancel := journal.Watch(ctx, "System.Hunt.Participation") wg.Add(1) go func() { @@ -149,7 +149,7 @@ func (self *HuntManager) StartFlowCompletion( return err } - qm_chan, cancel := journal.Watch("System.Flow.Completion") + qm_chan, cancel := journal.Watch(ctx, "System.Flow.Completion") wg.Add(1) go func() { @@ -173,6 +173,8 @@ func (self *HuntManager) StartFlowCompletion( return nil } +// Watch for all flows created by a hunt and maintain the list of hunt +// completions. func (self *HuntManager) ProcessFlowCompletion( ctx context.Context, config_obj *config_proto.Config, @@ -271,14 +273,17 @@ func (self *HuntManager) ProcessRow( return } - request := &flows_proto.ArtifactCollectorArgs{ - ClientId: participation_row.ClientId, - Creator: participation_row.HuntId, - } + // Prepare a collection request + request := &flows_proto.ArtifactCollectorArgs{} // Get hunt information about this hunt. now := uint64(time.Now().UnixNano() / 1000) - err = services.GetHuntDispatcher().ModifyHunt( + dispatcher := services.GetHuntDispatcher() + if dispatcher == nil { + return + } + + err = dispatcher.ModifyHunt( participation_row.HuntId, func(hunt_obj *api_proto.Hunt) error { if hunt_obj.Stats == nil { @@ -352,6 +357,11 @@ func (self *HuntManager) ProcessRow( // Direct the request against our client and schedule it. request.ClientId = participation_row.ClientId + + // Make sure the flow is created by the hunt - this is how we + // track it. + request.Creator = participation_row.HuntId + flow_id, err := launcher.ScheduleArtifactCollection( ctx, config_obj, vql_subsystem.NullACLManager{}, repository, request) if err != nil { diff --git a/services/interrogation/interrogation.go b/services/interrogation/interrogation.go index 2d1732a9bbc..c5a9c46c811 100644 --- a/services/interrogation/interrogation.go +++ b/services/interrogation/interrogation.go @@ -49,7 +49,7 @@ func (self *EnrollmentService) Start( return err } - events, cancel := journal.Watch("Server.Internal.Enrollment") + events, cancel := journal.Watch(ctx, "Server.Internal.Enrollment") wg.Add(1) go func() { diff --git a/services/interrogation/utils.go b/services/interrogation/utils.go index ed0d6f30d0c..c311b188bed 100644 --- a/services/interrogation/utils.go +++ b/services/interrogation/utils.go @@ -30,7 +30,7 @@ func watchForFlowCompletion( return err } - events, cancel := journal.Watch("System.Flow.Completion") + events, cancel := journal.Watch(ctx, "System.Flow.Completion") manager, err := services.GetRepositoryManager() if err != nil { return err diff --git a/services/journal.go b/services/journal.go index 74109751c09..7b79dcd38db 100644 --- a/services/journal.go +++ b/services/journal.go @@ -15,6 +15,7 @@ package services // in real time from client event artifacts. import ( + "context" "errors" "sync" @@ -52,7 +53,8 @@ type JournalService interface { // Watch the artifact named by queue_name for new rows. This // only makes sense for artifacts of type CLIENT_EVENT and // SERVER_EVENT - Watch(queue_name string) (output <-chan *ordereddict.Dict, cancel func()) + Watch(ctx context.Context, + queue_name string) (output <-chan *ordereddict.Dict, cancel func()) // Push the rows into the datastore in the location give by // the path manager. diff --git a/services/journal/journal.go b/services/journal/journal.go index f50877d6c96..c3d9bf821db 100644 --- a/services/journal/journal.go +++ b/services/journal/journal.go @@ -27,7 +27,8 @@ type JournalService struct { qm api.QueueManager } -func (self *JournalService) Watch(queue_name string) ( +func (self *JournalService) Watch( + ctx context.Context, queue_name string) ( output <-chan *ordereddict.Dict, cancel func()) { if self == nil || self.qm == nil { @@ -35,7 +36,7 @@ func (self *JournalService) Watch(queue_name string) ( return nil, func() {} } - return self.qm.Watch(queue_name) + return self.qm.Watch(ctx, queue_name) } func (self *JournalService) PushRowsToArtifact( diff --git a/services/labels/labels.go b/services/labels/labels.go index 6adbdc0b362..a37f4f2975e 100644 --- a/services/labels/labels.go +++ b/services/labels/labels.go @@ -370,7 +370,7 @@ func (self *Labeler) Start(ctx context.Context, return err } - events, cancel := journal.Watch("Server.Internal.Label") + events, cancel := journal.Watch(ctx, "Server.Internal.Label") wg.Add(1) go func() { diff --git a/services/notifications/notifications.go b/services/notifications/notifications.go index 896fa01926d..5617dc0d1f5 100644 --- a/services/notifications/notifications.go +++ b/services/notifications/notifications.go @@ -60,7 +60,7 @@ func StartNotificationService( if err != nil { return err } - events, cancel := journal.Watch("Server.Internal.Notifications") + events, cancel := journal.Watch(ctx, "Server.Internal.Notifications") wg.Add(1) go func() { @@ -74,6 +74,7 @@ func StartNotificationService( self.notification_pool.Shutdown() self.notification_pool = nil }() + defer logger.Info("Exiting notification service!") for { select { @@ -90,7 +91,6 @@ func StartNotificationService( continue } - self.pool_mu.Lock() if target == "Regex" { regex_str, ok := event.GetString("Regex") if ok { @@ -109,7 +109,6 @@ func StartNotificationService( } else { self.notification_pool.Notify(target) } - self.pool_mu.Unlock() } } }() diff --git a/services/server_monitoring/server_monitoring.go b/services/server_monitoring/server_monitoring.go index cbcb320414e..1b08ec818ec 100644 --- a/services/server_monitoring/server_monitoring.go +++ b/services/server_monitoring/server_monitoring.go @@ -47,6 +47,9 @@ func (self *EventTable) Close() { self.mu.Lock() defer self.mu.Unlock() + logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent) + logger.Info("Closing Server Monitoring Event table") + // Close the old table. if self.cancel != nil { self.cancel() @@ -117,6 +120,7 @@ func (self *EventTable) Update( // Run each collection separately in parallel. for _, vql_request := range vql_requests { + err = self.RunQuery(ctx, config_obj, self.wg, vql_request) if err != nil { return err @@ -197,7 +201,6 @@ func (self *EventTable) RunQuery( for _, query := range vql_request.Query { vql, err := vfilter.Parse(query.VQL) if err != nil { - scope.Log("server_monitoring: %v", err) return } @@ -217,7 +220,6 @@ func (self *EventTable) RunQuery( rs_writer.Write(vfilter.RowToDict(ctx, scope, row). Set("_ts", self.Clock.Now().Unix())) rs_writer.Flush() - } } } diff --git a/services/services.go b/services/services.go index 8b7ad12e9de..592b53a3666 100644 --- a/services/services.go +++ b/services/services.go @@ -19,6 +19,7 @@ package services import ( "context" + "fmt" "sync" config_proto "www.velocidex.com/golang/velociraptor/config/proto" @@ -52,6 +53,7 @@ type Service struct { } func (self *Service) Close() { + fmt.Printf("Closing services\n") self.cancel() // Wait for services to exit. diff --git a/services/test_utils.go b/services/test_utils.go index d4f5a5b3eba..5bb9b140dc7 100644 --- a/services/test_utils.go +++ b/services/test_utils.go @@ -1,6 +1,7 @@ package services import ( + "context" "sync" "github.com/Velocidex/ordereddict" @@ -28,7 +29,8 @@ func GetPublishedEvents( if err != nil { return } - events, cancel := journal.Watch(artifact) + ctx := context.Background() + events, cancel := journal.Watch(ctx, artifact) defer cancel() // Wait here until we are set up. diff --git a/services/vfs_service/utils.go b/services/vfs_service/utils.go index d160a16545e..1e2d31ed870 100644 --- a/services/vfs_service/utils.go +++ b/services/vfs_service/utils.go @@ -31,12 +31,15 @@ func watchForFlowCompletion( return err } - events, cancel := journal.Watch("System.Flow.Completion") + events, cancel := journal.Watch(ctx, "System.Flow.Completion") + + logger := logging.GetLogger(config_obj, &logging.FrontendComponent) wg.Add(1) go func() { defer wg.Done() defer cancel() + defer logger.Info("Stopping watch for %v", artifact_name) builder := services.ScopeBuilder{ Config: config_obj, diff --git a/services/vfs_service/vfs_service.go b/services/vfs_service/vfs_service.go index fc2de10a64c..8c3c820c727 100644 --- a/services/vfs_service/vfs_service.go +++ b/services/vfs_service/vfs_service.go @@ -64,6 +64,8 @@ func (self *VFSService) ProcessDownloadFile( ts, _ := row.GetInt64("_ts") logger := logging.GetLogger(config_obj, &logging.FrontendComponent) + logger.Info("VFSService: Processing System.VFS.DownloadFile from %v", client_id) + flow_path_manager := paths.NewFlowPathManager(client_id, flow_id) path_manager := artifacts.NewArtifactPathManager(config_obj, @@ -127,6 +129,8 @@ func (self *VFSService) ProcessListDirectory( ts, _ := row.GetInt64("_ts") logger := logging.GetLogger(config_obj, &logging.FrontendComponent) + logger.Info("VFSService: Processing System.VFS.ListDirectory from %v", client_id) + path_manager := artifacts.NewArtifactPathManager(config_obj, client_id, flow_id, "System.VFS.ListDirectory") @@ -141,6 +145,7 @@ func (self *VFSService) ProcessListDirectory( var current_vfs_components []string = nil for row := range row_chan { + utils.Debug(row) full_path, _ := row.GetString("_FullPath") accessor, _ := row.GetString("_Accessor") name, _ := row.GetString("Name") diff --git a/utils/dict.go b/utils/dict.go index a8c6b9df870..1a5cfe90430 100644 --- a/utils/dict.go +++ b/utils/dict.go @@ -8,11 +8,11 @@ import ( // Returns the containing dict for a nested dict. This allows fetching // a key using dot notation. -func _get(dict *ordereddict.Dict, key string) *ordereddict.Dict { +func _get(dict *ordereddict.Dict, key string) (*ordereddict.Dict, string) { components := strings.Split(key, ".") // Only a single component, return the dict. if len(components) == 1 { - return dict + return dict, components[0] } // Iterate over all but the last component fetching the nested @@ -21,26 +21,26 @@ func _get(dict *ordereddict.Dict, key string) *ordereddict.Dict { for _, member := range components[:len(components)-1] { result, pres := dict.Get(member) if !pres { - return ordereddict.NewDict() + return ordereddict.NewDict(), "" } nested, ok := result.(*ordereddict.Dict) if !ok { - return ordereddict.NewDict() + return ordereddict.NewDict(), "" } dict = nested } - return dict + return dict, components[len(components)-1] } func GetString(dict *ordereddict.Dict, key string) string { - dict = _get(dict, key) - res, _ := dict.GetString(key) + subdict, last := _get(dict, key) + res, _ := subdict.GetString(last) return res } func GetInt64(dict *ordereddict.Dict, key string) int64 { - dict = _get(dict, key) - res, _ := dict.GetInt64(key) + subdict, last := _get(dict, key) + res, _ := subdict.GetInt64(last) return res } diff --git a/vql/common/for.go b/vql/common/for.go index a2dc872993f..234913201ac 100644 --- a/vql/common/for.go +++ b/vql/common/for.go @@ -64,6 +64,56 @@ func (self ForPlugin) Info(scope vfilter.Scope, type_map *vfilter.TypeMap) *vfil } } +type RangePluginArgs struct { + Start int64 `vfilter:"required,field=start,doc=Start index (0 based)"` + End int64 `vfilter:"required,field=end,doc=End index (0 based)"` + Step int64 `vfilter:"required,field=step,doc=End index (0 based)"` +} + +type RangePlugin struct{} + +func (self RangePlugin) Call( + ctx context.Context, + scope vfilter.Scope, + args *ordereddict.Dict) <-chan vfilter.Row { + output_chan := make(chan vfilter.Row) + + go func() { + defer close(output_chan) + + arg := &RangePluginArgs{} + err := vfilter.ExtractArgs(scope, args, arg) + if err != nil { + scope.Log("range: %v", err) + return + } + + if arg.Step == 0 { + arg.Step = 1 + } + + for i := arg.Start; i < arg.End; i += arg.Step { + select { + case <-ctx.Done(): + return + + case output_chan <- ordereddict.NewDict().Set("_value", i): + } + } + }() + + return output_chan +} + +func (self RangePlugin) Info(scope vfilter.Scope, type_map *vfilter.TypeMap) *vfilter.PluginInfo { + return &vfilter.PluginInfo{ + Name: "range", + Doc: "Iterate over range.", + ArgType: type_map.AddType(scope, &RangePluginArgs{}), + } +} + func init() { + vql_subsystem.RegisterPlugin(&RangePlugin{}) vql_subsystem.RegisterPlugin(&ForPlugin{}) } diff --git a/vql/functions/lists.go b/vql/functions/lists.go index f87e9a6b334..777c67f3f0d 100644 --- a/vql/functions/lists.go +++ b/vql/functions/lists.go @@ -218,7 +218,78 @@ func (self LenFunction) Info(scope vfilter.Scope, type_map *vfilter.TypeMap) *vf } } +type SliceFunctionArgs struct { + List vfilter.Any `vfilter:"required,field=list,doc=A list of items to slice"` + Start uint64 `vfilter:"required,field=start,doc=Start index (0 based)"` + End uint64 `vfilter:"required,field=end,doc=End index (0 based)"` +} +type SliceFunction struct{} + +func (self *SliceFunction) Call(ctx context.Context, + scope vfilter.Scope, + args *ordereddict.Dict) vfilter.Any { + arg := &SliceFunctionArgs{} + err := vfilter.ExtractArgs(scope, args, arg) + if err != nil { + scope.Log("len: %s", err.Error()) + return &vfilter.Null{} + } + + slice := reflect.ValueOf(arg.List) + // A slice of strings. Only the following are supported + // https://golang.org/pkg/reflect/#Value.Len + if slice.Type().Kind() == reflect.Slice || + slice.Type().Kind() == reflect.Map || + slice.Type().Kind() == reflect.Array || + slice.Type().Kind() == reflect.String { + + if arg.End > uint64(slice.Len()) { + arg.End = uint64(slice.Len()) + } + + if arg.Start > arg.End { + arg.Start = arg.End + } + + result := make([]interface{}, 0, arg.End-arg.Start) + for i := arg.Start; i < arg.End; i++ { + result = append(result, slice.Index(int(i)).Interface()) + } + + return result + } + + dict, ok := arg.List.(*ordereddict.Dict) + if ok { + keys := dict.Keys() + if arg.End > uint64(len(keys)) { + arg.End = uint64(len(keys)) + } + + if arg.Start > arg.End { + arg.Start = arg.End + } + + result := make([]interface{}, 0, arg.End-arg.Start) + for i := arg.Start; i < arg.End; i++ { + result = append(result, keys[int(i)]) + } + return result + } + + return []vfilter.Any{} +} + +func (self SliceFunction) Info(scope vfilter.Scope, type_map *vfilter.TypeMap) *vfilter.FunctionInfo { + return &vfilter.FunctionInfo{ + Name: "slice", + Doc: "Slice an array.", + ArgType: type_map.AddType(scope, &SliceFunctionArgs{}), + } +} + func init() { + vql_subsystem.RegisterFunction(&SliceFunction{}) vql_subsystem.RegisterFunction(&FilterFunction{}) vql_subsystem.RegisterFunction(&ArrayFunction{}) vql_subsystem.RegisterFunction(&JoinFunction{}) diff --git a/vql/server/hunts/create.go b/vql/server/hunts/create.go index 0a17fb2fdb7..d98f57b2425 100644 --- a/vql/server/hunts/create.go +++ b/vql/server/hunts/create.go @@ -43,6 +43,7 @@ type ScheduleHuntFunctionArg struct { OpsPerSecond float64 `vfilter:"optional,field=ops_per_sec,doc=Set query ops_per_sec value"` MaxRows uint64 `vfilter:"optional,field=max_rows,doc=Max number of rows to fetch"` MaxBytes uint64 `vfilter:"optional,field=max_bytes,doc=Max number of bytes to upload"` + Pause bool `vfilter:"optional,field=pause,doc=If specified the new hunt will be in the paused state"` } type ScheduleHuntFunction struct{} @@ -53,7 +54,7 @@ func (self *ScheduleHuntFunction) Call(ctx context.Context, err := vql_subsystem.CheckAccess(scope, acls.COLLECT_CLIENT) if err != nil { - scope.Log("flows: %s", err) + scope.Log("hunt: %s", err) return vfilter.Null{} } @@ -97,12 +98,17 @@ func (self *ScheduleHuntFunction) Call(ctx context.Context, return vfilter.Null{} } + state := api_proto.Hunt_RUNNING + if arg.Pause { + state = api_proto.Hunt_PAUSED + } + hunt_request := &api_proto.Hunt{ HuntDescription: arg.Description, Creator: vql_subsystem.GetPrincipal(scope), StartRequest: request, Expires: arg.Expires, - State: api_proto.Hunt_RUNNING, + State: state, } // Run the hunt in the ACL context of the caller. diff --git a/vql/server/monitoring.go b/vql/server/monitoring.go index 69081b4100d..ead7c532f45 100644 --- a/vql/server/monitoring.go +++ b/vql/server/monitoring.go @@ -155,7 +155,7 @@ func (self WatchMonitoringPlugin) Call( } // Ask the journal service to watch the event queue for us. - qm_chan, cancel := journal.Watch(arg.Artifact) + qm_chan, cancel := journal.Watch(ctx, arg.Artifact) defer cancel() for row := range qm_chan {