Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/IndexIngestionSeparation.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ This separation enables:
# Important Note

> [!WARNING]
> **As of now, only brand new deployments are supported for Index and Ingestion Separation. No migration path is implemented, described or tested for existing deployments to move from a standard model to Index & Ingestion separation model.**
> **For customers deploying SmartBus on CMP, the Splunk Operator for Kubernetes (SOK) manages the configuration and lifecycle of the ingestor tier. The following SOK guide provides implementation details for setting up ingestion separation and integrating with existing indexers. This reference is primarily intended for CMP users leveraging SOK-managed ingestors.**

# Document Variables

Expand Down Expand Up @@ -40,7 +40,7 @@ SQS message bus inputs can be found in the table below.
| largeMessageStorePath | string | S3 path for Large Message Store |
| deadLetterQueueName | string | Name of the SQS dead letter queue |

Change of any of the bus inputs does not restart Splunk. It just updates the config values with no disruptions.
**First provisioning or update of any of the bus inputs requires a Splunkd restart of the Ingestor Cluster and the Indexer Cluster; this restart is performed automatically by SOK.**

## Example
```
Expand Down
18 changes: 18 additions & 0 deletions pkg/splunk/enterprise/indexercluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,15 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller
}

cr.Status.BusConfiguration = busConfig.Spec

for i := int32(0); i < cr.Spec.Replicas; i++ {
idxcClient := mgr.getClient(ctx, i)
err = idxcClient.RestartSplunk()
if err != nil {
return result, err
}
scopedLog.Info("Restarted splunk", "indexer", i)
}
}
}

Expand Down Expand Up @@ -565,6 +574,15 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient,
}

cr.Status.BusConfiguration = busConfig.Spec

for i := int32(0); i < cr.Spec.Replicas; i++ {
idxcClient := mgr.getClient(ctx, i)
err = idxcClient.RestartSplunk()
if err != nil {
return result, err
}
scopedLog.Info("Restarted splunk", "indexer", i)
}
}
}

Expand Down
36 changes: 34 additions & 2 deletions pkg/splunk/enterprise/ingestorcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr

// If bus config is updated
if !reflect.DeepEqual(cr.Status.BusConfiguration, busConfig.Spec) {
mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient)
mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client)

err = mgr.handlePushBusChange(ctx, cr, busConfig, client)
if err != nil {
Expand All @@ -238,6 +238,15 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
}

cr.Status.BusConfiguration = busConfig.Spec

for i := int32(0); i < cr.Spec.Replicas; i++ {
ingClient := mgr.getClient(ctx, i)
err = ingClient.RestartSplunk()
if err != nil {
return result, err
}
scopedLog.Info("Restarted splunk", "ingestor", i)
}
}

// Upgrade from automated MC to MC CRD
Expand Down Expand Up @@ -280,6 +289,27 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
return result, nil
}

// getClient for ingestorClusterPodManager returns a SplunkClient for the member n.
// The client targets the n-th ingestor pod through its headless-service FQDN on the
// Splunk management port (8089), authenticating as "admin" with the password read
// from the pod's namespace-scoped secret.
func (mgr *ingestorClusterPodManager) getClient(ctx context.Context, n int32) *splclient.SplunkClient {
	reqLogger := log.FromContext(ctx)
	scopedLog := reqLogger.WithName("ingestorClusterPodManager.getClient").WithValues("name", mgr.cr.GetName(), "namespace", mgr.cr.GetNamespace())

	// Get Pod Name
	memberName := GetSplunkStatefulsetPodName(SplunkIngestor, mgr.cr.GetName(), n)

	// Get Fully Qualified Domain Name
	fqdnName := splcommon.GetServiceFQDN(mgr.cr.GetNamespace(),
		fmt.Sprintf("%s.%s", memberName, GetSplunkServiceName(SplunkIngestor, mgr.cr.GetName(), true)))

	// Retrieve admin password from Pod
	adminPwd, err := splutil.GetSpecificSecretTokenFromPod(ctx, mgr.c, memberName, mgr.cr.GetNamespace(), "password")
	if err != nil {
		// NOTE(review): the error is only logged, not propagated — on failure a client
		// with an empty password is still returned, so subsequent REST calls (e.g.
		// RestartSplunk) will fail with an auth error rather than surfacing the root
		// cause here. Consider returning (client, error) — TODO confirm with callers.
		scopedLog.Error(err, "Couldn't retrieve the admin password from pod")
	}

	return mgr.newSplunkClient(fmt.Sprintf("https://%s:8089", fqdnName), "admin", adminPwd)
}

// validateIngestorClusterSpec checks validity and makes default updates to a IngestorClusterSpec and returns error if something is wrong
func validateIngestorClusterSpec(ctx context.Context, c splcommon.ControllerClient, cr *enterpriseApi.IngestorCluster) error {
// We cannot have 0 replicas in IngestorCluster spec since this refers to number of ingestion pods in an ingestor cluster
Expand Down Expand Up @@ -372,19 +402,21 @@ func getChangedBusFieldsForIngestor(busConfig *enterpriseApi.BusConfiguration, b
}

// ingestorClusterPodManager manages per-pod Splunk REST clients for an
// IngestorCluster, used for pod-level operations such as Splunkd restarts.
type ingestorClusterPodManager struct {
	// c is the Kubernetes controller client, used to read the admin password
	// secret from individual pods.
	c splcommon.ControllerClient
	// log is the reconcile-scoped logger.
	log logr.Logger
	// cr is the IngestorCluster custom resource being managed.
	cr *enterpriseApi.IngestorCluster
	// secrets holds the namespace-scoped secret for the cluster.
	secrets *corev1.Secret
	// newSplunkClient constructs a Splunk REST client; injectable so unit
	// tests can substitute a mock HTTP transport.
	newSplunkClient func(managementURI, username, password string) *splclient.SplunkClient
}

// newIngestorClusterPodManager function to create pod manager this is added to write unit test case
var newIngestorClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc) ingestorClusterPodManager {
var newIngestorClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager {
return ingestorClusterPodManager{
log: log,
cr: cr,
secrets: secret,
newSplunkClient: newSplunkClient,
c: c,
}
}

Expand Down
12 changes: 4 additions & 8 deletions pkg/splunk/enterprise/ingestorcluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ import (
"github.com/go-logr/logr"
enterpriseApi "github.com/splunk/splunk-operator/api/v4"
splclient "github.com/splunk/splunk-operator/pkg/splunk/client"
splcommon "github.com/splunk/splunk-operator/pkg/splunk/common"
spltest "github.com/splunk/splunk-operator/pkg/splunk/test"
splutil "github.com/splunk/splunk-operator/pkg/splunk/util"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func init() {
Expand All @@ -56,11 +55,7 @@ func TestApplyIngestorCluster(t *testing.T) {

ctx := context.TODO()

scheme := runtime.NewScheme()
_ = enterpriseApi.AddToScheme(scheme)
_ = corev1.AddToScheme(scheme)
_ = appsv1.AddToScheme(scheme)
c := fake.NewClientBuilder().WithScheme(scheme).Build()
c := spltest.NewMockClient()

// Object definitions
busConfig := &enterpriseApi.BusConfiguration{
Expand Down Expand Up @@ -250,8 +245,9 @@ func TestApplyIngestorCluster(t *testing.T) {
// outputs.conf
origNew := newIngestorClusterPodManager
mockHTTPClient := &spltest.MockHTTPClient{}
newIngestorClusterPodManager = func(l logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, _ NewSplunkClientFunc) ingestorClusterPodManager {
newIngestorClusterPodManager = func(l logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, _ NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager {
return ingestorClusterPodManager{
c: c,
log: l, cr: cr, secrets: secret,
newSplunkClient: func(uri, user, pass string) *splclient.SplunkClient {
return &splclient.SplunkClient{ManagementURI: uri, Username: user, Password: pass, Client: mockHTTPClient}
Expand Down
4 changes: 4 additions & 0 deletions pkg/splunk/enterprise/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2624,6 +2624,8 @@ func TestUpdateCRStatus(t *testing.T) {
WithStatusSubresource(&enterpriseApi.Standalone{}).
WithStatusSubresource(&enterpriseApi.MonitoringConsole{}).
WithStatusSubresource(&enterpriseApi.IndexerCluster{}).
WithStatusSubresource(&enterpriseApi.BusConfiguration{}).
WithStatusSubresource(&enterpriseApi.IngestorCluster{}).
WithStatusSubresource(&enterpriseApi.SearchHeadCluster{})
c := builder.Build()
ctx := context.TODO()
Expand Down Expand Up @@ -3302,9 +3304,11 @@ func TestGetCurrentImage(t *testing.T) {
WithStatusSubresource(&enterpriseApi.ClusterManager{}).
WithStatusSubresource(&enterpriseApi.Standalone{}).
WithStatusSubresource(&enterpriseApi.MonitoringConsole{}).
WithStatusSubresource(&enterpriseApi.BusConfiguration{}).
WithStatusSubresource(&enterpriseApi.IndexerCluster{}).
WithStatusSubresource(&enterpriseApi.SearchHeadCluster{}).
WithStatusSubresource(&enterpriseApi.IngestorCluster{})

client := builder.Build()
client.Create(ctx, &current)
_, err := ApplyClusterManager(ctx, client, &current)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,14 +384,6 @@ var _ = Describe("indingsep test", func() {
err = deployment.UpdateCR(ctx, bus)
Expect(err).To(Succeed(), "Unable to deploy Bus Configuration with updated CR")

// Ensure that Ingestor Cluster has not been restarted
testcaseEnvInst.Log.Info("Ensure that Ingestor Cluster has not been restarted")
testenv.IngestorReady(ctx, deployment, testcaseEnvInst)

// Ensure that Indexer Cluster has not been restarted
testcaseEnvInst.Log.Info("Ensure that Indexer Cluster has not been restarted")
testenv.SingleSiteIndexersReady(ctx, deployment, testcaseEnvInst)

// Get instance of current Ingestor Cluster CR with latest config
testcaseEnvInst.Log.Info("Get instance of current Ingestor Cluster CR with latest config")
ingest := &enterpriseApi.IngestorCluster{}
Expand Down
Loading