Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class AccessController implements AuthorizationApi {
private final AccessControlService accessControlService;

public Mono<ResponseEntity<AuthenticationInfoDTO>> getUserAuthInfo(ServerWebExchange exchange) {
Mono<List<UserPermissionDTO>> permissions = accessControlService.getUser()
Mono<List<UserPermissionDTO>> permissions = AccessControlService.getUser()
.map(user -> accessControlService.getRoles()
.stream()
.filter(role -> user.groups().contains(role.getName()))
Expand Down Expand Up @@ -64,9 +64,9 @@ private List<UserPermissionDTO> mapPermissions(List<Permission> permissions, Lis
dto.setClusters(clusters);
dto.setResource(ResourceTypeDTO.fromValue(permission.getResource().toString().toUpperCase()));
dto.setValue(permission.getValue());
dto.setActions(permission.getActions()
dto.setActions(permission.getParsedActions()
.stream()
.map(String::toUpperCase)
.map(p -> p.name().toUpperCase())
.map(this::mapAction)
.filter(Objects::nonNull)
.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.consumerGroup(id)
.consumerGroupActions(DELETE)
.consumerGroupActions(id, DELETE)
.operationName("deleteConsumerGroup")
.build();

Expand All @@ -66,8 +65,7 @@ public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String clu
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.consumerGroup(consumerGroupId)
.consumerGroupActions(VIEW)
.consumerGroupActions(consumerGroupId, VIEW)
.operationName("getConsumerGroup")
.build();

Expand All @@ -84,8 +82,7 @@ public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(Strin
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(TopicAction.VIEW)
.topicActions(topicName, TopicAction.VIEW)
.operationName("getTopicConsumerGroups")
.build();

Expand Down Expand Up @@ -142,9 +139,8 @@ public Mono<ResponseEntity<Void>> resetConsumerGroupOffsets(String clusterName,
return resetDto.flatMap(reset -> {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(reset.getTopic())
.topicActions(TopicAction.VIEW)
.consumerGroupActions(RESET_OFFSETS)
.topicActions(reset.getTopic(), TopicAction.VIEW)
.consumerGroupActions(group, RESET_OFFSETS)
.operationName("resetConsumerGroupOffsets")
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, Stri

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnectors")
.build();

Expand All @@ -73,8 +72,7 @@ public Mono<ResponseEntity<ConnectorDTO>> createConnector(String clusterName, St

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.CREATE)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.CREATE)
.operationName("createConnector")
.build();

Expand All @@ -91,9 +89,7 @@ public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, Strin

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connector(connectorName)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnector")
.build();

Expand All @@ -110,8 +106,7 @@ public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String con

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.EDIT)
.operationName("deleteConnector")
.operationParams(Map.of(CONNECTOR_NAME, connectName))
.build();
Expand All @@ -133,7 +128,6 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
) {
var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.operationName("getAllConnectors")
.build();

Expand All @@ -143,7 +137,6 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(

Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
.filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName))
.sort(comparator);

return Mono.just(ResponseEntity.ok(job))
Expand All @@ -158,8 +151,7 @@ public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clust

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnectorConfig")
.build();

Expand All @@ -178,8 +170,7 @@ public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.EDIT)
.operationName("setConnectorConfig")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
Expand All @@ -205,8 +196,7 @@ public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, Strin

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(connectActions)
.connectActions(connectName, connectActions)
.operationName("updateConnectorState")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
Expand All @@ -225,8 +215,7 @@ public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnectorTasks")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
Expand All @@ -245,8 +234,7 @@ public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, Strin

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.RESTART)
.operationName("restartConnectorTask")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
Expand All @@ -264,8 +252,7 @@ public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnectorPlugins")
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ public Mono<ResponseEntity<Void>> deleteTopicMessages(

var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_DELETE)
.topicActions(topicName, MESSAGES_DELETE)
.build();

return validateAccess(context).<ResponseEntity<Void>>then(
Expand Down Expand Up @@ -90,8 +89,7 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
ServerWebExchange exchange) {
var contextBuilder = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.topicActions(topicName, MESSAGES_READ)
.operationName("getTopicMessages");

if (auditService.isAuditTopic(getCluster(clusterName), topicName)) {
Expand Down Expand Up @@ -128,8 +126,7 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(

var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_PRODUCE)
.topicActions(topicName, MESSAGES_PRODUCE)
.operationName("sendTopicMessages")
.build();

Expand Down Expand Up @@ -175,8 +172,7 @@ public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterNam
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(TopicAction.VIEW)
.topicActions(topicName, TopicAction.VIEW)
.operationName("getSerdes")
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibil
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.schemaActions(subject, SchemaAction.VIEW)
.operationName("checkSchemaCompatibility")
.build();

Expand All @@ -72,31 +71,31 @@ public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibil
public Mono<ResponseEntity<SchemaSubjectDTO>> createNewSchema(
String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.CREATE)
.operationName("createNewSchema")
.build();

return validateAccess(context).then(
newSchemaSubjectMono.flatMap(newSubject ->
schemaRegistryService.registerNewSchema(
getCluster(clusterName),
newSubject.getSubject(),
kafkaSrMapper.fromDto(newSubject)
)
).map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
return newSchemaSubjectMono.flatMap(newSubject -> {
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(newSubject.getSubject(), SchemaAction.CREATE)
.operationName("createNewSchema")
.build();
return validateAccess(context).then(
schemaRegistryService.registerNewSchema(
getCluster(clusterName),
newSubject.getSubject(),
kafkaSrMapper.fromDto(newSubject)
))
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
.doOnEach(sig -> audit(context, sig));
}
);
}

@Override
public Mono<ResponseEntity<Void>> deleteLatestSchema(
String clusterName, String subject, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.DELETE)
.schemaActions(subject, SchemaAction.DELETE)
.operationName("deleteLatestSchema")
.build();

Expand All @@ -112,8 +111,7 @@ public Mono<ResponseEntity<Void>> deleteSchema(
String clusterName, String subject, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.DELETE)
.schemaActions(subject, SchemaAction.DELETE)
.operationName("deleteSchema")
.build();

Expand All @@ -129,8 +127,7 @@ public Mono<ResponseEntity<Void>> deleteSchemaByVersion(
String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subjectName)
.schemaActions(SchemaAction.DELETE)
.schemaActions(subjectName, SchemaAction.DELETE)
.operationName("deleteSchemaByVersion")
.build();

Expand All @@ -146,8 +143,7 @@ public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getAllVersionsBySubject(
String clusterName, String subjectName, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subjectName)
.schemaActions(SchemaAction.VIEW)
.schemaActions(subjectName, SchemaAction.VIEW)
.operationName("getAllVersionsBySubject")
.build();

Expand Down Expand Up @@ -175,8 +171,7 @@ public Mono<ResponseEntity<SchemaSubjectDTO>> getLatestSchema(String clusterName
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.schemaActions(subject, SchemaAction.VIEW)
.operationName("getLatestSchema")
.build();

Expand All @@ -192,8 +187,7 @@ public Mono<ResponseEntity<SchemaSubjectDTO>> getSchemaByVersion(
String clusterName, String subject, Integer version, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.schemaActions(subject, SchemaAction.VIEW)
.operationName("getSchemaByVersion")
.operationParams(Map.of("subject", subject, "version", version))
.build();
Expand Down Expand Up @@ -248,7 +242,7 @@ public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.MODIFY_GLOBAL_COMPATIBILITY)
.schemaGlobalCompatChange()
.operationName("updateGlobalSchemaCompatibilityLevel")
.build();

Expand All @@ -268,16 +262,16 @@ public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(
String clusterName, String subject, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
ServerWebExchange exchange) {

var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.EDIT)
.schemaActions(subject, SchemaAction.EDIT)
.operationName("updateSchemaCompatibilityLevel")
.operationParams(Map.of("subject", subject))
.build();

return validateAccess(context).then(
compatibilityLevelMono
.flatMap(compatibilityLevelDTO ->
return compatibilityLevelMono.flatMap(compatibilityLevelDTO ->
validateAccess(context).then(
schemaRegistryService.updateSchemaCompatibility(
getCluster(clusterName),
subject,
Expand Down
Loading