Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Sonarcloud, tests, documentation #29

Merged
merged 9 commits into from
Oct 18, 2024
20 changes: 19 additions & 1 deletion .github/workflows/checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,15 @@ jobs:
java-version: "21"
distribution: "temurin"
server-id: github
- name: Cache Maven packages
uses: actions/cache@v4
with:
path: ~/.m2
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2
- name: Maven Verify
run: |
mvn --batch-mode clean install -s settings.xml
mvn --batch-mode clean verify -s settings.xml
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

Expand All @@ -61,6 +67,18 @@ jobs:
java-version: "17"
distribution: "temurin"
server-id: github
- name: Cache SonarCloud packages
uses: actions/cache@v4
with:
path: ~/.sonar/cache
key: ${{ runner.os }}-sonar
restore-keys: ${{ runner.os }}-sonar
- name: Cache Maven packages
uses: actions/cache@v4
with:
path: ~/.m2
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2
- name: Maven Test Coverage
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
Expand Down
19 changes: 19 additions & 0 deletions nifi-tdf-controller-services-api-nar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,25 @@
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.12</version>
<executions>
<execution>
<id>default-prepare-agent</id>
<phase>none</phase>
</execution>
<execution>
<id>default-report</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.nifi</groupId>
Expand Down
33 changes: 0 additions & 33 deletions nifi-tdf-processors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,37 +97,4 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<executions>
<execution>
<id>prepare-agent</id>
<goals>
<goal>prepare-agent</goal>
</goals>
<configuration>
<destFile>${project.parent.basedir}/target/jacoco.exec</destFile>
</configuration>
</execution>
<execution>
<id>report</id>
<phase>test</phase>
<goals>
<goal>report</goal>
</goals>
<configuration>
<dataFile>${project.parent.basedir}/target/jacoco.exec</dataFile>
<formats>
<format>XML</format>
</formats>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
*/
public abstract class AbstractTDFProcessor extends AbstractProcessor {

/**
* Configuration property representing the limit on the number of FlowFiles
* that can be pulled from the FlowFile queue at a time.
* It supports expression language through the variable registry and has a default value of 10.
*/
public static final PropertyDescriptor FLOWFILE_PULL_SIZE = new org.apache.nifi.components.PropertyDescriptor.Builder()
.name("FlowFile queue pull limit")
.description("FlowFile queue pull size limit")
Expand All @@ -37,55 +42,92 @@ public abstract class AbstractTDFProcessor extends AbstractProcessor {
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();

/**
* Property descriptor representing an optional SSL Context Service.
* This descriptor defines a property that can be used to configure
* an SSLContextService, which is optional for the processor. This
* service provides the SSL/TLS context needed for secure communication.
*/
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new org.apache.nifi.components.PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("Optional SSL Context Service")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();

/**
* Represents a property descriptor for the OpenTDF Config Service.
* <p>
* This descriptor specifies that the property is required and identifies
* a controller service of type {@link OpenTDFControllerService}. The controller service
* provides the necessary configuration for the OpenTDF platform.
*/
public static final PropertyDescriptor OPENTDF_CONFIG_SERVICE = new org.apache.nifi.components.PropertyDescriptor.Builder()
.name("OpenTDF Config Service")
.description("Controller Service providing OpenTDF Platform Configuration")
.required(true)
.identifiesControllerService(OpenTDFControllerService.class)
.build();

/**
* Defines a successful relationship for the NiFi processor. This relationship is used to route flow files
* that have been successfully processed. Flow files sent to this relationship indicate that the processor
* completed its intended action without errors.
* <p>
* This relationship is commonly used as an output route for data that has passed all validation, transformation,
* and processing steps.
*/
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("")
.build();

/**
* Relationship representing a failure in processing flow files.
* <p>
* This relationship should be used to route flow files that could not
* be processed successfully by the processor. The reasons for failure
* can vary widely and may include issues like invalid data, processing
* errors, or configuration issues.
*/
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("")
.build();

/**
* Get a property value by evaluating attribute expressions if present.
* Evaluates the provided PropertyValue if expression language is present,
* otherwise returns the original PropertyValue.
*
* @param propertyValue
* @return
* @param propertyValue The PropertyValue to evaluate or return.
* @return The evaluated PropertyValue if expression language is present,
* otherwise the original PropertyValue.
*/
PropertyValue getPropertyValue(PropertyValue propertyValue) {
return propertyValue.isExpressionLanguagePresent() ? propertyValue.evaluateAttributeExpressions() : propertyValue;
}

Optional<PropertyValue> getPropertyValue(PropertyDescriptor propertyDescriptor, ProcessContext processContext) {
/**
* Retrieves the value of the specified property from the given process context.
*
* @param processContext The context from which to retrieve the property value.
* @return An Optional containing the PropertyValue if it is set, or an empty Optional otherwise.
*/
Optional<PropertyValue> getPropertyValue(ProcessContext processContext) {
PropertyValue propertyValue = null;
if(processContext.getProperty(propertyDescriptor).isSet()){
propertyValue = getPropertyValue(processContext.getProperty(propertyDescriptor));
if(processContext.getProperty(ConvertToZTDF.SIGN_ASSERTIONS).isSet()){
propertyValue = getPropertyValue(processContext.getProperty(ConvertToZTDF.SIGN_ASSERTIONS));
}
return Optional.ofNullable(propertyValue);
}

private SDK sdk;

/**
* Create a new TDF SDK using the OpenTDFController Service as a source of configuration
* Retrieves an instance of the TDF SDK, initializing it if it is not already created.
*
* @param processContext
* @return
* @param processContext the NiFi ProcessContext providing necessary configuration and controller services.
* @return an instance of the initialized SDK.
*/
SDK getTDFSDK(ProcessContext processContext) {
if (sdk == null) {
Expand Down Expand Up @@ -159,17 +201,31 @@ public void onTrigger(ProcessContext processContext, ProcessSession processSessi
*/
abstract void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException;

/**
* Creates and returns a new instance of TDF.
*
* @return A new instance of TDF.
*/
TDF getTDF() {
return new TDF();
}

/**
* Creates and returns a new instance of NanoTDF.
*
* @return A new instance of NanoTDF.
*/
NanoTDF getNanoTDF(){
return new NanoTDF();
}

/**
* Retrieves the list of property descriptors that are supported by this processor.
*
* @return A list containing the supported property descriptors.
*/
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE));
return List.of(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,37 +34,52 @@ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE, KAS_URL));
}

/**{
* Get the kas urls from a flowfile attribute or if none present fallback to processor configuration KAS URL;
* format is a comma separated list
* @param flowFile
* @param processContext
* @return
* @throws Exception
/**
* Retrieves a list of KAS (Key Access Service) URLs either from the flow file attributes or from the process context.
* If the KAS URL is not provided through the flow file attribute and is not set in the process context, an exception is thrown.
*
* @param flowFile the flow file from which KAS URL attributes are retrieved.
* @param processContext the process context to get the default KAS URL if not available in the flow file.
* @return a list of KAS URLs.
* @throws Exception if no KAS URL is provided via the flow file or the default in the process context, or if the KAS URLs provided are empty.
*/
List<String> getKasUrl(FlowFile flowFile, ProcessContext processContext) throws Exception{
List<String> getKasUrl(FlowFile flowFile, ProcessContext processContext) throws Exception {
String kasUrlAttribute = flowFile.getAttribute(KAS_URL_ATTRIBUTE);
//check kas url
// Check kas url
if (!processContext.getProperty(KAS_URL).isSet() && kasUrlAttribute == null) {
throw new Exception("no " + KAS_URL_ATTRIBUTE + " flowfile attribute and no default KAS URL configured");
}
String kasUrlValues = kasUrlAttribute != null ? kasUrlAttribute : getPropertyValue(processContext.getProperty(KAS_URL)).getValue();
List<String> kasUrls = Arrays.stream(kasUrlValues.split(",")).filter(x->!x.isEmpty()).collect(Collectors.toList());
if (kasUrlValues.isEmpty()){
List<String> kasUrls = Arrays.stream(kasUrlValues.split(","))
.filter(x -> !x.isEmpty())
.toList(); // Use Stream.toList() for an unmodifiable list
if (kasUrlValues.isEmpty()) {
throw new Exception("no KAS Urls provided");
}
return kasUrls;
}

/**
* Converts a list of KAS (Key Access Service) URLs into a list of Config.KASInfo objects.
*
* @param kasUrls a list of strings representing the KAS URLs
* @return a list of Config.KASInfo objects with each object's URL field set to the corresponding string from the input list
*/
List<Config.KASInfo> getKASInfoFromKASURLs(List<String> kasUrls){
return kasUrls.stream().map(x->{ var ki = new Config.KASInfo(); ki.URL=x; return ki;}).collect(Collectors.toList());
return kasUrls.stream().map(x -> {
var ki = new Config.KASInfo();
ki.URL = x;
return ki;
}).toList();
}

/**
* Get data attributes on a FlowFile from attribute value
* @param flowFile
* @return
* @throws Exception
* Extracts and returns a set of data attributes from the given FlowFile's attribute specified by TDF_ATTRIBUTE.
* The attributes are split by commas and filtered to remove empty strings.
*
* @param flowFile the FlowFile from which to retrieve the data attributes.
* @return a set of data attributes extracted from the given FlowFile.
* @throws Exception if no data attributes are provided via the TDF_ATTRIBUTE FlowFile attribute.
*/
Set<String> getDataAttributes(FlowFile flowFile) throws Exception{
Set<String> dataAttributes = Arrays.stream((flowFile.getAttribute(TDF_ATTRIBUTE) == null ? "" :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,27 @@
import java.nio.ByteBuffer;
import java.util.List;

/**
* A processor for decrypting NanoTDF flow file content using the OpenTDF framework.
* <p>
* This processor reads encrypted NanoTDF data from incoming flow files and decrypts
* it using the associated SDK. The decrypted content is then written back into the
* flow file and routed to the success relationship. If decryption fails, the flow file
* is routed to the failure relationship.
*/
@CapabilityDescription("Decrypts NanoTDF flow file content")
@Tags({"NanoTDF", "OpenTDF", "Decrypt", "Data Centric Security"})
public class ConvertFromNanoTDF extends AbstractTDFProcessor {

/**
* Processes the provided list of flow files by decrypting their content using the NanoTDF protocol.
* If decryption succeeds, the flow file is routed to the success relationship; otherwise, it is routed to the failure relationship.
*
* @param processContext the NiFi ProcessContext which provides configuration and controller services
* @param processSession the ProcessSession which provides mechanisms for reading, writing, transferring, and penalizing flow files
* @param flowFiles the list of FlowFile objects to be processed
* @throws ProcessException if any error occurs during the processing of flow files
*/
@Override
public void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
SDK sdk = getTDFSDK(processContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,46 @@
import java.util.ArrayList;
import java.util.List;


/**
* Converts and decrypts ZTDF (Zero Trust Data Format) flow file content.
* This class takes encrypted ZTDF content and decrypts it,
* transferring the decrypted data to a specified success relationship.
* If an error occurs during decryption, it transfers the flow file to a failure relationship.
* <p>
* This processor uses TDF (Trusted Data Format) SDK for decryption and
* requires configuration of assertion verification keys to verify
* the integrity and authenticity of the encrypted data.
* <p>
* It provides the primary method `processFlowFiles` which reads the encrypted
* content from incoming flow files, decrypts it, and writes the decrypted
* content back to the flow files.
* <p>
* The method `processFlowFiles` performs the following steps:
* 1. Retrieves the TDF SDK instance.
* 2. Reads the entire encrypted content of each flow file into an in-memory byte channel.
* 3. Uses TDF Reader to load and decrypt the content.
* 4. Writes the decrypted content back into the flow file and transfers it to the success relationship.
* 5. If any error occurs during the decryption process, logs the error and transfers the flow file to the failure relationship.
*/
@CapabilityDescription("Decrypts ZTDF flow file content")
@Tags({"ZTDF", "Zero Trust Data Format", "OpenTDF", "Decrypt", "Data Centric Security"})
public class ConvertFromZTDF extends AbstractTDFProcessor {


/**
* Processes a list of flow files by decrypting their content using the TDF (Trusted Data Format) SDK.
* For each flow file in the provided list, the following steps are executed:
* 1. Reads the entire encrypted content of the flow file into an in-memory byte channel.
* 2. Uses a TDF Reader to load and decrypt the content using the SDK.
* 3. Writes the decrypted content back to the flow file.
* 4. Transfers the successfully decrypted flow files to the success relationship.
* 5. In case of an error during decryption, logs the error and transfers the flow file to the failure relationship.
*
* @param processContext the NiFi ProcessContext providing configuration and controller services.
* @param processSession the NiFi ProcessSession used to read, write, and transfer flow files.
* @param flowFiles a list of flow files to be decrypted.
* @throws ProcessException if an error occurs during the decryption process.
*/
@Override
void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
SDK sdk = getTDFSDK(processContext);
Expand Down
Loading