Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13015, BEAM-14184] Address unbounded number of messages being written to DirectStreamObserver before isReady is checked #17358

Merged
merged 3 commits into from
Apr 27, 2022

Conversation

lukecwik
Copy link
Member


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

…rectStreamObserver before isReady is checked
@lukecwik
Copy link
Member Author

R: @scwhittle

@github-actions github-actions bot added the java label Apr 12, 2022
@lukecwik lukecwik changed the title [BEAM-13015] Address unbounded number of messages being written to DirectStreamObserver before isReady is checked [BEAM-13015, BEAM-14184] Address unbounded number of messages being written to DirectStreamObserver before isReady is checked Apr 13, 2022
@lukecwik
Copy link
Member Author

Run Java PreCommit

3 similar comments
@lukecwik
Copy link
Member Author

Run Java PreCommit

@lukecwik
Copy link
Member Author

Run Java PreCommit

@lukecwik
Copy link
Member Author

Run Java PreCommit

Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
// There is a chance that we were spuriously woken up but the outboundObserver is no
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think await returns the current phase, can you use that instead?

nit: the wakeup doesn't have to be spurious, it could have been ready and thus notified but it is just no longer ready for some unrelated reason

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

int initialPhase = phase;
while (!outboundObserver.isReady()) {
try {
phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One possible difference is we are blocking waiting for isReady to transition while synchronizing on the outbound observer.
Would it be safer to synchronize on something internal to the DirectStreamObserver instead in case the outbound observer is synchronized upon elsewhere? For example what if the outbound observer (sometimes?) synchronizes on itself to transition from isReady=false to isReady=true but not for onNext? Or does grpc document syncronization for these observers and thus it doesn't matter?

If you don't think that is a concern for some reason, LGTM to me.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@lukecwik
Copy link
Member Author

Run Java PreCommit

if (totalTimeWaited > 0) {
// If the phase didn't change, this means that the installed onReady callback had not
// been invoked.
if (initialPhase == phaser.getPhase()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could just use phase instead of getPhase() here

@scwhittle
Copy link
Contributor

Run Java PreCommit

@lukecwik
Copy link
Member Author

Run Java PreCommit

1 similar comment
@lukecwik
Copy link
Member Author

Run Java PreCommit

@lukecwik lukecwik merged commit adfa113 into apache:master Apr 27, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants