Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BE: Controllers structure minor refactr #4110

Merged
merged 2 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,46 @@

import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ClustersStorage;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import org.springframework.beans.factory.annotation.Autowired;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

public abstract class AbstractController {

private ClustersStorage clustersStorage;
protected ClustersStorage clustersStorage;
protected AccessControlService accessControlService;
protected AuditService auditService;

protected KafkaCluster getCluster(String name) {
return clustersStorage.getClusterByName(name)
.orElseThrow(() -> new ClusterNotFoundException(
String.format("Cluster with name '%s' not found", name)));
}

protected Mono<Void> validateAccess(AccessContext context) {
return accessControlService.validateAccess(context);
}

protected void audit(AccessContext acxt, Signal<?> sig) {
auditService.audit(acxt, sig);
}

@Autowired
public void setClustersStorage(ClustersStorage clustersStorage) {
this.clustersStorage = clustersStorage;
}

@Autowired
public void setAccessControlService(AccessControlService accessControlService) {
this.accessControlService = accessControlService;
}

@Autowired
public void setAuditService(AuditService auditService) {
this.auditService = auditService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.service.acl.AclsService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.resource.PatternType;
Expand All @@ -29,8 +27,6 @@
public class AclsController extends AbstractController implements AclsApi {

private final AclsService aclsService;
private final AccessControlService accessControlService;
private final AuditService auditService;

@Override
public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
Expand All @@ -41,11 +37,11 @@ public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO
.operationName("createAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

Expand All @@ -58,11 +54,11 @@ public Mono<ResponseEntity<Void>> deleteAcl(String clusterName, Mono<KafkaAclDTO
.operationName("deleteAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

Expand All @@ -88,12 +84,12 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,

var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
aclsService.listAcls(getCluster(clusterName), filter)
.map(ClusterMapper::toKafkaAclDto)))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -104,11 +100,11 @@ public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName, ServerWebExc
.operationName("getAclAsCsv")
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
aclsService.getAclAsCsvString(getCluster(clusterName))
.map(ResponseEntity::ok)
.flatMap(Mono::just)
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
);
}

Expand All @@ -120,10 +116,10 @@ public Mono<ResponseEntity<Void>> syncAclsCsv(String clusterName, Mono<String> c
.operationName("syncAclsCsv")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(csvMono)
.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

Expand All @@ -137,10 +133,10 @@ public Mono<ResponseEntity<Void>> createConsumerAcl(String clusterName,
.operationName("createConsumerAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createConsumerAclDto)
.flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

Expand All @@ -154,10 +150,10 @@ public Mono<ResponseEntity<Void>> createProducerAcl(String clusterName,
.operationName("createProducerAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createProducerAclDto)
.flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

Expand All @@ -171,10 +167,10 @@ public Mono<ResponseEntity<Void>> createStreamAppAcl(String clusterName,
.operationName("createStreamAppAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createStreamAppAclDto)
.flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ApplicationInfoService;
import com.provectus.kafka.ui.service.KafkaClusterFactory;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.ApplicationRestarter;
import com.provectus.kafka.ui.util.DynamicConfigOperations;
import com.provectus.kafka.ui.util.DynamicConfigOperations.PropertiesStructure;
Expand All @@ -39,7 +37,7 @@
@Slf4j
@RestController
@RequiredArgsConstructor
public class ApplicationConfigController implements ApplicationConfigApi {
public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi {

private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class);

Expand All @@ -51,12 +49,10 @@ interface PropertiesMapper {
ApplicationConfigPropertiesDTO toDto(PropertiesStructure propertiesStructure);
}

private final AccessControlService accessControlService;
private final DynamicConfigOperations dynamicConfigOperations;
private final ApplicationRestarter restarter;
private final KafkaClusterFactory kafkaClusterFactory;
private final ApplicationInfoService applicationInfoService;
private final AuditService auditService;

@Override
public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
Expand All @@ -69,12 +65,12 @@ public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExch
.applicationConfigActions(VIEW)
.operationName("getCurrentConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(Mono.fromSupplier(() -> ResponseEntity.ok(
new ApplicationConfigDTO()
.properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
)))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -84,14 +80,14 @@ public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> rest
.applicationConfigActions(EDIT)
.operationName("restartWithConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(restartRequestDto)
.<ResponseEntity<Void>>map(dto -> {
dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
restarter.requestRestart();
return ResponseEntity.ok().build();
})
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -101,13 +97,13 @@ public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Pa
.applicationConfigActions(EDIT)
.operationName("uploadConfigRelatedFile")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(fileFlux.single())
.flatMap(file ->
dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
.map(path -> new UploadedFileInfoDTO().location(path.toString()))
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -117,7 +113,7 @@ public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<
.applicationConfigActions(EDIT)
.operationName("validateConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(configDto)
.flatMap(config -> {
PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
Expand All @@ -126,7 +122,7 @@ public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<
.map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
})
.map(ResponseEntity::ok)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.service.BrokerService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
Expand All @@ -31,9 +29,6 @@ public class BrokersController extends AbstractController implements BrokersApi
private final BrokerService brokerService;
private final ClusterMapper clusterMapper;

private final AuditService auditService;
private final AccessControlService accessControlService;

@Override
public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
ServerWebExchange exchange) {
Expand All @@ -43,9 +38,9 @@ public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
.build();

var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(job))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -57,14 +52,14 @@ public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterNa
.operationParams(Map.of("id", id))
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
brokerService.getBrokerMetrics(getCluster(clusterName), id)
.map(clusterMapper::toBrokerMetrics)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -80,10 +75,10 @@ public Mono<ResponseEntity<Flux<BrokersLogdirsDTO>>> getAllBrokersLogdirs(String
.operationParams(Map.of("brokerIds", brokerIds))
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(
brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokerIds)))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -97,11 +92,11 @@ public Mono<ResponseEntity<Flux<BrokerConfigDTO>>> getBrokerConfig(String cluste
.operationParams(Map.of("brokerId", id))
.build();

return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity.ok(
brokerService.getBrokerConfig(getCluster(clusterName), id)
.map(clusterMapper::toBrokerConfig))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -116,11 +111,11 @@ public Mono<ResponseEntity<Void>> updateBrokerTopicPartitionLogDir(String cluste
.operationParams(Map.of("brokerId", id))
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
brokerLogdir
.flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -136,11 +131,11 @@ public Mono<ResponseEntity<Void>> updateBrokerConfigByName(String clusterName,
.operationParams(Map.of("brokerId", id))
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
brokerConfig
.flatMap(bci -> brokerService.updateBrokerConfigByName(
getCluster(clusterName), id, name, bci.getValue()))
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
}
Loading