Add some more comments around possible race conditions.
This commit is contained in:
parent
bd77f9a7ac
commit
bcfc325364
1 changed files with 10 additions and 3 deletions
|
|
@ -538,10 +538,12 @@ public class Worker implements Runnable {
|
||||||
*/
|
*/
|
||||||
public Future<Void> requestShutdown() {
|
public Future<Void> requestShutdown() {
|
||||||
|
|
||||||
|
//
|
||||||
|
// Stop accepting new leases. Once we do this we can be sure that
|
||||||
|
// no more leases will be acquired.
|
||||||
|
//
|
||||||
leaseCoordinator.stopLeaseTaker();
|
leaseCoordinator.stopLeaseTaker();
|
||||||
//
|
|
||||||
// Stop accepting new leases
|
|
||||||
//
|
|
||||||
Collection<KinesisClientLease> leases = leaseCoordinator.getAssignments();
|
Collection<KinesisClientLease> leases = leaseCoordinator.getAssignments();
|
||||||
if (leases == null || leases.isEmpty()) {
|
if (leases == null || leases.isEmpty()) {
|
||||||
//
|
//
|
||||||
|
|
@ -560,6 +562,11 @@ public class Worker implements Runnable {
|
||||||
if (consumer != null) {
|
if (consumer != null) {
|
||||||
consumer.notifyShutdownRequested(shutdownNotification);
|
consumer.notifyShutdownRequested(shutdownNotification);
|
||||||
} else {
|
} else {
|
||||||
|
//
|
||||||
|
// There is a race condition between retrieving the current assignments, and creating the
|
||||||
|
// notification. If the a lease is lost in between these two points, we explicitly decrement the
|
||||||
|
// notification latches to clear the shutdown.
|
||||||
|
//
|
||||||
notificationCompleteLatch.countDown();
|
notificationCompleteLatch.countDown();
|
||||||
shutdownCompleteLatch.countDown();
|
shutdownCompleteLatch.countDown();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue