From 7c815a65c5704138477d6b18b571617830478e63 Mon Sep 17 00:00:00 2001 From: Mike Cohen Date: Fri, 14 Jun 2024 15:45:55 +1000 Subject: [PATCH] Optimize hunt_dispatcher GetFlows() API (#3561) In many cases we do not need the full enriched and current flow information (e.g. when deleting a hunt) so we need a way to specify only retrieving basic information. --- api/hunts.go | 6 +++- .../definitions/Generic/Client/Profile.yaml | 2 ++ .../definitions/Server/Monitor/Profile.yaml | 2 ++ file_store/memory/memory.go | 9 ++++++ services/client_info/storage.go | 2 +- services/hunt_dispatcher.go | 11 ++++++- services/hunt_dispatcher/flows.go | 29 +++++++++++++++---- .../hunt_dispatcher/hunt_dispatcher_test.go | 17 ++++++++--- services/hunt_dispatcher/index.go | 5 ++++ services/hunt_dispatcher/storage.go | 10 ++++++- services/launcher/delete.go | 2 +- vql/readers/paged_reader_test.go | 11 +++---- vql/server/downloads/downloads.go | 5 ++-- vql/server/flows/flow_test.go | 8 +++-- vql/server/flows/parallel.go | 7 +++-- vql/server/flows/parallel_test.go | 9 ++++-- vql/server/flows/uploads.go | 6 +++- vql/server/hunts/delete.go | 6 ++-- vql/server/hunts/hunts.go | 25 +++++++++------- 19 files changed, 127 insertions(+), 45 deletions(-) diff --git a/api/hunts.go b/api/hunts.go index 04b790ca93e..b65a110b877 100644 --- a/api/hunts.go +++ b/api/hunts.go @@ -53,7 +53,11 @@ func (self *ApiServer) GetHuntFlows( scope := vql_subsystem.MakeScope() flow_chan, total_rows, err := hunt_dispatcher.GetFlows( - ctx, org_config_obj, options, scope, in.HuntId, int(in.StartRow)) + ctx, org_config_obj, + services.FlowSearchOptions{ + ResultSetOptions: options, + }, + scope, in.HuntId, int(in.StartRow)) if err != nil { return nil, Status(self.verbose, err) } diff --git a/artifacts/definitions/Generic/Client/Profile.yaml b/artifacts/definitions/Generic/Client/Profile.yaml index ece43364cc8..4f58f9df62b 100644 --- a/artifacts/definitions/Generic/Client/Profile.yaml +++ b/artifacts/definitions/Generic/Client/Profile.yaml @@ -35,12 +35,14 @@ parameters: - name: Allocs description: A sampling of all past memory allocations type: bool + default: Y - name: Block description: Stack traces that led to blocking on synchronization primitives type: bool - name: Goroutine description: Stack traces of all current goroutines type: bool + default: Y - name: Heap description: A sampling of memory allocations of live objects type: bool diff --git a/artifacts/definitions/Server/Monitor/Profile.yaml b/artifacts/definitions/Server/Monitor/Profile.yaml index 68b476b38cb..a409847182a 100644 --- a/artifacts/definitions/Server/Monitor/Profile.yaml +++ b/artifacts/definitions/Server/Monitor/Profile.yaml @@ -33,12 +33,14 @@ parameters: - name: Allocs description: A sampling of all past memory allocations type: bool + default: Y - name: Block description: Stack traces that led to blocking on synchronization primitives type: bool - name: Goroutine description: Stack traces of all current goroutines type: bool + default: Y - name: Heap description: A sampling of memory allocations of live objects type: bool diff --git a/file_store/memory/memory.go b/file_store/memory/memory.go index fb60a0c1ee7..68043a2b2c4 100644 --- a/file_store/memory/memory.go +++ b/file_store/memory/memory.go @@ -134,6 +134,9 @@ type MemoryWriter struct { } func (self *MemoryWriter) Size() (int64, error) { + self.memory_file_store.mu.Lock() + defer self.memory_file_store.mu.Unlock() + return int64(len(self.buf)), nil } @@ -175,6 +178,9 @@ func (self *MemoryWriter) Update(data []byte, offset int64) error { func (self *MemoryWriter) Write(data []byte) (int, error) { defer api.InstrumentWithDelay("write", "MemoryWriter", nil)() + self.memory_file_store.mu.Lock() + defer self.memory_file_store.mu.Unlock() + self.buf = append(self.buf, data...) return len(data), nil } @@ -215,6 +221,9 @@ func (self *MemoryWriter) Close() error { func (self *MemoryWriter) Truncate() error { defer api.InstrumentWithDelay("truncate", "MemoryWriter", nil)() + self.memory_file_store.mu.Lock() + defer self.memory_file_store.mu.Unlock() + self.buf = nil return nil } diff --git a/services/client_info/storage.go b/services/client_info/storage.go index 6a23edabdd9..b88bf20aba7 100644 --- a/services/client_info/storage.go +++ b/services/client_info/storage.go @@ -87,7 +87,7 @@ func (self *Store) StartHouseKeep( return case <-utils.GetTime().After(utils.Jitter(utils.Jitter(delay))): - if utils.GetTime().Now().Sub(last_run) < time.Second { + if utils.GetTime().Now().Sub(last_run) < 10*time.Second { utils.SleepWithCtx(ctx, time.Minute) continue } diff --git a/services/hunt_dispatcher.go b/services/hunt_dispatcher.go index 37fc6242e75..cdd4f88c733 100644 --- a/services/hunt_dispatcher.go +++ b/services/hunt_dispatcher.go @@ -71,6 +71,15 @@ const ( OnlyRunningHunts ) +type FlowSearchOptions struct { + result_sets.ResultSetOptions + + // Additional Options for efficient search. + + // BasicInformation includes only client id and flow id. + BasicInformation bool +} + type IHuntDispatcher interface { // Applies the function on all the hunts. Functions may not // modify the hunt but will have read only access to the hunt @@ -106,7 +115,7 @@ type IHuntDispatcher interface { // Paged view into the flows in the hunt GetFlows(ctx context.Context, config_obj *config_proto.Config, - options result_sets.ResultSetOptions, scope vfilter.Scope, + options FlowSearchOptions, scope vfilter.Scope, hunt_id string, start int) ( output chan *api_proto.FlowDetails, total_rows int64, err error) diff --git a/services/hunt_dispatcher/flows.go b/services/hunt_dispatcher/flows.go index b68a0d0454b..549a9b5f63b 100644 --- a/services/hunt_dispatcher/flows.go +++ b/services/hunt_dispatcher/flows.go @@ -7,6 +7,7 @@ import ( api_proto "www.velocidex.com/golang/velociraptor/api/proto" config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/file_store" + flows_proto "www.velocidex.com/golang/velociraptor/flows/proto" "www.velocidex.com/golang/velociraptor/json" "www.velocidex.com/golang/velociraptor/logging" "www.velocidex.com/golang/velociraptor/paths" @@ -105,7 +106,7 @@ func (self *HuntDispatcher) syncFlowTables( func (self *HuntDispatcher) GetFlows( ctx context.Context, config_obj *config_proto.Config, - options result_sets.ResultSetOptions, scope vfilter.Scope, + options services.FlowSearchOptions, scope vfilter.Scope, hunt_id string, start int) (chan *api_proto.FlowDetails, int64, error) { output_chan := make(chan *api_proto.FlowDetails) @@ -124,7 +125,7 @@ func (self *HuntDispatcher) GetFlows( file_store_factory := file_store.GetFileStore(config_obj) rs_reader, err := result_sets.NewResultSetReaderWithOptions( ctx, self.config_obj, file_store_factory, - table_to_query, options) + table_to_query, options.ResultSetOptions) if err != nil { close(output_chan) return output_chan, 0, err @@ -166,10 +167,26 @@ func (self *HuntDispatcher) GetFlows( } } - collection_context, err := launcher.GetFlowDetails( - ctx, config_obj, client_id, flow_id) - if err != nil { - continue + var collection_context *api_proto.FlowDetails + + if options.BasicInformation { + collection_context = &api_proto.FlowDetails{ + Context: &flows_proto.ArtifactCollectorContext{ + ClientId: client_id, + SessionId: flow_id, + }, + } + + // If the user wants detailed flow information we need + // to fetch this now. For many uses this is not + // necessary so we can get away with very basic + // information. + } else { + collection_context, err = launcher.GetFlowDetails( + ctx, config_obj, client_id, flow_id) + if err != nil { + continue + } } select { diff --git a/services/hunt_dispatcher/hunt_dispatcher_test.go b/services/hunt_dispatcher/hunt_dispatcher_test.go index bfd8d52c3a1..0139be1b8fd 100644 --- a/services/hunt_dispatcher/hunt_dispatcher_test.go +++ b/services/hunt_dispatcher/hunt_dispatcher_test.go @@ -99,13 +99,22 @@ type: INTERNAL func (self *HuntDispatcherTestSuite) TestLoadingFromDisk() { // All hunts are now running. hunts := self.getAllHunts() - assert.Equal(self.T(), len(hunts), 5) - for _, h := range hunts { - assert.Equal(self.T(), h.State, api_proto.Hunt_RUNNING) - } + + vtesting.WaitUntil(5*time.Second, self.T(), func() bool { + if len(hunts) != 5 { + return false + } + for _, h := range hunts { + if h.State != api_proto.Hunt_RUNNING { + return false + } + } + return true + }) } func (self *HuntDispatcherTestSuite) TearDownTest() { + self.TestSuite.TearDownTest() if self.time_closer != nil { self.time_closer() } diff --git a/services/hunt_dispatcher/index.go b/services/hunt_dispatcher/index.go index 41ac3e95543..322f365db4b 100644 --- a/services/hunt_dispatcher/index.go +++ b/services/hunt_dispatcher/index.go @@ -3,6 +3,7 @@ package hunt_dispatcher import ( "context" "sort" + "sync/atomic" "time" "github.com/Velocidex/ordereddict" @@ -32,6 +33,10 @@ func (self *HuntStorageManagerImpl) FlushIndex( return nil } + if atomic.LoadInt64(&self.closed) > 0 { + return nil + } + // Debounce the flushing a bit so we dont overload the system for // fast events. Note that flushes occur periodically anyway so if // we skip a flush we will get it later. diff --git a/services/hunt_dispatcher/storage.go b/services/hunt_dispatcher/storage.go index 426627780f8..99e91becb7e 100644 --- a/services/hunt_dispatcher/storage.go +++ b/services/hunt_dispatcher/storage.go @@ -90,7 +90,8 @@ type HuntStorageManagerImpl struct { I_am_master bool // If any of the hunt objects are dirty this will be set. - dirty bool + dirty bool + closed int64 last_flush_time time.Time } @@ -122,6 +123,7 @@ func (self *HuntStorageManagerImpl) GetLastTimestamp() uint64 { func (self *HuntStorageManagerImpl) Close(ctx context.Context) { atomic.SwapUint64(&self.last_timestamp, 0) + atomic.SwapInt64(&self.closed, 1) self.FlushIndex(ctx) } @@ -211,6 +213,7 @@ func (self *HuntStorageManagerImpl) SetHunt( if hunt.State == api_proto.Hunt_ARCHIVED { delete(self.hunts, hunt.HuntId) + self.dirty = true return db.DeleteSubject(self.config_obj, hunt_path_manager.Path()) } @@ -267,6 +270,11 @@ func (self *HuntStorageManagerImpl) ListHunts( // Get the full record from memory cache hunt_obj, err := self.GetHunt(ctx, summary.HuntId) if err != nil { + // Something is wrong! The index is referring to a hunt we + // dont know about - we should re-flush to sync the index. + self.mu.Lock() + self.dirty = true + self.mu.Unlock() continue } diff --git a/services/launcher/delete.go b/services/launcher/delete.go index edaba8521b5..691715aba66 100644 --- a/services/launcher/delete.go +++ b/services/launcher/delete.go @@ -191,8 +191,8 @@ func (self *reporter) emit_ds( } self.seen[client_path] = true - id := self.id self.id++ + id := self.id self.pool.Submit(func() { self.mu.Lock() diff --git a/vql/readers/paged_reader_test.go b/vql/readers/paged_reader_test.go index 97d4be7e615..0fb50bbe0aa 100644 --- a/vql/readers/paged_reader_test.go +++ b/vql/readers/paged_reader_test.go @@ -1,6 +1,3 @@ -//go:build !windows -// +build !windows - package readers import ( @@ -80,7 +77,7 @@ func (self *TestSuite) TestPagedReader() { buff := make([]byte, 4) for i := 0; i < 10; i++ { - reader, err := NewPagedReader( + reader, err := NewAccessorReader( self.scope, "file", self.filenames[i], 100) assert.NoError(self.T(), err) _, err = reader.ReadAt(buff, 0) @@ -90,7 +87,7 @@ func (self *TestSuite) TestPagedReader() { } for i := 0; i < 10; i++ { - reader, err := NewPagedReader(self.scope, "file", self.filenames[i], 100) + reader, err := NewAccessorReader(self.scope, "file", self.filenames[i], 100) assert.NoError(self.T(), err) _, err = reader.ReadAt(buff, 0) assert.NoError(self.T(), err) @@ -99,7 +96,7 @@ func (self *TestSuite) TestPagedReader() { // Open the same reader 10 time returns from the cache. for i := 0; i < 10; i++ { - reader, err := NewPagedReader(self.scope, "file", self.filenames[1], 100) + reader, err := NewAccessorReader(self.scope, "file", self.filenames[1], 100) assert.NoError(self.T(), err) _, err = reader.ReadAt(buff, 0) @@ -110,7 +107,7 @@ func (self *TestSuite) TestPagedReader() { // Make sure that it is ok to close the reader at any time - // the next read will be valid. - reader, err := NewPagedReader(self.scope, "file", self.filenames[1], 100) + reader, err := NewAccessorReader(self.scope, "file", self.filenames[1], 100) assert.NoError(self.T(), err) for i := 0; i < 10; i++ { diff --git a/vql/server/downloads/downloads.go b/vql/server/downloads/downloads.go index e51cdd1c910..a490f1201c3 100644 --- a/vql/server/downloads/downloads.go +++ b/vql/server/downloads/downloads.go @@ -801,7 +801,7 @@ func createHuntDownloadFile( return } - options := result_sets.ResultSetOptions{} + options := services.FlowSearchOptions{BasicInformation: true} flow_chan, _, err := hunt_dispatcher.GetFlows(sub_ctx, config_obj, options, scope, hunt_id, 0) if err != nil { @@ -809,7 +809,6 @@ func createHuntDownloadFile( } for flow_details := range flow_chan { - if flow_details == nil || flow_details.Context == nil { continue } @@ -875,7 +874,7 @@ func generateCombinedResults( defer maybeClose(json_writer) defer maybeClose(csv_writer) - options := result_sets.ResultSetOptions{} + options := services.FlowSearchOptions{BasicInformation: true} flow_chan, _, err := hunt_dispatcher.GetFlows(ctx, config_obj, options, scope, hunt_details.HuntId, 0) if err != nil { diff --git a/vql/server/flows/flow_test.go b/vql/server/flows/flow_test.go index 9a407b56891..2c8cfd5912a 100644 --- a/vql/server/flows/flow_test.go +++ b/vql/server/flows/flow_test.go @@ -8,7 +8,7 @@ import ( "github.com/Velocidex/ordereddict" "github.com/alecthomas/assert" - "github.com/sebdah/goldie" + "github.com/sebdah/goldie/v2" "github.com/stretchr/testify/suite" "google.golang.org/protobuf/types/known/emptypb" actions_proto "www.velocidex.com/golang/velociraptor/actions/proto" @@ -107,7 +107,11 @@ func (self *FilestoreTestSuite) TestEnumerateFlow() { Set("flow_id", self.flow_id). Set("client_id", self.client_id))) - goldie.Assert(self.T(), "TestEnumerateFlow", json.MustMarshalIndent(result)) + g := goldie.New(self.T(), + goldie.WithFixtureDir("fixtures"), + goldie.WithDiffEngine(goldie.ClassicDiff)) + + g.Assert(self.T(), "TestEnumerateFlow", json.MustMarshalIndent(result)) } func TestFilestorePlugin(t *testing.T) { diff --git a/vql/server/flows/parallel.go b/vql/server/flows/parallel.go index f9d754984cf..e6b0f2addc1 100644 --- a/vql/server/flows/parallel.go +++ b/vql/server/flows/parallel.go @@ -8,7 +8,6 @@ import ( "github.com/Velocidex/ordereddict" "www.velocidex.com/golang/velociraptor/acls" config_proto "www.velocidex.com/golang/velociraptor/config/proto" - "www.velocidex.com/golang/velociraptor/result_sets" "www.velocidex.com/golang/velociraptor/services" "www.velocidex.com/golang/velociraptor/vql" vql_subsystem "www.velocidex.com/golang/velociraptor/vql" @@ -227,7 +226,7 @@ func breakHuntIntoScopes( return } - options := result_sets.ResultSetOptions{} + options := services.FlowSearchOptions{BasicInformation: true} flow_chan, _, err := hunt_dispatcher.GetFlows( ctx, config_obj, options, scope, arg.HuntId, 0) if err != nil { @@ -235,6 +234,10 @@ func breakHuntIntoScopes( } for flow_details := range flow_chan { + if flow_details == nil || flow_details.Context == nil { + continue + } + flow_job, err := breakIntoScopes(ctx, config_obj, scope, &ParallelPluginArgs{ Artifact: arg.Artifact, diff --git a/vql/server/flows/parallel_test.go b/vql/server/flows/parallel_test.go index 2517bb90e7b..e35ec344906 100644 --- a/vql/server/flows/parallel_test.go +++ b/vql/server/flows/parallel_test.go @@ -8,7 +8,7 @@ import ( "github.com/Velocidex/ordereddict" "github.com/alecthomas/assert" - "github.com/sebdah/goldie" + "github.com/sebdah/goldie/v2" "github.com/stretchr/testify/suite" api_proto "www.velocidex.com/golang/velociraptor/api/proto" "www.velocidex.com/golang/velociraptor/file_store" @@ -232,7 +232,12 @@ func (self *TestSuite) TestHuntsSource() { // Stable sort the section list so we can goldie it. sort.Strings(sections) - goldie.Assert(self.T(), "TestHuntsSource", json.MustMarshalIndent(sections)) + + g := goldie.New(self.T(), + goldie.WithFixtureDir("fixtures"), + goldie.WithDiffEngine(goldie.ClassicDiff)) + + g.Assert(self.T(), "TestHuntsSource", json.MustMarshalIndent(sections)) vql, err := vfilter.Parse(` SELECT * FROM parallelize( diff --git a/vql/server/flows/uploads.go b/vql/server/flows/uploads.go index fa0b474a711..449d4449715 100644 --- a/vql/server/flows/uploads.go +++ b/vql/server/flows/uploads.go @@ -123,7 +123,7 @@ func (self UploadsPlugins) Call( return } - options := result_sets.ResultSetOptions{} + options := services.FlowSearchOptions{BasicInformation: true} flow_chan, _, err := hunt_dispatcher.GetFlows( ctx, config_obj, options, scope, arg.HuntId, 0) if err != nil { @@ -132,6 +132,10 @@ func (self UploadsPlugins) Call( } for flow_details := range flow_chan { + if flow_details == nil || flow_details.Context == nil { + continue + } + client_id := flow_details.Context.ClientId flow_id := flow_details.Context.SessionId diff --git a/vql/server/hunts/delete.go b/vql/server/hunts/delete.go index ab3f1666ddb..c855c1dd0a5 100644 --- a/vql/server/hunts/delete.go +++ b/vql/server/hunts/delete.go @@ -6,7 +6,6 @@ import ( "github.com/Velocidex/ordereddict" "www.velocidex.com/golang/velociraptor/acls" api_proto "www.velocidex.com/golang/velociraptor/api/proto" - "www.velocidex.com/golang/velociraptor/result_sets" "www.velocidex.com/golang/velociraptor/services" "www.velocidex.com/golang/velociraptor/vql" vql_subsystem "www.velocidex.com/golang/velociraptor/vql" @@ -76,7 +75,7 @@ func (self DeleteHuntPlugin) Call(ctx context.Context, Set("hunt_id", arg.HuntId). Set("details", hunt_obj)) - options := result_sets.ResultSetOptions{} + options := services.FlowSearchOptions{BasicInformation: true} flow_chan, _, err := hunt_dispatcher.GetFlows( ctx, config_obj, options, scope, arg.HuntId, 0) if err != nil { @@ -85,6 +84,9 @@ func (self DeleteHuntPlugin) Call(ctx context.Context, } for flow_details := range flow_chan { + if flow_details == nil || flow_details.Context == nil { + continue + } results, err := launcher.Storage().DeleteFlow(ctx, config_obj, flow_details.Context.ClientId, diff --git a/vql/server/hunts/hunts.go b/vql/server/hunts/hunts.go index 7708de44b5e..4e88e5f42dc 100644 --- a/vql/server/hunts/hunts.go +++ b/vql/server/hunts/hunts.go @@ -246,7 +246,7 @@ func (self HuntResultsPlugin) Call( return } - options := result_sets.ResultSetOptions{} + options := services.FlowSearchOptions{BasicInformation: true} flow_chan, _, err := hunt_dispatcher.GetFlows( ctx, org_config_obj, options, scope, arg.HuntId, 0) if err != nil { @@ -322,9 +322,10 @@ func (self HuntResultsPlugin) Info(scope vfilter.Scope, type_map *vfilter.TypeMa } type HuntFlowsPluginArgs struct { - HuntId string `vfilter:"required,field=hunt_id,doc=The hunt id to inspect."` - StartRow int64 `vfilter:"optional,field=start_row,doc=The first row to show (used for paging)."` - Limit int64 `vfilter:"optional,field=limit,doc=Number of rows to show (used for paging)."` + HuntId string `vfilter:"required,field=hunt_id,doc=The hunt id to inspect."` + StartRow int64 `vfilter:"optional,field=start_row,doc=The first row to show (used for paging)."` + Limit int64 `vfilter:"optional,field=limit,doc=Number of rows to show (used for paging)."` + BasicInformation bool `vfilter:"optional,field=basic_info,doc=If specified we onlyh return basic information like flow id and client id."` } type HuntFlowsPlugin struct{} @@ -362,7 +363,10 @@ func (self HuntFlowsPlugin) Call( return } - options := result_sets.ResultSetOptions{} + // Get full information about all the flows in the hunt. + options := services.FlowSearchOptions{ + BasicInformation: arg.BasicInformation, + } flow_chan, _, err := hunt_dispatcher.GetFlows( ctx, config_obj, options, scope, arg.HuntId, int(arg.StartRow)) @@ -372,14 +376,13 @@ func (self HuntFlowsPlugin) Call( } for flow_details := range flow_chan { - - client_id := "" - flow_id := "" - if flow_details.Context != nil { - client_id = flow_details.Context.ClientId - flow_id = flow_details.Context.SessionId + if flow_details == nil || flow_details.Context == nil { + continue } + client_id := flow_details.Context.ClientId + flow_id := flow_details.Context.SessionId + result := ordereddict.NewDict(). Set("HuntId", arg.HuntId). Set("ClientId", client_id).