ThreadPools.jl

Improved thread management for background and nonuniform tasks

Overview

Documentation at https://tro3.github.io/ThreadPools.jl

ThreadPools.jl is a simple package that exposes a few macros and functions that mimic Base.Threads.@threads, Base.map, and Base.foreach. These macros (and the underlying API) handle cases that the built-in functions are not always well-suited for:

  • A group of tasks that the user wants to keep off of the primary thread
  • A group of tasks that are very nonuniform in duration

For the first case, ThreadPools exposes a @bthreads ("background threads") macro that behaves identically to Threads.@threads, but keeps the primary thread job-free. There are also related bmap and bforeach functions that mimic their Base counterparts, but with the same non-primary thread usage.

For the second case, the package exposes a @qthreads ("queued threads") macro. This macro uses a different scheduling strategy to help with nonuniform jobs. @threads and @bthreads first divide the incoming job list into equal job "chunks", then launch each chunk on a separate thread for processing. If the jobs are not uniform, this can lead to some long jobs all getting assigned to one thread, delaying completion. @qthreads does not pre-assign threads - it only starts a new job as an old one finishes, so if a long job comes along, the other threads will keep operating on the shorter ones. @qthreads itself does use the primary thread, but its cousin @qbthreads uses the same strategy but in the background. There are also qmap, qforeach, qbmap, and qbforeach.
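The queued strategy can be sketched with Base alone. The following is a minimal illustration under stated assumptions, not the package's implementation: queued_foreach is a hypothetical helper that loads all jobs into a Channel and lets a fixed number of worker tasks pull the next job as soon as they finish the previous one. Unlike @qthreads, this sketch does not pin workers to particular threads and does nothing to keep work off the primary thread.

```julia
using Base.Threads

# Hypothetical helper (not part of ThreadPools.jl): run fn on every item,
# handing out one job at a time to whichever worker is free.
function queued_foreach(fn, items; nworkers = nthreads())
    # Buffer all jobs up front, then close so workers stop once it drains.
    jobs = Channel{eltype(items)}(length(items))
    foreach(x -> put!(jobs, x), items)
    close(jobs)
    # Each worker loops over the shared channel, taking the next pending job.
    @sync for _ in 1:nworkers
        @spawn for x in jobs
            fn(x)
        end
    end
    return nothing
end

# Usage: sum 1:8 across the workers with an atomic accumulator.
total = Atomic{Int}(0)
queued_foreach(x -> atomic_add!(total, x), 1:8)
println(total[])
```

Because no job is bound to a worker ahead of time, a single long job occupies only one worker while the others keep draining the channel.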

The package also exposes a lower-level @tspawnat macro that mimics the Base.Threads.@spawn macro but allows direct thread assignment, for users who want to develop their own scheduling, and a spawnbg function that spawns a function onto an available background thread.

Simple Macro/Function Selection

Task Type        | Foreground (primary allowed)   | Background (primary forbidden)
Uniform tasks    | Base.Threads.@threads          | ThreadPools.@bthreads
                 | ThreadPools.tmap(fn, itrs)     | ThreadPools.bmap(fn, itrs)
                 | ThreadPools.tforeach(fn, itrs) | ThreadPools.bforeach(fn, itrs)
Nonuniform tasks | ThreadPools.@qthreads          | ThreadPools.@qbthreads
                 | ThreadPools.qmap(fn, itrs)     | ThreadPools.qbmap(fn, itrs)
                 | ThreadPools.qforeach(fn, itrs) | ThreadPools.qbforeach(fn, itrs)

Job Logging for Performance Tuning

Each of the above macros comes with a logging version that allows the user to analyze the performance of the chosen strategy and thread count:

Task Type        | Foreground                        | Background
Uniform tasks    | ThreadPools.@logthreads           | ThreadPools.@logbthreads
                 | ThreadPools.logtmap(fn, itrs)     | ThreadPools.logbmap(fn, itrs)
                 | ThreadPools.logtforeach(fn, itrs) | ThreadPools.logbforeach(fn, itrs)
Nonuniform tasks | ThreadPools.@logqthreads          | ThreadPools.@logqbthreads
                 | ThreadPools.logqmap(fn, itrs)     | ThreadPools.logqbmap(fn, itrs)
                 | ThreadPools.logqforeach(fn, itrs) | ThreadPools.logqbforeach(fn, itrs)

Please see below for usage examples.

Usage

Each of the simple API functions can be used like the Base versions of the same function:

julia> @qbthreads for x in 1:3
         println("$x $(Threads.threadid())")
       end
2 3
3 4
1 2

julia> bmap([1,2,3]) do x
         println("$x $(Threads.threadid())")
         x^2
       end
2 3
3 4
1 2
3-element Array{Int64,1}:
 1
 4
 9

julia> t = @tspawnat 4 Threads.threadid()
Task (runnable) @0x0000000010743c70

julia> fetch(t)
4

Note that the first two examples use the background versions, so no threadid==1 is seen. Also note that while the execution order is not guaranteed across threads, the result order of bmap matches the input order.
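One way to see why the result order can be stable even though the execution order is not: if each job is spawned as a task and the tasks are fetched in input order, the collected results line up with the inputs regardless of which job finishes first. The sketch below uses only Base. ordered_map is a hypothetical name, and this is not necessarily how bmap is implemented.

```julia
using Base.Threads

# Hypothetical helper: spawn one task per element, then fetch in input order.
function ordered_map(fn, xs)
    tasks = map(x -> (@spawn fn(x)), xs)
    return map(fetch, tasks)
end

# Later elements sleep less, so they tend to finish first,
# yet the results still follow the input order.
squares = ordered_map(1:4) do x
    sleep(0.05 * (5 - x))
    x^2
end
println(squares)
```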

Logger Usage

The logging versions of the above functions all produce an AbstractThreadPool object that has an in-memory log of the start and stop times of each job that ran through the pool. A PlotRecipe from RecipesBase is exposed in the package, so all that is needed to generate a visualization of the job times is the plot command from Plots. In these plots, each job is shown by index, start time, and stop time and is given a color corresponding to its thread:

julia> using Plots

julia> pool = logtforeach(x -> sleep(0.1*x), 1:8);

julia> plot(pool)

[Figure: tforeach plot]

julia> pool = logqforeach(x -> sleep(0.1*x), 1:8);

julia> plot(pool)

[Figure: qforeach plot]

The two scheduling strategies are visible in the above plots. The tforeach log shows that the jobs were pre-assigned in order: 1 & 2 to thread 1, 3 & 4 to thread 2, and so on. The qforeach log shows that each job starts on whichever thread frees up first, as soon as the previous job on that thread completes. Because these jobs are very nonuniform (and deliberately stacked against the first strategy), the pre-assigned method takes 25% longer.
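The 25% figure can be reproduced with a little arithmetic. The sketch below is an idealized model, not a measurement: it assumes four threads (two pre-assigned jobs each, as described above), ignores scheduling overhead, and counts time in tenths of a second so that job x lasts x units. The function names are hypothetical.

```julia
# Job durations in tenths of a second, matching sleep(0.1*x) for x in 1:8.
durations = collect(1:8)
nthr = 4  # assumption: four threads

# Pre-assigned: contiguous equal chunks, one per thread; the slowest chunk
# determines the finish time.
function preassigned_makespan(durations, n)
    chunk = cld(length(durations), n)
    return maximum(sum, Iterators.partition(durations, chunk))
end

# Queued: each job in turn goes to the thread that becomes free first.
function queued_makespan(durations, n)
    free_at = zeros(Int, n)
    for d in durations
        free_at[argmin(free_at)] += d
    end
    return maximum(free_at)
end

pre = preassigned_makespan(durations, nthr)  # 15: jobs 7 and 8 share a thread
que = queued_makespan(durations, nthr)       # 12: job 8 starts once job 4 ends
println(pre / que)                           # 1.25
```

In this model the pre-assigned schedule finishes at 1.5 s and the queued schedule at 1.2 s, a ratio of 1.25.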

Simple API

Each function of the simple API tries to mimic an existing function in Base or Base.Threads to keep any code rework to a minimum.

Regular Versions

Logging Versions

ThreadPools.@bthreads - Macro
@bthreads

Mimics Base.Threads.@threads, but keeps the iterated tasks off of the primary thread.

Example

julia> @bthreads for x in 1:8
         println((x, Threads.threadid()))
       end
(1, 2)
(6, 4)
(3, 3)
(7, 4)
(4, 3)
(8, 4)
(5, 3)
(2, 2)

Note that execution order is not guaranteed, but the primary thread does not show up on any of the jobs.

ThreadPools.@qthreads - Macro
@qthreads

Mimics Base.Threads.@threads, but uses a task queueing strategy, only starting a new task when a previous one (on any thread) has completed. This can provide performance advantages when the iterated tasks are very nonuniform in length. The primary thread is used. To prevent usage of the primary thread, see @qbthreads.

Example

julia> @qthreads for x in 1:8
         println((x, Threads.threadid()))
       end
(2, 4)
(3, 3)
(4, 2)
(5, 4)
(6, 3)
(7, 2)
(8, 4)
(1, 1)

Note that execution order is not guaranteed and the primary thread is used.

ThreadPools.@qbthreads - Macro
@qbthreads

Mimics Base.Threads.@threads, but uses a task queueing strategy, only starting a new task when a previous one (on any thread) has completed. This can provide performance advantages when the iterated tasks are very nonuniform in length. The primary thread is not used. To allow usage of the primary thread, see @qthreads.

Example

julia> @qbthreads for x in 1:8
         println((x, Threads.threadid()))
       end
(2, 4)
(3, 2)
(1, 3)
(4, 4)
(5, 2)
(6, 3)
(7, 4)
(8, 2)

Note that execution order is not guaranteed, but the primary thread does not show up on any of the jobs.

ThreadPools.tmap - Method
tmap(fn, itrs...) -> collection

Mimics Base.map, but launches the function evaluations onto all available threads, using a pre-assigned scheduling strategy appropriate for uniform task durations.

Example

julia> tmap(x -> begin; println((x,Threads.threadid())); x^2; end, 1:8)'
(7, 4)
(5, 3)
(8, 4)
(1, 1)
(6, 3)
(2, 1)
(3, 2)
(4, 2)
1×8 LinearAlgebra.Adjoint{Int64,Array{Int64,1}}:
 1  4  9  16  25  36  49  64

Note that while the execution order is not guaranteed, the result order is. Also note that the primary thread is used.

ThreadPools.bmap - Method
bmap(fn, itrs...) -> collection

Mimics Base.map, but launches the function evaluations onto all available threads except the primary, using a pre-assigned scheduling strategy appropriate for uniform task durations.

Example

julia> bmap(x -> begin; println((x,Threads.threadid())); x^2; end, 1:8)'
(6, 4)
(1, 2)
(3, 3)
(2, 2)
(4, 3)
(7, 4)
(5, 3)
(8, 4)
1×8 LinearAlgebra.Adjoint{Int64,Array{Int64,1}}:
 1  4  9  16  25  36  49  64

Note that while the execution order is not guaranteed, the result order is. Also note that the primary thread is not used.

ThreadPools.qmap - Method
qmap(fn, itrs...) -> collection

Mimics Base.map, but launches the function evaluations onto all available threads, using a queued scheduling strategy appropriate for nonuniform task durations.

Example

julia> qmap(x -> begin; println((x,Threads.threadid())); x^2; end, 1:8)'
(2, 3)
(3, 2)
(4, 4)
(5, 3)
(6, 2)
(7, 4)
(8, 3)
(1, 1)
1×8 LinearAlgebra.Adjoint{Int64,Array{Int64,1}}:
 1  4  9  16  25  36  49  64

Note that while the execution order is not guaranteed, the result order is. Also note that the primary thread is used.

ThreadPools.qbmap - Method
qbmap(fn, itrs...) -> collection

Mimics Base.map, but launches the function evaluations onto all available threads except the primary, using a queued scheduling strategy appropriate for nonuniform task durations.

Example

julia> qbmap(x -> begin; println((x,Threads.threadid())); x^2; end, 1:8)'
(2, 3)
(1, 2)
(3, 4)
(5, 2)
(4, 3)
(6, 4)
(7, 2)
(8, 3)
1×8 LinearAlgebra.Adjoint{Int64,Array{Int64,1}}:
 1  4  9  16  25  36  49  64

Note that while the execution order is not guaranteed, the result order is. Also note that the primary thread is not used.

ThreadPools.tforeach - Method
tforeach(fn, itrs...)

Mimics Base.foreach, but launches the function evaluations onto all available threads, using a pre-assigned scheduling strategy appropriate for uniform task durations.

Example

julia> tforeach(x -> println((x,Threads.threadid())), 1:8)
(1, 1)
(3, 2)
(5, 3)
(2, 1)
(7, 4)
(4, 2)
(8, 4)
(6, 3)

Note that the execution order is not guaranteed, and that the primary thread is used.

ThreadPools.bforeach - Method
bforeach(fn, itrs...)

Mimics Base.foreach, but launches the function evaluations onto all available threads except the primary, using a pre-assigned scheduling strategy appropriate for uniform task durations.

Example

julia> bforeach(x -> println((x,Threads.threadid())), 1:8)
(1, 2)
(6, 4)
(2, 2)
(7, 4)
(8, 4)
(3, 3)
(4, 3)
(5, 3)

Note that the execution order is not guaranteed, and that the primary thread is not used.

ThreadPools.qforeach - Method
qforeach(fn, itrs...)

Mimics Base.foreach, but launches the function evaluations onto all available threads, using a queued scheduling strategy appropriate for nonuniform task durations.

Example

julia> qforeach(x -> println((x,Threads.threadid())), 1:8)
(4, 3)
(2, 2)
(3, 4)
(5, 3)
(6, 2)
(7, 4)
(8, 3)
(1, 1)

Note that the execution order is not guaranteed, and that the primary thread is used.

ThreadPools.qbforeach - Method
qbforeach(fn, itrs...)

Mimics Base.foreach, but launches the function evaluations onto all available threads except the primary, using a queued scheduling strategy appropriate for nonuniform task durations.

Example

julia> qbforeach(x -> println((x,Threads.threadid())), 1:8)
(3, 3)
(2, 4)
(1, 2)
(4, 3)
(5, 4)
(6, 2)
(7, 3)
(8, 4)

Note that the execution order is not guaranteed, and that the primary thread is not used.

ThreadPools.@logthreads - Macro
@logthreads -> pool

Mimics Base.Threads.@threads. Returns a logged pool that can be analyzed with the logging functions and plotted.

Example

julia> pool = @logthreads for x in 1:8
         println((x, Threads.threadid()))
       end;
(1, 1)
(5, 3)
(7, 4)
(2, 1)
(6, 3)
(8, 4)
(3, 2)
(4, 2)

julia> plot(pool)

Note that execution order is not guaranteed and the primary thread is used.

ThreadPools.@logbthreads - Macro
@logbthreads -> pool

Mimics Base.Threads.@threads, but keeps the iterated tasks off of the primary thread. Returns a logged pool that can be analyzed with the logging functions and plotted.

Example

julia> pool = @logbthreads for x in 1:8
         println((x, Threads.threadid()))
       end;
(3, 4)
(2, 3)
(1, 2)
(4, 4)
(5, 3)
(6, 2)
(8, 3)
(7, 4)

julia> plot(pool)

Note that execution order is not guaranteed, but the primary thread does not show up on any of the jobs.

ThreadPools.@logqthreads - Macro
@logqthreads -> pool

Mimics Base.Threads.@threads, but uses a task queueing strategy, only starting a new task when a previous one (on any thread) has completed. Returns a logged pool that can be analyzed with the logging functions and plotted. The primary thread is used. To prevent usage of the primary thread, see @logqbthreads.

Example

julia> pool = @logqthreads for x in 1:8
         println((x, Threads.threadid()))
       end;
(1, 1)
(3, 2)
(7, 4)
(5, 3)
(2, 1)
(8, 4)
(6, 3)
(4, 2)

julia> plot(pool)

Note that execution order is not guaranteed and the primary thread is used.

ThreadPools.@logqbthreads - Macro
@logqbthreads -> pool

Mimics Base.Threads.@threads, but uses a task queueing strategy, only starting a new task when a previous one (on any thread) has completed. Returns a logged pool that can be analyzed with the logging functions and plotted. The primary thread is not used. To allow usage of the primary thread, see @logqthreads.

Example

julia> pool = @logqbthreads for x in 1:8
         println((x, Threads.threadid()))
       end;
(2, 3)
(1, 4)
(3, 2)
(4, 3)
(5, 4)
(6, 2)
(7, 3)
(8, 4)

julia> plot(pool)

Note that execution order is not guaranteed, but the primary thread does not show up on any of the jobs.

ThreadPools.logtmap - Method
logtmap(fn, itrs...) -> (pool, collection)

Mimics Base.map, but launches the function evaluations onto all available threads, using a pre-assigned scheduling strategy appropriate for uniform task durations. Also returns a logged pool that can be analyzed with the logging functions and plotted.

Example

julia> (pool, result) = logtmap(1:8) do x
         println((x,Threads.threadid()))
         x^2
       end;
(1, 1)
(3, 2)
(7, 4)
(5, 3)
(8, 4)
(4, 2)
(2, 1)
(6, 3)

julia> result'
1×8 LinearAlgebra.Adjoint{Int64,Array{Int64,1}}:
1  4  9  16  25  36  49  64

julia> plot(pool)

Note that while the execution order is not guaranteed, the result order is. Also note that the primary thread is used.

ThreadPools.logbmap - Method
logbmap(fn, itrs...) -> (pool, collection)

Mimics Base.map, but launches the function evaluations onto all available threads except the primary, using a pre-assigned scheduling strategy appropriate for uniform task durations. Also returns a logged pool that can be analyzed with the logging functions and plotted.

Example

julia> (pool, result) = logbmap(1:8) do x
         println((x,Threads.threadid()))
         x^2
       end;
(1, 2)
(6, 4)
(3, 3)
(7, 4)
(2, 2)
(4, 3)
(8, 4)
(5, 3)

julia> result'
1×8 LinearAlgebra.Adjoint{Int64,Array{Int64,1}}:
1  4  9  16  25  36  49  64

julia> plot(pool)

Note that while the execution order is not guaranteed, the result order is. Also note that the primary thread is not used.

ThreadPools.logqmap - Method
logqmap(fn, itrs...) -> (pool, collection)

Mimics Base.map, but launches the function evaluations onto all available threads, using a queued scheduling strategy appropriate for nonuniform task durations. Also returns a logged pool that can be analyzed with the logging functions and plotted.

Example

julia> (pool, result) = logqmap(1:8) do x
         println((x,Threads.threadid()))
         x^2
       end;
(3, 3)
(4, 4)
(2, 2)
(5, 3)
(7, 2)
(6, 4)
(8, 3)
(1, 1)

julia> result'
1×8 LinearAlgebra.Adjoint{Int64,Array{Int64,1}}:
1  4  9  16  25  36  49  64

julia> plot(pool)

Note that while the execution order is not guaranteed, the result order is. Also note that the primary thread is used.

ThreadPools.logqbmap - Method
logqbmap(fn, itrs...) -> (pool, collection)

Mimics Base.map, but launches the function evaluations onto all available threads except the primary, using a queued scheduling strategy appropriate for nonuniform task durations. Also returns a logged pool that can be analyzed with the logging functions and plotted.

Example

julia> (pool, result) = logqbmap(1:8) do x
         println((x,Threads.threadid()))
         x^2
       end;
(3, 3)
(2, 4)
(1, 2)
(4, 3)
(5, 4)
(6, 2)
(7, 3)
(8, 4)

julia> result'
1×8 LinearAlgebra.Adjoint{Int64,Array{Int64,1}}:
1  4  9  16  25  36  49  64

julia> plot(pool)

Note that while the execution order is not guaranteed, the result order is. Also note that the primary thread is not used.

ThreadPools.logtforeach - Method
logtforeach(fn, itrs...) -> pool

Mimics Base.foreach, but launches the function evaluations onto all available threads, using a pre-assigned scheduling strategy appropriate for uniform task durations. Returns a logged pool that can be analyzed with the logging functions and plotted.

Example

julia> pool = logtforeach(x -> println((x,Threads.threadid())), 1:8);
(1, 1)
(3, 2)
(7, 4)
(2, 1)
(4, 2)
(5, 3)
(8, 4)
(6, 3)

julia> plot(pool)

Note that the execution order is not guaranteed, and that the primary thread is used.

ThreadPools.logbforeach - Method
logbforeach(fn, itrs...) -> pool

Mimics Base.foreach, but launches the function evaluations onto all available threads except the primary, using a pre-assigned scheduling strategy appropriate for uniform task durations. Returns a logged pool that can be analyzed with the logging functions and plotted.

Example

julia> pool = logbforeach(x -> println((x,Threads.threadid())), 1:8);
(1, 2)
(3, 3)
(6, 4)
(4, 3)
(2, 2)
(7, 4)
(5, 3)
(8, 4)

julia> plot(pool)

Note that the execution order is not guaranteed, and that the primary thread is not used.

ThreadPools.logqforeach - Method
logqforeach(fn, itrs...) -> pool

Mimics Base.foreach, but launches the function evaluations onto all available threads, using a queued scheduling strategy appropriate for nonuniform task durations. Returns a logged pool that can be analyzed with the logging functions and plotted.

Example

julia> pool = logqforeach(x -> println((x,Threads.threadid())), 1:8);
(2, 4)
(3, 3)
(4, 2)
(5, 4)
(6, 3)
(7, 2)
(8, 4)
(1, 1)

julia> plot(pool)

Note that the execution order is not guaranteed, and that the primary thread is used.

ThreadPools.logqbforeach - Method
logqbforeach(fn, itrs...) -> pool

Mimics Base.foreach, but launches the function evaluations onto all available threads except the primary, using a queued scheduling strategy appropriate for nonuniform task durations. Returns a logged pool that can be analyzed with the logging functions and plotted.

Example

julia> pool = logqbforeach(x -> println((x,Threads.threadid())), 1:8);
(2, 2)
(1, 3)
(3, 4)
(4, 2)
(5, 3)
(6, 4)
(7, 2)
(8, 3)

julia> plot(pool)

Note that the execution order is not guaranteed, and that the primary thread is not used.

ThreadPools.@tspawnat - Macro
@tspawnat tid -> task

Mimics Base.Threads.@spawn, but assigns the task to thread tid.

Example

julia> t = @tspawnat 4 Threads.threadid()
Task (runnable) @0x0000000010743c70

julia> fetch(t)
4

ThreadPools.spawnbg - Method
spawnbg(f)

Spawn work on any available background thread. Captures any exception thrown in the thread, to give better stacktraces.

You can use checked_fetch(spawnbg(f)) to rethrow any exception.

Warning: spawnbg does not compose with other ways of scheduling threads, so it should be used exclusively within a given Julia process.

Composable API

Functions

The above macros invoke two base structures, ThreadPools.StaticPool and ThreadPools.QueuePool, each of which can be assigned to a subset of the available threads. This allows for composition with the twith and @tthreads commands, and usage in more complex scenarios, such as stack processing.

ThreadPools.twith - Method
twith(fn, pool) -> pool

Applies the function fn to the provided pool and closes the pool. Returns the closed pool for any desired analysis or plotting.

Example

julia> twith(ThreadPools.QueuePool(1,2)) do pool
         tforeach(x -> println((x,Threads.threadid())), pool, 1:8)
       end;
(2, 2)
(1, 1)
(3, 2)
(4, 1)
(5, 2)
(6, 1)
(7, 2)
(8, 1)

Note that in the above example only two threads were used, as set by the QueuePool(1,2) arguments.

ThreadPools.@tthreads - Macro
@tthreads pool

Mimics the Base.Threads.@threads macro, but uses the provided pool to assign the tasks.

Example

julia> twith(ThreadPools.QueuePool(1,2)) do pool
         @tthreads pool for x in 1:8
           println((x,Threads.threadid()))
         end
       end;
(2, 2)
(3, 2)
(1, 1)
(4, 2)
(5, 1)
(6, 2)
(8, 2)
(7, 1)

ThreadPools.tmap - Method
tmap(fn, pool, itr)

Mimics Base.map, but launches the function evaluations onto the provided pool.

Example

julia> pool = twith(ThreadPools.LoggedQueuePool(1,2)) do pool
         tmap(pool, 1:8) do x
           println((x,Threads.threadid()))
         end
       end;
(2, 2)
(1, 1)
(3, 2)
(4, 1)
(5, 2)
(6, 1)
(7, 2)
(8, 1)

julia> plot(pool)

ThreadPools.tforeach - Method
tforeach(fn, pool, itr)

Mimics Base.foreach, but launches the function evaluations onto the provided pool.

Example

julia> pool = twith(ThreadPools.LoggedQueuePool(1,2)) do pool
         tforeach(x -> println((x,Threads.threadid())), pool, 1:8)
       end;
(2, 2)
(1, 1)
(3, 2)
(5, 2)
(4, 1)
(6, 2)
(7, 1)
(8, 2)

julia> plot(pool)

AbstractThreadPool

Base.close - Method
Base.close(pool::AbstractThreadPool)

Closes the pool, shuts down any handlers and finalizes any logging activities.


StaticPools

QueuePools

ThreadPools.QueuePool - Method
QueuePool(init_thrd=1, nthrds=Threads.nthreads())

The main QueuePool object. Its API mimics that of a Channel{Task}, but each submitted task is executed on a different thread. The pool uses nthrds threads starting at thread init_thrd. If the pool includes the primary thread (as with the default init_thrd=1), the assigned thread might be the primary, which will interfere with future thread management for the duration of any computationally heavy (blocking) processes. Otherwise, all assigned threads will be off of the primary. Each thread is allowed only one Task at a time, but backfills with the next queued Task immediately on completion of the previous one, without regard to how busy the other threads may be.

ThreadPools.LoggedQueuePool - Method
LoggedQueuePool(init_thrd=1, nthrds=Threads.nthreads())

The main LoggedQueuePool object. Its API mimics that of a Channel{Task}, but each submitted task is executed on a different thread. The pool uses nthrds threads starting at thread init_thrd. If the pool includes the primary thread (as with the default init_thrd=1), the assigned thread might be the primary, which will interfere with future thread management for the duration of any computationally heavy (blocking) processes. Otherwise, all assigned threads will be off of the primary. Each thread is allowed only one Task at a time, but backfills with the next queued Task immediately on completion of the previous one, without regard to how busy the other threads may be.

The API for the LoggedQueuePool is identical to that for QueuePool.

Base.put! - Method
Base.put!(pool::QueuePool, t::Task)

Put the task t into the pool, blocking until the pool has an available thread.

Base.put! - Method
Base.put!(pool::QueuePool, fn, args...)
Base.put!(fn, pool::QueuePool, args...)

Creates a task that runs fn(args...) and adds it to the pool, blocking until the pool has an available thread.

Base.take! - Method
Base.take!(pool::QueuePool) -> Task

Takes the next available completed task from the pool, blocking until a task is available.

Base.iterate - Method
Base.iterate(pool::QueuePool[, state])

Iterates over the completed Tasks, grabbing the next one available and ending when the pool has been closed.

ThreadPools.poolresults - Method
poolresults(pool::QueuePool) -> result iterator

Returns an iterator over the fetched results of the pooled tasks.

Example

julia> pool = QueuePool();

julia> @async begin
         for i in 1:4
           put!(pool, x -> 2*x, i)
         end
         close(pool)
       end;

julia> for r in poolresults(pool)
         println(r)
       end
6
2
4
8

Note that the execution order across the threads is not guaranteed.

ThreadPools.isactive - Method
ThreadPools.isactive(pool::QueuePool)

Returns true if there are queued Tasks anywhere in the pool, either awaiting execution, executing, or waiting to be retrieved.
