From 1bcec1823c7cf0b334b87f9d0e822392f221c871 Mon Sep 17 00:00:00 2001 From: Mike Cohen Date: Thu, 4 Nov 2021 03:00:08 +1000 Subject: [PATCH] Added a gRPC based remote datastore (#1366) This allows minions to connect to the memcache data store of the master. --- acls/acls.go | 10 ++ acls/proto/acl.pb.go | 35 ++++-- acls/proto/acl.proto | 1 + api/datastore.go | 132 ++++++++++++++++++++ api/datastore_test.go | 132 ++++++++++++++++++++ api/fixtures/TestDatastore.golden | 18 +++ api/proto/api.pb.gw.go | 28 ++--- api/proto/datastore.pb.go | 51 +++++--- api/proto/datastore.proto | 2 + bin/frontend.go | 8 ++ datastore/datastore.go | 9 ++ datastore/filebased.go | 16 +++ datastore/memcache.go | 66 ++++++++-- datastore/memcache_file.go | 52 ++++++++ datastore/remote.go | 197 ++++++++++++++++++++++++++++++ file_store/file_store.go | 4 +- services/journal/replication.go | 4 +- vql/functions/log.go | 49 ++++++-- vql/parsers/sql.go | 7 +- vtesting/assert/wrapper.go | 10 +- 20 files changed, 763 insertions(+), 68 deletions(-) create mode 100644 api/datastore.go create mode 100644 api/datastore_test.go create mode 100644 api/fixtures/TestDatastore.golden create mode 100644 datastore/remote.go diff --git a/acls/acls.go b/acls/acls.go index 820b46ccf40..08951e9ec8f 100644 --- a/acls/acls.go +++ b/acls/acls.go @@ -110,6 +110,9 @@ const ( // Allowed to create zip files. PREPARE_RESULTS + // Allowed raw datastore access + DATASTORE_ACCESS + // When adding new permission - update CheckAccess, // GetRolePermissions and acl.proto ) @@ -150,6 +153,8 @@ func (self ACL_PERMISSION) String() string { return "MACHINE_STATE" case PREPARE_RESULTS: return "PREPARE_RESULTS" + case DATASTORE_ACCESS: + return "DATASTORE_ACCESS" } return fmt.Sprintf("%d", self) @@ -192,6 +197,8 @@ func GetPermission(name string) ACL_PERMISSION { return MACHINE_STATE case "PREPARE_RESULTS": return PREPARE_RESULTS + case "DATASTORE_ACCESS": + return DATASTORE_ACCESS } return NO_PERMISSIONS @@ -345,6 +352,9 @@ func CheckAccessWithToken( case PREPARE_RESULTS: return token.PrepareResults, nil + case DATASTORE_ACCESS: + return token.DatastoreAccess, nil + } return false, nil diff --git a/acls/proto/acl.pb.go b/acls/proto/acl.pb.go index c2f0bd7ab76..ad2592bca7b 100644 --- a/acls/proto/acl.pb.go +++ b/acls/proto/acl.pb.go @@ -42,6 +42,7 @@ type ApiClientACL struct { FilesystemWrite bool `protobuf:"varint,14,opt,name=filesystem_write,json=filesystemWrite,proto3" json:"filesystem_write,omitempty"` MachineState bool `protobuf:"varint,16,opt,name=machine_state,json=machineState,proto3" json:"machine_state,omitempty"` PrepareResults bool `protobuf:"varint,17,opt,name=prepare_results,json=prepareResults,proto3" json:"prepare_results,omitempty"` + DatastoreAccess bool `protobuf:"varint,18,opt,name=datastore_access,json=datastoreAccess,proto3" json:"datastore_access,omitempty"` // A list of roles in lieu of the permissions above. These will be // interpolated into this ACL object. 
Roles []string `protobuf:"bytes,9,rep,name=roles,proto3" json:"roles,omitempty"` @@ -191,6 +192,13 @@ func (x *ApiClientACL) GetPrepareResults() bool { return false } +func (x *ApiClientACL) GetDatastoreAccess() bool { + if x != nil { + return x.DatastoreAccess + } + return false +} + func (x *ApiClientACL) GetRoles() []string { if x != nil { return x.Roles @@ -260,7 +268,7 @@ var File_acl_proto protoreflect.FileDescriptor var file_acl_proto_rawDesc = []byte{ 0x0a, 0x09, 0x61, 0x63, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x14, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x65, 0x6d, 0x61, 0x6e, 0x74, - 0x69, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xe3, 0x05, 0x0a, 0x0c, 0x41, 0x70, 0x69, + 0x69, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8e, 0x06, 0x0a, 0x0c, 0x41, 0x70, 0x69, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x41, 0x43, 0x4c, 0x12, 0x4b, 0x0a, 0x09, 0x61, 0x6c, 0x6c, 0x5f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x42, 0x2e, 0xe2, 0xfc, 0xe3, 0xc4, 0x01, 0x28, 0x12, 0x26, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x73, 0x20, 0x61, @@ -305,17 +313,20 @@ var file_acl_proto_rawDesc = []byte{ 0x08, 0x52, 0x0c, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x70, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x5f, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x11, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, 0x70, 0x72, 0x65, 0x70, 0x61, 0x72, - 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x6f, 0x6c, 0x65, - 0x73, 0x18, 0x09, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x72, 0x6f, 0x6c, 0x65, 0x73, 0x22, 0x51, - 0x0a, 0x04, 0x52, 0x6f, 0x6c, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x35, 0x0a, 0x0b, 0x70, 0x65, - 0x72, 0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x70, 0x69, 0x43, 0x6c, 0x69, 0x65, 0x6e, - 0x74, 0x41, 0x43, 0x4c, 0x52, 0x0b, 0x70, 0x65, 0x72, 0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, - 0x73, 0x42, 0x32, 0x5a, 0x30, 0x77, 0x77, 0x77, 0x2e, 0x76, 0x65, 0x6c, 0x6f, 0x63, 0x69, 0x64, - 0x65, 0x78, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x6c, 0x61, 0x6e, 0x67, 0x2f, 0x76, 0x65, - 0x6c, 0x6f, 0x63, 0x69, 0x72, 0x61, 0x70, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x63, 0x6c, 0x73, 0x2f, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x12, 0x29, 0x0a, 0x10, 0x64, 0x61, 0x74, 0x61, + 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x12, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x0f, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x41, 0x63, 0x63, + 0x65, 0x73, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x6f, 0x6c, 0x65, 0x73, 0x18, 0x09, 0x20, 0x03, + 0x28, 0x09, 0x52, 0x05, 0x72, 0x6f, 0x6c, 0x65, 0x73, 0x22, 0x51, 0x0a, 0x04, 0x52, 0x6f, 0x6c, + 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x35, 0x0a, 0x0b, 0x70, 0x65, 0x72, 0x6d, 0x69, 0x73, 0x73, + 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x41, 0x70, 0x69, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x41, 0x43, 0x4c, 0x52, + 0x0b, 0x70, 0x65, 0x72, 0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x42, 0x32, 0x5a, 0x30, + 0x77, 0x77, 0x77, 0x2e, 
0x76, 0x65, 0x6c, 0x6f, 0x63, 0x69, 0x64, 0x65, 0x78, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x67, 0x6f, 0x6c, 0x61, 0x6e, 0x67, 0x2f, 0x76, 0x65, 0x6c, 0x6f, 0x63, 0x69, 0x72, + 0x61, 0x70, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x63, 0x6c, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/acls/proto/acl.proto b/acls/proto/acl.proto index a5f3c46e27b..823b7ce798e 100644 --- a/acls/proto/acl.proto +++ b/acls/proto/acl.proto @@ -33,6 +33,7 @@ message ApiClientACL { bool filesystem_write = 14; bool machine_state = 16; bool prepare_results = 17; + bool datastore_access = 18; // A list of roles in lieu of the permissions above. These will be // interpolated into this ACL object. diff --git a/api/datastore.go b/api/datastore.go new file mode 100644 index 00000000000..466c0dec7b1 --- /dev/null +++ b/api/datastore.go @@ -0,0 +1,132 @@ +package api + +import ( + "github.com/golang/protobuf/ptypes/empty" + context "golang.org/x/net/context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "www.velocidex.com/golang/velociraptor/acls" + api_proto "www.velocidex.com/golang/velociraptor/api/proto" + "www.velocidex.com/golang/velociraptor/datastore" + "www.velocidex.com/golang/velociraptor/file_store/api" + "www.velocidex.com/golang/velociraptor/file_store/path_specs" +) + +// Raw Datastore access requires the DATASTORE_ACCESS permission. This +// is not provided by any role so the only callers allowed are +// server-server gRPC calls (e.g. minion -> master) +func (self *ApiServer) GetSubject( + ctx context.Context, + in *api_proto.DataRequest) (*api_proto.DataResponse, error) { + + user_name := GetGRPCUserInfo(self.config, ctx, self.ca_pool).Name + perm, err := acls.CheckAccess(self.config, user_name, acls.DATASTORE_ACCESS) + if !perm || err != nil { + return nil, status.Error(codes.PermissionDenied, + "User is not allowed to access datastore.") + } + + db, err := datastore.GetDB(self.config) + if err != nil { + return nil, err + } + + raw_db, ok := db.(datastore.RawDataStore) + if !ok { + return nil, status.Error(codes.Internal, + "Datastore has no raw access.") + } + + data, err := raw_db.GetBuffer(self.config, getURN(in)) + return &api_proto.DataResponse{ + Data: data, + }, err +} + +func (self *ApiServer) SetSubject( + ctx context.Context, + in *api_proto.DataRequest) (*api_proto.DataResponse, error) { + + user_name := GetGRPCUserInfo(self.config, ctx, self.ca_pool).Name + perm, err := acls.CheckAccess(self.config, user_name, acls.DATASTORE_ACCESS) + if !perm || err != nil { + return nil, status.Error(codes.PermissionDenied, + "User is not allowed to access datastore.") + } + + db, err := datastore.GetDB(self.config) + if err != nil { + return nil, err + } + + raw_db, ok := db.(datastore.RawDataStore) + if !ok { + return nil, status.Error(codes.Internal, + "Datastore has no raw access.") + } + + err = raw_db.SetBuffer(self.config, getURN(in), in.Data) + return &api_proto.DataResponse{}, err +} + +func (self *ApiServer) ListChildren( + ctx context.Context, + in *api_proto.DataRequest) (*api_proto.ListChildrenResponse, error) { + + user_name := GetGRPCUserInfo(self.config, ctx, self.ca_pool).Name + perm, err := acls.CheckAccess(self.config, user_name, acls.DATASTORE_ACCESS) + if !perm || err != nil { + return nil, status.Error(codes.PermissionDenied, + "User is not allowed to access datastore.") + } + + db, err := datastore.GetDB(self.config) + if err != nil { + return nil, err + } + + children, err := db.ListChildren(self.config, 
getURN(in)) + if err != nil { + return nil, err + } + + result := &api_proto.ListChildrenResponse{} + for _, child := range children { + result.Children = append(result.Children, &api_proto.DSPathSpec{ + Components: child.Components(), + PathType: int64(child.Type()), + IsDir: child.IsDir(), + }) + } + + return result, nil +} + +func (self *ApiServer) DeleteSubject( + ctx context.Context, + in *api_proto.DataRequest) (*empty.Empty, error) { + + user_name := GetGRPCUserInfo(self.config, ctx, self.ca_pool).Name + perm, err := acls.CheckAccess(self.config, user_name, acls.DATASTORE_ACCESS) + if !perm || err != nil { + return nil, status.Error(codes.PermissionDenied, + "User is not allowed to access datastore.") + } + + db, err := datastore.GetDB(self.config) + if err != nil { + return nil, err + } + + return &empty.Empty{}, db.DeleteSubject(self.config, getURN(in)) +} + +func getURN(in *api_proto.DataRequest) api.DSPathSpec { + path_spec := in.Pathspec + if path_spec == nil { + path_spec = &api_proto.DSPathSpec{} + } + + return path_specs.NewUnsafeDatastorePath( + path_spec.Components...).SetType(api.PathType(path_spec.PathType)) +} diff --git a/api/datastore_test.go b/api/datastore_test.go new file mode 100644 index 00000000000..2a03ec3d3a0 --- /dev/null +++ b/api/datastore_test.go @@ -0,0 +1,132 @@ +package api + +import ( + "testing" + + "github.com/sebdah/goldie" + "github.com/stretchr/testify/suite" + "google.golang.org/protobuf/proto" + api_proto "www.velocidex.com/golang/velociraptor/api/proto" + config_proto "www.velocidex.com/golang/velociraptor/config/proto" + "www.velocidex.com/golang/velociraptor/datastore" + "www.velocidex.com/golang/velociraptor/file_store/path_specs" + "www.velocidex.com/golang/velociraptor/file_store/test_utils" + "www.velocidex.com/golang/velociraptor/grpc_client" + "www.velocidex.com/golang/velociraptor/json" + "www.velocidex.com/golang/velociraptor/vtesting/assert" +) + +type DatastoreAPITest struct { + test_utils.TestSuite + + client_config *config_proto.Config +} + +func (self *DatastoreAPITest) SetupTest() { + self.TestSuite.SetupTest() + + // Now bring up an API server. + self.ConfigObj.Frontend.ServerServices = &config_proto.ServerServicesConfig{} + self.ConfigObj.API.BindPort = 8101 + + server_builder, err := NewServerBuilder( + self.Sm.Ctx, self.ConfigObj, self.Sm.Wg) + assert.NoError(self.T(), err) + + err = server_builder.WithAPIServer(self.Sm.Ctx, self.Sm.Wg) + assert.NoError(self.T(), err) + +} + +func (self *DatastoreAPITest) TestDatastore() { + db, err := datastore.GetDB(self.ConfigObj) + assert.NoError(self.T(), err) + + path_spec := path_specs.NewUnsafeDatastorePath("A", "B", "C") + sample := &api_proto.AgentInformation{Name: "Velociraptor"} + assert.NoError(self.T(), + db.SetSubject(self.ConfigObj, path_spec, sample)) + + // Make some RPC calls + conn, closer, err := grpc_client.Factory.GetAPIClient( + self.Sm.Ctx, self.ConfigObj) + assert.NoError(self.T(), err) + defer closer() + + res, err := conn.GetSubject(self.Sm.Ctx, &api_proto.DataRequest{ + Pathspec: &api_proto.DSPathSpec{ + Components: path_spec.Components(), + }}) + assert.NoError(self.T(), err) + assert.Equal(self.T(), res.Data, []byte("{\"name\":\"Velociraptor\"}")) + + // Now set data through gRPC and read it using the standard + // datastore. 
+ path_spec2 := path_specs.NewUnsafeDatastorePath("A", "B", "D") + _, err = conn.SetSubject(self.Sm.Ctx, &api_proto.DataRequest{ + Data: []byte("{\"name\":\"Another Name\"}"), + Pathspec: &api_proto.DSPathSpec{ + Components: path_spec2.Components(), + }}) + assert.NoError(self.T(), err) + + assert.NoError(self.T(), + db.GetSubject(self.ConfigObj, path_spec2, sample)) + assert.Equal(self.T(), sample.Name, "Another Name") + + // Now list the children + res2, err := conn.ListChildren(self.Sm.Ctx, &api_proto.DataRequest{ + Pathspec: &api_proto.DSPathSpec{ + Components: path_spec.Dir().Components(), + }}) + assert.NoError(self.T(), err) + goldie.Assert(self.T(), "TestDatastore", json.MustMarshalIndent(res2)) +} + +func (self *DatastoreAPITest) TestRemoteDatastore() { + config_obj := proto.Clone(self.ConfigObj).(*config_proto.Config) + config_obj.Datastore.Implementation = "RemoteFileDataStore" + + db, err := datastore.GetDB(config_obj) + assert.NoError(self.T(), err) + + path_spec := path_specs.NewUnsafeDatastorePath("A", "B", "C") + sample := &api_proto.AgentInformation{Name: "Velociraptor"} + assert.NoError(self.T(), + db.SetSubject(config_obj, path_spec, sample)) + + sample2 := &api_proto.AgentInformation{} + assert.NoError(self.T(), + db.GetSubject(config_obj, path_spec, sample2)) + + assert.Equal(self.T(), sample, sample2) + + // Test ListDirectory + path_spec2 := path_specs.NewUnsafeDatastorePath("A", "B", "D") + assert.NoError(self.T(), + db.SetSubject(config_obj, path_spec2, sample)) + + children, err := db.ListChildren(config_obj, path_spec.Dir()) + assert.NoError(self.T(), err) + assert.Equal(self.T(), 2, len(children)) + assert.Equal(self.T(), path_spec, children[0]) + assert.Equal(self.T(), path_spec2, children[1]) + + // Now delete one + assert.NoError(self.T(), + db.DeleteSubject(config_obj, path_spec)) + + children, err = db.ListChildren(config_obj, path_spec.Dir()) + assert.NoError(self.T(), err) + assert.Equal(self.T(), 1, len(children)) + assert.Equal(self.T(), path_spec2, children[0]) + + children, err = db.ListChildren(config_obj, path_spec.Dir().Dir()) + assert.NoError(self.T(), err) + assert.Equal(self.T(), 1, len(children)) + assert.True(self.T(), children[0].IsDir()) +} + +func TestAPIDatastore(t *testing.T) { + suite.Run(t, &DatastoreAPITest{}) +} diff --git a/api/fixtures/TestDatastore.golden b/api/fixtures/TestDatastore.golden new file mode 100644 index 00000000000..9c440a57a2a --- /dev/null +++ b/api/fixtures/TestDatastore.golden @@ -0,0 +1,18 @@ +{ + "children": [ + { + "components": [ + "A", + "B", + "C" + ] + }, + { + "components": [ + "A", + "B", + "D" + ] + } + ] +} \ No newline at end of file diff --git a/api/proto/api.pb.gw.go b/api/proto/api.pb.gw.go index b6361c267d2..b27850128d1 100644 --- a/api/proto/api.pb.gw.go +++ b/api/proto/api.pb.gw.go @@ -22,8 +22,8 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" - proto_3 "www.velocidex.com/golang/velociraptor/artifacts/proto" - proto_0 "www.velocidex.com/golang/velociraptor/flows/proto" + proto_6 "www.velocidex.com/golang/velociraptor/artifacts/proto" + proto_2 "www.velocidex.com/golang/velociraptor/flows/proto" ) // Suppress "imported and not used" errors @@ -983,7 +983,7 @@ func local_request_API_GetTable_0(ctx context.Context, marshaler runtime.Marshal } func request_API_CollectArtifact_0(ctx context.Context, marshaler runtime.Marshaler, client APIClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, 
error) { - var protoReq proto_0.ArtifactCollectorArgs + var protoReq proto_2.ArtifactCollectorArgs var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1000,7 +1000,7 @@ func request_API_CollectArtifact_0(ctx context.Context, marshaler runtime.Marsha } func local_request_API_CollectArtifact_0(ctx context.Context, marshaler runtime.Marshaler, server APIServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_0.ArtifactCollectorArgs + var protoReq proto_2.ArtifactCollectorArgs var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1317,7 +1317,7 @@ var ( ) func request_API_GetToolInfo_0(ctx context.Context, marshaler runtime.Marshaler, client APIClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_3.Tool + var protoReq proto_6.Tool var metadata runtime.ServerMetadata if err := req.ParseForm(); err != nil { @@ -1333,7 +1333,7 @@ func request_API_GetToolInfo_0(ctx context.Context, marshaler runtime.Marshaler, } func local_request_API_GetToolInfo_0(ctx context.Context, marshaler runtime.Marshaler, server APIServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_3.Tool + var protoReq proto_6.Tool var metadata runtime.ServerMetadata if err := req.ParseForm(); err != nil { @@ -1349,7 +1349,7 @@ func local_request_API_GetToolInfo_0(ctx context.Context, marshaler runtime.Mars } func request_API_SetToolInfo_0(ctx context.Context, marshaler runtime.Marshaler, client APIClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_3.Tool + var protoReq proto_6.Tool var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1366,7 +1366,7 @@ func request_API_SetToolInfo_0(ctx context.Context, marshaler runtime.Marshaler, } func local_request_API_SetToolInfo_0(ctx context.Context, marshaler runtime.Marshaler, server APIServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_3.Tool + var protoReq proto_6.Tool var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1435,7 +1435,7 @@ func local_request_API_GetServerMonitoringState_0(ctx context.Context, marshaler } func request_API_SetServerMonitoringState_0(ctx context.Context, marshaler runtime.Marshaler, client APIClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_0.ArtifactCollectorArgs + var protoReq proto_2.ArtifactCollectorArgs var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1452,7 +1452,7 @@ func request_API_SetServerMonitoringState_0(ctx context.Context, marshaler runti } func local_request_API_SetServerMonitoringState_0(ctx context.Context, marshaler runtime.Marshaler, server APIServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_0.ArtifactCollectorArgs + var protoReq proto_2.ArtifactCollectorArgs var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1473,7 +1473,7 @@ var ( ) func request_API_GetClientMonitoringState_0(ctx context.Context, marshaler runtime.Marshaler, client APIClient, req *http.Request, 
pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_0.GetClientMonitoringStateRequest + var protoReq proto_2.GetClientMonitoringStateRequest var metadata runtime.ServerMetadata if err := req.ParseForm(); err != nil { @@ -1489,7 +1489,7 @@ func request_API_GetClientMonitoringState_0(ctx context.Context, marshaler runti } func local_request_API_GetClientMonitoringState_0(ctx context.Context, marshaler runtime.Marshaler, server APIServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_0.GetClientMonitoringStateRequest + var protoReq proto_2.GetClientMonitoringStateRequest var metadata runtime.ServerMetadata if err := req.ParseForm(); err != nil { @@ -1505,7 +1505,7 @@ func local_request_API_GetClientMonitoringState_0(ctx context.Context, marshaler } func request_API_SetClientMonitoringState_0(ctx context.Context, marshaler runtime.Marshaler, client APIClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_0.ClientEventTable + var protoReq proto_2.ClientEventTable var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1522,7 +1522,7 @@ func request_API_SetClientMonitoringState_0(ctx context.Context, marshaler runti } func local_request_API_SetClientMonitoringState_0(ctx context.Context, marshaler runtime.Marshaler, server APIServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_0.ClientEventTable + var protoReq proto_2.ClientEventTable var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) diff --git a/api/proto/datastore.pb.go b/api/proto/datastore.pb.go index 583bed656e5..3b07536585d 100644 --- a/api/proto/datastore.pb.go +++ b/api/proto/datastore.pb.go @@ -27,6 +27,7 @@ type DSPathSpec struct { Components []string `protobuf:"bytes,1,rep,name=components,proto3" json:"components,omitempty"` PathType int64 `protobuf:"varint,2,opt,name=path_type,json=pathType,proto3" json:"path_type,omitempty"` + IsDir bool `protobuf:"varint,3,opt,name=is_dir,json=isDir,proto3" json:"is_dir,omitempty"` } func (x *DSPathSpec) Reset() { @@ -75,12 +76,20 @@ func (x *DSPathSpec) GetPathType() int64 { return 0 } +func (x *DSPathSpec) GetIsDir() bool { + if x != nil { + return x.IsDir + } + return false +} + type DataRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Pathspec *DSPathSpec `protobuf:"bytes,1,opt,name=pathspec,proto3" json:"pathspec,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` } func (x *DataRequest) Reset() { @@ -122,6 +131,13 @@ func (x *DataRequest) GetPathspec() *DSPathSpec { return nil } +func (x *DataRequest) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + type DataResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -220,26 +236,29 @@ var File_datastore_proto protoreflect.FileDescriptor var file_datastore_proto_rawDesc = []byte{ 0x0a, 0x0f, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x49, 0x0a, 0x0a, 0x44, 0x53, 0x50, 0x61, + 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x60, 0x0a, 0x0a, 0x44, 0x53, 0x50, 0x61, 0x74, 0x68, 0x53, 0x70, 0x65, 0x63, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x6f, 
0x6e, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x70, 0x61, 0x74, 0x68, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x70, 0x61, 0x74, 0x68, 0x54, - 0x79, 0x70, 0x65, 0x22, 0x3c, 0x0a, 0x0b, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x2d, 0x0a, 0x08, 0x70, 0x61, 0x74, 0x68, 0x73, 0x70, 0x65, 0x63, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x53, 0x50, - 0x61, 0x74, 0x68, 0x53, 0x70, 0x65, 0x63, 0x52, 0x08, 0x70, 0x61, 0x74, 0x68, 0x73, 0x70, 0x65, - 0x63, 0x22, 0x22, 0x0a, 0x0c, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x45, 0x0a, 0x14, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x69, - 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, - 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x11, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x53, 0x50, 0x61, 0x74, 0x68, 0x53, 0x70, - 0x65, 0x63, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x42, 0x31, 0x5a, 0x2f, - 0x77, 0x77, 0x77, 0x2e, 0x76, 0x65, 0x6c, 0x6f, 0x63, 0x69, 0x64, 0x65, 0x78, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x67, 0x6f, 0x6c, 0x61, 0x6e, 0x67, 0x2f, 0x76, 0x65, 0x6c, 0x6f, 0x63, 0x69, 0x72, - 0x61, 0x70, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x79, 0x70, 0x65, 0x12, 0x15, 0x0a, 0x06, 0x69, 0x73, 0x5f, 0x64, 0x69, 0x72, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x05, 0x69, 0x73, 0x44, 0x69, 0x72, 0x22, 0x50, 0x0a, 0x0b, 0x44, 0x61, + 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2d, 0x0a, 0x08, 0x70, 0x61, 0x74, + 0x68, 0x73, 0x70, 0x65, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x53, 0x50, 0x61, 0x74, 0x68, 0x53, 0x70, 0x65, 0x63, 0x52, 0x08, + 0x70, 0x61, 0x74, 0x68, 0x73, 0x70, 0x65, 0x63, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x22, 0x0a, 0x0c, + 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x22, 0x45, 0x0a, 0x14, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x08, 0x63, 0x68, 0x69, 0x6c, + 0x64, 0x72, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x44, 0x53, 0x50, 0x61, 0x74, 0x68, 0x53, 0x70, 0x65, 0x63, 0x52, 0x08, 0x63, + 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x42, 0x31, 0x5a, 0x2f, 0x77, 0x77, 0x77, 0x2e, 0x76, + 0x65, 0x6c, 0x6f, 0x63, 0x69, 0x64, 0x65, 0x78, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x6c, + 0x61, 0x6e, 0x67, 0x2f, 0x76, 0x65, 0x6c, 0x6f, 0x63, 0x69, 0x72, 0x61, 0x70, 0x74, 0x6f, 0x72, + 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/api/proto/datastore.proto b/api/proto/datastore.proto index 1ff5e2bc8f8..c9b22a5f714 100644 --- a/api/proto/datastore.proto +++ 
b/api/proto/datastore.proto @@ -8,10 +8,12 @@ message DSPathSpec { repeated string components = 1; int64 path_type = 2; + bool is_dir = 3; } message DataRequest { DSPathSpec pathspec = 1; + bytes data = 2; } message DataResponse { diff --git a/bin/frontend.go b/bin/frontend.go index dc12fe89506..6f2c294d140 100644 --- a/bin/frontend.go +++ b/bin/frontend.go @@ -101,6 +101,14 @@ func startFrontend(sm *services.Service) (*api.Builder, error) { // first so other services can contact the master node. config_obj.Frontend.IsMaster = !*frontend_cmd_minion + + // Minions use the RemoteFileDataStore to sync with the server. + if !config_obj.Frontend.IsMaster { + logger.Info("Frontend will run as a minion.") + logger.Info("Enabling remote datastore since we are a minion.") + config_obj.Datastore.Implementation = "RemoteFileDataStore" + } + err := sm.Start(frontend.StartFrontendService) if err != nil { return nil, err diff --git a/datastore/datastore.go b/datastore/datastore.go index 67c198af48c..acb157b1fdd 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -47,6 +47,12 @@ type DatastoreInfo struct { // When WalkFunc return StopIteration we exit the walk. type WalkFunc func(urn api.DSPathSpec) error +// Raw level access only used internally rarely. +type RawDataStore interface { + GetBuffer(config_obj *config_proto.Config, urn api.DSPathSpec) ([]byte, error) + SetBuffer(config_obj *config_proto.Config, urn api.DSPathSpec, data []byte) error +} + type DataStore interface { // Reads a stored message from the datastore. If there is no // stored message at this URN, the function returns an @@ -93,6 +99,9 @@ func GetDB(config_obj *config_proto.Config) (DataStore, error) { return file_based_imp, nil + case "RemoteFileDataStore": + return remote_datastopre_imp, nil + case "Memcache": return memcache_imp, nil diff --git a/datastore/filebased.go b/datastore/filebased.go index 8294dd1f9a3..32b06affcac 100644 --- a/datastore/filebased.go +++ b/datastore/filebased.go @@ -388,3 +388,19 @@ func TraceDirectory(config_obj *config_proto.Config, fmt.Printf("Trace FileBaseDataStore: %v: %v\n", name, filename.AsDatastoreDirectory(config_obj)) } + +// Support RawDataStore interface +func (self *FileBaseDataStore) GetBuffer( + config_obj *config_proto.Config, + urn api.DSPathSpec) ([]byte, error) { + + return readContentFromFile( + config_obj, urn, true /* must exist */) +} + +func (self *FileBaseDataStore) SetBuffer( + config_obj *config_proto.Config, + urn api.DSPathSpec, data []byte) error { + + return writeContentToFile(config_obj, urn, data) +} diff --git a/datastore/memcache.go b/datastore/memcache.go index ffbe370cba5..8addb8b8e53 100644 --- a/datastore/memcache.go +++ b/datastore/memcache.go @@ -56,6 +56,13 @@ func (self *DirectoryMetadata) Set(key string, value api.DSPathSpec) { self.data[key] = value } +func (self *DirectoryMetadata) Remove(key string) { + self.mu.Lock() + defer self.mu.Unlock() + + delete(self.data, key) +} + func (self *DirectoryMetadata) Len() int { self.mu.Lock() defer self.mu.Unlock() @@ -278,15 +285,10 @@ func (self *MemcacheDatastore) SetData( urn api.DSPathSpec, data []byte) (err error) { - parent := urn.Dir() - parent_path := parent.AsDatastoreDirectory(config_obj) - md, pres := self.dir_cache.Get(parent_path) - if !pres { - // Get new dir metadata - md, err = self.get_dir_metadata(self.dir_cache, config_obj, parent) - if err != nil { - return err - } + // Get new dir metadata + md, err := self.get_dir_metadata(self.dir_cache, config_obj, urn.Dir()) + if err != nil { + 
return err } // Update the directory metadata. @@ -294,7 +296,9 @@ func (self *MemcacheDatastore) SetData( md.Set(md_key, urn) // Update the cache + parent_path := urn.Dir().AsDatastoreDirectory(config_obj) self.dir_cache.Set(parent_path, md) + return self.data_cache.Set(urn.AsClientPath(), &BulkData{ data: data, }) @@ -305,7 +309,26 @@ func (self *MemcacheDatastore) DeleteSubject( urn api.DSPathSpec) error { defer Instrument("delete", urn)() - return self.data_cache.Remove(urn.AsClientPath()) + err := self.data_cache.Remove(urn.AsClientPath()) + if err != nil { + return err + } + + // Get new dir metadata + md, err := self.get_dir_metadata(self.dir_cache, config_obj, urn.Dir()) + if err != nil { + return err + } + + // Update the directory metadata. + md_key := urn.Base() + api.GetExtensionForDatastore(urn) + md.Remove(md_key) + + // Update the cache + parent_path := urn.Dir().AsClientPath() + self.dir_cache.Set(parent_path, md) + + return nil } func (self *MemcacheDatastore) SetChildren( @@ -382,6 +405,29 @@ func (self *MemcacheDatastore) Clear() { self.dir_cache.Purge() } +// Support RawDataStore interface +func (self *MemcacheDatastore) GetBuffer( + config_obj *config_proto.Config, + urn api.DSPathSpec) ([]byte, error) { + path := urn.AsClientPath() + bulk_data_any, err := self.data_cache.Get(path) + bulk_data, ok := bulk_data_any.(*BulkData) + if !ok { + return nil, internalError + } + bulk_data.mu.Lock() + defer bulk_data.mu.Unlock() + + return bulk_data.data, err +} + +func (self *MemcacheDatastore) SetBuffer( + config_obj *config_proto.Config, + urn api.DSPathSpec, data []byte) error { + + return self.SetData(config_obj, urn, data) +} + func (self *MemcacheDatastore) Debug(config_obj *config_proto.Config) { for _, key := range self.dir_cache.GetKeys() { md, _ := self.dir_cache.Get(key) diff --git a/datastore/memcache_file.go b/datastore/memcache_file.go index 25935432e18..0258b67ff41 100644 --- a/datastore/memcache_file.go +++ b/datastore/memcache_file.go @@ -136,6 +136,7 @@ func (self *MemcacheFileDataStore) StartWriter( case MUTATION_OP_DEL_SUBJECT: file_based_imp.DeleteSubject(config_obj, mutation.urn) + self.invalidateDirCache(config_obj, mutation.urn.Dir()) } mutation.wg.Done() } @@ -319,6 +320,57 @@ func (self *MemcacheFileDataStore) Dump() []api.DSPathSpec { return self.cache.Dump() } +// Support RawDataStore interface +func (self *MemcacheFileDataStore) GetBuffer( + config_obj *config_proto.Config, + urn api.DSPathSpec) ([]byte, error) { + + bulk_data, err := self.cache.GetBuffer(config_obj, urn) + if err == nil { + metricLRUHit.Inc() + return bulk_data, err + } + + bulk_data, err = readContentFromFile( + config_obj, urn, true /* must exist */) + if err != nil { + return nil, err + } + + metricLRUMiss.Inc() + self.cache.SetData(config_obj, urn, bulk_data) + + return bulk_data, nil +} + +func (self *MemcacheFileDataStore) SetBuffer( + config_obj *config_proto.Config, + urn api.DSPathSpec, data []byte) error { + + err := self.cache.SetData(config_obj, urn, data) + if err != nil { + return err + } + + var wg sync.WaitGroup + wg.Add(1) + select { + case <-self.ctx.Done(): + return nil + + case self.writer <- &Mutation{ + op: MUTATION_OP_SET_SUBJECT, + urn: urn, + wg: &wg, + data: data}: + } + + if config_obj.Datastore.MemcacheWriteMutationBuffer < 0 { + wg.Wait() + } + return nil +} + // Recursively makes sure the directories are added to the cache. 
We // treat the file backing as authoritative, so if the dir cache is not // present in cache we read intermediate paths from disk. diff --git a/datastore/remote.go b/datastore/remote.go new file mode 100644 index 00000000000..2475bd7fd0d --- /dev/null +++ b/datastore/remote.go @@ -0,0 +1,197 @@ +// Implements a remote datastore + +package datastore + +import ( + "context" + "time" + + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + api_proto "www.velocidex.com/golang/velociraptor/api/proto" + config_proto "www.velocidex.com/golang/velociraptor/config/proto" + "www.velocidex.com/golang/velociraptor/file_store/api" + "www.velocidex.com/golang/velociraptor/file_store/path_specs" + "www.velocidex.com/golang/velociraptor/grpc_client" +) + +var ( + remote_datastopre_imp = NewRemoteDataStore() + RPC_TIMEOUT = 100 // Seconds +) + +type RemoteDataStore struct{} + +func (self *RemoteDataStore) GetSubject( + config_obj *config_proto.Config, + urn api.DSPathSpec, + message proto.Message) error { + + defer Instrument("read", urn)() + + ctx, cancel := context.WithTimeout(context.Background(), + time.Duration(RPC_TIMEOUT)*time.Second) + defer cancel() + + conn, closer, err := grpc_client.Factory.GetAPIClient(ctx, config_obj) + defer closer() + + result, err := conn.GetSubject(ctx, &api_proto.DataRequest{ + Pathspec: &api_proto.DSPathSpec{ + Components: urn.Components(), + PathType: int64(urn.Type()), + }}) + + if err != nil { + return err + } + + serialized_content := result.Data + if len(serialized_content) == 0 { + return nil + } + + // It is really a JSON blob + if serialized_content[0] == '{' { + err = protojson.Unmarshal(serialized_content, message) + } else { + err = proto.Unmarshal(serialized_content, message) + } + + return err +} + +func (self *RemoteDataStore) SetSubject( + config_obj *config_proto.Config, + urn api.DSPathSpec, + message proto.Message) error { + + defer Instrument("write", urn)() + + var value []byte + var err error + + if urn.Type() == api.PATH_TYPE_DATASTORE_JSON { + value, err = protojson.Marshal(message) + if err != nil { + return err + } + } else { + value, err = proto.Marshal(message) + } + + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), + time.Duration(RPC_TIMEOUT)*time.Second) + defer cancel() + + conn, closer, err := grpc_client.Factory.GetAPIClient(ctx, config_obj) + defer closer() + + _, err = conn.SetSubject(ctx, &api_proto.DataRequest{ + Data: value, + Pathspec: &api_proto.DSPathSpec{ + Components: urn.Components(), + PathType: int64(urn.Type()), + }}) + + return err +} + +func (self *RemoteDataStore) DeleteSubject( + config_obj *config_proto.Config, + urn api.DSPathSpec) error { + defer Instrument("delete", urn)() + + ctx, cancel := context.WithTimeout(context.Background(), + time.Duration(RPC_TIMEOUT)*time.Second) + defer cancel() + + conn, closer, err := grpc_client.Factory.GetAPIClient(ctx, config_obj) + defer closer() + + _, err = conn.DeleteSubject(ctx, &api_proto.DataRequest{ + Pathspec: &api_proto.DSPathSpec{ + Components: urn.Components(), + PathType: int64(urn.Type()), + }}) + + return err +} + +// Lists all the children of a URN. 
+func (self *RemoteDataStore) ListChildren( + config_obj *config_proto.Config, + urn api.DSPathSpec) ([]api.DSPathSpec, error) { + + defer Instrument("list", urn)() + + ctx, cancel := context.WithTimeout(context.Background(), + time.Duration(RPC_TIMEOUT)*time.Second) + defer cancel() + + conn, closer, err := grpc_client.Factory.GetAPIClient(ctx, config_obj) + defer closer() + + result, err := conn.ListChildren(ctx, &api_proto.DataRequest{ + Pathspec: &api_proto.DSPathSpec{ + Components: urn.Components(), + PathType: int64(urn.Type()), + }}) + + if err != nil { + return nil, err + } + + children := make([]api.DSPathSpec, 0, len(result.Children)) + for _, child := range result.Children { + child_urn := path_specs.NewUnsafeDatastorePath( + child.Components...).SetType(api.PathType(child.PathType)) + if child.IsDir { + child_urn = child_urn.SetDir() + } + children = append(children, child_urn) + } + + return children, err +} + +func (self *RemoteDataStore) Walk(config_obj *config_proto.Config, + root api.DSPathSpec, walkFn WalkFunc) error { + + all_children, err := self.ListChildren(config_obj, root) + if err != nil { + return err + } + + for _, child := range all_children { + // Recurse into directories + if child.IsDir() { + err := self.Walk(config_obj, child, walkFn) + if err != nil { + // Do not quit the walk early. + } + } else { + err := walkFn(child) + if err == StopIteration { + return nil + } + continue + } + } + + return nil +} + +// Called to close all db handles etc. Not thread safe. +func (self *RemoteDataStore) Close() {} +func (self *RemoteDataStore) Debug(config_obj *config_proto.Config) { +} + +func NewRemoteDataStore() *RemoteDataStore { + result := &RemoteDataStore{} + return result +} diff --git a/file_store/file_store.go b/file_store/file_store.go index 31373969713..e0d5035f4a7 100644 --- a/file_store/file_store.go +++ b/file_store/file_store.go @@ -40,7 +40,7 @@ func GetFileStore(config_obj *config_proto.Config) api.FileStore { case "Test": return memory.NewMemoryFileStore(config_obj) - case "FileBaseDataStore", "MemcacheFileDataStore": + case "FileBaseDataStore", "MemcacheFileDataStore", "RemoteFileDataStore": return directory.NewDirectoryFileStore(config_obj) default: @@ -58,7 +58,7 @@ func GetFileStoreFileSystemAccessor( switch config_obj.Datastore.Implementation { - case "FileBaseDataStore", "MemcacheFileDataStore": + case "FileBaseDataStore", "MemcacheFileDataStore", "RemoteFileDataStore": return accessors.NewFileStoreFileSystemAccessor( config_obj, directory.NewDirectoryFileStore(config_obj)), nil diff --git a/services/journal/replication.go b/services/journal/replication.go index 22f25fa7a9b..362a5a888c0 100644 --- a/services/journal/replication.go +++ b/services/journal/replication.go @@ -17,6 +17,7 @@ import ( config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/file_store" "www.velocidex.com/golang/velociraptor/file_store/api" + "www.velocidex.com/golang/velociraptor/grpc_client" "www.velocidex.com/golang/velociraptor/json" "www.velocidex.com/golang/velociraptor/logging" "www.velocidex.com/golang/velociraptor/result_sets" @@ -166,7 +167,8 @@ func (self *ReplicationService) Start( }() logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent) - logger.Debug("Starting Replication service to master frontend") + logger.Debug("Starting Replication service to master frontend at %v", + grpc_client.GetAPIConnectionString(self.config_obj)) return nil } diff --git a/vql/functions/log.go 
b/vql/functions/log.go index 329dc216bfc..b58de946482 100644 --- a/vql/functions/log.go +++ b/vql/functions/log.go @@ -19,15 +19,27 @@ package functions import ( "context" + "time" "github.com/Velocidex/ordereddict" + "www.velocidex.com/golang/velociraptor/utils" vql_subsystem "www.velocidex.com/golang/velociraptor/vql" "www.velocidex.com/golang/vfilter" "www.velocidex.com/golang/vfilter/arg_parser" ) +const ( + LOG_TAG = "last_log" +) + +type logCache struct { + message string + time int64 +} + type LogFunctionArgs struct { - Message string `vfilter:"required,field=message,doc=Message to log."` + Message string `vfilter:"required,field=message,doc=Message to log."` + DedupTime int64 `vfilter:"optional,field=dedup,doc=Supporess same message in this many seconds (default 60 sec)."` } type LogFunction struct{} @@ -42,16 +54,39 @@ func (self *LogFunction) Call(ctx context.Context, return false } - last_log_str, ok := scope.GetContext("last_log") - if ok { - last_log, ok := last_log_str.(string) - if ok && arg.Message == last_log { - return true + if arg.DedupTime == 0 { + arg.DedupTime = 60 + } + + now := time.Now().Unix() + + last_log_any := vql_subsystem.CacheGet(scope, LOG_TAG) + + // No previous message was set - log it and save it. + if utils.IsNil(last_log_any) { + last_log := &logCache{ + message: arg.Message, + time: now, } + scope.Log("%v", arg.Message) + vql_subsystem.CacheSet(scope, LOG_TAG, last_log) + return true + } + + last_log, ok := last_log_any.(*logCache) + // Message is identical to last and within the dedup time. + if ok && last_log.message == arg.Message && + arg.DedupTime > 0 && // User can set dedup time negative to disable. + now < last_log.time+arg.DedupTime { + return true } + // Log it and store for next time. scope.Log("%v", arg.Message) - scope.SetContext("last_log", arg.Message) + vql_subsystem.CacheSet(scope, LOG_TAG, &logCache{ + message: arg.Message, + time: now, + }) return true } diff --git a/vql/parsers/sql.go b/vql/parsers/sql.go index e9c9342dee5..ae2d764e5ce 100644 --- a/vql/parsers/sql.go +++ b/vql/parsers/sql.go @@ -38,9 +38,11 @@ func (self SQLPlugin) GetHandleOther(scope vfilter.Scope, connstring string, dri cacheKey := fmt.Sprintf("%s %s", driver, connstring) client := vql_subsystem.CacheGet(scope, cacheKey) - if client == nil { + if utils.IsNil(client) { client, err := sqlx.Open(driver, connstring) if err != nil { + // Cache failure to connect. + vql_subsystem.CacheSet(scope, cacheKey, err) return nil, err } if driver == "mysql" { @@ -59,14 +61,17 @@ func (self SQLPlugin) GetHandleOther(scope vfilter.Scope, connstring string, dri return nil, err } + vql_subsystem.CacheSet(scope, cacheKey, client) return client, nil } switch t := client.(type) { case error: return nil, t + case *sqlx.DB: return t, nil + default: return nil, errors.New("Error") } diff --git a/vtesting/assert/wrapper.go b/vtesting/assert/wrapper.go index 224c7b792dc..4cec3b63897 100644 --- a/vtesting/assert/wrapper.go +++ b/vtesting/assert/wrapper.go @@ -30,20 +30,20 @@ func Equal(t TestingT, expected, actual interface{}, msgAndArgs ...interface{}) } } - assert.Equal(t, expected, actual, msgAndArgs) + assert.Equal(t, expected, actual, msgAndArgs...) } func NoError(t TestingT, err error, msgAndArgs ...interface{}) { - assert.NoError(t, err, msgAndArgs) + assert.NoError(t, err, msgAndArgs...) } func Regexp(t TestingT, expected, actual interface{}, msgAndArgs ...interface{}) { - assert.Regexp(t, expected, actual, msgAndArgs) + assert.Regexp(t, expected, actual, msgAndArgs...) 
} func True(t TestingT, expected bool, msgAndArgs ...interface{}) { - assert.True(t, expected, msgAndArgs) + assert.True(t, expected, msgAndArgs...) } func NotNil(t TestingT, expected interface{}, msgAndArgs ...interface{}) { - assert.NotNil(t, expected, msgAndArgs) + assert.NotNil(t, expected, msgAndArgs...) }
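
A minimal usage sketch (not part of the patch): the Go snippet below mirrors the TestRemoteDatastore test added above and shows how a minion-side caller would go through the new remote datastore. It assumes a config_obj that, like a minion's config, already has its Datastore section and gRPC API connection details for the master populated; the function name and package are illustrative, and everything else uses only APIs introduced or exercised in this change.

package example

import (
	"log"

	api_proto "www.velocidex.com/golang/velociraptor/api/proto"
	config_proto "www.velocidex.com/golang/velociraptor/config/proto"
	"www.velocidex.com/golang/velociraptor/datastore"
	"www.velocidex.com/golang/velociraptor/file_store/path_specs"
)

// roundTripThroughMaster writes a protobuf message into the master's
// datastore over gRPC and reads it back. Assumes config_obj.Datastore and
// the API connection settings are already populated (as on a minion).
func roundTripThroughMaster(config_obj *config_proto.Config) error {
	// Selecting the implementation added in this patch makes GetDB
	// return the RemoteDataStore, which forwards each call to the
	// master's datastore service (this also happens automatically for
	// minions via the bin/frontend.go change above).
	config_obj.Datastore.Implementation = "RemoteFileDataStore"

	db, err := datastore.GetDB(config_obj)
	if err != nil {
		return err
	}

	path_spec := path_specs.NewUnsafeDatastorePath("A", "B", "C")
	sample := &api_proto.AgentInformation{Name: "Velociraptor"}

	// SetSubject marshals the message and issues a SetSubject RPC.
	if err := db.SetSubject(config_obj, path_spec, sample); err != nil {
		return err
	}

	// GetSubject issues a GetSubject RPC and unmarshals the reply.
	round_trip := &api_proto.AgentInformation{}
	if err := db.GetSubject(config_obj, path_spec, round_trip); err != nil {
		return err
	}

	log.Printf("Read back name: %v", round_trip.Name)
	return nil
}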