Skip to content

Commit 06c176d

Browse files
authored
fix: bulk migration logging, tests, related query optimizations (#1131)
1 parent 2607c84 commit 06c176d

File tree

9 files changed

+162
-9
lines changed

9 files changed

+162
-9
lines changed

CHANGELOG.md

+21
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,27 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

88
## [Unreleased]
99

10+
## [10.1.2]
11+
12+
13+
- Adds user_id index to the user roles table
14+
- Adds more debug logging to bulk migration
15+
- Adds more tests to bulk migration
16+
17+
### Migration
18+
19+
If using PostgreSQL, run the following SQL script:
20+
21+
```sql
22+
CREATE INDEX IF NOT EXISTS user_roles_app_id_user_id_index ON user_roles (app_id, user_id);
23+
```
24+
25+
If using MySQL, run the following SQL script:
26+
```sql
27+
CREATE INDEX user_roles_app_id_user_id_index ON user_roles (app_id, user_id);
28+
```
29+
30+
1031
## [10.1.1]
1132

1233
- Adds debug logging for the bulk migration process

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ compileTestJava { options.encoding = "UTF-8" }
1919
// }
2020
//}
2121

22-
version = "10.1.1"
22+
version = "10.1.2"
2323

2424
repositories {
2525
mavenCentral()

src/main/java/io/supertokens/bulkimport/BulkImport.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ private static void associateUserToTenants(Main main, AppIdentifier appIdentifie
411411

412412
TenantIdentifier tenantIdentifier = new TenantIdentifier(appIdentifier.getConnectionUriDomain(),
413413
appIdentifier.getAppId(), tenantId);
414-
Multitenancy.addUserIdToTenant(main, tenantIdentifier, storage, lm.getSuperTokenOrExternalUserId());
414+
Multitenancy.addUserIdToTenant(main, tenantIdentifier, storage, lm.superTokensUserId);
415415
} catch (TenantOrAppNotFoundException e) {
416416
throw new StorageTransactionLogicException(new Exception("E009: " + e.getMessage()));
417417
} catch (StorageQueryException e) {

src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java

+3
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ protected void doTaskPerApp(AppIdentifier app)
7878
int bulkMigrationBatchSize = Config.getConfig(app.getAsPublicTenantIdentifier(), main)
7979
.getBulkMigrationBatchSize();
8080

81+
Logging.debug(main, app.getAsPublicTenantIdentifier(), "CronTask starts. Instance: " + this);
8182
Logging.debug(main, app.getAsPublicTenantIdentifier(), "CronTask starts. Processing bulk import users with " + bulkMigrationBatchSize
8283
+ " batch size, one batch split into " + numberOfBatchChunks + " chunks");
8384

@@ -119,8 +120,10 @@ protected void doTaskPerApp(AppIdentifier app)
119120

120121
for (Future<?> task : tasks) {
121122
while (!task.isDone()) {
123+
Logging.debug(main, app.getAsPublicTenantIdentifier(), "Waiting for task " + task + " to finish");
122124
Thread.sleep(1000);
123125
}
126+
Logging.debug(main, app.getAsPublicTenantIdentifier(), "Task " + task + " finished");
124127
Void result = (Void) task.get(); //to know if there were any errors while executing and for waiting in this thread for all the other threads to finish up
125128
usersProcessed += loadedUsersChunks.get(tasks.indexOf(task)).size();
126129
failedUsers = bulkImportSQLStorage.getBulkImportUsersCount(app, BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED);

src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkUsersImportWorker.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List<BulkImportUs
113113
while (shouldRetryImmediatley) {
114114
shouldRetryImmediatley = bulkImportProxyStorage.startTransaction(con -> {
115115
try {
116-
BulkImport.processUsersImportSteps(main, appIdentifier, bulkImportProxyStorage, partitionedUsers.get(bulkImportProxyStorage),
116+
BulkImport.processUsersImportSteps(main, appIdentifier, bulkImportProxyStorage,
117+
partitionedUsers.get(bulkImportProxyStorage),
117118
allStoragesForApp);
118119

119120
bulkImportProxyStorage.commitTransactionForBulkImportProxyStorage();
@@ -123,8 +124,16 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List<BulkImportUs
123124
toDelete[i] = validUsers.get(i).id;
124125
}
125126

126-
baseTenantStorage.deleteBulkImportUsers(appIdentifier, toDelete);
127-
} catch (StorageTransactionLogicException e) {
127+
while (true){
128+
try {
129+
baseTenantStorage.deleteBulkImportUsers(appIdentifier, toDelete);
130+
break;
131+
} catch (Exception e) {
132+
// ignore and retry delete. The import transaction is already committed, the delete should happen no matter what
133+
Logging.debug(main, app.getAsPublicTenantIdentifier(), "Exception while deleting bulk import users: " + e.getMessage());
134+
}
135+
}
136+
} catch (StorageTransactionLogicException | StorageQueryException e) {
128137
// We need to rollback the transaction manually because we have overridden that in the proxy
129138
// storage
130139
bulkImportProxyStorage.rollbackTransactionForBulkImportProxyStorage();

src/main/java/io/supertokens/inmemorydb/queries/GeneralQueries.java

+1
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,7 @@ public static void createTablesIfNotExists(Start start, Main main) throws SQLExc
402402
update(start, UserRolesQueries.getQueryToCreateRolePermissionsTable(start), NO_OP_SETTER);
403403
// index
404404
update(start, UserRolesQueries.getQueryToCreateRolePermissionsPermissionIndex(start), NO_OP_SETTER);
405+
update(start, UserRolesQueries.getQueryToCreateUserRolesUserIdAppIdIndex(start), NO_OP_SETTER);
405406
}
406407

407408
if (!doesTableExists(start, Config.getConfig(start).getUserRolesTable())) {

src/main/java/io/supertokens/inmemorydb/queries/UserRolesQueries.java

+5
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ static String getQueryToCreateRolePermissionsPermissionIndex(Start start) {
6666
+ Config.getConfig(start).getUserRolesPermissionsTable() + "(app_id, permission);";
6767
}
6868

69+
public static String getQueryToCreateUserRolesUserIdAppIdIndex(Start start) {
70+
return "CREATE INDEX user_roles_app_id_user_id_index ON " + Config.getConfig(start).getUserRolesTable() +
71+
"(app_id, user_id)";
72+
}
73+
6974
public static String getQueryToCreateUserRolesTable(Start start) {
7075
String tableName = Config.getConfig(start).getUserRolesTable();
7176
// @formatter:off

src/test/java/io/supertokens/test/bulkimport/BulkImportFlowTest.java

+117-4
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void testWithALotOfUsers() throws Exception {
127127
throw e;
128128
}
129129
}
130-
Thread.sleep(5000);
130+
Thread.sleep(1000);
131131
}
132132
}
133133

@@ -145,6 +145,120 @@ public void testWithALotOfUsers() throws Exception {
145145

146146
}
147147

148+
149+
@Test
150+
public void testCoreRestartMidImportShouldResultInSuccessfulImport() throws Exception {
151+
String[] args = { "../" };
152+
153+
// set processing thread number
154+
Utils.setValueInConfig("bulk_migration_parallelism", "8");
155+
Utils.setValueInConfig("bulk_migration_batch_size", "1000");
156+
Utils.setValueInConfig("log_level", "DEBUG");
157+
158+
TestingProcessManager.TestingProcess process = TestingProcessManager.start(args, true);
159+
Main main = process.getProcess();
160+
setFeatureFlags(main, new EE_FEATURES[] {
161+
EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA});
162+
// We are setting a non-zero initial wait for tests to avoid race condition with the beforeTest process that deletes data in the storage layer
163+
CronTaskTest.getInstance(main).setInitialWaitTimeInSeconds(ProcessBulkImportUsers.RESOURCE_KEY, 5);
164+
CronTaskTest.getInstance(main).setIntervalInSeconds(ProcessBulkImportUsers.RESOURCE_KEY, 60);
165+
166+
assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STARTED));
167+
168+
Cronjobs.addCronjob(main, (ProcessBulkImportUsers) main.getResourceDistributor().getResource(new TenantIdentifier(null, null, null), ProcessBulkImportUsers.RESOURCE_KEY));
169+
170+
int NUMBER_OF_USERS_TO_UPLOAD = 10000;
171+
172+
if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) {
173+
return;
174+
}
175+
176+
// Create user roles before inserting bulk users
177+
{
178+
UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null);
179+
UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null);
180+
}
181+
182+
// upload a bunch of users through the API
183+
{
184+
for (int i = 0; i < (NUMBER_OF_USERS_TO_UPLOAD / 1000); i++) {
185+
JsonObject request = generateUsersJson(1000, i * 1000); // API allows 10k users upload at once
186+
JsonObject response = uploadBulkImportUsersJson(main, request);
187+
assertEquals("OK", response.get("status").getAsString());
188+
}
189+
190+
}
191+
192+
long processingStarted = System.currentTimeMillis();
193+
boolean restartHappened = false;
194+
// wait for the cron job to process them
195+
// periodically check the remaining unprocessed users
196+
// Note1: the cronjob starts the processing automatically
197+
// Note2: the successfully processed users get deleted from the bulk_import_users table
198+
{
199+
long count = NUMBER_OF_USERS_TO_UPLOAD;
200+
while(true) {
201+
try {
202+
JsonObject response = loadBulkImportUsersCountWithStatus(main, null);
203+
assertEquals("OK", response.get("status").getAsString());
204+
count = response.get("count").getAsLong();
205+
int newUsersNumber = loadBulkImportUsersCountWithStatus(main,
206+
BulkImportStorage.BULK_IMPORT_USER_STATUS.NEW).get("count").getAsInt();
207+
int processingUsersNumber = loadBulkImportUsersCountWithStatus(main,
208+
BulkImportStorage.BULK_IMPORT_USER_STATUS.PROCESSING).get("count").getAsInt();
209+
int failedUsersNumber = loadBulkImportUsersCountWithStatus(main,
210+
BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt();
211+
count = newUsersNumber + processingUsersNumber;
212+
System.out.println("Remaining users: " + count);
213+
214+
if (count == 0) {
215+
break;
216+
}
217+
if((System.currentTimeMillis() - processingStarted > 10000) && !restartHappened) {
218+
System.out.println("Killing core");
219+
process.kill(false);
220+
Utils.setValueInConfig("bulk_migration_parallelism", "14");
221+
Utils.setValueInConfig("bulk_migration_batch_size", "4000");
222+
System.out.println("Started new core");
223+
process = TestingProcessManager.start(args, true);
224+
main = process.getProcess();
225+
setFeatureFlags(main, new EE_FEATURES[] {
226+
EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA});
227+
// We are setting a non-zero initial wait for tests to avoid race condition with the beforeTest process that deletes data in the storage layer
228+
CronTaskTest.getInstance(main).setInitialWaitTimeInSeconds(ProcessBulkImportUsers.RESOURCE_KEY, 5);
229+
CronTaskTest.getInstance(main).setIntervalInSeconds(ProcessBulkImportUsers.RESOURCE_KEY, 60);
230+
231+
assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STARTED));
232+
233+
Cronjobs.addCronjob(main, (ProcessBulkImportUsers) main.getResourceDistributor().getResource(new TenantIdentifier(null, null, null), ProcessBulkImportUsers.RESOURCE_KEY));
234+
restartHappened = true;
235+
}
236+
} catch (Exception e) {
237+
if(e instanceof SocketTimeoutException) {
238+
//ignore
239+
} else {
240+
throw e;
241+
}
242+
}
243+
Thread.sleep(1000);
244+
}
245+
}
246+
247+
long processingFinished = System.currentTimeMillis();
248+
System.out.println("Processed " + NUMBER_OF_USERS_TO_UPLOAD + " users in " + (processingFinished - processingStarted) / 1000
249+
+ " seconds ( or " + (processingFinished - processingStarted) / 60000 + " minutes)");
250+
251+
// after processing finished, make sure every user got processed correctly
252+
{
253+
int failedImportedUsersNumber = loadBulkImportUsersCountWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt();
254+
int usersInCore = loadUsersCount(main).get("count").getAsInt();
255+
assertEquals(NUMBER_OF_USERS_TO_UPLOAD, usersInCore + failedImportedUsersNumber);
256+
assertEquals(NUMBER_OF_USERS_TO_UPLOAD, usersInCore);
257+
}
258+
259+
}
260+
261+
148262
@Test
149263
public void testBatchWithOneUser() throws Exception {
150264
Main main = startCronProcess("14");
@@ -801,7 +915,7 @@ private static JsonObject uploadBulkImportUsersJson(Main main, JsonObject reques
801915

802916
@NotNull
803917
private Main startCronProcess(String parallelism) throws IOException, InterruptedException, TenantOrAppNotFoundException {
804-
return startCronProcess(parallelism, 5*60);
918+
return startCronProcess(parallelism, 5 * 60);
805919
}
806920

807921

@@ -814,15 +928,14 @@ private Main startCronProcess(String parallelism, int intervalInSeconds) throws
814928
//Utils.setValueInConfig("bulk_migration_batch_size", "1000");
815929
Utils.setValueInConfig("log_level", "DEBUG");
816930

817-
TestingProcessManager.TestingProcess process = TestingProcessManager.start(args, false);
931+
TestingProcessManager.TestingProcess process = TestingProcessManager.start(args, true);
818932
Main main = process.getProcess();
819933
setFeatureFlags(main, new EE_FEATURES[] {
820934
EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA});
821935
// We are setting a non-zero initial wait for tests to avoid race condition with the beforeTest process that deletes data in the storage layer
822936
CronTaskTest.getInstance(main).setInitialWaitTimeInSeconds(ProcessBulkImportUsers.RESOURCE_KEY, 5);
823937
CronTaskTest.getInstance(main).setIntervalInSeconds(ProcessBulkImportUsers.RESOURCE_KEY, intervalInSeconds);
824938

825-
process.startProcess();
826939
assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STARTED));
827940

828941
Cronjobs.addCronjob(main, (ProcessBulkImportUsers) main.getResourceDistributor().getResource(new TenantIdentifier(null, null, null), ProcessBulkImportUsers.RESOURCE_KEY));

src/test/java/io/supertokens/test/bulkimport/ProcessBulkImportUsersCronJobTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,7 @@ public void shouldDeleteEverythingFromTheDBIfAnythingFailsOnMultipleThreads() th
531531
Main main = process.getProcess();
532532

533533
BulkImportTestUtils.createTenants(main);
534+
Thread.sleep(1000);
534535

535536
BulkImportSQLStorage storage = (BulkImportSQLStorage) StorageLayer.getStorage(main);
536537
AppIdentifier appIdentifier = new AppIdentifier(null, null);

0 commit comments

Comments
 (0)