Commit 51a2a7a

Implement PK Chunking

1 parent f3d38b2 commit 51a2a7a

16 files changed, +625 −16 lines changed

docs/Salesforce-batchsource.md

Lines changed: 96 additions & 0 deletions
@@ -113,3 +113,99 @@ The Salesforce types will be automatically mapped to schema types as shown below
 | | textarea, phone, id, url, email, encryptedstring, | |
 | | datacategorygroupreference, location, address, anyType, json, complexvalue | |

**Enable PK Chunking:**
Primary key (PK) chunking splits a query on a large table into chunks based on the record IDs, or primary keys, of the queried records.

Salesforce recommends that you enable PK chunking when querying tables with more than 10 million records, or when a bulk query consistently times out.
However, the effectiveness of PK chunking depends on the specifics of the query and the queried data.

For example, suppose you enable PK chunking for the following query on an Account table with 10,000,000 records:

`SELECT Name FROM Account`

Assuming a chunk size of 250,000, the query is split into the following 40 queries, each of which is processed in parallel.

| Queries |
| ------- |
| SELECT Name FROM Account WHERE Id >= 001300000000000 AND Id < 00130000000132G |
| SELECT Name FROM Account WHERE Id >= 00130000000132G AND Id < 00130000000264W |
| SELECT Name FROM Account WHERE Id >= 00130000000264W AND Id < 00130000000396m |
| ... |
| SELECT Name FROM Account WHERE Id >= 00130000000euQ4 AND Id < 00130000000fxSK |

PK chunking works only with queries that don't include nested `SELECT` clauses (subqueries) or clauses other than `WHERE`.
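Internally, enabling this option amounts to setting the `Sforce-Enable-PKChunking` header on the Bulk API job before batches are created. A minimal sketch, assuming an already authenticated `BulkConnection` (this is not the plugin's exact code; the header name and value format come from the Salesforce Bulk API documentation):

```java
import com.sforce.async.BulkConnection;

// Minimal sketch: request PK chunking for subsequent Bulk API query jobs.
public class EnablePKChunking {
  public static void enable(BulkConnection bulkConnection, int chunkSize) {
    // chunkSize caps the number of records per chunk (maximum 250,000)
    bulkConnection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" + chunkSize);
  }
}
```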

PK chunking works only with the following objects:

| Objects |
| ------- |
| Account |
| AccountContactRelation |
| AccountTeamMember |
| AiVisitSummary |
| Asset |
| B2BMktActivity |
| B2BMktProspect |
| Campaign |
| CampaignMember |
| CandidateAnswer |
| Case |
| CaseArticle |
| CaseComment |
| Claim |
| ClaimParticipant |
| Contact |
| ContractLineItem |
| ConversationEntry |
| CustomerProperty |
| EinsteinAnswerFeedback |
| EmailMessage |
| EngagementScore |
| Event |
| EventRelation |
| FeedItem |
| Individual |
| InsurancePolicy |
| InsurancePolicyAsset |
| InsurancePolicyParticipant |
| Lead |
| LeadInsight |
| LiveChatTranscript |
| LoginHistory |
| LoyaltyLedger |
| LoyaltyMemberCurrency |
| LoyaltyMemberTier |
| LoyaltyPartnerProduct |
| LoyaltyProgramMember |
| LoyaltyProgramPartner |
| Note |
| ObjectTerritory2Association |
| Opportunity |
| OpportunityContactRole |
| OpportunityHistory |
| OpportunityLineItem |
| OpportunitySplit |
| OpportunityTeamMember |
| Pricebook2 |
| PricebookEntry |
| Product2 |
| ProductConsumed |
| ProductRequired |
| QuickText |
| Quote |
| QuoteLineItem |
| ReplyText |
| ScoreIntelligence |
| ServiceContract |
| Task |
| TermDocumentFrequency |
| TransactionJournal |
| User |
| UserRole |
| VoiceCall |
| WorkOrder |
| WorkOrderLineItem |

Custom objects are also supported, as are any Sharing and History tables that support standard objects.

**Chunk Size:** Specify the size of each chunk. The maximum size is 250,000; the default is 100,000.

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <!-- version properties -->
     <avro.version>1.7.7</avro.version>
-    <cdap.version>6.2.0</cdap.version>
+    <cdap.version>6.4.0-SNAPSHOT</cdap.version>
     <junit.version>4.12</junit.version>
     <commons.io.version>2.6</commons.io.version>
     <commons.lang.version>2.6</commons.lang.version>

src/main/java/io/cdap/plugin/salesforce/SObjectsDescribeResult.java

Lines changed: 4 additions & 0 deletions
@@ -221,4 +221,8 @@ private static DescribeSObjectResult describe(PartnerConnection connection, Stri
     cache.put(name.toLowerCase(), describe);
     return describe;
   }
+
+  public static boolean isCustomObject(PartnerConnection connection, String name) throws ConnectionException {
+    return describe(connection, name, new HashMap<>()).isCustom();
+  }
 }

src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java

Lines changed: 43 additions & 4 deletions
@@ -37,6 +37,7 @@
 import java.io.InputStream;
 import java.io.SequenceInputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -109,21 +110,59 @@ public static void closeJob(BulkConnection bulkConnection, String jobId) throws
    *
    * @param bulkConnection bulk connection instance
    * @param query a SOQL query
+   * @param enablePKChunk enable PK chunking
    * @return an array of batches
    * @throws AsyncApiException if there is an issue creating the job
    * @throws IOException failed to close the query
    */
-  public static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String query)
+  public static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String query, boolean enablePKChunk)
     throws AsyncApiException, IOException {

     SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromQuery(query);
     JobInfo job = createJob(bulkConnection, sObjectDescriptor.getName(), OperationEnum.query, null);
-
+    BatchInfo batchInfo;
     try (ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes())) {
-      bulkConnection.createBatchFromStream(job, bout);
+      batchInfo = bulkConnection.createBatchFromStream(job, bout);
     }
+    return enablePKChunk ? waitForBatchChunks(bulkConnection, job.getId(), batchInfo.getId()) :
+      bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();
+  }
+
+  /**
+   * When PK chunking is enabled, waits for the state of the initial batch to become NotProcessed;
+   * in that case the Salesforce API decides how many batches to create.
+   *
+   * @param bulkConnection bulk connection instance
+   * @param jobId a job id
+   * @param initialBatchId a batch id
+   * @return array of batches created by the Salesforce API
+   * @throws AsyncApiException if there is an issue creating the job
+   */
+  private static BatchInfo[] waitForBatchChunks(BulkConnection bulkConnection, String jobId, String initialBatchId)
+    throws AsyncApiException {
+    BatchInfo initialBatchInfo = null;
+    for (int i = 0; i < GET_BATCH_RESULTS_TRIES; i++) {
+      // check if the job is aborted
+      if (bulkConnection.getJobStatus(jobId).getState() == JobStateEnum.Aborted) {
+        LOG.info(String.format("Job with Id: '%s' is aborted", jobId));
+        return new BatchInfo[0];
+      }
+      initialBatchInfo = bulkConnection.getBatchInfo(jobId, initialBatchId);

-    return bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();
+      if (initialBatchInfo.getState() == BatchStateEnum.NotProcessed) {
+        BatchInfo[] result = bulkConnection.getBatchInfoList(jobId).getBatchInfo();
+        return Arrays.stream(result).filter(batchInfo -> batchInfo.getState() != BatchStateEnum.NotProcessed)
+          .toArray(BatchInfo[]::new);
+      } else if (initialBatchInfo.getState() == BatchStateEnum.Failed) {
+        throw new BulkAPIBatchException("Batch failed", initialBatchInfo);
+      } else {
+        try {
+          Thread.sleep(GET_BATCH_RESULTS_SLEEP_MS);
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Job is aborted", e);
+        }
+      }
+    }
+    throw new BulkAPIBatchException("Timeout waiting for batch results", initialBatchInfo);
   }

   /**
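For reference, a hypothetical caller of the new `runBulkQuery` signature (a sketch only; assumes an authenticated `BulkConnection` and the imports shown):

```java
import com.sforce.async.BatchInfo;
import com.sforce.async.BulkConnection;
import io.cdap.plugin.salesforce.SalesforceBulkUtil;

// Sketch: with enablePKChunk = true, the returned array contains one batch per
// PK chunk, as decided by Salesforce (the NotProcessed tracking batch is
// filtered out by waitForBatchChunks above).
class RunChunkedQueryExample {
  static int countChunkBatches(BulkConnection bulkConnection) throws Exception {
    BatchInfo[] batches =
      SalesforceBulkUtil.runBulkQuery(bulkConnection, "SELECT Name FROM Account", true);
    return batches.length;
  }
}
```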

src/main/java/io/cdap/plugin/salesforce/parser/SalesforceQueryParser.java

Lines changed: 12 additions & 0 deletions
@@ -65,6 +65,18 @@ public static boolean isRestrictedQuery(String query) {
     return visitor.visit(parser.statement());
   }

+  /**
+   * Checks if the query has restricted syntax that cannot be processed by the Bulk API
+   * with PK chunking enabled.
+   *
+   * @param query SOQL query
+   * @return true if the query has restricted syntax, false otherwise
+   */
+  public static boolean isRestrictedPKQuery(String query) {
+    SOQLParser parser = initParser(query);
+    SalesforceQueryVisitor.RestrictedPKQueryVisitor visitor = new SalesforceQueryVisitor.RestrictedPKQueryVisitor();
+    return visitor.visit(parser.statement());
+  }
+
   private static SOQLParser initParser(String query) {
     SOQLLexer lexer = new SOQLLexer(CharStreams.fromString(query));
     lexer.removeErrorListeners();
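To illustrate the new check, a hypothetical usage sketch (the `LIMIT` example comes from the `RestrictedPKQueryVisitor` javadoc in the next file; the `WHERE`-only query is assumed to pass the field visitor):

```java
import io.cdap.plugin.salesforce.parser.SalesforceQueryParser;

class RestrictedPKQueryExample {
  public static void main(String[] args) {
    // expected false: a plain WHERE filter is allowed with PK chunking
    System.out.println(SalesforceQueryParser.isRestrictedPKQuery(
      "SELECT Name FROM Account WHERE Industry = 'Tech'"));
    // expected true: LIMIT is rejected when PK chunking is enabled
    System.out.println(SalesforceQueryParser.isRestrictedPKQuery(
      "SELECT Name FROM Opportunity LIMIT 10"));
  }
}
```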

src/main/java/io/cdap/plugin/salesforce/parser/SalesforceQueryVisitor.java

Lines changed: 24 additions & 0 deletions
@@ -304,4 +304,28 @@ protected Boolean defaultResult() {
       return Boolean.FALSE;
     }
   }
+
+  /**
+   * Visits query fields that are restricted by the Bulk API with PK chunking enabled.
+   * Returns true if the query contains a clause other than 'WHERE', false otherwise.
+   * For example: SELECT Name FROM Opportunity LIMIT 10.
+   */
+  public static class RestrictedPKQueryVisitor extends SOQLBaseVisitor<Boolean> {
+
+    @Override
+    public Boolean visitStatement(SOQLParser.StatementContext ctx) {
+      SOQLParser.FromStatementContext fromStatementContext = ctx.fromStatement();
+      if (fromStatementContext.WITH() != null ||
+          fromStatementContext.GROUP() != null ||
+          fromStatementContext.ORDER() != null ||
+          fromStatementContext.LIMIT() != null ||
+          fromStatementContext.OFFSET() != null ||
+          fromStatementContext.FOR() != null) {
+        return true;
+      }
+
+      return ctx.fieldList().accept(new RestrictedFieldVisitor());
+    }
+  }
 }

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceInputFormat.java

Lines changed: 14 additions & 5 deletions
@@ -64,8 +64,16 @@ public List<InputSplit> getSplits(JobContext context) {
     List<String> queries = GSON.fromJson(configuration.get(SalesforceSourceConstants.CONFIG_QUERIES), QUERIES_TYPE);
     BulkConnection bulkConnection = getBulkConnection(configuration);

+    boolean enablePKChunk = configuration.getBoolean(SalesforceSourceConstants.CONFIG_PK_CHUNK_ENABLE, false);
+    if (enablePKChunk) {
+      int chunkSize = configuration.getInt(SalesforceSourceConstants.CONFIG_CHUNK_SIZE,
+                                           SalesforceSourceConstants.DEFAULT_PK_CHUNK_SIZE);
+      bulkConnection.addHeader(SalesforceSourceConstants.HEADER_ENABLE_PK_CHUNK,
+                               String.format(SalesforceSourceConstants.HEADER_VALUE_PK_CHUNK, chunkSize));
+    }
+
     return queries.parallelStream()
-      .map(query -> getQuerySplits(query, bulkConnection))
+      .map(query -> getQuerySplits(query, bulkConnection, enablePKChunk))
       .flatMap(Collection::stream)
       .collect(Collectors.toList());
   }
@@ -87,8 +95,8 @@ public RecordReader createRecordReader(InputSplit split, TaskAttemptContext cont
     return new SalesforceRecordReaderWrapper(sObjectName, sObjectNameField, getDelegateRecordReader(query, schema));
   }

-  private List<SalesforceSplit> getQuerySplits(String query, BulkConnection bulkConnection) {
-    return Stream.of(getBatches(query, bulkConnection))
+  private List<SalesforceSplit> getQuerySplits(String query, BulkConnection bulkConnection, boolean enablePKChunk) {
+    return Stream.of(getBatches(query, bulkConnection, enablePKChunk))
       .map(batch -> new SalesforceSplit(batch.getJobId(), batch.getId(), query))
       .collect(Collectors.toList());
   }
@@ -116,15 +124,16 @@ private BulkConnection getBulkConnection(Configuration conf) {
    *
    * @param query SOQL query
    * @param bulkConnection bulk connection
+   * @param enablePKChunk enable PK chunking
    * @return array of batch info
    */
-  private BatchInfo[] getBatches(String query, BulkConnection bulkConnection) {
+  private BatchInfo[] getBatches(String query, BulkConnection bulkConnection, boolean enablePKChunk) {
     try {
       if (!SalesforceQueryUtil.isQueryUnderLengthLimit(query)) {
         LOG.debug("Wide object query detected. Query length '{}'", query.length());
         query = SalesforceQueryUtil.createSObjectIdQuery(query);
       }
-      BatchInfo[] batches = SalesforceBulkUtil.runBulkQuery(bulkConnection, query);
+      BatchInfo[] batches = SalesforceBulkUtil.runBulkQuery(bulkConnection, query, enablePKChunk);
       LOG.debug("Number of batches received from Salesforce: '{}'", batches.length);
       return batches;
     } catch (AsyncApiException | IOException e) {
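The values of `SalesforceSourceConstants.HEADER_ENABLE_PK_CHUNK` and `HEADER_VALUE_PK_CHUNK` are not shown in this diff; presumably they map to the documented Salesforce Bulk API PK chunking header, along these lines:

```java
// Assumed constant values (not part of this diff); the header itself is
// documented by the Salesforce Bulk API.
public final class SalesforceSourceConstantsSketch {
  public static final String HEADER_ENABLE_PK_CHUNK = "Sforce-Enable-PKChunking";
  public static final String HEADER_VALUE_PK_CHUNK = "chunkSize=%d"; // used with String.format
  public static final int DEFAULT_PK_CHUNK_SIZE = 100000;            // matches the documented default
  private SalesforceSourceConstantsSketch() { }
}
```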

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceInputFormatProvider.java

Lines changed: 22 additions & 0 deletions
@@ -66,6 +66,28 @@ public SalesforceInputFormatProvider(SalesforceBaseSourceConfig config,
     this.conf = configBuilder.build();
   }

+  public SalesforceInputFormatProvider(SalesforceSourceConfig config,
+                                       List<String> queries,
+                                       Map<String, String> schemas,
+                                       @Nullable String sObjectNameField) {
+    ImmutableMap.Builder<String, String> builder = new ImmutableMap.Builder<String, String>()
+      .put(SalesforceConstants.CONFIG_USERNAME, config.getUsername())
+      .put(SalesforceConstants.CONFIG_PASSWORD, config.getPassword())
+      .put(SalesforceConstants.CONFIG_CONSUMER_KEY, config.getConsumerKey())
+      .put(SalesforceConstants.CONFIG_CONSUMER_SECRET, config.getConsumerSecret())
+      .put(SalesforceConstants.CONFIG_LOGIN_URL, config.getLoginUrl())
+      .put(SalesforceSourceConstants.CONFIG_QUERIES, GSON.toJson(queries))
+      .put(SalesforceSourceConstants.CONFIG_SCHEMAS, GSON.toJson(schemas))
+      .put(SalesforceSourceConstants.CONFIG_PK_CHUNK_ENABLE, String.valueOf(config.getEnablePKChunk()))
+      .put(SalesforceSourceConstants.CONFIG_CHUNK_SIZE, String.valueOf(config.getChunkSize()));
+
+    if (sObjectNameField != null) {
+      builder.put(SalesforceSourceConstants.CONFIG_SOBJECT_NAME_FIELD, sObjectNameField);
+    }
+
+    this.conf = builder.build();
+  }
+
   @Override
   public Map<String, String> getInputFormatConfiguration() {
     return conf;