OCR-Space Function.py
"""
title: OCR.Space Text Extraction
author: cloph-dsp
version: 1.0
description: Extract text from images in chat messages using OCR.Space API.
Get your key here -> https://ocr.space/OCRAPI
"""
from pydantic import BaseModel, Field
from typing import Callable, Awaitable, Any, Optional
import asyncio
import base64
import json
import requests
import re
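
# Shape of the OCR.Space response this filter relies on (abridged sketch based on
# the fields read below; see https://ocr.space/OCRAPI for the authoritative schema):
#
#   {
#       "OCRExitCode": 1,                          # 1 = parsed successfully
#       "ParsedResults": [{"ParsedText": "..."}],  # extracted text per page/image
#       "ErrorMessage": "..."                      # populated when parsing fails
#   }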


class Filter:
    class Valves(BaseModel):
        priority: int = Field(
            default=0, description="Priority level for filtering operations."
        )
        API_KEY: str = Field(
            default="", description="OCR.Space API Key"
        )
        MAX_RETRIES: int = Field(default=3, description="Maximum retry attempts for OCR processing")
        LANGUAGE: str = Field(
            default="eng",
            description="OCR language code (e.g., eng, ger, spa, fra, ita, por, etc.)"
        )

    def __init__(self):
        self.valves = self.Valves()

    def _extract_base64_content(self, image_url: str) -> str:
        """Extract base64 content from data URL."""
        # Check if it's a data URL (base64); otherwise return the input unchanged
        match = re.match(r'data:image/([a-zA-Z]+);base64,(.+)', image_url)
        if match:
            return match.group(2)
        return image_url

    async def _process_image_ocr(self, image_url: str, event_emitter) -> str:
        """Process image through OCR.Space API with retry mechanism."""
        retries = 0
        while retries < self.valves.MAX_RETRIES:
            try:
                # Extract base64 content if it's a data URL
                image_data = self._extract_base64_content(image_url)
                # Determine if it's a URL or base64 data
                if image_data == image_url:  # It's a URL
                    ocr_result = requests.post(
                        'https://api.ocr.space/parse/image',
                        data={
                            'url': image_url,
                            'apikey': self.valves.API_KEY,
                            'language': self.valves.LANGUAGE,
                            'isOverlayRequired': False
                        },
                    )
                else:  # It's base64 data
                    ocr_result = requests.post(
                        'https://api.ocr.space/parse/image',
                        data={
                            # Forward the original data URL so its MIME type is preserved
                            # (hard-coding image/jpeg would mislabel non-JPEG uploads)
                            'base64Image': image_url,
                            'apikey': self.valves.API_KEY,
                            'language': self.valves.LANGUAGE,
                            'isOverlayRequired': False
                        },
                    )
                result = json.loads(ocr_result.content.decode())
                if result.get("OCRExitCode") == 1:  # Success
                    extracted_text = ""
                    for text_result in result.get("ParsedResults", []):
                        extracted_text += text_result.get("ParsedText", "")
                    if not extracted_text:
                        raise Exception("No text extracted from the image")
                    return extracted_text
                else:
                    error_message = result.get("ErrorMessage", "Unknown OCR error")
                    raise Exception(f"OCR error: {error_message}")
            except Exception as e:
                retries += 1
                if retries < self.valves.MAX_RETRIES:
                    await event_emitter(
                        {
                            "type": "status",
                            "data": {
                                "description": f"⚠️ OCR processing failed, retrying ({retries}/{self.valves.MAX_RETRIES})...",
                                "done": False,
                            },
                        }
                    )
                    await asyncio.sleep(2**retries)  # Exponential backoff
                else:
                    await event_emitter(
                        {
                            "type": "status",
                            "data": {
                                "description": f"❌ OCR failed after {self.valves.MAX_RETRIES} attempts: {str(e)}",
                                "done": True,
                            },
                        }
                    )
                    raise
        raise Exception(f"Failed to process image after {self.valves.MAX_RETRIES} attempts")

    def _find_image_in_messages(self, messages):
        """Find the first image in the user messages."""
        for m_index, message in enumerate(messages):
            if message["role"] == "user" and isinstance(message.get("content"), list):
                for c_index, content in enumerate(message["content"]):
                    if content["type"] == "image_url":
                        return m_index, c_index, content["image_url"]["url"]
        return None

    async def inlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __user__: Optional[dict] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        messages = body.get("messages", [])
        # Check for an API key ("helloworld" is OCR.Space's public demo key)
        if not self.valves.API_KEY or self.valves.API_KEY == "helloworld":
            # Emit a status event informing about the missing API key
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": "❌ OCR feature disabled: No valid API key configured",
                        "done": True,
                    },
                }
            )
            # If the last user message contains an image, replace its content
            # with a note explaining that OCR is unavailable
            if messages and messages[-1]["role"] == "user":
                content = messages[-1].get("content", "")
                if isinstance(content, list):
                    for item in content:
                        if item["type"] == "image_url":
                            messages[-1]["content"] = [
                                {
                                    "type": "text",
                                    "text": "I notice you've uploaded an image, but OCR processing is not available (missing API key).",
                                }
                            ]
                            break
            return body
        # Look for images in messages
        image_info = self._find_image_in_messages(messages)
        if not image_info:
            return body
        message_index, content_index, image_url = image_info
        try:
            # Show processing status
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": "🔍 Processing image with OCR.Space...",
                        "done": False,
                    },
                }
            )
            # Extract text from the image
            extracted_text = await self._process_image_ocr(image_url, __event_emitter__)
            # Update status to complete
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": "✅ Text successfully extracted from image",
                        "done": True,
                    },
                }
            )
            # Format the extracted text with context
            ocr_context = (
                f"[System note: The following text was extracted from an uploaded image via OCR. "
                f"Use it to inform your response.]\n\n{extracted_text}"
            )
            # Keep the user's original text prompt, if any
            user_message = ""
            if isinstance(messages[message_index]["content"], list):
                for item in messages[message_index]["content"]:
                    if item["type"] == "text":
                        user_message = item["text"]
                        break
            # Replace the image with the extracted text
            messages[message_index]["content"] = [
                {
                    "type": "text",
                    "text": (
                        f"{ocr_context}\n\nUser query: {user_message}"
                        if user_message
                        else ocr_context
                    ),
                }
            ]
            body["messages"] = messages
        except Exception as e:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": f"❌ OCR processing failed: {str(e)}",
                        "done": True,
                    },
                }
            )
            # Replace the image with an error message
            if isinstance(messages[message_index]["content"], list):
                user_message = ""
                for item in messages[message_index]["content"]:
                    if item["type"] == "text":
                        user_message = item["text"]
                        break
                messages[message_index]["content"] = [
                    {
                        "type": "text",
                        "text": (
                            f"[Image processing failed: {str(e)}]\n\n{user_message}"
                            if user_message
                            else f"[Image processing failed: {str(e)}]"
                        ),
                    }
                ]
                body["messages"] = messages
        return body

    async def stream(self, event: dict) -> dict:
        # No modifications needed for streaming events
        return event

    async def outlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __user__: Optional[dict] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        # No modifications needed for the response
        return body
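
# --- Optional local smoke test (not part of the Open WebUI filter contract) ---
# A minimal sketch for exercising `inlet` outside Open WebUI. The API key and
# image URL below are placeholders you must replace; the stub emitter simply
# prints the status events the filter would normally send to the UI.
if __name__ == "__main__":

    async def _print_emitter(event: dict) -> None:
        # Stand-in for Open WebUI's __event_emitter__
        print(event["data"]["description"])

    async def _demo() -> None:
        ocr_filter = Filter()
        ocr_filter.valves.API_KEY = "YOUR_OCR_SPACE_KEY"  # placeholder
        body = {
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "What does this image say?"},
                        {
                            "type": "image_url",
                            # placeholder URL; any publicly reachable image works
                            "image_url": {"url": "https://example.com/sample.png"},
                        },
                    ],
                }
            ]
        }
        result = await ocr_filter.inlet(body, _print_emitter)
        print(result["messages"][0]["content"][0]["text"])

    asyncio.run(_demo())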