Commit d2cb6af

Merge pull request #14 from SAP/develop
Fix issues with migration resuming
2 parents: 8fcdbe3 + 1c5eee0

39 files changed: +313 -96 lines

commercedbsync/resources/commercedbsync-beans.xml

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@
         <property name="durationinseconds" type="double"/>
         <property name="copyMethod" type="String"/>
         <property name="keyColumns" type="java.util.List&lt;String&gt;"/>
+        <property name="batchsize" type="int"/>
     </bean>
 
     <bean class="com.sap.cx.boosters.commercedbsync.service.DatabaseCopyBatch">

commercedbsync/resources/commercedbsync-items.xml

Lines changed: 12 additions & 1 deletion
@@ -100,7 +100,18 @@
                 <persistence type="property"/>
                 <modifiers optional="false"/>
                 <defaultvalue>false</defaultvalue>
-            </attribute>
+            </attribute>
+            <attribute qualifier="resumeMigration" type="boolean">
+                <description>Resume a failed migration</description>
+                <persistence type="property" />
+                <modifiers optional="true"/>
+                <defaultvalue>false</defaultvalue>
+            </attribute>
+            <attribute qualifier="migrationId" type="java.lang.String" >
+                <description>Migration Id for the job</description>
+                <persistence type="property" />
+                <modifiers optional="true"/>
+            </attribute>
         </attributes>
     </itemtype>
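For orientation: hybris generates bean-style accessors on FullMigrationCronJobModel for the two attributes above, which is what lets the job decide between a fresh run and a resume. A minimal sketch of that branch, assuming conventional codegen accessor names and a hypothetical startNewMigration() helper (not the repo's actual FullMigrationJob code):

    // Sketch only: accessors follow hybris codegen for the attributes above;
    // startNewMigration() is a hypothetical stand-in for the scheduling call.
    public PerformResult perform(final FullMigrationCronJobModel cronJob) {
        String migrationId = cronJob.getMigrationId();
        if (!Boolean.TRUE.equals(cronJob.getResumeMigration()) || migrationId == null) {
            migrationId = startNewMigration(cronJob); // hypothetical helper
            cronJob.setMigrationId(migrationId);      // remember the id for a later resume
            modelService.save(cronJob);               // uses the modelService wired in below
        }
        // ...run or resume the copy tasks recorded under migrationId...
        return new PerformResult(CronJobResult.SUCCESS, CronJobStatus.FINISHED);
    }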

commercedbsync/resources/commercedbsync-spring.xml

Lines changed: 1 addition & 0 deletions
@@ -315,6 +315,7 @@
 
     <bean id="fullMigrationJob" class="com.sap.cx.boosters.commercedbsync.jobs.FullMigrationJob"
           parent="abstractMigrationJobPerformable">
+        <property name="modelService" ref="modelService"/>
     </bean>
 
     <!-- <bean id="defaultCMTRemoveInterceptor"-->
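For Spring to resolve this property injection, FullMigrationJob needs a matching setter; something along these lines (a sketch under that assumption, using the platform's de.hybris.platform.servicelayer.model.ModelService interface):

    private ModelService modelService;

    // Conventional setter-injection target for <property name="modelService"/>
    public void setModelService(final ModelService modelService) {
        this.modelService = modelService;
    }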

commercedbsync/resources/localization/commercedbsync-locales_en.properties

Lines changed: 6 additions & 0 deletions
@@ -30,3 +30,9 @@ type.MigrationCronJob.maxWriterWorkers.description=Number of writer workers to b
 
 type.MigrationCronJob.batchSize.name=Batch Size
 type.MigrationCronJob.batchSize.description=Batch size used to query data
+
+type.FullMigrationCronJob.resumeMigration.name=Resume Migration
+type.FullMigrationCronJob.resumeMigration.description=
+
+type.FullMigrationCronJob.migrationId.name=Migration ID
+type.FullMigrationCronJob.migrationId.description=

commercedbsync/resources/sql/createSchedulerTablesHANA.sql

Lines changed: 1 addition & 0 deletions
@@ -44,6 +44,7 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS (
     copymethod NVARCHAR(255) NULL,
     keycolumns NVARCHAR(255) NULL,
     durationinseconds numeric(10,2) NULL DEFAULT 0,
+    batchsize int NOT NULL DEFAULT 1000,
     PRIMARY KEY (migrationid, targetnodeid, pipelinename)
 );

commercedbsync/resources/sql/createSchedulerTablesMSSQL.sql

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS (
     copymethod NVARCHAR(255) NULL,
     keycolumns NVARCHAR(255) NULL,
     durationinseconds numeric(10,2) NULL DEFAULT 0,
+    batchsize int NOT NULL DEFAULT 1000,
     PRIMARY KEY (migrationid, targetnodeid, pipelinename)
 );

commercedbsync/resources/sql/createSchedulerTablesMYSQL.sql

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS
     copymethod VARCHAR(255) NULL,
     keycolumns VARCHAR(255) NULL,
     durationinseconds numeric(10, 2) NULL DEFAULT 0,
+    batchsize int NOT NULL DEFAULT 1000,
     PRIMARY KEY (migrationid, targetnodeid, pipelinename)
 );
 #

commercedbsync/resources/sql/createSchedulerTablesORACLE.sql

Lines changed: 1 addition & 0 deletions
@@ -27,6 +27,7 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS (
     copymethod NVARCHAR2(255) NULL,
     keycolumns NVARCHAR2(255) NULL,
     durationinseconds number(10,2) DEFAULT 0 NULL,
+    batchsize number(10) DEFAULT 1000 NOT NULL,
     PRIMARY KEY (migrationid, targetnodeid, pipelinename)
 )
 /

commercedbsync/resources/sql/createSchedulerTablesPOSTGRESQL.sql

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS (
     copymethod VARCHAR(255) NULL,
     keycolumns VARCHAR(255) NULL,
     durationinseconds numeric(10,2) NULL DEFAULT 0,
+    batchsize int NOT NULL DEFAULT 1000,
     PRIMARY KEY (migrationid, targetnodeid, pipelinename)
 );
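All five dialect scripts add the same column: each scheduled copy task now records the batch size it was planned with (inserts that omit it fall back to 1000), so a resumed run can page with the same boundaries as the original. A hypothetical JDBC read of the column, illustrative rather than the toolkit's own repository code:

    import java.sql.*;

    // Illustrative only: list each task's persisted batch size for one migration.
    final class TaskBatchSizeReader {
        static void print(final Connection con, final String migrationId) throws SQLException {
            final String sql = "SELECT pipelinename, batchsize FROM MIGRATIONTOOLKIT_TABLECOPYTASKS"
                    + " WHERE migrationid = ?";
            try (PreparedStatement ps = con.prepareStatement(sql)) {
                ps.setString(1, migrationId);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        System.out.println(rs.getString("pipelinename") + " -> " + rs.getInt("batchsize"));
                    }
                }
            }
        }
    }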

commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/impl/DefaultDataPipeFactory.java

Lines changed: 16 additions & 18 deletions
@@ -84,7 +84,7 @@ public DataPipe<DataSet> create(CopyContext context, CopyContext.DataCopyItem it
                 try {
                     pipe.put(MaybeFinished.poison());
                 } catch (Exception p) {
-                    LOG.error("Cannot contaminate pipe ", p);
+                    LOG.error("Could not close contaminated pipe ", p);
                 }
                 if (e instanceof InterruptedException) {
                     Thread.currentThread().interrupt();
@@ -107,7 +107,7 @@ private void scheduleWorkers(CopyContext context, DataWorkerExecutor<Boolean> wo
                 context.getMigrationContext().getDataSourceRepository());
         String table = copyItem.getSourceItem();
         long totalRows = copyItem.getRowCount();
-        long pageSize = getReaderBatchSizeForTable(context, table);
+        int pageSize = copyItem.getBatchSize();
         try {
             PerformanceRecorder recorder = context.getPerformanceProfiler().createRecorder(PerformanceCategory.DB_READ,
                     table);
@@ -143,25 +143,29 @@ private void scheduleWorkers(CopyContext context, DataWorkerExecutor<Boolean> wo
                     taskRepository.updateTaskCopyMethod(context, copyItem, DataCopyMethod.OFFSET.toString());
                     taskRepository.updateTaskKeyColumns(context, copyItem, batchColumns);
 
-                    List<Long> batches = null;
+                    List<Pair<Long, Long>> batches;
                     if (context.getMigrationContext().isSchedulerResumeEnabled()) {
                         Set<DatabaseCopyBatch> pendingBatchesForPipeline = taskRepository
                                 .findPendingBatchesForPipeline(context, copyItem);
                         batches = pendingBatchesForPipeline.stream()
-                                .map(b -> Long.valueOf(b.getLowerBoundary().toString())).collect(Collectors.toList());
+                                .map(b -> Pair.of(Long.valueOf(b.getLowerBoundary().toString()),
+                                        Long.valueOf(b.getUpperBoundary().toString())))
+                                .collect(Collectors.toList());
                         taskRepository.resetPipelineBatches(context, copyItem);
                     } else {
                         batches = new ArrayList<>();
                         for (long offset = 0; offset < totalRows; offset += pageSize) {
-                            batches.add(offset);
+                            batches.add(Pair.of(offset, offset + pageSize));
                         }
                     }
 
+                    Pair<Long, Long> boundaries;
                     for (int batchId = 0; batchId < batches.size(); batchId++) {
-                        long offset = batches.get(batchId);
-                        DataReaderTask dataReaderTask = new BatchOffsetDataReaderTask(pipeTaskContext, batchId, offset,
-                                batchColumns);
-                        taskRepository.scheduleBatch(context, copyItem, batchId, offset, offset + pageSize);
+                        boundaries = batches.get(batchId);
+                        DataReaderTask dataReaderTask = new BatchOffsetDataReaderTask(pipeTaskContext, batchId,
+                                boundaries.getLeft(), batchColumns);
+                        taskRepository.scheduleBatch(context, copyItem, batchId, boundaries.getLeft(),
+                                boundaries.getRight());
                         workerExecutor.safelyExecute(dataReaderTask);
                     }
                 } else {
@@ -182,13 +186,12 @@ private void scheduleWorkers(CopyContext context, DataWorkerExecutor<Boolean> wo
                     taskRepository.updateTaskCopyMethod(context, copyItem, DataCopyMethod.SEEK.toString());
                     taskRepository.updateTaskKeyColumns(context, copyItem, Lists.newArrayList(batchColumn));
 
-                    List<List<Object>> batchMarkersList = null;
+                    List<List<Object>> batchMarkersList;
                     if (context.getMigrationContext().isSchedulerResumeEnabled()) {
-                        batchMarkersList = new ArrayList<>();
                         Set<DatabaseCopyBatch> pendingBatchesForPipeline = taskRepository
                                 .findPendingBatchesForPipeline(context, copyItem);
-                        batchMarkersList.addAll(pendingBatchesForPipeline.stream()
-                                .map(b -> Collections.list(b.getLowerBoundary())).collect(Collectors.toList()));
+                        batchMarkersList = pendingBatchesForPipeline.stream()
+                                .map(b -> Collections.list(b.getLowerBoundary())).collect(Collectors.toList());
                         taskRepository.resetPipelineBatches(context, copyItem);
                     } else {
                         MarkersQueryDefinition queryDefinition = new MarkersQueryDefinition();
@@ -237,9 +240,4 @@ private void scheduleWorkers(CopyContext context, DataWorkerExecutor<Boolean> wo
             throw new RuntimeException("Exception while preparing reader tasks", ex);
         }
     }
-
-    private static int getReaderBatchSizeForTable(final CopyContext context, final String tableName) {
-        Integer tableBatchSize = context.getMigrationContext().getReaderBatchSize(tableName);
-        return tableBatchSize == null ? context.getMigrationContext().getReaderBatchSize() : tableBatchSize;
-    }
 }
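Net effect of this file's changes: batches are tracked as explicit (lower, upper) offset pairs instead of bare lower offsets, the pair is persisted via scheduleBatch, and on resume the pending pairs are reloaded rather than recomputed, so the removed getReaderBatchSizeForTable lookup gives way to the batch size stored on the task itself. A self-contained sketch of the fresh-run partitioning loop (the demo class and values are illustrative; Pair is org.apache.commons.lang3.tuple.Pair, as in the diff):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.commons.lang3.tuple.Pair;

    // Standalone demo of the loop above: split [0, totalRows) into
    // (lower, upper) offset pairs covering at most pageSize rows each.
    public final class OffsetBatchDemo {
        static List<Pair<Long, Long>> partition(final long totalRows, final int pageSize) {
            final List<Pair<Long, Long>> batches = new ArrayList<>();
            for (long offset = 0; offset < totalRows; offset += pageSize) {
                batches.add(Pair.of(offset, offset + pageSize));
            }
            return batches;
        }

        public static void main(final String[] args) {
            // 2500 rows at the default batch size of 1000 -> [0,1000), [1000,2000), [2000,3000)
            partition(2500, 1000).forEach(b ->
                    System.out.println("rows [" + b.getLeft() + ", " + b.getRight() + ")"));
        }
    }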
