-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-13015, BEAM-14184] Address unbounded number of messages being written to DirectStreamObserver before isReady is checked #17358
Conversation
…rectStreamObserver before isReady is checked
|
R: @scwhittle |
|
Run Java PreCommit |
3 similar comments
|
Run Java PreCommit |
|
Run Java PreCommit |
|
Run Java PreCommit |
| Thread.currentThread().interrupt(); | ||
| throw new RuntimeException(e); | ||
| } | ||
| // There is a chance that we were spuriously woken up but the outboundObserver is no |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think await returns the current phase, can you use that instead?
nit: the wakeup doesn't have to be spurious, it could have been ready and thus notified but it is just no longer ready for some unrelated reason
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| int initialPhase = phase; | ||
| while (!outboundObserver.isReady()) { | ||
| try { | ||
| phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One possible difference is we are blocking waiting for isReady to transition while synchronizing on the outbound observer.
Would it be safer to synchronize on something internal to the DirectStreamObserver instead in case the outbound observer is synchronized upon elsewhere? For example what if the outbound observer (sometimes?) synchronizes on itself to transition from isReady=false to isReady=true but not for onNext? Or does grpc document syncronization for these observers and thus it doesn't matter?
If you don't think that is a concern for some reason, LGTM to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
Run Java PreCommit |
| if (totalTimeWaited > 0) { | ||
| // If the phase didn't change, this means that the installed onReady callback had not | ||
| // been invoked. | ||
| if (initialPhase == phaser.getPhase()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you could just use phase instead of getPhase() here
|
Run Java PreCommit |
|
Run Java PreCommit |
1 similar comment
|
Run Java PreCommit |
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username).[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replaceBEAM-XXXwith the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.