Skip to content

Commit 79cc260

Browse files
committed
Update tests and property documentation
Modify test methods and assertions in ConvertToZTDFTest to use package-private visibility and improve assertion clarity. Enhance Javadocs for ConvertToZTDF, AbstractTDFProcessor, and related classes, including property descriptors and methods to improve code readability and maintainability.
1 parent bc1e319 commit 79cc260

13 files changed

+485
-65
lines changed

nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractTDFProcessor.java

Lines changed: 80 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828
*/
2929
public abstract class AbstractTDFProcessor extends AbstractProcessor {
3030

31+
/**
32+
* Configuration property representing the limit on the number of FlowFiles
33+
* that can be pulled from the FlowFile queue at a time.
34+
* It supports expression language through the variable registry and has a default value of 10.
35+
*/
3136
public static final PropertyDescriptor FLOWFILE_PULL_SIZE = new org.apache.nifi.components.PropertyDescriptor.Builder()
3237
.name("FlowFile queue pull limit")
3338
.description("FlowFile queue pull size limit")
@@ -37,55 +42,92 @@ public abstract class AbstractTDFProcessor extends AbstractProcessor {
3742
.addValidator(StandardValidators.INTEGER_VALIDATOR)
3843
.build();
3944

45+
/**
46+
* Property descriptor representing an optional SSL Context Service.
47+
* This descriptor defines a property that can be used to configure
48+
* an SSLContextService, which is optional for the processor. This
49+
* service provides the SSL/TLS context needed for secure communication.
50+
*/
4051
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new org.apache.nifi.components.PropertyDescriptor.Builder()
4152
.name("SSL Context Service")
4253
.description("Optional SSL Context Service")
4354
.required(false)
4455
.identifiesControllerService(SSLContextService.class)
4556
.build();
4657

58+
/**
59+
* Represents a property descriptor for the OpenTDF Config Service.
60+
* <p>
61+
* This descriptor specifies that the property is required and identifies
62+
* a controller service of type {@link OpenTDFControllerService}. The controller service
63+
* provides the necessary configuration for the OpenTDF platform.
64+
*/
4765
public static final PropertyDescriptor OPENTDF_CONFIG_SERVICE = new org.apache.nifi.components.PropertyDescriptor.Builder()
4866
.name("OpenTDF Config Service")
4967
.description("Controller Service providing OpenTDF Platform Configuration")
5068
.required(true)
5169
.identifiesControllerService(OpenTDFControllerService.class)
5270
.build();
5371

72+
/**
73+
* Defines a successful relationship for the NiFi processor. This relationship is used to route flow files
74+
* that have been successfully processed. Flow files sent to this relationship indicate that the processor
75+
* completed its intended action without errors.
76+
* <p>
77+
* This relationship is commonly used as an output route for data that has passed all validation, transformation,
78+
* and processing steps.
79+
*/
5480
public static final Relationship REL_SUCCESS = new Relationship.Builder()
5581
.name("success")
5682
.description("")
5783
.build();
5884

85+
/**
86+
* Relationship representing a failure in processing flow files.
87+
* <p>
88+
* This relationship should be used to route flow files that could not
89+
* be processed successfully by the processor. The reasons for failure
90+
* can vary widely and may include issues like invalid data, processing
91+
* errors, or configuration issues.
92+
*/
5993
public static final Relationship REL_FAILURE = new Relationship.Builder()
6094
.name("failure")
6195
.description("")
6296
.build();
6397

6498
/**
65-
* Get a property value by evaluating attribute expressions if present.
99+
* Evaluates the provided PropertyValue if expression language is present,
100+
* otherwise returns the original PropertyValue.
66101
*
67-
* @param propertyValue
68-
* @return
102+
* @param propertyValue The PropertyValue to evaluate or return.
103+
* @return The evaluated PropertyValue if expression language is present,
104+
* otherwise the original PropertyValue.
69105
*/
70106
PropertyValue getPropertyValue(PropertyValue propertyValue) {
71107
return propertyValue.isExpressionLanguagePresent() ? propertyValue.evaluateAttributeExpressions() : propertyValue;
72108
}
73109

74-
Optional<PropertyValue> getPropertyValue(PropertyDescriptor propertyDescriptor, ProcessContext processContext) {
110+
/**
111+
* Retrieves the value of the specified property from the given process context.
112+
*
113+
* @param processContext The context from which to retrieve the property value.
114+
* @return An Optional containing the PropertyValue if it is set, or an empty Optional otherwise.
115+
*/
116+
Optional<PropertyValue> getPropertyValue(ProcessContext processContext) {
75117
PropertyValue propertyValue = null;
76-
if(processContext.getProperty(propertyDescriptor).isSet()){
77-
propertyValue = getPropertyValue(processContext.getProperty(propertyDescriptor));
118+
if(processContext.getProperty(ConvertToZTDF.SIGN_ASSERTIONS).isSet()){
119+
propertyValue = getPropertyValue(processContext.getProperty(ConvertToZTDF.SIGN_ASSERTIONS));
78120
}
79121
return Optional.ofNullable(propertyValue);
80122
}
81123

82124
private SDK sdk;
83125

84126
/**
85-
* Create a new TDF SDK using the OpenTDFController Service as a source of configuration
127+
* Retrieves an instance of the TDF SDK, initializing it if it is not already created.
86128
*
87-
* @param processContext
88-
* @return
129+
* @param processContext the NiFi ProcessContext providing necessary configuration and controller services.
130+
* @return an instance of the initialized SDK.
89131
*/
90132
SDK getTDFSDK(ProcessContext processContext) {
91133
if (sdk == null) {
@@ -159,17 +201,45 @@ public void onTrigger(ProcessContext processContext, ProcessSession processSessi
159201
*/
160202
abstract void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException;
161203

204+
/**
205+
* Creates and returns a new instance of TDF.
206+
*
207+
* @return A new instance of TDF.
208+
*/
162209
TDF getTDF() {
163210
return new TDF();
164211
}
165212

213+
/**
214+
* Creates and returns a new instance of NanoTDF.
215+
*
216+
* @return A new instance of NanoTDF.
217+
*/
166218
NanoTDF getNanoTDF(){
167219
return new NanoTDF();
168220
}
169221

222+
/**
223+
* Retrieves the list of property descriptors that are supported by this processor.
224+
*
225+
* @return A list containing the supported property descriptors.
226+
*/
170227
@Override
171228
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
172-
return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE));
229+
return List.of(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE);
230+
}
231+
232+
@Override
233+
public boolean equals(Object o) {
234+
if (this == o) return true;
235+
if (o == null || getClass() != o.getClass()) return false;
236+
if (!super.equals(o)) return false;
237+
AbstractTDFProcessor that = (AbstractTDFProcessor) o;
238+
return Objects.equals(sdk, that.sdk);
173239
}
174240

241+
@Override
242+
public int hashCode() {
243+
return Objects.hash(super.hashCode(), sdk);
244+
}
175245
}

nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,37 +34,52 @@ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
3434
return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE, KAS_URL));
3535
}
3636

37-
/**{
38-
* Get the kas urls from a flowfile attribute or if none present fallback to processor configuration KAS URL;
39-
* format is a comma separated list
40-
* @param flowFile
41-
* @param processContext
42-
* @return
43-
* @throws Exception
37+
/**
38+
* Retrieves a list of KAS (Key Access Service) URLs either from the flow file attributes or from the process context.
39+
* If the KAS URL is not provided through the flow file attribute and is not set in the process context, an exception is thrown.
40+
*
41+
* @param flowFile the flow file from which KAS URL attributes are retrieved.
42+
* @param processContext the process context to get the default KAS URL if not available in the flow file.
43+
* @return a list of KAS URLs.
44+
* @throws Exception if no KAS URL is provided via the flow file or the default in the process context, or if the KAS URLs provided are empty.
4445
*/
45-
List<String> getKasUrl(FlowFile flowFile, ProcessContext processContext) throws Exception{
46+
List<String> getKasUrl(FlowFile flowFile, ProcessContext processContext) throws Exception {
4647
String kasUrlAttribute = flowFile.getAttribute(KAS_URL_ATTRIBUTE);
47-
//check kas url
48+
// Check kas url
4849
if (!processContext.getProperty(KAS_URL).isSet() && kasUrlAttribute == null) {
4950
throw new Exception("no " + KAS_URL_ATTRIBUTE + " flowfile attribute and no default KAS URL configured");
5051
}
5152
String kasUrlValues = kasUrlAttribute != null ? kasUrlAttribute : getPropertyValue(processContext.getProperty(KAS_URL)).getValue();
52-
List<String> kasUrls = Arrays.stream(kasUrlValues.split(",")).filter(x->!x.isEmpty()).collect(Collectors.toList());
53-
if (kasUrlValues.isEmpty()){
53+
List<String> kasUrls = Arrays.stream(kasUrlValues.split(","))
54+
.filter(x -> !x.isEmpty())
55+
.toList(); // Use Stream.toList() for an unmodifiable list
56+
if (kasUrlValues.isEmpty()) {
5457
throw new Exception("no KAS Urls provided");
5558
}
5659
return kasUrls;
5760
}
5861

62+
/**
63+
* Converts a list of KAS (Key Access Service) URLs into a list of Config.KASInfo objects.
64+
*
65+
* @param kasUrls a list of strings representing the KAS URLs
66+
* @return a list of Config.KASInfo objects with each object's URL field set to the corresponding string from the input list
67+
*/
5968
List<Config.KASInfo> getKASInfoFromKASURLs(List<String> kasUrls){
60-
return kasUrls.stream().map(x->{ var ki = new Config.KASInfo(); ki.URL=x; return ki;}).collect(Collectors.toList());
69+
return kasUrls.stream().map(x -> {
70+
var ki = new Config.KASInfo();
71+
ki.URL = x;
72+
return ki;
73+
}).toList();
6174
}
6275

6376
/**
64-
* Get data attributes on a FlowFile from attribute value
65-
* @param flowFile
66-
* @return
67-
* @throws Exception
77+
* Extracts and returns a set of data attributes from the given FlowFile's attribute specified by TDF_ATTRIBUTE.
78+
* The attributes are split by commas and filtered to remove empty strings.
79+
*
80+
* @param flowFile the FlowFile from which to retrieve the data attributes.
81+
* @return a set of data attributes extracted from the given FlowFile.
82+
* @throws Exception if no data attributes are provided via the TDF_ATTRIBUTE FlowFile attribute.
6883
*/
6984
Set<String> getDataAttributes(FlowFile flowFile) throws Exception{
7085
Set<String> dataAttributes = Arrays.stream((flowFile.getAttribute(TDF_ATTRIBUTE) == null ? "" :

nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromNanoTDF.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,27 @@
1212
import java.nio.ByteBuffer;
1313
import java.util.List;
1414

15+
/**
16+
* A processor for decrypting NanoTDF flow file content using the OpenTDF framework.
17+
* <p>
18+
* This processor reads encrypted NanoTDF data from incoming flow files and decrypts
19+
* it using the associated SDK. The decrypted content is then written back into the
20+
* flow file and routed to the success relationship. If decryption fails, the flow file
21+
* is routed to the failure relationship.
22+
*/
1523
@CapabilityDescription("Decrypts NanoTDF flow file content")
1624
@Tags({"NanoTDF", "OpenTDF", "Decrypt", "Data Centric Security"})
1725
public class ConvertFromNanoTDF extends AbstractTDFProcessor {
1826

27+
/**
28+
* Processes the provided list of flow files by decrypting their content using the NanoTDF protocol.
29+
* If decryption succeeds, the flow file is routed to the success relationship; otherwise, it is routed to the failure relationship.
30+
*
31+
* @param processContext the NiFi ProcessContext which provides configuration and controller services
32+
* @param processSession the ProcessSession which provides mechanisms for reading, writing, transferring, and penalizing flow files
33+
* @param flowFiles the list of FlowFile objects to be processed
34+
* @throws ProcessException if any error occurs during the processing of flow files
35+
*/
1936
@Override
2037
public void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
2138
SDK sdk = getTDFSDK(processContext);

nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,46 @@
1616
import java.util.ArrayList;
1717
import java.util.List;
1818

19-
19+
/**
20+
* Converts and decrypts ZTDF (Zero Trust Data Format) flow file content.
21+
* This class takes encrypted ZTDF content and decrypts it,
22+
* transferring the decrypted data to a specified success relationship.
23+
* If an error occurs during decryption, it transfers the flow file to a failure relationship.
24+
* <p>
25+
* This processor uses TDF (Trusted Data Format) SDK for decryption and
26+
* requires configuration of assertion verification keys to verify
27+
* the integrity and authenticity of the encrypted data.
28+
* <p>
29+
* It provides the primary method `processFlowFiles` which reads the encrypted
30+
* content from incoming flow files, decrypts it, and writes the decrypted
31+
* content back to the flow files.
32+
* <p>
33+
* The method `processFlowFiles` performs the following steps:
34+
* 1. Retrieves the TDF SDK instance.
35+
* 2. Reads the entire encrypted content of each flow file into an in-memory byte channel.
36+
* 3. Uses TDF Reader to load and decrypt the content.
37+
* 4. Writes the decrypted content back into the flow file and transfers it to the success relationship.
38+
* 5. If any error occurs during the decryption process, logs the error and transfers the flow file to the failure relationship.
39+
*/
2040
@CapabilityDescription("Decrypts ZTDF flow file content")
2141
@Tags({"ZTDF", "Zero Trust Data Format", "OpenTDF", "Decrypt", "Data Centric Security"})
2242
public class ConvertFromZTDF extends AbstractTDFProcessor {
2343

2444

45+
/**
46+
* Processes a list of flow files by decrypting their content using the TDF (Trusted Data Format) SDK.
47+
* For each flow file in the provided list, the following steps are executed:
48+
* 1. Reads the entire encrypted content of the flow file into an in-memory byte channel.
49+
* 2. Uses a TDF Reader to load and decrypt the content using the SDK.
50+
* 3. Writes the decrypted content back to the flow file.
51+
* 4. Transfers the successfully decrypted flow files to the success relationship.
52+
* 5. In case of an error during decryption, logs the error and transfers the flow file to the failure relationship.
53+
*
54+
* @param processContext the NiFi ProcessContext providing configuration and controller services.
55+
* @param processSession the NiFi ProcessSession used to read, write, and transfer flow files.
56+
* @param flowFiles a list of flow files to be decrypted.
57+
* @throws ProcessException if an error occurs during the decryption process.
58+
*/
2559
@Override
2660
void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
2761
SDK sdk = getTDFSDK(processContext);

nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToNanoTDF.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,25 @@
2020
import java.util.List;
2121
import java.util.Set;
2222

23+
/**
24+
* Processor for converting the content of a FlowFile into a NanoTDF (Trusted Data Format).
25+
* <p>
26+
* This class extends AbstractToProcessor and handles the encryption of FlowFile content into NanoTDF,
27+
* applying specified KAS (Key Access Service) URLs and data attributes as defined in the flow file attributes
28+
* or processor properties.
29+
* <p>
30+
* Relationships:
31+
* - REL_SUCCESS: When the conversion to NanoTDF is successful.
32+
* - REL_FAILURE: When the conversion to NanoTDF fails.
33+
* - REL_FLOWFILE_EXCEEDS_NANO_SIZE: When the content size exceeds the maximum allowed size for NanoTDF.
34+
* <p>
35+
* Property Descriptors:
36+
* - Inherited from AbstractToProcessor (e.g., KAS URL, SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, etc.)
37+
* <p>
38+
* Reads Attributes:
39+
* - kas_url: The Key Access Server (KAS) URL used for TDF creation. Overrides the default KAS URL property.
40+
* - tdf_attribute: A comma-separated list of data attributes added to the created TDF Data Policy.
41+
*/
2342
@CapabilityDescription("Transforms flow file content into a NanoTDF")
2443
@Tags({"NanoTDF", "OpenTDF", "Encrypt", "Data Centric Security"})
2544
@ReadsAttributes(value = {
@@ -30,26 +49,52 @@
3049
})
3150
public class ConvertToNanoTDF extends AbstractToProcessor {
3251

52+
/**
53+
* Defines a relationship indicating that NanoTDF creation has failed
54+
* due to the content size exceeding the maximum allowed NanoTDF size threshold.
55+
*/
3356
public static final Relationship REL_FLOWFILE_EXCEEDS_NANO_SIZE = new Relationship.Builder()
3457
.name("exceeds_size_limit")
3558
.description("NanoTDF creation failed due to the content size exceeding the max NanoTDF size threshold")
3659
.build();
3760

61+
/**
62+
* Represents the maximum allowable size for processing a flow file in nano TDF conversion.
63+
* Value is set to 16 MB.
64+
*/
3865
static final long MAX_SIZE = 16777218;
3966

67+
/**
68+
* Retrieves all the relationships defined in the ConvertToNanoTDF processor.
69+
*
70+
* @return a Set of Relationship objects representing the different relationships for the processor.
71+
*/
4072
@Override
4173
public Set<Relationship> getRelationships() {
4274
return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_FLOWFILE_EXCEEDS_NANO_SIZE));
4375
}
4476

4577

78+
/**
79+
* Processes a list of FlowFiles to convert them to NanoTDF format.
80+
* If a FlowFile's size exceeds the maximum allowed size, it is routed to a specific relationship.
81+
* Otherwise, it attempts to convert the FlowFile's content and transfer it to a success relationship.
82+
* In case of an error during processing, the FlowFile is routed to a failure relationship.
83+
*
84+
* @param processContext the NiFi ProcessContext providing necessary configuration and controller services.
85+
* @param processSession the NiFi ProcessSession representing a transaction context for the processing of FlowFiles.
86+
* @param flowFiles a list of FlowFiles to be processed.
87+
* @throws ProcessException if an error occurs during the processing of the FlowFiles.
88+
*/
4689
@Override
4790
public void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
4891
SDK sdk = getTDFSDK(processContext);
4992
for (final FlowFile flowFile : flowFiles) {
5093
try {
5194
var kasInfoList = getKASInfoFromKASURLs(getKasUrl(flowFile, processContext));
5295
Set<String> dataAttributes = getDataAttributes(flowFile);
96+
// Config.newNanoTDFConfig is correctly handling the varargs
97+
@SuppressWarnings("unchecked")
5398
Config.NanoTDFConfig config = Config.newNanoTDFConfig(
5499
Config.withNanoKasInformation(kasInfoList.toArray(new Config.KASInfo[0])),
55100
Config.witDataAttributes(dataAttributes.toArray(new String[0]))

0 commit comments

Comments
 (0)