diff --git a/docs/CDCSalesforce-streamingsource.md b/docs/CDCSalesforce-streamingsource.md
new file mode 100644
index 0000000..eafcfa3
--- /dev/null
+++ b/docs/CDCSalesforce-streamingsource.md
@@ -0,0 +1,44 @@
+# CDC Salesforce Streaming Source
+
+Description
+-----------
+This plugin reads Change Data Capture (CDC) events from Salesforce.
+
+All CDC source plugins are normally used in conjunction with CDC sink plugins.
+CDC source produces messages in CDC format.
+
+Properties
+----------
+**Client Id**: Client ID from the connected app.
+
+**Client Secret**: Client Secret from the connected app.
+
+**Username**: Username to use when connecting to Salesforce.
+
+**Password**: Password to use when connecting to Salesforce.
+
+**Login Url**: Salesforce login URL to authenticate against.
+The default value is https://login.salesforce.com/services/oauth2/token.
+This should be changed when running against the Salesforce sandbox.
+
+**Tracking Objects**: Objects to read change events from (For example: Task for base object and Employee__c for custom) separated by ",".
+If list is empty then subscription for all events will be used.
+
+**Error Handling**: Possible values are: "Skip on error" or "Fail on error". These are strategies on handling records
+which cannot be transformed. "Skip on error" - just skip, "Fail on error" - fails the pipeline if at least one erroneous
+record is found.
+
+Note: CDC must be enabled on the database for the source to read the change data.
+
+Salesforce Change Data Capture
+--------------------------
+When something changes in object for which is enable 'Change notifications'. A Change Data Capture event, or change
+event, is a notification that Salesforce sends when a change to a Salesforce record occurs as part of a create, update,
+delete, or undelete operation. The notification includes all new and changed fields, and header fields that contain
+information about the change. For example, header fields indicate the type of
+change that triggered the event and the origin of the change. Change events support all custom objects and a subset of
+standard objects. More information can be found in [official documentation](https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_intro.htm).
+
+### Enable Change Data Capture for objects
+To enable Change Data Capture for objects in Salesforce you have to
+[select Objects for Change Notifications](https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_select_objects.htm)
\ No newline at end of file
diff --git a/icons/CDCSalesforce-streamingsource.png b/icons/CDCSalesforce-streamingsource.png
new file mode 100644
index 0000000..8802058
Binary files /dev/null and b/icons/CDCSalesforce-streamingsource.png differ
diff --git a/pom.xml b/pom.xml
index 8cb6886..19fccc4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,10 @@
6.1.0.jre7
2.8.0
13.0.1
+ 46.0.0
+ 4.0.0
+ 3.1.6
+ 20180813
4.1.16.Final
4.11
@@ -514,6 +518,27 @@
+
+
+ com.force.api
+ force-wsc
+ ${salesforce.api.version}
+
+
+ com.force.api
+ force-partner-api
+ ${salesforce.api.version}
+
+
+ org.cometd.java
+ cometd-java-client
+ ${cometd.java.client.version}
+
+
+ org.awaitility
+ awaitility
+ ${awaitility.version}
+
io.cdap.plugin
@@ -591,10 +616,10 @@
test
- org.awaitility
- awaitility
- 3.1.6
- test
+ org.json
+ json
+ ${json.version}
+ compile
diff --git a/src/main/java/io/cdap/plugin/cdc/common/ErrorHandling.java b/src/main/java/io/cdap/plugin/cdc/common/ErrorHandling.java
new file mode 100644
index 0000000..d5828e5
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/common/ErrorHandling.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.common;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+
+/**
+ * Indicates error handling strategy which will be used during reading Salesforce records.
+ */
+public enum ErrorHandling {
+
+ SKIP("Skip on error"),
+ STOP("Stop on error");
+
+ private final String value;
+
+ ErrorHandling(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ /**
+ * Converts error handling string value into {@link ErrorHandling} enum.
+ *
+ * @param stringValue error handling string value
+ * @return error handling type in optional container
+ */
+ public static Optional fromValue(String stringValue) {
+ return Stream.of(values())
+ .filter(keyType -> keyType.value.equalsIgnoreCase(stringValue))
+ .findAny();
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/common/Schemas.java b/src/main/java/io/cdap/plugin/cdc/common/Schemas.java
index f8fcb41..169a5c8 100644
--- a/src/main/java/io/cdap/plugin/cdc/common/Schemas.java
+++ b/src/main/java/io/cdap/plugin/cdc/common/Schemas.java
@@ -81,7 +81,11 @@ public static StructuredRecord toCDCRecord(StructuredRecord changeRecord) {
}
public static String getTableName(String namespacedTableName) {
- return namespacedTableName.split("\\.")[1];
+ String[] parts = namespacedTableName.split("\\.");
+ if (parts.length == 1) {
+ return namespacedTableName;
+ }
+ return parts[1];
}
private static Schema enumWith(Class extends Enum>> enumClass) {
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/CDCSalesforce.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/CDCSalesforce.java
new file mode 100644
index 0000000..3bb956c
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/CDCSalesforce.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.source.salesforce;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.dataset.DatasetProperties;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.streaming.StreamingContext;
+import io.cdap.cdap.etl.api.streaming.StreamingSource;
+import io.cdap.plugin.cdc.common.Schemas;
+import io.cdap.plugin.common.Constants;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Streaming source for reading from Salesforce CDC plugin.
+ */
+@Plugin(type = StreamingSource.PLUGIN_TYPE)
+@Name("CDCSalesforce")
+@Description("CDC Salesforce Streaming Source")
+public class CDCSalesforce extends StreamingSource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CDCSalesforce.class);
+ private final SalesforceConfig config;
+
+ public CDCSalesforce(SalesforceConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ LOG.info("Creating connection with url '{}', username '{}', clientId '{}'",
+ config.getLoginUrl(), config.getUsername(), config.getClientId());
+ config.validate();
+
+ pipelineConfigurer.createDataset(config.referenceName, Constants.EXTERNAL_DATASET_TYPE,
+ DatasetProperties.EMPTY);
+ pipelineConfigurer.getStageConfigurer().setOutputSchema(Schemas.CHANGE_SCHEMA);
+ }
+
+ @Override
+ public JavaDStream getStream(StreamingContext context) {
+ config.validate();
+
+ SalesforceReceiver salesforceReceiver
+ = new SalesforceReceiver(config.getAuthenticatorCredentials(), config.getObjects(), config.getErrorHandling());
+ return context.getSparkStreamingContext()
+ .receiverStream(salesforceReceiver)
+ .map(Schemas::toCDCRecord);
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceConfig.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceConfig.java
new file mode 100644
index 0000000..0b1f99b
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceConfig.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.source.salesforce;
+
+import com.sforce.ws.ConnectionException;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
+import io.cdap.cdap.etl.api.validation.InvalidStageException;
+import io.cdap.plugin.cdc.common.CDCReferencePluginConfig;
+import io.cdap.plugin.cdc.common.ErrorHandling;
+import io.cdap.plugin.cdc.source.salesforce.authenticator.AuthenticatorCredentials;
+import io.cdap.plugin.cdc.source.salesforce.util.SalesforceConnectionUtil;
+import io.cdap.plugin.cdc.source.salesforce.util.SalesforceConstants;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Defines the {@link PluginConfig} for the {@link CDCSalesforce}.
+ */
+public class SalesforceConfig extends CDCReferencePluginConfig {
+ private static final String OBJECTS_SEPARATOR = ",";
+
+ @Name(SalesforceConstants.PROPERTY_CLIENT_ID)
+ @Description("Salesforce connected app's client ID")
+ @Macro
+ private String clientId;
+
+ @Name(SalesforceConstants.PROPERTY_CLIENT_SECRET)
+ @Description("Salesforce connected app's client secret key")
+ @Macro
+ private String clientSecret;
+
+ @Name(SalesforceConstants.PROPERTY_USERNAME)
+ @Description("Salesforce username")
+ @Macro
+ private String username;
+
+ @Name(SalesforceConstants.PROPERTY_PASSWORD)
+ @Description("Salesforce password")
+ @Macro
+ private String password;
+
+ @Name(SalesforceConstants.PROPERTY_LOGIN_URL)
+ @Description("Endpoint to authenticate to")
+ @Macro
+ private String loginUrl;
+
+ @Name(SalesforceConstants.PROPERTY_OBJECTS)
+ @Description("Tracking Objects")
+ @Macro
+ @Nullable
+ private String objects;
+
+ @Name(SalesforceConstants.PROPERTY_ERROR_HANDLING)
+ @Description("Strategy used to handle erroneous records. Acceptable values are Skip on error, Stop on error.\n" +
+ "Skip on error - ignores erroneous record.\n" +
+ "Stop on error - fails pipeline due to erroneous record.")
+ @Macro
+ private String errorHandling;
+
+ public SalesforceConfig() {
+ super("");
+ }
+
+ public SalesforceConfig(String referenceName, String clientId, String clientSecret,
+ String username, String password, String loginUrl, String objects, String errorHandling) {
+ super(referenceName);
+ this.clientId = clientId;
+ this.clientSecret = clientSecret;
+ this.username = username;
+ this.password = password;
+ this.loginUrl = loginUrl;
+ this.objects = objects;
+ this.errorHandling = errorHandling;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public String getLoginUrl() {
+ return loginUrl;
+ }
+
+ public List getObjects() {
+ if (objects == null || objects.isEmpty()) {
+ return Collections.emptyList();
+ }
+ return Arrays.asList(objects.split(OBJECTS_SEPARATOR));
+ }
+
+ public ErrorHandling getErrorHandling() {
+ return ErrorHandling.fromValue(errorHandling)
+ .orElseThrow(() -> new InvalidConfigPropertyException("Unsupported error handling value: " + errorHandling,
+ SalesforceConstants.PROPERTY_ERROR_HANDLING));
+ }
+
+ @Override
+ public void validate() {
+ validateConnection();
+ validateErrorHandling();
+ }
+
+ public AuthenticatorCredentials getAuthenticatorCredentials() {
+ return SalesforceConnectionUtil.getAuthenticatorCredentials(username, password, clientId, clientSecret, loginUrl);
+ }
+
+ private void validateConnection() {
+ if (containsMacro(SalesforceConstants.PROPERTY_CLIENT_ID)
+ || containsMacro(SalesforceConstants.PROPERTY_CLIENT_SECRET)
+ || containsMacro(SalesforceConstants.PROPERTY_USERNAME)
+ || containsMacro(SalesforceConstants.PROPERTY_PASSWORD)
+ || containsMacro(SalesforceConstants.PROPERTY_LOGIN_URL)) {
+ return;
+ }
+
+ try {
+ SalesforceConnectionUtil.getPartnerConnection(getAuthenticatorCredentials());
+ } catch (ConnectionException e) {
+ throw new InvalidStageException("Cannot connect to Salesforce API with credentials specified", e);
+ }
+ }
+
+ private void validateErrorHandling() {
+ if (containsMacro(SalesforceConstants.PROPERTY_ERROR_HANDLING)) {
+ return;
+ }
+
+ getErrorHandling();
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceEventTopicListener.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceEventTopicListener.java
new file mode 100644
index 0000000..59072d1
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceEventTopicListener.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.source.salesforce;
+
+import io.cdap.plugin.cdc.source.salesforce.authenticator.AuthResponse;
+import io.cdap.plugin.cdc.source.salesforce.authenticator.Authenticator;
+import io.cdap.plugin.cdc.source.salesforce.authenticator.AuthenticatorCredentials;
+import io.cdap.plugin.cdc.source.salesforce.util.SalesforceConstants;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.cometd.bayeux.client.ClientSessionChannel;
+import org.cometd.client.BayeuxClient;
+import org.cometd.client.transport.LongPollingTransport;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+
+/**
+ * Listens to a specific Salesforce eventTopics and adds messages to the blocking queue,
+ * which can be read by a user of the class.
+ */
+public class SalesforceEventTopicListener {
+ private static final Logger LOG = LoggerFactory.getLogger(SalesforceEventTopicListener.class);
+
+ private static final String DEFAULT_EVENT_ENDPOINT = "/cometd/" + SalesforceConstants.API_VERSION;
+ /**
+ * Timeout of 110 seconds is enforced by Salesforce Streaming API and is not configurable.
+ * So we enforce the same on client.
+ */
+ private static final int CONNECTION_TIMEOUT = 110;
+ private static final long HANDSHAKE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(110);
+
+ private static final int HANDSHAKE_CHECK_INTERVAL_MS = 1000;
+
+ private static final String BASE_EVENT_TOPIC = "/data/ChangeEvents";
+ private static final String EVENT_TOPIC_PATTERN = "/data/%sChangeEvent";
+
+ // store message string not JSONObject, since it's not serializable for later Spark usage
+ private final BlockingQueue messagesQueue = new LinkedBlockingQueue<>();
+
+ private final AuthenticatorCredentials credentials;
+ private final List objectsForTracking;
+ private BayeuxClient bayeuxClient;
+
+ public SalesforceEventTopicListener(AuthenticatorCredentials credentials, List objectsForTracking) {
+ this.credentials = credentials;
+ this.objectsForTracking = new ArrayList<>(objectsForTracking);
+ }
+
+ /**
+ * Start the Bayeux Client which listens to the Salesforce EventTopic and saves received messages
+ * to the queue.
+ */
+ public void start() {
+ try {
+ bayeuxClient = getClient(credentials);
+ waitForHandshake(bayeuxClient);
+ LOG.debug("Client handshake done");
+
+ ClientSessionChannel.MessageListener messageListener = (channel, message) -> messagesQueue.add(message.getJSON());
+ if (objectsForTracking.isEmpty()) {
+ LOG.debug("Subscribe on '{}'", BASE_EVENT_TOPIC);
+ bayeuxClient.getChannel(BASE_EVENT_TOPIC)
+ .subscribe(messageListener);
+ } else {
+ for (String objectName : objectsForTracking) {
+ String topic = getObjectTopic(objectName);
+ LOG.debug("Subscribe on '{}'", topic);
+ bayeuxClient.getChannel(topic)
+ .subscribe(messageListener);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Could not start client", e);
+ }
+ }
+
+ /**
+ * Stop listening to the Salesforce EventTopic.
+ */
+ public void stop() {
+ if (bayeuxClient != null) {
+ bayeuxClient.disconnect(100);
+ }
+ }
+
+ /**
+ * Retrieves message from the messages queue, waiting up to the
+ * specified wait time if necessary for an element to become available.
+ *
+ * @param timeout how long to wait before giving up
+ * @param unit timeunit of timeout
+ * @return the message, or {@code null} if the specified
+ * waiting time elapses before an element is available
+ * @throws InterruptedException blocking call is interrupted
+ */
+ public String getMessage(long timeout, TimeUnit unit) throws InterruptedException {
+ return messagesQueue.poll(timeout, unit);
+ }
+
+ private String getObjectTopic(String objectName) {
+ String name = objectName.endsWith("__c") ? objectName.substring(0, objectName.length() - 1) : objectName;
+ return format(EVENT_TOPIC_PATTERN, name);
+ }
+
+ private BayeuxClient getClient(AuthenticatorCredentials credentials) throws Exception {
+ AuthResponse authResponse = Authenticator.oauthLogin(credentials);
+ String acessToken = authResponse.getAccessToken();
+ String instanceUrl = authResponse.getInstanceUrl();
+
+ SslContextFactory sslContextFactory = new SslContextFactory();
+
+ // Set up a Jetty HTTP client to use with CometD
+ HttpClient httpClient = new HttpClient(sslContextFactory);
+ httpClient.setConnectTimeout(CONNECTION_TIMEOUT);
+ httpClient.start();
+
+ Map options = new HashMap<>();
+ // Adds the OAuth header in LongPollingTransport
+ LongPollingTransport transport = new LongPollingTransport(options, httpClient) {
+ @Override
+ protected void customize(Request exchange) {
+ super.customize(exchange);
+ exchange.header("Authorization", "OAuth " + acessToken);
+ }
+ };
+
+ // Now set up the Bayeux client itself
+ BayeuxClient client = new BayeuxClient(instanceUrl + DEFAULT_EVENT_ENDPOINT, transport);
+ client.handshake();
+
+ return client;
+ }
+
+
+ private void waitForHandshake(BayeuxClient client) {
+ try {
+ Awaitility.await()
+ .atMost(HANDSHAKE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ .pollInterval(HANDSHAKE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS)
+ .until(client::isHandshook);
+ } catch (ConditionTimeoutException e) {
+ throw new IllegalStateException("Client could not handshake with Salesforce server", e);
+ }
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceReceiver.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceReceiver.java
new file mode 100644
index 0000000..8e0e2e3
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/SalesforceReceiver.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.source.salesforce;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.sforce.soap.partner.PartnerConnection;
+import com.sforce.soap.partner.QueryResult;
+import com.sforce.soap.partner.sobject.SObject;
+import com.sforce.ws.ConnectionException;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.cdc.common.ErrorHandling;
+import io.cdap.plugin.cdc.common.OperationType;
+import io.cdap.plugin.cdc.source.salesforce.authenticator.AuthenticatorCredentials;
+import io.cdap.plugin.cdc.source.salesforce.records.ChangeEventHeader;
+import io.cdap.plugin.cdc.source.salesforce.records.SalesforceRecord;
+import io.cdap.plugin.cdc.source.salesforce.sobject.SObjectDescriptor;
+import io.cdap.plugin.cdc.source.salesforce.sobject.SObjectsDescribeResult;
+import io.cdap.plugin.cdc.source.salesforce.util.SalesforceConnectionUtil;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Implementation of Spark receiver to receive Salesforce change events from EventTopic using Bayeux Client.
+ * Subscribes to all events if objectsForTracking is empty, otherwise subscribes to all topics in list.
+ * Produces DML structured records depending on change event type. Also produces DDL record if change event entity type
+ * is processed for the first time.
+ */
+public class SalesforceReceiver extends Receiver {
+ private static final Logger LOG = LoggerFactory.getLogger(SalesforceReceiver.class);
+ private static final String RECEIVER_THREAD_NAME = "salesforce_streaming_api_listener";
+ // every x seconds thread wakes up and checks if stream is not yet stopped
+ private static final long GET_MESSAGE_TIMEOUT_SECONDS = 2;
+ private static final Gson GSON = new Gson();
+
+ private final AuthenticatorCredentials credentials;
+ private final List objectsForTracking;
+ private final ErrorHandling errorHandling;
+ private final Map schemas = new HashMap<>();
+ private final Map> events = new HashMap<>();
+ private SalesforceEventTopicListener eventTopicListener;
+ private static final JsonParser JSON_PARSER = new JsonParser();
+
+ SalesforceReceiver(AuthenticatorCredentials credentials, List objectsForTracking,
+ ErrorHandling errorHandling) {
+ super(StorageLevel.MEMORY_AND_DISK_2());
+ this.credentials = credentials;
+ this.objectsForTracking = new ArrayList<>(objectsForTracking);
+ this.errorHandling = errorHandling;
+ }
+
+ @Override
+ public void onStart() {
+ eventTopicListener = new SalesforceEventTopicListener(credentials, objectsForTracking);
+ eventTopicListener.start();
+
+ ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat(RECEIVER_THREAD_NAME + "-%d")
+ .build();
+
+ Executors.newSingleThreadExecutor(namedThreadFactory).submit(this::receive);
+ }
+
+ @Override
+ public void onStop() {
+ // There is nothing we can do here as the thread calling receive()
+ // is designed to stop by itself if isStopped() returns false
+ }
+
+ private void receive() {
+ PartnerConnection connection;
+ try {
+ connection = SalesforceConnectionUtil.getPartnerConnection(credentials);
+ } catch (ConnectionException e) {
+ throw new RuntimeException("Failed to connect to Salesforce", e);
+ }
+
+ while (!isStopped()) {
+ try {
+ String message = eventTopicListener.getMessage(GET_MESSAGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ if (message != null) {
+ // whole message class is not needed because we are interested only in change event payload
+ JsonObject headerElement = JSON_PARSER.parse(message)
+ .getAsJsonObject()
+ .getAsJsonObject("data")
+ .getAsJsonObject("payload")
+ .getAsJsonObject("ChangeEventHeader");
+ ChangeEventHeader event = GSON.fromJson(headerElement, ChangeEventHeader.class);
+
+ List eventsList = events.getOrDefault(event.getTransactionKey(), new ArrayList<>());
+ eventsList.add(event);
+
+ if (event.isTransactionEnd()) {
+ processEvents(eventsList, connection);
+ events.remove(event.getTransactionKey());
+ } else {
+ events.put(event.getTransactionKey(), eventsList);
+ }
+ }
+ } catch (Exception e) {
+ switch (errorHandling) {
+ case SKIP:
+ LOG.warn("Failed to process message, skipping it.", e);
+ break;
+ case STOP:
+ throw new RuntimeException("Failed to process message", e);
+ default:
+ throw new IllegalStateException(String.format("Unknown error handling strategy '%s'", errorHandling));
+ }
+ }
+ }
+ eventTopicListener.stop();
+ }
+
+ private void processEvents(List events, PartnerConnection connection) throws ConnectionException {
+ for (ChangeEventHeader event : events) {
+ SObjectDescriptor descriptor = SObjectDescriptor.fromName(event.getEntityName(), connection);
+ SObjectsDescribeResult describeResult = new SObjectsDescribeResult(connection, descriptor.getAllParentObjects());
+
+ Schema schema = SalesforceRecord.getSchema(descriptor, describeResult);
+ updateSchemaIfNecessary(event.getEntityName(), schema);
+
+ if (getOperationType(event) != OperationType.DELETE) {
+ sendUpdateRecords(event, descriptor, schema, connection);
+ } else {
+ sendDeleteRecords(Arrays.asList(event.getRecordIds()), event.getEntityName(), schema);
+ }
+ }
+ }
+
+ private void updateSchemaIfNecessary(String entityName, Schema schema) {
+ Schema previousSchema = schemas.get(entityName);
+
+ if (!schema.equals(previousSchema)) {
+ StructuredRecord ddlRecord = SalesforceRecord.buildDDLStructuredRecord(entityName, schema);
+ schemas.put(entityName, schema);
+
+ LOG.debug("Sending ddl message for '{}'", entityName);
+ store(ddlRecord);
+ }
+ }
+
+ private void sendUpdateRecords(ChangeEventHeader event, SObjectDescriptor descriptor, Schema schema,
+ PartnerConnection connection) throws ConnectionException {
+ String query = getQuery(event, descriptor.getFieldsNames());
+ QueryResult queryResult = connection.query(query);
+
+ if (queryResult != null) {
+ if (queryResult.getRecords().length < event.getRecordIds().length && !isWildcardEvent(event)) {
+ List idsForDelete = findIdsMismatch(queryResult.getRecords(), event.getRecordIds());
+ sendDeleteRecords(idsForDelete, event.getEntityName(), schema);
+ }
+
+ for (SObject sObject : queryResult.getRecords()) {
+ StructuredRecord dmlRecord = SalesforceRecord
+ .buildDMLStructuredRecord(sObject.getId(), event.getEntityName(), schema, getOperationType(event), sObject);
+
+ LOG.debug("Sending dml message for '{}:{}'", event.getEntityName(), sObject.getId());
+ store(dmlRecord);
+ }
+ }
+ }
+
+ private List findIdsMismatch(SObject[] sObjectArray, String[] ids) {
+ Set idsFromQuery = Arrays.stream(sObjectArray)
+ .map(SObject::getId)
+ .collect(Collectors.toSet());
+
+ return Stream.of(ids)
+ .filter(id -> !idsFromQuery.contains(id))
+ .collect(Collectors.toList());
+ }
+
+ private void sendDeleteRecords(List ids, String entityName, Schema schema) {
+ for (String id : ids) {
+ StructuredRecord dmlRecord = SalesforceRecord
+ .buildDMLStructuredRecord(id, entityName, schema, OperationType.DELETE, null);
+
+ LOG.debug("Sending dml message for {}:{}", entityName, id);
+ store(dmlRecord);
+ }
+ }
+
+ private String getQuery(ChangeEventHeader event, List fields) {
+ String query = String.format("select %s from %s", String.join(",", fields), event.getEntityName());
+ if (isWildcardEvent(event)) {
+ return query;
+ } else {
+ String ids = Stream.of(event.getRecordIds())
+ .map(id -> String.format("'%s'", id))
+ .collect(Collectors.joining(","));
+ return String.format("%s where id in (%s)", query, ids);
+ }
+ }
+
+ private static boolean isWildcardEvent(ChangeEventHeader event) {
+ String[] ids = event.getRecordIds();
+ return ids.length == 0 || ids.length == 1 && ids[0].charAt(3) == '*';
+ }
+
+ private static OperationType getOperationType(ChangeEventHeader event) {
+ switch (event.getChangeType()) {
+ case CREATE:
+ case GAP_CREATE:
+ case UNDELETE:
+ case GAP_UNDELETE:
+ return OperationType.INSERT;
+ case UPDATE:
+ case GAP_UPDATE:
+ case GAP_OVERFLOW:
+ return OperationType.UPDATE;
+ case DELETE:
+ case GAP_DELETE:
+ return OperationType.DELETE;
+ }
+ throw new IllegalArgumentException(String.format("Unknown change operation '%s'", event.getChangeType()));
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthResponse.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthResponse.java
new file mode 100644
index 0000000..bf918d3
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthResponse.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.source.salesforce.authenticator;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Objects;
+
+/**
+ * Oauth2 response from salesforce server
+ */
+public class AuthResponse {
+ @SerializedName("access_token")
+ private final String accessToken;
+ @SerializedName("instance_url")
+ private final String instanceUrl;
+ private final String id;
+ @SerializedName("token_type")
+ private final String tokenType;
+ @SerializedName("issued_at")
+ private final String issuedAt;
+ private final String signature;
+
+ public AuthResponse(String accessToken, String instanceUrl, String id, String tokenType,
+ String issuedAt, String signature) {
+ this.accessToken = accessToken;
+ this.instanceUrl = instanceUrl;
+ this.id = id;
+ this.tokenType = tokenType;
+ this.issuedAt = issuedAt;
+ this.signature = signature;
+ }
+
+ public String getAccessToken() {
+ return accessToken;
+ }
+
+ public String getInstanceUrl() {
+ return instanceUrl;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getTokenType() {
+ return tokenType;
+ }
+
+ public String getIssuedAt() {
+ return issuedAt;
+ }
+
+ public String getSignature() {
+ return signature;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ AuthResponse that = (AuthResponse) o;
+
+ return (Objects.equals(accessToken, that.accessToken) &&
+ Objects.equals(instanceUrl, that.instanceUrl) &&
+ Objects.equals(id, that.id) &&
+ Objects.equals(tokenType, that.tokenType) &&
+ Objects.equals(issuedAt, that.issuedAt) &&
+ Objects.equals(signature, that.signature));
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(accessToken, instanceUrl, id, tokenType, issuedAt, signature);
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/Authenticator.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/Authenticator.java
new file mode 100644
index 0000000..1c66d78
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/Authenticator.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.source.salesforce.authenticator;
+
+import com.google.gson.Gson;
+import com.sforce.ws.ConnectorConfig;
+import io.cdap.plugin.cdc.source.salesforce.util.SalesforceConstants;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+/**
+ * Authentication to Salesforce via oauth2
+ */
+public class Authenticator {
+ private static final Gson GSON = new Gson();
+
+ /**
+ * Authenticates via oauth2 to salesforce and returns a connectorConfig
+ * which can be used by salesforce libraries to make a connection.
+ *
+ * @param credentials information to log in
+ * @return ConnectorConfig which can be used to create BulkConnection and PartnerConnection
+ */
+ public static ConnectorConfig createConnectorConfig(AuthenticatorCredentials credentials) {
+ try {
+ AuthResponse authResponse = oauthLogin(credentials);
+ ConnectorConfig connectorConfig = new ConnectorConfig();
+ connectorConfig.setSessionId(authResponse.getAccessToken());
+ String apiVersion = SalesforceConstants.API_VERSION;
+ String restEndpoint = String.format("%s/services/async/%s", authResponse.getInstanceUrl(), apiVersion);
+ String serviceEndPoint = String.format("%s/services/Soap/u/%s", authResponse.getInstanceUrl(), apiVersion);
+ connectorConfig.setRestEndpoint(restEndpoint);
+ connectorConfig.setServiceEndpoint(serviceEndPoint);
+ // This should only be false when doing debugging.
+ connectorConfig.setCompression(true);
+ // Set this to true to see HTTP requests and responses on stdout
+ connectorConfig.setTraceMessage(false);
+ return connectorConfig;
+ } catch (Exception e) {
+ throw new RuntimeException("Connection to Salesforce with plugin configurations failed", e);
+ }
+ }
+
+ /**
+ * Authenticate via oauth2 to salesforce and return response to auth request.
+ *
+ * @param credentials information to log in
+ * @return AuthResponse response to http request
+ */
+ public static AuthResponse oauthLogin(AuthenticatorCredentials credentials) throws Exception {
+ SslContextFactory sslContextFactory = new SslContextFactory();
+ HttpClient httpClient = new HttpClient(sslContextFactory);
+ try {
+ httpClient.start();
+ String response = httpClient.POST(credentials.getLoginUrl()).param("grant_type", "password")
+ .param("client_id", credentials.getClientId())
+ .param("client_secret", credentials.getClientSecret())
+ .param("username", credentials.getUsername())
+ .param("password", credentials.getPassword()).send().getContentAsString();
+ return GSON.fromJson(response, AuthResponse.class);
+ } finally {
+ httpClient.stop();
+ }
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthenticatorCredentials.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthenticatorCredentials.java
new file mode 100644
index 0000000..c6294c2
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/authenticator/AuthenticatorCredentials.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.source.salesforce.authenticator;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Stores information to connect to salesforce via oauth2
+ */
+public class AuthenticatorCredentials implements Serializable {
+ private final String username;
+ private final String password;
+ private final String clientId;
+ private final String clientSecret;
+ private final String loginUrl;
+
+ public AuthenticatorCredentials(String username, String password,
+ String clientId, String clientSecret, String loginUrl) {
+ this.username = username;
+ this.password = password;
+ this.clientId = clientId;
+ this.clientSecret = clientSecret;
+ this.loginUrl = loginUrl;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public String getClientSecret() {
+ return clientSecret;
+ }
+
+ public String getLoginUrl() {
+ return loginUrl;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ AuthenticatorCredentials that = (AuthenticatorCredentials) o;
+
+ return Objects.equals(username, that.username) &&
+ Objects.equals(password, that.password) &&
+ Objects.equals(clientId, that.clientId) &&
+ Objects.equals(clientSecret, that.clientSecret) &&
+ Objects.equals(loginUrl, that.loginUrl);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(username, password, clientId, clientSecret, loginUrl);
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventHeader.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventHeader.java
new file mode 100644
index 0000000..8e49bec
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventHeader.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.source.salesforce.records;
+
+import java.util.Arrays;
+
+/**
+ * Contains information about change event. Should be used instead of {@link com.sforce.soap.partner.ChangeEventHeader}
+ * because GSON does not support setters.
+ */
+public class ChangeEventHeader {
+ private String[] recordIds;
+ private String entityName;
+ private ChangeEventType changeType;
+ private String transactionKey;
+ private boolean isTransactionEnd;
+
+ public String[] getRecordIds() {
+ return recordIds;
+ }
+
+ public void setRecordIds(String[] recordIds) {
+ this.recordIds = recordIds.clone();
+ }
+
+ public String getEntityName() {
+ return entityName;
+ }
+
+ public void setEntityName(String entityName) {
+ this.entityName = entityName;
+ }
+
+ public ChangeEventType getChangeType() {
+ return changeType;
+ }
+
+ public void setChangeType(ChangeEventType changeType) {
+ this.changeType = changeType;
+ }
+
+ public String getTransactionKey() {
+ return transactionKey;
+ }
+
+ public void setTransactionKey(String transactionKey) {
+ this.transactionKey = transactionKey;
+ }
+
+ public boolean isTransactionEnd() {
+ return isTransactionEnd;
+ }
+
+ public void setTransactionEnd(boolean transactionEnd) {
+ isTransactionEnd = transactionEnd;
+ }
+
+ @Override
+ public String toString() {
+ return "ChangeEventHeader{" +
+ "recordIds=" + Arrays.toString(recordIds) +
+ ", entityName='" + entityName + '\'' +
+ ", changeType=" + changeType +
+ ", transactionKey='" + transactionKey + '\'' +
+ ", isTransactionEnd=" + isTransactionEnd +
+ '}';
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventType.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventType.java
new file mode 100644
index 0000000..d6421d1
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/ChangeEventType.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.source.salesforce.records;
+
+/**
+ * Contains Salesforce change event data types. Should be used instead of
+ * {@link com.sforce.soap.partner.ChangeEventType} because of GAP_OVERFLOW type.
+ */
+public enum ChangeEventType {
+ CREATE, DELETE, UNDELETE, UPDATE, GAP_CREATE, GAP_DELETE, GAP_UNDELETE, GAP_UPDATE, GAP_OVERFLOW
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/SalesforceRecord.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/SalesforceRecord.java
new file mode 100644
index 0000000..5b856b7
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/records/SalesforceRecord.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.source.salesforce.records;
+
+import com.sforce.soap.partner.Field;
+import com.sforce.soap.partner.sobject.SObject;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.format.UnexpectedFormatException;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.cdc.common.OperationType;
+import io.cdap.plugin.cdc.common.Schemas;
+import io.cdap.plugin.cdc.source.salesforce.sobject.SObjectDescriptor;
+import io.cdap.plugin.cdc.source.salesforce.sobject.SObjectsDescribeResult;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Converts salseforce data to cdap format
+ */
+public class SalesforceRecord {
+ private static final String PRIMARY_FIELD_KEY = "Id";
+
+ /**
+ * Builds structured record for DDL message
+ *
+ * @param entityName name of entity
+ * @param schema schema for entity
+ * @return structured record
+ */
+ public static StructuredRecord buildDDLStructuredRecord(String entityName, Schema schema) {
+ return StructuredRecord.builder(Schemas.DDL_SCHEMA)
+ .set(Schemas.TABLE_FIELD, entityName)
+ .set(Schemas.SCHEMA_FIELD, schema.toString())
+ .build();
+ }
+
+ /**
+ * Builds structured record for DML message
+ *
+ * @param id id of record
+ * @param entityName entity name of record
+ * @param schema schema for record
+ * @param operationType type of operation
+ * @param sObject Salesforce object
+ * @return structured record
+ */
+ public static StructuredRecord buildDMLStructuredRecord(String id, String entityName, Schema schema,
+ OperationType operationType, SObject sObject) {
+ return StructuredRecord.builder(Schemas.DML_SCHEMA)
+ .set(Schemas.TABLE_FIELD, entityName)
+ .set(Schemas.PRIMARY_KEYS_FIELD, Collections.singletonList(PRIMARY_FIELD_KEY))
+ .set(Schemas.OP_TYPE_FIELD, operationType.name())
+ .set(Schemas.UPDATE_SCHEMA_FIELD, schema.toString())
+ .set(Schemas.UPDATE_VALUES_FIELD, getChangeData(id, sObject, schema))
+ .build();
+ }
+
+ /**
+ * Builds schema from Salesforce object description
+ *
+ * @param sObjectDescriptor descriptor for Salesforce object
+ * @param describeResult JSON with change event
+ * @return structured record
+ */
+ public static Schema getSchema(SObjectDescriptor sObjectDescriptor, SObjectsDescribeResult describeResult) {
+ return Schema.recordOf(Schemas.SCHEMA_RECORD, getList(sObjectDescriptor, describeResult));
+ }
+
+ private static Map getChangeData(String id, SObject sObject, Schema changeSchema) {
+ Optional opSObject = Optional.ofNullable(sObject);
+
+ if (opSObject.isPresent()) {
+ Map changes = new HashMap<>();
+ for (Schema.Field field : Objects.requireNonNull(changeSchema.getFields())) {
+ changes.put(field.getName(), convertValue((String) sObject.getField(field.getName()), field));
+ }
+ return changes;
+ } else {
+ return Collections.singletonMap(PRIMARY_FIELD_KEY, id);
+ }
+ }
+
+ private static Object convertValue(String value, Schema.Field field) {
+ Schema fieldSchema = field.getSchema();
+
+ if (fieldSchema.getType() == Schema.Type.NULL) {
+ return null;
+ }
+
+ if (fieldSchema.isNullable()) {
+ if (value == null) {
+ return null;
+ }
+ fieldSchema = fieldSchema.getNonNullable();
+ }
+
+ Schema.Type fieldSchemaType = fieldSchema.getType();
+
+ if (value.isEmpty() && fieldSchemaType != Schema.Type.STRING) {
+ return null;
+ }
+
+ Schema.LogicalType logicalType = fieldSchema.getLogicalType();
+ if (fieldSchema.getLogicalType() != null) {
+ switch (logicalType) {
+ case DATE:
+ // date will be in yyyy-mm-dd format
+ return Math.toIntExact(LocalDate.parse(value).toEpochDay());
+ case TIMESTAMP_MILLIS:
+ return Instant.parse(value).toEpochMilli();
+ case TIMESTAMP_MICROS:
+ return TimeUnit.MILLISECONDS.toMicros(Instant.parse(value).toEpochMilli());
+ case TIME_MILLIS:
+ return Math.toIntExact(TimeUnit.NANOSECONDS.toMillis(LocalTime.parse(value).toNanoOfDay()));
+ case TIME_MICROS:
+ return TimeUnit.NANOSECONDS.toMicros(LocalTime.parse(value).toNanoOfDay());
+ default:
+ throw new UnexpectedFormatException(String.format("Field '%s' is of unsupported type '%s'",
+ field.getName(), logicalType.getToken()));
+ }
+ }
+
+ switch (fieldSchemaType) {
+ case BOOLEAN:
+ return Boolean.valueOf(value);
+ case INT:
+ return Integer.valueOf(value);
+ case LONG:
+ return Long.valueOf(value);
+ case FLOAT:
+ return Float.valueOf(value);
+ case DOUBLE:
+ return Double.valueOf(value);
+ case BYTES:
+ return Byte.valueOf(value);
+ case STRING:
+ return value;
+ }
+
+ throw new UnexpectedFormatException(
+ String.format("Unsupported schema type: '%s' for field: '%s'. Supported types are 'boolean, int, long, float, " +
+ "double, binary and string'.", field.getSchema(), field.getName()));
+ }
+
+ private static List getList(SObjectDescriptor sObjectDescriptor,
+ SObjectsDescribeResult describeResult) {
+ List schemaFields = new ArrayList<>();
+
+ for (SObjectDescriptor.FieldDescriptor fieldDescriptor : sObjectDescriptor.getFields()) {
+ String parent = fieldDescriptor.hasParents() ? fieldDescriptor.getLastParent() : sObjectDescriptor.getName();
+ Field field = describeResult.getField(parent, fieldDescriptor.getName());
+ if (field == null) {
+ throw new IllegalArgumentException(
+ String.format("Field '%s' is absent in Salesforce describe result", fieldDescriptor.getFullName()));
+ }
+ Schema.Field schemaField = Schema.Field.of(fieldDescriptor.getFullName(), getCdapSchemaField(field));
+ schemaFields.add(schemaField);
+ }
+
+ return schemaFields;
+ }
+
+ private static Schema getCdapSchemaField(Field field) {
+ Schema fieldSchema;
+ switch (field.getType()) {
+ case _boolean:
+ fieldSchema = Schema.of(Schema.Type.BOOLEAN);
+ break;
+ case _int:
+ fieldSchema = Schema.of(Schema.Type.INT);
+ break;
+ case _long:
+ fieldSchema = Schema.of(Schema.Type.LONG);
+ break;
+ case _double:
+ case currency:
+ case percent:
+ fieldSchema = Schema.of(Schema.Type.DOUBLE);
+ break;
+ case date:
+ fieldSchema = Schema.of(Schema.LogicalType.DATE);
+ break;
+ case datetime:
+ fieldSchema = Schema.of(Schema.LogicalType.TIMESTAMP_MILLIS);
+ break;
+ case time:
+ fieldSchema = Schema.of(Schema.LogicalType.TIME_MILLIS);
+ break;
+ default:
+ fieldSchema = Schema.of(Schema.Type.STRING);
+ }
+ return field.isNillable() ? Schema.nullableOf(fieldSchema) : fieldSchema;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectDescriptor.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectDescriptor.java
new file mode 100644
index 0000000..2fb1844
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectDescriptor.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.source.salesforce.sobject;
+
+import com.sforce.soap.partner.Field;
+import com.sforce.soap.partner.PartnerConnection;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Contains information about SObject, including its name and list of fields.
+ * Can be obtained from SObject name.
+ */
+public class SObjectDescriptor {
+
+ private final String name;
+ private final List fields;
+
+ /**
+ * Connects to Salesforce, gets describe result for the given sObject name and stores
+ * information about its fields into {@link SObjectDescriptor} class.
+ *
+ * @param name sObject name
+ * @param partnerConnection Salesforce connection
+ * @return sObject descriptor
+ */
+ public static SObjectDescriptor fromName(String name, PartnerConnection partnerConnection) {
+ SObjectsDescribeResult describeResult = new SObjectsDescribeResult(
+ partnerConnection, Collections.singletonList(name));
+ List fields = describeResult.getFields().stream()
+ .map(Field::getName)
+ .map(FieldDescriptor::new)
+ .collect(Collectors.toList());
+
+ return new SObjectDescriptor(name, fields);
+ }
+
+ public SObjectDescriptor(String name, List fields) {
+ this.name = name;
+ this.fields = new ArrayList<>(fields);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Collects sObject names needed to be described in order to obtains field type information.
+ *
+ * @return list of sObject names
+ */
+ public Set getAllParentObjects() {
+ Set parents = fields.stream()
+ .filter(FieldDescriptor::hasParents)
+ .map(FieldDescriptor::getLastParent)
+ .collect(Collectors.toSet());
+
+ // add top level sObject for fields that don't have parents
+ parents.add(name);
+
+ return parents;
+ }
+
+ /**
+ * Collects all field names, for fields with parents includes parents separated by dot.
+ *
+ * @return list of field names
+ */
+ public List getFieldsNames() {
+ return fields.stream()
+ .map(FieldDescriptor::getFullName)
+ .collect(Collectors.toList());
+ }
+
+ public List getFields() {
+ return fields;
+ }
+
+ @Override
+ public String toString() {
+ return "SObjectDescriptor{" + "name='" + name + '\'' + ", fields=" + fields + '}';
+ }
+
+ /**
+ * Contains information about field, including list of parents if present.
+ */
+ public static class FieldDescriptor {
+
+ private final String name;
+ private final List parents;
+
+ public FieldDescriptor(String name) {
+ this.name = name;
+ this.parents = new ArrayList<>();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns field name with parents connected by dots.
+ *
+ * @return full field name
+ */
+ public String getFullName() {
+ if (hasParents()) {
+ List nameParts = new ArrayList<>(parents);
+ nameParts.add(name);
+ return String.join(".", nameParts);
+ }
+ return name;
+ }
+
+ /**
+ * Checks if field has parents.
+ *
+ * @return true if field has at least one parent, false otherwise
+ */
+ public boolean hasParents() {
+ return !parents.isEmpty();
+ }
+
+ /**
+ * Return last parent of the field.
+ * Primary used to obtain describe result from Salesforce.
+ *
+ * @return last parent if field has parents, null otherwise
+ */
+ public String getLastParent() {
+ return hasParents() ? parents.get(parents.size() - 1) : null;
+ }
+
+ @Override
+ public String toString() {
+ return "FieldDescriptor{" + "name='" + name + '\'' + ", parents=" + parents + '}';
+ }
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectsDescribeResult.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectsDescribeResult.java
new file mode 100644
index 0000000..03298bb
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/sobject/SObjectsDescribeResult.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.plugin.cdc.source.salesforce.sobject;
+
+import com.google.common.collect.Lists;
+import com.sforce.soap.partner.DescribeSObjectResult;
+import com.sforce.soap.partner.Field;
+import com.sforce.soap.partner.PartnerConnection;
+import com.sforce.ws.ConnectionException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Retrieves {@link DescribeSObjectResult}s for the given sObjects
+ * and adds field information to the internal holder.
+ * This class will be used to populate {@link SObjectDescriptor} for queries by sObject
+ * or to generate CDAP schema based on Salesforce fields information.
+ */
+public class SObjectsDescribeResult {
+
+ // Salesforce limitation that we can describe only 100 sObjects at a time
+ private static final int DESCRIBE_SOBJECTS_LIMIT = 100;
+
+ // key -> [sObject name], value -> [key -> field name, value -> field]
+ private final Map> objectToFieldMap = new HashMap<>();
+
+ public SObjectsDescribeResult(PartnerConnection connection, Collection sObjects) {
+
+ // split the given sObjects into smaller partitions to ensure we don't exceed the limitation
+ Lists.partition(new ArrayList<>(sObjects), DESCRIBE_SOBJECTS_LIMIT).stream()
+ .map(partition -> {
+ try {
+ return connection.describeSObjects(partition.toArray(new String[0]));
+ } catch (ConnectionException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .flatMap(Arrays::stream)
+ .forEach(this::addSObjectDescribe);
+ }
+
+ /**
+ * Retrieves all stored fields.
+ *
+ * @return list of {@link Field}s
+ */
+ public List getFields() {
+ return objectToFieldMap.values().stream()
+ .map(Map::values)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Attempts to find {@link Field} by sObject name and field name.
+ *
+ * @param sObjectName sObject name
+ * @param fieldName field name
+ * @return field instance if found, null otherwise
+ */
+ public Field getField(String sObjectName, String fieldName) {
+ Map fields = objectToFieldMap.get(sObjectName.toLowerCase());
+ return fields == null ? null : fields.get(fieldName.toLowerCase());
+ }
+
+ private void addSObjectDescribe(DescribeSObjectResult sObjectDescribe) {
+ Map fields = Arrays.stream(sObjectDescribe.getFields())
+ .collect(Collectors.toMap(
+ field -> field.getName().toLowerCase(),
+ Function.identity(),
+ (o, n) -> n,
+ LinkedHashMap::new)); // preserve field order for queries by sObject
+
+ // sObjects names are case-insensitive
+ // store them in lower case to ensure we obtain them case-insensitively
+ objectToFieldMap.put(sObjectDescribe.getName().toLowerCase(), fields);
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConnectionUtil.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConnectionUtil.java
new file mode 100644
index 0000000..63ec836
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConnectionUtil.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.source.salesforce.util;
+
+import com.sforce.soap.partner.Connector;
+import com.sforce.soap.partner.PartnerConnection;
+import com.sforce.ws.ConnectionException;
+import com.sforce.ws.ConnectorConfig;
+import io.cdap.plugin.cdc.source.salesforce.authenticator.Authenticator;
+import io.cdap.plugin.cdc.source.salesforce.authenticator.AuthenticatorCredentials;
+
+/**
+ * Utility class which provides methods to establish connection with Salesforce.
+ */
+public class SalesforceConnectionUtil {
+
+ /**
+ * Based on given Salesforce credentials, attempt to establish {@link PartnerConnection}.
+ * This is mainly used to obtain sObject describe results.
+ *
+ * @param credentials Salesforce credentials
+ * @return partner connection instance
+ * @throws ConnectionException in case error when establishing connection
+ */
+ public static PartnerConnection getPartnerConnection(AuthenticatorCredentials credentials)
+ throws ConnectionException {
+ ConnectorConfig connectorConfig = Authenticator.createConnectorConfig(credentials);
+ return Connector.newConnection(connectorConfig);
+ }
+
+ /**
+ * Creates {@link AuthenticatorCredentials} instance based on given parameters.
+ *
+ * @param username Salesforce username
+ * @param password Salesforce password
+ * @param clientId Salesforce client id
+ * @param clientSecret Salesforce client secret
+ * @param loginUrl Salesforce authentication url
+ * @return authenticator credentials
+ */
+ public static AuthenticatorCredentials getAuthenticatorCredentials(String username, String password, String clientId,
+ String clientSecret, String loginUrl) {
+ return new AuthenticatorCredentials(username, password, clientId, clientSecret, loginUrl);
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConstants.java b/src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConstants.java
new file mode 100644
index 0000000..0d86c60
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cdc/source/salesforce/util/SalesforceConstants.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cdc.source.salesforce.util;
+
+/**
+ * Constants related to Salesforce and configuration
+ */
+public class SalesforceConstants {
+
+ public static final String API_VERSION = "45.0";
+
+ public static final String PROPERTY_CLIENT_ID = "clientId";
+ public static final String PROPERTY_CLIENT_SECRET = "clientSecret";
+ public static final String PROPERTY_USERNAME = "username";
+ public static final String PROPERTY_PASSWORD = "password";
+ public static final String PROPERTY_LOGIN_URL = "loginUrl";
+ public static final String PROPERTY_OBJECTS = "objects";
+ public static final String PROPERTY_ERROR_HANDLING = "errorHandling";
+}
diff --git a/widgets/CDCDatabase-streamingsource.json b/widgets/CDCDatabase-streamingsource.json
index 4f0e3bf..8050875 100644
--- a/widgets/CDCDatabase-streamingsource.json
+++ b/widgets/CDCDatabase-streamingsource.json
@@ -46,7 +46,7 @@
"type": "record",
"fields": [
{
- "name": "cdcMessage",
+ "name": "cdc_msg",
"type": "bytes"
}
]
diff --git a/widgets/CDCSalesforce-streamingsource.json b/widgets/CDCSalesforce-streamingsource.json
new file mode 100644
index 0000000..7076ea0
--- /dev/null
+++ b/widgets/CDCSalesforce-streamingsource.json
@@ -0,0 +1,80 @@
+{
+ "metadata": {
+ "spec-version": "1.5"
+ },
+ "configuration-groups": [
+ {
+ "label": "Authentication",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Client Id",
+ "name": "clientId"
+ },
+ {
+ "widget-type": "password",
+ "label": "Client Secret",
+ "name": "clientSecret"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Username",
+ "name": "username"
+ },
+ {
+ "widget-type": "password",
+ "label": "Password",
+ "name": "password"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Login Url",
+ "name": "loginUrl",
+ "widget-attributes" : {
+ "default": "https://login.salesforce.com/services/oauth2/token"
+ }
+ }
+ ]
+ },
+ {
+ "label": "Advanced",
+ "properties": [
+ {
+ "widget-type": "dsv",
+ "label": "Tracking Objects",
+ "name": "objects",
+ "widget-attributes": {
+ "delimiter": ","
+ }
+ },
+ {
+ "widget-type": "select",
+ "label": "Error Handling",
+ "name": "errorHandling",
+ "widget-attributes": {
+ "values": [
+ "Skip on error",
+ "Stop on error"
+ ],
+ "default": "Skip on error"
+ }
+ }
+ ]
+ }
+ ],
+ "outputs": [
+ {
+ "widget-type": "non-editable-schema-editor",
+ "schema": {
+ "name": "CDCRecord",
+ "type": "record",
+ "fields": [
+ {
+ "name": "cdc_msg",
+ "type": "bytes"
+ }
+ ]
+ }
+ }
+ ]
+}
diff --git a/widgets/CTSQLServer-streamingsource.json b/widgets/CTSQLServer-streamingsource.json
index 17f708e..01e4f3e 100644
--- a/widgets/CTSQLServer-streamingsource.json
+++ b/widgets/CTSQLServer-streamingsource.json
@@ -56,7 +56,7 @@
"type": "record",
"fields": [
{
- "name": "cdcMessage",
+ "name": "cdc_msg",
"type": "bytes"
}
]