Skip to content

Commit cd612fe

Browse files
authored
add: email verification worker priority queueing, backpressure, heartbeat progress (#2577)
* refactor email verification handler * add debug logs to rate limiter * disable executeEvenly for rate-limiter * add queue based email verification handler * remove unused import & type any * remove remainingTokens size from logs
1 parent e4a0374 commit cd612fe

File tree

5 files changed

+296
-208
lines changed

5 files changed

+296
-208
lines changed

backend/src/emailVerificationWorker.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ const emailStatusVerifier = new EmailStatusVerifierFactory(ENV, logger);
2323
const emailStatusCache = new RedisEmailStatusCache(redisClient);
2424
const contacts = new PgContacts(pool, logger);
2525

26-
const { processStreamData } = initializeEmailVerificationProcessor(
26+
const streamsHandler = initializeEmailVerificationProcessor(
2727
contacts,
2828
emailStatusCache,
29-
emailStatusVerifier
29+
emailStatusVerifier,
30+
redisClient,
31+
logger
3032
);
3133

3234
const tasksManagementSubscriber = new RedisSubscriber<PubSubMessage>(
@@ -47,7 +49,7 @@ const emailsStreamConsumer = new EmailVerificationConsumer(
4749
tasksManagementSubscriber,
4850
emailStreamsConsumer,
4951
ENV.REDIS_EMAIL_VERIFICATION_CONSUMER_BATCH_SIZE,
50-
processStreamData,
52+
streamsHandler,
5153
redisClient,
5254
logger
5355
);

backend/src/services/email-status/EmailStatusVerifierFactory.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ export default class EmailStatusVerifierFactory {
203203
}
204204
},
205205
new TokenBucketRateLimiter({
206-
executeEvenly: true,
206+
executeEvenly: false,
207207
uniqueKey: 'email-verification-reacher',
208208
distribution: Distribution.Memory,
209209
requests: config.REACHER_RATE_LIMITER_REQUESTS,
@@ -225,7 +225,7 @@ export default class EmailStatusVerifierFactory {
225225
const client = new MailerCheckClient(
226226
{ apiToken: MAILERCHECK_API_KEY },
227227
new TokenBucketRateLimiter({
228-
executeEvenly: true,
228+
executeEvenly: false,
229229
uniqueKey: 'email-verification-mailercheck',
230230
distribution: Distribution.Memory,
231231
requests: 60,
@@ -247,14 +247,14 @@ export default class EmailStatusVerifierFactory {
247247
const client = new ZerobounceClient(
248248
{ apiToken: ZEROBOUNCE_API_KEY },
249249
new TokenBucketRateLimiter({
250-
executeEvenly: true,
250+
executeEvenly: false,
251251
uniqueKey: 'email-verification-zerobounce-single',
252252
distribution: Distribution.Memory,
253253
requests: ZerobounceClient.SINGLE_VALIDATION_PER_10_SECONDS,
254254
intervalSeconds: 60
255255
}),
256256
new TokenBucketRateLimiter({
257-
executeEvenly: true,
257+
executeEvenly: false,
258258
uniqueKey: 'email-verification-zerobounce-bulk',
259259
distribution: Distribution.Memory,
260260
requests: ZerobounceClient.BATCH_VALIDATION_PER_MINUTE,

backend/src/services/rate-limiter/index.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
} from 'rate-limiter-flexible';
77
import redis from '../../utils/redis';
88
import pool from '../../db/pg';
9+
import logger from '../../utils/logger';
910

1011
export interface IRateLimiter {
1112
throttleRequests<T>(callback: () => Promise<T>): Promise<T>;
@@ -30,13 +31,22 @@ export interface RateLimiterOptions {
3031
export class TokenBucketRateLimiter implements IRateLimiter {
3132
private readonly limiter: RateLimiterQueue;
3233

34+
private readonly options: RateLimiterOptions;
35+
3336
constructor({
3437
intervalSeconds,
3538
requests,
3639
uniqueKey,
3740
executeEvenly,
3841
distribution
3942
}: RateLimiterOptions) {
43+
this.options = {
44+
intervalSeconds,
45+
requests,
46+
uniqueKey,
47+
executeEvenly,
48+
distribution
49+
};
4050
const baseLimiter = TokenBucketRateLimiter.createLimiter({
4151
intervalSeconds,
4252
requests,
@@ -87,7 +97,20 @@ export class TokenBucketRateLimiter implements IRateLimiter {
8797
* Executes a request and consumes 1 tokens.
8898
*/
8999
async throttleRequests<T>(callback: () => Promise<T>): Promise<T> {
100+
logger.debug('Rate limiter: queueing request', {
101+
instance: this.options.uniqueKey
102+
});
103+
104+
const startTime = performance.now();
90105
await this.limiter.removeTokens(1);
106+
const waitTimeMs = (performance.now() - startTime).toFixed(2);
107+
108+
logger.debug('Rate limiter: request granted', {
109+
instance: this.options.uniqueKey,
110+
metrics: {
111+
waitTimeMs
112+
}
113+
});
91114
return callback();
92115
}
93116

backend/src/workers/email-verification/EmailsVerificationConsumer.ts

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ import { Redis } from 'ioredis';
22
import { Logger } from 'winston';
33
import RedisSubscriber from '../../utils/pubsub/redis/RedisSubscriber';
44
import MultipleStreamsConsumer from '../../utils/streams/MultipleStreamsConsumer';
5-
import { EmailVerificationData } from './emailVerificationHandlers';
5+
import {
6+
EmailVerificationData,
7+
EmailVerificationHandler
8+
} from './emailVerificationHandlers';
69

710
export interface PubSubMessage {
811
miningId: string;
@@ -20,10 +23,7 @@ export default class EmailVerificationConsumer {
2023
private readonly taskManagementSubscriber: RedisSubscriber<PubSubMessage>,
2124
private readonly emailStreamsConsumer: MultipleStreamsConsumer<EmailVerificationData>,
2225
private readonly batchSize: number,
23-
private readonly emailProcessor: (
24-
data: EmailVerificationData[],
25-
progressCallback: (verified: number) => Promise<void>
26-
) => Promise<void>,
26+
private readonly streamsHandler: EmailVerificationHandler,
2727
private readonly redisClient: Redis,
2828
private readonly logger: Logger
2929
) {
@@ -34,8 +34,10 @@ export default class EmailVerificationConsumer {
3434
if (emailsStream) {
3535
if (command === 'REGISTER') {
3636
this.activeStreams.add(emailsStream);
37+
this.streamsHandler.registerStream(emailsStream);
3738
} else {
3839
this.activeStreams.delete(emailsStream);
40+
this.streamsHandler.unregisterStream(emailsStream);
3941
}
4042
}
4143

@@ -68,17 +70,11 @@ export default class EmailVerificationConsumer {
6870
this.logger.info(
6971
`Consuming ${data.length} emails from stream name ${streamName}`
7072
);
71-
const progressCallback = async (verified: number) => {
72-
await this.redisClient.publish(
73-
streamName.split('-')[1],
74-
JSON.stringify({
75-
miningId: streamName.split('-')[1],
76-
progressType: 'verifiedContacts',
77-
count: verified
78-
})
79-
);
80-
};
81-
const processed = await this.emailProcessor(data, progressCallback);
73+
74+
const processed = await this.streamsHandler.handle(
75+
streamName,
76+
data
77+
);
8278

8379
if (!this.activeStreams.has(streamName)) {
8480
return null;
@@ -121,7 +117,7 @@ export default class EmailVerificationConsumer {
121117

122118
setTimeout(() => {
123119
this.consume();
124-
}, 3000);
120+
}, 10000);
125121
}
126122

127123
/**

0 commit comments

Comments
 (0)