feat: more sharding utilities (#809)
* feat: add support for DimsSharding

* fix: provide better error message

* fix: minor fixes

* feat: add shard_type and ndevices to infer the final shard types

* fix: reverse incorrect patch
avik-pal authored Feb 27, 2025
1 parent 24fed87 commit e995366
Showing 5 changed files with 144 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,7 +1,7 @@
name = "Reactant"
uuid = "3c362404-f566-11ee-1572-e11a4b42c853"
authors = ["William Moses <[email protected]>", "Valentin Churavy <[email protected]>", "Sergio Sánchez Ramírez <[email protected]>", "Paul Berg <[email protected]>", "Avik Pal <[email protected]>"]
version = "0.2.32"
version = "0.2.33"

[deps]
Adapt = "79e6a3ab-5dfb-504d-930d-738a2a938a0e"
100 changes: 100 additions & 0 deletions src/Sharding.jl
@@ -96,11 +96,16 @@ See also: [`Sharding.NamedSharding`](@ref)
"""
struct NoSharding <: AbstractSharding end

@inline ndevices(::NoSharding) = 1

@inline shard_type(::Type{NoSharding}, _) = ShardInfo{NoSharding,Nothing}

# This allows us to mark entire branches as NoSharding
Base.getproperty(::NoSharding, x) = NoSharding()
Base.getproperty(::NoSharding, x::Symbol) = NoSharding()

function (::NoSharding)(client::XLA.PJRT.Client, device, x::Union{AbstractArray,Number})
device === nothing && (device = XLA.default_device(client))
buffer = XLA.PJRT.AsyncBuffer(client, x, device)
return (buffer,), ShardInfo(NoSharding(), nothing)
end
@@ -185,6 +190,12 @@ struct NamedSharding{D1,D2,P<:Tuple} <: AbstractSharding
end
end

@inline ndevices(sharding::NamedSharding) = length(sharding.mesh.device_ids)

@inline function shard_type(::Type{NamedSharding{D1,D2,P}}, N) where {D1,D2,P}
return shard_type(HloSharding{D1,D2}, N)
end

function (sharding::NamedSharding)(
client::XLA.PJRT.Client, device::Nothing, x::Union{AbstractArray,Number}
)
@@ -226,6 +237,84 @@ function get_shardy_tensor_sharding_attribute(
)
end

# TODO: Something like NamedDims.jl will allow us to support NamedDimsSharding similar to
# `levanter`

"""
DimsSharding(
mesh::Mesh{M},
dims::NTuple{D,Int},
partition_spec;
is_closed::NTuple{D,Bool}=ntuple(Returns(true), D),
priority::NTuple{D,Int}=ntuple(i -> -1, D),
)

Similar to [`NamedSharding`](@ref) but works for an arbitrary-dimensional array. Dimensions
not specified in `dims` are replicated. If any dimension in `dims` is greater than the total
number of dimensions in the array, the corresponding `partition_spec`, `is_closed`, and
`priority` entries are ignored. For any negative dimension `dim` in `dims`, the true
dimension is computed as `ndims(x) + dim + 1` (so `-1` refers to the last dimension). A
`dims` entry of `0` throws an error.
"""
struct DimsSharding{M,D,P} <: AbstractSharding
mesh::Mesh{M}
dims::NTuple{D,Int}
partition_spec::P
is_closed::NTuple{D,Bool}
priority::NTuple{D,Int}

function DimsSharding(
mesh::Mesh{M},
dims::NTuple{D,Int},
partition_spec;
is_closed::NTuple{D,Bool}=ntuple(Returns(true), length(partition_spec)),
priority::NTuple{D,Int}=ntuple(i -> -1, length(partition_spec)),
) where {M,D}
@assert length(partition_spec) == length(dims)
# Validity checks on the inputs are deferred to NamedSharding
return new{M,D,typeof(partition_spec)}(
mesh, dims, partition_spec, is_closed, priority
)
end
end

@inline ndevices(sharding::DimsSharding) = length(sharding.mesh.device_ids)

@inline function shard_type(::Type{DimsSharding{M,D,P}}, N) where {M,D,P}
return shard_type(HloSharding{M,N}, N)
end

function standardize_sharding(sharding::DimsSharding, x::Union{AbstractArray,Number})
final_dims = map(sharding.dims) do d
@assert !iszero(d) "dims cannot contain 0"
return ifelse(d < 0, ndims(x) + d + 1, d)
end

dim_indices = ntuple(i -> findfirst(==(i), final_dims), ndims(x))
partition_spec = ntuple(ndims(x)) do i
dim_index = dim_indices[i]
dim_index === nothing && return nothing # replicated dimension
return sharding.partition_spec[dim_index]
end
is_closed = ntuple(ndims(x)) do i
dim_index = dim_indices[i]
dim_index === nothing && return true # replicated dimension
return sharding.is_closed[dim_index]
end
priority = ntuple(ndims(x)) do i
dim_index = dim_indices[i]
dim_index === nothing && return -1 # replicated dimension
return sharding.priority[dim_index]
end

return NamedSharding(sharding.mesh, partition_spec; is_closed, priority)
end

function (sharding::DimsSharding)(
client::XLA.PJRT.Client, device::Nothing, x::Union{AbstractArray,Number}
)
return (standardize_sharding(sharding, x))(client, device, x)
end
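
A hedged usage sketch (illustrative only, not part of the diff; `mesh` stands in for any two-axis `Sharding.Mesh` with axes `:model` and `:data`) of how `DimsSharding` lowers to `NamedSharding`:

sharding = Sharding.DimsSharding(mesh, (-1,), (:data,))  # shard only the last dimension
x = rand(Float32, 6, 5, 8)
# The negative entry resolves to ndims(x) + (-1) + 1 == 3, and standardize_sharding
# expands it to the equivalent NamedSharding with the remaining dimensions replicated,
# i.e. partition_spec == (nothing, nothing, :data).
Sharding.standardize_sharding(sharding, x)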

# HloSharding
# This stores the sharding information in the form of XLA.HloSharding, and provides a
# central type for the final storage. It also potentially saves us the pain of not having
@@ -244,6 +333,12 @@ struct HloSharding{D1,D2} <: AbstractSharding
end
end

@inline ndevices(sharding::HloSharding) = length(sharding.mesh.device_ids)

@inline function shard_type(::Type{HloSharding{D1,D2}}, N) where {D1,D2}
return ShardInfo{HloSharding{D1,D2},Vector{NTuple{N,UnitRange{Int64}}}}
end

function Base.convert(::Type{HloSharding}, sharding::NamedSharding)
if MLIR.IR._has_context()
ctx = MLIR.IR.context()
@@ -321,6 +416,10 @@ struct ShardInfo{S,D} <: AbstractSharding
device_to_array_slices::D
end

@inline ndevices(sharding::ShardInfo) = length(sharding.mesh)

@inline shard_type(::Type{ShardInfo{S,D}}, N) where {S,D} = shard_type(S, N)
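
A hedged sketch of how the new `ndevices` / `shard_type` helpers behave (illustrative only, not part of the diff):

Sharding.ndevices(Sharding.NoSharding())     # 1: unsharded values live on a single device
Sharding.shard_type(Sharding.NoSharding, 2)  # ShardInfo{NoSharding,Nothing}
# Mesh-backed shardings resolve through HloSharding, so for a 2-D array the shard type
# carries one index range per dimension per device:
#   ShardInfo{HloSharding{D1,D2},Vector{NTuple{2,UnitRange{Int64}}}}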

function Base.getproperty(sharding::ShardInfo, name::Symbol)
name ∈ (:sharding, :device_to_array_slices) && return getfield(sharding, name)
return getproperty(sharding.sharding, name)
@@ -348,6 +447,7 @@ Checks whether the given sharding refers to no sharding.
"""
is_sharded(::NoSharding) = false
is_sharded(::NamedSharding) = true
is_sharded(::DimsSharding) = true
is_sharded(::HloSharding) = true
is_sharded(s::ShardInfo) = is_sharded(s.sharding)

59 changes: 30 additions & 29 deletions src/Tracing.jl
@@ -56,11 +56,9 @@ Base.@nospecializeinfer function traced_type_inner(
@nospecialize(sharding)
)
if Mode == ArrayToConcrete && T <: track_numbers
if !Sharding.is_sharded(sharding)
return ConcretePJRTNumber{T,1,Sharding.NoShardInfo}
else
error("TODO: implement sharding")
end
return ConcretePJRTNumber{
T,Sharding.ndevices(sharding),Sharding.shard_type(typeof(sharding), 0)
}
elseif (mode == NoStopTracedTrack || mode == TracedTrack || mode == TracedSetPath) &&
T <: track_numbers
return TracedRNumber{T}
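
A hedged illustration (not part of the diff) of the type this branch now infers for an unsharded `Float32` scalar:

Sharding.ndevices(Sharding.NoSharding())     # 1
Sharding.shard_type(Sharding.NoSharding, 0)  # ShardInfo{NoSharding,Nothing}
# so the branch yields ConcretePJRTNumber{Float32, 1, ShardInfo{NoSharding, Nothing}},
# matching the old NoSharding fast path, while sharded inputs now contribute their own
# device count and shard type instead of hitting the removed error("TODO: ...") branch.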
@@ -300,11 +298,12 @@ Base.@nospecializeinfer function traced_type_inner(
if mode == ConcreteToTraced
throw("TracedRArray cannot be traced")
elseif mode == TracedToConcrete
if !Sharding.is_sharded(sharding)
return ConcretePJRTArray{T.parameters[1],T.parameters[2],1,Sharding.NoShardInfo}
else
error("TODO: implement sharding")
end
return ConcretePJRTArray{
T.parameters[1],
T.parameters[2],
Sharding.ndevices(sharding),
Sharding.shard_type(typeof(sharding), T.parameters[2]),
}
elseif mode == TracedTrack || mode == NoStopTracedTrack || mode == TracedSetPath
return T
else
@@ -322,14 +321,21 @@
if mode == ConcreteToTraced
throw("TracedRNumber cannot be traced")
elseif mode == TracedToConcrete
if !Sharding.is_sharded(sharding)
if T isa UnionAll
return UnionAll(T.var, ConcretePJRTNumber{T.var,1,Sharding.NoShardInfo})
end
return ConcretePJRTNumber{T.parameters[1],1,Sharding.NoShardInfo}
else
error("TODO: implement sharding")
if T isa UnionAll
return UnionAll(
T.var,
ConcretePJRTNumber{
T.var,
Sharding.ndevices(sharding),
Sharding.shard_type(typeof(sharding), 0),
},
)
end
return ConcretePJRTNumber{
T.parameters[1],
Sharding.ndevices(sharding),
Sharding.shard_type(typeof(sharding), 0),
}
elseif mode == TracedTrack || mode == NoStopTracedTrack || mode == TracedSetPath
return T
else
@@ -347,11 +353,9 @@
if mode == ConcreteToTraced
throw("TracedRNG cannot be traced")
elseif mode == TracedToConcrete
if !Sharding.is_sharded(sharding)
return ConcreteRNG{1,Sharding.NoShardInfo}
else
error("TODO: implement sharding")
end
return ConcreteRNG{
traced_type_inner(TracedRArray{UInt64,1}, seen, mode, track_numbers, sharding)
}
elseif mode == TracedTrack || mode == NoStopTracedTrack || mode == TracedSetPath
return T
else
@@ -413,11 +417,9 @@ Base.@nospecializeinfer function traced_type_inner(
else
N = ndims(A)
if mode == ArrayToConcrete && T <: Reactant.ReactantPrimitive
if !Sharding.is_sharded(sharding)
return ConcretePJRTArray{T,N,1,Sharding.NoShardInfo}
else
error("TODO: implement sharding")
end
return ConcretePJRTArray{
T,N,Sharding.ndevices(sharding),Sharding.shard_type(typeof(sharding), N)
}
else
return Array{
traced_type_inner(T, seen, mode, track_numbers, getproperty(sharding, 1)),N
@@ -914,7 +916,7 @@ function make_tracer(
if !Sharding.is_sharded(sharding)
return prev
else
error("TODO: implement sharding")
return ConcretePJRTNumber(prev; sharding)
end
end
if mode != ConcreteToTraced
@@ -1106,7 +1108,6 @@ function make_tracer(
return nothing
end
RT = Core.Typeof(prev)
Sharding.is_sharded(sharding) && error("Cannot specify sharding for Numbers")
if RT <: track_numbers
if mode == ArrayToConcrete
return ConcretePJRTNumber(prev; sharding)
11 changes: 10 additions & 1 deletion src/xla/Sharding.jl
@@ -263,7 +263,16 @@ function sharding_to_concrete_array_indices(
@assert n_shards > 0 "Invalid number of shards: $n_shards"
n_shards == 1 && return [1:dim]
shard_size, remainder = divrem(dim, n_shards)
@assert remainder == 0 "Dimension $dim not evenly divisible by $n_shards shards"

if remainder != 0
throw(
DimensionMismatch(
"Dimension of Size $(dim) cannot be partitioned into $(n_shards) \
shards each of size $(shard_size) (remainder = $(remainder)).",
),
)
end

return [(i * shard_size + 1):((i + 1) * shard_size) for i in 0:(n_shards - 1)]
end
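
A hedged arithmetic sketch (illustrative only, not part of the commit) of the ranges this helper produces and when the new `DimensionMismatch` fires:

shard_size, remainder = divrem(8, 4)                        # (2, 0): evenly divisible
[(i * shard_size + 1):((i + 1) * shard_size) for i in 0:3]  # [1:2, 3:4, 5:6, 7:8]
divrem(9, 4)                                                 # (2, 1): remainder != 0 throws DimensionMismatch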

3 changes: 3 additions & 0 deletions test/sharding.jl
@@ -81,16 +81,19 @@ fn_test3(x) = sum(x; dims=1)
Sharding.NamedSharding(mesh, ("model", "data")),
Sharding.NamedSharding(mesh, ("model", nothing)),
Sharding.NamedSharding(mesh, (nothing, "data")),
Sharding.DimsSharding(mesh, (2,), (:data,)),
),
(
Sharding.NamedSharding(mesh, ("model", "data")),
Sharding.NamedSharding(mesh, (nothing, "data")),
Sharding.NoSharding(),
Sharding.DimsSharding(mesh, (-2,), (:model,)),
),
(
Sharding.NamedSharding(mesh, ("model", "data")),
Sharding.NoSharding(),
Sharding.NoSharding(),
Sharding.NamedSharding(mesh, ("model", "data")),
),
)
samples_ra = Reactant.to_rarray(samples; sharding=samples_sharding)

2 comments on commit e995366

@avik-pal (Collaborator, Author)

@JuliaRegistrator

Registration pull request created: JuliaRegistries/General/125929

Tip: Release Notes

Did you know you can add release notes too? Just add markdown formatted text underneath the comment after the text
"Release notes:" and it will be added to the registry PR, and if TagBot is installed it will also be added to the
release that TagBot creates. i.e.

@JuliaRegistrator register

Release notes:

## Breaking changes

- blah

To add them here just re-invoke and the PR will be updated.

Tagging

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the GitHub interface, or via:

git tag -a v0.2.33 -m "<description of version>" e99536676791c30e4e01668646d212ed2ec08716
git push origin v0.2.33
