concurrency

concurrency primitives

async vs concurrent

two task-spawning mechanisms with different semantics:

io.async — "run this, I'll need the result"

var future = io.async(someFunction, .{args}); // var, not const: await is called through a mutable pointer
// ... do other work ...
const result = future.await(io);
  • infallible (always succeeds in spawning)
  • under Threaded: bounded pool (async_limit, default = CPU count - 1). overflow runs task inline on caller's thread.
  • under Evented: fiber (cheap)
  • use when you want asynchrony but can tolerate inline fallback

io.concurrent — "run this independently"

var future = io.concurrent(someFunction, .{args}) catch |err| {
    // error.ConcurrencyUnavailable when limit reached
    return err; // the catch block must exit or yield a Future
};
  • fallible — returns ConcurrentError!Future(R)
  • under Threaded: unbounded by default (concurrent_limit = .unlimited). overflow returns error.ConcurrencyUnavailable.
  • under Evented: fiber (cheap)
  • use when correctness requires the task to actually run concurrently with its caller (e.g., long-lived I/O loops)

summary

                 io.async()                            io.concurrent()
Threaded         bounded pool, overflow runs inline    unbounded (default), overflow is error
Evented          fiber                                 fiber
failure mode     never fails, degrades to inline       returns error
semantic intent  "may run concurrently"                "must run concurrently"
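
a minimal sketch of the io.async column, assuming the API behaves exactly as in the snippets above (std.Io is still changing, so treat the signatures as assumptions). sumRange and sumInTwoHalves are hypothetical names. the two tasks are independent, so the result is the same whether they run on pool threads, as fibers, or inline on the caller's thread:

```zig
const std = @import("std");
const Io = std.Io;

// Hypothetical pure worker: sums the integers in [start, end).
fn sumRange(start: u64, end: u64) u64 {
    var total: u64 = 0;
    var i = start;
    while (i < end) : (i += 1) total += i;
    return total;
}

// Assumption: io.async(function, args) returns a Future whose await(io)
// yields the function's return value, as shown earlier in these notes.
fn sumInTwoHalves(io: Io, n: u64) u64 {
    var left = io.async(sumRange, .{ @as(u64, 0), n / 2 });
    var right = io.async(sumRange, .{ n / 2, n });
    // Neither task depends on the other, so inline fallback is harmless.
    return left.await(io) + right.await(io);
}
```

nothing here needs io.concurrent: if the pool is exhausted, each half simply runs at the spawn site and await returns the stored result.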

Future

returned by both io.async() and io.concurrent():

const std = @import("std");
const Io = std.Io;
var future = try io.concurrent(worker, .{io, &state});

await

const result = future.await(io);

blocks until the task completes. idempotent. NOT threadsafe — only call from the parent task.

cancel

const result = future.cancel(io);

equivalent to await but sends a cancellation request first. the task receives error.Canceled from its next cancellation point (any Io function that returns Cancelable!T).

defer pattern (always do this)

var task = try io.concurrent(worker, .{io, &state});
defer _ = task.cancel(io);  // safe even if await already ran — idempotent

// ... work that might fail with try ...

_ = task.await(io);

if a `try` between the spawn and the await returns early with an error, the await never runs; without the deferred cancel the future would leak.
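
the same pattern as a complete function, as a sketch only: countTo, checkLimit and countChecked are hypothetical, and the std.Io calls are assumed to have the shapes shown above.

```zig
const std = @import("std");
const Io = std.Io;

// Hypothetical worker; stands in for any long-running task.
fn countTo(limit: u32) u32 {
    var n: u32 = 0;
    while (n < limit) n += 1;
    return n;
}

// Hypothetical fallible step that sits between spawn and await.
fn checkLimit(limit: u32) error{LimitTooLarge}!void {
    if (limit > 1_000_000) return error.LimitTooLarge;
}

fn countChecked(io: Io, limit: u32) !u32 {
    var task = try io.concurrent(countTo, .{limit});
    // Runs on every exit path, including the early return below.
    defer _ = task.cancel(io);

    // If this fails, await is skipped and the deferred cancel reclaims the task.
    try checkLimit(limit);

    return task.await(io);
}
```

on the success path await runs first and the deferred cancel runs afterwards, which relies on cancel being idempotent after await as stated above.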

Group — unordered task set

an unordered set of tasks awaited or canceled as a whole:

var group: Io.Group = .init;

// spawn tasks into the group (no individual Future returned)
group.concurrent(io, handleSubscriber, .{sub1}) catch {};
group.concurrent(io, handleSubscriber, .{sub2}) catch {};
group.concurrent(io, handleSubscriber, .{sub3}) catch {};

// wait for all
group.await(io) catch {};

// or cancel all
group.cancel(io);

key property, quoted from the docs:

"The resources associated with each task are guaranteed to be released when the individual task returns, as opposed to when the whole group completes."

this means it's safe to have a long-lived group with tasks dynamically added and removed. individual task cleanup happens immediately on return — you don't need to await the group just to free one task's resources.

  • group.async(io, fn, args) — like io.async but owned by group
  • group.concurrent(io, fn, args) — like io.concurrent but owned by group
  • group.await(io) — blocks until all tasks finish. cancellation propagates to all members.
  • group.cancel(io) — immediately cancels all members and waits

use cases: managing a dynamic set of subscribers, worker pools where you don't need individual results.

Select — typed fan-in

Select(U) where U is a tagged union. each field corresponds to a task type:

const Result = union(enum) {
    connection: net.Stream,
    timeout: void,
};

var select = Io.Select(Result).init(io, &result_buffer);

select.concurrent(.connection, connectToHost, .{io, host}) catch {};
select.async(.timeout, timeoutAfter, .{io, 5_000_000_000});

// blocks until first task completes, returns tagged result
const result = try select.await();
switch (result) {
    .connection => |stream| { ... },
    .timeout => { ... },
}

// cleanup remaining tasks
select.cancelDiscard();
  • select.async(field, fn, args) — spawn async task tagged with union field
  • select.concurrent(field, fn, args) — spawn concurrent task tagged with union field
  • select.await() — blocks until first task completes, returns tagged union
  • select.awaitMany(buffer, min) — blocks until at least min results
  • select.cancel() — cancel all remaining, return last result if any
  • select.cancelDiscard() — cancel all remaining, discard results

use cases: "first of N" patterns, connect-with-timeout, race between alternatives.

Queue — bounded MPMC channel

many-producer, many-consumer, thread-safe, bounded:

var buf: [16]Item = undefined;
var queue: Io.Queue(Item) = .init(&buf);

// producer (blocks when full):
try queue.putOne(io, item);

// consumer (blocks when empty):
const item = try queue.getOne(io);

// shutdown — wakes all waiters:
queue.close(io);  // subsequent ops return error.Closed
  • putOne / getOne — single element, blocking
  • put(elements, min) / get(buffer, min) — batch, blocks until at least min transferred
  • putAll / getUncancelable etc. — variants for different cancellation semantics
  • all blocking operations are cancellation points

in the devlog's DNS resolver example, io.async(connectMany, .{...}) pushes results into a Queue and the consumer pulls from it in a loop. because the buffer is bounded, a full queue blocks the producer, which gives natural backpressure.
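
a hedged sketch of a producer/consumer pair over a Queue. this is not the devlog code: produce and sumAll are hypothetical, and the Queue and Future calls are assumed to match the snippets in these notes. the producer is spawned with io.concurrent rather than io.async, on the reasoning that an inline fallback would run the producer to completion on the caller's thread and block forever once the 16-slot buffer fills, since no consumer would be running yet.

```zig
const std = @import("std");
const Io = std.Io;

// Hypothetical producer: sends 0..count-1 and returns how many items
// were accepted before an error (closed or canceled) stopped it.
fn produce(io: Io, queue: *Io.Queue(u32), count: u32) u32 {
    var sent: u32 = 0;
    while (sent < count) : (sent += 1) {
        // Blocks while the buffer is full: this is the backpressure.
        queue.putOne(io, sent) catch break;
    }
    return sent;
}

fn sumAll(io: Io, count: u32) !u64 {
    var buf: [16]u32 = undefined;
    var queue: Io.Queue(u32) = .init(&buf);

    // The consumer below must make progress while the producer runs,
    // so the producer needs real concurrency, not a best-effort spawn.
    var producer = try io.concurrent(produce, .{ io, &queue, count });
    defer _ = producer.cancel(io);

    // Read exactly `count` items; each getOne blocks while the queue is empty.
    var total: u64 = 0;
    var received: u32 = 0;
    while (received < count) : (received += 1) {
        total += try queue.getOne(io);
    }
    return total;
}
```

the consumer counts items instead of waiting for error.Closed, so the sketch does not depend on how close treats items that are still buffered.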

use cases: producer-consumer pipelines, fan-out/fan-in, bounded work queues.