Skip to content

Commit 28b40d5

Browse files
authored
ENG-1121 endSyncTask error - wrong worker (#621)
1 parent 262e5e4 commit 28b40d5

File tree

3 files changed

+133
-17
lines changed

3 files changed

+133
-17
lines changed

packages/database/src/dbTypes.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,7 @@ export type Database = {
652652
Row: {
653653
failure_count: number | null
654654
id: number
655+
last_success_start: string | null
655656
last_task_end: string | null
656657
last_task_start: string | null
657658
status: Database["public"]["Enums"]["task_status"] | null
@@ -664,6 +665,7 @@ export type Database = {
664665
Insert: {
665666
failure_count?: number | null
666667
id?: number
668+
last_success_start?: string | null
667669
last_task_end?: string | null
668670
last_task_start?: string | null
669671
status?: Database["public"]["Enums"]["task_status"] | null
@@ -676,6 +678,7 @@ export type Database = {
676678
Update: {
677679
failure_count?: number | null
678680
id?: number
681+
last_success_start?: string | null
679682
last_task_end?: string | null
680683
last_task_start?: string | null
681684
status?: Database["public"]["Enums"]["task_status"] | null
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
2+
alter table "public"."sync_info" add column "last_success_start" timestamp with time zone;
3+
4+
alter table "public"."sync_info" alter column "last_task_start" set not null;
5+
6+
update sync_info set last_success_start = last_task_start where last_task_end is not null;
7+
8+
CREATE OR REPLACE FUNCTION public.end_sync_task(s_target bigint, s_function character varying, s_worker character varying, s_status public.task_status)
9+
RETURNS void
10+
LANGUAGE plpgsql
11+
SET search_path TO ''
12+
AS $function$
13+
DECLARE t_id INTEGER;
14+
DECLARE t_worker varchar;
15+
DECLARE t_status public.task_status;
16+
DECLARE t_failure_count SMALLINT;
17+
DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE;
18+
DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE;
19+
DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE;
20+
BEGIN
21+
SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start
22+
INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start
23+
FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function;
24+
ASSERT s_status > 'active';
25+
ASSERT t_worker = s_worker, 'Wrong worker';
26+
ASSERT s_status >= t_status, 'do not go back in status';
27+
IF s_status = 'complete' THEN
28+
t_last_task_end := now();
29+
t_last_success_start := t_last_task_start;
30+
t_failure_count := 0;
31+
ELSE
32+
IF t_status != s_status THEN
33+
t_failure_count := t_failure_count + 1;
34+
END IF;
35+
END IF;
36+
37+
UPDATE public.sync_info
38+
SET status = s_status,
39+
task_times_out_at=null,
40+
last_task_end=t_last_task_end,
41+
last_success_start=t_last_success_start,
42+
failure_count=t_failure_count
43+
WHERE id=t_id;
44+
END;
45+
$function$
46+
;
47+
48+
49+
CREATE OR REPLACE FUNCTION public.propose_sync_task(s_target bigint, s_function character varying, s_worker character varying, timeout interval, task_interval interval)
50+
RETURNS timestamp with time zone
51+
LANGUAGE plpgsql
52+
SET search_path TO ''
53+
AS $function$
54+
DECLARE s_id INTEGER;
55+
DECLARE start_time TIMESTAMP WITH TIME ZONE := now();
56+
DECLARE t_status public.task_status;
57+
DECLARE t_failure_count SMALLINT;
58+
DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE;
59+
DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE;
60+
DECLARE t_times_out_at TIMESTAMP WITH TIME ZONE;
61+
DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE;
62+
DECLARE result TIMESTAMP WITH TIME ZONE;
63+
BEGIN
64+
ASSERT timeout * 2 < task_interval;
65+
ASSERT timeout >= '1s'::interval;
66+
ASSERT task_interval >= '5s'::interval;
67+
INSERT INTO public.sync_info (sync_target, sync_function, status, worker, last_task_start, task_times_out_at)
68+
VALUES (s_target, s_function, 'active', s_worker, start_time, start_time+timeout)
69+
ON CONFLICT (sync_target, sync_function) DO NOTHING
70+
RETURNING id INTO s_id;
71+
IF s_id IS NOT NULL THEN
72+
-- totally new_row, no previous success.
73+
RETURN NULL;
74+
END IF;
75+
-- now we know it pre-existed. Maybe already active.
76+
SELECT id INTO STRICT s_id
77+
FROM public.sync_info
78+
WHERE sync_target = s_target AND sync_function = s_function
79+
FOR UPDATE;
80+
SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at, last_success_start
81+
INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at, t_last_success_start
82+
FROM public.sync_info
83+
WHERE id = s_id;
84+
85+
IF t_status = 'active' AND t_last_task_start >= coalesce(t_last_task_end, t_last_task_start) AND start_time > t_times_out_at THEN
86+
t_status := 'timeout';
87+
t_failure_count := t_failure_count + 1;
88+
END IF;
89+
-- basic backoff
90+
task_interval := task_interval * (1+t_failure_count);
91+
IF coalesce(t_last_task_end, t_last_task_start) + task_interval < now() THEN
92+
-- we are ready to take on the task
93+
result := t_last_success_start;
94+
UPDATE public.sync_info
95+
SET worker=s_worker,
96+
status='active',
97+
task_times_out_at = now() + timeout,
98+
last_task_start = start_time,
99+
failure_count=t_failure_count,
100+
last_task_end = NULL
101+
WHERE id=s_id;
102+
ELSE
103+
-- the task has been tried recently enough
104+
IF t_status = 'timeout' THEN
105+
UPDATE public.sync_info
106+
SET status=t_status, failure_count=t_failure_count
107+
WHERE id=s_id;
108+
END IF;
109+
result := coalesce(t_last_task_end, t_last_task_start) + task_interval;
110+
END IF;
111+
112+
RETURN result;
113+
END;
114+
$function$
115+
;

packages/database/supabase/schemas/sync.sql

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ CREATE TABLE IF NOT EXISTS public.sync_info (
1515
status public.task_status DEFAULT 'active'::public.task_status,
1616
worker character varying(100) NOT NULL,
1717
failure_count smallint DEFAULT 0,
18-
last_task_start timestamp with time zone,
18+
last_task_start timestamp with time zone NOT NULL,
1919
last_task_end timestamp with time zone,
20-
task_times_out_at timestamp with time zone
20+
task_times_out_at timestamp with time zone,
21+
last_success_start timestamp with time zone
2122
);
2223

2324
ALTER TABLE public.sync_info OWNER TO "postgres";
@@ -58,16 +59,19 @@ DECLARE t_id INTEGER;
5859
DECLARE t_worker varchar;
5960
DECLARE t_status public.task_status;
6061
DECLARE t_failure_count SMALLINT;
62+
DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE;
63+
DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE;
6164
DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE;
6265
BEGIN
63-
SELECT id, worker, status, failure_count, last_task_end
64-
INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_end
66+
SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start
67+
INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start
6568
FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function;
6669
ASSERT s_status > 'active';
6770
ASSERT t_worker = s_worker, 'Wrong worker';
6871
ASSERT s_status >= t_status, 'do not go back in status';
6972
IF s_status = 'complete' THEN
7073
t_last_task_end := now();
74+
t_last_success_start := t_last_task_start;
7175
t_failure_count := 0;
7276
ELSE
7377
IF t_status != s_status THEN
@@ -79,6 +83,7 @@ BEGIN
7983
SET status = s_status,
8084
task_times_out_at=null,
8185
last_task_end=t_last_task_end,
86+
last_success_start=t_last_success_start,
8287
failure_count=t_failure_count
8388
WHERE id=t_id;
8489
END;
@@ -109,6 +114,7 @@ DECLARE t_failure_count SMALLINT;
109114
DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE;
110115
DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE;
111116
DECLARE t_times_out_at TIMESTAMP WITH TIME ZONE;
117+
DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE;
112118
DECLARE result TIMESTAMP WITH TIME ZONE;
113119
BEGIN
114120
ASSERT timeout * 2 < task_interval;
@@ -119,21 +125,16 @@ BEGIN
119125
ON CONFLICT (sync_target, sync_function) DO NOTHING
120126
RETURNING id INTO s_id;
121127
IF s_id IS NOT NULL THEN
122-
-- totally new_row, I'm on the task
123-
-- return last time it was run successfully
124-
SELECT max(last_task_start) INTO result FROM public.sync_info
125-
WHERE sync_target = s_target
126-
AND sync_function = s_function
127-
AND status = 'complete';
128-
RETURN result;
128+
-- totally new_row, no previous success.
129+
RETURN NULL;
129130
END IF;
130131
-- now we know it pre-existed. Maybe already active.
131132
SELECT id INTO STRICT s_id
132133
FROM public.sync_info
133134
WHERE sync_target = s_target AND sync_function = s_function
134135
FOR UPDATE;
135-
SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at
136-
INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at
136+
SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at, last_success_start
137+
INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at, t_last_success_start
137138
FROM public.sync_info
138139
WHERE id = s_id;
139140

@@ -145,10 +146,7 @@ BEGIN
145146
task_interval := task_interval * (1+t_failure_count);
146147
IF coalesce(t_last_task_end, t_last_task_start) + task_interval < now() THEN
147148
-- we are ready to take on the task
148-
SELECT max(last_task_start) INTO result FROM public.sync_info
149-
WHERE sync_target = s_target
150-
AND sync_function = s_function
151-
AND status = 'complete';
149+
result := t_last_success_start;
152150
UPDATE public.sync_info
153151
SET worker=s_worker,
154152
status='active',

0 commit comments

Comments
 (0)