Low-level or systems programming languages generally strive to provide libraries and interfaces that enable developers, boost productivity, enhance safety, provide resistance to misuse, and more — all while trying to reduce the runtime cost of such initiatives. Strong type systems turn runtime safety/sanity checks into compile-time errors, optimizing compilers try to reduce an enforced sequence of api calls into a single instruction, and library developers think up clever hacks to completely erase any trace of an abstraction from the resulting binaries. And as anyone that’s familiar with them can tell you, the rust programming language and its developers/community have truly embraced this ethos of zero-cost abstractions, perhaps more so than any other.
I’m not going to go into detail about what the rust language and standard library do to enable zero-cost abstractions or spend a lot of time going over the many examples of zero-cost interfaces available to rust programmers, though I’ll quickly mention a few of my favorites: iterators and all the methods the Iterator trait exposes have to be at the top of every list, given the amount of black magic voodoo the compiler has to do to turn these into their loop-based equivalents; zero-sized types make developing embedded firmware in rust a dream, and it’s really crazy to see how all the various peripheral abstractions can be completely erased, giving you small firmware blobs despite all the safety abstractions; and no list is complete without the newest member of the team, async/await, and how rust manages to turn an entire web server api into a single state machine and event loop. (And to think this can be used even on embedded, without a relatively heavy async framework like tokio and with zero allocations to boot!)
But the tricky thing with abstractions is that the relative price you pay scales rather unfairly with the size of the interface you are abstracting over. While a byte here and a byte there may mean nothing when we’re talking framework-scale interfaces, when you are modeling smaller and finer-grained abstractions, every byte and every instruction begin to count.
A couple of weeks ago, we released an update to rsevents, our crate that contains a rusty cross-platform equivalent to WIN32 events for signaling between threads and writing your own synchronization primitives, and rsevents-extra, a companion crate that provides a few handy synchronization types built on top of the manual- and auto-reset events from the rsevents crate. Aside from the usual awesome helpings of performance improvements, ergonomics enhancements, and more, this latest version of rsevents-extra includes a Semaphore synchronization primitive – something that the rust standard library surprisingly lacks… but not without good reason.
What makes a semaphore a semaphore?
Semaphores are well-documented and fairly well-understood underpinnings of any concurrency library or framework and essential Computer Science knowledge. So why doesn’t the rust standard library have a semaphore type?
Unlike the synchronization types that the rust standard library currently provides (such as Mutex<T> and RwLock<T>), a semaphore is somewhat harder to model: it doesn’t so much restrict concurrent access to a single object or variable as it limits concurrency within a region of code.

Of course it can be argued that in traditional programming a semaphore is just a more general case of a mutex, and just like mutexes traditionally protected a region of code from concurrent access1 but were converted into synchronization primitives owning the data they protect and marshalling access to it, there’s no reason a rust semaphore couldn’t do the same. But therein lies the problem: while a mutex and a read-write lock can both be understood in terms of readers and writers,2 a semaphore makes no such guarantees. And rust is quite fundamentally built on the concept of read ^ write: it needs to know whether a thread/scope is reading or writing a variable or memory location in order to uphold its most basic memory safety guarantee – there can either be multiple “live” read-only references to an object or a single write-enabled (&mut) reference to the same – but a semaphore doesn’t make that distinction!
While a strictly binary semaphore (max concurrency == 1) can guarantee that there will never be multiple writers accessing a memory region, there’s not much theoretical benefit to such a binary semaphore over a mutex – in fact, they’re interchangeable. What makes a semaphore truly special is that it can be created (or even dynamically modified) with a concurrency limit n and then uphold its core precondition, guaranteeing that there will never be more than n threads/stacks3 accessing a semaphore-protected region at any given time.
The problem is that with n > 1, there’s no concept of a “privileged” owning thread and all threads that have “obtained” the semaphore do so equally. Therefore, a rust semaphore can only ever provide read-only (&T) access to an underlying resource, limiting the usefulness of such a semaphore almost to the point of having no utility. As such, the only safe “owning” semaphore with read-write access that can exist in the rust world would be Semaphore<()>,4 or one that actually owns no data and can only be used for its side effect of limiting concurrency while the semaphore is “owned,” so to speak.5 (Actual mutation of accessed resources within the concurrency-limited region, if needed, would continue to be marshalled via Mutex<T> or RwLock<T> on a fine-grained level.)
Ok, so this explains why the rust standard library doesn’t contain a Semaphore<T> type to mirror Mutex<T> and its friends, but then what’s so hard about shipping a non-owning std::sync::Semaphore instead?
Designing a safe Semaphore for rust
To answer this, we need to look at what a semaphore API generally looks like in other languages. While the names and calling semantics differ, a semaphore is generally found as a type that provides the following, ordered from the most fundamental properties to the merely de facto ones:
- It is a type that can be used to limit concurrency to a resource or region of code, up to a dev-defined limit n.
- It is a type that has a concept of “currently available concurrency,” which represents and tracks the remaining number of threads/stacks that can “obtain” the semaphore, thereby reducing its available concurrency and generally giving the calling thread access to the concurrency-limited region.
- A semaphore can be created/declared with an “initially available concurrency” and a “maximum possible concurrency,” which may differ (indeed, “initially available concurrency” is often zero).
- Semaphores don’t generally have a concept of ownership, meaning a thread (or any thread) can increment (up to the pre-defined limit) or decrement (down to zero) the available concurrency for a semaphore without having “obtained” or “created” it. (This is necessary otherwise it’d be impossible to initialize a semaphore with a lower initial concurrency limit than its maximum, because no thread could then increase it.)
It’s the last of these points that makes a semaphore so tricky to model in any language that prides itself on safety. While a semaphore can act strictly as a variable-occupancy mutex (i.e. the initial concurrency equals the max concurrency and each time the semaphore is obtained, it must subsequently be released by the same thread that obtained it), that’s not generally a requirement that semaphores impose, and such a requirement would considerably limit the utility that a semaphore could offer.
Let’s look at some ways we might design such a semaphore in rust, some of which we actually tried while prototyping rsevents_extra::Semaphore.
Before anything else, let’s get the hard part out of the way by introducing you to rsevents::AutoResetEvent, a one-byte6 synchronization primitive that takes care of putting threads to sleep when the event isn’t signalled/available and allowing one-and-only-one waiting thread to either consume the event (if it’s not already asleep) or to wake up (if it’s asleep waiting for the event) when the event is signalled (after which the event is atomically reset to a “not signalled” state). It doesn’t even have any spurious wakeups, making it really nice and easy to work with in a safe fashion. All of our Semaphore implementations will use this auto-reset event to take care of the synchronization, and we’ll omit the details of when and where to call AutoResetEvent::set() and AutoResetEvent::reset() for now.
So here’s what our initial semaphore skeleton looks like. We know we need an internal count of some integral type to keep track of the current concurrency (since we already established that it’s going to be variable and not just zero or one), and we know that at minimum a semaphore’s interface needs to provide a way to “obtain” the semaphore (decrementing the available concurrency for future callers) and a way to “release” the semaphore (at least to be used by a thread that has already obtained a “concurrency token” to re-increment the count after it is done and wants to give up its access to the concurrency-restricted region for some other caller to take).
struct Semaphore {
event: AutoResetEvent,
count: AtomicU32,
// TODO: Fill in the rest
}
impl Semaphore {
/// Create a new `Semaphore`
pub fn new(/* TODO */) -> Semaphore { /* TODO */ }
/// Obtain the `Semaphore`, gaining access to the protected concurrency
/// region and reducing the semaphore's internal count. If the
/// `Semaphore` is not currently available, blocks sleeping until it
/// becomes available and is successfully obtained.
pub fn wait(&self) -> ??? { ... }
/// Increment the semaphore's internal count, increasing the available
/// concurrency limit.
pub fn release(&self, ...) { ... }
}
Our goal now is to fill in the blanks, attempting to make a semaphore that’s maximally useful and as safe as possible, while limiting its size and runtime checks (the costs of said safety).
A constant maximum concurrency?
We’ve already established that a semaphore needs to offer a tunable “maximum concurrency” parameter that decides the maximum number of threads that can have access to the concurrency-limited region at any given time. This number is typically supplied at the time the semaphore is instantiated, and it’s quite normal for it to be fixed thereafter: while the current available concurrency may be artificially constrained beyond the number of threads that have actually borrowed/obtained the semaphore, it’s OK for the absolute maximum to be unchangeable after a semaphore is created.
We really have only two choices here: we either add a max_count integral struct member to our Semaphore or we take advantage of rust’s const generics (another brilliant zero-cost abstraction!) to eliminate this field from our struct altogether… but at a considerable cost.
Let’s consider some constraints that might determine the maximum concurrency a semaphore can have:
- We’ve experimentally determined that our backup application works best with up to eight simultaneous threads in the file read part of the pipeline and up to two simultaneous threads in the file write part of the pipeline, to balance throughput against maxing out the disk’s available IOPS. Here we can use two separate semaphores, each with a different hard-coded maximum concurrency.
- Almost all modern web browsers limit the maximum number of live TCP connections per network address to a hard-coded limit, typically 16. Internally, the browser has a semaphore for each domain name with an active TCP connection (perhaps in a HashMap<DomainName, Semaphore>), and blocks before opening a new connection if the maximum concurrency is exceeded.
In these cases, we could get away with changing our Semaphore declaration to Semaphore<const MAX: usize> and using const generics when initializing a semaphore to specify the hard-coded maximum concurrency limit, e.g. let sem = Semaphore::<16>::new(...). But const generics don’t just lock us into a hard-coded value specified at the time of object initialization, they lock us into a hard-coded value specified at the time of writing the source code for the object’s initialization. That means we can’t use a const generic parameter in lieu of a max_concurrency field in cases like the following, which we unfortunately can’t just ignore:
- We want to limit the number of threads accessing a code section or running some operation to the number (or a ratio of the number) of CPU cores the code is running on (not compiled on).
- We want to let the user select the max concurrency at runtime, perhaps by parsing a -j THREADS argument or letting the user choose from a drop-down menu or numselect widget in a GUI application.
- Going back to our backup application example, we want to use different read/write concurrency limits in the case of an SSD vs in the case of an old-school HDD.
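For reference, here’s a rough sketch of the const generic flavor we’re ruling out – the ConstSemaphore name and layout are purely illustrative, not anything from rsevents-extra:

struct ConstSemaphore<const MAX: u32> {
    event: AutoResetEvent,
    count: AtomicU32,
    // No max_count field: the limit lives in the type itself.
}

impl<const MAX: u32> ConstSemaphore<MAX> {
    pub fn new(initial_count: u32) -> Self {
        assert!(initial_count <= MAX);
        ConstSemaphore {
            event: AutoResetEvent::new(EventState::Unset),
            count: AtomicU32::new(initial_count),
        }
    }
}

// Fine: the limits are known when the source code is written...
// let read_sem = ConstSemaphore::<8>::new(8);
// let write_sem = ConstSemaphore::<2>::new(2);
// ...but impossible here, because `threads` is only known at runtime:
// let user_sem = ConstSemaphore::<threads>::new(threads);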
This means that we’re going to have to approximately double the size of our Semaphore object, mirroring whatever type we’re using for Semaphore::count to add a Semaphore::max_concurrency member (they have to be the same size because count can vary from zero to max_concurrency), giving us the following, fairly complete, struct declaration.
A functionally complete semaphore
As we’ve determined, we unfortunately can’t use rust’s const generics to roughly halve the space a Semaphore object takes in memory, giving us the following declaration:
struct Semaphore {
event: AutoResetEvent,
count: AtomicU32,
max_count: AtomicU32,
}
While we were forced to add a max_count field to store the maximum allowed concurrency for the semaphore, this wasn’t at all a violation of the “zero-cost abstraction” principle: if we want to allow the user to specify something at runtime and match against it later, this is the cost we invariably pay (whether in rust or in assembly).
As bare as our Semaphore structure is, this is actually enough to implement a completely functional and – for the most part – safe semaphore. Thanks to how AutoResetEvent is implemented internally and its own safety guarantees, we can get away with just some extremely careful use of atomics, without a lock or mutex of any sort:
impl Semaphore {
/// Create a new `Semaphore`
pub fn new(initial_count: u32, max_count: u32) -> Semaphore {
Semaphore {
event: AutoResetEvent::new(EventState::Unset),
count: AtomicU32::new(initial_count),
max_count: AtomicU32::new(max_count),
}
}
/// Obtain the `Semaphore`, gaining access to the protected concurrency
/// region and reducing the semaphore's internal count. If the
/// `Semaphore` is not currently available, blocks sleeping until it
/// becomes available and is successfully obtained.
pub fn wait(&self) {
let mut count = self.count.load(Ordering::Relaxed);
loop {
count = if count == 0 {
// Semaphore isn't available, sleep until it is.
self.event.wait();
// Refresh the count after waking up
self.count.load(Ordering::Relaxed)
} else {
// We can't just fetch_sub(1) because it might underflow in a race
match self.count.compare_exchange(count, count - 1,
Ordering::Acquire, Ordering::Relaxed)
{
Ok(_) => {
// We officially obtained the semaphore.
// If the (now perhaps stale) new `count` value is non-zero,
// it's our job to wake someone up (or let the next caller in).
if count - 1 > 0 {
self.event.set();
}
break;
},
Err(count) => count, // Refresh the cached value and try again
}
}
}
// This must hold true at all times!
assert!(count <= self.max_count.load(Ordering::Relaxed));
return ();
}
/// Increment the semaphore's internal count, increasing the available
/// concurrency limit.
pub fn release(&self) {
// Try to increment the current count, but don't exceed max_count
let old_count = self.count.fetch_add(1, Ordering::Release);
if old_count + 1 > self.max_count.load(Ordering::Relaxed) {
panic!("Attempt to release past max concurrency!");
}
// Check if we need to wake a sleeping waiter
if old_count == 0 {
self.event.set();
}
}
}
If you’re familiar with atomic CAS, the annotated source code above should be fairly self-explanatory. In case you’re not, briefly, AtomicXXX::compare_exchange() is how most lock-free data structures work: you first read the old value (via self.count.load()) and decide based on it what the new value should be, then use compare_exchange() to change $old to $new, if and only if the value is still $old and hasn’t been changed by another thread in the meantime (if it has changed, we re-read to get the new value and try all over again until we succeed).
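To see the pattern in isolation, here’s a minimal sketch of a capped atomic increment built on the same read–compute–compare-exchange loop (bounded_increment is just an illustrative helper, not part of the semaphore):

use std::sync::atomic::{AtomicU32, Ordering};

/// Atomically increment `value`, but only if the result stays at or below `cap`.
/// Returns `true` if the increment was applied.
fn bounded_increment(value: &AtomicU32, cap: u32) -> bool {
    let mut current = value.load(Ordering::Relaxed);
    loop {
        if current >= cap {
            // Incrementing would exceed the cap, so don't even try
            return false;
        }
        match value.compare_exchange_weak(
            current,     // only swap if the value is still what we last saw
            current + 1, // the new value we'd like to store
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            // Our update won the race
            Ok(_) => return true,
            // Another thread changed the value first; refresh and retry
            Err(now) => current = now,
        }
    }
}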
With the Semaphore skeleton filled out, we now have a functionally complete semaphore with features comparable to those available in other languages. At the moment, Semaphore::wait() returns nothing and any thread that’s obtained the semaphore must ensure that Semaphore::release() is called before it returns, but that’s just an ergonomic issue that we can easily work around by returning a concurrency token that calls Semaphore::release() when it’s dropped instead.7
But can we make it safer?
The problem with Semaphore::release()
For the remainder of this article, we’ll be focusing on the ugly part of a semaphore, the part that makes writing a truly safe semaphore so challenging: Semaphore::release().
In rust, the standard library concurrency primitives all return “scope guards” that automatically release (or poison) their associated synchronization object at the end of the scope or in case of panic. This actually isn’t just a question of ergonomics (as we mentioned before), it’s a core part of their safety. On Windows, you can create a mutex/critical section with InitializeCriticalSection() and obtain it with EnterCriticalSection(), but any thread can call LeaveCriticalSection(), even though doing so invokes undefined behavior that may cause a deadlock. On Linux and its pals, pthread_mutex_init() can be used to create a mutex and pthread_mutex_lock() can be used to enter the mutex. While pthread_mutex_unlock() is documented as possibly returning an EPERM error if the current thread doesn’t own the mutex, the notes clarify that the current owning thread id isn’t stored and deadlocks are allowed in order to avoid overhead – meaning it’s implementation-defined whether or not pthread_mutex_unlock() actually protects against unlocking from a different thread. In practice, it doesn’t.8
Rust, as a partially object-oriented language, sidesteps all this via our good friend RAII by simply making the equivalent of Mutex<T>::unlock(&self) unavailable except via the scope guard (MutexGuard) returned by Mutex<T>::lock(&self) (and the same for RwLock<T>). The type system prevents you from calling the equivalent of pthread_mutex_unlock() unless you already own a MutexGuard, which can only be acquired as a result of a successful call to Mutex::lock() – without needing any runtime code to check whether or not the calling thread is the owning thread, because the type system provides that safety guarantee at zero cost.
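As a quick refresher, here’s a minimal sketch of that pattern using nothing but the standard library – the guard is the only path to the unlock:

use std::sync::Mutex;

fn main() {
    let counter = Mutex::new(0u32);
    {
        // `lock()` is the only way in, and it hands back a `MutexGuard`
        let mut guard = counter.lock().unwrap();
        *guard += 1;
    } // the guard is dropped here and the mutex unlocks automatically
    // There is no public `counter.unlock()` for an unrelated thread to misuse.
    println!("{}", counter.lock().unwrap());
}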
Unfortunately, as I mentioned earlier in passing, even if we did make our Semaphore::wait() function return some sort of concurrency token/scope guard, it would at best be an ergonomic improvement to make sure one doesn’t forget to call Semaphore::release() before exiting the scope. It wouldn’t allow us to eliminate a publicly callable Semaphore::release() function that any thread could call at any time – at least not without making it impossible to create a semaphore with an initial count that doesn’t equal the maximum count, and not without making the “current maximum concurrency” limit non-adjustable at runtime.
Do we even need Semaphore::release() anyway?
At this point, you might be tempted to ask if we really truly absolutely need these options and to question what purpose they serve – which is a good and very valid question, given the cost we have to pay to support them, and especially because in all the cases we mentioned above (those with a fixed max_count and those without), you’d always create a semaphore with initial_count == max_count. Here are some hopefully convincing reasons:
- Semaphores aren’t just used to limit concurrency up to a maximum pre-defined limit, they’re also used to temporarily and artificially constrain the available concurrency to a value in the range of [0, max_count] – in fact, this is where you might find their greatest utility.
- CS principle: Preventing a “thundering herd” when a number of threads are blocked waiting on a semaphore to allow them access to a resource or code region. You can have up to n threads performing some task at the same time, but you don’t want them to all start synchronously because it’ll cause unwanted contention on some shared state, e.g.
  - Perhaps an atomic that needs to be incremented, and you don’t want to waste cycles attempting to satisfy compare_exchange() calls under contention, or you otherwise have a traditional fine-grained lock or even a lock-free data structure where uncontended access is ~free but contention wastes CPU cycles busy-waiting in a spinlock or even puts threads to sleep;
  - Starting operations at ~the same time can run into real-world physical limitations, e.g. it takes n IO threads to achieve maximum saturation of available bulk bandwidth but the initial seeks can’t satisfy as many IOPS without unduly increasing latency and decreasing response times.
  - The semaphore controls the number of threads downloading in parallel but you don’t want to DOS the DNS or HTTP servers by flooding them with simultaneous requests, though you do ultimately want n threads handling the response, downloading the files, etc. in parallel.
- CS principle: Making available concurrency accrue over time, up to a maximum saturation limit (represented by max_count), for example:
  - A family of AWS EC2 virtual machine instances have what is referred to as “burstable performance” where for every m minutes of low CPU usage, you get t time slices of “more than what you paid for” instantaneous CPU performance, up to n maximum time slices accrued. As a simplified example, you “rent” a virtual machine with a lower nominal CPU speed of 2.0 GHz and for every 5 minutes with a p95 per-5-second-time-slice CPU utilization below 50%, you get a “free” 5-second time slice of 3.0 GHz compute. If you launch a burstable instance and immediately try running Prime95, you’ll be capped to 2.0 GHz, but if you are running nginx and serving web requests under relatively no load, then after 10 minutes you’ll have accrued 10 seconds of “free” 3.0 GHz performance. When you suddenly get a burst of traffic because a link to your site was just shared on an iMessage or WhatsApp group and 200 phones are hitting your server to generate a web preview, you can “cash in” those accrued 3.0 GHz time slices to satisfy those requests quickly, after which you’re constrained to the baseline 2.0 GHz performance until the requests settle down and you begin to accrue 3.0 GHz time slices once again.
  - While you can implement a rate limiter by giving each connected user/IP address a maximum number of requests they can make per 5 minutes and resetting that limit every 5 minutes, that still means your server can be DDOS’d by n clients saturating their limit of 100-requests-per-5-minutes in the first few seconds after establishing a connection. You can instead give each client a semaphore9 with an initial available count of, say, 2 and then every three seconds increment the available concurrency by one, meaning clients would have to be connected for a full 5 minutes before they can hit you with 100 requests in one go (making it more expensive to mount an attack and giving your firewall’s heuristics a chance to disconnect them) – see the sketch just after this list.
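Here’s a rough sketch of that last idea, built on the Semaphore we’re designing in this article plus the fallible try_release() we’ll arrive at in the next section; the numbers and names (rate_limiter_for_new_client, handle_request) are illustrative only:

use std::sync::Arc;
use std::time::Duration;

fn rate_limiter_for_new_client() -> Arc<Semaphore> {
    // Each client starts with 2 immediately available requests and accrues
    // one more every three seconds, saturating at 100.
    let requests = Arc::new(Semaphore::new(2, 100));

    let topper = Arc::clone(&requests);
    std::thread::spawn(move || loop {
        std::thread::sleep(Duration::from_secs(3));
        // Once the budget is already full, this simply returns false
        let _ = topper.try_release();
    });

    requests
}

// Each incoming request from that client first takes a token, blocking until
// the accrued budget allows it. The token is deliberately never released:
// every request permanently spends one unit of concurrency.
// limiter.wait();
// handle_request(&connection);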
Hopefully the above reasons demonstrate the utility in being able to create a Semaphore with an initial count lower than the maximum count, and therefore the need to have a free-standing Semaphore::release() function that, at the very least, the creator of the semaphore can call even without having previously made a corresponding call to Semaphore::wait().
Can we make Semaphore::release() safer?
There’s an immediately obvious way to make Semaphore::release() safer to call, and that’s to replace it with a Semaphore::try_release() method that first checks that the semaphore’s core integrity predicate (self.count <= self.max_count) is upheld, instead of blindly incrementing self.count and then panicking if it exceeds self.max_count – returning true or false to denote whether the operation completed successfully or not.
It’s actually not that hard of a change to make, provided you’re familiar with compare-and-exchange atomics (which we used above in safely implementing Semaphore::wait() to prevent two threads from racing to decrement self.count after checking that it’s non-zero), which we’ll use to write our try_release() method now:
impl Semaphore {
/// Attempts to increment the `Semaphore`'s internal count,
/// returning `false` if it would exceed the maximum allowed.
pub fn try_release(&self) -> bool {
let mut prev_count = self.count.load(Ordering::Relaxed);
loop {
if prev_count == self.max_count.load(Ordering::Relaxed) {
// Incrementing self.count would violate our core precondition
return false;
}
match self.count.compare_exchange_weak(
prev_count, // only exchange `count` if it's still `prev_count`
prev_count + 1, // what to exchange `count` with
Ordering::Release, Ordering::Relaxed)
{
// The CAS succeeded
Ok(_) => {
if prev_count == 0 {
// Wake a waiting thread
self.event.set();
}
return true;
}
// If it failed, refresh `prev_count` and retry, failing if
// another thread has caused `prev_count` to equal `max_count`.
Err(new_count) => prev_count = new_count,
}
}
}
}
This is much better. We only increment count if we can guarantee that doing so won’t cause it to exceed max_count, and we relay the result to the caller. We can now guarantee that a call to Semaphore::try_release() that wasn’t paired with a previous Semaphore::wait() won’t inadvertently violate the absolute limit on the allowed concurrency.
But can you spot the problem? Look at the code sample carefully, and consider it not in the context of a single call to Semaphore::try_release() but as a whole.
If you think you’ve figured it out or you just want to skip to the answers, read on to find out where the problem still lies. (Yes, this is just a filler paragraph to avoid your eyes immediately seeing the answer while you think this over. Hopefully you haven’t already scrolled down too much so that the answer is already in view. Sorry, I can’t really do better than this, my typesetting system doesn’t let me insert empty vertical whitespace very elegantly.)
The issue is that we’ve prevented this call to Semaphore::try_release() from incrementing count past max_count, but we might already have extant threads chugging away in parallel that have already obtained the semaphore, oblivious to the fact that count has changed from under them. The problem is easy to spot if we keep a copy of our old Semaphore::release() around and mix-and-match between it and try_release(), with calls we expect to always succeed using the former and calls that may overflow the current count using the latter:
fn main() {
    // Create a semaphore with a count of 1 and max count of 2
    let semaphore = Semaphore::new(1, 2);

    // Scoped threads let the worker closures borrow `semaphore` directly
    std::thread::scope(|scope| {
        // Create a number of threads that will use the semaphore
        // to make sure concurrency limits are observed.
        for _ in 0..NUM_CPUS {
            scope.spawn(|| {
                // Make sure to obtain a concurrency token before beginning work
                semaphore.wait();
                // TODO: Replace `sleep()` with actual work!
                std::thread::sleep(Duration::from_secs(5));
                // Release the concurrency token when we're done so another
                // thread may enter this code block and do its thing.
                semaphore.release();
            });
        }

        while !work_finished {
            // <Read user input from keyboard here>

            // In response to a user command, increase concurrency
            if !semaphore.try_release() {
                eprintln!("Cannot raise limit, max already reached!");
            }
        }
    });
}
We’re using our semaphore to limit the number of simultaneous worker threads, originally operating at 50% of our maximum concurrency limit (count == 1). To start, one of NUM_CPUS10 worker threads obtains the semaphore (count == 0) to access the concurrency limited region and the rest are locked out.
In the main thread’s event loop, we wait for work to finish or a user command to be entered at the keyboard (omitted from the example).11 The user keys in an input that’s interpreted as “try to speed things up by increasing the concurrency limit,” and in response the main thread calls Semaphore::try_release() and succeeds in incrementing its internal self.count (now count == 1 again), thereby allowing a second thread to enter the semaphore-protected region (giving count == 0) and contribute to progress.
But what happens now that two threads are already in the semaphore-protected scope but the user triggers a second call to Semaphore::try_release()? As far as the semaphore knows, count (equal to 0) is less than max_count, and can be safely incremented, yielding count == 1 and thereby unblocking yet another worker thread sleeping in Semaphore::wait() (reducing count to zero).
But what does that mean? Even though our core precondition hasn’t been violated in this instant (Semaphore::count <= Semaphore::max_count), we now have three worker threads in the concurrency-limited region, exceeding the hard-coded max_count provided during the semaphore initialization! In our example above, each worker thread, having first obtained a concurrency token via Semaphore::wait(), assumes that it can call the infallible Semaphore::release() method safely when it’s done – but when the last worker thread finishes its work, it’ll end up incrementing count past max_count and panicking in the process.
Of course, the panic isn’t the problem – it’s just reporting the violation when it sees it. We could replace all Semaphore::release() calls with Semaphore::try_release() and we’d still have a problem: there are more worker threads in the semaphore-protected region than allowed, and one of the “shouldn’t fail” calls to Semaphore::try_release() will eventually return false, whether that triggers a panic or not.
The crux of the matter is that we borrow from Semaphore::count but don’t have a way to track the total “live” concurrency count available (the remaining concurrency tokens in self.count plus any temporarily-unavailable-but-soon-to-be-returned concurrency tokens borrowed by the various worker threads). And, importantly, we don’t need this for any core semaphore functionality but rather only to protect against a user/dev calling Semaphore::release() more times than they’re allowed to. In a perfect world, it would be the developer’s responsibility to make sure that there are never more outstanding concurrency tokens than allowed, and we could omit these safety checks altogether. She could statically ensure it’s the case by only ever pairing together wait() and release() calls (perhaps after issuing a fixed, verifiable number of release() calls upon init), or by tracking, when and where needed, the number of free-standing calls to release() in an external variable to make sure the limit is never surpassed.
At last, a truly safe semaphore?
While we could offload the cost of verifying that there will never be more than max_count calls to Semaphore::release() to the developer, we’re rust library authors and we hold ourselves to a higher standard. We can most certainly do just that, but we would have to mark Semaphore::release() as an unsafe function to warn users that there are preconditions to calling it and dangers to be had if it’s called willy-nilly without taking them into account. But what if we want a synchronization type that’s easier to use and worthy of including in the standard library, being both safe and easy-to-use? What then?
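For illustration, such an escape hatch might look something like this – a sketch of the unsafe-but-documented approach, with a release_unchecked name of my own choosing; it’s not the route the rest of this article takes:

impl Semaphore {
    /// Increment the available concurrency without any checks or bookkeeping.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the total number of outstanding
    /// concurrency tokens never exceeds `max_count`; violating this breaks
    /// the semaphore's core invariant.
    pub unsafe fn release_unchecked(&self) {
        let old_count = self.count.fetch_add(1, Ordering::Release);
        if old_count == 0 {
            // Wake a sleeping waiter, if any
            self.event.set();
        }
    }
}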
There are actually several approaches we can take to solving this gnarly problem. The easiest and safest solution would be to simply change our Semaphore definition, adding another field of the same integral count type, perhaps called total_count or live_count, that would essentially track initial_count plus the number of successful free-standing calls to Semaphore::release(), but not decrement it when a call to Semaphore::wait() is made.12 This way each time try_release() is called, we can check if total_count (and not count) would exceed max_count — and continue to use just self.count for the core semaphore functionality in Semaphore::wait():
struct Semaphore {
event: AutoResetEvent,
count: AtomicU32,
total_count: AtomicU32,
max_count: AtomicU32,
}
impl Semaphore {
/// Attempts to increment the `Semaphore`'s internal count,
/// returning `false` if it would exceed the maximum allowed.
pub fn try_release(&self) -> bool {
// First try incrementing `total_count`:
let mut prev_count = self.total_count.load(Ordering::Relaxed);
loop {
if prev_count == self.max_count.load(Ordering::Relaxed) {
// Incrementing self.total_count would violate our core precondition
return false;
}
match self.total_count.compare_exchange_weak(
prev_count, // only exchange `total_count` if it's still `prev_count`
prev_count + 1, // what to exchange `total_count` with
Ordering::Relaxed, Ordering::Relaxed)
{
// If the CAS succeeded, continue to the next phase
Ok(_) => break,
// If it failed, refresh `prev_count` and retry, failing if
// another thread has caused `prev_count` to equal `max_count`.
Err(new_count) => prev_count = new_count,
}
}
// Now increment the actual available count:
let prev_count = self.count.fetch_add(1, Ordering::Release);
if prev_count == 0 {
// Wake up a sleeping waiter, if any
self.event.set();
}
return true;
}
}
But now, something else breaks!
Let’s go back to our last example with many worker threads attempting to access a concurrency-restricted region and a main event loop that reads user commands that can affect the available concurrency limit. To make this more relatable, let’s use a real-world example: we’re developing a command-line BitTorrent client and we want the user to control the number of simultaneous uploads or downloads, up to some limit (that might very well be u32::MAX). In the last example we were focused on user commands that increased the available concurrency limit, in our real-world example perhaps reflecting a user trying to speed up ongoing downloads or seeds by allowing more concurrent files or connections.
But this isn’t a one-way street! Just as a user may wish to unthrottle BitTorrent downloads, they might very well wish to throttle them to make sure they’re just seeding in the background without saturating their Top Tier Cable Provider’s pathetic upload speed and killing their internet connection in the process. How do we do that safely?
One way would be to introduce a method to directly decrement our semaphore’s self.total_count and self.count fields (down to zero), but what do we do if total_count was non-zero but count was zero (i.e. all available concurrency was currently in use)? Apart from the fact that we are using unsigned atomic integers to store the counts, we could decrement count (but not total_count) past zero, for example to -1, and let the “live” concurrency eventually settle at the now-reduced total_count after a sufficient number of threads finish their work and leave the concurrency-limited region.
But we don’t actually need to do any of that: by its nature a semaphore already provides a way to artificially reduce the available concurrency by means of a free-standing call to Semaphore::wait(), i.e. calling wait() without calling release() afterwards. It ensures that count isn’t reduced until it’s non-zero and that count never exceeds max_count or total_count at any time, not even temporarily.
Unfortunately, herein lies a subtle problem. With our revised semaphore implementation, we increase both count and total_count when the free-standing release() is called and assume that each call to Semaphore::wait() will have a matching call to some Semaphore::special_release() that increases count without touching total_count. This way, total_count tracks the “total available” concurrency, assuming that it’s equal to the “remaining concurrency limit” plus the “outstanding calls to wait() that haven’t yet called xxx_release()”.
While free-standing calls to Semaphore::release() were our problem before, here we’ve shifted that to an issue with free-standing calls to Semaphore::wait() – an admittedly less hairy situation but, as we have seen, still not one that we can afford to ignore!
More importantly, even if we weren’t using free-standing calls to Semaphore::wait() to artificially reduce the available concurrency, we actually can’t guarantee that release() is always called after wait(): it’s a form of the halting problem, and even if we ignore panics and have wait() return a scope guard that automatically calls release() when it’s dropped, it’s still completely safe for a user to call std::mem::forget(scope_guard), thereby preventing release() from ever being called!13
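That is, nothing stops perfectly safe code from doing the following (a sketch, assuming wait() returns such a scope guard):

let guard = semaphore.wait();  // take a concurrency token
std::mem::forget(guard);       // Drop never runs, so release() is never called
// The semaphore's available concurrency stays reduced forever, in 100% safe rust.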
Fundamentally, we can’t really solve this problem. We either err on the side of potentially allowing too many free-standing calls to release() to be made, with safety checks delaying the overflow of max_count until the last borrowed concurrency token is returned after a call to wait(); or we err on the side of prudence and incorrectly prevent a free-standing call to release() from going through, because we don’t know (and can’t know) that a thread which previously called wait() and took one of our precious concurrency tokens has decided it’s never going to return it.
But don’t despair! Do you remember the old schoolyard riddle? You’re silently passed a pen and a notebook with a single line drawn across the page, and challenged to make that line shorter without touching or erasing it.
Like with the riddle,14 we can implement scope guards to make it more likely that every call to wait() is matched by a call to release(), but we can’t actually stop the user from calling std::mem::forget(sem.wait()) – and we don’t have to. Without trying to think of ways to cause a compiler error when a scope guard is leaked and not dropped, we can still make it, if not hard, then at least harder for the user to leak a scope guard and throw our internal count off. How? Not by hiding the ability to forget a scope guard but by highlighting it front and center!
Let’s fast forward to our semaphore from above, but modified to return a scope guard instead to encourage returning concurrency tokens back to the semaphore when a thread has finished with a concurrency-limited operation:
/// This concurrency token is returned from a call to `Semaphore::wait()`.
/// It's automatically returned to the semaphore upon drop, incrementing
/// the semaphore's internal available concurrency counter once more.
struct ConcurrencyToken<'a> {
sem: &'a Semaphore
}
impl Semaphore {
pub fn wait<'a>(&'a self) -> ConcurrencyToken<'a> {
include!("earlier definition of Semaphore::wait() here");
// Now instead of returning () we return a ConcurrencyToken
return ConcurrencyToken {
sem: self,
}
}
/// Directly increments the internal concurrency count without touching
/// `total_count` and without checking if it would exceed `max_count`.
unsafe fn release_internal(&self) {
let prev_count = self.count.fetch_add(1, Ordering::Release);
// We only need to wake a sleeping waiter if the previous count
// was zero. In all other cases, no one will be asleep.
if prev_count == 0 {
self.event.set();
}
}
}
impl Drop for ConcurrencyToken<'_> {
fn drop (&mut self) {
unsafe { self.sem.release_internal(); }
}
}
This was our initial attempt at “strongly encouraging” calls to Semaphore::wait() to always be paired with calls to Semaphore::release_internal() (called by the ConcurrencyToken on drop), which increments count without touching total_count so our logic in Semaphore::try_release() can continue to work.
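In other words, typical use now looks something like this (a sketch):

{
    // Obtain a concurrency token, blocking if none are currently available
    let _token = semaphore.wait();

    // ... do the concurrency-limited work here ...

} // `_token` is dropped here, calling release_internal() for us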
As we said though, if one were to call std::mem::forget(sem.wait()), the ConcurrencyToken would be forgotten without release_internal() ever being called, and the count we track in total_count would be off by one, preventing a free-standing call to Semaphore::release() that should have been allowed.
So what if we just directly add a new method to our concurrency token? With a ConcurrencyToken::forget() available, it’s harder to call std::mem::forget() on our concurrency token than it is to just call Semaphore::wait().forget() directly! (See, I really was going somewhere with that riddle!)
impl ConcurrencyToken<'_> {
/// It is a violation of this crate's contract to call `std::mem::forget()`
/// on the result of `Semaphore::wait()`. To forget a `ConcurrencyToken`,
/// use this method instead.
pub fn forget(self) {
// We're keeping `count` permanently reduced, but we need to decrement
// `total_count` to reflect this as well before forgetting ourselves.
self.sem.total_count.fetch_sub(1, Ordering::Relaxed);
std::mem::forget(self);
}
}
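Artificially throttling the semaphore then becomes a one-liner (a sketch), with the bookkeeping handled for us:

// Permanently take one unit of concurrency out of circulation: `count` stays
// reduced and `total_count` is decremented to match, so future free-standing
// calls to release()/try_release() still add up correctly.
semaphore.wait().forget();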
And just like that, we now have something we can reasonably call a “safe” semaphore, worthy of rust!
The price we pay for safety
While I can’t say with complete confidence that this is the optimal implementation of a safe semaphore (exposing the same functionality), our journey above is still representative of the constant tug-of-war that takes place when trying to build an API as you juggle performance, the desire for zero-cost abstractions, and the imperative of surfacing a (within the boundaries of reasonable use) safe and misuse-resistant interface.
We started with something completely simple: two words for the count and a single-byte auto-reset event we could use to impose strict ordering and optimized waiting/sleeping in cases of contention. Correctness (which, if you squint at it in a particular way, is just another kind of safety) mandated the use of atomics from the very start, preventing us from playing fast and loose with our integer math and imposing heavy penalties when it came to ensuring cache coherence and cross-core integrity. Then, just when we thought we had everything figured out, we needed to completely change our approach and even add a new struct member to boot (raising the size of the Semaphore struct by 33-45% depending on the integer width, which sounds really scary until you realize it’s still just a handful of bytes).
There are of course other possible solutions to the same problem, all of which potentially have their own drawbacks.15 And even if there are cost-free solutions here, the general picture isn’t unique to semaphores or even concurrency primitives in general: it’s a story that’s played on repeat and comes up every time an interface needs to be added that has some caveats the caller needs to keep in mind. Writing correct code is hard, writing safe and correct code is harder. But, in my opinion, this is what actually makes rust special.
Rust’s concepts of ownership and exclusive RO/RW semantics play a huge role in making it such a popular language with low-level developers, but I would argue that it’s the attention paid when writing libraries that deal with intricate or abstract concepts – ones that can’t be reduced to simple &foo vs &mut foo semantics – that makes rust truly unique. As an old hat at C and C++ development, I’ve already worn my “low-level library developer” mantle thin, and it’s absolutely awesome to be able to write an abstraction that other developers can use without having to dive into syscalls and kernel headers as the only source of literature. But with rust, I’m experiencing a completely different kind of challenge in writing libraries and APIs: here there’s a bar even higher than just writing clean abstractions. It’s being able to write these low-level abstractions in a way that not only lets clever developers that have previously dealt with these system internals appreciate and use our libraries to their advantage, but that lets even others new to the scene just read your documentation (or maybe not even that) and let the shape of your API guide them to the right way of using it.
In any language, a savvy enough developer can usually figure their way around even a completely new library dealing with concepts and mechanisms completely alien to them. But in the process of figuring it out, they’re bound to make mistakes, bump into leaky abstractions (“but why shouldn’t I call pthread_mutex_unlock from another thread if I need to access the mutex and it’s currently locked? What is it there for, then?” – whether they’re asking it on SO or mulling it over quietly in their head as they figure out the black-box internals by poking and prodding away at the API surface), pull out what’s left of their hair, and bang their head against the wall some before arriving at a generally correct and workable solution.
But it doesn’t have to be that way, and the burden is on us as developers of these libraries and crates to give our fellow devs a better option. Runtime errors (like the ones the pthread API doesn’t even bother returning!) are good, and other languages16 have demonstrated how they can be used with non-zero but still fairly minimal overhead, but with the benefits of a strongly typed language, powerful type abstractions, and perhaps a smattering of generics and ZSTs, we can and should do better.
The truly safe semaphore, for your benefit and review
The semaphore we iteratively designed in this article is available for you to use, study, or review as part of the 0.2 release of the rsevents-extra crate. This is the current API exposed by our Semaphore type (source code here), incorporating some of the ideas discussed above.17
The Semaphore type in rsevents-extra actually includes even more safety than we’ve demonstrated above, but it’s of the bog-standard variety (checking for integer overflows, making sure we’re not decrementing past zero, etc) and not something unique to the challenges presented by semaphores in particular. The Semaphore docs example shows a more fleshed-out version of the “listen for user events to artificially throttle/unthrottle the semaphore” scenario, if you want to check it out.
If you have an itch to try your own hand at writing concurrency primitives, I cannot encourage you enough: it’s all kinds of challenging and rewarding, and really opens your mind to what goes on behind-the-scenes with synchronization types. The rsevents crate was written to make doing just that a breeze, and I recommend at least starting off with either manual- or auto-reset events to take care of the intricacies of sleeping and the correctness of waking one and only one past/future awaiter at a time. Rust generally uses channels and mutexes to take care of synchronization, but there’s always a time and place for lower level thread signalling constructs!
Show some love and be the first to get new rust articles!
Last but not least: a request for you, dear reader. I put a lot of effort into these rust writeups (and into the open source libraries and crates I author) for nothing but love. I’ve heard good things about Patreon, and have literally just now put up a page to see if anyone would be interested in sponsoring my open source work. If you can’t spare some change to support me and my work on Patreon, please consider following me and my work on twitter, and starring rsevents and rsevents-extra on GitHub.
I’m currently looking for work opportunities as a senior engineer (rust is preferable, but I’m a polyglot). If you or your team is hiring, let me know!
If you liked this writeup, please share it with others on your favorite nerdy social media platform. I also set up a small mailing list that only sends out an email when I post about rust here on this blog, you can sign up below to join (no spam, double opt-in required, one-click unsubscribe, I never share your email, etc. etc.):
Update (10/5/2022): The examples in this post have been updated to use Ordering::Acquire or Ordering::Release when reading/writing self.count in the wait() and release() family of functions to synchronize memory/cache coherence between threads and ensure correct instruction ordering. In the original version of this article, the examples all used Ordering::Relaxed and relied on self.event to take care of ordering, but as self.event is skipped as an optimization in cases where the semaphore has available concurrency, this was insufficient.
1. In fact, in some languages/apis a “critical section” is another name for a mutex.
2. For a mutex, they are one and the same as it mandates exclusive access to a region.
3. Semaphores are generally non-reentrant so recursively obtaining a semaphore will count against its limit!
4. The empty parentheses/tuple () is rust-speak for void, for those of you not familiar with the language.
5. In rust, there’s actually (out of necessity) an entire class of types or values that can be changed even through read-only references, a concept known as interior mutability and exposed via types like AtomicXXX and the various std::cell Cell types — but those are generally to be used on a fine-grained level and in general you wouldn’t be using them to make entire objects writeable via read-only references.
6. Actually, the AutoResetEvent implementation only takes all of two bits, but let’s just call it a byte to make everything nice and easy.
7. We would still have to keep Semaphore::release() around and make sure it can be publicly called so that a semaphore initialized with { count: m, max_count: n, ... } with m ≥ 0 and n > m can be used.
8. You can see a sample application testing for pthread_mutex_unlock() safety here and try it out for yourself online here.
9. After all, if we use an AtomicU8 to represent the max/initial count, they can be as small as three bytes each!
10. For the sake of this example, let’s assume NUM_CPUS is a sufficiently large number like 4 or 8, so that enough worker threads will try to enter the semaphore-protected region.
11. rsevents-extra’s CountdownEvent might just be the perfect tool for this job, btw!
12. Another way would be to pack count and max_count into a single double-width atomic (assuming such an atomic of this size exists) and to decrement both count and max_count when a call to Semaphore::wait() is made. This way, any calls to Semaphore::release() would compare the potential increase in count against a similarly decremented max_count and can catch any violations of our core precept. The issues described in the remainder of this article persist regardless of which method was chosen.
13. In rust parlance, memory leaks do not fall under the safety guarantees of the language and it’s perfectly “safe” if not exactly cromulent to write code that doesn’t drop() RAII resources.
14. Give up? Just draw another, longer line next to it!
15. I still need to sit down and experiment with packing count and max_count into one atomic double-width word and see how it works to decrement both count and max_count after each call to wait() instead of tracking with an additional total_count field, but even there we’d have a price to pay. We can no longer use AtomicXXX::fetch_add() and we’d have to use compare_exchange_weak() in a loop, after fetching the initial value, separating it into its component fields, incrementing/decrementing, then combining the fields into a single word again – although a quick godbolt attempt shows the compiler actually does a rather nice job.
16. C# is a great example here, with extensive input and sanity checking for most APIs but almost all of it in the form of runtime exceptions – despite it being a strongly typed language with powerful and extensible compiler integration.
17. It currently still uses atomic unsigned integers in the implementation and so does not implement the wait-free, eventually consistent API to artificially reduce the currently available concurrency without making a wait() call, waiting for it to return, and then forgetting the result. At the time of its initial development, I had started off with signed integers then realized I didn’t need them for the core functionality and switched to unsigned atomic values instead. I may need to revisit that decision in another release if it can give us either a wait-free reduce() corollary to release() instead of the Semaphore::wait().forget() or the current modify() method which allows wait-free direct manipulation of the internal count, but only with an &mut Semaphore reference (to guarantee that it isn’t in use, eschewing eventual consistency for correctness), but feedback on whether a wait-free reduce() at the cost of eventual consistency is a win or a draw/loss would be appreciated from anyone nerdy enough to read these footnotes!
Very interesting article! Thank you so much for the write-up.
One minor nitpick, though: The unit type in rust is different from void in C. Unit is a type with exactly one value. The closest equivalent in C is probably an empty struct. void is an uninhabited type, meaning it has no value. Therefore, it’s equivalent to Rust’s Never type (!), std::convert::Infallible, or an empty enum.
Hey there, thank you for writing this interesting article.
While reading the article I felt convinced that there has to be a solution with less than three u32s, so I took a stab at it.
Keep in mind that I never actually worked with semaphores before and I didn’t actually look at your full implementation, only read the article, so maybe my code is missing some crucial functionality but either way here goes:
https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=b0097ac76d837488d8568e3a329efe63
I hope it can be of any use.
Have a great day and best regards 🙂
P.S.: If it is any use to you, you or anyone else may use it in any way they like as far as the parts I didn’t copy from your code are concerned.
It seems a little unusual to use the same functions for incrementing and decrementing the max effective acquisitions as said acquisitions themselves.
Wouldn’t a better approach be to have an ownable struct that allows the owner to increment or decrement the max effective acquisitions, while the struct available to the consuming threads only exposes a scope guard that increments and decrements the currently in-flight count?
In such a scenario, you swap your three internal atomic u32s with in_flight, effective_max, and absolute_max.
Is there some special reason to use wait and release to do the runtime tuning other than just reducing the number of methods?
please use a monospaced font for your code blocks