
Merge crossbeam-channel into std::sync::mpsc #93563

Merged
merged 14 commits into from Nov 13, 2022

Conversation

ibraheemdev
Contributor

@ibraheemdev ibraheemdev commented Feb 1, 2022

This PR imports the crossbeam-channel crate into the standard library as a private module, sync::mpmc. sync::mpsc is now implemented as a thin wrapper around sync::mpmc. The primary purpose of this PR is to resolve #39364, so there are no public API changes.

The reason #39364 has not been fixed in over 5 years is that the current channel is incredibly complex. It was written many years ago and has sat mostly untouched since. crossbeam-channel has become the most popular alternative on crates.io, amassing over 30 million downloads. While crossbeam's channel is also complex, like all fast concurrent data structures, it avoids some of the major issues with the current implementation around dynamic flavor upgrades. The new implementation decides on the data structure to be used when the channel is created, and the channel retains that structure until it is dropped.

Replacing sync::mpsc with a simpler, less performant implementation has been discussed as an alternative. However, Rust touts itself as enabling fearless concurrency, and having the standard library feature a subpar implementation of a core concurrency primitive doesn't feel right. The argument is that slower is better than broken, but this PR shows that we can do better.

As mentioned before, the primary purpose of this PR is to fix #39364, and so the public API intentionally remains the same. After that problem is fixed, the fact that sync::mpmc now exists makes it easier to fix a long-standing limitation of mpsc: it only supports a single consumer. spmc and mpmc are two other common concurrency patterns, and this change enables a path to deprecating mpsc and exposing a general sync::channel module that supports multiple consumers. It also implements other useful methods such as send_timeout. That said, exposing MPMC and other new functionality is mostly out of scope for this PR, and it would be helpful if discussion stays on topic :)

For what it's worth, the new implementation has also been shown to be more performant in some basic benchmarks.

cc @taiki-e

r? rust-lang/libs

@rust-highfive rust-highfive added the S-waiting-on-review Status: Awaiting review from the assignee but also interested parties. label Feb 1, 2022
@joshtriplett
Member

joshtriplett commented Feb 1, 2022

Thanks for working on this!

I'd like to avoid adding any new public API (like mpmc) in the course of fixing mpsc, even if it's unstable. Let's focus on making mpsc less broken.

Also, the phrasing of the PR seems unfortunate; crossbeam channel has lots of users, but that doesn't make it "the defacto replacement".

@ibraheemdev
Contributor Author

ibraheemdev commented Feb 1, 2022

I'd like to avoid adding any new public API (like mpmc) in the course of fixing mpsc, even if it's unstable. Let's focus on making mpsc less broken.

sync::mpmc is private. Avoiding any public API changes is an explicit goal of this PR.

Also, the phrasing of the PR seems unfortunate; crossbeam channel has lots of users, but that doesn't make it "the defacto replacement".

True, I just meant to say it is the most popular replacement. I'll update the description.

@joshtriplett joshtriplett added I-libs-api-nominated Indicates that an issue has been nominated for discussion during a libs-api team meeting. T-libs-api Relevant to the library API team, which will review and decide on the PR/issue. labels Feb 2, 2022
@jyn514
Member

jyn514 commented Feb 2, 2022

This PR imports the crossbeam-channel crate into the standard library as a private module, sync::mpmc.

Rather than copying the whole crate, you can add it as an internal dependency and re-export it; that will make it easier to update in the future. See hashbrown in library/std/src/collections/hash/map.rs for an example.

@ibraheemdev
Contributor Author

ibraheemdev commented Feb 2, 2022

@jyn514 The problem with that is crossbeam-channel depends on crossbeam-utils, which depends on lazy_static and includes many utilities that we don't need. sync::mpmc doesn't contain the entire crate, only the parts necessary for bounded, unbounded, and rendezvous channels. This could work if a crossbeam-channel-core crate (plus some feature flags) were created for us to depend on, but we discussed this before and would like to see whether the libs team generally supports this change before worrying about integration details.

@bjorn3
Contributor

bjorn3 commented Feb 2, 2022

Crossbeam-channel also directly depends on libstd for getting the current thread id: https://github.com/crossbeam-rs/crossbeam/blob/85d0bdc95cdac6450ac1ba4ed587f362a5235e60/crossbeam-channel/src/waker.rs#L274 It probably also depends on other parts of libstd to wake up the right thread.

@the8472
Contributor

the8472 commented Feb 2, 2022

There is a UI test (src/test/ui/threads-sendsync/mpsc_stress.rs) that is partially disabled. This PR should enable it. And make sure that it still detects the bug (red-green).

@SimonSapin
Contributor

SimonSapin commented Feb 3, 2022

sync::mpmc contains a couple thousand lines of code and is effectively a fork of (part of) crossbeam-channel. What is the plan for both of their maintenance? Should bug fixes in one be manually ported to the other? Or is the intent that some plan will be made at a later date to go back to a single "source of truth" in terms of source code repositories?

If the latter, would it make sense for rust-lang/rust to be the source of truth for some pieces of code that are both used in the standard library (but not exposed, so no API stability requirement) and separately published to crates.io (where semver-breaking version numbers can be used as needed) ?

This could be a way to resolve the conflict between: the std crate wants to use something from crates.io, but that crate needs API (like thread IDs) that’s only exposed publicly by std.

Member

@thomcc thomcc left a comment

I'm surprised this code does as much dubious spinning as it does. In practice you'd expect most of it not to hurt that badly (which matches my experience with crossbeam), but on some arches a few of the placements of backoff.spin() could have bad behavior.

The spinlock was a surprise though. I had no idea crossbeam had one.

}
}

const SPIN_LIMIT: u32 = 6;
Member

@thomcc thomcc Feb 4, 2022

1 << 6 calls to hint::spin_loop() is... very aggressive. Those calls tend to be a few hundred cycles. Actually, it's 1 << 6 + 1 << 5 + 1 << 4 + ... before it falls back to pinging thread::yield(), which is not really ideal either — "switch to random thread" is almost always bad behavior in real code.

Contributor Author

@ibraheemdev ibraheemdev Oct 17, 2022

Changed the code to use quadratic instead of exponential backoff (437ea34). The fallback to thread::yield is only in blocking loops, where there isn't really any other option.

}
Err(h) => {
head = h;
backoff.spin();
Member

@thomcc thomcc Feb 4, 2022

It's an antipattern to spin in the failure case of a compare_exchange_weak loop like this (there are a few others too in this code). For those you should just update the start var and continue to retry immediately. There are a lot of reasons doing it the way you have is bad:

  1. The code just got an updated head value right before waiting. The best time to use it was right after getting the updated value, not later. Waiting will increase the likelihood that that value becomes stale, especially as the spin count gets higher.

    If you look at the asm on arm, you can see that in these cases it could easily end up repeatedly doing ldrex, bailing out and clrex+spinning, without ever getting to even try the strex https://rust.godbolt.org/z/aYYEKd6rE

  2. You're basically not allowed to do anything complex inside the ll/sc loop, so in practice calling the backoff here makes the code much worse, as it doubles up the loop to manage this.

  3. The compare_exchange was weak so it may have a totally spurious failure anyway.

  4. Not an issue here, but it can cause livelock if you're unlucky and the backoff count and atomic are on the same line.

In practice most people should use AtomicFoo::fetch_update for this stuff, which gets all of this right, is less tedious, and expresses intent better too. That said, this loop does enough that it might be hard to write it that way.

The other usage might be okay -- or rather, I don't love it (it seems like it would just make things worse here... is there a specific situation that was trying to be avoided?), but this one is wrong.

(I will note that the implementations that are cited as reference don't have it, and it's surprising to see something like this, since I think it doesn't actually have to wait for other threads to make progress...)

Contributor

@jonhoo jonhoo Feb 12, 2022

As an aside, this is great advice and something I haven't seen very well documented anywhere. Any chance you'd consider writing this up as a blog post, or better yet, find a way to add it to the std docs?

Member

@Amanieu Amanieu Feb 19, 2022

There is actually a good reason for spinning on CAS failure: if a thread encounters a lot of CAS failures then it means that the atomic it is trying to access is heavily contended. In such a situation where multiple threads are trying to access the same cacheline, you can significantly improve throughput by backing off and letting a single thread access the atomic at a time.

Essentially the access patterns go from:

Thread A
Thread B
Thread A
Thread B
Thread A
Thread B
Thread A
Thread B

to:

Thread A
Thread A
Thread A
Thread A
Thread B
Thread B
Thread B
Thread B

This improves throughput since the cacheline stays on the same CPU across multiple accesses.

This optimization looks good in benchmarks (I do something similar in parking_lot) but it is unclear how much impact it has on real world programs. I don't even know where to begin measuring this.

Contributor Author

@ibraheemdev ibraheemdev Oct 17, 2022

Updated the code to reload the head after spinning to reduce the likelihood of it being stale (0ae5d53).

const YIELD_LIMIT: u32 = 10;

/// Performs exponential backoff in spin loops.
pub struct Backoff {
Member

@thomcc thomcc Feb 4, 2022

If we keep this code in the tree, please indicate that this shouldn't be used or emulated elsewhere in libstd?

/// Waits until a message is written into the slot.
fn wait_write(&self) {
let backoff = Backoff::new();
while self.state.load(Ordering::Acquire) & WRITE == 0 {
Member

@thomcc thomcc Feb 4, 2022

This was deeply concerning at first, but in practice I think you'd expect this to only very rarely need to go into the loop, given where it's called? Still...

Err(h) => {
head = h;
block = self.head.block.load(Ordering::Acquire);
backoff.spin();
Member

@thomcc thomcc Feb 4, 2022

Same thing i said about spinning in compare_exchange_weak applies here.

Contributor Author

@ibraheemdev ibraheemdev Oct 17, 2022

Same fix as above.

library/std/src/sync/mpmc/utils.rs (outdated; resolved)
@thomcc
Member

thomcc commented Feb 4, 2022

All that said, this code is vastly easier for me to follow than std::sync::mpsc — most of my complaints are relatively minor or easily fixed problems.

@m-ou-se m-ou-se removed the I-libs-api-nominated Indicates that an issue has been nominated for discussion during a libs-api team meeting. label Feb 16, 2022
@m-ou-se
Member

m-ou-se commented Feb 16, 2022

This PR was briefly discussed in the library API meeting last week. We didn't reach any conclusion about future API changes like deprecation or adding an mpmc channel. That can be discussed separately if someone wants to put in the time and energy. We see this PR itself basically as a bugfix, requiring no FCP. A crater run might be useful though, and it's probably good if there's more than one reviewer. (Thanks @thomcc for helping out with reviewing!)

@saethlin
Contributor

saethlin commented Feb 16, 2022

@ibraheemdev You're asking people for review on the community Discord; I'm not really qualified to review this but I can offer some feedback.

In internal codebases, we make extensive use of crossbeam channels to communicate between well-named threads. This has the nice benefit that you can run top -H -p PID to get a very quick and very dirty visualization of what's taking up time in a running system. The downside is that crossbeam spins a lot, so certain components spend most of their time on CPU according to top spinning for messages. And therefore, the codebases are littered with this pattern:

loop {
    match rx.try_recv() {
        Ok(msg) => process(msg),
        Err(TryRecvError::Empty) => sleep(std::time::Duration::from_millis(1)),
        Err(TryRecvError::Disconnected) => break,        
    }
}

I've seen this pattern drop the top-reported CPU usage of a thread from 20% to 5%. But it is not at all clear whether this is beneficial for the behavior of the application. I can certainly imagine how this would result in throughput bubbles as objects accumulate in the channel over the course of the sleep.

I'm concerned about putting a spinny channel implementation that does awesome on benchmarks into std, where it will hopefully be used by a lot of beginners and may cause said beginners to mis-assess the resource utilization/requirements of their code. "I upgraded to Rust 1.61 and my CPU usage doubled" is not a Reddit post I want to see. Perhaps even worse would be comments suggesting this sleep-on-empty pattern.

@m-ou-se m-ou-se self-requested a review Feb 16, 2022
@saethlin
Contributor

saethlin commented Feb 17, 2022

I was asked for a reproducer of the above. I thought this would be hard but... it's not? Here's a ~25x reduction in CPU usage from the sleepy pattern over crossbeam defaults:
EDIT: Also included times for the current std::sync::mpsc; the pattern has a similar effect there, though std::sync::mpsc seems less spinny when blocking while using more CPU with the sleepy pattern.

fn main() {
    let (tx, rx) = crossbeam_channel::unbounded();
    for _ in 0..100 {
        std::thread::Builder::new()
            .name("worker".to_string())
            .spawn({
                let tx = tx.clone();
                move || {
                    for i in 0..usize::MAX {
                        tx.send(i).unwrap();
                        std::thread::sleep(std::time::Duration::from_millis(1));
                    }
                }
            })
            .unwrap();
    }

    let mut sum = 0usize;

    // This impl uses ~26% of a core in top for crossbeam
    // and ~11% of a thread for std::sync::mpsc
    for v in rx.iter() {
        sum = sum.wrapping_add(v);
    }

    // This impl uses ~0.9% of a core in top for crossbeam
    // and ~1.8% in current std::sync::mpsc
    /*
    loop {
        match rx.try_recv() {
            Ok(v) => {
                sum = sum.wrapping_add(v);
            }
            Err(crossbeam_channel::TryRecvError::Empty) => {
                std::thread::sleep(std::time::Duration::from_millis(1));
            }
            Err(crossbeam_channel::TryRecvError::Disconnected) => {
                break;
            }
        }
    }
    */

    println!("{}", sum);
}

@kpreid
Contributor

kpreid commented Feb 17, 2022

I'm concerned about putting a spinny channel implementation that does awesome on benchmarks into std, where it will hopefully be used by a lot of beginners and may cause said beginners to mis-assess the resource utilization/requirements of their code. "I upgraded to Rust 1.61 and my CPU usage doubled" is not a Reddit post I want to see. Perhaps even worse would be comments suggesting this sleep-on-empty pattern.

Any applications which are more concerned with energy usage than throughput (which could include something as non-specialized as Rust in WASM in a web page that happens to be opened on a smartphone, as well as embedded applications) would correctly prefer to sleep rather than spin. This seems like an area where std should not be choosing the other option — users looking for high throughput can easily benchmark wall-time and choose to use a different implementation, but energy usage is harder to measure, and more often overlooked.

(Disclaimer: I'm not an expert in optimizing for either of the above; I've just heard a few things, and am trying to make sure this consideration isn't entirely overlooked.)

@m-ou-se m-ou-se assigned Amanieu and m-ou-se and unassigned yaahc Feb 17, 2022
@m-ou-se
Member

m-ou-se commented Feb 17, 2022

Assigning @Amanieu and myself to review this, since we're both used to working with concurrency/atomics.

@bors
Contributor

bors commented Mar 11, 2022

The latest upstream changes (presumably #94824) made this pull request unmergeable. Please resolve the merge conflicts.

@JohnCSimon JohnCSimon added S-waiting-on-review Status: Awaiting review from the assignee but also interested parties. and removed S-waiting-on-review Status: Awaiting review from the assignee but also interested parties. labels Apr 11, 2022
@ibraheemdev
Contributor Author

ibraheemdev commented Nov 13, 2022

@rustbot ready

@rustbot rustbot added S-waiting-on-review Status: Awaiting review from the assignee but also interested parties. and removed S-waiting-on-author Status: This is awaiting some action (such as code changes or more information) from the author. labels Nov 13, 2022
@thomcc
Member

thomcc commented Nov 13, 2022

Changes look fine.

@bors r=Amanieu

@bors
Contributor

bors commented Nov 13, 2022

📌 Commit a2f58ab has been approved by Amanieu

It is now in the queue for this repository.

@bors bors added S-waiting-on-bors Status: Waiting on bors to run and complete tests. Bors will change the label on completion. and removed S-waiting-on-review Status: Awaiting review from the assignee but also interested parties. labels Nov 13, 2022
@bors
Contributor

bors commented Nov 13, 2022

Testing commit a2f58ab with merge afd7977...

@bors
Contributor

bors commented Nov 13, 2022

☀️ Test successful - checks-actions
Approved by: Amanieu
Pushing afd7977 to master...

@bors bors added the merged-by-bors This PR was explicitly merged by bors label Nov 13, 2022
@bors bors merged commit afd7977 into rust-lang:master Nov 13, 2022
11 checks passed
@rustbot rustbot added this to the 1.67.0 milestone Nov 13, 2022
@rust-timer
Collaborator

rust-timer commented Nov 13, 2022

Finished benchmarking commit (afd7977): comparison URL.

Overall result: regressions and improvements - ACTION NEEDED

Next Steps: If you can justify the regressions found in this perf run, please indicate this with @rustbot label: +perf-regression-triaged along with sufficient written justification. If you cannot justify the regressions please open an issue or create a new PR that fixes the regressions, add a comment linking to the newly created issue or PR, and then add the perf-regression-triaged label to this PR.

@rustbot label: +perf-regression
cc @rust-lang/wg-compiler-performance

Instruction count

This is a highly reliable metric that was used to determine the overall result at the top of this comment.

                          mean    range            count
Regressions (primary)     0.8%    [0.4%, 1.1%]     5
Regressions (secondary)   0.5%    [0.5%, 0.5%]     1
Improvements (primary)    -0.7%   [-1.1%, -0.3%]   3
Improvements (secondary)  -       -                0
All (primary)             0.2%    [-1.1%, 1.1%]    8

Max RSS (memory usage)


This is a less reliable metric that may be of interest but was not used to determine the overall result at the top of this comment.

                          mean    range            count
Regressions (primary)     1.4%    [0.1%, 2.8%]     2
Regressions (secondary)   3.4%    [3.2%, 3.7%]     3
Improvements (primary)    -1.3%   [-1.3%, -1.3%]   2
Improvements (secondary)  -5.6%   [-8.5%, -1.0%]   4
All (primary)             0.1%    [-1.3%, 2.8%]    4

Cycles


This is a less reliable metric that may be of interest but was not used to determine the overall result at the top of this comment.

                          mean    range            count
Regressions (primary)     1.1%    [1.1%, 1.1%]     2
Regressions (secondary)   -       -                0
Improvements (primary)    -1.3%   [-1.3%, -1.3%]   1
Improvements (secondary)  -       -                0
All (primary)             0.3%    [-1.3%, 1.1%]    3

@rustbot rustbot added the perf-regression Performance regressions label Nov 13, 2022
@nnethercote
Contributor

nnethercote commented Nov 13, 2022

There is a smattering of regressions and improvements, nothing really notable or concerning.

@rustbot label: +perf-regression-triaged

@rustbot rustbot added the perf-regression-triaged The performance regression has been triaged. label Nov 13, 2022
@RalfJung
Member

RalfJung commented Nov 14, 2022

As part of this diff, a memory leak was introduced, at least according to Miri when running the std tests filtered for time:: sync:: thread:: env::. The most likely candidate is this PR.

Is this expected (some kind of management memory that is deliberately not being deallocated, or a test that deliberately leaks) or something to look into? See here for the contents of the leaked memory.

@joboet
Contributor

joboet commented Nov 14, 2022

As part of this diff, a memory leak was introduced, at least according to Miri when running the std tests filtered for time:: sync:: thread:: env::. The most likely candidate is this PR.

Is this expected (some kind of management memory that is deliberately not being deallocated, or a test that deliberately leaks) or something to look into? See here for the contents of the leaked memory.

I think this is expected, the test for #39364 leaks a Sender.

@RalfJung
Member

RalfJung commented Nov 14, 2022

Is there a way to write that test to clean up afterwards?

We could disable the leak checker in Miri, but then we'd miss accidental leaks in other tests.
I'm thinking of something like this but I am not sure if that test still tests what it is supposed to test. It panics on stable though so I guess it does?

@joboet
Contributor

joboet commented Nov 14, 2022

Is there a way to write that test to clean up afterwards?

We could disable the leak checker in Miri, but then we'd miss accidental leaks in other tests. I'm thinking of something like this but I am not sure if that test still tests what it is supposed to test. It panics on stable though so I guess it does?

If I understand the issue correctly, that test would still catch it. The important part seems to be to upgrade the channel (from a second thread) just before a receive operation times out. As long as the channel is not disconnected before the next receive attempt, the issue would still occur.

@RalfJung
Member

RalfJung commented Nov 14, 2022

All right, submitted as #104401.

@jonhoo
Contributor

jonhoo commented Nov 17, 2022

This feels like it'd be worth a relnotes tag?

@Mark-Simulacrum Mark-Simulacrum added the relnotes Marks issues that should be documented in the release notes of the next release. label Nov 17, 2022
Successfully merging this pull request may close these issues.

Panic in Receiver::recv()