Skip to content

Commit

Permalink
Merge pull request #306 from DataDog/feat_graph_investigate_query_tim…
Browse files Browse the repository at this point in the history
…e_explosion

Feat graph investigate query time explosion
  • Loading branch information
Zenithar authored Dec 11, 2024
2 parents af44573 + efa60ec commit 06bf54c
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 97 deletions.
11 changes: 9 additions & 2 deletions deployments/kubehound/graph/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ COPY dsl/kubehound/pom.xml /home/app
RUN mvn -f /home/app/pom.xml clean install

# Now build our janusgraph wrapper container with KubeHound customizations
FROM janusgraph/janusgraph:1.0.0
FROM janusgraph/janusgraph:1.1.0
LABEL org.opencontainers.image.source="https://github.com/DataDog/kubehound/"

# Add our initialization script for the database schema to the startup directory
Expand Down Expand Up @@ -59,9 +59,16 @@ ENV gremlinserver.metrics.slf4jReporter.enabled=false
ENV gremlinserver.metrics.graphiteReporter.enabled=false
ENV gremlinserver.metrics.csvReporter.enabled=false

# Add safety net settings to prevent OOM and other issues
ENV janusgraph.query.force-index=false
ENV janusgraph.cluster.max-partitions=512
ENV janusgraph.query.batch=true
ENV janusgraph.query.hard-max-limit=100000
ENV janusgraph.query.smart-limit=true

# Performance tweaks based on: https://www.sailpoint.com/blog/souping-up-the-gremlin/
# gremlinPool will default to Runtime.availableProcessors()
ENV gremlinserver.gremlinPool=0
ENV gremlinserver.gremlinPool=0
# threadPoolWorker should be 2x VCPU (TODO: can we set dynamically?)
ENV gremlinserver.threadPoolWorker=16

Expand Down
3 changes: 2 additions & 1 deletion deployments/kubehound/graph/conf/jvm.options
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
#################

-XX:+UseG1GC
-XX:+UseContainerSupport
-XX:SurvivorRatio=8
-XX:MaxTenuringThreshold=1

-javaagent:"/opt/janusgraph/lib/jmx_prometheus_javaagent-0.18.0.jar"=8099:/opt/janusgraph/lib/exporter-config.yaml
-javaagent:"/opt/janusgraph/lib/jmx_prometheus_javaagent-0.18.0.jar"=8099:/opt/janusgraph/lib/exporter-config.yaml
27 changes: 26 additions & 1 deletion deployments/kubehound/graph/kubehound-db-init.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,16 @@ mgmt.buildIndex('byServiceEndpoint', Vertex.class).addKey(serviceEndpoint).build
mgmt.buildIndex('byServiceDns', Vertex.class).addKey(serviceDns).buildCompositeIndex();
mgmt.buildIndex('byExposure', Vertex.class).addKey(exposure).buildCompositeIndex();

// Create composite indices for the properties we want to search on
mgmt.buildIndex('byClusterAndRunIDComposite', Vertex.class).addKey(cluster).addKey(runID).buildCompositeIndex();
mgmt.buildIndex('byClassAndRunIDComposite', Vertex.class).addKey(cls).addKey(runID).buildCompositeIndex();
mgmt.buildIndex('byClassAndClusterComposite', Vertex.class).addKey(cls).addKey(cluster).buildCompositeIndex();
mgmt.buildIndex('byClassAndTypeComposite', Vertex.class).addKey(cls).addKey(type).buildCompositeIndex();
mgmt.buildIndex('byClassAndExposureComposite', Vertex.class).addKey(cls).addKey(exposure).buildCompositeIndex();
mgmt.buildIndex('byTypeAndNameComposite', Vertex.class).addKey(type).addKey(name).buildCompositeIndex();
mgmt.buildIndex('byImageAndRunIDComposite', Vertex.class).addKey(image).addKey(runID).buildCompositeIndex();
mgmt.buildIndex('byAppAndRunIDComposite', Vertex.class).addKey(app).addKey(runID).buildCompositeIndex();
mgmt.buildIndex('byNamespaceAndRunIDComposite', Vertex.class).addKey(namespace).addKey(runID).buildCompositeIndex();

mgmt.commit();

Expand All @@ -194,9 +204,24 @@ ManagementSystem.awaitGraphIndexStatus(graph, 'byName').status(SchemaStatus.ENAB
ManagementSystem.awaitGraphIndexStatus(graph, 'byNamespace').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byType').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byCritical').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byPort').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byPortName').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byServiceEndpoint').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byServiceDns').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byExposure').status(SchemaStatus.ENABLED).call();

ManagementSystem.awaitGraphIndexStatus(graph, 'byClusterAndRunIDComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byClassAndRunIDComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byClassAndClusterComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byClassAndTypeComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byClassAndExposureComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byTypeAndNameComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byImageAndRunIDComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byAppAndRunIDComposite').status(SchemaStatus.ENABLED).call();
ManagementSystem.awaitGraphIndexStatus(graph, 'byNamespaceAndRunIDComposite').status(SchemaStatus.ENABLED).call();

System.out.println("[KUBEHOUND] graph schema and indexes ready");
mgmt.close();

// Close the open connection
:remote close
:remote close
66 changes: 33 additions & 33 deletions deployments/kubehound/ui/LowHangingFruit-ContainerEscape.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@
" .has(\"runID\", graph.variables().get('runID_yourid').get().trim())\n",
" .where(\n",
" repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .limit(1)\n",
" )\n",
" .dedup().by(\"image\")\n",
Expand Down Expand Up @@ -201,11 +201,11 @@
" .has(\"runID\", graph.variables().get('runID_yourid').get().trim())\n",
" .where(\n",
" repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .limit(1)\n",
" )\n",
" .dedup()\n",
Expand Down Expand Up @@ -313,11 +313,11 @@
" .has(\"runID\", graph.variables().get('runID_yourid').get().trim())\n",
" .has(\"app\",graph.variables().get('containerEscape_vulnApp_yourid').get().trim())\n",
" .repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .path().by(elementMap())\n",
" .limit(1000)"
]
Expand Down Expand Up @@ -360,11 +360,11 @@
" .has(\"runID\", graph.variables().get('runID_yourid').get().trim())\n",
" .has(\"app\",graph.variables().get('containerEscape_vulnApp_yourid').get().trim())\n",
" .repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .path()\n",
" .by(valueMap(\"app\", \"class\",\"critical\").with(WithOptions.tokens,WithOptions.labels))\n",
" .dedup()\n",
Expand Down Expand Up @@ -402,11 +402,11 @@
" .has(\"runID\", graph.variables().get('runID_yourid').get().trim())\n",
" .has(\"app\",graph.variables().get('containerEscape_vulnApp_yourid').get().trim())\n",
" .repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .path().by(label())\n",
" .dedup()\n",
" .limit(1000)"
Expand Down Expand Up @@ -438,11 +438,11 @@
" .or().has(\"namespace\", within(graph.variables().get('containerEscape_whiteListedNamespace_yourid').get()))\n",
" )\n",
" .repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .path().by(elementMap())\n",
" .limit(1000)"
]
Expand All @@ -469,11 +469,11 @@
" .or().has(\"namespace\", within(graph.variables().get('containerEscape_whiteListedNamespace_yourid').get()))\n",
" )\n",
" .repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .path()\n",
" .by(valueMap(\"app\", \"class\",\"critical\").with(WithOptions.tokens,WithOptions.labels))\n",
" .dedup()\n",
Expand Down Expand Up @@ -502,12 +502,12 @@
" .or().has(\"namespace\", within(graph.variables().get('containerEscape_whiteListedNamespace_yourid').get()))\n",
" )\n",
" .repeat(\n",
" outE().inV().simplePath() // Building the path from one vertex to another\n",
" outE().inV().simplePath().dedup() // Building the path from one vertex to another\n",
" ).until(\n",
" has(label, \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(label, \"Node\") // Keep only path ending with a critical asset\n",
" .path().by(label())\n",
" has(\"class\", \"Node\") // Stop when meeting a critical asset\n",
" .or().loops().is(10) // Stop after X iteration\n",
" ).has(\"class\", \"Node\") // Keep only path ending with a critical asset\n",
" .path().by(\"class\")\n",
" .dedup()\n",
" .limit(1000)"
]
Expand Down
14 changes: 14 additions & 0 deletions pkg/kubehound/graph/vertex/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package vertex

var (
// Labels is a list of all possible labels for a vertex in the graph.
Labels = []string{
ContainerLabel,
EndpointLabel,
IdentityLabel,
NodeLabel,
PermissionSetLabel,
PodLabel,
VolumeLabel,
}
)
91 changes: 47 additions & 44 deletions pkg/kubehound/storage/graphdb/janusgraph_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,50 +67,53 @@ func (jgp *JanusGraphProvider) Prepare(ctx context.Context) error {
return nil
}

g := gremlin.Traversal_().WithRemote(jgp.drc)
tx := g.Tx()
defer tx.Close()

for {
// Begin a new transaction.
gtx, err := tx.Begin()
if err != nil {
return err
}

// Retrieve the number of vertices in the graph.
page, err := gtx.V().Count().Next()
if err != nil {
return err
}

// Decode the number of vertices from the page.
count, err := page.GetInt()
if err != nil {
return err
}

// If there are no more vertices to delete, break the loop.
if count == 0 {
break
}

// Delete the vertices in the graph.
err = <-gtx.V().Limit(deleteBatchSize).Drop().Iterate()
if err != nil {
return err
}

// Commit the transaction.
if err := tx.Commit(); err != nil {
return err
}

// Check context for cancellation.
select {
case <-ctx.Done():
return ctx.Err()
default:
// These vertex types are defined in the schema.
for _, vertexType := range vertex.Labels {
g := gremlin.Traversal_().WithRemote(jgp.drc)
tx := g.Tx()
defer tx.Close()

for {
// Begin a new transaction.
gtx, err := tx.Begin()
if err != nil {
return err
}

// Retrieve the number of vertices in the graph.
page, err := gtx.V().Has("class", vertexType).Count().Next()
if err != nil {
return err
}

// Decode the number of vertices from the page.
count, err := page.GetInt()
if err != nil {
return err
}

// If there are no more vertices to delete, break the loop.
if count == 0 {
break
}

// Delete the vertices in the graph.
err = <-gtx.V().Has("class", vertexType).Limit(deleteBatchSize).Drop().Iterate()
if err != nil {
return err
}

// Commit the transaction.
if err := tx.Commit(); err != nil {
return err
}

// Check context for cancellation.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
}

Expand Down
5 changes: 2 additions & 3 deletions test/system/graph_dsl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ func (suite *DslTestSuite) TestTraversal_criticalPaths() {
// There are A LOT of paths in the test cluster. Just sample a few
expected := []string{
"path[Endpoint, ENDPOINT_EXPLOIT, Container, IDENTITY_ASSUME, Identity, PERMISSION_DISCOVER, PermissionSet]",
"path[Endpoint, ENDPOINT_EXPLOIT, Container, VOLUME_DISCOVER, Volume, TOKEN_STEAL, Identity, PERMISSION_DISCOVER, PermissionSet]",
"path[Endpoint, ENDPOINT_EXPLOIT, Container, CE_NSENTER, Node, IDENTITY_ASSUME, Identity, PERMISSION_DISCOVER, PermissionSet]",
"path[Endpoint, ENDPOINT_EXPLOIT, Container, CE_MODULE_LOAD, Node, IDENTITY_ASSUME, Identity, PERMISSION_DISCOVER, PermissionSet]",
"path[Endpoint, ENDPOINT_EXPLOIT, Container, CE_PRIV_MOUNT, Node, IDENTITY_ASSUME, Identity, PERMISSION_DISCOVER, PermissionSet]",
Expand All @@ -283,7 +282,7 @@ func (suite *DslTestSuite) TestTraversal_minHopsToCritical() {

serviceHops, err := res.GetInt()
suite.NoError(err)
suite.Equal(serviceHops, 4)
suite.Equal(4, serviceHops)

// Container should have 1 less hop
raw, err = suite.client.Submit("kh.containers().minHopsToCritical(6)")
Expand All @@ -295,7 +294,7 @@ func (suite *DslTestSuite) TestTraversal_minHopsToCritical() {

containerHops, err := res.GetInt()
suite.NoError(err)
suite.Equal(containerHops, serviceHops-1)
suite.Equal(serviceHops-1, containerHops)
}

func (suite *DslTestSuite) TestTraversal_criticalPathsFilter() {
Expand Down
2 changes: 1 addition & 1 deletion test/system/graph_edge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (suite *EdgeTestSuite) TestEdge_IDENTITY_ASSUME_Node() {
func (suite *EdgeTestSuite) TestEdge_POD_ATTACH() {
// Every pod should have a POD_ATTACH incoming from a node
rawCount, err := suite.g.V().
HasLabel("Pod").
Has("class", "Pod").
Count().Next()

suite.NoError(err)
Expand Down
30 changes: 18 additions & 12 deletions test/system/graph_vertex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,20 +356,26 @@ func (suite *VertexTestSuite) TestVertexIdentity() {

func (suite *VertexTestSuite) TestVertexClusterProperty() {
// All vertices should have the cluster property set
results, err := suite.g.V().
Values("cluster").
Dedup().
ToList()

suite.NoError(err)
suite.GreaterOrEqual(len(results), 1)

present := suite.resultsToStringArray(results)
expected := []string{
"kind-kubehound.test.local",
for _, label := range vertex.Labels {
suite.Run(label, func() {
results, err := suite.g.V().
Has("class", label).
Values("cluster").
Dedup().
ToList()

suite.NoError(err)
suite.GreaterOrEqual(len(results), 1)

present := suite.resultsToStringArray(results)
expected := []string{
"kind-kubehound.test.local",
}

suite.Subset(present, expected)
})
}

suite.Subset(present, expected)
}

func (suite *VertexTestSuite) TearDownSuite() {
Expand Down

0 comments on commit 06bf54c

Please sign in to comment.