Skip to content

Commit

Permalink
Changes to support distributed architecture. (Velocidex#1384)
Browse files Browse the repository at this point in the history
- Added SetSubjectWithCompletion() to allow asynchronous writes to
   control visibility.
 - Remove Walk() from Datastore and Filestore interfaces.
 - Added gRPC health check API

 - Added an env_var VELOCIRAPTOR_SLOW_FILESYSTEM which introduces
   artificial delays to all filesystem activity. This helps to
   simulate behavior on slow network filesystems such as EFS.

 - Minions now write client events themselves to the filesystem, as
   well as sending them over gRPC. In turn the master does not write
   the events but simply feeds them to any server event queries it
   needs.

 - Minion's gRPC now delay sending events and cache in memory to
   increase the send buffer size and reduce number of RPC calls.

 - Added File store implementation MemcacheFileDataStore. This
   filestore queues data in memory and writes it asynchronously to
   storage. This helps to combine multiple parts of the result sets
   into larger filesystem writes for more efficient writing.

 - System.Flow.Completion is now a server event artifact to ensure
   more efficient writing.

 - Client info manager now manages client ping information. The data
   is cached in memory for a while and then flushed asynchronously to
   the filestore in order to avoid writing client ping information too
   often. Data is sent to a new server event
   Server.Internal.ClientPing periodically to update the master in a
   more efficient batch way.

 - Minion hunt dispatchers are now all read only - they update their
   hunt status by receiving an Server.Internal.HuntUpdate message so
   they do not need to read it from storage. Only the master hunt
   dispatcher is allowed to update hunts in storage.

 - Notification service now batches notifications to reduce gRPC
   traffic.
  • Loading branch information
scudette authored Nov 22, 2021
1 parent 63f122e commit ef20530
Show file tree
Hide file tree
Showing 138 changed files with 4,445 additions and 2,660 deletions.
71 changes: 0 additions & 71 deletions .github/workflows/codeql-analysis.yml

This file was deleted.

5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,8 @@ lint:

KapeFilesSync:
python3 scripts/kape_files.py ~/projects/KapeFiles/ > artifacts/definitions/Windows/KapeFiles/Targets.yaml

# Do this after fetching the build artifacts with `gh run download <RunID>`
UpdateCIArtifacts:
mv artifact/server/* artifacts/testdata/server/testcases/
mv artifact/windows/* artifacts/testdata/windows/
14 changes: 13 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,19 @@ func (self *ApiServer) ListClients(
"User is not allowed to view clients.")
}

return search.SearchClients(ctx, self.config, in, user_name)
result, err := search.SearchClients(ctx, self.config, in, user_name)
if err != nil {
return nil, err
}

// Warm up the cache pre-emptively so we have fresh connected
// status
notifier := services.GetNotifier()
for _, item := range result.Items {
notifier.IsClientConnected(
ctx, self.config, item.ClientId, 0 /* timeout */)
}
return result, nil
}

func (self *ApiServer) NotifyClients(
Expand Down
23 changes: 13 additions & 10 deletions api/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,27 +102,30 @@ func (self *ApiServer) GetClient(
"User is not allowed to view clients.")
}

// Update the user's MRU
if in.UpdateMru {
err = search.UpdateMRU(self.config, user_name, in.ClientId)
if err != nil {
return nil, err
}
}

api_client, err := search.GetApiClient(ctx,
self.config,
in.ClientId,
!in.Lightweight, // Detailed
)
api_client, err := search.FastGetApiClient(ctx, self.config, in.ClientId)
if err != nil {
return nil, err
}

if self.server_obj != nil && !in.Lightweight &&
// Wait up to 2 seconds to find out if clients are connected.
services.GetNotifier().IsClientConnected(ctx,
self.config, in.ClientId, 2) {
api_client.LastSeenAt = uint64(time.Now().UnixNano() / 1000)
if self.server_obj != nil {
if !in.Lightweight &&
// Wait up to 2 seconds to find out if clients are connected.
services.GetNotifier().IsClientConnected(ctx,
self.config, in.ClientId, 2) {
api_client.LastSeenAt = uint64(time.Now().UnixNano() / 1000)
} else {
// Warm up the cache anyway.
go services.GetNotifier().IsClientConnected(
ctx, self.config, in.ClientId, 2)
}
}

return api_client, nil
Expand Down
23 changes: 21 additions & 2 deletions api/datastore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package api

import (
"sync"

"github.com/golang/protobuf/ptypes/empty"
context "golang.org/x/net/context"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -65,7 +67,21 @@ func (self *ApiServer) SetSubject(
"Datastore has no raw access.")
}

err = raw_db.SetBuffer(self.config, getURN(in), in.Data)
if in.Sync {
var wg sync.WaitGroup

// Wait for the data to hit the disk.
wg.Add(1)
err = raw_db.SetBuffer(self.config, getURN(in), in.Data, func() {
wg.Done()
})
wg.Wait()

} else {

// Just write quickly.
err = raw_db.SetBuffer(self.config, getURN(in), in.Data, nil)
}
return &api_proto.DataResponse{}, err
}

Expand Down Expand Up @@ -95,6 +111,7 @@ func (self *ApiServer) ListChildren(
result.Children = append(result.Children, &api_proto.DSPathSpec{
Components: child.Components(),
PathType: int64(child.Type()),
Tag: child.Tag(),
IsDir: child.IsDir(),
})
}
Expand Down Expand Up @@ -128,5 +145,7 @@ func getURN(in *api_proto.DataRequest) api.DSPathSpec {
}

return path_specs.NewUnsafeDatastorePath(
path_spec.Components...).SetType(api.PathType(path_spec.PathType))
path_spec.Components...).
SetType(api.PathType(path_spec.PathType)).
SetTag(path_spec.Tag)
}
25 changes: 21 additions & 4 deletions api/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"testing"
"time"

"github.com/sebdah/goldie"
"github.com/stretchr/testify/suite"
Expand All @@ -13,6 +14,7 @@ import (
"www.velocidex.com/golang/velociraptor/file_store/test_utils"
"www.velocidex.com/golang/velociraptor/grpc_client"
"www.velocidex.com/golang/velociraptor/json"
"www.velocidex.com/golang/velociraptor/vtesting"
"www.velocidex.com/golang/velociraptor/vtesting/assert"
)

Expand All @@ -23,11 +25,10 @@ type DatastoreAPITest struct {
}

func (self *DatastoreAPITest) SetupTest() {
self.TestSuite.SetupTest()
self.ConfigObj = self.LoadConfig()
self.ConfigObj.API.BindPort = 8787

// Now bring up an API server.
self.ConfigObj.Frontend.ServerServices = &config_proto.ServerServicesConfig{}
self.ConfigObj.API.BindPort = 8101
self.TestSuite.SetupTest()

server_builder, err := NewServerBuilder(
self.Sm.Ctx, self.ConfigObj, self.Sm.Wg)
Expand All @@ -36,6 +37,19 @@ func (self *DatastoreAPITest) SetupTest() {
err = server_builder.WithAPIServer(self.Sm.Ctx, self.Sm.Wg)
assert.NoError(self.T(), err)

// Now bring up an API server.
self.ConfigObj.Frontend.ServerServices = &config_proto.ServerServicesConfig{}

// Wait for the server to come up.
conn, closer, err := grpc_client.Factory.GetAPIClient(
self.Sm.Ctx, self.ConfigObj)
assert.NoError(self.T(), err)
defer closer()

vtesting.WaitUntil(2*time.Second, self.T(), func() bool {
res, err := conn.Check(self.Sm.Ctx, &api_proto.HealthCheckRequest{})
return err == nil && res.Status == api_proto.HealthCheckResponse_SERVING
})
}

func (self *DatastoreAPITest) TestDatastore() {
Expand All @@ -53,9 +67,12 @@ func (self *DatastoreAPITest) TestDatastore() {
assert.NoError(self.T(), err)
defer closer()

test_utils.GetMemoryDataStore(self.T(), self.ConfigObj).Debug(self.ConfigObj)

res, err := conn.GetSubject(self.Sm.Ctx, &api_proto.DataRequest{
Pathspec: &api_proto.DSPathSpec{
Components: path_spec.Components(),
PathType: int64(path_spec.Type()),
}})
assert.NoError(self.T(), err)
assert.Equal(self.T(), res.Data, []byte("{\"name\":\"Velociraptor\"}"))
Expand Down
2 changes: 1 addition & 1 deletion api/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func getTransformer(

flow, err := flows.LoadCollectionContext(config_obj, client_id, flow_id)
if err != nil {
flow = &flows_proto.ArtifactCollectorContext{}
flow = flows.NewCollectionContext(config_obj)
}

return ordereddict.NewDict().
Expand Down
21 changes: 11 additions & 10 deletions api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,17 @@ func (self *ApiServer) PushEvents(
}

// Only return the first row
if true {
journal, err := services.GetJournal()
if err != nil {
return nil, err
}

err = journal.PushRowsToArtifact(self.config,
rows, in.Artifact, in.ClientId, in.FlowId)
return &empty.Empty{}, err
journal, err := services.GetJournal()
if err != nil {
return nil, err
}

// only broadcast the events for local listeners. Minions
// write the events themselves, so we just need to broadcast
// for any server event artifacts that occur.
journal.Broadcast(self.config,
rows, in.Artifact, in.ClientId, in.FlowId)
return &empty.Empty{}, err
}

return nil, status.Error(codes.InvalidArgument, "no peer certs?")
Expand Down Expand Up @@ -282,7 +283,7 @@ func getAllArtifacts(

file_store_factory := file_store.GetFileStore(config_obj)

return file_store_factory.Walk(log_path,
return api.Walk(file_store_factory, log_path,
func(full_path api.FSPathSpec, info os.FileInfo) error {
// Walking the events directory will give us
// all the day json files. Each day json file
Expand Down
16 changes: 16 additions & 0 deletions api/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package api

import (
context "golang.org/x/net/context"
"www.velocidex.com/golang/velociraptor/api/proto"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
)

func (self *ApiServer) Check(
ctx context.Context,
in *api_proto.HealthCheckRequest) (*api_proto.HealthCheckResponse, error) {

return &proto.HealthCheckResponse{
Status: api_proto.HealthCheckResponse_SERVING,
}, nil
}
20 changes: 20 additions & 0 deletions api/mock/api_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ef20530

Please sign in to comment.