diff --git a/accessors/process/process_address_space_windows.go b/accessors/process/process_address_space_windows.go index cbcce17efe3..2e6d0f74924 100644 --- a/accessors/process/process_address_space_windows.go +++ b/accessors/process/process_address_space_windows.go @@ -269,6 +269,7 @@ func (self ProcessAccessor) New(scope vfilter.Scope) ( reader.mu.Unlock() } } + result.lru.Close() }) return result, nil } diff --git a/accessors/raw_registry/cache.go b/accessors/raw_registry/cache.go index 9430ddcc96a..9ba8f2ef7fe 100644 --- a/accessors/raw_registry/cache.go +++ b/accessors/raw_registry/cache.go @@ -61,6 +61,8 @@ func getRegFileSystemAccessorCache(scope vfilter.Scope) *RawRegFileSystemAccesso root_scope.AddDestructor(func() { cache.Close() + cache.lru.Close() + cache.readdir_lru.Close() }) vql_subsystem.CacheSet(root_scope, RAW_CACHE_TAG, cache) diff --git a/accessors/registry/cache.go b/accessors/registry/cache.go index 91d3211d377..b2ea4b748b8 100644 --- a/accessors/registry/cache.go +++ b/accessors/registry/cache.go @@ -116,6 +116,8 @@ func getRegFileSystemAccessorCache(scope vfilter.Scope) *RegFileSystemAccessorCa root_scope.AddDestructor(func() { cache.Close() + cache.lru.Close() + cache.readdir_lru.Close() }) vql_subsystem.CacheSet(root_scope, CACHE_TAG, cache) diff --git a/actions/events_test.go b/actions/events_test.go index 016b64017e3..71c361fbf73 100644 --- a/actions/events_test.go +++ b/actions/events_test.go @@ -15,6 +15,7 @@ import ( "www.velocidex.com/golang/velociraptor/actions" actions_proto "www.velocidex.com/golang/velociraptor/actions/proto" crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto" + "www.velocidex.com/golang/velociraptor/datastore" "www.velocidex.com/golang/velociraptor/file_store/test_utils" flows_proto "www.velocidex.com/golang/velociraptor/flows/proto" "www.velocidex.com/golang/velociraptor/responder" @@ -69,6 +70,10 @@ func (self *EventsTestSuite) SetupTest() { self.ConfigObj.Client.WritebackDarwin = self.writeback self.ConfigObj.Services.ClientMonitoring = true self.ConfigObj.Services.IndexServer = true + + datastore.SetGlobalDatastore(context.Background(), + self.ConfigObj.Datastore.Implementation, self.ConfigObj) + self.TestSuite.SetupTest() writeback_service := writeback.GetWritebackService() diff --git a/api/assets.go b/api/assets.go index 4e703bbce75..214f5479ec5 100644 --- a/api/assets.go +++ b/api/assets.go @@ -26,6 +26,7 @@ import ( "github.com/gorilla/csrf" "github.com/lpar/gzipped" + context "golang.org/x/net/context" "www.velocidex.com/golang/velociraptor/api/proto" utils "www.velocidex.com/golang/velociraptor/api/utils" config_proto "www.velocidex.com/golang/velociraptor/config/proto" @@ -34,11 +35,13 @@ import ( "www.velocidex.com/golang/velociraptor/services" ) -func install_static_assets(config_obj *config_proto.Config, mux *http.ServeMux) { +func install_static_assets( + ctx context.Context, + config_obj *config_proto.Config, mux *http.ServeMux) { base := utils.GetBasePath(config_obj) dir := utils.Join(base, "/app/") mux.Handle(dir, ipFilter(config_obj, http.StripPrefix( - dir, gzipped.FileServer(NewCachedFilesystem(gui_assets.HTTP))))) + dir, gzipped.FileServer(NewCachedFilesystem(ctx, gui_assets.HTTP))))) mux.Handle("/favicon.png", http.RedirectHandler(utils.Join(base, "/favicon.ico"), diff --git a/api/proxy.go b/api/proxy.go index 0686718f9cb..2001ab33e93 100644 --- a/api/proxy.go +++ b/api/proxy.go @@ -192,7 +192,7 @@ func PrepareGUIMux( downloadFileStore([]string{"clients"})))))) // Assets etc do not need auth. - install_static_assets(config_obj, mux) + install_static_assets(ctx, config_obj, mux) // Add reverse proxy support. err = AddProxyMux(config_obj, mux) diff --git a/api/static.go b/api/static.go index 5abc3338a53..a3b4cd67ed2 100644 --- a/api/static.go +++ b/api/static.go @@ -24,6 +24,7 @@ import ( errors "github.com/go-errors/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + context "golang.org/x/net/context" "www.velocidex.com/golang/velociraptor/services" "www.velocidex.com/golang/velociraptor/utils" ) @@ -145,7 +146,8 @@ func (self *CachedFilesystem) Exists(path string) bool { return true } -func NewCachedFilesystem(fs http.FileSystem) *CachedFilesystem { +func NewCachedFilesystem( + ctx context.Context, fs http.FileSystem) *CachedFilesystem { result := &CachedFilesystem{ FileSystem: fs, lru: ttlcache.NewCache(), @@ -153,6 +155,12 @@ func NewCachedFilesystem(fs http.FileSystem) *CachedFilesystem { result.lru.SetTTL(10 * time.Minute) result.lru.SkipTTLExtensionOnHit(true) + + go func() { + <-ctx.Done() + result.lru.Close() + }() + return result } diff --git a/crypto/client/client.go b/crypto/client/client.go index 876e7229af4..0fde0ae7c6a 100644 --- a/crypto/client/client.go +++ b/crypto/client/client.go @@ -1,6 +1,7 @@ package client import ( + "context" "crypto/rsa" "crypto/x509" @@ -54,7 +55,9 @@ func (self *ClientCryptoManager) AddCertificate( return server_name, nil } -func NewClientCryptoManager(config_obj *config_proto.Config, client_private_key_pem []byte) ( +func NewClientCryptoManager( + ctx context.Context, + config_obj *config_proto.Config, client_private_key_pem []byte) ( *ClientCryptoManager, error) { private_key, err := crypto_utils.ParseRsaPrivateKeyFromPemStr(client_private_key_pem) if err != nil { @@ -77,7 +80,7 @@ func NewClientCryptoManager(config_obj *config_proto.Config, client_private_key_ lru_size = config_obj.Frontend.Resources.ExpectedClients } - return &ClientCryptoManager{CryptoManager{ + result := &ClientCryptoManager{CryptoManager{ client_id: client_id, private_key: private_key, Resolver: NewInMemoryPublicKeyResolver(), @@ -85,5 +88,12 @@ func NewClientCryptoManager(config_obj *config_proto.Config, client_private_key_ unauthenticated_lru: ttlcache.NewCache(), caPool: roots, logger: logger, - }}, nil + }} + + go func() { + <-ctx.Done() + result.unauthenticated_lru.Close() + }() + + return result, nil } diff --git a/crypto/client/manager.go b/crypto/client/manager.go index 9d2ea520961..acb44368499 100644 --- a/crypto/client/manager.go +++ b/crypto/client/manager.go @@ -1,6 +1,7 @@ package client import ( + "context" "crypto" "crypto/aes" "crypto/cipher" @@ -85,7 +86,9 @@ func (self *CryptoManager) GetCSR() ([]byte, error) { Bytes: csrBytes}), nil } -func NewCryptoManager(config_obj *config_proto.Config, +func NewCryptoManager( + ctx context.Context, + config_obj *config_proto.Config, client_id string, private_key_pem []byte, public_key_resolver PublicKeyResolver, @@ -110,6 +113,11 @@ func NewCryptoManager(config_obj *config_proto.Config, result.unauthenticated_lru.SetTTL(time.Second * 60) result.unauthenticated_lru.SkipTTLExtensionOnHit(true) + go func() { + <-ctx.Done() + result.unauthenticated_lru.Close() + }() + return result, nil } diff --git a/crypto/crypto_test.go b/crypto/crypto_test.go index 67072059b32..0fdb0bfc6f2 100644 --- a/crypto/crypto_test.go +++ b/crypto/crypto_test.go @@ -70,7 +70,7 @@ func (self *TestSuite) SetupTest() { self.ConfigObj.Writeback.PrivateKey = string(key) // Configure the client manager. - self.client_manager, err = crypto_client.NewClientCryptoManager( + self.client_manager, err = crypto_client.NewClientCryptoManager(self.Ctx, self.ConfigObj, []byte(self.ConfigObj.Writeback.PrivateKey)) require.NoError(self.T(), err) diff --git a/crypto/server/manager.go b/crypto/server/manager.go index 6436dadd507..d0d60ed9819 100644 --- a/crypto/server/manager.go +++ b/crypto/server/manager.go @@ -88,7 +88,8 @@ func NewServerCryptoManager( return nil, err } - base, err := client.NewCryptoManager(config_obj, crypto_utils.GetSubjectName(cert), + base, err := client.NewCryptoManager(ctx, config_obj, + crypto_utils.GetSubjectName(cert), []byte(config_obj.Frontend.PrivateKey), resolver, logging.GetLogger(config_obj, &logging.FrontendComponent)) if err != nil { diff --git a/crypto/storage/storage_test.go b/crypto/storage/storage_test.go index 6816b51a523..c36b723d3ee 100644 --- a/crypto/storage/storage_test.go +++ b/crypto/storage/storage_test.go @@ -66,7 +66,7 @@ func (self *CrytpoStoreTestSuite) testWriting() { // Initial state no server connection. SetCurrentServerPem(nil) - fd, err := NewCryptoFileWriter(self.ConfigObj, 10000, output) + fd, err := NewCryptoFileWriter(self.Ctx, self.ConfigObj, 10000, output) assert.NoError(self.T(), err) defer fd.Close() diff --git a/crypto/storage/writer.go b/crypto/storage/writer.go index fb9746345ba..a4590015629 100644 --- a/crypto/storage/writer.go +++ b/crypto/storage/writer.go @@ -1,6 +1,7 @@ package storage import ( + "context" "crypto/rsa" "errors" "os" @@ -29,6 +30,7 @@ var ( type CryptoFileWriter struct { mu sync.Mutex + ctx context.Context config_obj *config_proto.Config fd *os.File header *Header @@ -78,7 +80,7 @@ func (self *CryptoFileWriter) serverPem() ([]byte, error) { // the crypto manager until we have contacted the server and fetched // its certificate. This code delays use of the crypto manager until // it becomes available. -func (self *CryptoFileWriter) cryptoManager() ( +func (self *CryptoFileWriter) cryptoManager(ctx context.Context) ( *crypto_client.ClientCryptoManager, error) { server_pem, err := self.serverPem() @@ -98,7 +100,7 @@ func (self *CryptoFileWriter) cryptoManager() ( return nil, err } - crypto_manager, err := crypto_client.NewClientCryptoManager( + crypto_manager, err := crypto_client.NewClientCryptoManager(ctx, self.config_obj, []byte(writeback.PrivateKey)) if err != nil { return nil, err @@ -168,7 +170,7 @@ func (self *CryptoFileWriter) Flush(keep_on_error KeepPolicy) error { nonce := self.config_obj.Client.Nonce - manager, err := self.cryptoManager() + manager, err := self.cryptoManager(self.ctx) if err != nil { return err } @@ -222,7 +224,7 @@ func (self *CryptoFileWriter) writeCerts() error { self.header.Next = pub_key.Next - manager, err := self.cryptoManager() + manager, err := self.cryptoManager(self.ctx) if err != nil { return err } @@ -250,6 +252,7 @@ func (self *CryptoFileWriter) writeCerts() error { } func NewCryptoFileWriter( + ctx context.Context, config_obj *config_proto.Config, max_size uint64, filename string) (*CryptoFileWriter, error) { @@ -279,6 +282,7 @@ func NewCryptoFileWriter( result := &CryptoFileWriter{ config_obj: config_obj, fd: fd, + ctx: ctx, header: &Header{}, max_size: max_size, } diff --git a/datastore/datastore.go b/datastore/datastore.go index 0a37566a9f7..85094122920 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -19,6 +19,7 @@ package datastore import ( + "context" "errors" "sync" "time" @@ -125,17 +126,20 @@ func GetDB(config_obj *config_proto.Config) (DataStore, error) { return nil, err } - return getImpl(config_obj, implementation) + ctx := context.Background() + return getImpl(ctx, config_obj, implementation) } -func getImpl(config_obj *config_proto.Config, implementation string) (DataStore, error) { +func getImpl( + ctx context.Context, config_obj *config_proto.Config, + implementation string) (DataStore, error) { switch implementation { case "FileBaseDataStore": return file_based_imp, nil case "ReadOnlyDataStore": if read_only_imp == nil { - read_only_imp = NewReadOnlyDataStore(config_obj) + read_only_imp = NewReadOnlyDataStore(ctx, config_obj) } return read_only_imp, nil @@ -144,7 +148,7 @@ func getImpl(config_obj *config_proto.Config, implementation string) (DataStore, case "Memcache": if memcache_imp == nil { - memcache_imp_ := NewMemcacheDataStore(config_obj) + memcache_imp_ := NewMemcacheDataStore(ctx, config_obj) memcache_imp = memcache_imp_ RegisterMemcacheDatastoreMetrics(memcache_imp_) } @@ -152,7 +156,7 @@ func getImpl(config_obj *config_proto.Config, implementation string) (DataStore, case "MemcacheFileDataStore": if memcache_file_imp == nil { - memcache_imp_ := NewMemcacheFileDataStore(config_obj) + memcache_imp_ := NewMemcacheFileDataStore(ctx, config_obj) memcache_file_imp = memcache_imp_ RegisterMemcacheDatastoreMetrics(memcache_imp_) } @@ -160,7 +164,7 @@ func getImpl(config_obj *config_proto.Config, implementation string) (DataStore, case "Test": if memcache_imp == nil { - memcache_imp = NewMemcacheDataStore(config_obj) + memcache_imp = NewMemcacheDataStore(ctx, config_obj) } return memcache_imp, nil @@ -171,12 +175,13 @@ func getImpl(config_obj *config_proto.Config, implementation string) (DataStore, } func SetGlobalDatastore( + ctx context.Context, implementation string, config_obj *config_proto.Config) (err error) { ds_mu.Lock() defer ds_mu.Unlock() - g_impl, err = getImpl(config_obj, implementation) + g_impl, err = getImpl(ctx, config_obj, implementation) return err } diff --git a/datastore/memcache.go b/datastore/memcache.go index 827783de80b..502cdd47ce8 100644 --- a/datastore/memcache.go +++ b/datastore/memcache.go @@ -1,6 +1,7 @@ package datastore import ( + "context" "errors" "fmt" "os" @@ -265,7 +266,7 @@ func (self *DirectoryLRUCache) Count() int { } func NewDirectoryLRUCache( - config_obj *config_proto.Config, + ctx context.Context, config_obj *config_proto.Config, max_size, max_item_size int) *DirectoryLRUCache { result := &DirectoryLRUCache{ @@ -276,6 +277,11 @@ func NewDirectoryLRUCache( max_item_size: max_item_size, } + go func() { + <-ctx.Done() + result.Cache.Close() + }() + result.Cache.SetCacheSizeLimit(max_size) return result } @@ -668,12 +674,13 @@ func (self *MemcacheDatastore) Stats() *MemcacheStats { } } -func NewMemcacheDataStore(config_obj *config_proto.Config) *MemcacheDatastore { +func NewMemcacheDataStore( + ctx context.Context, config_obj *config_proto.Config) *MemcacheDatastore { // This data store is used for testing so we really do not want to // expire anything. result := &MemcacheDatastore{ - data_cache: NewDataLRUCache(config_obj, 100000, 1000000), - dir_cache: NewDirectoryLRUCache(config_obj, 100000, 100000), + data_cache: NewDataLRUCache(ctx, config_obj, 100000, 1000000), + dir_cache: NewDirectoryLRUCache(ctx, config_obj, 100000, 100000), get_dir_metadata: get_dir_metadata, } diff --git a/datastore/memcache_data.go b/datastore/memcache_data.go index 16c186cffef..9594c6c0c5e 100644 --- a/datastore/memcache_data.go +++ b/datastore/memcache_data.go @@ -1,6 +1,7 @@ package datastore import ( + "context" "sync" "github.com/Velocidex/ttlcache/v2" @@ -56,7 +57,8 @@ func (self *DataLRUCache) Set(key string, value *BulkData) error { return self.Cache.Set(key, value) } -func NewDataLRUCache(config_obj *config_proto.Config, +func NewDataLRUCache( + ctx context.Context, config_obj *config_proto.Config, data_max_size, data_max_item_size int) *DataLRUCache { result := &DataLRUCache{ @@ -66,5 +68,10 @@ func NewDataLRUCache(config_obj *config_proto.Config, result.SetCacheSizeLimit(data_max_size) + go func() { + <-ctx.Done() + result.Cache.Close() + }() + return result } diff --git a/datastore/memcache_file.go b/datastore/memcache_file.go index 29c50aee5bd..350d694884d 100644 --- a/datastore/memcache_file.go +++ b/datastore/memcache_file.go @@ -638,7 +638,9 @@ func get_file_dir_metadata( return nil, errorNoDirectoryMetadata } -func NewMemcacheFileDataStore(config_obj *config_proto.Config) *MemcacheFileDataStore { +func NewMemcacheFileDataStore( + ctx context.Context, + config_obj *config_proto.Config) *MemcacheFileDataStore { data_max_size := 10000 if config_obj.Datastore != nil && config_obj.Datastore.MemcacheDatastoreMaxSize > 0 { @@ -657,9 +659,9 @@ func NewMemcacheFileDataStore(config_obj *config_proto.Config) *MemcacheFileData result := &MemcacheFileDataStore{ cache: &MemcacheDatastore{ - data_cache: NewDataLRUCache(config_obj, + data_cache: NewDataLRUCache(ctx, config_obj, data_max_size, data_max_item_size), - dir_cache: NewDirectoryLRUCache(config_obj, + dir_cache: NewDirectoryLRUCache(ctx, config_obj, data_max_size, dir_max_item_size), get_dir_metadata: get_file_dir_metadata, }, diff --git a/datastore/memcache_file_test.go b/datastore/memcache_file_test.go index 834cfa7b832..086d844dcd7 100644 --- a/datastore/memcache_file_test.go +++ b/datastore/memcache_file_test.go @@ -54,7 +54,7 @@ func (self *MemcacheFileTestSuite) SetupTest() { self.ctx, self.cancel = context.WithCancel(context.Background()) // Clear the cache between runs - db := datastore.NewMemcacheFileDataStore(self.config_obj) + db := datastore.NewMemcacheFileDataStore(self.ctx, self.config_obj) self.datastore = db db.Clear() @@ -118,7 +118,7 @@ func (self MemcacheFileTestSuite) TestDirectoryOverflow() { // Expire directories larger than 2 items. self.config_obj.Datastore.MemcacheDatastoreMaxDirSize = 4 - db := datastore.NewMemcacheFileDataStore(self.config_obj) + db := datastore.NewMemcacheFileDataStore(self.ctx, self.config_obj) db.StartWriter(self.ctx, &self.wg, self.config_obj) client_record := &api_proto.ClientMetadata{ diff --git a/datastore/memcache_test.go b/datastore/memcache_test.go index 0b8e60137f1..1736504753e 100644 --- a/datastore/memcache_test.go +++ b/datastore/memcache_test.go @@ -1,6 +1,7 @@ package datastore_test import ( + "context" "testing" "github.com/stretchr/testify/suite" @@ -20,8 +21,9 @@ func TestMemCacheDatastore(t *testing.T) { config_obj := config.GetDefaultConfig() config_obj.Datastore.Implementation = "Memcache" + ctx := context.Background() suite.Run(t, &MemcacheTestSuite{BaseTestSuite{ - datastore: datastore.NewMemcacheDataStore(config_obj), + datastore: datastore.NewMemcacheDataStore(ctx, config_obj), config_obj: config_obj, }}) } diff --git a/datastore/readonly.go b/datastore/readonly.go index 104082315bb..80b04453b29 100644 --- a/datastore/readonly.go +++ b/datastore/readonly.go @@ -4,6 +4,8 @@ package datastore import ( + "context" + "google.golang.org/protobuf/proto" config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/file_store/api" @@ -48,8 +50,10 @@ func (self *ReadOnlyDataStore) DeleteSubject( return self.cache.DeleteSubject(config_obj, urn) } -func NewReadOnlyDataStore(config_obj *config_proto.Config) *ReadOnlyDataStore { +func NewReadOnlyDataStore( + ctx context.Context, + config_obj *config_proto.Config) *ReadOnlyDataStore { return &ReadOnlyDataStore{&MemcacheFileDataStore{ - cache: NewMemcacheDataStore(config_obj), + cache: NewMemcacheDataStore(ctx, config_obj), }} } diff --git a/file_store/directory/directory_test.go b/file_store/directory/directory_test.go index 33e4331d748..141f07c6ce7 100644 --- a/file_store/directory/directory_test.go +++ b/file_store/directory/directory_test.go @@ -1,4 +1,4 @@ -package directory +package directory_test import ( "io/ioutil" @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/suite" "www.velocidex.com/golang/velociraptor/config" config_proto "www.velocidex.com/golang/velociraptor/config/proto" + "www.velocidex.com/golang/velociraptor/file_store/directory" "www.velocidex.com/golang/velociraptor/file_store/tests" ) @@ -16,7 +17,7 @@ type DirectoryTestSuite struct { *tests.FileStoreTestSuite config_obj *config_proto.Config - file_store *DirectoryFileStore + file_store *directory.DirectoryFileStore } func (self *DirectoryTestSuite) SetupTest() { @@ -34,7 +35,7 @@ func (self *DirectoryTestSuite) TearDownTest() { func TestDirectoryFileStore(t *testing.T) { config_obj := config.GetDefaultConfig() - file_store := NewDirectoryFileStore(config_obj) + file_store := directory.NewDirectoryFileStore(config_obj) suite.Run(t, &DirectoryTestSuite{ FileStoreTestSuite: tests.NewFileStoreTestSuite(config_obj, file_store), file_store: file_store, diff --git a/file_store/test_utils/testsuite.go b/file_store/test_utils/testsuite.go index 652527ec880..f91dd71601a 100644 --- a/file_store/test_utils/testsuite.go +++ b/file_store/test_utils/testsuite.go @@ -15,6 +15,7 @@ import ( artifacts_proto "www.velocidex.com/golang/velociraptor/artifacts/proto" "www.velocidex.com/golang/velociraptor/config" config_proto "www.velocidex.com/golang/velociraptor/config/proto" + "www.velocidex.com/golang/velociraptor/datastore" "www.velocidex.com/golang/velociraptor/file_store" "www.velocidex.com/golang/velociraptor/file_store/memory" "www.velocidex.com/golang/velociraptor/services" @@ -129,6 +130,9 @@ func (self *TestSuite) SetupTest() { self.ConfigObj = self.LoadConfig() } + datastore.SetGlobalDatastore(context.Background(), + self.ConfigObj.Datastore.Implementation, self.ConfigObj) + self.LoadArtifactsIntoConfig(definitions) // Start essential services. diff --git a/http_comms/comms.go b/http_comms/comms.go index 717c32c7e85..66e37113ca4 100644 --- a/http_comms/comms.go +++ b/http_comms/comms.go @@ -588,7 +588,7 @@ func (self *HTTPConnector) rekeyNextServer(ctx context.Context) error { } func (self *HTTPConnector) rekeyWithURL(ctx context.Context, url string) error { - req, err := http.NewRequest("GET", url+"server.pem", nil) + req, err := http.NewRequestWithContext(ctx, "GET", url+"server.pem", nil) if err != nil { return errors.Wrap(err, 0) } diff --git a/http_comms/e2e_test.go b/http_comms/e2e_test.go index 25f5d9e6b3b..f6fff0c96fa 100644 --- a/http_comms/e2e_test.go +++ b/http_comms/e2e_test.go @@ -130,7 +130,8 @@ func (self *TestSuite) makeServer( func (self *TestSuite) makeClient( client_ctx context.Context, client_wg *sync.WaitGroup) *http_comms.HTTPCommunicator { - manager, err := crypto_client.NewClientCryptoManager( + ctx := context.Background() + manager, err := crypto_client.NewClientCryptoManager(ctx, self.ConfigObj, []byte(self.ConfigObj.Writeback.PrivateKey)) assert.NoError(self.T(), err) diff --git a/http_comms/service.go b/http_comms/service.go index 101af7407ce..a435dbbf456 100644 --- a/http_comms/service.go +++ b/http_comms/service.go @@ -30,7 +30,7 @@ func StartHttpCommunicatorService( return nil, err } - crypto_manager, err := crypto_client.NewClientCryptoManager( + crypto_manager, err := crypto_client.NewClientCryptoManager(ctx, config_obj, []byte(writeback.PrivateKey)) if err != nil { return nil, err diff --git a/paths/paths_test.go b/paths/paths_test.go index 98bf38d923f..efb02d767bb 100644 --- a/paths/paths_test.go +++ b/paths/paths_test.go @@ -1,6 +1,7 @@ package paths_test import ( + "context" "strings" "testing" @@ -48,7 +49,7 @@ func (self *PathManagerTestSuite) TestAsClientPath() { // where the file will be created. Return this path - this includes // any file store escaping or path transformations. func (self *PathManagerTestSuite) getDatastorePath(path_spec api.DSPathSpec) string { - ds := datastore.NewMemcacheDataStore(self.config_obj) + ds := datastore.NewMemcacheDataStore(context.Background(), self.config_obj) data := &crypto_proto.VeloMessage{} ds.SetSubject(self.config_obj, path_spec, data) diff --git a/server/server_test.go b/server/server_test.go index d045592860c..90ca6e16239 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -111,7 +111,7 @@ func (self *ServerTestSuite) SetupTest() { self.server, err = server.NewServer(self.Sm.Ctx, self.ConfigObj, self.Sm.Wg) require.NoError(self.T(), err) - self.client_crypto, err = crypto_client.NewClientCryptoManager( + self.client_crypto, err = crypto_client.NewClientCryptoManager(self.Ctx, self.ConfigObj, []byte(self.ConfigObj.Writeback.PrivateKey)) require.NoError(self.T(), err) diff --git a/services/acl_manager/acl_manager.go b/services/acl_manager/acl_manager.go index a077c41c8d6..a769268046c 100644 --- a/services/acl_manager/acl_manager.go +++ b/services/acl_manager/acl_manager.go @@ -46,6 +46,11 @@ func NewACLManager( result.lru.SetTTL(timeout) result.lru.SkipTTLExtensionOnHit(true) + go func() { + <-ctx.Done() + result.lru.Close() + }() + backups, err := services.GetBackupService(config_obj) if err == nil { backups.Register(&ACLBackupProvider{ diff --git a/services/ddclient/noip.go b/services/ddclient/noip.go index a9c1a7df8cc..84773e33b81 100644 --- a/services/ddclient/noip.go +++ b/services/ddclient/noip.go @@ -78,7 +78,7 @@ func (self NoIPUpdater) UpdateDDNSRecord( }, } - req, err := http.NewRequest("GET", url, nil) + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return err } diff --git a/services/encrypted_logs/encrypted_logs.go b/services/encrypted_logs/encrypted_logs.go index 385f1cd23c9..d3d86528cae 100644 --- a/services/encrypted_logs/encrypted_logs.go +++ b/services/encrypted_logs/encrypted_logs.go @@ -66,7 +66,7 @@ func StartEncryptedLog( // logs are relatively fresh max_wait := time.Second * 10 - fd, err := storage.NewCryptoFileWriter(config_obj, max_size, filename) + fd, err := storage.NewCryptoFileWriter(ctx, config_obj, max_size, filename) if err != nil { logger.Error("StartEncryptedLog: %v", err) return nil diff --git a/services/frontend/frontend.go b/services/frontend/frontend.go index 9748cec98ad..b1b4a427ac1 100644 --- a/services/frontend/frontend.go +++ b/services/frontend/frontend.go @@ -334,7 +334,7 @@ func (self *MasterFrontendManager) Start(ctx context.Context, wg *sync.WaitGroup return err } - err = datastore.SetGlobalDatastore(implementation, config_obj) + err = datastore.SetGlobalDatastore(ctx, implementation, config_obj) if err != nil { return err } @@ -418,7 +418,7 @@ func (self *MinionFrontendManager) Start(ctx context.Context, wg *sync.WaitGroup return err } - err = datastore.SetGlobalDatastore(implementation, config_obj) + err = datastore.SetGlobalDatastore(ctx, implementation, config_obj) if err != nil { return err } diff --git a/services/labels/labels.go b/services/labels/labels.go index 7ded67a0784..1e5b8829872 100644 --- a/services/labels/labels.go +++ b/services/labels/labels.go @@ -354,6 +354,11 @@ func (self *Labeler) Start(ctx context.Context, return nil }) + go func() { + <-ctx.Done() + self.lru.Close() + }() + journal, err := services.GetJournal(config_obj) if err != nil { return err diff --git a/services/secrets/secrets.go b/services/secrets/secrets.go index bc25a5094f5..d4a6d336a36 100644 --- a/services/secrets/secrets.go +++ b/services/secrets/secrets.go @@ -406,5 +406,11 @@ func NewSecretsService( result.secrets_lru.SetTTL(time.Minute) result.secrets_lru.SkipTTLExtensionOnHit(true) + go func() { + <-ctx.Done() + result.definitions_lru.Close() + result.secrets_lru.Close() + }() + return result, nil } diff --git a/services/users/storage.go b/services/users/storage.go index 6ce87717437..64e912b579b 100644 --- a/services/users/storage.go +++ b/services/users/storage.go @@ -544,6 +544,11 @@ func NewUserStorageManager( result.lru.SetTTL(time.Minute) result.lru.SkipTTLExtensionOnHit(true) + go func() { + <-ctx.Done() + result.lru.Close() + }() + // Get initial mapping between lower case usernames and correct usernames err := result.buildUsernameLookup(ctx) if err != nil { diff --git a/vql/server/crypto/writer.go b/vql/server/crypto/writer.go index 13000377aad..7e25cd30e7c 100644 --- a/vql/server/crypto/writer.go +++ b/vql/server/crypto/writer.go @@ -73,7 +73,7 @@ func (self WriteCryptFilePlugin) Call( arg.MaxSize = 1024 * 1024 * 1024 * 1024 // 1Gb } - fd, err := storage.NewCryptoFileWriter(&config_proto.Config{ + fd, err := storage.NewCryptoFileWriter(ctx, &config_proto.Config{ Client: config_obj, }, arg.MaxSize, arg.Filename.String()) if err != nil { diff --git a/vql/tools/webdav_upload.go b/vql/tools/webdav_upload.go index dfbd98380a4..e257603dedf 100644 --- a/vql/tools/webdav_upload.go +++ b/vql/tools/webdav_upload.go @@ -147,7 +147,8 @@ func upload_webdav(ctx context.Context, scope vfilter.Scope, } } - req, err := http.NewRequest(http.MethodPut, parsedUrl.String(), reader) + req, err := http.NewRequestWithContext(ctx, + http.MethodPut, parsedUrl.String(), reader) if err != nil { return &uploads.UploadResponse{ Error: err.Error(),