Skip to content

Commit d248b95

Browse files
committed
Fix the data race condition table function bindings #3
1 parent cff686f commit d248b95

File tree

10 files changed

+49
-0
lines changed

10 files changed

+49
-0
lines changed

onager/bindings/functions/approximation.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ static OperatorResultType MaxCliqueInOut(ExecutionContext &ctx, TableFunctionInp
3737
}
3838
static OperatorFinalizeResultType MaxCliqueFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
3939
auto &gs = data.global_state->Cast<MaxCliqueGlobalState>();
40+
std::lock_guard<std::mutex> lock(gs.input_mutex);
4041
if (!gs.computed) {
4142
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
4243
int64_t nc = ::onager::onager_compute_max_clique(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), nullptr);
@@ -80,6 +81,7 @@ static OperatorResultType IndependentSetInOut(ExecutionContext &ctx, TableFuncti
8081
}
8182
static OperatorFinalizeResultType IndependentSetFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
8283
auto &gs = data.global_state->Cast<IndependentSetGlobalState>();
84+
std::lock_guard<std::mutex> lock(gs.input_mutex);
8385
if (!gs.computed) {
8486
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
8587
int64_t nc = ::onager::onager_compute_independent_set(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), nullptr);
@@ -123,6 +125,7 @@ static OperatorResultType VertexCoverInOut(ExecutionContext &ctx, TableFunctionI
123125
}
124126
static OperatorFinalizeResultType VertexCoverFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
125127
auto &gs = data.global_state->Cast<VertexCoverGlobalState>();
128+
std::lock_guard<std::mutex> lock(gs.input_mutex);
126129
if (!gs.computed) {
127130
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
128131
int64_t nc = ::onager::onager_compute_vertex_cover(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), nullptr);
@@ -172,6 +175,7 @@ static OperatorResultType TspInOut(ExecutionContext &ctx, TableFunctionInput &da
172175
}
173176
static OperatorFinalizeResultType TspFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
174177
auto &gs = data.global_state->Cast<TspGlobalState>();
178+
std::lock_guard<std::mutex> lock(gs.input_mutex);
175179
if (!gs.computed) {
176180
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
177181
int64_t nc = ::onager::onager_compute_tsp(gs.src_nodes.data(), gs.dst_nodes.data(), gs.weights.data(), gs.src_nodes.size(), nullptr, nullptr);

onager/bindings/functions/centrality.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ static OperatorResultType PageRankInOut(ExecutionContext &context, TableFunction
7070
static OperatorFinalizeResultType PageRankFinal(ExecutionContext &context, TableFunctionInput &data, DataChunk &output) {
7171
auto &bind = data.bind_data->Cast<PageRankBindData>();
7272
auto &gs = data.global_state->Cast<PageRankGlobalState>();
73+
std::lock_guard<std::mutex> lock(gs.input_mutex);
7374
if (!gs.computed) {
7475
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
7576
size_t ec = gs.src_nodes.size();
@@ -123,6 +124,7 @@ static OperatorResultType DegreeInOut(ExecutionContext &ctx, TableFunctionInput
123124
}
124125
static OperatorFinalizeResultType DegreeFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
125126
auto &bd = data.bind_data->Cast<DegreeBindData>(); auto &gs = data.global_state->Cast<DegreeGlobalState>();
127+
std::lock_guard<std::mutex> lock(gs.input_mutex);
126128
if (!gs.computed) {
127129
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
128130
int64_t nc = ::onager::onager_compute_degree(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), bd.directed, nullptr, nullptr, nullptr);
@@ -171,6 +173,7 @@ static OperatorResultType BetweennessInOut(ExecutionContext &ctx, TableFunctionI
171173
}
172174
static OperatorFinalizeResultType BetweennessFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
173175
auto &bd = data.bind_data->Cast<BetweennessBindData>(); auto &gs = data.global_state->Cast<BetweennessGlobalState>();
176+
std::lock_guard<std::mutex> lock(gs.input_mutex);
174177
if (!gs.computed) {
175178
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
176179
int64_t nc = ::onager::onager_compute_betweenness(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), bd.normalized, nullptr, nullptr);
@@ -216,6 +219,7 @@ static OperatorResultType ClosenessInOut(ExecutionContext &ctx, TableFunctionInp
216219
}
217220
static OperatorFinalizeResultType ClosenessFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
218221
auto &gs = data.global_state->Cast<ClosenessGlobalState>();
222+
std::lock_guard<std::mutex> lock(gs.input_mutex);
219223
if (!gs.computed) {
220224
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
221225
int64_t nc = ::onager::onager_compute_closeness(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), nullptr, nullptr);
@@ -261,6 +265,7 @@ static OperatorResultType HarmonicInOut(ExecutionContext &ctx, TableFunctionInpu
261265
}
262266
static OperatorFinalizeResultType HarmonicFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
263267
auto &gs = data.global_state->Cast<HarmonicGlobalState>();
268+
std::lock_guard<std::mutex> lock(gs.input_mutex);
264269
if (!gs.computed) {
265270
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
266271
int64_t nc = ::onager::onager_compute_harmonic(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), nullptr, nullptr);
@@ -313,6 +318,7 @@ static OperatorResultType KatzInOut(ExecutionContext &ctx, TableFunctionInput &d
313318
}
314319
static OperatorFinalizeResultType KatzFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
315320
auto &bd = data.bind_data->Cast<KatzBindData>(); auto &gs = data.global_state->Cast<KatzGlobalState>();
321+
std::lock_guard<std::mutex> lock(gs.input_mutex);
316322
if (!gs.computed) {
317323
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
318324
int64_t nc = ::onager::onager_compute_katz(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), bd.alpha, bd.max_iter, bd.tolerance, nullptr, nullptr);
@@ -364,6 +370,7 @@ static OperatorResultType EigenvectorInOut(ExecutionContext &ctx, TableFunctionI
364370
}
365371
static OperatorFinalizeResultType EigenvectorFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
366372
auto &bd = data.bind_data->Cast<EigenvectorBindData>(); auto &gs = data.global_state->Cast<EigenvectorGlobalState>();
373+
std::lock_guard<std::mutex> lock(gs.input_mutex);
367374
if (!gs.computed) {
368375
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
369376
int64_t nc = ::onager::onager_compute_eigenvector(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), bd.max_iter, bd.tolerance, nullptr, nullptr);
@@ -480,6 +487,7 @@ static OperatorResultType VoteRankInOut(ExecutionContext &ctx, TableFunctionInpu
480487
}
481488
static OperatorFinalizeResultType VoteRankFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
482489
auto &bd = data.bind_data->Cast<VoteRankBindData>(); auto &gs = data.global_state->Cast<VoteRankGlobalState>();
490+
std::lock_guard<std::mutex> lock(gs.input_mutex);
483491
if (!gs.computed) {
484492
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
485493
int64_t nc = ::onager::onager_compute_voterank(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), bd.num_seeds, nullptr);
@@ -545,6 +553,7 @@ static OperatorResultType LocalReachingInOut(ExecutionContext &ctx, TableFunctio
545553
}
546554
static OperatorFinalizeResultType LocalReachingFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
547555
auto &bd = data.bind_data->Cast<LocalReachingBindData>(); auto &gs = data.global_state->Cast<LocalReachingGlobalState>();
556+
std::lock_guard<std::mutex> lock(gs.input_mutex);
548557
if (!gs.computed) {
549558
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
550559
int64_t nc = ::onager::onager_compute_local_reaching(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), bd.distance, nullptr, nullptr);
@@ -590,6 +599,7 @@ static OperatorResultType LaplacianInOut(ExecutionContext &ctx, TableFunctionInp
590599
}
591600
static OperatorFinalizeResultType LaplacianFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
592601
auto &gs = data.global_state->Cast<LaplacianGlobalState>();
602+
std::lock_guard<std::mutex> lock(gs.input_mutex);
593603
if (!gs.computed) {
594604
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
595605
int64_t nc = ::onager::onager_compute_laplacian(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), nullptr, nullptr);

onager/bindings/functions/community.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ static OperatorResultType LouvainInOut(ExecutionContext &ctx, TableFunctionInput
4141
}
4242
static OperatorFinalizeResultType LouvainFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
4343
auto &bd = data.bind_data->Cast<LouvainBindData>(); auto &gs = data.global_state->Cast<LouvainGlobalState>();
44+
std::lock_guard<std::mutex> lock(gs.input_mutex);
4445
if (!gs.computed) {
4546
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
4647
int64_t nc = ::onager::onager_compute_louvain(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), bd.seed, nullptr, nullptr);
@@ -85,6 +86,7 @@ static OperatorResultType ComponentsInOut(ExecutionContext &ctx, TableFunctionIn
8586
}
8687
static OperatorFinalizeResultType ComponentsFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
8788
auto &gs = data.global_state->Cast<ComponentsGlobalState>();
89+
std::lock_guard<std::mutex> lock(gs.input_mutex);
8890
if (!gs.computed) {
8991
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
9092
int64_t nc = ::onager::onager_compute_connected_components(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), nullptr, nullptr);
@@ -129,6 +131,7 @@ static OperatorResultType LabelPropInOut(ExecutionContext &ctx, TableFunctionInp
129131
}
130132
static OperatorFinalizeResultType LabelPropFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
131133
auto &gs = data.global_state->Cast<LabelPropGlobalState>();
134+
std::lock_guard<std::mutex> lock(gs.input_mutex);
132135
if (!gs.computed) {
133136
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
134137
int64_t nc = ::onager::onager_compute_label_propagation(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), nullptr, nullptr);
@@ -176,6 +179,7 @@ static OperatorResultType GirvanNewmanInOut(ExecutionContext &ctx, TableFunction
176179
}
177180
static OperatorFinalizeResultType GirvanNewmanFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
178181
auto &bd = data.bind_data->Cast<GirvanNewmanBindData>(); auto &gs = data.global_state->Cast<GirvanNewmanGlobalState>();
182+
std::lock_guard<std::mutex> lock(gs.input_mutex);
179183
if (!gs.computed) {
180184
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
181185
int64_t nc = ::onager::onager_compute_girvan_newman(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), bd.target_communities, nullptr, nullptr);
@@ -226,6 +230,7 @@ static OperatorResultType SpectralInOut(ExecutionContext &ctx, TableFunctionInpu
226230
}
227231
static OperatorFinalizeResultType SpectralFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
228232
auto &bd = data.bind_data->Cast<SpectralBindData>(); auto &gs = data.global_state->Cast<SpectralGlobalState>();
233+
std::lock_guard<std::mutex> lock(gs.input_mutex);
229234
if (!gs.computed) {
230235
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
231236
int64_t nc = ::onager::onager_compute_spectral_clustering(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), bd.k, bd.seed, nullptr, nullptr);
@@ -276,6 +281,7 @@ static OperatorResultType InfomapInOut(ExecutionContext &ctx, TableFunctionInput
276281
}
277282
static OperatorFinalizeResultType InfomapFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
278283
auto &bd = data.bind_data->Cast<InfomapBindData>(); auto &gs = data.global_state->Cast<InfomapGlobalState>();
284+
std::lock_guard<std::mutex> lock(gs.input_mutex);
279285
if (!gs.computed) {
280286
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
281287
int64_t nc = ::onager::onager_compute_infomap(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), bd.max_iter, bd.seed, nullptr, nullptr);

onager/bindings/functions/links.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ static OperatorResultType JaccardInOut(ExecutionContext &ctx, TableFunctionInput
4040
}
4141
static OperatorFinalizeResultType JaccardFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
4242
auto &gs = data.global_state->Cast<JaccardGlobalState>();
43+
std::lock_guard<std::mutex> lock(gs.input_mutex);
4344
if (!gs.computed) {
4445
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
4546
int64_t nc = ::onager::onager_compute_jaccard(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), nullptr, nullptr, nullptr);
@@ -86,6 +87,7 @@ static OperatorResultType AdamicAdarInOut(ExecutionContext &ctx, TableFunctionIn
8687
}
8788
static OperatorFinalizeResultType AdamicAdarFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
8889
auto &gs = data.global_state->Cast<AdamicAdarGlobalState>();
90+
std::lock_guard<std::mutex> lock(gs.input_mutex);
8991
if (!gs.computed) {
9092
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
9193
int64_t nc = ::onager::onager_compute_adamic_adar(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), nullptr, nullptr, nullptr);
@@ -132,6 +134,7 @@ static OperatorResultType PrefAttachInOut(ExecutionContext &ctx, TableFunctionIn
132134
}
133135
static OperatorFinalizeResultType PrefAttachFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
134136
auto &gs = data.global_state->Cast<PrefAttachGlobalState>();
137+
std::lock_guard<std::mutex> lock(gs.input_mutex);
135138
if (!gs.computed) {
136139
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
137140
int64_t nc = ::onager::onager_compute_preferential_attachment(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), nullptr, nullptr, nullptr);
@@ -178,6 +181,7 @@ static OperatorResultType ResourceAllocInOut(ExecutionContext &ctx, TableFunctio
178181
}
179182
static OperatorFinalizeResultType ResourceAllocFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
180183
auto &gs = data.global_state->Cast<ResourceAllocGlobalState>();
184+
std::lock_guard<std::mutex> lock(gs.input_mutex);
181185
if (!gs.computed) {
182186
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
183187
int64_t nc = ::onager::onager_compute_resource_allocation(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), nullptr, nullptr, nullptr);
@@ -223,6 +227,7 @@ static OperatorResultType CommonNeighborsInOut(ExecutionContext &ctx, TableFunct
223227
}
224228
static OperatorFinalizeResultType CommonNeighborsFinal(ExecutionContext &ctx, TableFunctionInput &data, DataChunk &output) {
225229
auto &gs = data.global_state->Cast<CommonNeighborsGlobalState>();
230+
std::lock_guard<std::mutex> lock(gs.input_mutex);
226231
if (!gs.computed) {
227232
if (gs.src_nodes.empty()) { gs.computed = true; output.SetCardinality(0); return OperatorFinalizeResultType::FINISHED; }
228233
int64_t nc = ::onager::onager_compute_common_neighbors(gs.src_nodes.data(), gs.dst_nodes.data(), gs.src_nodes.size(), nullptr, nullptr, nullptr);

0 commit comments

Comments
 (0)