Commit 46e97a0

Author: Omer Lachish
Upgrade RQ to v1.5 (#5207)
* upgrade RQ to v1.5
* set job's started_at
* update healthcheck to match string worker names
* delay worker healthcheck for 5 minutes from start to allow enough time to load in case many workers try to load simultaneously
* log when worker cannot be found
1 parent 640fea5 commit 46e97a0

9 files changed (+40 -73 lines)

redash/cli/rq.py

+28-28
@@ -50,30 +50,22 @@ def worker(queues):
 
 
 class WorkerHealthcheck(base.BaseCheck):
-    NAME = 'RQ Worker Healthcheck'
-    INTERVAL = datetime.timedelta(minutes=5)
-    _last_check_time = {}
-
-    def time_to_check(self, pid):
-        now = datetime.datetime.utcnow()
-
-        if pid not in self._last_check_time:
-            self._last_check_time[pid] = now
-
-        if now - self._last_check_time[pid] >= self.INTERVAL:
-            self._last_check_time[pid] = now
-            return True
-
-        return False
+    NAME = "RQ Worker Healthcheck"
 
     def __call__(self, process_spec):
-        pid = process_spec['pid']
-        if not self.time_to_check(pid):
-            return True
-
+        pid = process_spec["pid"]
         all_workers = Worker.all(connection=rq_redis_connection)
-        worker = [w for w in all_workers if w.hostname == socket.gethostname().encode() and
-                  w.pid == pid].pop()
+        workers = [
+            w
+            for w in all_workers
+            if w.hostname == socket.gethostname() and w.pid == pid
+        ]
+
+        if not workers:
+            self._log(f"Cannot find worker for hostname {socket.gethostname()} and pid {pid}. ==> Is healthy? False")
+            return False
+
+        worker = workers.pop()
 
         is_busy = worker.get_state() == WorkerStatus.BUSY
 
@@ -85,17 +77,25 @@ def __call__(self, process_spec):
 
         is_healthy = is_busy or seen_lately or has_nothing_to_do
 
-        self._log("Worker %s healthcheck: Is busy? %s. "
-                  "Seen lately? %s (%d seconds ago). "
-                  "Has nothing to do? %s (%d jobs in watched queues). "
-                  "==> Is healthy? %s",
-                  worker.key, is_busy, seen_lately, time_since_seen.seconds,
-                  has_nothing_to_do, total_jobs_in_watched_queues, is_healthy)
+        self._log(
+            "Worker %s healthcheck: Is busy? %s. "
+            "Seen lately? %s (%d seconds ago). "
+            "Has nothing to do? %s (%d jobs in watched queues). "
+            "==> Is healthy? %s",
+            worker.key,
+            is_busy,
+            seen_lately,
+            time_since_seen.seconds,
+            has_nothing_to_do,
+            total_jobs_in_watched_queues,
+            is_healthy,
+        )
 
         return is_healthy
 
 
 @manager.command()
 def healthcheck():
     return check_runner.CheckRunner(
-        'worker_healthcheck', 'worker', None, [(WorkerHealthcheck, {})]).run()
+        "worker_healthcheck", "worker", None, [(WorkerHealthcheck, {})]
+    ).run()
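
Note on the change above: RQ 1.5 registers worker hostnames as plain strings rather than bytes, which is why the comparison drops the .encode() call, and a missing worker is now logged and reported unhealthy instead of blowing up on .pop() of an empty list. A minimal sketch of the same lookup in isolation (the Redis URL and the use of os.getpid() are assumptions for illustration, not Redash's code):

    import os
    import socket

    from redis import Redis
    from rq import Worker

    redis_conn = Redis.from_url("redis://localhost:6379/0")  # assumed connection

    def find_local_worker(pid=None, connection=redis_conn):
        """Return the RQ worker registered for this host and pid, or None."""
        pid = pid or os.getpid()
        hostname = socket.gethostname()
        for w in Worker.all(connection=connection):
            # RQ >= 1.5 stores hostname as str, so no .encode() is needed
            if w.hostname == hostname and w.pid == pid:
                return w
        return None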

redash/tasks/__init__.py

-1
@@ -3,7 +3,6 @@
     version_check,
     send_mail,
     sync_user_details,
-    purge_failed_jobs,
 )
 from .queries import (
     enqueue_query,

redash/tasks/general.py

+2-37
@@ -2,13 +2,10 @@
 from datetime import datetime
 
 from flask_mail import Message
-from rq import Connection, Queue
-from rq.registry import FailedJobRegistry
-from rq.job import Job
-from redash import mail, models, settings, rq_redis_connection
+from redash import mail, models, settings
 from redash.models import users
 from redash.version_check import run_version_check
-from redash.worker import job, get_job_logger, default_operational_queues
+from redash.worker import job, get_job_logger
 from redash.tasks.worker import Queue
 from redash.query_runner import NotSupported
 
@@ -94,35 +91,3 @@ def get_schema(data_source_id, refresh):
 
 def sync_user_details():
     users.sync_last_active_at()
-
-
-def purge_failed_jobs():
-    with Connection(rq_redis_connection):
-        queues = [q for q in Queue.all() if q.name not in default_operational_queues]
-        for queue in queues:
-            failed_job_ids = FailedJobRegistry(queue=queue).get_job_ids()
-            failed_jobs = Job.fetch_many(failed_job_ids, rq_redis_connection)
-            stale_jobs = []
-            for failed_job in failed_jobs:
-                # the job may not actually exist anymore in Redis
-                if not failed_job:
-                    continue
-                # the job could have an empty ended_at value in case
-                # of a worker dying before it can save the ended_at value,
-                # in which case we also consider them stale
-                if not failed_job.ended_at:
-                    stale_jobs.append(failed_job)
-                elif (
-                    datetime.utcnow() - failed_job.ended_at
-                ).total_seconds() > settings.JOB_DEFAULT_FAILURE_TTL:
-                    stale_jobs.append(failed_job)
-
-            for stale_job in stale_jobs:
-                stale_job.delete()
-
-            if stale_jobs:
-                logger.info(
-                    "Purged %d old failed jobs from the %s queue.",
-                    len(stale_jobs),
-                    queue.name,
-                )
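
The purge_failed_jobs task above can go away because, with failure_ttl now set on every job (see the other changes in this commit), RQ expires failed jobs on its own: FailedJobRegistry keys entries by an expiry timestamp and cleanup() prunes the ones whose TTL has passed. A rough sketch of checking that behavior, with the Redis URL and queue name as assumptions:

    from redis import Redis
    from rq import Queue
    from rq.registry import FailedJobRegistry

    redis_conn = Redis.from_url("redis://localhost:6379/0")  # assumed connection
    queue = Queue("queries", connection=redis_conn)          # assumed queue name

    def remaining_failed_jobs(q):
        """Prune expired entries, then return the ids of failed jobs still tracked."""
        registry = FailedJobRegistry(queue=q)
        registry.cleanup()  # drops entries whose failure_ttl has elapsed
        return registry.get_job_ids()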

redash/tasks/queries/execution.py

+1
@@ -90,6 +90,7 @@ def enqueue_query(
             "scheduled_query_id": scheduled_query_id,
             "is_api_key": is_api_key,
             "job_timeout": time_limit,
+            "failure_ttl": settings.JOB_DEFAULT_FAILURE_TTL,
             "meta": {
                 "data_source_id": data_source.id,
                 "org_id": data_source.org_id,

redash/tasks/schedule.py

+1-3
@@ -15,7 +15,6 @@
     empty_schedules,
     refresh_schemas,
     cleanup_query_results,
-    purge_failed_jobs,
     version_check,
     send_aggregated_errors,
     Queue,
@@ -71,14 +70,13 @@ def periodic_job_definitions():
         {
             "func": refresh_schemas,
             "interval": timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE),
-        }, 
+        },
         {
             "func": sync_user_details,
             "timeout": 60,
             "interval": timedelta(minutes=1),
             "result_ttl": 600,
         },
-        {"func": purge_failed_jobs, "timeout": 3600, "interval": timedelta(days=1)},
         {
             "func": send_aggregated_errors,
             "interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL),

redash/tasks/worker.py

+3-1
@@ -101,12 +101,13 @@ def enforce_hard_limit(self, job):
             )
             self.kill_horse()
 
-    def monitor_work_horse(self, job):
+    def monitor_work_horse(self, job, queue):
         """The worker will monitor the work horse and make sure that it
         either executes successfully or the status of the job is set to
         failed
         """
         self.monitor_started = utcnow()
+        job.started_at = utcnow()
         while True:
             try:
                 with UnixSignalDeathPenalty(
@@ -158,6 +159,7 @@ def monitor_work_horse(self, job):
 
                 self.handle_job_failure(
                     job,
+                    queue=queue,
                     exc_string="Work-horse process was terminated unexpectedly "
                     "(waitpid returned %s)" % ret_val,
                 )
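
The signature change reflects RQ 1.5, where Worker.monitor_work_horse receives the queue and handle_job_failure expects it as well; stamping job.started_at up front gives the hard-limit check a start time even before the work horse reports one. A stripped-down sketch of such a subclass (Redash's HardLimitingWorker does considerably more; monitor_started is its own attribute, not RQ's):

    from rq import Worker
    from rq.utils import utcnow

    class TimestampingWorker(Worker):
        """Record when monitoring began and when the job started (RQ >= 1.5 signature)."""

        def monitor_work_horse(self, job, queue):
            self.monitor_started = utcnow()  # custom attribute, mirrors Redash's usage
            job.started_at = utcnow()
            return super().monitor_work_horse(job, queue)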

redash/worker.py

+1-1
@@ -30,7 +30,7 @@ class StatsdRecordingJobDecorator(rq_job): # noqa
     queue_class = RedashQueue
 
 
-job = partial(StatsdRecordingJobDecorator, connection=rq_redis_connection)
+job = partial(StatsdRecordingJobDecorator, connection=rq_redis_connection, failure_ttl=settings.JOB_DEFAULT_FAILURE_TTL)
 
 
 class CurrentJobFilter(logging.Filter):
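
Binding failure_ttl into the partial means every function decorated with @job picks up the same failure expiry without repeating it. A hedged sketch of the pattern using RQ's stock decorator instead of Redash's statsd-recording subclass (Redis URL, queue name, and TTL value are assumptions):

    from functools import partial

    from redis import Redis
    from rq.decorators import job as rq_job

    redis_conn = Redis.from_url("redis://localhost:6379/0")  # assumed connection
    JOB_DEFAULT_FAILURE_TTL = 7 * 24 * 3600                  # assumed TTL value

    job = partial(rq_job, connection=redis_conn, failure_ttl=JOB_DEFAULT_FAILURE_TTL)

    @job("default", timeout=60)
    def send_mail(to, subject):
        print(f"mailing {to}: {subject}")

    # send_mail.delay(...) enqueues the call instead of running it inline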

requirements.txt

+3-2
@@ -24,7 +24,7 @@ psycopg2==2.8.3
 python-dateutil==2.8.0
 pytz>=2019.3
 PyYAML==5.1.2
-redis==3.3.11
+redis==3.5.0
 requests==2.21.0
 SQLAlchemy==1.3.10
 # We can't upgrade SQLAlchemy-Searchable version as newer versions require PostgreSQL > 9.6, but we target older versions at the moment.
@@ -34,8 +34,9 @@ pyparsing==2.3.0
 SQLAlchemy-Utils==0.34.2
 sqlparse==0.3.0
 statsd==3.3.0
+greenlet==0.4.16
 gunicorn==20.0.4
-rq==1.1.0
+rq==1.5.0
 rq-scheduler==0.9.1
 jsonschema==3.1.1
 RestrictedPython==5.0

worker.conf

+1
@@ -17,6 +17,7 @@ directory=/app
 stopsignal=TERM
 autostart=true
 autorestart=true
+startsecs=300
 stdout_logfile=/dev/stdout
 stdout_logfile_maxbytes=0
 stderr_logfile=/dev/stderr
