Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Azure Storage SDK to a modern version #629

Merged
merged 21 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4a2eba6
Upgrade Azure Storage SDK to a modern version
RomanIakovlev Dec 13, 2024
0d00eaf
Add back AzureStorageDocStore.count method
RomanIakovlev Dec 17, 2024
13bfce9
Tweak async error handling, apply prettier
RomanIakovlev Dec 17, 2024
7003187
Fix code style issues
RomanIakovlev Dec 17, 2024
305b882
Fix code style issues
RomanIakovlev Dec 17, 2024
a6a56ec
Fix code style issues
RomanIakovlev Dec 17, 2024
e12ba67
Add support for separate service principal credentials for blobs and …
RomanIakovlev Dec 18, 2024
73c02a3
Add missing config values
RomanIakovlev Dec 19, 2024
bebef96
Add more logging around queue message parsing
RomanIakovlev Jan 14, 2025
dad4e0d
Decode Azure queue message before parsing
RomanIakovlev Jan 15, 2025
b038b72
Fix the parameter passing in storage queue updateMessage call
RomanIakovlev Jan 17, 2025
b437f9c
Fix code review comments
RomanIakovlev Feb 5, 2025
6766b84
Ensure messageId is included into message receipt
RomanIakovlev Feb 6, 2025
4c41da0
Add safe XML+HTML codecs to storage queue
RomanIakovlev Feb 6, 2025
a7e714c
Queues can be configued separatly with SPN from harvest azblob
ljones140 Feb 11, 2025
51c4b3e
Merge pull request #633 from clearlydefined/seperate-crawler-queue-fr…
RomanIakovlev Feb 11, 2025
3354e53
Merge branch 'master' into roman/azure_sdk
qtomlinson Feb 11, 2025
5d6eca2
name is not set anymore, options.container is the new place for this
ljones140 Feb 13, 2025
edc8bb2
Fix integer here was breaking dead letter queue writing
ljones140 Feb 13, 2025
4b67c4a
single quotes
ljones140 Feb 13, 2025
19aa5cd
Modify ordered auth selection in azureBlobFactory and azureQueueStore
RomanIakovlev Feb 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions config/cdConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ const config = require('painless-config')

const cd_azblob = {
connection: config.get('CRAWLER_AZBLOB_CONNECTION_STRING'),
container: config.get('CRAWLER_AZBLOB_CONTAINER_NAME')
container: config.get('CRAWLER_AZBLOB_CONTAINER_NAME'),
account: config.get('CRAWLER_AZBLOB_ACCOUNT_NAME'),
spnAuth: config.get('CRAWLER_AZBLOB_SPN_AUTH'),
isSpnAuth: config.get('CRAWLER_AZBLOB_IS_SPN_AUTH') || false
}

const githubToken = config.get('CRAWLER_GITHUB_TOKEN')
Expand Down Expand Up @@ -111,7 +114,10 @@ module.exports = {
},
azqueue: {
connectionString: cd_azblob.connection,
queueName: config.get('CRAWLER_HARVESTS_QUEUE_NAME') || 'harvests'
account: cd_azblob.account,
queueName: config.get('CRAWLER_HARVESTS_QUEUE_NAME') || 'harvests',
spnAuth: config.get('CRAWLER_HARVESTS_QUEUE_SPN_AUTH'),
isSpnAuth: config.get('CRAWLER_HARVESTS_QUEUE_IS_SPN_AUTH') || false
},
'cd(azblob)': cd_azblob,
'cd(file)': cd_file
Expand All @@ -135,7 +141,10 @@ module.exports = {
maxDequeueCount: 5,
attenuation: {
ttl: 3000
}
},
spnAuth: config.get('CRAWLER_QUEUE_AZURE_SPN_AUTH') || cd_azblob.spnAuth,
account: config.get('CRAWLER_QUEUE_AZURE_ACCOUNT_NAME') || cd_azblob.account,
isSpnAuth: config.get('CRAWLER_QUEUE_AZURE_IS_SPN_AUTH') || false
},
appVersion: config.get('APP_VERSION'),
buildsha: config.get('BUILD_SHA')
Expand Down
2 changes: 1 addition & 1 deletion ghcrawler/lib/crawler.js
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ class Crawler {
metadata.errorMessage = request._error.message
metadata.errorStack = request._error.stack
}
metadata.version = 1
metadata.version = '1'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be updated to '1.1' as we are changing the file format for the deadletter here?

  • Deadletters are internal exception information. The change here looks acceptable to me.
  • Comparing the metadata structure of a deadletter and a harvested result (both are persisted documents in storage), there is no metadata.version in recent harvest results. Older harvest results prior to 2019 have metadata.version as a string. In the future, we may want to align with the metadata in harvested results and consider using schemaVersion instead.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ljones140 we can merge this as is and discuss on the version upgrade during our regular sync?

metadata.meta = request.meta
metadata.type = 'deadletter'
metadata.url = request.url.replace('//', '//deadletter.')
Expand Down
192 changes: 100 additions & 92 deletions ghcrawler/providers/queuing/storageQueue.js
Original file line number Diff line number Diff line change
@@ -1,50 +1,47 @@
// Copyright (c) Microsoft Corporation and others. Made available under the MIT license.
// SPDX-License-Identifier: MIT

// eslint-disable-next-line no-unused-vars
const { QueueServiceClient } = require('@azure/storage-queue')
const qlimit = require('qlimit')
const { cloneDeep } = require('lodash')

class StorageQueue {
/**
* @param {QueueServiceClient} client
* @param {string} name
* @param {string} queueName
* @param {object} formatter
* @param {object} options
*/
constructor(client, name, queueName, formatter, options) {
this.client = client
this.name = name
this.queueName = queueName
this.messageFormatter = formatter
this.options = options
this.logger = options.logger
this.queueClient = client.getQueueClient(this.queueName)
}

async subscribe() {
return new Promise((resolve, reject) => {
this.client.createQueueIfNotExists(this.queueName, error => {
if (error) {
return reject(error)
}
this.logger.info(`Subscribed to ${this.queueName} using Queue Storage`)
resolve()
})
})
await this.queueClient.createIfNotExists()
this.logger.info(`Subscribed to ${this.queueName} using Queue Storage`)
}

async unsubscribe() {
return
// No specific unsubscribe logic for Azure Queue Storage
}

async push(requests, option) {
async push(requests) {
requests = Array.isArray(requests) ? requests : [requests]
return Promise.all(
requests.map(
qlimit(this.options.parallelPush || 1)(request => {
qlimit(this.options.parallelPush || 1)(async request => {
const body = JSON.stringify(request)
return new Promise((resolve, reject) => {
this.client.createMessage(this.queueName, body, option, (error, queueMessageResult) => {
if (error) {
return reject(error)
}
this._log('Queued', request)
resolve(this._buildMessageReceipt(queueMessageResult, request))
})
})
const encoded = this._encodeXMLSafe(body)
const queueMessageResult = await this.queueClient.sendMessage(encoded)
this._log('Queued', request)
return this._buildMessageReceipt(queueMessageResult, request)
})
)
)
Expand All @@ -56,47 +53,46 @@ class StorageQueue {
}

async pop() {
const msgOptions = { numOfMessages: 1, visibilityTimeout: this.options.visibilityTimeout || 60 * 60 }
return new Promise((resolve, reject) => {
this.client.getMessages(this.queueName, msgOptions, (error, result) => {
if (error) {
return reject(error)
}
const message = result[0]
if (!message) {
this.logger.verbose('No messages to receive')
return resolve(null)
}
if (this.options.maxDequeueCount && message.dequeueCount > this.options.maxDequeueCount) {
this.logger.verbose('maxDequeueCount exceeded')
this.client.deleteMessage(this.queueName, message.messageId, message.popReceipt, error => {
if (error) return reject(error)
resolve(null)
})
} else {
message.body = JSON.parse(message.messageText)
const request = this.messageFormatter(message)
request._message = message
this._log('Popped', message.body)
resolve(request)
}
})
})
const msgOptions = { numberOfMessages: 1, visibilityTimeout: this.options.visibilityTimeout || 60 * 60 }
const response = await this.queueClient.receiveMessages(msgOptions)
const message = response.receivedMessageItems[0]
if (!message) {
this.logger.verbose('No messages to receive')
return null
}
if (this.options.maxDequeueCount && message.dequeueCount > this.options.maxDequeueCount) {
this.logger.verbose('maxDequeueCount exceeded')
try {
await this.queueClient.deleteMessage(message.messageId, message.popReceipt)
} catch (error) {
this.logger.error(`Failed to delete message ${message.messageId} in storageQueue, error: ${error.message}`)
throw error
}
return null
} else {
try {
const decodedText = this._decodeXMLSafe(message.messageText)
message.body = JSON.parse(decodedText)
} catch (error) {
this.logger.error(`Failed to parse message ${message.messageId}:`)
this.logger.error(`Raw message: ${message.messageText}`)
this.logger.error(`Parse error: ${error.message}`)
await this.queueClient.deleteMessage(message.messageId, message.popReceipt)
return null
}
const request = this.messageFormatter(message)
request._message = message
this._log('Popped', message.body)
return request
}
}

async done(request) {
if (!request || !request._message) {
return
}
return new Promise((resolve, reject) => {
this.client.deleteMessage(this.queueName, request._message.messageId, request._message.popReceipt, error => {
if (error) {
return reject(error)
}
this._log('ACKed', request._message.body)
resolve()
})
})
await this.queueClient.deleteMessage(request._message.messageId, request._message.popReceipt)
this._log('ACKed', request._message.body)
}

async defer(request) {
Expand All @@ -110,47 +106,30 @@ class StorageQueue {
await this.updateVisibilityTimeout(request)
}

updateVisibilityTimeout(request, visibilityTimeout = 0) {
return new Promise((resolve, reject) => {
// visibilityTimeout is updated to 0 to unlock/unlease the message
this.client.updateMessage(
this.queueName,
request._message.messageId,
request._message.popReceipt,
visibilityTimeout,
(error, result) => {
if (error) {
return reject(error)
}
this._log('NAKed', request._message.body)
resolve(this._buildMessageReceipt(result, request._message.body))
}
)
})
async updateVisibilityTimeout(request, visibilityTimeout = 0) {
const response = await this.queueClient.updateMessage(
request._message.messageId,
request._message.popReceipt,
undefined,
visibilityTimeout
)
this._log('NAKed', request._message.body)
return this._buildMessageReceipt({ messageId: request._message.messageId, ...response }, request)
}

async flush() {
return new Promise((resolve, reject) => {
this.client.deleteQueue(this.queueName, error => {
if (error) return reject(error)
this.client.createQueueIfNotExists(this.queueName, error => {
if (error) return reject(error)
resolve()
})
})
})
await this.queueClient.clearMessages()
this.logger.info(`Flushed all messages from ${this.queueName}`)
}

async getInfo() {
return new Promise(resolve => {
this.client.getQueueMetadata(this.queueName, (result, error) => {
if (error) {
this.logger.error(error)
resolve(null)
}
resolve({ count: result[0].approximateMessageCount })
})
})
try {
const properties = await this.queueClient.getProperties()
return { count: properties.approximateMessagesCount }
} catch (error) {
this.logger.error(error)
return null
}
}

getName() {
Expand All @@ -164,6 +143,35 @@ class StorageQueue {
isMessageNotFound(error) {
return error?.code === 'MessageNotFound'
}

_encodeXMLSafe(text) {
if (typeof text !== 'string') return text

return (
text
// Handle & first to prevent double-encoding
.replace(/&/g, '&')
.replace(/"/g, '"')
.replace(/'/g, ''')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
)
}

_decodeXMLSafe(text) {
if (typeof text !== 'string') return text

return (
text
// Handle both XML and HTML encodings for quotes and apostrophes
.replace(/&apos;|&#39;|&#x27;/g, "'")
.replace(/&quot;|&#34;|&#x22;/g, '"')
// Handle basic XML entities
.replace(/&lt;|&#60;|&#x3[Cc];/g, '<')
.replace(/&gt;|&#62;|&#x3[Ee];/g, '>')
.replace(/&amp;|&#38;|&#x26;/g, '&') // Must be after other & entities
)
}
}

module.exports = StorageQueue
41 changes: 37 additions & 4 deletions ghcrawler/providers/queuing/storageQueueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,47 @@
// SPDX-License-Identifier: MIT

const AttenuatedQueue = require('./attenuatedQueue')
const AzureStorage = require('azure-storage')
const { QueueServiceClient, StorageRetryPolicyType } = require('@azure/storage-queue')
const Request = require('../../lib/request')
const StorageQueue = require('./storageQueue')
const { DefaultAzureCredential, ClientSecretCredential } = require('@azure/identity')

class StorageQueueManager {
constructor(connectionString) {
const retryOperations = new AzureStorage.ExponentialRetryPolicyFilter()
this.client = AzureStorage.createQueueService(connectionString).withFilter(retryOperations)
constructor(connectionString, options) {
const pipelineOptions = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the use of pipelineOptions. Makes it easier to see the configs related to the queues.

retryOptions: {
maxTries: 3,
retryDelayInMs: 1000,
maxRetryDelayInMs: 120 * 1000,
tryTimeoutInMs: 30000,
retryPolicyType: StorageRetryPolicyType.EXPONENTIAL
}
}

const { account, spnAuth, isSpnAuth } = options
if (isSpnAuth) {
options.logger.info('using service principal credentials in storageQueueManager')
const authParsed = JSON.parse(spnAuth)
this.client = new QueueServiceClient(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice update here for maintaining the old approach and setting up a more flexible long term approach.

`https://${account}.queue.core.windows.net`,
new ClientSecretCredential(authParsed.tenantId, authParsed.clientId, authParsed.clientSecret),
pipelineOptions
)
return
}

if (connectionString) {
options.logger.info('using connection string in storageQueueManager')
this.client = QueueServiceClient.fromConnectionString(connectionString, pipelineOptions)
return
}

options.logger.info('using default credentials in storageQueueManager')
this.client = new QueueServiceClient(
`https://${account}.queue.core.windows.net`,
new DefaultAzureCredential(),
pipelineOptions
)
}

createQueueClient(name, formatter, options) {
Expand Down
Loading
Loading