From af496fe5e3d49c873555bcb27ad4a85e47e35c11 Mon Sep 17 00:00:00 2001
From: guyu <guyu@fordeal.com>
Date: Fri, 24 Jan 2025 11:41:18 +0800
Subject: [PATCH 1/5] Execute only one refresh at a time for the same query_text

---
 redash/tasks/queries/execution.py | 12 +++++--
 redash/utils/locks.py             | 53 +++++++++++++++++++++++++++++++
 2 files changed, 62 insertions(+), 3 deletions(-)
 create mode 100644 redash/utils/locks.py
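
This patch serializes concurrent refreshes of the same query text: enqueue_query now takes a short-lived Redis mutex around the job-lock check, so only one worker per query_hash and data source inspects the existing job and enqueues a new one. Below is a minimal usage sketch of the helpers added in redash/utils/locks.py; the key format and the body of the critical section are illustrative placeholders, not part of the patch.

    # Illustrative sketch only (not part of the patch): how enqueue_query wires in
    # the new helpers so only one refresh per query_hash/data source proceeds.
    from redash.utils.locks import acquire_lock, release_lock

    def enqueue_once(query_hash, data_source_id):
        # Assumed to mirror _job_lock_id(); the exact key format lives in execution.py.
        job_lock_id = "query_hash_job:%s:%s" % (data_source_id, query_hash)

        identifier = acquire_lock(job_lock_id)    # spins with backoff for up to acquire_timeout
        if identifier is None:
            return None                           # another worker holds the mutex; caller retries
        try:
            # ... check the existing job-lock entry and enqueue a new job if needed ...
            pass
        finally:
            release_lock(job_lock_id, identifier) # deletes the key only if we still own it

acquire_lock spins on SET NX EX with exponential backoff plus jitter (starting at 1 ms and capped at 50 ms) and gives up after acquire_timeout seconds, so callers must handle the None return.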

diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py
index a863903cdb..a545e29cba 100644
--- a/redash/tasks/queries/execution.py
+++ b/redash/tasks/queries/execution.py
@@ -15,6 +15,7 @@
 from redash.tasks.failure_report import track_failure
 from redash.tasks.worker import Job, Queue
 from redash.utils import gen_query_hash, utcnow
+from redash.utils.locks import acquire_lock, release_lock
 from redash.worker import get_job_logger
 
 logger = get_job_logger(__name__)
@@ -34,14 +35,18 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
     logger.info("Inserting job for %s with metadata=%s", query_hash, metadata)
     try_count = 0
     job = None
+    job_lock_id = _job_lock_id(query_hash, data_source.id)
 
     while try_count < 5:
         try_count += 1
+        identifier = acquire_lock(job_lock_id)
+        if identifier is None:
+            continue
 
         pipe = redis_connection.pipeline()
         try:
-            pipe.watch(_job_lock_id(query_hash, data_source.id))
-            job_id = pipe.get(_job_lock_id(query_hash, data_source.id))
+            pipe.watch(job_lock_id)
+            job_id = pipe.get(job_lock_id)
             if job_id:
                 logger.info("[%s] Found existing job: %s", query_hash, job_id)
                 job_complete = None
@@ -66,7 +71,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
 
                 if lock_is_irrelevant:
                     logger.info("[%s] %s, removing lock", query_hash, message)
-                    redis_connection.delete(_job_lock_id(query_hash, data_source.id))
+                    redis_connection.delete(job_lock_id)
                     job = None
 
             if not job:
@@ -115,6 +120,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
         except redis.WatchError:
             continue
         finally:
+            release_lock(job_lock_id, identifier)
             pipe.reset()
 
     if not job:
diff --git a/redash/utils/locks.py b/redash/utils/locks.py
new file mode 100644
index 0000000000..ca1324a87e
--- /dev/null
+++ b/redash/utils/locks.py
@@ -0,0 +1,53 @@
+import random
+import time
+import uuid
+import logging
+from redis import WatchError
+from redash import redis_connection
+
+logger = logging.getLogger(__name__)
+
+
+def acquire_lock(name, acquire_timeout=10, lock_timeout=5):
+    identifier = str(uuid.uuid4())
+    lock_name = f"lock:{name}"
+    end = time.time() + acquire_timeout
+
+    base_delay = 0.001
+    max_delay = 0.05
+
+    while time.time() < end:
+        if redis_connection.set(lock_name, identifier, ex=lock_timeout, nx=True):
+            logger.info("acquire_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+            return identifier
+
+        delay = base_delay + random.uniform(0, base_delay)
+        time.sleep(min(delay, max_delay))
+        base_delay = min(base_delay * 2, max_delay)
+
+    return None
+
+
+def release_lock(name, identifier):
+    lock_name = f"lock:{name}"
+    logger.info("release_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+    with redis_connection.pipeline() as pipe:
+        while True:
+            try:
+                pipe.watch(lock_name)
+                if pipe.get(lock_name) == identifier:
+                    pipe.multi()
+                    pipe.delete(lock_name)
+                    pipe.execute()
+                    logger.info("Lock released successfully, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+                    return True
+                pipe.unwatch()
+                logger.warning("Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+                break
+            except WatchError:
+                logger.warning("WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+            except Exception as e:
+                logger.error("Error releasing lock: %s", str(e))
+                break
+
+        return False
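
release_lock above is the usual owner-checked delete: WATCH the key, compare the stored identifier, then DELETE inside MULTI so a client that already lost the lock to expiry never removes someone else's lock. A common alternative, sketched here only for comparison and not used in this patch, performs the same compare-and-delete atomically with a short Lua script:

    # Alternative sketch (not part of this patch): the same owner-checked delete done
    # atomically server-side, avoiding the WATCH/MULTI retry loop.
    from redash import redis_connection

    _RELEASE_SCRIPT = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    def release_lock_atomic(name, identifier):
        # Returns 1 if the lock existed and was owned by identifier, 0 otherwise.
        return redis_connection.eval(_RELEASE_SCRIPT, 1, "lock:" + name, identifier)

One caveat worth verifying: unless redis_connection is created with decode_responses=True, GET returns bytes while the identifier is a str, so the pipe.get(lock_name) == identifier comparison may never match; in that case the lock would only ever be released by the lock_timeout expiry.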

From f841b217e8173e01062e3bb13ac5ce5b60c7c655 Mon Sep 17 00:00:00 2001
From: guyu <guyu@fordeal.com>
Date: Fri, 24 Jan 2025 11:46:17 +0800
Subject: [PATCH 2/5] Format imports and long log lines in locks.py

---
 redash/utils/locks.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/redash/utils/locks.py b/redash/utils/locks.py
index ca1324a87e..4660d8cdae 100644
--- a/redash/utils/locks.py
+++ b/redash/utils/locks.py
@@ -2,8 +2,8 @@
 import time
 import uuid
 import logging
-from redis import WatchError
 from redash import redis_connection
+from redis import WatchError
 
 logger = logging.getLogger(__name__)
 
@@ -42,10 +42,12 @@ def release_lock(name, identifier):
                     logger.info("Lock released successfully, lock_name=[%s], identifier=[%s]", lock_name, identifier)
                     return True
                 pipe.unwatch()
-                logger.warning("Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+                logger.warning("Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name,
+                               identifier)
                 break
             except WatchError:
-                logger.warning("WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+                logger.warning("WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]", lock_name,
+                               identifier)
             except Exception as e:
                 logger.error("Error releasing lock: %s", str(e))
                 break

From 06c9a2b21a7d1ffca0c67a564e23caaf72a8221d Mon Sep 17 00:00:00 2001
From: guyu <guyu@fordeal.com>
Date: Fri, 24 Jan 2025 11:48:29 +0800
Subject: [PATCH 3/5] Fix log call formatting in release_lock

---
 redash/utils/locks.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/redash/utils/locks.py b/redash/utils/locks.py
index 4660d8cdae..126496096f 100644
--- a/redash/utils/locks.py
+++ b/redash/utils/locks.py
@@ -42,12 +42,16 @@ def release_lock(name, identifier):
                     logger.info("Lock released successfully, lock_name=[%s], identifier=[%s]", lock_name, identifier)
                     return True
                 pipe.unwatch()
-                logger.warning("Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name,
-                               identifier)
+                logger.warning(
+                    "Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name, identifier
+                )
                 break
             except WatchError:
-                logger.warning("WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]", lock_name,
-                               identifier)
+                logger.warning(
+                    "WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]",
+                    lock_name,
+                    identifier,
+                )
             except Exception as e:
                 logger.error("Error releasing lock: %s", str(e))
                 break

From 5cfa6bc217cee45b6d1fd8671efbaa735a6c8478 Mon Sep 17 00:00:00 2001
From: Arik Fraimovich <arik@arikfr.com>
Date: Fri, 31 Jan 2025 10:29:54 +0200
Subject: [PATCH 4/5] Update ci.yml to match latest master

---
 .github/workflows/ci.yml | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 520f5c0a60..1cee14f8ab 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -3,7 +3,7 @@ on:
   push:
     branches:
       - master
-  pull_request_target:
+  pull_request:
     branches:
       - master
 env:
@@ -60,10 +60,10 @@ jobs:
           mkdir -p /tmp/test-results/unit-tests
           docker cp tests:/app/coverage.xml ./coverage.xml
           docker cp tests:/app/junit.xml /tmp/test-results/unit-tests/results.xml
-      - name: Upload coverage reports to Codecov
-        uses: codecov/codecov-action@v3
-        with:
-          token: ${{ secrets.CODECOV_TOKEN }}
+      # - name: Upload coverage reports to Codecov
+      #   uses: codecov/codecov-action@v3
+      #   with:
+      #     token: ${{ secrets.CODECOV_TOKEN }}
       - name: Store Test Results
         uses: actions/upload-artifact@v4
         with:
@@ -134,9 +134,9 @@ jobs:
       COMPOSE_PROJECT_NAME: cypress
       CYPRESS_INSTALL_BINARY: 0
       PUPPETEER_SKIP_CHROMIUM_DOWNLOAD: 1
-      PERCY_TOKEN: ${{ secrets.PERCY_TOKEN }}
-      CYPRESS_PROJECT_ID: ${{ secrets.CYPRESS_PROJECT_ID }}
-      CYPRESS_RECORD_KEY: ${{ secrets.CYPRESS_RECORD_KEY }}
+      # PERCY_TOKEN: ${{ secrets.PERCY_TOKEN }}
+      # CYPRESS_PROJECT_ID: ${{ secrets.CYPRESS_PROJECT_ID }}
+      # CYPRESS_RECORD_KEY: ${{ secrets.CYPRESS_RECORD_KEY }}
     steps:
       - if: github.event.pull_request.mergeable == 'false'
         name: Exit if PR is not mergeable

From 9e0e128244495822561a8b62906e38e0d52e7673 Mon Sep 17 00:00:00 2001
From: guyu <guyu@fordeal.com>
Date: Mon, 10 Mar 2025 18:10:27 +0800
Subject: [PATCH 5/5] Fix import ordering in locks.py

---
 redash/utils/locks.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/redash/utils/locks.py b/redash/utils/locks.py
index 126496096f..78ad2693c9 100644
--- a/redash/utils/locks.py
+++ b/redash/utils/locks.py
@@ -1,10 +1,12 @@
+import logging
 import random
 import time
 import uuid
-import logging
-from redash import redis_connection
+
 from redis import WatchError
 
+from redash import redis_connection
+
 logger = logging.getLogger(__name__)