Skip to content

Commit e896dd9

Browse files
committed
Upd: transfer protect
1 parent 2950586 commit e896dd9

File tree

3 files changed

+23
-6
lines changed

3 files changed

+23
-6
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ async def async_root(*args):
126126

127127
def sync_prime_number(redis, mysql, x):
128128
# Example synchronous function to determine if the input x is a prime number.
129+
# redis and mysql clients are entered by default, starting from the third parameter
130+
# is your custom parameters, only keyword parameters are supported.
129131
import math, time
130132
if x == 1:
131133
return True
@@ -137,6 +139,12 @@ def sync_prime_number(redis, mysql, x):
137139

138140
async def async_fibonacci(redis, mysql, n):
139141
# Example asynchronous function to calculate the nth position of the Fibonacci series.
142+
# redis and mysql clients are entered by default, starting from the third parameter
143+
# is your custom parameters, only keyword parameters are supported.
144+
145+
# Be sure to note that all data to upload and download must be serializable by msgpack.
146+
# This means that if you transfer some custom object, or in this case a very large integer,
147+
# the request will be responsed with an internal server error (http 500).
140148
a, b = 0, 1
141149
for _ in range(n):
142150
a, b = b, a + b

fastapi_queue/gateway_manager.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ async def redis_channel_listening_result(self, *args, **kwargs):
5252
if message['data'] == self._confirm_token:
5353
task_status_activate = True
5454
break
55+
else:
56+
raise RuntimeError('This is special class to account for confirmation failures')
5557
except asyncio.TimeoutError:
5658
asyncio.create_task(self._withdraw_query(*args, **kwargs))
5759
except Exception as e:
@@ -66,15 +68,17 @@ async def redis_channel_listening_result(self, *args, **kwargs):
6668
while True:
6769
message = await channel.get_message(ignore_subscribe_messages=True, timeout = self._process_timeout + 5)
6870
if message is not None:
69-
result = json.loads(message['data'])
71+
run_success, result = json.loads(message['data'])
7072
break
7173
except asyncio.TimeoutError:
7274
return False, -2
7375
except Exception as e:
7476
if debug:
7577
raise e
78+
return False, e
7679
else:
77-
return True, result
80+
if run_success: return True, result
81+
else: return False, result
7882

7983
async def _publish_query(self, form_data: dict = {}, task_level: int = 0):
8084
'''

fastapi_queue/worker.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,16 @@ async def _worker_thread(self, thread_idx: int):
167167
await self.redis.publish(result_channel_name, self._confirm_token)
168168
self._logger.debug(f"Pid: {self.pid}, `{result_channel_name}` confirmed")
169169
method = self._method_map[task_route]
170-
if method.__name__[0] == 'a':
171-
res = await method(self.redis, self.mysql, **form_data)
170+
try:
171+
if method.__name__[0] == 'a':
172+
res = await method(self.redis, self.mysql, **form_data)
173+
else:
174+
res = await loop.run_in_executor(self._executor, partial(method, self.redis, self.mysql, **form_data))
175+
except Exception as e:
176+
res = json.dumps([False, f'{type(e)}:{e}'])
172177
else:
173-
res = await loop.run_in_executor(self._executor, partial(method, self.redis, self.mysql, **form_data))
174-
res = json.dumps(res)
178+
# format: [run_success: bool, result: Any]
179+
res = json.dumps([True, res])
175180
self._logger.info(f"Pid: {self.pid}, return: {res[:20]}")
176181
await self.redis.publish(result_channel_name, res)
177182

0 commit comments

Comments
 (0)