Skip to content
This repository was archived by the owner on Dec 22, 2021. It is now read-only.

CDAP-15237 Enhance the streaming plugin to support CDC option #14

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions docs/CDCSalesforce-streamingsource.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# CDC Salesforce Streaming Source

Description
-----------
This plugin reads Change Data Capture (CDC) events from Salesforce.

All CDC source plugins are normally used in conjunction with CDC sink plugins.
CDC source produces messages in CDC format.

Properties
----------
**Client Id**: Client ID from the connected app.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was another PR in the salesforce project that renamed these (https://github.com/data-integrations/salesforce/blob/master/docs/Salesforce-batchsource.md). Let's use the same wording and descriptions.


**Client Secret**: Client Secret from the connected app.

**Username**: Username to use when connecting to Salesforce.

**Password**: Password to use when connecting to Salesforce.

**Login Url**: Salesforce login URL to authenticate against.
The default value is https://login.salesforce.com/services/oauth2/token.
This should be changed when running against the Salesforce sandbox.

**Tracking Objects**: Objects to read change events from (For example: Task for base object and Employee__c for custom) separated by ",".
If list is empty then subscription for all events will be used.

**Error Handling**: Possible values are: "Skip on error" or "Fail on error". These are strategies on handling records
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would a user ever want to skip a change event? Skipping an event can cause the application of future events to fail and kind of defeats the purpose of CDC.

which cannot be transformed. "Skip on error" - just skip, "Fail on error" - fails the pipeline if at least one erroneous
record is found.

Note: CDC must be enabled on the database for the source to read the change data.

Salesforce Change Data Capture
--------------------------
When something changes in object for which is enable 'Change notifications'. A Change Data Capture event, or change
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the first sentence

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of this information seems like it should be moved from here to the Description section.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove "When something changes in object for which is enable 'Change notifications'."

event, is a notification that Salesforce sends when a change to a Salesforce record occurs as part of a create, update,
delete, or undelete operation. The notification includes all new and changed fields, and header fields that contain
information about the change. For example, header fields indicate the type of
change that triggered the event and the origin of the change. Change events support all custom objects and a subset of
standard objects. More information can be found in [official documentation](https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_intro.htm).

### Enable Change Data Capture for objects
To enable Change Data Capture for objects in Salesforce you have to
[select Objects for Change Notifications](https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_select_objects.htm)
Binary file added icons/CDCSalesforce-streamingsource.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
33 changes: 29 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@
<mssql.jdbc.version>6.1.0.jre7</mssql.jdbc.version>
<hadoop.version>2.8.0</hadoop.version>
<guava.version>13.0.1</guava.version>
<salesforce.api.version>46.0.0</salesforce.api.version>
<cometd.java.client.version>4.0.0</cometd.java.client.version>
<awaitility.version>3.1.6</awaitility.version>
<json.version>20180813</json.version>
<!-- inlined with twill's netty version-->
<netty.version>4.1.16.Final</netty.version>
<junit.version>4.11</junit.version>
Expand Down Expand Up @@ -514,6 +518,27 @@
</exclusion>
</exclusions>
</dependency>
<!-- Dependency for salesforce-->
<dependency>
<groupId>com.force.api</groupId>
<artifactId>force-wsc</artifactId>
<version>${salesforce.api.version}</version>
</dependency>
<dependency>
<groupId>com.force.api</groupId>
<artifactId>force-partner-api</artifactId>
<version>${salesforce.api.version}</version>
</dependency>
<dependency>
<groupId>org.cometd.java</groupId>
<artifactId>cometd-java-client</artifactId>
<version>${cometd.java.client.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
</dependency>
<!-- Dependency for sql server ct-->
<dependency>
<groupId>io.cdap.plugin</groupId>
Expand Down Expand Up @@ -591,10 +616,10 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>3.1.6</version>
<scope>test</scope>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>${json.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

Expand Down
51 changes: 51 additions & 0 deletions src/main/java/io/cdap/plugin/cdc/common/ErrorHandling.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright © 2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.cdc.common;

import java.util.Optional;
import java.util.stream.Stream;

/**
* Indicates error handling strategy which will be used during reading Salesforce records.
*/
public enum ErrorHandling {

SKIP("Skip on error"),
STOP("Stop on error");

private final String value;

ErrorHandling(String value) {
this.value = value;
}

public String getValue() {
return value;
}

/**
* Converts error handling string value into {@link ErrorHandling} enum.
*
* @param stringValue error handling string value
* @return error handling type in optional container
*/
public static Optional<ErrorHandling> fromValue(String stringValue) {
return Stream.of(values())
.filter(keyType -> keyType.value.equalsIgnoreCase(stringValue))
.findAny();
}
}
6 changes: 5 additions & 1 deletion src/main/java/io/cdap/plugin/cdc/common/Schemas.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ public static StructuredRecord toCDCRecord(StructuredRecord changeRecord) {
}

public static String getTableName(String namespacedTableName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's error prone to represent multiple things in a single string. How do we know everything that calls this passes in a properly formatted string?

There should be a NamespacedTable class that has a namespace field and a tableName field, and whatever first gets the namespaced table name should create that object, which should be used everywhere else.

This cleanup doesn't have to be done in this PR, but please open a JIRA to refactor.

return namespacedTableName.split("\\.")[1];
String[] parts = namespacedTableName.split("\\.");
if (parts.length == 1) {
return namespacedTableName;
}
return parts[1];
}

private static Schema enumWith(Class<? extends Enum<?>> enumClass) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright © 2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.cdc.source.salesforce;

import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.dataset.DatasetProperties;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.cdap.etl.api.streaming.StreamingSource;
import io.cdap.plugin.cdc.common.Schemas;
import io.cdap.plugin.common.Constants;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Streaming source for reading from Salesforce CDC plugin.
*/
@Plugin(type = StreamingSource.PLUGIN_TYPE)
@Name("CDCSalesforce")
@Description("CDC Salesforce Streaming Source")
public class CDCSalesforce extends StreamingSource<StructuredRecord> {

private static final Logger LOG = LoggerFactory.getLogger(CDCSalesforce.class);
private final SalesforceConfig config;

public CDCSalesforce(SalesforceConfig config) {
this.config = config;
}

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
LOG.info("Creating connection with url '{}', username '{}', clientId '{}'",
config.getLoginUrl(), config.getUsername(), config.getClientId());
config.validate();

pipelineConfigurer.createDataset(config.referenceName, Constants.EXTERNAL_DATASET_TYPE,
DatasetProperties.EMPTY);
pipelineConfigurer.getStageConfigurer().setOutputSchema(Schemas.CHANGE_SCHEMA);
}

@Override
public JavaDStream<StructuredRecord> getStream(StreamingContext context) {
config.validate();

SalesforceReceiver salesforceReceiver
= new SalesforceReceiver(config.getAuthenticatorCredentials(), config.getObjects(), config.getErrorHandling());
return context.getSparkStreamingContext()
.receiverStream(salesforceReceiver)
.map(Schemas::toCDCRecord);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright © 2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.cdc.source.salesforce;

import com.sforce.ws.ConnectionException;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
import io.cdap.cdap.etl.api.validation.InvalidStageException;
import io.cdap.plugin.cdc.common.CDCReferencePluginConfig;
import io.cdap.plugin.cdc.common.ErrorHandling;
import io.cdap.plugin.cdc.source.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.cdc.source.salesforce.util.SalesforceConnectionUtil;
import io.cdap.plugin.cdc.source.salesforce.util.SalesforceConstants;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;

/**
* Defines the {@link PluginConfig} for the {@link CDCSalesforce}.
*/
public class SalesforceConfig extends CDCReferencePluginConfig {
private static final String OBJECTS_SEPARATOR = ",";

@Name(SalesforceConstants.PROPERTY_CLIENT_ID)
@Description("Salesforce connected app's client ID")
@Macro
private String clientId;

@Name(SalesforceConstants.PROPERTY_CLIENT_SECRET)
@Description("Salesforce connected app's client secret key")
@Macro
private String clientSecret;

@Name(SalesforceConstants.PROPERTY_USERNAME)
@Description("Salesforce username")
@Macro
private String username;

@Name(SalesforceConstants.PROPERTY_PASSWORD)
@Description("Salesforce password")
@Macro
private String password;

@Name(SalesforceConstants.PROPERTY_LOGIN_URL)
@Description("Endpoint to authenticate to")
@Macro
private String loginUrl;

@Name(SalesforceConstants.PROPERTY_OBJECTS)
@Description("Tracking Objects")
@Macro
@Nullable
private String objects;

@Name(SalesforceConstants.PROPERTY_ERROR_HANDLING)
@Description("Strategy used to handle erroneous records. Acceptable values are Skip on error, Stop on error.\n" +
"Skip on error - ignores erroneous record.\n" +
"Stop on error - fails pipeline due to erroneous record.")
@Macro
private String errorHandling;

public SalesforceConfig() {
super("");
}

public SalesforceConfig(String referenceName, String clientId, String clientSecret,
String username, String password, String loginUrl, String objects, String errorHandling) {
super(referenceName);
this.clientId = clientId;
this.clientSecret = clientSecret;
this.username = username;
this.password = password;
this.loginUrl = loginUrl;
this.objects = objects;
this.errorHandling = errorHandling;
}

public String getClientId() {
return clientId;
}

public String getUsername() {
return username;
}

public String getPassword() {
return password;
}

public String getLoginUrl() {
return loginUrl;
}

public List<String> getObjects() {
if (objects == null || objects.isEmpty()) {
return Collections.emptyList();
}
return Arrays.asList(objects.split(OBJECTS_SEPARATOR));
}

public ErrorHandling getErrorHandling() {
return ErrorHandling.fromValue(errorHandling)
.orElseThrow(() -> new InvalidConfigPropertyException("Unsupported error handling value: " + errorHandling,
SalesforceConstants.PROPERTY_ERROR_HANDLING));
}

@Override
public void validate() {
validateConnection();
validateErrorHandling();
}

public AuthenticatorCredentials getAuthenticatorCredentials() {
return SalesforceConnectionUtil.getAuthenticatorCredentials(username, password, clientId, clientSecret, loginUrl);
}

private void validateConnection() {
if (containsMacro(SalesforceConstants.PROPERTY_CLIENT_ID)
|| containsMacro(SalesforceConstants.PROPERTY_CLIENT_SECRET)
|| containsMacro(SalesforceConstants.PROPERTY_USERNAME)
|| containsMacro(SalesforceConstants.PROPERTY_PASSWORD)
|| containsMacro(SalesforceConstants.PROPERTY_LOGIN_URL)) {
return;
}

try {
SalesforceConnectionUtil.getPartnerConnection(getAuthenticatorCredentials());
} catch (ConnectionException e) {
throw new InvalidStageException("Cannot connect to Salesforce API with credentials specified", e);
}
}

private void validateErrorHandling() {
if (containsMacro(SalesforceConstants.PROPERTY_ERROR_HANDLING)) {
return;
}

getErrorHandling();
}
}
Loading