Skip to content

Commit 3dfc639

Browse files
committed
Fix the race condition in FFI table functions #3
* The FFI table functions now signal that they don't preserve input order. This will prevent PhysicalBatchInsert batch index collisions when materializing results with multiple threads. * The propery fix may need update in DuckDB's code.
1 parent ace1d49 commit 3dfc639

File tree

11 files changed

+52
-0
lines changed

11 files changed

+52
-0
lines changed

onager/bindings/functions/approximation.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,21 +203,25 @@ void RegisterApproximationFunctions(ExtensionLoader &loader) {
203203
TableFunction max_clique("onager_apx_max_clique", {LogicalType::TABLE}, nullptr, MaxCliqueBind, MaxCliqueInitGlobal);
204204
max_clique.in_out_function = MaxCliqueInOut;
205205
max_clique.in_out_function_final = MaxCliqueFinal;
206+
max_clique.order_preservation_type = OrderPreservationType::NO_ORDER;
206207
loader.RegisterFunction(max_clique);
207208

208209
TableFunction independent_set("onager_apx_independent_set", {LogicalType::TABLE}, nullptr, IndependentSetBind, IndependentSetInitGlobal);
209210
independent_set.in_out_function = IndependentSetInOut;
210211
independent_set.in_out_function_final = IndependentSetFinal;
212+
independent_set.order_preservation_type = OrderPreservationType::NO_ORDER;
211213
loader.RegisterFunction(independent_set);
212214

213215
TableFunction vertex_cover("onager_apx_vertex_cover", {LogicalType::TABLE}, nullptr, VertexCoverBind, VertexCoverInitGlobal);
214216
vertex_cover.in_out_function = VertexCoverInOut;
215217
vertex_cover.in_out_function_final = VertexCoverFinal;
218+
vertex_cover.order_preservation_type = OrderPreservationType::NO_ORDER;
216219
loader.RegisterFunction(vertex_cover);
217220

218221
TableFunction tsp("onager_apx_tsp", {LogicalType::TABLE}, nullptr, TspBind, TspInitGlobal);
219222
tsp.in_out_function = TspInOut;
220223
tsp.in_out_function_final = TspFinal;
224+
tsp.order_preservation_type = OrderPreservationType::NO_ORDER;
221225
loader.RegisterFunction(tsp);
222226
}
223227

onager/bindings/functions/centrality.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,28 +401,33 @@ void RegisterCentralityFunctions(ExtensionLoader &loader) {
401401
pagerank.named_parameters["damping"] = LogicalType::DOUBLE;
402402
pagerank.named_parameters["iterations"] = LogicalType::BIGINT;
403403
pagerank.named_parameters["directed"] = LogicalType::BOOLEAN;
404+
pagerank.order_preservation_type = OrderPreservationType::NO_ORDER;
404405
loader.RegisterFunction(pagerank);
405406

406407
TableFunction degree("onager_ctr_degree", {LogicalType::TABLE}, nullptr, DegreeBind, DegreeInitGlobal);
407408
degree.in_out_function = DegreeInOut;
408409
degree.in_out_function_final = DegreeFinal;
409410
degree.named_parameters["directed"] = LogicalType::BOOLEAN;
411+
degree.order_preservation_type = OrderPreservationType::NO_ORDER;
410412
loader.RegisterFunction(degree);
411413

412414
TableFunction betweenness("onager_ctr_betweenness", {LogicalType::TABLE}, nullptr, BetweennessBind, BetweennessInitGlobal);
413415
betweenness.in_out_function = BetweennessInOut;
414416
betweenness.in_out_function_final = BetweennessFinal;
415417
betweenness.named_parameters["normalized"] = LogicalType::BOOLEAN;
418+
betweenness.order_preservation_type = OrderPreservationType::NO_ORDER;
416419
loader.RegisterFunction(betweenness);
417420

418421
TableFunction closeness("onager_ctr_closeness", {LogicalType::TABLE}, nullptr, ClosenessBind, ClosenessInitGlobal);
419422
closeness.in_out_function = ClosenessInOut;
420423
closeness.in_out_function_final = ClosenessFinal;
424+
closeness.order_preservation_type = OrderPreservationType::NO_ORDER;
421425
loader.RegisterFunction(closeness);
422426

423427
TableFunction harmonic("onager_ctr_harmonic", {LogicalType::TABLE}, nullptr, HarmonicBind, HarmonicInitGlobal);
424428
harmonic.in_out_function = HarmonicInOut;
425429
harmonic.in_out_function_final = HarmonicFinal;
430+
harmonic.order_preservation_type = OrderPreservationType::NO_ORDER;
426431
loader.RegisterFunction(harmonic);
427432

428433
TableFunction katz("onager_ctr_katz", {LogicalType::TABLE}, nullptr, KatzBind, KatzInitGlobal);
@@ -431,13 +436,15 @@ void RegisterCentralityFunctions(ExtensionLoader &loader) {
431436
katz.named_parameters["alpha"] = LogicalType::DOUBLE;
432437
katz.named_parameters["max_iter"] = LogicalType::BIGINT;
433438
katz.named_parameters["tolerance"] = LogicalType::DOUBLE;
439+
katz.order_preservation_type = OrderPreservationType::NO_ORDER;
434440
loader.RegisterFunction(katz);
435441

436442
TableFunction eigenvector("onager_ctr_eigenvector", {LogicalType::TABLE}, nullptr, EigenvectorBind, EigenvectorInitGlobal);
437443
eigenvector.in_out_function = EigenvectorInOut;
438444
eigenvector.in_out_function_final = EigenvectorFinal;
439445
eigenvector.named_parameters["max_iter"] = LogicalType::BIGINT;
440446
eigenvector.named_parameters["tolerance"] = LogicalType::DOUBLE;
447+
eigenvector.order_preservation_type = OrderPreservationType::NO_ORDER;
441448
loader.RegisterFunction(eigenvector);
442449
}
443450

@@ -512,6 +519,7 @@ void RegisterVoteRankFunction(ExtensionLoader &loader) {
512519
voterank.in_out_function = VoteRankInOut;
513520
voterank.in_out_function_final = VoteRankFinal;
514521
voterank.named_parameters["num_seeds"] = LogicalType::BIGINT;
522+
voterank.order_preservation_type = OrderPreservationType::NO_ORDER;
515523
loader.RegisterFunction(voterank);
516524
}
517525
} // namespace onager
@@ -623,12 +631,14 @@ void RegisterLocalReachingFunction(ExtensionLoader &loader) {
623631
lr.in_out_function = LocalReachingInOut;
624632
lr.in_out_function_final = LocalReachingFinal;
625633
lr.named_parameters["distance"] = LogicalType::BIGINT;
634+
lr.order_preservation_type = OrderPreservationType::NO_ORDER;
626635
loader.RegisterFunction(lr);
627636
}
628637
void RegisterLaplacianFunction(ExtensionLoader &loader) {
629638
TableFunction lap("onager_ctr_laplacian", {LogicalType::TABLE}, nullptr, LaplacianBind, LaplacianInitGlobal);
630639
lap.in_out_function = LaplacianInOut;
631640
lap.in_out_function_final = LaplacianFinal;
641+
lap.order_preservation_type = OrderPreservationType::NO_ORDER;
632642
loader.RegisterFunction(lap);
633643
}
634644
} // namespace onager

onager/bindings/functions/community.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,36 +310,42 @@ void RegisterCommunityFunctions(ExtensionLoader &loader) {
310310
louvain.in_out_function = LouvainInOut;
311311
louvain.in_out_function_final = LouvainFinal;
312312
louvain.named_parameters["seed"] = LogicalType::BIGINT;
313+
louvain.order_preservation_type = OrderPreservationType::NO_ORDER;
313314
loader.RegisterFunction(louvain);
314315

315316
TableFunction components("onager_cmm_components", {LogicalType::TABLE}, nullptr, ComponentsBind, ComponentsInitGlobal);
316317
components.in_out_function = ComponentsInOut;
317318
components.in_out_function_final = ComponentsFinal;
319+
components.order_preservation_type = OrderPreservationType::NO_ORDER;
318320
loader.RegisterFunction(components);
319321

320322
TableFunction label_prop("onager_cmm_label_prop", {LogicalType::TABLE}, nullptr, LabelPropBind, LabelPropInitGlobal);
321323
label_prop.in_out_function = LabelPropInOut;
322324
label_prop.in_out_function_final = LabelPropFinal;
325+
label_prop.order_preservation_type = OrderPreservationType::NO_ORDER;
323326
loader.RegisterFunction(label_prop);
324327

325328
TableFunction girvan_newman("onager_cmm_girvan_newman", {LogicalType::TABLE}, nullptr, GirvanNewmanBind, GirvanNewmanInitGlobal);
326329
girvan_newman.in_out_function = GirvanNewmanInOut;
327330
girvan_newman.in_out_function_final = GirvanNewmanFinal;
328331
girvan_newman.named_parameters["communities"] = LogicalType::BIGINT;
332+
girvan_newman.order_preservation_type = OrderPreservationType::NO_ORDER;
329333
loader.RegisterFunction(girvan_newman);
330334

331335
TableFunction spectral("onager_cmm_spectral", {LogicalType::TABLE}, nullptr, SpectralBind, SpectralInitGlobal);
332336
spectral.in_out_function = SpectralInOut;
333337
spectral.in_out_function_final = SpectralFinal;
334338
spectral.named_parameters["k"] = LogicalType::BIGINT;
335339
spectral.named_parameters["seed"] = LogicalType::BIGINT;
340+
spectral.order_preservation_type = OrderPreservationType::NO_ORDER;
336341
loader.RegisterFunction(spectral);
337342

338343
TableFunction infomap("onager_cmm_infomap", {LogicalType::TABLE}, nullptr, InfomapBind, InfomapInitGlobal);
339344
infomap.in_out_function = InfomapInOut;
340345
infomap.in_out_function_final = InfomapFinal;
341346
infomap.named_parameters["max_iter"] = LogicalType::BIGINT;
342347
infomap.named_parameters["seed"] = LogicalType::BIGINT;
348+
infomap.order_preservation_type = OrderPreservationType::NO_ORDER;
343349
loader.RegisterFunction(infomap);
344350
}
345351

onager/bindings/functions/generators.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,17 @@ namespace onager {
140140
void RegisterGeneratorFunctions(ExtensionLoader &loader) {
141141
TableFunction erdos_renyi("onager_gen_erdos_renyi", {LogicalType::BIGINT, LogicalType::DOUBLE}, ErdosRenyiFunction, ErdosRenyiBind, ErdosRenyiInitGlobal);
142142
erdos_renyi.named_parameters["seed"] = LogicalType::BIGINT;
143+
erdos_renyi.order_preservation_type = OrderPreservationType::NO_ORDER;
143144
loader.RegisterFunction(erdos_renyi);
144145

145146
TableFunction barabasi_albert("onager_gen_barabasi_albert", {LogicalType::BIGINT, LogicalType::BIGINT}, BarabasiAlbertFunction, BarabasiAlbertBind, BarabasiAlbertInitGlobal);
146147
barabasi_albert.named_parameters["seed"] = LogicalType::BIGINT;
148+
barabasi_albert.order_preservation_type = OrderPreservationType::NO_ORDER;
147149
loader.RegisterFunction(barabasi_albert);
148150

149151
TableFunction watts_strogatz("onager_gen_watts_strogatz", {LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::DOUBLE}, WattsStrogatzFunction, WattsStrogatzBind, WattsStrogatzInitGlobal);
150152
watts_strogatz.named_parameters["seed"] = LogicalType::BIGINT;
153+
watts_strogatz.order_preservation_type = OrderPreservationType::NO_ORDER;
151154
loader.RegisterFunction(watts_strogatz);
152155
}
153156

onager/bindings/functions/links.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,26 +255,31 @@ void RegisterLinkFunctions(ExtensionLoader &loader) {
255255
TableFunction jaccard("onager_lnk_jaccard", {LogicalType::TABLE}, nullptr, JaccardBind, JaccardInitGlobal);
256256
jaccard.in_out_function = JaccardInOut;
257257
jaccard.in_out_function_final = JaccardFinal;
258+
jaccard.order_preservation_type = OrderPreservationType::NO_ORDER;
258259
loader.RegisterFunction(jaccard);
259260

260261
TableFunction adamic_adar("onager_lnk_adamic_adar", {LogicalType::TABLE}, nullptr, AdamicAdarBind, AdamicAdarInitGlobal);
261262
adamic_adar.in_out_function = AdamicAdarInOut;
262263
adamic_adar.in_out_function_final = AdamicAdarFinal;
264+
adamic_adar.order_preservation_type = OrderPreservationType::NO_ORDER;
263265
loader.RegisterFunction(adamic_adar);
264266

265267
TableFunction pref_attach("onager_lnk_pref_attach", {LogicalType::TABLE}, nullptr, PrefAttachBind, PrefAttachInitGlobal);
266268
pref_attach.in_out_function = PrefAttachInOut;
267269
pref_attach.in_out_function_final = PrefAttachFinal;
270+
pref_attach.order_preservation_type = OrderPreservationType::NO_ORDER;
268271
loader.RegisterFunction(pref_attach);
269272

270273
TableFunction resource_alloc("onager_lnk_resource_alloc", {LogicalType::TABLE}, nullptr, ResourceAllocBind, ResourceAllocInitGlobal);
271274
resource_alloc.in_out_function = ResourceAllocInOut;
272275
resource_alloc.in_out_function_final = ResourceAllocFinal;
276+
resource_alloc.order_preservation_type = OrderPreservationType::NO_ORDER;
273277
loader.RegisterFunction(resource_alloc);
274278

275279
TableFunction common_neighbors("onager_lnk_common_neighbors", {LogicalType::TABLE}, nullptr, CommonNeighborsBind, CommonNeighborsInitGlobal);
276280
common_neighbors.in_out_function = CommonNeighborsInOut;
277281
common_neighbors.in_out_function_final = CommonNeighborsFinal;
282+
common_neighbors.order_preservation_type = OrderPreservationType::NO_ORDER;
278283
loader.RegisterFunction(common_neighbors);
279284
}
280285

onager/bindings/functions/metrics.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,42 +348,50 @@ void RegisterMetricFunctions(ExtensionLoader &loader) {
348348
TableFunction diameter("onager_mtr_diameter", {LogicalType::TABLE}, nullptr, DiameterBind, DiameterInitGlobal);
349349
diameter.in_out_function = DiameterInOut;
350350
diameter.in_out_function_final = DiameterFinal;
351+
diameter.order_preservation_type = OrderPreservationType::NO_ORDER;
351352
loader.RegisterFunction(diameter);
352353

353354
TableFunction radius("onager_mtr_radius", {LogicalType::TABLE}, nullptr, RadiusBind, RadiusInitGlobal);
354355
radius.in_out_function = RadiusInOut;
355356
radius.in_out_function_final = RadiusFinal;
357+
radius.order_preservation_type = OrderPreservationType::NO_ORDER;
356358
loader.RegisterFunction(radius);
357359

358360
TableFunction avg_clustering("onager_mtr_avg_clustering", {LogicalType::TABLE}, nullptr, AvgClusteringBind, AvgClusteringInitGlobal);
359361
avg_clustering.in_out_function = AvgClusteringInOut;
360362
avg_clustering.in_out_function_final = AvgClusteringFinal;
363+
avg_clustering.order_preservation_type = OrderPreservationType::NO_ORDER;
361364
loader.RegisterFunction(avg_clustering);
362365

363366
TableFunction triangles("onager_mtr_triangles", {LogicalType::TABLE}, nullptr, TriangleCountBind, TriangleCountInitGlobal);
364367
triangles.in_out_function = TriangleCountInOut;
365368
triangles.in_out_function_final = TriangleCountFinal;
369+
triangles.order_preservation_type = OrderPreservationType::NO_ORDER;
366370
loader.RegisterFunction(triangles);
367371

368372
TableFunction transitivity("onager_mtr_transitivity", {LogicalType::TABLE}, nullptr, TransitivityBind, TransitivityInitGlobal);
369373
transitivity.in_out_function = TransitivityInOut;
370374
transitivity.in_out_function_final = TransitivityFinal;
375+
transitivity.order_preservation_type = OrderPreservationType::NO_ORDER;
371376
loader.RegisterFunction(transitivity);
372377

373378
TableFunction avg_path_length("onager_mtr_avg_path_length", {LogicalType::TABLE}, nullptr, AvgPathLengthBind, AvgPathLengthInitGlobal);
374379
avg_path_length.in_out_function = AvgPathLengthInOut;
375380
avg_path_length.in_out_function_final = AvgPathLengthFinal;
381+
avg_path_length.order_preservation_type = OrderPreservationType::NO_ORDER;
376382
loader.RegisterFunction(avg_path_length);
377383

378384
TableFunction assortativity("onager_mtr_assortativity", {LogicalType::TABLE}, nullptr, AssortativityBind, AssortativityInitGlobal);
379385
assortativity.in_out_function = AssortativityInOut;
380386
assortativity.in_out_function_final = AssortativityFinal;
387+
assortativity.order_preservation_type = OrderPreservationType::NO_ORDER;
381388
loader.RegisterFunction(assortativity);
382389

383390
TableFunction density("onager_mtr_density", {LogicalType::TABLE}, nullptr, DensityBind, DensityInitGlobal);
384391
density.in_out_function = DensityInOut;
385392
density.in_out_function_final = DensityFinal;
386393
density.named_parameters["directed"] = LogicalType::BOOLEAN;
394+
density.order_preservation_type = OrderPreservationType::NO_ORDER;
387395
loader.RegisterFunction(density);
388396
}
389397

onager/bindings/functions/mst.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ void RegisterMstFunctions(ExtensionLoader &loader) {
7070
TableFunction kruskal("onager_mst_kruskal", {LogicalType::TABLE}, nullptr, KruskalMstBind, KruskalMstInitGlobal);
7171
kruskal.in_out_function = KruskalMstInOut;
7272
kruskal.in_out_function_final = KruskalMstFinal;
73+
kruskal.order_preservation_type = OrderPreservationType::NO_ORDER;
7374
loader.RegisterFunction(kruskal);
7475
}
7576

onager/bindings/functions/parallel.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,33 +313,39 @@ void RegisterParallelFunctions(ExtensionLoader &loader) {
313313
par_pr.named_parameters["damping"] = LogicalType::DOUBLE;
314314
par_pr.named_parameters["iterations"] = LogicalType::BIGINT;
315315
par_pr.named_parameters["directed"] = LogicalType::BOOLEAN;
316+
par_pr.order_preservation_type = OrderPreservationType::NO_ORDER;
316317
loader.RegisterFunction(par_pr);
317318

318319
TableFunction par_bfs("onager_par_bfs", {LogicalType::TABLE}, nullptr, ParallelBfsBind, ParallelBfsInitGlobal);
319320
par_bfs.in_out_function = ParallelBfsInOut;
320321
par_bfs.in_out_function_final = ParallelBfsFinal;
321322
par_bfs.named_parameters["source"] = LogicalType::BIGINT;
323+
par_bfs.order_preservation_type = OrderPreservationType::NO_ORDER;
322324
loader.RegisterFunction(par_bfs);
323325

324326
TableFunction par_paths("onager_par_shortest_paths", {LogicalType::TABLE}, nullptr, ParallelPathsBind, ParallelPathsInitGlobal);
325327
par_paths.in_out_function = ParallelPathsInOut;
326328
par_paths.in_out_function_final = ParallelPathsFinal;
327329
par_paths.named_parameters["source"] = LogicalType::BIGINT;
330+
par_paths.order_preservation_type = OrderPreservationType::NO_ORDER;
328331
loader.RegisterFunction(par_paths);
329332

330333
TableFunction par_components("onager_par_components", {LogicalType::TABLE}, nullptr, ParallelComponentsBind, ParallelComponentsInitGlobal);
331334
par_components.in_out_function = ParallelComponentsInOut;
332335
par_components.in_out_function_final = ParallelComponentsFinal;
336+
par_components.order_preservation_type = OrderPreservationType::NO_ORDER;
333337
loader.RegisterFunction(par_components);
334338

335339
TableFunction par_clustering("onager_par_clustering", {LogicalType::TABLE}, nullptr, ParallelClusteringBind, ParallelClusteringInitGlobal);
336340
par_clustering.in_out_function = ParallelClusteringInOut;
337341
par_clustering.in_out_function_final = ParallelClusteringFinal;
342+
par_clustering.order_preservation_type = OrderPreservationType::NO_ORDER;
338343
loader.RegisterFunction(par_clustering);
339344

340345
TableFunction par_triangles("onager_par_triangles", {LogicalType::TABLE}, nullptr, ParallelTrianglesBind, ParallelTrianglesInitGlobal);
341346
par_triangles.in_out_function = ParallelTrianglesInOut;
342347
par_triangles.in_out_function_final = ParallelTrianglesFinal;
348+
par_triangles.order_preservation_type = OrderPreservationType::NO_ORDER;
343349
loader.RegisterFunction(par_triangles);
344350
}
345351

onager/bindings/functions/personalized.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ void RegisterPersonalizedFunctions(ExtensionLoader &loader) {
104104
pers_pr.named_parameters["damping"] = LogicalType::DOUBLE;
105105
pers_pr.named_parameters["max_iter"] = LogicalType::BIGINT;
106106
pers_pr.named_parameters["tolerance"] = LogicalType::DOUBLE;
107+
pers_pr.order_preservation_type = OrderPreservationType::NO_ORDER;
107108
loader.RegisterFunction(pers_pr);
108109
}
109110

onager/bindings/functions/subgraphs.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,18 +175,21 @@ void RegisterSubgraphFunctions(ExtensionLoader &loader) {
175175
ego_graph.in_out_function_final = EgoGraphFinal;
176176
ego_graph.named_parameters["center"] = LogicalType::BIGINT;
177177
ego_graph.named_parameters["radius"] = LogicalType::BIGINT;
178+
ego_graph.order_preservation_type = OrderPreservationType::NO_ORDER;
178179
loader.RegisterFunction(ego_graph);
179180

180181
TableFunction k_hop("onager_sub_k_hop", {LogicalType::TABLE}, nullptr, KHopBind, KHopInitGlobal);
181182
k_hop.in_out_function = KHopInOut;
182183
k_hop.in_out_function_final = KHopFinal;
183184
k_hop.named_parameters["start"] = LogicalType::BIGINT;
184185
k_hop.named_parameters["k"] = LogicalType::BIGINT;
186+
k_hop.order_preservation_type = OrderPreservationType::NO_ORDER;
185187
loader.RegisterFunction(k_hop);
186188

187189
TableFunction induced("onager_sub_induced", {LogicalType::TABLE}, nullptr, InducedSubgraphBind, InducedSubgraphInitGlobal);
188190
induced.in_out_function = InducedSubgraphInOut;
189191
induced.in_out_function_final = InducedSubgraphFinal;
192+
induced.order_preservation_type = OrderPreservationType::NO_ORDER;
190193
loader.RegisterFunction(induced);
191194
}
192195

0 commit comments

Comments
 (0)