Upgrade Azure Storage SDK to a modern version #629

Merged
21 commits, merged on Feb 19, 2025
Changes from 12 commits
Commits:
4a2eba6
Upgrade Azure Storage SDK to a modern version
RomanIakovlev Dec 13, 2024
0d00eaf
Add back AzureStorageDocStore.count method
RomanIakovlev Dec 17, 2024
13bfce9
Tweak async error handling, apply prettier
RomanIakovlev Dec 17, 2024
7003187
Fix code style issues
RomanIakovlev Dec 17, 2024
305b882
Fix code style issues
RomanIakovlev Dec 17, 2024
a6a56ec
Fix code style issues
RomanIakovlev Dec 17, 2024
e12ba67
Add support for separate service principal credentials for blobs and …
RomanIakovlev Dec 18, 2024
73c02a3
Add missing config values
RomanIakovlev Dec 19, 2024
bebef96
Add more logging around queue message parsing
RomanIakovlev Jan 14, 2025
dad4e0d
Decode Azure queue message before parsing
RomanIakovlev Jan 15, 2025
b038b72
Fix the parameter passing in storage queue updateMessage call
RomanIakovlev Jan 17, 2025
b437f9c
Fix code review comments
RomanIakovlev Feb 5, 2025
6766b84
Ensure messageId is included into message receipt
RomanIakovlev Feb 6, 2025
4c41da0
Add safe XML+HTML codecs to storage queue
RomanIakovlev Feb 6, 2025
a7e714c
Queues can be configured separately with SPN from harvest azblob
ljones140 Feb 11, 2025
51c4b3e
Merge pull request #633 from clearlydefined/seperate-crawler-queue-fr…
RomanIakovlev Feb 11, 2025
3354e53
Merge branch 'master' into roman/azure_sdk
qtomlinson Feb 11, 2025
5d6eca2
name is not set anymore, options.container is the new place for this
ljones140 Feb 13, 2025
edc8bb2
Fix integer here was breaking dead letter queue writing
ljones140 Feb 13, 2025
4b67c4a
single quotes
ljones140 Feb 13, 2025
19aa5cd
Modify ordered auth selection in azureBlobFactory and azureQueueStore
RomanIakovlev Feb 17, 2025
12 changes: 9 additions & 3 deletions config/cdConfig.js
@@ -5,7 +5,9 @@ const config = require('painless-config')

const cd_azblob = {
connection: config.get('CRAWLER_AZBLOB_CONNECTION_STRING'),
container: config.get('CRAWLER_AZBLOB_CONTAINER_NAME')
container: config.get('CRAWLER_AZBLOB_CONTAINER_NAME'),
account: config.get('CRAWLER_AZBLOB_ACCOUNT_NAME'),
spnAuth: config.get('CRAWLER_AZBLOB_SPN_AUTH')
}

const githubToken = config.get('CRAWLER_GITHUB_TOKEN')
@@ -111,7 +113,9 @@ module.exports = {
},
azqueue: {
connectionString: cd_azblob.connection,
queueName: config.get('CRAWLER_HARVESTS_QUEUE_NAME') || 'harvests'
account: cd_azblob.account,
queueName: config.get('CRAWLER_HARVESTS_QUEUE_NAME') || 'harvests',
spnAuth: config.get('CRAWLER_HARVESTS_QUEUE_SPN_AUTH')
},
'cd(azblob)': cd_azblob,
'cd(file)': cd_file
@@ -135,7 +139,9 @@ module.exports = {
maxDequeueCount: 5,
attenuation: {
ttl: 3000
}
},
spnAuth: config.get('CRAWLER_HARVESTS_QUEUE_SPN_AUTH'),
account: cd_azblob.account
},
appVersion: config.get('APP_VERSION'),
buildsha: config.get('BUILD_SHA')
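For context, a hedged sketch of what the new settings are expected to contain. Both the queue and blob providers call JSON.parse on the spnAuth value and read tenantId, clientId, and clientSecret, so the environment variables would carry a JSON string along these lines (all values below are placeholders, not taken from the PR):

const { ClientSecretCredential } = require('@azure/identity')
const config = require('painless-config')

// Placeholder values; the real settings are deployment-specific secrets:
//   CRAWLER_AZBLOB_ACCOUNT_NAME=mystorageaccount
//   CRAWLER_AZBLOB_SPN_AUTH={"tenantId":"<tenant-guid>","clientId":"<app-guid>","clientSecret":"<secret>"}
//   CRAWLER_HARVESTS_QUEUE_SPN_AUTH uses the same JSON shape for the queue side.
const spnAuth = config.get('CRAWLER_AZBLOB_SPN_AUTH')
if (spnAuth) {
  const { tenantId, clientId, clientSecret } = JSON.parse(spnAuth)
  const credential = new ClientSecretCredential(tenantId, clientId, clientSecret)
  // credential is then handed to BlobServiceClient / QueueServiceClient as shown in the diffs below
}
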
167 changes: 75 additions & 92 deletions ghcrawler/providers/queuing/storageQueue.js
@@ -1,50 +1,46 @@
// Copyright (c) Microsoft Corporation and others. Made available under the MIT license.
// SPDX-License-Identifier: MIT

// eslint-disable-next-line no-unused-vars
const { QueueServiceClient } = require('@azure/storage-queue')
const qlimit = require('qlimit')
const { cloneDeep } = require('lodash')

class StorageQueue {
/**
* @param {QueueServiceClient} client
* @param {string} name
* @param {string} queueName
* @param {object} formatter
* @param {object} options
*/
constructor(client, name, queueName, formatter, options) {
this.client = client
this.name = name
this.queueName = queueName
this.messageFormatter = formatter
this.options = options
this.logger = options.logger
this.queueClient = client.getQueueClient(this.queueName)
}

async subscribe() {
return new Promise((resolve, reject) => {
this.client.createQueueIfNotExists(this.queueName, error => {
if (error) {
return reject(error)
}
this.logger.info(`Subscribed to ${this.queueName} using Queue Storage`)
resolve()
})
})
await this.queueClient.createIfNotExists()
this.logger.info(`Subscribed to ${this.queueName} using Queue Storage`)
}

async unsubscribe() {
return
// No specific unsubscribe logic for Azure Queue Storage
}

async push(requests, option) {
async push(requests) {
requests = Array.isArray(requests) ? requests : [requests]
return Promise.all(
requests.map(
qlimit(this.options.parallelPush || 1)(request => {
qlimit(this.options.parallelPush || 1)(async request => {
const body = JSON.stringify(request)
return new Promise((resolve, reject) => {
this.client.createMessage(this.queueName, body, option, (error, queueMessageResult) => {
if (error) {
return reject(error)
}
this._log('Queued', request)
resolve(this._buildMessageReceipt(queueMessageResult, request))
})
})
const queueMessageResult = await this.queueClient.sendMessage(body)
this._log('Queued', request)
return this._buildMessageReceipt(queueMessageResult, request)
})
)
)
@@ -56,47 +52,51 @@ class StorageQueue {
}

async pop() {
const msgOptions = { numOfMessages: 1, visibilityTimeout: this.options.visibilityTimeout || 60 * 60 }
return new Promise((resolve, reject) => {
this.client.getMessages(this.queueName, msgOptions, (error, result) => {
if (error) {
return reject(error)
}
const message = result[0]
if (!message) {
this.logger.verbose('No messages to receive')
return resolve(null)
}
if (this.options.maxDequeueCount && message.dequeueCount > this.options.maxDequeueCount) {
this.logger.verbose('maxDequeueCount exceeded')
this.client.deleteMessage(this.queueName, message.messageId, message.popReceipt, error => {
if (error) return reject(error)
resolve(null)
})
} else {
message.body = JSON.parse(message.messageText)
const request = this.messageFormatter(message)
request._message = message
this._log('Popped', message.body)
resolve(request)
}
})
})
const msgOptions = { numberOfMessages: 1, visibilityTimeout: this.options.visibilityTimeout || 60 * 60 }
const response = await this.queueClient.receiveMessages(msgOptions)
const message = response.receivedMessageItems[0]
if (!message) {
this.logger.verbose('No messages to receive')
return null
}
if (this.options.maxDequeueCount && message.dequeueCount > this.options.maxDequeueCount) {
this.logger.verbose('maxDequeueCount exceeded')
try {
await this.queueClient.deleteMessage(message.messageId, message.popReceipt)
} catch (error) {
this.logger.error(`Failed to delete message ${message.messageId} in storageQueue, error: ${error.message}`)
throw error
}
return null
} else {
try {
const decodedText = message.messageText
.replace(/"/g, '"')
.replace(/&/g, '&')
.replace(/'/g, "'")
.replace(/&lt;/g, '<')
.replace(/&gt;/g, '>')
message.body = JSON.parse(decodedText)
} catch (error) {
this.logger.error(`Failed to parse message ${message.messageId}:`)
this.logger.error(`Raw message: ${message.messageText}`)
this.logger.error(`Parse error: ${error.message}`)
await this.queueClient.deleteMessage(message.messageId, message.popReceipt)
return null
}
const request = this.messageFormatter(message)
request._message = message
this._log('Popped', message.body)
return request
}
}

async done(request) {
if (!request || !request._message) {
return
}
return new Promise((resolve, reject) => {
this.client.deleteMessage(this.queueName, request._message.messageId, request._message.popReceipt, error => {
if (error) {
return reject(error)
}
this._log('ACKed', request._message.body)
resolve()
})
})
await this.queueClient.deleteMessage(request._message.messageId, request._message.popReceipt)
this._log('ACKed', request._message.body)
}

async defer(request) {
@@ -110,47 +110,30 @@ class StorageQueue {
await this.updateVisibilityTimeout(request)
}

updateVisibilityTimeout(request, visibilityTimeout = 0) {
return new Promise((resolve, reject) => {
// visibilityTimeout is updated to 0 to unlock/unlease the message
this.client.updateMessage(
this.queueName,
request._message.messageId,
request._message.popReceipt,
visibilityTimeout,
(error, result) => {
if (error) {
return reject(error)
}
this._log('NAKed', request._message.body)
resolve(this._buildMessageReceipt(result, request._message.body))
}
)
})
async updateVisibilityTimeout(request, visibilityTimeout = 0) {
const response = await this.queueClient.updateMessage(
request._message.messageId,
request._message.popReceipt,
undefined,
visibilityTimeout
)
this._log('NAKed', request._message.body)
return this._buildMessageReceipt(response, request)
}

async flush() {
return new Promise((resolve, reject) => {
this.client.deleteQueue(this.queueName, error => {
if (error) return reject(error)
this.client.createQueueIfNotExists(this.queueName, error => {
if (error) return reject(error)
resolve()
})
})
})
await this.queueClient.clearMessages()
this.logger.info(`Flushed all messages from ${this.queueName}`)
}

async getInfo() {
return new Promise(resolve => {
this.client.getQueueMetadata(this.queueName, (result, error) => {
if (error) {
this.logger.error(error)
resolve(null)
}
resolve({ count: result[0].approximateMessageCount })
})
})
try {
const properties = await this.queueClient.getProperties()
return { count: properties.approximateMessagesCount }
} catch (error) {
this.logger.error(error)
return null
}
}

getName() {
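To show the new promise-based flow end to end, a minimal usage sketch under stated assumptions: the queue name, request payload, and pass-through formatter below are made up for illustration (in the crawler this wiring is done by StorageQueueManager.createQueueClient), and only methods present in the rewritten class are used:

const { QueueServiceClient } = require('@azure/storage-queue')
const StorageQueue = require('./storageQueue')

async function demo() {
  const logger = { info: console.log, verbose: console.log, error: console.error } // stand-in logger for the sketch
  const client = QueueServiceClient.fromConnectionString(process.env.CRAWLER_AZBLOB_CONNECTION_STRING)
  // Pass-through formatter; the real manager builds a Request object from message.body.
  const formatter = message => message.body
  const queue = new StorageQueue(client, 'harvests', 'harvests', formatter, { logger, visibilityTimeout: 3600 })
  await queue.subscribe() // createIfNotExists + "Subscribed to ..." log
  await queue.push({ type: 'component', url: 'https://example.com/spec' }) // hypothetical payload
  const request = await queue.pop() // receiveMessages, entity decode, JSON.parse
  if (request) await queue.done(request) // deleteMessage(messageId, popReceipt)
}

demo().catch(console.error)
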
29 changes: 25 additions & 4 deletions ghcrawler/providers/queuing/storageQueueManager.js
@@ -2,14 +2,35 @@
// SPDX-License-Identifier: MIT

const AttenuatedQueue = require('./attenuatedQueue')
const AzureStorage = require('azure-storage')
const { QueueServiceClient, StorageRetryPolicyType } = require('@azure/storage-queue')
const Request = require('../../lib/request')
const StorageQueue = require('./storageQueue')
const { DefaultAzureCredential, ClientSecretCredential } = require('@azure/identity')

class StorageQueueManager {
constructor(connectionString) {
const retryOperations = new AzureStorage.ExponentialRetryPolicyFilter()
this.client = AzureStorage.createQueueService(connectionString).withFilter(retryOperations)
constructor(connectionString, options) {
const pipelineOptions = {
Reviewer comment (Collaborator): I like the use of pipelineOptions. Makes it easier to see the configs related to the queues.
retryOptions: {
maxTries: 3,
retryDelayInMs: 1000,
maxRetryDelayInMs: 120 * 1000,
tryTimeoutInMs: 30000,
retryPolicyType: StorageRetryPolicyType.EXPONENTIAL
}
}
if (connectionString) {
this.client = QueueServiceClient.fromConnectionString(connectionString, pipelineOptions)
} else {
const { account, spnAuth } = options
let credential
if (spnAuth) {
const authParsed = JSON.parse(spnAuth)
credential = new ClientSecretCredential(authParsed.tenantId, authParsed.clientId, authParsed.clientSecret)
} else {
credential = new DefaultAzureCredential()
}
this.client = new QueueServiceClient(`https://${account}.queue.core.windows.net`, credential, pipelineOptions)
}
}

createQueueClient(name, formatter, options) {
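Both StorageQueueManager and azureBlobFactory (below) now repeat the same ordered auth selection: connection string first, then SPN JSON, then DefaultAzureCredential. Purely as a sketch of a possible follow-up, not something this PR does, that ordering could live in one small helper:

const { DefaultAzureCredential, ClientSecretCredential } = require('@azure/identity')

// Hypothetical helper mirroring the ordered auth selection used in both providers.
function createAzureCredential(spnAuth) {
  if (spnAuth) {
    const { tenantId, clientId, clientSecret } = JSON.parse(spnAuth)
    return new ClientSecretCredential(tenantId, clientId, clientSecret)
  }
  return new DefaultAzureCredential()
}

module.exports = { createAzureCredential }
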
50 changes: 43 additions & 7 deletions ghcrawler/providers/storage/azureBlobFactory.js
@@ -1,15 +1,51 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT

const AzureStorage = require('azure-storage')
// @ts-check
const { BlobServiceClient, StorageRetryPolicyType } = require('@azure/storage-blob')
const AzureStorageDocStore = require('./storageDocStore')
const { DefaultAzureCredential, ClientSecretCredential } = require('@azure/identity')

/**
* @param {object} options
* @param {string} options.account
* @param {string} options.connection
* @param {string} options.container
* @param {object} options.logger
* @param {object} options.spnAuth
*/
module.exports = options => {
options.logger.info('creating azure storage store')
const { account, key, connection, container } = options
const retryOperations = new AzureStorage.ExponentialRetryPolicyFilter()
const blobService = connection
? AzureStorage.createBlobService(connection).withFilter(retryOperations)
: AzureStorage.createBlobService(account, key).withFilter(retryOperations)
return new AzureStorageDocStore(blobService, container, options)
const { account, connection, container, spnAuth } = options

const pipelineOptions = {
retryOptions: {
maxTries: 3,
retryDelayInMs: 1000,
maxRetryDelayInMs: 120 * 1000,
tryTimeoutInMs: 30000,
retryPolicyType: StorageRetryPolicyType.EXPONENTIAL
}
}

let blobServiceClient
if (connection) {
options.logger.info('using connection string')
blobServiceClient = BlobServiceClient.fromConnectionString(connection, pipelineOptions)
} else {
let credential
if (spnAuth) {
const authParsed = JSON.parse(spnAuth)
credential = new ClientSecretCredential(authParsed.tenantId, authParsed.clientId, authParsed.clientSecret)
options.logger.info('using service principal credentials')
} else {
credential = new DefaultAzureCredential()
options.logger.info('using default credentials')
}
blobServiceClient = new BlobServiceClient(`https://${account}.blob.core.windows.net`, credential, pipelineOptions)
}

const containerClient = blobServiceClient.getContainerClient(container)

return new AzureStorageDocStore(containerClient, options)
}
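
A minimal call-site sketch, assuming only the options documented in the JSDoc above; the account and container names are placeholders, and a connection string, when present, takes precedence over SPN or default credentials:

const createAzureBlobStore = require('./azureBlobFactory')

const logger = { info: console.log, error: console.error } // stand-in logger for the sketch
const store = createAzureBlobStore({
  connection: process.env.CRAWLER_AZBLOB_CONNECTION_STRING, // optional; wins when set
  account: 'mystorageaccount', // placeholder account name
  container: 'harvest-container', // placeholder container name
  spnAuth: process.env.CRAWLER_AZBLOB_SPN_AUTH, // optional JSON string for ClientSecretCredential
  logger
})
// store is an AzureStorageDocStore wrapping a ContainerClient for the given container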