Skip to content

Commit f1a4e00

Browse files
committed
lock_object_storage_task_distribution_ms setting
1 parent a5892c1 commit f1a4e00

File tree

7 files changed

+181
-44
lines changed

7 files changed

+181
-44
lines changed

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6141,6 +6141,9 @@ Cache the list of objects returned by list objects calls in object storage
61416141
)", EXPERIMENTAL) \
61426142
DECLARE(Bool, object_storage_remote_initiator, false, R"(
61436143
Execute request to object storage as remote on one of object_storage_cluster nodes.
6144+
)", EXPERIMENTAL) \
6145+
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
6146+
In object storage distribution queries do not distribute tasks on non-prefetched nodes until prefetched node is active.
61446147
)", EXPERIMENTAL) \
61456148
\
61466149

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
7272
{"object_storage_cluster", "", "", "New setting"},
7373
{"object_storage_max_nodes", 0, 0, "New setting"},
7474
{"use_object_storage_list_objects_cache", true, false, "New setting."},
75+
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
7576
});
7677
addSettingsChanges(settings_changes_history, "25.3",
7778
{

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ namespace Setting
2727
{
2828
extern const SettingsBool use_hive_partitioning;
2929
extern const SettingsString object_storage_cluster;
30+
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
3031
}
3132

3233
namespace ErrorCodes
@@ -386,7 +387,10 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
386387
}
387388
}
388389

389-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
390+
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
391+
iterator,
392+
ids_of_hosts,
393+
local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms]);
390394

391395
auto callback = std::make_shared<TaskIterator>(
392396
[task_distributor](size_t number_of_current_replica) mutable -> String {

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <QueryPipeline/QueryPipelineBuilder.h>
2020
#include <Storages/Cache/SchemaCache.h>
2121
#include <Storages/ObjectStorage/StorageObjectStorage.h>
22+
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
2223
#include <Storages/ObjectStorage/DataLakes/DeltaLake/ObjectInfoWithPartitionColumns.h>
2324
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
2425
#include <Storages/VirtualColumnUtils.h>
@@ -430,16 +431,32 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
430431
ObjectInfoPtr object_info;
431432
auto query_settings = configuration->getQuerySettings(context_);
432433

434+
bool not_a_path = false;
435+
433436
do
434437
{
438+
not_a_path = false;
435439
object_info = file_iterator->next(processor);
436440

437441
if (!object_info || object_info->getPath().empty())
438442
return {};
439443

444+
StorageObjectStorageStableTaskDistributor::CommandInTaskResponse command(object_info->getPath());
445+
if (command.is_parsed())
446+
{
447+
auto retry_after_us = command.get_retry_after_us();
448+
if (retry_after_us.has_value())
449+
{
450+
not_a_path = true;
451+
/// TODO: Make asynchronous waiting without sleep in thread
452+
sleepForMicroseconds(std::min(100000ul, retry_after_us.value()));
453+
continue;
454+
}
455+
}
456+
440457
object_info->loadMetadata(object_storage);
441458
}
442-
while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);
459+
while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0));
443460

444461
QueryPipelineBuilder builder;
445462
std::shared_ptr<ISource> source;

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 87 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,21 @@
33
#include <consistent_hashing.h>
44
#include <optional>
55

6+
#include <Poco/JSON/Object.h>
7+
#include <Poco/JSON/Parser.h>
8+
#include <Poco/JSON/JSONException.h>
9+
610
namespace DB
711
{
812

913
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
1014
std::shared_ptr<IObjectIterator> iterator_,
11-
std::vector<std::string> ids_of_nodes_)
15+
std::vector<std::string> ids_of_nodes_,
16+
uint64_t lock_object_storage_task_distribution_ms_)
1217
: iterator(std::move(iterator_))
1318
, connection_to_files(ids_of_nodes_.size())
1419
, ids_of_nodes(ids_of_nodes_)
20+
, lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
1521
, iterator_exhausted(false)
1622
{
1723
}
@@ -24,6 +30,8 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
2430
number_of_current_replica
2531
);
2632

33+
saveLastNodeActivity(number_of_current_replica);
34+
2735
// 1. Check pre-queued files first
2836
if (auto file = getPreQueuedFile(number_of_current_replica))
2937
return file;
@@ -148,7 +156,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile
148156
// Queue file for its assigned replica
149157
{
150158
std::lock_guard lock(mutex);
151-
unprocessed_files.insert(file_path);
159+
unprocessed_files[file_path] = number_of_current_replica;
152160
connection_to_files[file_replica_idx].push_back(file_path);
153161
}
154162
}
@@ -158,25 +166,96 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile
158166

159167
std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
160168
{
169+
/// Limit time of node activity to keep task in queue
170+
Poco::Timestamp activity_limit;
171+
Poco::Timestamp oldest_activity;
172+
if (lock_object_storage_task_distribution_us)
173+
activity_limit -= lock_object_storage_task_distribution_us;
174+
161175
std::lock_guard lock(mutex);
162176

163177
if (!unprocessed_files.empty())
164178
{
165179
auto it = unprocessed_files.begin();
166-
String next_file = *it;
167-
unprocessed_files.erase(it);
180+
181+
while (it != unprocessed_files.end())
182+
{
183+
auto last_activity = last_node_activity.find(it->second);
184+
if (!lock_object_storage_task_distribution_us
185+
|| last_activity == last_node_activity.end()
186+
|| activity_limit > last_activity->second)
187+
{
188+
String next_file = it->first;
189+
unprocessed_files.erase(it);
190+
191+
LOG_TRACE(
192+
log,
193+
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
194+
next_file,
195+
number_of_current_replica
196+
);
197+
198+
return next_file;
199+
}
200+
201+
oldest_activity = std::min(oldest_activity, last_activity->second);
202+
++it;
203+
}
168204

169205
LOG_TRACE(
170206
log,
171-
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
172-
next_file,
173-
number_of_current_replica
207+
"No unprocessed file for replica {}, need to retry after {} us",
208+
number_of_current_replica,
209+
oldest_activity - activity_limit
174210
);
175211

176-
return next_file;
212+
/// All unprocessed files are owned by alive replicas with recent activity
213+
/// Need to retry after (oldest_activity - activity_limit) microseconds
214+
CommandInTaskResponse response;
215+
response.set_retry_after_us(oldest_activity - activity_limit);
216+
return response.to_string();
177217
}
178218

179219
return std::nullopt;
180220
}
181221

222+
void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica)
223+
{
224+
Poco::Timestamp now;
225+
std::lock_guard lock(mutex);
226+
last_node_activity[number_of_current_replica] = now;
227+
}
228+
229+
/// Tries to interpret a task string as a command instead of an object path.
///
/// A command is a JSON object, e.g. {"retry_after_us":1000}. When `task` is such an object,
/// is_parsed() becomes true and the optional "retry_after_us" field is read into retry_after_us.
/// Any other string leaves the instance in its default state: not parsed, no retry interval.
StorageObjectStorageStableTaskDistributor::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
{
    /// A serialized JSON object starts with '{'. Rejecting everything else up front has two effects:
    ///  - ordinary object paths (the common case) do not pay for a parser run and a thrown exception;
    ///  - strings that are valid JSON but not an object (for example a path named `123` or `true`)
    ///    never reach extract<Object::Ptr>(), whose type-mismatch exception is not a JSONException
    ///    and would otherwise escape this constructor.
    if (task.empty() || task.front() != '{')
        return;

    Poco::JSON::Parser parser;
    try
    {
        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
        if (!json)
            return;

        successfully_parsed = true;

        if (json->has("retry_after_us"))
            retry_after_us = json->getValue<size_t>("retry_after_us");
    }
    catch (const Poco::JSON::JSONException &)
    {
        /// Starts with '{' but is not well-formed JSON: treat it as a regular path.
        return;
    }
}
248+
249+
std::string StorageObjectStorageStableTaskDistributor::CommandInTaskResponse::to_string() const
250+
{
251+
Poco::JSON::Object json;
252+
if (retry_after_us.has_value())
253+
json.set("retry_after_us", retry_after_us.value());
254+
255+
std::ostringstream oss;
256+
oss.exceptions(std::ios::failbit);
257+
Poco::JSON::Stringifier::stringify(json, oss);
258+
return oss.str();
259+
}
260+
182261
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
#include <Interpreters/Cluster.h>
66
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
77
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
8+
9+
#include <Poco/Timestamp.h>
10+
811
#include <unordered_set>
12+
#include <unordered_map>
913
#include <vector>
1014
#include <mutex>
1115
#include <memory>
@@ -16,9 +20,28 @@ namespace DB
1620
class StorageObjectStorageStableTaskDistributor
1721
{
1822
public:
23+
/// A command that the task distributor may return to a replica in place of an object path.
/// It is serialized with to_string() and reconstructed from the received task string
/// by the parsing constructor.
class CommandInTaskResponse
{
public:
    /// Creates an empty command: not parsed and without a retry interval.
    CommandInTaskResponse() = default;
    /// Attempts to parse a command from a task string.
    /// Intentionally left non-explicit so existing call sites keep compiling.
    CommandInTaskResponse(const std::string & task);

    /// True when the string given to the parsing constructor was recognised as a command.
    bool is_parsed() const { return successfully_parsed; }
    /// Sets the delay, in microseconds, after which the receiver should ask for a task again.
    void set_retry_after_us(uint64_t time_us) { retry_after_us = time_us; }

    /// Serialized representation of the command.
    std::string to_string() const;

    /// Retry delay in microseconds, or an empty optional when none was set or parsed.
    std::optional<uint64_t> get_retry_after_us() const { return retry_after_us; }

private:
    bool successfully_parsed = false;
    std::optional<uint64_t> retry_after_us;
};
40+
1941
StorageObjectStorageStableTaskDistributor(
2042
std::shared_ptr<IObjectIterator> iterator_,
21-
std::vector<std::string> ids_of_nodes_);
43+
std::vector<std::string> ids_of_nodes_,
44+
uint64_t lock_object_storage_task_distribution_ms_);
2245

2346
std::optional<String> getNextTask(size_t number_of_current_replica);
2447

@@ -28,12 +51,17 @@ class StorageObjectStorageStableTaskDistributor
2851
std::optional<String> getMatchingFileFromIterator(size_t number_of_current_replica);
2952
std::optional<String> getAnyUnprocessedFile(size_t number_of_current_replica);
3053

54+
void saveLastNodeActivity(size_t number_of_current_replica);
55+
3156
std::shared_ptr<IObjectIterator> iterator;
3257

3358
std::vector<std::vector<String>> connection_to_files;
34-
std::unordered_set<String> unprocessed_files;
59+
/// Map of unprocessed files in format filename => number of prefetched replica
60+
std::unordered_map<String, size_t> unprocessed_files;
3561

3662
std::vector<std::string> ids_of_nodes;
63+
std::unordered_map<size_t, Poco::Timestamp> last_node_activity;
64+
Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us;
3765

3866
std::mutex mutex;
3967
bool iterator_exhausted = false;

0 commit comments

Comments
 (0)