-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathsearch_index.js
3 lines (3 loc) · 54.1 KB
/
search_index.js
1
2
3
var documenterSearchIndex = {"docs":
[{"location":"api/#API","page":"API","title":"API","text":"","category":"section"},{"location":"api/#Modules","page":"API","title":"Modules","text":"","category":"section"},{"location":"api/","page":"API","title":"API","text":"Order = [:module]","category":"page"},{"location":"api/#Types-and-constants","page":"API","title":"Types and constants","text":"","category":"section"},{"location":"api/","page":"API","title":"API","text":"Order = [:type, :constant]","category":"page"},{"location":"api/#Functions-and-macros","page":"API","title":"Functions and macros","text":"","category":"section"},{"location":"api/","page":"API","title":"API","text":"Order = [:macro, :function]","category":"page"},{"location":"api/#Documentation","page":"API","title":"Documentation","text":"","category":"section"},{"location":"api/","page":"API","title":"API","text":"Modules = [ParallelProcessingTools]\nOrder = [:module, :type, :constant, :macro, :function]","category":"page"},{"location":"api/#ParallelProcessingTools.AbstractThreadLocal","page":"API","title":"ParallelProcessingTools.AbstractThreadLocal","text":"abstract type AbstractThreadLocal{T} end\n\nAbstract type for thread-local values of type T.\n\nThe value for the current thread is accessed via getindex(::AbstractThreadLocal) and `setindex(::AbstractThreadLocal, x).\n\nTo access both regular and thread-local values in a unified manner, use the function getlocalvalue.\n\nTo get the all values across all threads, use the function getallvalues.\n\nDefault implementation is ThreadLocal.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.AutoThreadPinning","page":"API","title":"ParallelProcessingTools.AutoThreadPinning","text":"struct AutoThreadPinning\n\nParallelProcessingTools default thread pinning mode.\n\nConstructor:\n\nAutoThreadPinning(; random::Bool = false, pin_blas::Bool = false)\n\nArguments:\n\nrandom: Use system topology based random thread pinning if no thread affinity mask is set (e.g. via SLURM, taskset).\nblas: Try to pin BLAS threads. Not fully functional due to bugs in BLAS thread pinning (see ThreadPinning issue #105).\n\nUse with ThreadPinning.pinthreads:\n\nusing ParallelProcessingTools, ThreadPinning\npinthreads(AutoThreadPinning())\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.CreateNew","page":"API","title":"ParallelProcessingTools.CreateNew","text":"CreateNew() isa WriteMode\n\nIndicates that new files should be created and to throw and eror if the files already exist.\n\nSee WriteMode and write_files.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.CreateOrIgnore","page":"API","title":"ParallelProcessingTools.CreateOrIgnore","text":"CreateOrIgnore() isa WriteMode\n\nIndicates that new files should be created, and that nothing should be done if if the files already exist.\n\nCauses an error to be thrown if only some of the files exist, to indicate an inconsistent state.\n\nCreateOrIgnore() is the recommended default when creating files in a parallel computing context, especially if failure or timeouts might result in re-tries. This way, if multiple workers try to create the same file(s), only one file or consistent set of files will be created under the target filenames.\n\nSee WriteMode and write_files.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.CreateOrModify","page":"API","title":"ParallelProcessingTools.CreateOrModify","text":"CreateOrIgnore() isa WriteMode\n\nIndicates that either new files should be created, or that existing files should be modified.\n\nCauses an error to be thrown if only some of the files exist already, to indicate an inconsistent state.\n\nSee WriteMode and write_files.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.CreateOrReplace","page":"API","title":"ParallelProcessingTools.CreateOrReplace","text":"CreateOrReplace() isa WriteMode\n\nIndicates that new files should be created and existing files should be replaced.\n\nSee WriteMode and write_files.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.DynamicAddProcsMode","page":"API","title":"ParallelProcessingTools.DynamicAddProcsMode","text":"abstract type ParallelProcessingTools.DynamicAddProcsMode <: ParallelProcessingTools.RunProcsMode\n\nAbstract supertype for worker start modes that use an elastic cluster manager that enables dynamic addition and removal of worker processes.\n\nSubtypes must implement:\n\nParallelProcessingTools.worker_start_command(runmode::SomeDynamicAddProcsMode, manager::ClusterManager)\nParallelProcessingTools.runworkers(runmode::SomeDynamicAddProcsMode, manager::ClusterManager)\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.FilesToRead","page":"API","title":"ParallelProcessingTools.FilesToRead","text":"struct ParallelProcessingTools.FilesToRead\n\nCreated by read_files, represents a set of (temporary) files to read from.\n\nWith ftr::FilesToRead, use collect(ftr) or iterate(ftr) to access the filenames to read from. Use close(ftr) or close(ftr, true) to close things in good order, indicating success, and use close(ftr, false) or close(ftr, err:Exception) to abort, indicating failure.\n\nSee read_files for example code.\n\nIf aborted or if the Julia process exits without ftr being closed, temporary files are still cleaned up, unless read_files was used with delete_tmp_onerror = false.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.FilesToWrite","page":"API","title":"ParallelProcessingTools.FilesToWrite","text":"struct ParallelProcessingTools.FilesToWrite\n\nCreated by write_files, represents a set of (temporary) files to write to.\n\nWith ftw::FilesToWrite, use collect(ftw) or iterate(ftw) to access the filenames to write to. Use close(ftw) or close(ftw, true) to close things in good order, indicating success, and use close(ftw, false) or close(ftw, err:Exception) to abort, indicating failure.\n\nSee write_files for example code.\n\nIf aborted or if the Julia process exits without ftw being closed, temporary files are still cleaned up, unless write_files was used with delete_tmp_onerror = false.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.FlexWorkerPool","page":"API","title":"ParallelProcessingTools.FlexWorkerPool","text":"FlexWorkerPool{WP<:AbstractWorkerPool}(\n worker_pids::AbstractVector{Int};\n label::AbstractString = \"\", maxoccupancy::Int = 1, init_workers::Bool = true\n)::AbstractWorkerPool\n\nFlexWorkerPool(; caching = false, withmyid::Bool = true, kwargs...)\n\nAn flexible worker pool, intended to work with cluster managers that may add and remove Julia processes dynamically.\n\nIf the current process (Distributed.myid()) is part of the pool, resp. if withmyid is true, it will be used as a fallback when no other workers are in are members of the pool (e.g. because no other processes have been added yet or because all other processes in the pool have terminated and been removed from it). The current process will not be used as a fallback when all other workers are currently in use.\n\nIf caching is true, the pool will use a Distributed.CachingPool as the underlying pool, otherwise a Distributed.WorkerPool.\n\nIf maxoccupancyis greater than one, individual workers can be used maxoccupancy times in parallel. So take!(pool) may return the same process ID pid multiple times without a put!(pool, pid) in between. Such a (ideally moderate) oversubscription can be useful to reduce latency-related idle times on workers: e.g. if communication latency to the worker is not short compared the the runtime of the function called on them. Or if the remote functions are often blocked waiting for I/O. Note: Workers still must be put back the same number of times they were taken from the pool, in total.\n\nIf init_workers is true, workers taken from the pool will be guaranteed to be initialized to the current global initialization level (see @always_everywhere).\n\nWP is the type of the underlying worker pool used, e.g. Distributed.WorkerPool (default) or Distributed.CachingPool.\n\nExample:\n\nusing ParallelProcessingTools, Distributed\n\npool = FlexWorkerPool(withmyid = true, maxoccupancy = 3)\n\nworkers(pool)\n\npids = [take!(pool) for _ in 1:3]\n@assert pids == repeat([myid()], 3)\nforeach(pid -> put!(pool, pid), pids)\n\naddprocs(4)\nworker_procs = workers()\npush!.(Ref(pool), worker_procs)\n\npids = [take!(pool) for _ in 1:4*3]\n@assert pids == repeat(worker_procs, 3)\nforeach(pid -> put!(pool, pid), pids)\nrmprocs(worker_procs)\n\npids = [take!(pool) for _ in 1:3]\n@assert pids == repeat([myid()], 3)\nforeach(pid -> put!(pool, pid), pids)\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.HTCondorRun","page":"API","title":"ParallelProcessingTools.HTCondorRun","text":"HTCondorRun(;\n n::Int = 1\n condor_flags::Cmd = _default_condor_flags()\n condor_settings::Dict{String,String} = Dict{String,String}()\n julia_flags::Cmd = _default_julia_flags()\n julia_depot::Vector{String} = DEPOT_PATH\n jobfile_dir = homedir()\n env::Dict{String,String} = Dict{String,String}()\n redirect_output::Bool = true\n)\n\nMode to add worker processes via HTCondor condor_submit.\n\nCondor submit script and steering .sh files are stored in jobfile_dir.\n\nExample:\n\njulia> runmode = HTCondorRun(n = 10; condor_settings=Dict(\"universe\" => \"vanilla\", \"+queue\" => \"short\", \"request_memory\" => \"4GB\"))\ntask = runworkers(runmode)\n\njulia> runworkers(runmode)\n[ Info: Submitting HTCondor job: `condor_submit /home/jiling/jl_rAHyFadwHa.sub`\nSubmitting job(s)..........\n10 job(s) submitted to cluster 3198291.\n[ Info: HTCondor job submitted: `condor_submit /home/jiling/jl_rAHyFadwHa.sub`\n(nothing, 10)\n\njulia> sleep(10)\n\njulia> nworkers()\n10\n\nWorkers can also be started manually, use worker_start_command(runmode) to get the condor_submit start command and run it from a separate process or so.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.MaxTriesExceeded","page":"API","title":"ParallelProcessingTools.MaxTriesExceeded","text":"MaxTriesExceeded <: Exception\n\nException thrown when a number of (re-)tries was exceeded.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.ModifyExisting","page":"API","title":"ParallelProcessingTools.ModifyExisting","text":"ModifyExisting() isa WriteMode\n\nIndicates that existing files should be modified.\n\nCauses an error to be thrown if not all of the files exist already.\n\nSee WriteMode and write_files.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.NonZeroExitCode","page":"API","title":"ParallelProcessingTools.NonZeroExitCode","text":"ParallelProcessingTools.NonZeroExitCode(cmd::Cmd, exitcode::Integer) isa Exception\n\nException to indicate that a an external process running cmd failed with the given exit code (not equal zero).\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.OnLocalhost","page":"API","title":"ParallelProcessingTools.OnLocalhost","text":"OnLocalhost(;\n n::Integer = 1\n env::Dict{String,String} = Dict{String,String}()\n) isa DynamicAddProcsMode\n\nMode that runs n worker processes on the current host.\n\nExample:\n\nrunmode = OnLocalhost(n = 4)\ntask, n = runworkers(runmode)\n\nThreads.@async begin\n wait(task)\n @info \"SLURM workers have terminated.\"\nend\n\n@wait_while nprocs()-1 < n)\n\nWorkers can also be started manually, use worker_start_command(runmode) to get the system (shell) command and run it from a separate process or so.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.RunProcsMode","page":"API","title":"ParallelProcessingTools.RunProcsMode","text":"abstract type ParallelProcessingTools.RunProcsMode\n\nAbstract supertype for worker process run modes.\n\nSubtypes must implement:\n\nParallelProcessingTools.runworkers(runmode::SomeRunProcsMode, manager::Distributed.AbstractClusterManager)\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.SlurmRun","page":"API","title":"ParallelProcessingTools.SlurmRun","text":"SlurmRun(;\n slurm_flags::Cmd = {defaults}\n julia_flags::Cmd = {defaults}\n dir = pwd()\n env::Dict{String,String} = Dict{String,String}()\n redirect_output::Bool = true\n)\n\nMode to add worker processes via SLURM srun.\n\nsrun and Julia worker julia command line flags are inferred from SLURM environment variables (e.g. when inside of an salloc or batch job), as well as slurm_flags and julia_flags.\n\nWorkers are started with current directory set to dir.\n\nExample:\n\nrunmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)\ntask = runworkers(runmode)\n\nThreads.@async begin\n wait(task)\n @info \"SLURM workers have terminated.\"\nend\n\n@wait_while nprocs()-1 < n\n\nWorkers can also be started manually, use worker_start_command(runmode) to get the srun start command and run it from a separate process or so.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.ThreadLocal","page":"API","title":"ParallelProcessingTools.ThreadLocal","text":"ThreadLocal{T} <: AbstractThreadLocal{T}\n\nRepresents a thread-local value. See AbstractThreadLocal for the API.\n\nConstructors:\n\nThreadLocal{T}() where {T}\nThreadLocal(value::T) where {T}\nThreadLocal{T}(f::Base.Callable) where {T}\n\nExamples:\n\ntlvalue = ThreadLocal(0)\n@onthreads allthreads() tlvalue[] = Base.Threads.threadid()\ngetallvalues(tlvalue) == allthreads()\n\nrand_value_on_each_thread = ThreadLocal{Float64}(rand)\nall(x -> 0 < x < 1, getallvalues(rand_value_on_each_thread))\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.TimelimitExceeded","page":"API","title":"ParallelProcessingTools.TimelimitExceeded","text":"TimelimitExceeded <: Exception\n\nException thrown something timed out.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.WriteMode","page":"API","title":"ParallelProcessingTools.WriteMode","text":"abstract type WriteMode\n\nAbstract type for write modes.\n\nMay be one of the following subtypes: CreateNew, CreateOrIgnore, CreateOrReplace, CreateOrModify, ModifyExisting.\n\nUsed by write_files.\n\n\n\n\n\n","category":"type"},{"location":"api/#ParallelProcessingTools.@always_everywhere-Tuple{Any}","page":"API","title":"ParallelProcessingTools.@always_everywhere","text":"@always_everywhere(expr)\n\nRuns expr on all current Julia processes, but also all future Julia processes after an ensure_procinit()) when managed using a FlexWorkerPool.\n\nSimilar to Distributed.everywhere, but also stores expr so that ensure_procinit can execute it on future worker processes.\n\nExample:\n\n@always_everywhere begin\n using SomePackage\n using SomeOtherPackage\n \n some_global_variable = 42\nend\n\nSee also ParallelProcessingTools.add_procinit_code and ParallelProcessingTools.ensure_procinit.\n\n\n\n\n\n","category":"macro"},{"location":"api/#ParallelProcessingTools.@critical-Tuple{Any}","page":"API","title":"ParallelProcessingTools.@critical","text":"@critical expr\n\nMark code in expr as a critical section. Code in critical sections will never be executed in parallel (via multithreading) to any other critical section.\n\n@critical is very useful to mark non-threadsafe code.\n\nExample:\n\n@onthreads allthreads() begin\n @critical @info Base.Threads.threadid()\nend\n\nWithout `@critical`, the above will typically crash Julia.\n\n\n\n\n\n","category":"macro"},{"location":"api/#ParallelProcessingTools.@mt_out_of_order-Tuple{Any}","page":"API","title":"ParallelProcessingTools.@mt_out_of_order","text":"@mt_out_of_order begin expr... end\n\nRuns all top-level expressions in begin expr... end on parallel multi-threaded tasks.\n\nExample:\n\n``` @mtoutof_order begin a = foo() bar() c = baz() end\n\nwill run a = foo(), bar() and c = baz() in parallel and in arbitrary order, results of assignments will appear in the outside scope.\n\n\n\n\n\n","category":"macro"},{"location":"api/#ParallelProcessingTools.@onprocs-Tuple{Any, Any}","page":"API","title":"ParallelProcessingTools.@onprocs","text":"@onprocs procsel expr\n\nExecutes expr in parallel on all processes in procsel. Waits until all processes are done. Returns all results as a vector (or as a single scalar value, if procsel itself is a scalar).\n\nExample:\n\nusing Distributed\naddprocs(2)\nworkers() == @onprocs workers() myid()\n\n\n\n\n\n","category":"macro"},{"location":"api/#ParallelProcessingTools.@onthreads-Tuple{Any, Any}","page":"API","title":"ParallelProcessingTools.@onthreads","text":"@onthreads threadsel expr\n\nExecute code in expr in parallel on the threads in threadsel.\n\nthreadsel should be a single thread-ID or a range (or array) of thread-ids. If threadsel == Base.Threads.threadid(), expr is run on the current tread with only minimal overhead.\n\nExample 1:\n\ntlsum = ThreadLocal(0.0)\ndata = rand(100)\n@onthreads allthreads() begin\n tlsum[] = sum(workpart(data, allthreads(), Base.Threads.threadid()))\nend\nsum(getallvalues(tlsum)) ≈ sum(data)\n\nExample 2:\n\n# Assuming 4 threads:\ntl = ThreadLocal(42)\nthreadsel = 2:3\n@onthreads threadsel begin\n tl[] = Base.Threads.threadid()\nend\ngetallvalues(tl)[threadsel] == [2, 3]\ngetallvalues(tl)[[1,4]] == [42, 42]\n\n\n\n\n\n","category":"macro"},{"location":"api/#ParallelProcessingTools.@return_exceptions-Tuple{Any}","page":"API","title":"ParallelProcessingTools.@return_exceptions","text":"@return_exceptions expr\n\nRuns expr and catches and returns exceptions as values instead of having them thrown.\n\nUseful for user-side debugging, especially of parallel and/or remote code execution.\n\nSee also @userfriendly_exceptions.\n\n\n\n\n\n","category":"macro"},{"location":"api/#ParallelProcessingTools.@userfriendly_exceptions-Tuple{Any}","page":"API","title":"ParallelProcessingTools.@userfriendly_exceptions","text":"@userfriendly_exceptions expr\n\nTransforms exceptions originating from expr into more user-friendly ones.\n\nIf multiple exceptions originate from parallel code in expr, only one is rethrown, and TaskFailedExceptions and RemoteExceptions are replaced by the original exceptions that caused them.\n\nSee [inner_exception] and onlyfirst_exception.\n\n\n\n\n\n","category":"macro"},{"location":"api/#ParallelProcessingTools.@wait_while-Tuple","page":"API","title":"ParallelProcessingTools.@wait_while","text":"@wait_while [maxtime=nothing] [timeout_error=false] cond\n\nWait while cond is true, using slowly increasing sleep times in between evaluating cond.\n\ncond may be an arbitrary Julia expression.\n\nIf maxtime is given with an real value, will only wait for maxtime seconds, if the value is zero or negative will not wait at all.\n\nIf timeout_error is true, will throw a TimelimitExceeded exception if the maximum waiting time is exceeded.\n\nExample, wait for a task with a maxtime:\n\ntask = Threads.@spawn sleep(10)\ntimer = Timer(2)\n@wait_while !istaskdone(task) && isopen(timer)\nistaskdone(task) == false\n\n\n\n\n\n","category":"macro"},{"location":"api/#ParallelProcessingTools.add_procinit_code-Tuple{Any}","page":"API","title":"ParallelProcessingTools.add_procinit_code","text":"ParallelProcessingTools.add_procinit_code(expr; run_everywhere::Bool = false)\n\nAdd expr to process init code. expr is run on the current proccess immediately, but not automatically on remote processes unless run_everywhere is true.\n\nUser code should typically not need to call this function, but should use @always_everywhere instead.\n\nSee also ParallelProcessingTools.get_procinit_code and ParallelProcessingTools.ensure_procinit.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.allprocs_management_lock-Tuple{}","page":"API","title":"ParallelProcessingTools.allprocs_management_lock","text":"ParallelProcessingTools.allprocs_management_lock()::ReentrantLock\n\nReturns the global process operations lock. This lock is used to protect operations that concern the management of all processes.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.allthreads-Tuple{}","page":"API","title":"ParallelProcessingTools.allthreads","text":"allthreads()\n\nConvencience function, returns an equivalent of 1:Base.Threads.nthreads().\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.clear_worker_caches!","page":"API","title":"ParallelProcessingTools.clear_worker_caches!","text":"clear_worker_caches!(pool::AbstractWorkerPool)\n\nClear the worker caches (cached function closures, etc.) on the workers In pool.\n\nDoes nothing if the pool doesn't perform any on-worker caching.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.current_procinit_level-Tuple{}","page":"API","title":"ParallelProcessingTools.current_procinit_level","text":"ParallelProcessingTools.current_procinit_level()\n\nReturn the init level of the current process.\n\nSee also global_procinit_level.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.default_cache_dir!-Tuple{AbstractString}","page":"API","title":"ParallelProcessingTools.default_cache_dir!","text":"ParallelProcessingTools.default_cache_dir!(dir::AbstractString)\n\nSets the default cache directory to dir and returns it.\n\nSee also default_cache_dir!.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.default_cache_dir-Tuple{}","page":"API","title":"ParallelProcessingTools.default_cache_dir","text":"ParallelProcessingTools.default_cache_dir()::String\n\nReturns the default cache directory, e.g. for write_files and read_files(@ref).\n\nSee also default_cache_dir!.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.ensure_procinit","page":"API","title":"ParallelProcessingTools.ensure_procinit","text":"ensure_procinit(pid::Int)\nensure_procinit(pids::AbstractVector{Int} = workers())\n\nRun process initialization code on the given process(es) if necessary, returns after initialization is complete.\n\nWhen using a FlexWorkerPool, worker initialization can safely be run in the background though, as the pool will only offer workers (via take!(pool)) after it has fully initialized them.\n\nSee also ParallelProcessingTools.get_procinit_code and ParallelProcessingTools.add_procinit_code.\n\nSee also ParallelProcessingTools.get_procinit_code, ParallelProcessingTools.ensure_procinit, ParallelProcessingTools.global_procinit_level and ParallelProcessingTools.current_procinit_level.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.ensure_procinit_or_kill","page":"API","title":"ParallelProcessingTools.ensure_procinit_or_kill","text":"ParallelProcessingTools.ensure_procinit_or_kill(pid::Int)\nParallelProcessingTools.ensure_procinit_or_kill(pids::AbstractVector{Int} = workers())\n\nRun process initialization code on the given process(es) if necessary, kill and remove process(es) for which initialization fails.\n\nSee also ParallelProcessingTools.ensure_procinit.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.get_procinit_code-Tuple{}","page":"API","title":"ParallelProcessingTools.get_procinit_code","text":"ParallelProcessingTools.get_procinit_code()\n\nReturns the code that should be run on each process to ensure that desired packages are loaded and global variable are set up as expected.\n\nSee also ParallelProcessingTools.add_procinit_code and ParallelProcessingTools.ensure_procinit.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.getallvalues","page":"API","title":"ParallelProcessingTools.getallvalues","text":"getallvalues(v::AbstractThreadLocal{T})::AbstractVector{T}\n\nAccess the all values (one for each thread) of a thread-local value as a vector. Can only be called in single-threaded code sections.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.getlabel","page":"API","title":"ParallelProcessingTools.getlabel","text":"ParallelProcessingTools.getlabel(obj)\n\nReturns a descriptive label for obj suitable for using in exceptions and logging messages. Defaults to string(obj).\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.getlocalvalue","page":"API","title":"ParallelProcessingTools.getlocalvalue","text":"getlocalvalue(x::Any) = x\ngetlocalvalue(x::ThreadLocal) = x[]\n\nAccess plain values and thread-local values in a unified fashion.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.global_procinit_level-Tuple{}","page":"API","title":"ParallelProcessingTools.global_procinit_level","text":"ParallelProcessingTools.global_procinit_level()\n\nReturn the global process init level.\n\nReturns, e.g., the number of times add_procinit_code resp. @always_everywhere have been called.\n\nSee also current_procinit_level.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.hasfailed","page":"API","title":"ParallelProcessingTools.hasfailed","text":"ParallelProcessingTools.hasfailed(obj)::Bool\n\nChecks if obj has failed in some way.\n\nSupports Task and Process and may be extended to other object types.\n\nReturns false if isnothing(obj) or ismissing(obj).\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.idle_sleep-Tuple{Integer, Real, Real}","page":"API","title":"ParallelProcessingTools.idle_sleep","text":"idle_sleep(n_idle::Integer, t_interval_s, t_max_s)\n\nSleep because something has been idle for n_idle times.\n\nWill sleep for log2(n_idle + 1) * t_interval_s seconds, but at most for t_max_s seconds.\n\nGuaranteed yield() at least once, even if n_idle is zero.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.in_vscode_notebook-Tuple{}","page":"API","title":"ParallelProcessingTools.in_vscode_notebook","text":"ParallelProcessingTools.in_vscode_notebook():Bool\n\nTest if running within a Visual Studio Code notebook.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.inner_exception","page":"API","title":"ParallelProcessingTools.inner_exception","text":"ParallelProcessingTools.inner_exception(err)\n\nReplaces exceptions like a TaskFailedException or a RemoteException with their underlying cause. Leaves other exceptions unchanged.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.isactive","page":"API","title":"ParallelProcessingTools.isactive","text":"ParallelProcessingTools.isactive(obj)::Bool\n\nChecks if obj is still active, running or whatever applies to the type of obj.\n\nSupports Task, Process, Future, Channel, Timer, Base.AsyncCondition and may be extended to other object types.\n\nReturns false if isnothing(obj) and true if ismissing(obj).\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.isvalid_pid","page":"API","title":"ParallelProcessingTools.isvalid_pid","text":"isvalid_pid(pid::Int)::Bool\n\nTests if pid is a valid Julia process ID.\n\nEquivalent to pid in Distributed.procs(), but faster.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.memory_limit","page":"API","title":"ParallelProcessingTools.memory_limit","text":"memory_limit()\n\nGets the virtual memory limit for the current Julia process.\n\nReturns a tuple (soft_limit::Int64, hard_limit::Int64) (in units of bytes). Values of -1 mean unlimited. \n\nnote: Note\nCurrently only works on Linux, simply returns (Int64(-1), Int64(-1)) on other operationg systems.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.memory_limit!","page":"API","title":"ParallelProcessingTools.memory_limit!","text":"memory_limit!(soft_limit::Integer, hard_limit::Integer = -1)\n\nSets the virtual memory limit for the current Julia process.\n\nsoft_limit and hard_limit are in units of bytes. Values of -1 mean unlimited. hard_limit must not be stricter than soft_limit, and should typically be set to -1.\n\nReturns (soft_limit::Int64, hard_limit::Int64).\n\nnote: Note\nCurrently only has an effect on Linux, does nothing and simply returns (Int64(-1), Int64(-1)) on other operating systems.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.onlyfirst_exception","page":"API","title":"ParallelProcessingTools.onlyfirst_exception","text":"ParallelProcessingTools.onlyfirst_exception(err)\n\nReplaces CompositeExceptions with their first exception.\n\nAlso employs inner_exception if simplify is true.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.onworker","page":"API","title":"ParallelProcessingTools.onworker","text":"onworker(\n f::Function, args...;\n pool::AbstractWorkerPool = ppt_worker_pool(),\n maxtime::Real = 0, tries::Integer = 1, label::AbstractString = \"\"\n)\n\nRuns f(args...) on an available worker process from the given pool and returns the result.\n\nIf maxtime > 0, a maximum time for the activity is set. If the activity takes longer than maxtime seconds, the process running it (if not the main process) will be terminated.\n\nlabel is used for debug-logging.\n\nIf a problem occurs (maxtime or worker failure) while running the activity, reschedules the task if the maximum number of tries has not yet been reached, otherwise throws an exception.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.original_exception","page":"API","title":"ParallelProcessingTools.original_exception","text":"ParallelProcessingTools.original_exception(err)\n\nReplaces (possibly nested) exceptions like a TaskFailedException or RemoteExceptions with the innermost exception, likely to be the one that was thrown originally. Leaves other exceptions unchanged.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.ppt_cluster_manager","page":"API","title":"ParallelProcessingTools.ppt_cluster_manager","text":"ParallelProcessingTools.ppt_cluster_manager()\nParallelProcessingTools.ppt_cluster_manager(manager::ClusterManager)\n\nGet the default ParallelProcessingTools cluster manager.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.ppt_cluster_manager!-Tuple{ElasticClusterManager.ElasticManager}","page":"API","title":"ParallelProcessingTools.ppt_cluster_manager!","text":"ParallelProcessingTools.ppt_cluster_manager!(manager::ElasticClusterManager.ElasticManager)\n\nSet the default ParallelProcessingTools cluster manager.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.ppt_worker_pool!-Tuple{FlexWorkerPool}","page":"API","title":"ParallelProcessingTools.ppt_worker_pool!","text":"ppt_worker_pool!(wp::FlexWorkerPool)\n\nSets the default ParallelProcessingTools worker pool to wp and returns it.\n\nSee ppt_worker_pool().\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.ppt_worker_pool-Tuple{}","page":"API","title":"ParallelProcessingTools.ppt_worker_pool","text":"ppt_worker_pool()\n\nReturns the default ParallelProcessingTools worker pool.\n\nIf the default instance doesn't exist yet, then a FlexWorkerPool will be created that initially contains Distributed.myid() as the only worker.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.printover-Tuple{Any, Any}","page":"API","title":"ParallelProcessingTools.printover","text":"ParallelProcessingTools.printover(f_show::Function, io::IOBuffer)\n\nRuns f_show(tmpio) with an IO buffer, then clears the required number of lines on io (typically stdout) and prints the output over them.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.proc_management_lock-Tuple{Integer}","page":"API","title":"ParallelProcessingTools.proc_management_lock","text":"ParallelProcessingTools.proc_management_lock(pid::Integer)::ReentrantLock\n\nReturns a process-specific lock. This lock is used to protect operations that concern the management process pid.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.read_files","page":"API","title":"ParallelProcessingTools.read_files","text":"function read_files(\n [f_read, ], filenames::AbstractString...;\n use_cache::Bool = true, cache_dir::AbstractString = default_cache_dir(),\n create_cachedir::Bool = true, delete_tmp_onerror::Bool=true,\n verbose::Bool = false\n)\n\nReads filenames in an atomic fashion (i.e. only if all filenames exist) on a best-effort basis (depending on the OS and file-system used).\n\nIf a reading function f_read is given, calls f_read(filenames...). The return value of f_read is passed through.\n\nIf use_cache is true, then the files are first copied to the cache directory cache_dir under temporary names, and then read via f_read(temporary_filenames...). The temporary files are deleted after f_read exits (except if an exception is thrown during reading and delete_tmp_onerror is set to false).\n\nSet ENV[\"JULIA_DEBUG\"] = \"ParallelProcessingTools\" to see a log of all intermediate steps.\n\nFor example:\n\nwrite(\"foo.txt\", \"Hello\"); write(\"bar.txt\", \"World\")\n\nresult = read_files(\"foo.txt\", \"bar.txt\", use_cache = true) do foo, bar\n read(foo, String) * \" \" * read(bar, String)\nend\n\nIf no reading funcion f_read is given, then read_files returns an object of type FilesToRead that holds the temporary filenames. Closing it will clean up temporary files, like described above. So\n\nftr = read_files(\"foo.txt\", \"bar.txt\"; use_cache = true)\nresult = try\n foo, bar = collect(ftr)\n data_read = read(foo, String) * \" \" * read(bar, String)\n close(ftr)\n data_read\ncatch err\n close(ftr, err)\n rethrow()\nend\n\nis equivalent to the example using read_files(f_read, ...)above.\n\nIf create_cachedir is true, then cache_dir will be created if it doesn't exist yet.\n\nIf verbose is true, uses log-level Logging.Info to log file reading, otherwise Logging.Debug.\n\nOn Linux you can set use_cache = true and cache_dir = \"/dev/shm\" to use the default Linux RAM disk as an intermediate directory.\n\nSee also write_files and ParallelProcessingTools.default_cache_dir.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.runworkers","page":"API","title":"ParallelProcessingTools.runworkers","text":"runworkers(\n runmode::ParallelProcessingTools.RunProcsMode\n manager::Distributed.AbstractClusterManager = ppt_cluster_manager()\n)\n\nRun Julia worker processes.\n\nBy default ensures that all workers processes use the same Julia project environment as the current process (requires that file systems paths are consistenst across compute hosts).\n\nThe new workers are managed via ppt_cluster_manager() and automatically added to the ppt_worker_pool()\n\nReturns a tuple (task, n). Here, task::Task is done when all workers have terminated. n is either an Integer, if the number of workers that will be started is known, or Nothing, if the number of workers can't be predicted (accurately).\n\nExample:\n\ntask, n = runworkers(OnLocalhost(n = 4))\n\nSee also worker_resources().\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.sleep_ns-Tuple{Integer}","page":"API","title":"ParallelProcessingTools.sleep_ns","text":"sleep_ns(t_in_ns::Real)\n\nSleep for t_in_ns nanoseconds, using a mixture of yield(), sleep(0) and sleep(t) to be able sleep for short times as well as long times with good relative precision.\n\nGuaranteed to yield() at least once, even if t_in_ns is zero.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.split_basename_ext-Tuple{AbstractString}","page":"API","title":"ParallelProcessingTools.split_basename_ext","text":"ParallelProcessingTools.split_basename_ext(file_basename_with_ext::AbstractString)\n\nSplits a filename (given without its directory path) into a basename without file extension and the file extension. Returns a tuple (basename_noext, ext).\n\nExample:\n\nParallelProcessingTools.split_basename_ext(\"myfile.tar.gz\") == (\"myfile\", \".tar.gz\")\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.stopworkers","page":"API","title":"ParallelProcessingTools.stopworkers","text":"stopworkers()\nstopworkers(pid::Int)\nstopworkers(pids::AbstractVector{Int})\n\nStops all or the specified worker processes. The current process is ignored.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.tmp_filename","page":"API","title":"ParallelProcessingTools.tmp_filename","text":"ParallelProcessingTools.tmp_filename(fname::AbstractString)\nParallelProcessingTools.tmp_filename(fname::AbstractString, dir::AbstractString)\n\nReturns a temporary filename, based on fname.\n\nBy default, the temporary filename is in the same directory as fname, otherwise in dir.\n\nDoes not create the temporary file, only returns the filename (including directory path).\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.wait_for_all","page":"API","title":"ParallelProcessingTools.wait_for_all","text":"wait_for_all(\n objs...;\n maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false\n)\n\nwait_for_all(objs::Union{Tuple,AbstractVector,Base.Generator,Base.ValueIterator}; kwargs...)\n\nWait for all of the objs to become ready.\n\nReadiness of objects is as defined by wouldwait. Objects that are Nothing are ignored, i.e. not waited for.\n\nSee @wait_while for the effects of maxtime and timeout_error.\n\nExample, wait for two tasks to finish:\n\ntask1 = Threads.@spawn sleep(10)\ntask2 = Threads.@spawn sleep(2)\nwait_for_all(task1, task2)\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.wait_for_any","page":"API","title":"ParallelProcessingTools.wait_for_any","text":"wait_for_any(\n objs...;\n maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false\n)\n\nwait_for_all(objs::Union{Tuple,AbstractVector,Base.Generator,Base.ValueIterator}; kwargs...)\n\nWait for any of the objects objs to become ready.\n\nReadiness of objects is as defined by wouldwait. Objects that are Nothing are ignored, i.e. not waited for.\n\nSee @wait_while for the effects of maxtime and timeout_error.\n\nExample, wait for a task with a timeout:\n\ntask1 = Threads.@spawn sleep(1.0)\ntask2 = Threads.@spawn sleep(5.0)\nwait_for_any(task1, task2, maxtime = 3.0)\nistaskdone(task1) == true\nistaskdone(task2) == false\n\nSimilar to waitany (new in Julia v1.12), but applies to a wider range of object types.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.whyfailed","page":"API","title":"ParallelProcessingTools.whyfailed","text":"ParallelProcessingTools.whyfailed(obj)::Exception\n\nReturns a reason, as an Exception instance, why obj has failed.\n\nSupports Task and Process and may be extended to other object types.\n\nobj must not be nothing or missing.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.worker_local_startcmd-Tuple{Distributed.ClusterManager}","page":"API","title":"ParallelProcessingTools.worker_local_startcmd","text":"ParallelProcessingTools.worker_local_startcmd(\n manager::Distributed.ClusterManager;\n julia_cmd::Cmd = _default_julia_cmd(),\n julia_flags::Cmd = _default_julia_flags(),\n julia_project::AbstractString = _default_julia_project()\n redirect_output::Bool = true,\n env::AbstractDict{<:AbstractString,<:AbstractString} = ...,\n)::Cmd\n\nReturn the system command required to start a Julia worker process locally on some host, so that it will connect to manager.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.worker_resources-Tuple{}","page":"API","title":"ParallelProcessingTools.worker_resources","text":"worker_resources\n\nGet the distributed Julia worker process resources currently available.\n\nThis may take some time as some code needs to be loaded on all processes. Automatically runs ensure_procinit() before querying worker resources.\n\nNote: CPU ID information will only be available if ThreadPinning is loaded.\n\n\n\n\n\n","category":"method"},{"location":"api/#ParallelProcessingTools.worker_start_command","page":"API","title":"ParallelProcessingTools.worker_start_command","text":"worker_start_command(\n runmode::DynamicAddProcsMode,\n manager::ClusterManager = ParallelProcessingTools.ppt_cluster_manager()\n)::Tuple{Cmd,Integer,Integer}\n\nReturn a tuple (cmd, m, n), with system command cmd that needs to be run m times (in parallel) to start n workers.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.workpart","page":"API","title":"ParallelProcessingTools.workpart","text":"workpart(data::AbstractArray, workersel::AbstractVector{W}, current_worker::W) where {W}\n\nGet the part of data that the execution unit current_worker is responsible for. Implies a partition of data across the workers listed in workersel.\n\nFor generic data arrays, workpart will return a view. If data is a Range (e.g. indices to be processed), a sub-range will be returned.\n\nType W will typically be Int and workersel will usually be a range/array of thread/process IDs.\n\nNote: workersel is required to be sorted in ascending order and to contain no duplicate entries.\n\nExamples:\n\nusing Distributed, Base.Threads\nA = rand(100)\n# ...\nsub_A = workpart(A, workers(), myid())\n# ...\nidxs = workpart(eachindex(sub_A), allthreads(), threadid())\nfor i in idxs\n # ...\nend\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.wouldwait","page":"API","title":"ParallelProcessingTools.wouldwait","text":"ParallelProcessingTools.wouldwait(obj)::Bool\n\nReturns true if wait(obj) would result in waiting and false if wait(obj) would return (almost) immediately.\n\nSupports Task, Process, Future, Channel, Timer, Base.AsyncCondition and may be extended to other object types.\n\nReturns false if isnothing(obj) but obj must not be missing.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.write_files","page":"API","title":"ParallelProcessingTools.write_files","text":"function write_files(\n [f_write,] filenames::AbstractString...;\n mode::WriteMode = CreateOrIgnore(),\n use_cache::Bool = false, cache_dir::AbstractString = default_cache_dir(),\n create_dirs::Bool = true, delete_tmp_onerror::Bool=true,\n verbose::Bool = false\n)\n\nWrites to filenames in an atomic fashion, on a best-effort basis (depending on the OS and file-system used).\n\nmode determines how to handle pre-existing files, it may be CreateOrIgnore() (default), CreateNew(), CreateOrReplace(), CreateOrModify() or ModifyExisting().\n\nIf a writing function f_write is given, calls f_create(temporary_filenames...). If f_create doesn't throw an exception, the files temporary_filenames are renamed to filenames, otherwise the temporary files are are either deleted (if delete_tmp_onerror is `true) or left in place (e.g. for debugging purposes).\n\nSet ENV[\"JULIA_DEBUG\"] = \"ParallelProcessingTools\" to see a log of all intermediate steps.\n\nFor example:\n\nwrite_files(\"foo.txt\", \"bar.txt\", use_cache = true) do tmp_foo, tmp_bar\n write(tmp_foo, \"Hello\")\n write(tmp_bar, \"World\")\nend\n\nwrite_files(f_write, filenames...) returns either filenames, if the files were (re-)written or nothing if there was nothing to do (depending on mode).\n\nIf no writing funcion f_write is given then, write_files returns an object of type FilesToWrite that holds the temporary filenames. Closing it will, like above, either rename temporary files to filenames or remove them. So\n\nftw = write_files(\"foo.txt\", \"bar.txt\")\nif !isnothing(ftw)\n try\n foo, bar = ftw\n write(foo, \"Hello\")\n write(bar, \"World\")\n close(ftw)\n catch err\n close(ftw, err)\n rethrow()\n end\nend\n\nis equivalent to the example using write_files(f_write, ...)above.\n\nWhen modifying files, write_files first copies existing files filenames to temporary_filenames and otherwise behaves as described above.\n\nIf use_cache is true, the temporary_filenames are located in cache_dir and then atomically moved to filenames, otherwise they located next to filenames (so in the same directories).\n\nIf create_dirs is true, target and cache directory paths are created if necessary.\n\nIf verbose is true, uses log-level Logging.Info to log file creation, otherwise Logging.Debug.\n\nOn Linux you can set use_cache = true and cache_dir = \"/dev/shm\" to use the default Linux RAM disk as an intermediate directory.\n\nSee also read_files and ParallelProcessingTools.default_cache_dir.\n\n\n\n\n\n","category":"function"},{"location":"api/#ParallelProcessingTools.write_worker_start_script","page":"API","title":"ParallelProcessingTools.write_worker_start_script","text":"write_worker_start_script(\n filename::AbstractString,\n runmode::DynamicAddProcsMode,\n manager::ClusterManager = ParallelProcessingTools.ppt_cluster_manager()\n)\n\nWrites the system command to start worker processes to a shell script.\n\n\n\n\n\n","category":"function"},{"location":"LICENSE/#LICENSE","page":"LICENSE","title":"LICENSE","text":"","category":"section"},{"location":"LICENSE/","page":"LICENSE","title":"LICENSE","text":"using Markdown\nMarkdown.parse_file(joinpath(@__DIR__, \"..\", \"..\", \"LICENSE.md\"))","category":"page"},{"location":"#ParallelProcessingTools.jl","page":"Home","title":"ParallelProcessingTools.jl","text":"","category":"section"},{"location":"","page":"Home","title":"Home","text":"This Julia package provides some tools to ease multithreaded and distributed programming.","category":"page"},{"location":"#Distributed-computing","page":"Home","title":"Distributed computing","text":"","category":"section"},{"location":"","page":"Home","title":"Home","text":"Julia provides native support for distributed computing on multiple Julia processes that run in parallel on the same or on different machines. ParallelProcessingTools add some machinery to make some aspects of this even easier.","category":"page"},{"location":"","page":"Home","title":"Home","text":"An internal elastic cluster manager (ppt_cluster_manager, a modified version of ParallelProcessingTools.ElasticManager), started on demand, allows for starting (runworkers) an stopping (stopworkers) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session (worker_start_command and write_worker_start_script), this can be useful to add worker to a running Julia session via manually controlled batch jobs, for example. Workers can be started locally (OnLocalhost), via SLURM (SlurmRun), or via HTCondor (HTCondorRun). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).","category":"page"},{"location":"","page":"Home","title":"Home","text":"The elastic cluster manager automatically adds new workers to an automatically created dynamic worker pool (ppt_worker_pool) of type FlexWorkerPool that optionally supports oversubscription. Users can take! workers from the pool and put! them back, or use onworker to send work to workers in the pool without exceeding their maximum occupancy.","category":"page"},{"location":"","page":"Home","title":"Home","text":"Since workers can appear and disappear dynamically, initializing them (loading packages, etc.) via the standard Distributed.@everywhere macro is problematic, as workers added afterwards won't be initialized. Parallel processing tools provides the macro @always_everywhere to run code globally on all current processes, but also store the code so it can be run again on future new worker processes. Workers that are part of a FlexWorkerPool will be updated automatically on take! and onworker. You can also use ensure_procinit to manually update all workers to all @always_everywhere used so far.","category":"page"},{"location":"","page":"Home","title":"Home","text":"AutoThreadPinning, in conjunction with the package ThreadPinning, provides a convenient way to perform automatic thread pinning (e.g. inside of @always_everywhere, to apply thead pinning to all processes). Note that ThreadPinning.pinthreads(AutoThreadPinning()) works on a best-effort basis and that advanced applications may require customized thread pinning for best performance.","category":"page"},{"location":"","page":"Home","title":"Home","text":"Some batch system configurations can result in whole Julia processes, or even a whole batch job, being terminated if a process exceeds its memory limit. In such cases, you can try to gain a softer failure mode by setting a custom (slightly smaller) memory limit using memory_limit!.","category":"page"},{"location":"","page":"Home","title":"Home","text":"For example:","category":"page"},{"location":"","page":"Home","title":"Home","text":"ENV[\"JULIA_DEBUG\"] = \"ParallelProcessingTools\"\nENV[\"JULIA_WORKER_TIMEOUT\"] = \"120\"\n\nusing ParallelProcessingTools, Distributed\n\n@always_everywhere begin\n using ParallelProcessingTools\n using Statistics\n\n import ThreadPinning\n pinthreads_auto()\n\n # Optional: Set a custom memory limit for worker processes:\n # myid() != 1 && memory_limit!(8 * 1000^3) # 8 GB\nend\n\nrunmode = OnLocalhost(n = 4)\n# runmode = lkSlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)\n\ndisplay(worker_start_command(runmode))\n\n# Add some workers and initialize with all `@always_everywhere` code:\nold_nprocs = nprocs()\n_, n = runworkers(runmode)\n@wait_while nprocs() < old_nprocs + n\nensure_procinit()\n\n# Show worker resources:\npool = ppt_worker_pool()\ndisplay(pool)\ndisplay(worker_resources())\n\n# Confirm that Distributions is loaded on a worker:\nworker = last(workers())\n@fetchfrom worker mean(rand(100))\n\n# Some more init code\n@always_everywhere begin\n X = rand(100)\nend\n\n# Add some more workers, we won't run `ensure_procinit()` manually this time:\nold_nprocs = nprocs()\n_, n = runworkers(runmode)\n@wait_while nprocs() < old_nprocs + n\n\n# Worker hasn't run @always_everywhere code yet, so it doesn't have `mean`:\nworker = last(workers())\ndisplay(@return_exceptions @userfriendly_exceptions begin\n @fetchfrom worker mean(X)\nend)\n\n# Using `take!` on a `FlexWorkerPool` automatically runs init code as necessary:\npid = take!(pool)\ntry\n remotecall_fetch(() -> mean(X), pid)\nfinally\n put!(pool, pid)\nend\n\n# `onworker` (using the default `FlexWorkerPool` here) does the same:\nonworker(mean, X)\n\n# If we don't need workers processes for a while, let's stop them:\nstopworkers()","category":"page"},{"location":"","page":"Home","title":"Home","text":"We can also use SLURM batch scripts, like this (e.g. \"batchtest.jl\"):","category":"page"},{"location":"","page":"Home","title":"Home","text":"#!/usr/bin/env julia\n#SBATCH --ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G --time=00:15:00\n\nusing Pkg; pkg\"activate @SOME_JULIA_ENVIRONMENT\"\n\nENV[\"JULIA_DEBUG\"] = \"ParallelProcessingTools\"\nENV[\"JULIA_WORKER_TIMEOUT\"] = \"120\"\n\nusing ParallelProcessingTools, Distributed\n\n@always_everywhere begin\n using ParallelProcessingTools\n import ThreadPinning\n pinthreads_auto()\nend\n\n_, n = runworkers(SlurmRun(slurm_flags = `--cpu-bind=cores --mem-bind=local`))\n@wait_while maxtime=240 nprocs() < n + 1\n\nresources = worker_resources()\ndisplay(resources)\n\nstopworkers()","category":"page"},{"location":"","page":"Home","title":"Home","text":"This should run with a simple","category":"page"},{"location":"","page":"Home","title":"Home","text":"sbatch -o out.txt batchtest.jl","category":"page"},{"location":"","page":"Home","title":"Home","text":"and \"out.txt\" should then contain debugging output and a list of the worker resources.","category":"page"},{"location":"#Multithreading","page":"Home","title":"Multithreading","text":"","category":"section"},{"location":"","page":"Home","title":"Home","text":"To test multithreading performance and help debug and optimize multithreaded code, ParallelProcessingTools provides the utility macros @onthreads to run code explicitly on the selected Julia threads (all threads can be listed using allthreads).","category":"page"},{"location":"","page":"Home","title":"Home","text":"You can use the macro @critical to prevent code that may suffer from race conditions in parallel to other code fenced by @critical.","category":"page"},{"location":"","page":"Home","title":"Home","text":"The macro @mt_out_of_order is useful to run different code on in parallel on Julia threads.","category":"page"},{"location":"#Waiting-and-sleeping","page":"Home","title":"Waiting and sleeping","text":"","category":"section"},{"location":"","page":"Home","title":"Home","text":"In a parallel computing scenario, on threads, distributed processes or both, or when dealing with I/O operations, code often needs to wait. In addition a timeout mechanism is often necessary. Julia's standard wait function can only waits a single object without a timeout. (waitany, requires Julia >= v1.12, can be used to wait for multiple tasks).","category":"page"},{"location":"","page":"Home","title":"Home","text":"ParallelProcessingTools provides a very flexible macro @wait_while to wait for custom conditions with an optional timeout, as well as the functions wait_for_all and wait_for_any that can wait for different kinds of objects, also with an optional timeout.","category":"page"},{"location":"","page":"Home","title":"Home","text":"The functions sleep_ns and idle_sleep can be used to implement custom scenarios that require precise sleeping for both very short and long intervals.","category":"page"},{"location":"#Exception-handling","page":"Home","title":"Exception handling","text":"","category":"section"},{"location":"","page":"Home","title":"Home","text":"Exceptions throws during remote code execution can be complex, nested and sometimes hard to understand. You can use the functions inner_exception, onlyfirst_exception and original_exception to get to the underlying reason of a failure more easily. The macro @userfriendly_exceptions automatizes this to some extent for a given piece of code.","category":"page"},{"location":"","page":"Home","title":"Home","text":"To get an exception \"in hand\" for further analysis, you can use the macro @return_exceptions to make (possibly failing) code return the exceptions instead of throwing it.","category":"page"},{"location":"#File-I/O","page":"Home","title":"File I/O","text":"","category":"section"},{"location":"","page":"Home","title":"Home","text":"File handling can become more challenging when working in a parallel and possibly distributed fashion. Code or whole workers can crash, resulting in corrupt files, or workers may become disconnected, but still write files and clash with restarted code (resulting in race conditions and may also result in corrupt files).","category":"page"},{"location":"","page":"Home","title":"Home","text":"ParallelProcessingTools provides the functions write_files and read_files to implement atomic file operations, on a best-effort basis (depending on the operating system and underlying file systems).","category":"page"}]
}