Skip to content

Commit dbfecca

Browse files
Ganesh-RBGanesh Bombatkarcwperks
authored
Improve Health Status of security-auditlog Index in Single Node Clusters (#5030)
Signed-off-by: Ganesh Bombatkar <[email protected]> Signed-off-by: Craig Perkins <[email protected]> Co-authored-by: Ganesh Bombatkar <[email protected]> Co-authored-by: Craig Perkins <[email protected]>
1 parent 3c62a83 commit dbfecca

File tree

13 files changed

+178
-21
lines changed

13 files changed

+178
-21
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
package org.opensearch.security;
11+
12+
import java.io.IOException;
13+
14+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
15+
import org.apache.logging.log4j.LogManager;
16+
import org.apache.logging.log4j.Logger;
17+
import org.junit.ClassRule;
18+
import org.junit.Test;
19+
import org.junit.runner.RunWith;
20+
21+
import org.opensearch.test.framework.AuditCompliance;
22+
import org.opensearch.test.framework.AuditConfiguration;
23+
import org.opensearch.test.framework.AuditFilters;
24+
import org.opensearch.test.framework.cluster.ClusterManager;
25+
import org.opensearch.test.framework.cluster.LocalCluster;
26+
import org.opensearch.test.framework.cluster.TestRestClient;
27+
28+
import static org.hamcrest.MatcherAssert.assertThat;
29+
import static org.hamcrest.Matchers.containsString;
30+
import static org.opensearch.test.framework.TestSecurityConfig.AuthcDomain.AUTHC_HTTPBASIC_INTERNAL;
31+
import static org.opensearch.test.framework.TestSecurityConfig.User.USER_ADMIN;
32+
33+
@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class)
34+
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
35+
public class InternalAuditLogTest {
36+
37+
private static final Logger log = LogManager.getLogger(InternalAuditLogTest.class);
38+
39+
@ClassRule
40+
public static final LocalCluster cluster = new LocalCluster.Builder().clusterManager(ClusterManager.SINGLENODE)
41+
.anonymousAuth(false)
42+
.authc(AUTHC_HTTPBASIC_INTERNAL)
43+
.users(USER_ADMIN)
44+
.internalAudit(
45+
new AuditConfiguration(true).compliance(new AuditCompliance().enabled(true))
46+
.filters(new AuditFilters().enabledRest(true).enabledTransport(true))
47+
)
48+
.build();
49+
50+
@Test
51+
public void testAuditLogShouldBeGreenInSingleNodeCluster() throws IOException {
52+
try (TestRestClient client = cluster.getRestClient(USER_ADMIN)) {
53+
client.get(""); // demo request for insuring audit-log index is created beforehand
54+
TestRestClient.HttpResponse indicesResponse = client.get("_cat/indices");
55+
56+
assertThat(indicesResponse.getBody(), containsString("security-auditlog"));
57+
assertThat(indicesResponse.getBody(), containsString("green"));
58+
}
59+
}
60+
}

src/integrationTest/java/org/opensearch/test/framework/cluster/LocalCluster.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,18 @@ public Builder audit(AuditConfiguration auditConfiguration) {
475475
return this;
476476
}
477477

478+
public Builder internalAudit(AuditConfiguration auditConfiguration) {
479+
if (auditConfiguration != null) {
480+
testSecurityConfig.audit(auditConfiguration);
481+
}
482+
if (auditConfiguration.isEnabled()) {
483+
nodeOverrideSettingsBuilder.put("plugins.security.audit.type", "internal_opensearch");
484+
} else {
485+
nodeOverrideSettingsBuilder.put("plugins.security.audit.type", "noop");
486+
}
487+
return this;
488+
}
489+
478490
public List<TestSecurityConfig.User> getUsers() {
479491
return testSecurityConfig.getUsers();
480492
}

src/main/java/org/opensearch/security/auditlog/impl/AuditLogImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public AuditLogImpl(
6969
) {
7070
super(settings, threadPool, resolver, clusterService, environment);
7171
this.settings = settings;
72-
this.messageRouter = new AuditMessageRouter(settings, clientProvider, threadPool, configPath);
72+
this.messageRouter = new AuditMessageRouter(settings, clientProvider, threadPool, configPath, clusterService);
7373
this.messageRouterEnabled = this.messageRouter.isEnabled();
7474

7575
log.info("Message routing enabled: {}", this.messageRouterEnabled);

src/main/java/org/opensearch/security/auditlog/routing/AuditMessageRouter.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.logging.log4j.Logger;
2424

2525
import org.opensearch.client.Client;
26+
import org.opensearch.cluster.service.ClusterService;
2627
import org.opensearch.common.settings.Settings;
2728
import org.opensearch.security.auditlog.config.ThreadPoolConfig;
2829
import org.opensearch.security.auditlog.impl.AuditCategory;
@@ -43,9 +44,15 @@ public class AuditMessageRouter {
4344
final SinkProvider sinkProvider;
4445
final AsyncStoragePool storagePool;
4546

46-
public AuditMessageRouter(final Settings settings, final Client clientProvider, ThreadPool threadPool, final Path configPath) {
47+
public AuditMessageRouter(
48+
final Settings settings,
49+
final Client clientProvider,
50+
ThreadPool threadPool,
51+
final Path configPath,
52+
final ClusterService clusterService
53+
) {
4754
this(
48-
new SinkProvider(settings, clientProvider, threadPool, configPath),
55+
new SinkProvider(settings, clientProvider, threadPool, configPath, clusterService),
4956
new AsyncStoragePool(ThreadPoolConfig.getConfig(settings))
5057
);
5158
}

src/main/java/org/opensearch/security/auditlog/sink/AbstractInternalOpenSearchSink.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@
1212
package org.opensearch.security.auditlog.sink;
1313

1414
import java.io.IOException;
15+
import java.util.Map;
16+
17+
import com.google.common.collect.ImmutableMap;
1518

1619
import org.opensearch.action.DocWriteRequest;
1720
import org.opensearch.action.index.IndexRequestBuilder;
1821
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
1922
import org.opensearch.client.Client;
23+
import org.opensearch.cluster.service.ClusterService;
2024
import org.opensearch.common.settings.Settings;
2125
import org.opensearch.common.unit.TimeValue;
2226
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext;
@@ -29,7 +33,9 @@ public abstract class AbstractInternalOpenSearchSink extends AuditLogSink {
2933

3034
protected final Client clientProvider;
3135
private final ThreadPool threadPool;
36+
protected final ClusterService clusterService;
3237
private final DocWriteRequest.OpType storeOpType;
38+
final static Map<String, Object> indexSettings = ImmutableMap.of("index.number_of_shards", 1, "index.auto_expand_replicas", "0-1");
3339

3440
public AbstractInternalOpenSearchSink(
3541
final String name,
@@ -38,19 +44,23 @@ public AbstractInternalOpenSearchSink(
3844
final Client clientProvider,
3945
ThreadPool threadPool,
4046
AuditLogSink fallbackSink,
41-
DocWriteRequest.OpType storeOpType
47+
DocWriteRequest.OpType storeOpType,
48+
ClusterService clusterService
4249
) {
4350
super(name, settings, settingsPrefix, fallbackSink);
4451
this.clientProvider = clientProvider;
4552
this.threadPool = threadPool;
4653
this.storeOpType = storeOpType;
54+
this.clusterService = clusterService;
4755
}
4856

4957
@Override
5058
public void close() throws IOException {
5159

5260
}
5361

62+
protected abstract boolean createIndexIfAbsent(String indexName);
63+
5464
public boolean doStore(final AuditMessage msg, String indexName) {
5565

5666
if (Boolean.parseBoolean(
@@ -64,6 +74,12 @@ public boolean doStore(final AuditMessage msg, String indexName) {
6474

6575
try (StoredContext ctx = threadPool.getThreadContext().stashContext()) {
6676
try {
77+
boolean ok = createIndexIfAbsent(indexName);
78+
if (!ok) {
79+
log.error("Failed to create index {}", indexName);
80+
return false;
81+
}
82+
6783
final IndexRequestBuilder irb = clientProvider.prepareIndex(indexName)
6884
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
6985
.setSource(msg.getAsMap());

src/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchDataStreamSink.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
2626
import org.opensearch.cluster.metadata.DataStream;
2727
import org.opensearch.cluster.metadata.Template;
28+
import org.opensearch.cluster.service.ClusterService;
2829
import org.opensearch.common.settings.Settings;
2930
import org.opensearch.security.auditlog.impl.AuditMessage;
3031
import org.opensearch.security.support.ConfigConstants;
@@ -43,9 +44,10 @@ public InternalOpenSearchDataStreamSink(
4344
final Path configPath,
4445
final Client clientProvider,
4546
ThreadPool threadPool,
46-
AuditLogSink fallbackSink
47+
AuditLogSink fallbackSink,
48+
ClusterService clusterService
4749
) {
48-
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, DocWriteRequest.OpType.CREATE);
50+
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, DocWriteRequest.OpType.CREATE, clusterService);
4951
Settings sinkSettings = getSinkSettings(settingsPrefix);
5052

5153
this.dataStreamName = sinkSettings.get(ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_NAME, "opensearch-security-auditlog");
@@ -132,6 +134,12 @@ private boolean initDataStream() {
132134
return this.dataStreamInitialized;
133135
}
134136

137+
@Override
138+
public boolean createIndexIfAbsent(String indexName) {
139+
// datastream is initialized in initDataStream
140+
return true;
141+
}
142+
135143
@Override
136144
public void close() throws IOException {
137145

src/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchSink.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414
import java.io.IOException;
1515
import java.nio.file.Path;
1616

17+
import org.opensearch.ResourceAlreadyExistsException;
18+
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
1719
import org.opensearch.client.Client;
20+
import org.opensearch.cluster.service.ClusterService;
1821
import org.opensearch.common.settings.Settings;
1922
import org.opensearch.security.auditlog.impl.AuditMessage;
2023
import org.opensearch.security.support.ConfigConstants;
@@ -36,9 +39,10 @@ public InternalOpenSearchSink(
3639
final Path configPath,
3740
final Client clientProvider,
3841
ThreadPool threadPool,
39-
AuditLogSink fallbackSink
42+
AuditLogSink fallbackSink,
43+
ClusterService clusterService
4044
) {
41-
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, null);
45+
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, null, clusterService);
4246

4347
Settings sinkSettings = getSinkSettings(settingsPrefix);
4448
this.index = sinkSettings.get(ConfigConstants.SECURITY_AUDIT_OPENSEARCH_INDEX, "'security-auditlog-'YYYY.MM.dd");
@@ -54,6 +58,23 @@ public InternalOpenSearchSink(
5458
}
5559
}
5660

61+
@Override
62+
public boolean createIndexIfAbsent(String indexName) {
63+
if (clusterService.state().metadata().hasIndex(indexName)) {
64+
return true;
65+
}
66+
67+
try {
68+
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(indexSettings);
69+
final boolean ok = clientProvider.admin().indices().create(createIndexRequest).actionGet().isAcknowledged();
70+
log.info("Index {} created?: {}", indexName, ok);
71+
return ok;
72+
} catch (ResourceAlreadyExistsException resourceAlreadyExistsException) {
73+
log.info("Index {} already exists", indexName);
74+
return true;
75+
}
76+
}
77+
5778
@Override
5879
public void close() throws IOException {
5980

src/main/java/org/opensearch/security/auditlog/sink/SinkProvider.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.logging.log4j.Logger;
2121

2222
import org.opensearch.client.Client;
23+
import org.opensearch.cluster.service.ClusterService;
2324
import org.opensearch.common.settings.Settings;
2425
import org.opensearch.security.dlic.rest.support.Utils;
2526
import org.opensearch.security.support.ConfigConstants;
@@ -34,21 +35,35 @@ public class SinkProvider {
3435
private final ThreadPool threadPool;
3536
private final Path configPath;
3637
private final Settings settings;
38+
private final ClusterService clusterService;
3739
final Map<String, AuditLogSink> allSinks = new HashMap<>();
3840
AuditLogSink defaultSink;
3941
AuditLogSink fallbackSink;
4042

41-
public SinkProvider(final Settings settings, final Client clientProvider, ThreadPool threadPool, final Path configPath) {
43+
public SinkProvider(
44+
final Settings settings,
45+
final Client clientProvider,
46+
ThreadPool threadPool,
47+
final Path configPath,
48+
final ClusterService clusterService
49+
) {
4250
this.settings = settings;
4351
this.clientProvider = clientProvider;
4452
this.threadPool = threadPool;
4553
this.configPath = configPath;
54+
this.clusterService = clusterService;
4655

4756
// fall back sink, make sure we don't lose messages
4857
String fallbackConfigPrefix = ConfigConstants.SECURITY_AUDIT_CONFIG_ENDPOINTS + "." + FALLBACKSINK_NAME;
4958
Settings fallbackSinkSettings = settings.getAsSettings(fallbackConfigPrefix);
5059
if (!fallbackSinkSettings.isEmpty()) {
51-
this.fallbackSink = createSink(FALLBACKSINK_NAME, fallbackSinkSettings.get("type"), settings, fallbackConfigPrefix + ".config");
60+
this.fallbackSink = createSink(
61+
FALLBACKSINK_NAME,
62+
fallbackSinkSettings.get("type"),
63+
settings,
64+
fallbackConfigPrefix + ".config",
65+
clusterService
66+
);
5267
}
5368

5469
// make sure we always have a fallback to write to
@@ -63,7 +78,8 @@ public SinkProvider(final Settings settings, final Client clientProvider, Thread
6378
DEFAULTSINK_NAME,
6479
settings.get(ConfigConstants.SECURITY_AUDIT_TYPE_DEFAULT),
6580
settings,
66-
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT
81+
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT,
82+
clusterService
6783
);
6884
if (defaultSink == null) {
6985
log.error("Default endpoint could not be created, auditlog will not work properly.");
@@ -92,7 +108,8 @@ public SinkProvider(final Settings settings, final Client clientProvider, Thread
92108
sinkName,
93109
type,
94110
this.settings,
95-
ConfigConstants.SECURITY_AUDIT_CONFIG_ENDPOINTS + "." + sinkName + ".config"
111+
ConfigConstants.SECURITY_AUDIT_CONFIG_ENDPOINTS + "." + sinkName + ".config",
112+
clusterService
96113
);
97114
if (sink == null) {
98115
log.error("Endpoint '{}' could not be created, check log file for further information.", sinkName);
@@ -128,12 +145,27 @@ protected void close(AuditLogSink sink) {
128145
}
129146
}
130147

131-
private final AuditLogSink createSink(final String name, final String type, final Settings settings, final String settingsPrefix) {
148+
private final AuditLogSink createSink(
149+
final String name,
150+
final String type,
151+
final Settings settings,
152+
final String settingsPrefix,
153+
final ClusterService clusterService
154+
) {
132155
AuditLogSink sink = null;
133156
if (type != null) {
134157
switch (type.toLowerCase()) {
135158
case "internal_opensearch":
136-
sink = new InternalOpenSearchSink(name, settings, settingsPrefix, configPath, clientProvider, threadPool, fallbackSink);
159+
sink = new InternalOpenSearchSink(
160+
name,
161+
settings,
162+
settingsPrefix,
163+
configPath,
164+
clientProvider,
165+
threadPool,
166+
fallbackSink,
167+
clusterService
168+
);
137169
break;
138170
case "internal_opensearch_data_stream":
139171
sink = new InternalOpenSearchDataStreamSink(
@@ -143,7 +175,8 @@ private final AuditLogSink createSink(final String name, final String type, fina
143175
configPath,
144176
clientProvider,
145177
threadPool,
146-
fallbackSink
178+
fallbackSink,
179+
clusterService
147180
);
148181
break;
149182
case "external_opensearch":

src/test/java/org/opensearch/security/auditlog/AbstractAuditlogiUnitTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ protected void validateJson(final String json) throws Exception { // this functi
114114
}
115115

116116
protected AuditMessageRouter createMessageRouterComplianceEnabled(Settings settings) {
117-
AuditMessageRouter router = new AuditMessageRouter(settings, null, null, null);
117+
AuditMessageRouter router = new AuditMessageRouter(settings, null, null, null, null);
118118
router.enableRoutes(settings);
119119
return router;
120120
}

src/test/java/org/opensearch/security/auditlog/routing/RoutingConfigurationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void testNoDefaultSink() throws Exception {
7070
)
7171
)
7272
.build();
73-
AuditMessageRouter router = new AuditMessageRouter(settings, null, null, null);
73+
AuditMessageRouter router = new AuditMessageRouter(settings, null, null, null, null);
7474
// no default sink, audit log not enabled
7575
assertThat(router.isEnabled(), is(false));
7676
assertThat(router.defaultSink, is(nullValue()));

0 commit comments

Comments
 (0)