Skip to content

Commit 508c3f6

Browse files
committed
improve the performance of byrow(join)
1 parent 50f4541 commit 508c3f6

File tree

3 files changed

+84
-67
lines changed

3 files changed

+84
-67
lines changed

src/byrow/byrow.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ byrow(ds::AbstractDataset, ::typeof(stdze!), cols::MultiColumnIndex = names(ds,
143143
byrow(ds::AbstractDataset, ::typeof(hash), cols::MultiColumnIndex = :; by = identity, threads = nrow(ds) > __NCORES*10) = row_hash(ds, by, cols, threads = threads)
144144
byrow(ds::AbstractDataset, ::typeof(hash), col::ColumnIndex; by = identity, threads = nrow(ds) > __NCORES*10) = byrow(ds, hash, [col]; by = by, threads = threads)
145145

146-
byrow(ds::AbstractDataset, ::typeof(join), col::MultiColumnIndex; threads = nrow(ds) > __NCORES*10, delim = "", last = "") = row_join2(ds, col, threads = threads, delim = delim, last = last)
146+
byrow(ds::AbstractDataset, ::typeof(join), col::MultiColumnIndex; threads = nrow(ds) > __NCORES*10, delim = "", last = "") = row_join(ds, col, threads = threads, delim = delim, last = last)
147147

148148
byrow(ds::AbstractDataset, ::typeof(mapreduce), cols::MultiColumnIndex = names(ds, Union{Missing, Number}); op = .+, f = identity, init = missings(mapreduce(eltype, promote_type, view(_columns(ds),index(ds)[cols])), nrow(ds)), kwargs...) = mapreduce(f, op, eachcol(ds[!, cols]), init = init; kwargs...)
149149

src/byrow/row_functions.jl

+38-26
Original file line numberDiff line numberDiff line change
@@ -1112,40 +1112,52 @@ function row_hash(ds::AbstractDataset, f::Function, cols = :; threads = true)
11121112
end
11131113
row_hash(ds::AbstractDataset, cols = :; threads = true) = row_hash(ds, identity, cols; threads = threads)
11141114

1115-
Base.@propagate_inbounds function _op_for_join!(x, y, delim, last, p, idx, lo, hi)
1116-
idx[] += 1
1117-
@simd for i in lo:hi
1118-
if idx[] == 1
1119-
x[i] = STRING(y[i])
1120-
x[i] *= delim
1121-
elseif idx[] < p
1122-
x[i] *= STRING(y[i])
1123-
x[i] *= delim
1124-
else
1125-
x[i] *= STRING(y[i])
1126-
x[i] *= last
1115+
function _convert_uint8_to_string!(res, init0, curr_pos, ds, threads)
1116+
if threads
1117+
Threads.@threads for i in 1:nrow(ds)
1118+
res[i] = String(view(init0, 1:curr_pos[i]-1, i))
1119+
end
1120+
else
1121+
for i in 1:nrow(ds)
1122+
res[i] = String(view(init0, 1:curr_pos[i]-1, i))
1123+
end
1124+
end
1125+
end
1126+
function _add_last_for_join!(init0, curr_pos, ds, last_uint, last_len, threads)
1127+
if threads
1128+
Threads.@threads for i in 1:nrow(ds)
1129+
init0[curr_pos[i]-1:curr_pos[i]+last_len-2, i] = last_uint
1130+
curr_pos[i] += last_len-1
1131+
end
1132+
else
1133+
for i in 1:nrow(ds)
1134+
init0[curr_pos[i]-1:curr_pos[i]+last_len-2, i] = last_uint
1135+
curr_pos[i] += last_len-1
11271136
end
11281137
end
1129-
x
11301138
end
11311139

1132-
function row_join2(ds::AbstractDataset, cols = :; threads = true, delim = ",", last = "")
1140+
function row_join(ds::AbstractDataset, cols = :; threads = true, delim::AbstractString = ",", last::AbstractString = "")
11331141
colsidx = multiple_getindex(index(ds), cols)
1134-
init0 = Vector{Union{Missing, String}}(undef, nrow(ds))
11351142

1136-
if threads
1137-
cz = div(length(init0), __NCORES)
1138-
idx = [Ref{Int}(0) for _ in 1:__NCORES]
1139-
Threads.@threads for i in 1:__NCORES
1140-
lo = (i-1)*cz+1
1141-
i == __NCORES ? hi = length(init0) : hi = i*cz
1142-
mapreduce(identity, (x,y) -> _op_for_join!(x, y, delim, last, length(colsidx), idx[i], lo, hi), view(_columns(ds),colsidx), init = init0)
1143-
end
1143+
max_line_size = maximum(byrow(ds, sum, colsidx, by = y->length(__STRING(y)), threads = threads))
1144+
max_line_size += length(delim)*(length(colsidx)) + length(last)+1
1145+
init0 = Matrix{UInt8}(undef, max_line_size, nrow(ds))
1146+
curr_pos = ones(Int, nrow(ds))
1147+
1148+
delimiter = Base.CodeUnits(delim)
1149+
row_join!(init0, curr_pos, ds, repeat([identity], length(colsidx)), colsidx; delim = delimiter, quotechar = nothing, threads = threads)
1150+
if length(last)>0
1151+
last_uint = Base.CodeUnits(last)
1152+
last_len = length(last_uint)
1153+
_add_last_for_join!(init0, curr_pos, ds, last_uint, last_len, threads)
11441154
else
1145-
idx = Ref{Int}(0)
1146-
mapreduce(identity, (x,y) -> _op_for_join!(x, y, delim, last, length(colsidx), idx, 1, length(x)), view(_columns(ds),colsidx), init = init0)
1155+
curr_pos .-= 1
11471156
end
1148-
init0
1157+
res = Vector{Union{String, Missing}}(undef, nrow(ds))
1158+
_convert_uint8_to_string!(res, init0, curr_pos, ds, threads)
1159+
res
1160+
11491161
end
11501162

11511163

src/byrow/util.jl

+45-40
Original file line numberDiff line numberDiff line change
@@ -63,44 +63,50 @@ function hp_row_all_multi(ds::AbstractDataset, f::Vector{<:Function}, cols = :)
6363
mapreduce_index(f, _hp_op_for_all_multi!, view(_columns(ds),colsidx), ones(Bool, size(ds,1)))
6464
end
6565

66-
STRING(x) = string(x)
67-
STRING(::Missing) = ""
68-
STRING(x::Bool) = x ? "1" : "0"
69-
STRING(::Nothing) = ""
70-
71-
function _op_for_row_join(x, y, f, delim, quotechar, idx, p)
72-
idx[] += 1
73-
if quotechar === nothing
74-
if idx[] < p
75-
x .*= STRING.(f[idx[]].(y))
76-
x .*= delim
77-
else
78-
x .*= STRING.(f[idx[]].(y))
79-
x .*= '\n'
80-
end
81-
else
82-
if idx[] < p
83-
x .*= quotechar
84-
x .*= STRING.(f[idx[]].(y))
85-
x .*= quotechar
86-
x .*= delim
87-
else
88-
x .*= quotechar
89-
x .*= STRING.(f[idx[]].(y))
90-
x .*= quotechar
91-
x .*= '\n'
92-
end
93-
end
94-
x
95-
end
96-
97-
function row_join(ds::AbstractDataset, f::Vector{<:Function}, cols = :; delim = ',', quotechar = nothing)
98-
colsidx = index(ds)[cols]
99-
idx = Ref{Int}(0)
100-
p = length(colsidx)
101-
init0 = fill("", nrow(ds))
102-
mapreduce(identity, (x,y)->_op_for_row_join(x,y,f, delim, quotechar, idx, p), view(_columns(ds), colsidx), init = init0)
103-
end
66+
__STRING(x) = string(x)
67+
__STRING(x::AbstractString) = x
68+
__STRING(::Missing) = ""
69+
__STRING(x::Bool) = x ? "1" : "0"
70+
__STRING(::Nothing) = ""
71+
72+
__CODEUNIT(x) = Base.CodeUnits(__STRING(x))
73+
74+
75+
76+
#
77+
# function _op_for_row_join(x, y, f, delim, quotechar, idx, p)
78+
# idx[] += 1
79+
# if quotechar === nothing
80+
# if idx[] < p
81+
# x .*= STRING.(f[idx[]].(y))
82+
# x .*= delim
83+
# else
84+
# x .*= STRING.(f[idx[]].(y))
85+
# x .*= '\n'
86+
# end
87+
# else
88+
# if idx[] < p
89+
# x .*= quotechar
90+
# x .*= STRING.(f[idx[]].(y))
91+
# x .*= quotechar
92+
# x .*= delim
93+
# else
94+
# x .*= quotechar
95+
# x .*= STRING.(f[idx[]].(y))
96+
# x .*= quotechar
97+
# x .*= '\n'
98+
# end
99+
# end
100+
# x
101+
# end
102+
#
103+
# function row_join(ds::AbstractDataset, f::Vector{<:Function}, cols = :; delim = ',', quotechar = nothing)
104+
# colsidx = index(ds)[cols]
105+
# idx = Ref{Int}(0)
106+
# p = length(colsidx)
107+
# init0 = fill("", nrow(ds))
108+
# mapreduce(identity, (x,y)->_op_for_row_join(x,y,f, delim, quotechar, idx, p), view(_columns(ds), colsidx), init = init0)
109+
# end
104110

105111
# some experimental ideas
106112

@@ -255,10 +261,9 @@ end
255261
# buffer = Matrix{UInt8}(undef, lsize , nrow(ds))
256262
# currentpos = ones(Int, nrow(ds))
257263
function row_join!(buffer, currentpos, ds::AbstractDataset, f::Vector{<:Function}, cols = :; delim = ',', quotechar = nothing, threads = true)
258-
colsidx = index(ds)[cols]
264+
colsidx = multiple_getindex(index(ds), cols)
259265
p = length(colsidx)
260266
dlm = UInt8.(delim)
261-
262267
if threads
263268
cz = div(nrow(ds), __NCORES)
264269
idx = [Ref{Int}(0) for _ in 1:__NCORES]

0 commit comments

Comments
 (0)