Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 291 additions & 0 deletions services/apps/data_sink_worker/src/bin/fix-member-attributes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
import isEqual from 'lodash.isequal'
import mergeWith from 'lodash.mergewith'

import { connQx, updateMember } from '@crowd/data-access-layer'
import {
DbConnOrTx,
DbStore,
WRITE_DB_CONFIG,
getDbConnection,
} from '@crowd/data-access-layer/src/database'
import { getServiceLogger } from '@crowd/logging'
import { REDIS_CONFIG, getRedisClient } from '@crowd/redis'

import MemberAttributeService from '../service/memberAttribute.service'

/* eslint-disable @typescript-eslint/no-explicit-any */

const BATCH_SIZE = process.env.TEST_RUN ? 1 : 5000

const log = getServiceLogger()

async function getMemberIds(
db: DbConnOrTx,
lastId?: string,
): Promise<{ id: string; attributes: any; manuallyChangedFields: any }[]> {
try {
log.debug({ lastId }, 'Querying for members with attribute issues')

const results = await db.any(
`
with relevant_members as (with member_with_attributes as (select id,
"createdAt",
jsonb_object_keys(attributes) as attr_key,
attributes -> jsonb_object_keys(attributes) as attr_value
from members
where "deletedAt" is null
and attributes is not null
and attributes != 'null'::jsonb
and attributes != '{}'::jsonb)
select distinct id
from member_with_attributes
where jsonb_typeof(attr_value) = 'object'
and coalesce(attr_value ->> 'default', '') = ''
and exists (select 1
from jsonb_each_text(attr_value) as kv
where kv.key != 'default'
and coalesce(kv.value, '') != '')
${lastId ? `and id < '${lastId}'` : ''}
order by id desc
limit ${BATCH_SIZE})
select m.id, m.attributes
from members m
inner join
relevant_members rm on rm.id = m.id;
`,
{ lastId },
)

log.debug(
{
resultCount: results.length,
lastId,
firstId: results.length > 0 ? results[0].id : null,
lastResultId: results.length > 0 ? results[results.length - 1].id : null,
},
'Query completed',
)

return results
} catch (error) {
log.error(
{
error: error.message,
lastId,
stack: error.stack,
},
'Failed to query member IDs',
)
throw error
}
}

setImmediate(async () => {
let dbClient: DbConnOrTx | undefined
let redisClient: any | undefined

try {
log.info('Starting member attributes fix script')

// Initialize connections
log.info('Connecting to database...')
dbClient = await getDbConnection(WRITE_DB_CONFIG())
log.info('Database connection established')

log.info('Connecting to Redis...')
redisClient = await getRedisClient(REDIS_CONFIG())
log.info('Redis connection established')

const pgQx = connQx(dbClient)
const mas = new MemberAttributeService(redisClient, new DbStore(log, dbClient), log)

let totalProcessed = 0
let totalUpdated = 0
let batchNumber = 1

log.info('Starting to process members with attribute issues')
let membersToFix = await getMemberIds(dbClient)
log.info({ count: membersToFix.length }, 'Found members to process in first batch')

while (membersToFix.length > 0) {
log.info({ batchNumber, batchSize: membersToFix.length }, 'Processing batch')
let batchUpdated = 0

for (const data of membersToFix) {
try {
if (data.attributes) {
log.debug(
{ memberId: data.id, oldAttributes: data.attributes },
'Processing member attributes',
)

// check if any has default empty but other are full
let toProcess = false
for (const attName of Object.keys(data.attributes)) {
const defValue = data.attributes[attName].default

if (defValue === undefined || defValue === null || String(defValue) === '') {
log.debug(
{
memberId: data.id,
attribute: data.attributes[attName],
attName,
defValue: defValue ? String(defValue) : 'undefined',
},
'Attribute has default empty',
)
for (const platform of Object.keys(data.attributes[attName]).filter(
(p) => p !== 'default',
)) {
const value = data.attributes[attName][platform]

if (value !== undefined && value !== null && String(value) !== '') {
log.debug(
{ memberId: data.id, attName, platform, value },
'Found value for attribute',
)
toProcess = true
break
}
}

if (toProcess) {
break
}
}
}

if (toProcess) {
const oldAttributes = JSON.parse(JSON.stringify(data.attributes)) // Deep copy
data.attributes = await mas.setAttributesDefaultValues(data.attributes)

let attributes: Record<string, unknown> | undefined
const temp = mergeWith({}, oldAttributes, data.attributes)
const manuallyChangedFields: string[] = data.manuallyChangedFields || []

if (manuallyChangedFields.length > 0) {
log.warn(
{
memberId: data.id,
manuallyChangedFieldsCount: manuallyChangedFields.length,
},
'Member has manually changed fields',
)

const prefix = 'attributes.'
const manuallyChangedAttributes = [
...new Set(
manuallyChangedFields
.filter((f) => f.startsWith(prefix))
.map((f) => f.slice(prefix.length)),
),
]

log.warn(
{
memberId: data.id,
manuallyChangedAttributes,
},
'Preserving manually changed attributes',
)

// Preserve manually changed attributes
for (const key of manuallyChangedAttributes) {
if (oldAttributes?.[key] !== undefined) {
temp[key] = oldAttributes[key] // Fixed: removed .attributes
}
}
}

if (!isEqual(temp, oldAttributes)) {
attributes = temp
log.info({ memberId: data.id }, 'Attributes changed, will update')
} else {
log.debug(
{ memberId: data.id, newAttributes: temp, oldAttributes },
'No changes needed for attributes',
)
}

if (attributes) {
log.info({ memberId: data.id }, 'Updating member attributes')

if (!process.env.TEST_RUN) {
await updateMember(pgQx, data.id, { attributes } as any)
}

batchUpdated++
totalUpdated++
log.debug({ memberId: data.id }, 'Member attributes updated successfully')
}
} else {
log.debug(
{ memberId: data.id, attributes: data.attributes },
'No changes needed for attributes',
)
}
} else {
log.debug({ memberId: data.id }, 'Member has no attributes to process')
}

totalProcessed++
} catch (error) {
log.error(
{
error: error.message,
memberId: data.id,
stack: error.stack,
},
'Failed to process member',
)
// Continue processing other members
}
}

log.info(
{
batchNumber,
batchProcessed: membersToFix.length,
batchUpdated,
totalProcessed,
totalUpdated,
},
'Completed batch processing',
)

// Get next batch
const lastId = membersToFix[membersToFix.length - 1].id
log.debug({ lastId }, 'Fetching next batch starting from last ID')

if (process.env.TEST_RUN) {
break
}

membersToFix = await getMemberIds(dbClient, lastId)
log.info({ count: membersToFix.length }, 'Found members for next batch')

batchNumber++
}

log.info(
{
totalProcessed,
totalUpdated,
totalBatches: batchNumber - 1,
},
'Member attributes fix completed successfully',
)
} catch (error) {
log.error(
{
error: error.message,
stack: error.stack,
},
'Fatal error in member attributes fix script',
)
process.exit(1)
} finally {
log.info('Script execution completed')
process.exit(0)
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ export default class ActivityService extends LoggerBase {
organizations: value.member.organizations,
reach: value.member.reach,
},
value.platform,
value.platform as PlatformType,
)
.then((memberId) => {
// map ids for members
Expand Down
12 changes: 6 additions & 6 deletions services/apps/data_sink_worker/src/service/member.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export default class MemberService extends LoggerBase {
segmentIds: string[],
integrationId: string,
data: IMemberCreateData,
source: string,
platform: PlatformType,
releaseMemberLock?: () => Promise<void>,
): Promise<string> {
return logExecutionTimeV2(
Expand All @@ -91,7 +91,7 @@ export default class MemberService extends LoggerBase {
let attributes: Record<string, unknown> = {}
if (data.attributes) {
attributes = await logExecutionTimeV2(
() => memberAttributeService.validateAttributes(data.attributes),
() => memberAttributeService.validateAttributes(platform, data.attributes),
this.log,
'memberService -> create -> validateAttributes',
)
Expand Down Expand Up @@ -192,7 +192,7 @@ export default class MemberService extends LoggerBase {
if (data.organizations) {
for (const org of data.organizations) {
const id = await logExecutionTimeV2(
() => orgService.findOrCreate(source, integrationId, org),
() => orgService.findOrCreate(platform, integrationId, org),
this.log,
'memberService -> create -> findOrCreateOrg',
)
Expand Down Expand Up @@ -267,7 +267,7 @@ export default class MemberService extends LoggerBase {
data: IMemberUpdateData,
original: IDbMember,
originalIdentities: IMemberIdentity[],
source: string,
platform: PlatformType,
releaseMemberLock?: () => Promise<void>,
): Promise<void> {
await logExecutionTimeV2(
Expand All @@ -284,7 +284,7 @@ export default class MemberService extends LoggerBase {
if (data.attributes) {
this.log.trace({ memberId: id }, 'Validating member attributes!')
data.attributes = await logExecutionTimeV2(
() => memberAttributeService.validateAttributes(data.attributes),
() => memberAttributeService.validateAttributes(platform, data.attributes),
this.log,
'memberService -> update -> validateAttributes',
)
Expand Down Expand Up @@ -404,7 +404,7 @@ export default class MemberService extends LoggerBase {
this.log.trace({ memberId: id }, 'Finding or creating organization!')

const orgId = await logExecutionTimeV2(
() => orgService.findOrCreate(source, integrationId, org),
() => orgService.findOrCreate(platform, integrationId, org),
this.log,
'memberService -> update -> findOrCreateOrg',
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor'
import { Logger, LoggerBase } from '@crowd/logging'
import { RedisClient } from '@crowd/redis'
import { MemberAttributeType } from '@crowd/types'
import { MemberAttributeType, PlatformType } from '@crowd/types'

export default class MemberAttributeService extends LoggerBase {
constructor(
Expand All @@ -26,6 +26,7 @@ export default class MemberAttributeService extends LoggerBase {
}

public async validateAttributes(
platform: PlatformType,
attributes: Record<string, unknown>,
): Promise<Record<string, unknown>> {
const settings = await getMemberAttributeSettings(dbStoreQx(this.store), this.redis)
Expand All @@ -45,7 +46,7 @@ export default class MemberAttributeService extends LoggerBase {
}
if (typeof attributes[attributeName] !== 'object') {
attributes[attributeName] = {
custom: attributes[attributeName],
[platform]: attributes[attributeName],
}
}

Expand Down
8 changes: 7 additions & 1 deletion services/libs/common/src/member.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@ export async function setAttributesDefaultValues(
throw err
}
}

const nonEmptyPlatform = Object.keys(attributes[attributeName]).filter((p) => {
const value = attributes[attributeName][p]
return value !== undefined && value !== null && String(value).trim().length > 0
})

const highestPriorityPlatform = getHighestPriorityPlatformForAttributes(
Object.keys(attributes[attributeName]),
nonEmptyPlatform,
priorities,
)

Expand Down
Loading