Merge crossbeam-channel into std::sync::mpsc
#93563
Conversation
Thanks for working on this! I'd like to avoid adding any new public API (like mpmc) in the course of fixing mpsc, even if it's unstable. Let's focus on making mpsc less broken. Also, the phrasing of the PR seems unfortunate; crossbeam-channel has lots of users, but that doesn't make it "the de facto replacement".

True, I just meant to say it is the most popular replacement. I'll update the description.
Rather than copying the whole crate, you can add it as an internal dependency and re-export it; that will make it easier to update in the future.

@jyn514 The problem with that is …
Crossbeam-channel also directly depends on libstd for getting the current thread ID: https://github.com/crossbeam-rs/crossbeam/blob/85d0bdc95cdac6450ac1ba4ed587f362a5235e60/crossbeam-channel/src/waker.rs#L274 It probably also depends on other parts of libstd to wake up the right thread.

There is a UI test (src/test/ui/threads-sendsync/mpsc_stress.rs) that is partially disabled. This PR should re-enable it, and make sure that it still detects the bug (red-green).

If the latter, would it make sense for …? This could be a way to resolve the conflict between the …
I'm surprised this code does as much dubious spinning as it does. In practice you'd expect most of it not to hurt that badly (which matches my experience with crossbeam), but on some architectures a few of the placements of backoff.spin() could have bad behavior.
The spinlock was a surprise though. I had no idea crossbeam had one.
```rust
const SPIN_LIMIT: u32 = 6;
```
`1 << 6` calls to hint::spin_loop() is... very aggressive. Those calls tend to be a few hundred cycles. Actually, it's `(1 << 6) + (1 << 5) + (1 << 4) + ...` before it falls back to calling thread::yield_now(), which is not really ideal either: "switch to a random thread" is almost always bad behavior in real code.
Changed the code to use quadratic instead of exponential backoff (437ea34). The fallback to thread::yield_now is only in blocking loops, where there isn't really any other option.
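To make the distinction concrete, here is a minimal sketch of a quadratic backoff helper. The names (`Backoff`, `snooze`, `SPIN_LIMIT`) echo the code under review but this is an illustrative reconstruction, not the actual std internals: step `n` spins `n * n` times, so total spinning grows polynomially rather than doubling every round.

```rust
use std::hint;

/// Illustrative quadratic-backoff helper (hypothetical, not the real impl).
struct Backoff {
    step: u32,
}

impl Backoff {
    fn new() -> Self {
        Backoff { step: 0 }
    }

    /// Spin for a quadratically growing number of iterations.
    /// Returns false once the caller should yield or block instead.
    fn snooze(&mut self) -> bool {
        const SPIN_LIMIT: u32 = 6;
        if self.step > SPIN_LIMIT {
            return false; // give up spinning; caller yields or parks
        }
        // step * step iterations: 0, 1, 4, 9, 16, 25, 36 ...
        for _ in 0..self.step * self.step {
            hint::spin_loop();
        }
        self.step += 1;
        true
    }
}

fn main() {
    let mut backoff = Backoff::new();
    let mut rounds = 0;
    while backoff.snooze() {
        rounds += 1;
    }
    // Steps 0 through 6 spin; the 8th call reports "stop spinning".
    assert_eq!(rounds, 7);
    println!("fell back after {rounds} rounds");
}
```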
```rust
Err(h) => {
    head = h;
    backoff.spin();
```
It's an antipattern to spin in the failure case of a compare_exchange_weak loop like this (there are a few others in this code too). For those you should just update the start variable and continue, retrying immediately. There are a lot of reasons why doing it this way is bad:
- The code just got an updated `head` value right before waiting. The best time to use that value was right after getting it, not later; waiting increases the likelihood that it becomes stale, especially as the spin count gets higher. If you look at the asm on ARM, you can see that in these cases it could easily end up repeatedly doing ldrex, bailing out, and clrex+spinning, without ever even getting to try the strex: https://rust.godbolt.org/z/aYYEKd6rE
- You're basically not allowed to do anything complex inside the LL/SC loop, so in practice calling the backoff here makes the code much worse, as it doubles up the loop to manage this.
- The compare_exchange was weak, so it may have failed totally spuriously anyway.
- Not an issue here, but it can cause livelock if you're unlucky and the backoff count and the atomic are on the same cache line.
In practice most people should use AtomicFoo::fetch_update for this stuff, which gets all of this right, is less tedious, and expresses intent better. That said, this loop does enough work that it might be hard to express that way.

The other usage might be okay, or rather, I don't love it (it seems like it would just make things worse here... was there a specific situation this was trying to avoid?), but this one is wrong.
(I will note that the implementations that are cited as reference don't have it, and it's surprising to see something like this, since I think it doesn't actually have to wait for other threads to make progress...)
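As a sketch of the recommended shape: `fetch_update` re-runs its closure with a freshly loaded value on every CAS failure, so each retry starts from an up-to-date snapshot with no spinning in between. Here `advance_head` is a hypothetical stand-in for the channel's head update, not the actual mpsc code.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical example: bump a head index by `step`, retrying
// immediately on contention via fetch_update.
fn advance_head(head: &AtomicUsize, step: usize) -> usize {
    head.fetch_update(Ordering::AcqRel, Ordering::Acquire, |h| {
        // On every failed CAS, this closure is called again with the
        // freshly observed value of `h`, so the retry is never stale.
        Some(h + step)
    })
    // The closure never returns None, so this cannot be Err.
    .expect("closure always returns Some")
}

fn main() {
    let head = AtomicUsize::new(0);
    let old = advance_head(&head, 4);
    assert_eq!(old, 0); // fetch_update returns the previous value
    assert_eq!(head.load(Ordering::Relaxed), 4);
    println!("head = {}", head.load(Ordering::Relaxed));
}
```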
As an aside, this is great advice and something I haven't seen very well documented anywhere. Any chance you'd consider writing this up as a blog post, or, better yet, finding a way to add it to the std docs?
There is actually a good reason for spinning on CAS failure: if a thread encounters a lot of CAS failures, it means the atomic it is trying to access is heavily contended. In such a situation, where multiple threads are trying to access the same cache line, you can significantly improve throughput by backing off and letting a single thread access the atomic at a time.
Essentially the access pattern goes from:

```
Thread A, Thread B, Thread A, Thread B, Thread A, Thread B, Thread A, Thread B
```

to:

```
Thread A, Thread A, Thread A, Thread A, Thread B, Thread B, Thread B, Thread B
```
This improves throughput, since the cache line stays on the same CPU across multiple accesses.

This optimization looks good in benchmarks (I do something similar in parking_lot), but it is unclear how much impact it has on real-world programs. I don't even know where to begin measuring this.
Updated the code to reload the head after spinning to reduce the likelihood of it being stale (0ae5d53).
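The reload-after-spin shape can be sketched like this (illustrative code under assumed names, not the actual 0ae5d53 diff): on failure, back off first, then reload, so the retry starts from the freshest possible value rather than the one observed before the wait.

```rust
use std::hint;
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical example: increment a counter with bounded exponential
// backoff on CAS failure, reloading *after* the spin.
fn increment_with_backoff(counter: &AtomicUsize) -> usize {
    let mut spins: u32 = 1;
    let mut current = counter.load(Ordering::Relaxed);
    loop {
        match counter.compare_exchange_weak(
            current,
            current + 1,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(prev) => return prev,
            Err(_) => {
                // Back off to let one thread keep the cache line ...
                for _ in 0..spins {
                    hint::spin_loop();
                }
                spins = (spins * 2).min(64); // bounded exponential growth
                // ... then reload, so the retry is not based on a value
                // that went stale during the wait.
                current = counter.load(Ordering::Relaxed);
            }
        }
    }
}

fn main() {
    let counter = AtomicUsize::new(0);
    assert_eq!(increment_with_backoff(&counter), 0);
    assert_eq!(counter.load(Ordering::Relaxed), 1);
    println!("counter = {}", counter.load(Ordering::Relaxed));
}
```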
```rust
const YIELD_LIMIT: u32 = 10;

/// Performs exponential backoff in spin loops.
pub struct Backoff {
```
If we keep this code in the tree, could you add a comment indicating that it shouldn't be used or emulated elsewhere in libstd?
```rust
/// Waits until a message is written into the slot.
fn wait_write(&self) {
    let backoff = Backoff::new();
    while self.state.load(Ordering::Acquire) & WRITE == 0 {
```
This was deeply concerning at first, but in practice I think you'd expect this to only very rarely need to go into the loop, given where it's called? Still...
```rust
Err(h) => {
    head = h;
    block = self.head.block.load(Ordering::Acquire);
    backoff.spin();
```
Same thing I said about spinning in compare_exchange_weak applies here.
Same fix as above.
All that said, this code is vastly easier for me to follow than …
This PR was briefly discussed in the library API meeting last week. We didn't reach any conclusion about future API changes like deprecation or adding an mpmc channel; that can be discussed separately if someone wants to put in the time and energy. We see this PR itself basically as a bugfix, requiring no FCP. A crater run might be useful though, and it's probably good if there's more than one reviewer. (Thanks @thomcc for helping out with reviewing!)
@ibraheemdev You're asking people for review on the community Discord; I'm not really qualified to review this, but I can offer some feedback. In internal codebases, we make extensive use of crossbeam channels to communicate between well-named threads. This has the nice benefit that you can run …

```rust
loop {
    match rx.try_recv() {
        Ok(msg) => process(msg),
        Err(TryRecvError::Empty) => sleep(std::time::Duration::from_millis(1)),
        Err(TryRecvError::Disconnected) => break,
    }
}
```

I've seen this pattern drop the … I'm concerned that putting a spinny channel implementation that does awesome on benchmarks into …
I was asked for a reproducer of the above. I thought this would be hard but... it's not? Here's a ~25x reduction in CPU usage from the sleepy pattern over crossbeam defaults:

```rust
fn main() {
    let (tx, rx) = crossbeam_channel::unbounded();
    for _ in 0..100 {
        std::thread::Builder::new()
            .name("worker".to_string())
            .spawn({
                let tx = tx.clone();
                move || {
                    for i in 0..usize::MAX {
                        tx.send(i).unwrap();
                        std::thread::sleep(std::time::Duration::from_millis(1));
                    }
                }
            })
            .unwrap();
    }
    let mut sum = 0usize;
    // This impl uses ~26% of a core in top for crossbeam
    // and ~11% of a thread for std::sync::mpsc
    for v in rx.iter() {
        sum = sum.wrapping_add(v);
    }
    // This impl uses ~0.9% of a core in top for crossbeam
    // and ~1.8% in current std::sync::mpsc
    /*
    loop {
        match rx.try_recv() {
            Ok(v) => {
                sum = sum.wrapping_add(v);
            }
            Err(crossbeam_channel::TryRecvError::Empty) => {
                std::thread::sleep(std::time::Duration::from_millis(1));
            }
            Err(crossbeam_channel::TryRecvError::Disconnected) => {
                break;
            }
        }
    }
    */
    println!("{}", sum);
}
```
Any applications which are more concerned with energy usage than throughput (which could include something as non-specialized as Rust in Wasm in a web page that happens to be opened on a smartphone, as well as embedded applications) would correctly prefer to sleep rather than spin. This seems like an area where …

(Disclaimer: I'm not an expert in optimizing for either of the above; I've just heard a few things, and am trying to make sure this consideration isn't entirely overlooked.)
Assigning @Amanieu and myself to review this, since we're both used to working with concurrency/atomics.

@rustbot ready

Changes look fine. @bors r=Amanieu
Finished benchmarking commit (afd7977): comparison URL. Overall result:
| | mean | range | count |
|---|---|---|---|
| Regressions (primary) | 0.8% | [0.4%, 1.1%] | 5 |
| Regressions (secondary) | 0.5% | [0.5%, 0.5%] | 1 |
| Improvements (primary) | -0.7% | [-1.1%, -0.3%] | 3 |
| Improvements (secondary) | - | - | 0 |
| All | 0.2% | [-1.1%, 1.1%] | 8 |
Max RSS (memory usage)
Results
This is a less reliable metric that may be of interest but was not used to determine the overall result at the top of this comment.
| | mean | range | count |
|---|---|---|---|
| Regressions (primary) | 1.4% | [0.1%, 2.8%] | 2 |
| Regressions (secondary) | 3.4% | [3.2%, 3.7%] | 3 |
| Improvements (primary) | -1.3% | [-1.3%, -1.3%] | 2 |
| Improvements (secondary) | -5.6% | [-8.5%, -1.0%] | 4 |
| All | 0.1% | [-1.3%, 2.8%] | 4 |
Cycles
Results
This is a less reliable metric that may be of interest but was not used to determine the overall result at the top of this comment.
| | mean | range | count |
|---|---|---|---|
| Regressions (primary) | 1.1% | [1.1%, 1.1%] | 2 |
| Regressions (secondary) | - | - | 0 |
| Improvements (primary) | -1.3% | [-1.3%, -1.3%] | 1 |
| Improvements (secondary) | - | - | 0 |
| All | 0.3% | [-1.3%, 1.1%] | 3 |
There is a smattering of regressions and improvements, nothing really notable or concerning. @rustbot label: +perf-regression-triaged
As part of this diff, a memory leak was introduced, at least according to Miri when running the std tests filtered for … Is this expected (some kind of management memory that is deliberately not being deallocated, or a test that deliberately leaks) or something to look into? See here for the contents of the leaked memory.

I think this is expected; the test for #39364 leaks a …
Is there a way to write that test to clean up afterwards? We could disable the leak checker in Miri, but then we'd miss accidental leaks in other tests.

If I understand the issue correctly, that test would still catch it. The important part seems to be to upgrade the channel (from a second thread) just before a receive operation times out. As long as the channel is not disconnected before the next receive attempt, the issue would still occur.
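The property being relied on here can be sketched like this (a minimal illustration of the timed-receive semantics, not the actual regression test for #39364): a message sent between receive attempts must still be delivered on the next timed wait, even once the sender is gone.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();

    // No message yet, so the first timed wait expires.
    assert_eq!(
        rx.recv_timeout(Duration::from_millis(10)),
        Err(mpsc::RecvTimeoutError::Timeout)
    );

    // Send from a second thread between the receive attempts.
    thread::spawn(move || tx.send(7).unwrap()).join().unwrap();

    // The buffered message must still arrive on the next attempt,
    // even though the sender has been dropped by now.
    assert_eq!(rx.recv_timeout(Duration::from_millis(100)), Ok(7));
    println!("ok");
}
```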
All right, submitted as #104401.

This feels like it'd be worth a …
This PR imports the `crossbeam-channel` crate into the standard library as a private module, `sync::mpmc`. `sync::mpsc` is now implemented as a thin wrapper around `sync::mpmc`. The primary purpose of this PR is to resolve #39364, so there are no public API changes.

The reason #39364 has not been fixed in over 5 years is that the current channel is incredibly complex. It was written many years ago and has sat mostly untouched since. `crossbeam-channel` has become the most popular alternative on crates.io, amassing over 30 million downloads. While crossbeam's channel is also complex, like all fast concurrent data structures, it avoids some of the major issues with the current implementation around dynamic flavor upgrades. The new implementation decides on the data structure to be used when the channel is created, and the channel retains that structure until it is dropped.

Replacing `sync::mpsc` with a simpler, less performant implementation has been discussed as an alternative. However, Rust touts itself as enabling fearless concurrency, and having the standard library feature a subpar implementation of a core concurrency primitive doesn't feel right. The argument is that slower is better than broken, but this PR shows that we can do better.

As mentioned before, the primary purpose of this PR is to fix #39364, and so the public API intentionally remains the same. After that problem is fixed, the fact that `sync::mpmc` now exists makes it easier to fix a limitation of `mpsc`: the fact that it only supports a single consumer. spmc and mpmc are two other common concurrency patterns, and this change enables a path to deprecating `mpsc` and exposing a general `sync::channel` module that supports multiple consumers. It also implements other useful methods such as `send_timeout`. That said, exposing MPMC and other new functionality is mostly out of scope for this PR, and it would be helpful if discussion stays on topic :)

For what it's worth, the new implementation has also been shown to be more performant in some basic benchmarks.
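Since the public surface is unchanged, existing `mpsc` code keeps working as-is. A small sanity-check example (my own illustration, not from the PR's test suite) of the multi-producer, single-consumer pattern the wrapper preserves:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Multiple producers, exactly as before this change.
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || tx.send(i).unwrap())
        })
        .collect();

    // Drop the original sender so the iterator below terminates
    // once all clones are gone.
    drop(tx);
    for h in handles {
        h.join().unwrap();
    }

    // Single consumer drains the channel.
    let sum: i32 = rx.iter().sum();
    assert_eq!(sum, 0 + 1 + 2 + 3);
    println!("sum = {sum}");
}
```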
cc @taiki-e
r? rust-lang/libs