Skip to content

Commit 38ab4de

Browse files
committed
Survive minor db drops
Essentially it means wrapping all db interactions in the persistence adapter with retries. We don't really need the retries, but we need to wrap the db-specific errors with general dynflow persistence errors which the rest of the codebase expects.
1 parent 57261dc commit 38ab4de

File tree

1 file changed

+54
-40
lines changed

1 file changed

+54
-40
lines changed

lib/dynflow/persistence_adapters/sequel.rb

Lines changed: 54 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -71,36 +71,39 @@ def find_execution_plans(options = {})
7171
paginate(table(table_name), options),
7272
options),
7373
options[:filters])
74-
data_set.all.map { |record| execution_plan_column_map(load_data(record, table_name)) }
74+
records = with_retry { data_set.all }
75+
records.map { |record| execution_plan_column_map(load_data(record, table_name)) }
7576
end
7677

7778
def find_execution_plan_counts(options = {})
78-
filter(:execution_plan, table(:execution_plan), options[:filters]).count
79+
with_retry { filter(:execution_plan, table(:execution_plan), options[:filters]).count }
7980
end
8081

8182
def delete_execution_plans(filters, batch_size = 1000, backup_dir = nil)
8283
count = 0
83-
filter(:execution_plan, table(:execution_plan), filters).each_slice(batch_size) do |plans|
84-
uuids = plans.map { |p| p.fetch(:uuid) }
85-
@db.transaction do
86-
table(:delayed).where(execution_plan_uuid: uuids).delete
84+
with_retry do
85+
filter(:execution_plan, table(:execution_plan), filters).each_slice(batch_size) do |plans|
86+
uuids = plans.map { |p| p.fetch(:uuid) }
87+
@db.transaction do
88+
table(:delayed).where(execution_plan_uuid: uuids).delete
8789

88-
steps = table(:step).where(execution_plan_uuid: uuids)
89-
backup_to_csv(:step, steps, backup_dir, 'steps.csv') if backup_dir
90-
steps.delete
90+
steps = table(:step).where(execution_plan_uuid: uuids)
91+
backup_to_csv(:step, steps, backup_dir, 'steps.csv') if backup_dir
92+
steps.delete
9193

92-
output_chunks = table(:output_chunk).where(execution_plan_uuid: uuids).delete
94+
output_chunks = table(:output_chunk).where(execution_plan_uuid: uuids).delete
9395

94-
actions = table(:action).where(execution_plan_uuid: uuids)
95-
backup_to_csv(:action, actions, backup_dir, 'actions.csv') if backup_dir
96-
actions.delete
96+
actions = table(:action).where(execution_plan_uuid: uuids)
97+
backup_to_csv(:action, actions, backup_dir, 'actions.csv') if backup_dir
98+
actions.delete
9799

98-
execution_plans = table(:execution_plan).where(uuid: uuids)
99-
backup_to_csv(:execution_plan, execution_plans, backup_dir, 'execution_plans.csv') if backup_dir
100-
count += execution_plans.delete
100+
execution_plans = table(:execution_plan).where(uuid: uuids)
101+
backup_to_csv(:execution_plan, execution_plans, backup_dir, 'execution_plans.csv') if backup_dir
102+
count += execution_plans.delete
103+
end
101104
end
105+
return count
102106
end
103-
return count
104107
end
105108

106109
def load_execution_plan(execution_plan_id)
@@ -113,30 +116,37 @@ def save_execution_plan(execution_plan_id, value)
113116

114117
def delete_delayed_plans(filters, batch_size = 1000)
115118
count = 0
116-
filter(:delayed, table(:delayed), filters).each_slice(batch_size) do |plans|
117-
uuids = plans.map { |p| p.fetch(:execution_plan_uuid) }
118-
@db.transaction do
119-
count += table(:delayed).where(execution_plan_uuid: uuids).delete
119+
with_retry do
120+
filter(:delayed, table(:delayed), filters).each_slice(batch_size) do |plans|
121+
uuids = plans.map { |p| p.fetch(:execution_plan_uuid) }
122+
@db.transaction do
123+
count += table(:delayed).where(execution_plan_uuid: uuids).delete
124+
end
120125
end
121126
end
122127
count
123128
end
124129

125130
def find_old_execution_plans(age)
126131
table_name = :execution_plan
127-
table(table_name)
128-
.where(::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped'))
129-
.all.map { |plan| execution_plan_column_map(load_data plan, table_name) }
132+
records = with_retry do
133+
table(table_name)
134+
.where(::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped'))
135+
.all
136+
end
137+
records.map { |plan| execution_plan_column_map(load_data plan, table_name) }
130138
end
131139

132140
def find_past_delayed_plans(time)
133141
table_name = :delayed
134-
table(table_name)
135-
.where(::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time))
136-
.where(:frozen => false)
137-
.order_by(:start_at)
138-
.all
139-
.map { |plan| load_data(plan, table_name) }
142+
records = with_retry do
143+
table(table_name)
144+
.where(::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time))
145+
.where(:frozen => false)
146+
.order_by(:start_at)
147+
.all
148+
end
149+
records.map { |plan| load_data(plan, table_name) }
140150
end
141151

142152
def load_delayed_plan(execution_plan_id)
@@ -203,28 +213,30 @@ def save_envelope(data)
203213

204214
def pull_envelopes(receiver_id)
205215
connector_feature!
206-
db.transaction do
207-
data_set = table(:envelope).where(receiver_id: receiver_id).all
208-
envelopes = data_set.map { |record| load_data(record) }
216+
with_retry do
217+
db.transaction do
218+
data_set = table(:envelope).where(receiver_id: receiver_id).all
219+
envelopes = data_set.map { |record| load_data(record) }
209220

210-
table(:envelope).where(id: data_set.map { |d| d[:id] }).delete
211-
return envelopes
221+
table(:envelope).where(id: data_set.map { |d| d[:id] }).delete
222+
return envelopes
223+
end
212224
end
213225
end
214226

215227
def push_envelope(envelope)
216228
connector_feature!
217-
table(:envelope).insert(prepare_record(:envelope, envelope))
229+
with_retry { table(:envelope).insert(prepare_record(:envelope, envelope)) }
218230
end
219231

220232
def prune_envelopes(receiver_ids)
221233
connector_feature!
222-
table(:envelope).where(receiver_id: receiver_ids).delete
234+
with_retry { table(:envelope).where(receiver_id: receiver_ids).delete }
223235
end
224236

225237
def prune_undeliverable_envelopes
226238
connector_feature!
227-
table(:envelope).where(receiver_id: table(:coordinator_record).select(:id)).invert.delete
239+
with_retry { table(:envelope).where(receiver_id: table(:coordinator_record).select(:id)).invert.delete }
228240
end
229241

230242
def coordinator_feature!
@@ -245,7 +257,7 @@ def update_coordinator_record(class_name, record_id, value)
245257

246258
def delete_coordinator_record(class_name, record_id)
247259
coordinator_feature!
248-
table(:coordinator_record).where(class: class_name, id: record_id).delete
260+
with_retry { table(:coordinator_record).where(class: class_name, id: record_id).delete }
249261
end
250262

251263
def find_coordinator_records(options)
@@ -257,7 +269,9 @@ def find_coordinator_records(options)
257269
if exclude_owner_id
258270
data_set = data_set.exclude(:owner_id => exclude_owner_id)
259271
end
260-
data_set.all.map { |record| load_data(record) }
272+
with_retry do
273+
data_set.all.map { |record| load_data(record) }
274+
end
261275
end
262276

263277
def to_hash

0 commit comments

Comments
 (0)