Skip to content

Commit

Permalink
chore: pdf concurrency processing
Browse files Browse the repository at this point in the history
  • Loading branch information
arshad-yaseen committed Nov 5, 2024
1 parent 5638fde commit ff694ed
Show file tree
Hide file tree
Showing 14 changed files with 293 additions and 201 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"@release-it/conventional-changelog": "^8.0.2",
"@typescript-eslint/eslint-plugin": "^7.3.1",
"eslint": "^8.57.0",
"groq-sdk": "^0.7.0",
"husky": "^9.1.6",
"openai": "^4.60.1",
"prettier": "^3.2.5",
Expand Down
42 changes: 42 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 0 additions & 9 deletions src/constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@ import {Provider} from '../types';
*/
export const SUPPORTED_PROVIDERS = ['openai'] as const;

/**
* Supported image MIME types.
*/
export const SUPPORTED_IMAGE_TYPES = [
'image/jpeg',
'image/png',
'image/webp',
] as const;

/**
* Maximum number of concurrent API requests.
*/
Expand Down
65 changes: 45 additions & 20 deletions src/processors/image.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {promises as fs} from 'fs';

import {_pm, report} from '../logger';
import {ImageResult, InputSource, Provider} from '../types';
import {callLLM} from '../utils/call-llm';
import {removeCodeBlockMarkers} from '../utils/string';
Expand All @@ -15,29 +16,53 @@ export async function processImage(
provider: Provider,
apiKey: string,
): Promise<ImageResult> {
let imageBuffer: Buffer;
try {
let imageBuffer: Buffer;

if (typeof input === 'string') {
if (input.startsWith('http')) {
const response = await fetch(input);
imageBuffer = Buffer.from(await response.arrayBuffer());
} else if (input.startsWith('data:image')) {
imageBuffer = Buffer.from(input.split(',')[1], 'base64');
if (typeof input === 'string') {
try {
if (input.startsWith('http')) {
const response = await fetch(input);
if (!response.ok) {
throw new Error(
`Failed to fetch image: ${response.status} ${response.statusText}`,
);
}
imageBuffer = Buffer.from(await response.arrayBuffer());
} else if (input.startsWith('data:image')) {
imageBuffer = Buffer.from(input.split(',')[1], 'base64');
} else {
imageBuffer = await fs.readFile(input);
}
} catch (error: unknown) {
throw new Error(`Failed to read image input: ${_pm(error)}`);
}
} else {
imageBuffer = await fs.readFile(input);
imageBuffer = input;
}

if (!imageBuffer?.length) {
throw new Error('Empty or invalid image buffer');
}
} else {
imageBuffer = input;
}

const base64Image = imageBuffer.toString('base64');
const content = await callLLM(apiKey, base64Image, provider);
const base64Image = imageBuffer.toString('base64');
const content = await callLLM(apiKey, base64Image, provider);

return {
content: removeCodeBlockMarkers(content),
metadata: {
size: imageBuffer.length,
timestamp: new Date().toISOString(),
},
};
return {
content: removeCodeBlockMarkers(content),
metadata: {
size: imageBuffer.length,
timestamp: new Date().toISOString(),
},
};
} catch (error: unknown) {
report(error);
return {
content: '',
metadata: {
error: _pm(error),
timestamp: new Date().toISOString(),
},
};
}
}
137 changes: 79 additions & 58 deletions src/processors/pdf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {processImage} from './image';
/**
* Processes a PDF input and extracts text content from each page.
* @param input - PDF input source.
* @param provider - The OCR provider to use.
* @param apiKey - API key for authentication.
* @returns An array of extracted text and metadata for each page.
*/
Expand All @@ -24,92 +25,112 @@ export async function processPdf(
): Promise<PageResult[]> {
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'ocra-'));
const tempPdfPath = path.join(tempDir, 'temp.pdf');
const limiter = new ConcurrencyLimit(MAX_CONCURRENT_REQUESTS);

const conversionLimiter = new ConcurrencyLimit(MAX_CONCURRENT_REQUESTS);
const processingLimiter = new ConcurrencyLimit(MAX_CONCURRENT_REQUESTS);

try {
// Save input to temporary PDF file
let pdfBuffer: Buffer;

if (typeof input === 'string') {
if (input.startsWith('http')) {
const response = await fetch(input);
pdfBuffer = Buffer.from(await response.arrayBuffer());
} else if (input.startsWith('data:application/pdf')) {
pdfBuffer = Buffer.from(input.split(',')[1], 'base64');
} else {
pdfBuffer = await fs.readFile(input);
try {
if (input.startsWith('http')) {
const response = await fetch(input);
if (!response.ok) {
throw new Error(
`Failed to fetch PDF: ${response.status} ${response.statusText}`,
);
}
pdfBuffer = Buffer.from(await response.arrayBuffer());
} else if (input.startsWith('data:application/pdf')) {
pdfBuffer = Buffer.from(input.split(',')[1], 'base64');
} else {
pdfBuffer = await fs.readFile(input);
}
} catch (error: unknown) {
throw new Error(`Failed to read PDF input: ${_pm(error)}`);
}
} else {
pdfBuffer = input;
}

if (!pdfBuffer?.length) {
throw new Error('Empty or invalid PDF buffer');
}

await fs.writeFile(tempPdfPath, pdfBuffer);

const pdfDoc = await PDFDocument.load(pdfBuffer);
const pageCount = pdfDoc.getPageCount();

const options = {
const convert = fromPath(tempPdfPath, {
density: 300,
saveFilename: 'page',
savePath: tempDir,
format: 'png',
width: 2480,
height: 3508,
};

const convert = fromPath(tempPdfPath, options);

// Convert pages to images concurrently
const pages = await Promise.all(
Array.from({length: pageCount}, (_, i) => i + 1).map(pageNum =>
limiter.run(async () => {
try {
const result = await convert(pageNum);
if (!result?.path) {
throw new Error('Failed to convert PDF page to image');
}
const imageBuffer = await fs.readFile(result.path);
await fs.unlink(result.path);
return {
buffer: imageBuffer,
pageNum,
};
} catch (error) {
report(error);
throw error;
});

const processingPromises: Promise<PageResult>[] = [];

for (let pageNum = 1; pageNum <= pageCount; pageNum++) {
// Start conversion without waiting for processing to finish
await conversionLimiter.run(async () => {
try {
// Convert page to image
const conversionResult = await convert(pageNum);
if (!conversionResult?.path) {
throw new Error('Failed to convert PDF page to image');
}
}),
),
);

// Process all pages concurrently
const results = await Promise.all(
pages.map(({buffer, pageNum}) =>
limiter.run(async () => {
try {
const result = await processImage(buffer, provider, apiKey);
return {
page: pageNum,
content: result.content,
metadata: {
...result.metadata,
pageNumber: pageNum,
},
};
} catch (error: unknown) {
report(error);
return {
const imageBuffer = await fs.readFile(conversionResult.path);
await fs.unlink(conversionResult.path);

// Start processing the image in the background
const processingPromise = processingLimiter.run(async () => {
try {
const processResult = await processImage(
imageBuffer,
provider,
apiKey,
);
return {
page: pageNum,
...processResult,
} satisfies PageResult;
} catch (error: unknown) {
report(error);
return {
page: pageNum,
content: '',
metadata: {
error: _pm(error),
},
} satisfies PageResult;
}
});

// Collect the processing promise
processingPromises.push(processingPromise);
} catch (error: unknown) {
report(error);
// In case of conversion error, add a failed result
processingPromises.push(
Promise.resolve({
page: pageNum,
content: '',
metadata: {
error: _pm(error),
pageNumber: pageNum,
},
};
}
}),
),
);
} satisfies PageResult),
);
}
});
}

// Wait for all processing tasks to complete
const results = await Promise.all(processingPromises);

return results.sort((a, b) => a.page - b.page);
} finally {
Expand Down
2 changes: 0 additions & 2 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ export interface OCRMetadata {
timestamp?: string;
/** Size of the processed file in bytes */
size?: number;
/** Page number for multi-page documents */
pageNumber?: number;
/** Error message if processing failed */
error?: string;
}
Expand Down
2 changes: 1 addition & 1 deletion src/utils/call-llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export async function callLLM(
for (let attempt = 0; attempt < RETRY_ATTEMPTS; attempt++) {
try {
const response = await fetchWithTimeout(
API_ENDPOINTS.openai,
API_ENDPOINTS[provider],
{
method: 'POST',
headers: createProviderHeaders(apiKey, provider),
Expand Down
Loading

0 comments on commit ff694ed

Please sign in to comment.