From e5c99a4d68ace57635186ae90d260522a6466c8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 08:03:25 +0200 Subject: [PATCH 01/21] fix: use platform instead of default for incorrectly formatted attributes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../data_sink_worker/src/service/member.service.ts | 12 ++++++------ .../src/service/memberAttribute.service.ts | 5 +++-- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 6070fef17d..10c084141b 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -67,7 +67,7 @@ export default class MemberService extends LoggerBase { segmentIds: string[], integrationId: string, data: IMemberCreateData, - source: string, + platform: PlatformType, releaseMemberLock?: () => Promise, ): Promise { return logExecutionTimeV2( @@ -91,7 +91,7 @@ export default class MemberService extends LoggerBase { let attributes: Record = {} if (data.attributes) { attributes = await logExecutionTimeV2( - () => memberAttributeService.validateAttributes(data.attributes), + () => memberAttributeService.validateAttributes(platform, data.attributes), this.log, 'memberService -> create -> validateAttributes', ) @@ -192,7 +192,7 @@ export default class MemberService extends LoggerBase { if (data.organizations) { for (const org of data.organizations) { const id = await logExecutionTimeV2( - () => orgService.findOrCreate(source, integrationId, org), + () => orgService.findOrCreate(platform, integrationId, org), this.log, 'memberService -> create -> findOrCreateOrg', ) @@ -267,7 +267,7 @@ export default class MemberService extends LoggerBase { data: IMemberUpdateData, original: IDbMember, originalIdentities: IMemberIdentity[], - source: string, + platform: PlatformType, releaseMemberLock?: () => Promise, ): Promise { await logExecutionTimeV2( @@ -284,7 +284,7 @@ export default class MemberService extends LoggerBase { if (data.attributes) { this.log.trace({ memberId: id }, 'Validating member attributes!') data.attributes = await logExecutionTimeV2( - () => memberAttributeService.validateAttributes(data.attributes), + () => memberAttributeService.validateAttributes(platform, data.attributes), this.log, 'memberService -> update -> validateAttributes', ) @@ -404,7 +404,7 @@ export default class MemberService extends LoggerBase { this.log.trace({ memberId: id }, 'Finding or creating organization!') const orgId = await logExecutionTimeV2( - () => orgService.findOrCreate(source, integrationId, org), + () => orgService.findOrCreate(platform, integrationId, org), this.log, 'memberService -> update -> findOrCreateOrg', ) diff --git a/services/apps/data_sink_worker/src/service/memberAttribute.service.ts b/services/apps/data_sink_worker/src/service/memberAttribute.service.ts index 4dd6e0a68b..5d9eb52149 100644 --- a/services/apps/data_sink_worker/src/service/memberAttribute.service.ts +++ b/services/apps/data_sink_worker/src/service/memberAttribute.service.ts @@ -7,7 +7,7 @@ import { import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor' import { Logger, LoggerBase } from '@crowd/logging' import { RedisClient } from '@crowd/redis' -import { MemberAttributeType } from '@crowd/types' +import { MemberAttributeType, PlatformType } from '@crowd/types' export default class MemberAttributeService extends LoggerBase { constructor( @@ -26,6 +26,7 @@ export default class MemberAttributeService extends LoggerBase { } public async validateAttributes( + platform: PlatformType, attributes: Record, ): Promise> { const settings = await getMemberAttributeSettings(dbStoreQx(this.store), this.redis) @@ -45,7 +46,7 @@ export default class MemberAttributeService extends LoggerBase { } if (typeof attributes[attributeName] !== 'object') { attributes[attributeName] = { - custom: attributes[attributeName], + [platform]: attributes[attributeName], } } From b0eadeeee78e9dface8876ec922346d1d62c3a8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 08:59:05 +0200 Subject: [PATCH 02/21] chore: script to fix the corrupted attributes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/bin/fix-member-attributes.ts | 225 ++++++++++++++++++ 1 file changed, 225 insertions(+) create mode 100644 services/apps/data_sink_worker/src/bin/fix-member-attributes.ts diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts new file mode 100644 index 0000000000..6fb5da0a79 --- /dev/null +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -0,0 +1,225 @@ +import isEqual from 'lodash.isequal' + +import { connQx, updateMember } from '@crowd/data-access-layer' +import { + DbConnOrTx, + DbStore, + WRITE_DB_CONFIG, + getDbConnection, +} from '@crowd/data-access-layer/src/database' +import { getServiceLogger } from '@crowd/logging' +import { REDIS_CONFIG, getRedisClient } from '@crowd/redis' + +import MemberAttributeService from '../service/memberAttribute.service' + +/* eslint-disable @typescript-eslint/no-explicit-any */ + +const log = getServiceLogger() + +async function getMemberIds( + db: DbConnOrTx, + lastId?: string, +): Promise<{ id: string; attributes: any; manuallyChangedFields: any }[]> { + try { + log.debug({ lastId }, 'Querying for members with attribute issues') + + const results = await db.any( + `with relevant_members as (with member_with_attributes as (select id, + "createdAt", + jsonb_object_keys(attributes) as attr_key, + attributes -> jsonb_object_keys(attributes) as attr_value + from members + where "deletedAt" is null + and attributes is not null + and attributes != 'null'::jsonb + and attributes != '{}'::jsonb) + select distinct id + from member_with_attributes + where jsonb_typeof(attr_value) = 'object' + and coalesce(attr_value ->> 'default', '') = '' + ${lastId ? `and id < '${lastId}'` : ''} + order by id desc + limit 5000) + select m.id, m.attributes, m."manuallyChangedFields" + from members m + inner join + relevant_members rm on rm.id = m.id;`, + { lastId }, + ) + + log.debug( + { + resultCount: results.length, + lastId, + firstId: results.length > 0 ? results[0].id : null, + lastResultId: results.length > 0 ? results[results.length - 1].id : null, + }, + 'Query completed', + ) + + return results + } catch (error) { + log.error( + { + error: error.message, + lastId, + stack: error.stack, + }, + 'Failed to query member IDs', + ) + throw error + } +} + +setImmediate(async () => { + let dbClient: DbConnOrTx | undefined + let redisClient: any | undefined + + try { + log.info('Starting member attributes fix script') + + // Initialize connections + log.info('Connecting to database...') + dbClient = await getDbConnection(WRITE_DB_CONFIG()) + log.info('Database connection established') + + log.info('Connecting to Redis...') + redisClient = await getRedisClient(REDIS_CONFIG()) + log.info('Redis connection established') + + const pgQx = connQx(dbClient) + const mas = new MemberAttributeService(redisClient, new DbStore(log, dbClient), log) + + let totalProcessed = 0 + let totalUpdated = 0 + let batchNumber = 1 + + log.info('Starting to process members with attribute issues') + let membersToFix = await getMemberIds(dbClient) + log.info({ count: membersToFix.length }, 'Found members to process in first batch') + + while (membersToFix.length > 0) { + log.info({ batchNumber, batchSize: membersToFix.length }, 'Processing batch') + let batchUpdated = 0 + + for (const data of membersToFix) { + try { + if (data.attributes) { + log.debug({ memberId: data.id }, 'Processing member attributes') + + const oldAttributes = data.attributes + data.attributes = await mas.setAttributesDefaultValues(data.attributes) + + let attributes: Record | undefined + const temp = { ...data.attributes } + const manuallyChangedFields: string[] = data.manuallyChangedFields || [] + + if (manuallyChangedFields.length > 0) { + log.debug( + { + memberId: data.id, + manuallyChangedFieldsCount: manuallyChangedFields.length, + }, + 'Member has manually changed fields', + ) + + const prefix = 'attributes.' + const manuallyChangedAttributes = [ + ...new Set( + manuallyChangedFields + .filter((f) => f.startsWith(prefix)) + .map((f) => f.slice(prefix.length)), + ), + ] + + log.debug( + { + memberId: data.id, + manuallyChangedAttributes, + }, + 'Preserving manually changed attributes', + ) + + // Preserve manually changed attributes + for (const key of manuallyChangedAttributes) { + if (oldAttributes?.[key] !== undefined) { + temp[key] = oldAttributes[key] // Fixed: removed .attributes + } + } + } + + if (!isEqual(temp, oldAttributes)) { + attributes = temp + log.debug({ memberId: data.id }, 'Attributes changed, will update') + } else { + log.debug({ memberId: data.id }, 'No changes needed for attributes') + } + + if (attributes) { + log.debug({ memberId: data.id }, 'Updating member attributes') + await updateMember(pgQx, data.id, { attributes } as any) + batchUpdated++ + totalUpdated++ + log.debug({ memberId: data.id }, 'Member attributes updated successfully') + } + } else { + log.debug({ memberId: data.id }, 'Member has no attributes to process') + } + + totalProcessed++ + } catch (error) { + log.error( + { + error: error.message, + memberId: data.id, + stack: error.stack, + }, + 'Failed to process member', + ) + // Continue processing other members + } + } + + log.info( + { + batchNumber, + batchProcessed: membersToFix.length, + batchUpdated, + totalProcessed, + totalUpdated, + }, + 'Completed batch processing', + ) + + // Get next batch + const lastId = membersToFix[membersToFix.length - 1].id + log.debug({ lastId }, 'Fetching next batch starting from last ID') + + membersToFix = await getMemberIds(dbClient, lastId) + log.info({ count: membersToFix.length }, 'Found members for next batch') + + batchNumber++ + } + + log.info( + { + totalProcessed, + totalUpdated, + totalBatches: batchNumber - 1, + }, + 'Member attributes fix completed successfully', + ) + } catch (error) { + log.error( + { + error: error.message, + stack: error.stack, + }, + 'Fatal error in member attributes fix script', + ) + process.exit(1) + } finally { + log.info('Script execution completed') + process.exit(0) + } +}) From 816d4dc730d2a4b731639d1c555c089b33d43246 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 20:34:36 +0200 Subject: [PATCH 03/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/bin/fix-member-attributes.ts | 127 +++++++++++------- 1 file changed, 78 insertions(+), 49 deletions(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index 6fb5da0a79..4359245983 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -107,60 +107,89 @@ setImmediate(async () => { if (data.attributes) { log.debug({ memberId: data.id }, 'Processing member attributes') - const oldAttributes = data.attributes - data.attributes = await mas.setAttributesDefaultValues(data.attributes) - - let attributes: Record | undefined - const temp = { ...data.attributes } - const manuallyChangedFields: string[] = data.manuallyChangedFields || [] - - if (manuallyChangedFields.length > 0) { - log.debug( - { - memberId: data.id, - manuallyChangedFieldsCount: manuallyChangedFields.length, - }, - 'Member has manually changed fields', - ) - - const prefix = 'attributes.' - const manuallyChangedAttributes = [ - ...new Set( - manuallyChangedFields - .filter((f) => f.startsWith(prefix)) - .map((f) => f.slice(prefix.length)), - ), - ] - - log.debug( - { - memberId: data.id, - manuallyChangedAttributes, - }, - 'Preserving manually changed attributes', - ) - - // Preserve manually changed attributes - for (const key of manuallyChangedAttributes) { - if (oldAttributes?.[key] !== undefined) { - temp[key] = oldAttributes[key] // Fixed: removed .attributes + // check if any has default empty but other are full + let process = false + for (const attName of Object.keys(data.attributes)) { + const defValue = data.attributes[attName].default + + if (defValue === undefined || defValue === null || defValue === '') { + for (const platform of Object.keys(data.attributes[attName]).filter( + (p) => p !== 'default', + )) { + const value = data.attributes[attName][platform] + + if (value !== undefined && value !== null && value !== '') { + log.debug( + { memberId: data.id, attName, platform, value }, + 'Found value for attribute', + ) + process = true + break + } + } + + if (process) { + break } } } - if (!isEqual(temp, oldAttributes)) { - attributes = temp - log.debug({ memberId: data.id }, 'Attributes changed, will update') - } else { - log.debug({ memberId: data.id }, 'No changes needed for attributes') - } + if (process) { + const oldAttributes = data.attributes + data.attributes = await mas.setAttributesDefaultValues(data.attributes) + + let attributes: Record | undefined + const temp = { ...data.attributes } + const manuallyChangedFields: string[] = data.manuallyChangedFields || [] + + if (manuallyChangedFields.length > 0) { + log.debug( + { + memberId: data.id, + manuallyChangedFieldsCount: manuallyChangedFields.length, + }, + 'Member has manually changed fields', + ) + + const prefix = 'attributes.' + const manuallyChangedAttributes = [ + ...new Set( + manuallyChangedFields + .filter((f) => f.startsWith(prefix)) + .map((f) => f.slice(prefix.length)), + ), + ] + + log.debug( + { + memberId: data.id, + manuallyChangedAttributes, + }, + 'Preserving manually changed attributes', + ) + + // Preserve manually changed attributes + for (const key of manuallyChangedAttributes) { + if (oldAttributes?.[key] !== undefined) { + temp[key] = oldAttributes[key] // Fixed: removed .attributes + } + } + } + + if (!isEqual(temp, oldAttributes)) { + attributes = temp + log.debug({ memberId: data.id }, 'Attributes changed, will update') + } else { + log.debug({ memberId: data.id }, 'No changes needed for attributes') + } - if (attributes) { - log.debug({ memberId: data.id }, 'Updating member attributes') - await updateMember(pgQx, data.id, { attributes } as any) - batchUpdated++ - totalUpdated++ - log.debug({ memberId: data.id }, 'Member attributes updated successfully') + if (attributes) { + log.debug({ memberId: data.id }, 'Updating member attributes') + await updateMember(pgQx, data.id, { attributes } as any) + batchUpdated++ + totalUpdated++ + log.debug({ memberId: data.id }, 'Member attributes updated successfully') + } } } else { log.debug({ memberId: data.id }, 'Member has no attributes to process') From 2c56faecff631da01ee0425f0a4c2007feab35bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 20:38:06 +0200 Subject: [PATCH 04/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../data_sink_worker/src/bin/fix-member-attributes.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index 4359245983..5471530ba3 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -184,8 +184,12 @@ setImmediate(async () => { } if (attributes) { - log.debug({ memberId: data.id }, 'Updating member attributes') - await updateMember(pgQx, data.id, { attributes } as any) + log.debug( + { memberId: data.id, oldAttributes, attributes }, + 'Updating member attributes', + ) + + // await updateMember(pgQx, data.id, { attributes } as any) batchUpdated++ totalUpdated++ log.debug({ memberId: data.id }, 'Member attributes updated successfully') From 0eee447faeed92aade6aca1566310229740da33b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 20:40:25 +0200 Subject: [PATCH 05/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../apps/data_sink_worker/src/bin/fix-member-attributes.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index 5471530ba3..91f292e8ef 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -194,6 +194,11 @@ setImmediate(async () => { totalUpdated++ log.debug({ memberId: data.id }, 'Member attributes updated successfully') } + } else { + log.debug( + { memberId: data.id, attributes: data.attributes }, + 'No changes needed for attributes', + ) } } else { log.debug({ memberId: data.id }, 'Member has no attributes to process') From 2fe4062a16545808de42fcf26639e113f3bf8337 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 20:45:10 +0200 Subject: [PATCH 06/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../apps/data_sink_worker/src/bin/fix-member-attributes.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index 91f292e8ef..29a1ad1c21 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -112,13 +112,13 @@ setImmediate(async () => { for (const attName of Object.keys(data.attributes)) { const defValue = data.attributes[attName].default - if (defValue === undefined || defValue === null || defValue === '') { + if (defValue === undefined || defValue === null || String(defValue) === '') { for (const platform of Object.keys(data.attributes[attName]).filter( (p) => p !== 'default', )) { const value = data.attributes[attName][platform] - if (value !== undefined && value !== null && value !== '') { + if (value !== undefined && value !== null && String(value) !== '') { log.debug( { memberId: data.id, attName, platform, value }, 'Found value for attribute', From 866ee960b7461879465a72446d7ad14b47b26a19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 20:51:27 +0200 Subject: [PATCH 07/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/bin/fix-member-attributes.ts | 46 +++++++++++-------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index 29a1ad1c21..cda43c4fb4 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -24,26 +24,32 @@ async function getMemberIds( log.debug({ lastId }, 'Querying for members with attribute issues') const results = await db.any( - `with relevant_members as (with member_with_attributes as (select id, - "createdAt", - jsonb_object_keys(attributes) as attr_key, - attributes -> jsonb_object_keys(attributes) as attr_value - from members - where "deletedAt" is null - and attributes is not null - and attributes != 'null'::jsonb - and attributes != '{}'::jsonb) - select distinct id - from member_with_attributes - where jsonb_typeof(attr_value) = 'object' - and coalesce(attr_value ->> 'default', '') = '' - ${lastId ? `and id < '${lastId}'` : ''} - order by id desc - limit 5000) - select m.id, m.attributes, m."manuallyChangedFields" - from members m - inner join - relevant_members rm on rm.id = m.id;`, + ` + with relevant_members as (with member_with_attributes as (select id, + "createdAt", + jsonb_object_keys(attributes) as attr_key, + attributes -> jsonb_object_keys(attributes) as attr_value + from members + where "deletedAt" is null + and attributes is not null + and attributes != 'null'::jsonb + and attributes != '{}'::jsonb) + select distinct id + from member_with_attributes + where jsonb_typeof(attr_value) = 'object' + and coalesce(attr_value ->> 'default', '') = '' + and exists (select 1 + from jsonb_each_text(attr_value) as kv + where kv.key != 'default' + and coalesce(kv.value, '') != '') + ${lastId ? `and id < '${lastId}'` : ''} + order by id desc + limit 5000) +select m.id, m.attributes +from members m + inner join + relevant_members rm on rm.id = m.id; + `, { lastId }, ) From c15177eb0e643c8e92a338530616e51a5957606e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 20:59:02 +0200 Subject: [PATCH 08/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../data_sink_worker/src/bin/fix-member-attributes.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index cda43c4fb4..a21fa1d692 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -119,6 +119,14 @@ setImmediate(async () => { const defValue = data.attributes[attName].default if (defValue === undefined || defValue === null || String(defValue) === '') { + log.debug( + { + memberId: data.id, + attName, + defValue: defValue ? String(defValue) : 'undefined', + }, + 'Attribute has default empty', + ) for (const platform of Object.keys(data.attributes[attName]).filter( (p) => p !== 'default', )) { From 5fff263ab443d60da159f7682e8e880b07d5ee89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 21:00:35 +0200 Subject: [PATCH 09/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- services/apps/data_sink_worker/src/bin/fix-member-attributes.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index a21fa1d692..ed41bd39ad 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -122,6 +122,7 @@ setImmediate(async () => { log.debug( { memberId: data.id, + attributes: data.attributes, attName, defValue: defValue ? String(defValue) : 'undefined', }, From 33c65ba8f0bae1c2041676576a95b0aec05de09a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 21:02:12 +0200 Subject: [PATCH 10/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../apps/data_sink_worker/src/bin/fix-member-attributes.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index ed41bd39ad..60140e733e 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -195,7 +195,10 @@ setImmediate(async () => { attributes = temp log.debug({ memberId: data.id }, 'Attributes changed, will update') } else { - log.debug({ memberId: data.id }, 'No changes needed for attributes') + log.debug( + { memberId: data.id, newAttributes: temp }, + 'No changes needed for attributes', + ) } if (attributes) { From 51767d4f6c56de8f175ed5f1427d29cdac773bf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 21:09:15 +0200 Subject: [PATCH 11/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- services/libs/common/src/member.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/services/libs/common/src/member.ts b/services/libs/common/src/member.ts index e9253847a4..f0dc1a14bb 100644 --- a/services/libs/common/src/member.ts +++ b/services/libs/common/src/member.ts @@ -21,8 +21,14 @@ export async function setAttributesDefaultValues( throw err } } + + const nonEmptyPlatform = Object.keys(attributes[attributeName]).filter((p) => { + const value = attributes[attributeName][p] + return value !== undefined && value !== null && String(value).trim().length > 0 + }) + const highestPriorityPlatform = getHighestPriorityPlatformForAttributes( - Object.keys(attributes[attributeName]), + nonEmptyPlatform, priorities, ) From 584cab9645118b7612eafa37157463a23df41f4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 21:14:05 +0200 Subject: [PATCH 12/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- services/apps/data_sink_worker/src/bin/fix-member-attributes.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index 60140e733e..23247e4237 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -122,7 +122,7 @@ setImmediate(async () => { log.debug( { memberId: data.id, - attributes: data.attributes, + attribute: data.attributes[attName], attName, defValue: defValue ? String(defValue) : 'undefined', }, From a7a3851b8440a3a69c78500b0a7fcc1c9d988941 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 21:15:50 +0200 Subject: [PATCH 13/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- services/apps/data_sink_worker/src/bin/fix-member-attributes.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index 23247e4237..ec108e12cc 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -196,7 +196,7 @@ setImmediate(async () => { log.debug({ memberId: data.id }, 'Attributes changed, will update') } else { log.debug( - { memberId: data.id, newAttributes: temp }, + { memberId: data.id, newAttributes: temp, oldAttributes }, 'No changes needed for attributes', ) } From 2ccc4f5e7f8e0ae8ba6630f2a53001f9058a3d26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 21:27:39 +0200 Subject: [PATCH 14/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../apps/data_sink_worker/src/bin/fix-member-attributes.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index ec108e12cc..7b3b2b8188 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -111,7 +111,10 @@ setImmediate(async () => { for (const data of membersToFix) { try { if (data.attributes) { - log.debug({ memberId: data.id }, 'Processing member attributes') + log.debug( + { memberId: data.id, oldAttributes: data.attributes }, + 'Processing member attributes', + ) // check if any has default empty but other are full let process = false From c89889a5eb044f29c19295c1822069925ad5c986 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 21:28:51 +0200 Subject: [PATCH 15/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../data_sink_worker/src/bin/fix-member-attributes.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index 7b3b2b8188..3dceb73571 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -14,6 +14,8 @@ import MemberAttributeService from '../service/memberAttribute.service' /* eslint-disable @typescript-eslint/no-explicit-any */ +const BATCH_SIZE = process.env.TEST_RUN ? 1 : 5000 + const log = getServiceLogger() async function getMemberIds( @@ -44,7 +46,7 @@ async function getMemberIds( and coalesce(kv.value, '') != '') ${lastId ? `and id < '${lastId}'` : ''} order by id desc - limit 5000) + limit ${BATCH_SIZE}) select m.id, m.attributes from members m inner join @@ -254,6 +256,10 @@ setImmediate(async () => { const lastId = membersToFix[membersToFix.length - 1].id log.debug({ lastId }, 'Fetching next batch starting from last ID') + if (process.env.TEST_RUN) { + break + } + membersToFix = await getMemberIds(dbClient, lastId) log.info({ count: membersToFix.length }, 'Found members for next batch') From f90ffc8f970a51c607486a4b0172c0d92e700652 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 21:33:01 +0200 Subject: [PATCH 16/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- services/apps/data_sink_worker/src/bin/fix-member-attributes.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index 3dceb73571..06e8c72fa2 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -155,7 +155,7 @@ setImmediate(async () => { } if (process) { - const oldAttributes = data.attributes + const oldAttributes = JSON.parse(JSON.stringify(data.attributes)) // Deep copy data.attributes = await mas.setAttributesDefaultValues(data.attributes) let attributes: Record | undefined From 80df04255c79dd7308bbd5d6c9d17e3f07ee4ece Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 21:38:41 +0200 Subject: [PATCH 17/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../apps/data_sink_worker/src/bin/fix-member-attributes.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index 06e8c72fa2..78335f3856 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -1,4 +1,5 @@ import isEqual from 'lodash.isequal' +import mergeWith from 'lodash.mergewith' import { connQx, updateMember } from '@crowd/data-access-layer' import { @@ -159,7 +160,7 @@ setImmediate(async () => { data.attributes = await mas.setAttributesDefaultValues(data.attributes) let attributes: Record | undefined - const temp = { ...data.attributes } + const temp = mergeWith({}, oldAttributes, data.attributes) const manuallyChangedFields: string[] = data.manuallyChangedFields || [] if (manuallyChangedFields.length > 0) { @@ -212,7 +213,7 @@ setImmediate(async () => { 'Updating member attributes', ) - // await updateMember(pgQx, data.id, { attributes } as any) + await updateMember(pgQx, data.id, { attributes } as any) batchUpdated++ totalUpdated++ log.debug({ memberId: data.id }, 'Member attributes updated successfully') From 47d3eef38ab58cec0854fcabec4551b0f8e18b75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 3 Oct 2025 21:40:17 +0200 Subject: [PATCH 18/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/bin/fix-member-attributes.ts | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index 78335f3856..a6f971779e 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -120,7 +120,7 @@ setImmediate(async () => { ) // check if any has default empty but other are full - let process = false + let toProcess = false for (const attName of Object.keys(data.attributes)) { const defValue = data.attributes[attName].default @@ -144,18 +144,18 @@ setImmediate(async () => { { memberId: data.id, attName, platform, value }, 'Found value for attribute', ) - process = true + toProcess = true break } } - if (process) { + if (toProcess) { break } } } - if (process) { + if (toProcess) { const oldAttributes = JSON.parse(JSON.stringify(data.attributes)) // Deep copy data.attributes = await mas.setAttributesDefaultValues(data.attributes) @@ -213,7 +213,10 @@ setImmediate(async () => { 'Updating member attributes', ) - await updateMember(pgQx, data.id, { attributes } as any) + if (!process.env.TEST_RUN) { + await updateMember(pgQx, data.id, { attributes } as any) + } + batchUpdated++ totalUpdated++ log.debug({ memberId: data.id }, 'Member attributes updated successfully') From 074e003426a54954c5da4e4599e09ed8eb5256dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sat, 4 Oct 2025 15:24:32 +0200 Subject: [PATCH 19/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../data_sink_worker/src/bin/fix-member-attributes.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index a6f971779e..eeb697fc35 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -164,7 +164,7 @@ setImmediate(async () => { const manuallyChangedFields: string[] = data.manuallyChangedFields || [] if (manuallyChangedFields.length > 0) { - log.debug( + log.warn( { memberId: data.id, manuallyChangedFieldsCount: manuallyChangedFields.length, @@ -181,7 +181,7 @@ setImmediate(async () => { ), ] - log.debug( + log.warn( { memberId: data.id, manuallyChangedAttributes, @@ -199,7 +199,7 @@ setImmediate(async () => { if (!isEqual(temp, oldAttributes)) { attributes = temp - log.debug({ memberId: data.id }, 'Attributes changed, will update') + log.info({ memberId: data.id }, 'Attributes changed, will update') } else { log.debug( { memberId: data.id, newAttributes: temp, oldAttributes }, @@ -208,7 +208,7 @@ setImmediate(async () => { } if (attributes) { - log.debug( + log.info( { memberId: data.id, oldAttributes, attributes }, 'Updating member attributes', ) From b2d6dd2bdd944d5c0145375e21efbc673ffac573 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sat, 4 Oct 2025 15:29:56 +0200 Subject: [PATCH 20/21] chore: fixed small issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../apps/data_sink_worker/src/bin/fix-member-attributes.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts index eeb697fc35..c253c640c0 100644 --- a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts +++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts @@ -208,10 +208,7 @@ setImmediate(async () => { } if (attributes) { - log.info( - { memberId: data.id, oldAttributes, attributes }, - 'Updating member attributes', - ) + log.info({ memberId: data.id }, 'Updating member attributes') if (!process.env.TEST_RUN) { await updateMember(pgQx, data.id, { attributes } as any) From 40090765bffda77635b0b0d01d4589f48dd5532d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sat, 4 Oct 2025 19:26:19 +0200 Subject: [PATCH 21/21] fix: lint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- services/apps/data_sink_worker/src/service/activity.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 16e7df2d58..8cb9958d36 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1150,7 +1150,7 @@ export default class ActivityService extends LoggerBase { organizations: value.member.organizations, reach: value.member.reach, }, - value.platform, + value.platform as PlatformType, ) .then((memberId) => { // map ids for members