Addressed review comments

This commit is contained in:
parijas 2019-12-17 11:40:20 -08:00
parent 2d6b92e8ac
commit 5b3078f801
2 changed files with 49 additions and 41 deletions

View file

@ -85,7 +85,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
this.initialPositionInStream = initialPositionInStream; this.initialPositionInStream = initialPositionInStream;
this.shardSyncer = shardSyncer; this.shardSyncer = shardSyncer;
this.shardSyncRequestPending = new AtomicBoolean(false); this.shardSyncRequestPending = new AtomicBoolean(false);
this.lock = new ReentrantLock(Boolean.TRUE); this.lock = new ReentrantLock();
} }
Future<TaskResult> syncShardAndLeaseInfo(List<Shard> latestShards) { Future<TaskResult> syncShardAndLeaseInfo(List<Shard> latestShards) {
@ -117,30 +117,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis,
shardSyncer, latestShards), metricsFactory); shardSyncer, latestShards), metricsFactory);
future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService) future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService)
.whenComplete((taskResult, exception) -> { .whenComplete((taskResult, exception) -> handlePendingShardSyncs(latestShards, exception));
if (exception != null) {
LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception);
}
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and
// syncShardAndLeaseInfo is invoked, before completion stage exits (future completes)
// but right after the value of shardSyncRequestPending is checked, it will result in
// shardSyncRequestPending being set to true, but no pending futures to trigger the next
// ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the
// previous task is in this completion stage, checkAndSubmitNextTask is not invoked
// until this completionStage exits.
try {
lock.lock();
if (shardSyncRequestPending.get()) {
shardSyncRequestPending.set(false);
// reset future to null, so next call creates a new one
// without trying to get results from the old future.
future = null;
checkAndSubmitNextTask(latestShards);
}
} finally {
lock.unlock();
}
});
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Submitted new " + currentTask.getTaskType() + " task."); LOG.debug("Submitted new " + currentTask.getTaskType() + " task.");
} }
@ -154,4 +131,29 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
} }
return submittedTaskFuture; return submittedTaskFuture;
} }
private void handlePendingShardSyncs(List<Shard> latestShards, Throwable exception) {
if (exception != null) {
LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception);
}
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and
// syncShardAndLeaseInfo is invoked, before completion stage exits (future completes)
// but right after the value of shardSyncRequestPending is checked, it will result in
// shardSyncRequestPending being set to true, but no pending futures to trigger the next
// ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the
// previous task is in this completion stage, checkAndSubmitNextTask is not invoked
// until this completionStage exits.
try {
lock.lock();
if (shardSyncRequestPending.get()) {
shardSyncRequestPending.set(false);
// reset future to null, so next call creates a new one
// without trying to get results from the old future.
future = null;
checkAndSubmitNextTask(latestShards);
}
} finally {
lock.unlock();
}
}
} }

View file

@ -29,7 +29,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class ShardSyncTaskManagerTest { public class ShardSyncTaskManagerTest {
private static final long SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS = 100;
private static final InitialPositionInStreamExtended INITIAL_POSITION_IN_STREAM_EXTENDED = InitialPositionInStreamExtended private static final InitialPositionInStreamExtended INITIAL_POSITION_IN_STREAM_EXTENDED = InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); .newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
private static final boolean CLEANUP_LEASES_SHARD_COMPLETION = Boolean.TRUE; private static final boolean CLEANUP_LEASES_SHARD_COMPLETION = Boolean.TRUE;
@ -42,15 +41,13 @@ public class ShardSyncTaskManagerTest {
@Mock private IMetricsScope mockMetricsScope; @Mock private IMetricsScope mockMetricsScope;
private ShardSyncTaskManager shardSyncTaskManager; private ShardSyncTaskManager shardSyncTaskManager;
private ShardSyncer pausableNoOpShardSyncer; private PausableNoOpShardSyncer pausableNoOpShardSyncer;
private ShardSyncer mockShardSyncer; private ShardSyncer mockShardSyncer;
private CountDownLatch countDownLatch;
@Before public void setup() { @Before public void setup() throws Exception {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
when(mockMetricsFactory.createMetrics()).thenReturn(mockMetricsScope); when(mockMetricsFactory.createMetrics()).thenReturn(mockMetricsScope);
countDownLatch = new CountDownLatch(1); pausableNoOpShardSyncer = new PausableNoOpShardSyncer();
pausableNoOpShardSyncer = new PausableNoOpShardSyncer(countDownLatch);
mockShardSyncer = mock(ShardSyncer.class, delegatesTo(pausableNoOpShardSyncer)); mockShardSyncer = mock(ShardSyncer.class, delegatesTo(pausableNoOpShardSyncer));
shardSyncTaskManager = new ShardSyncTaskManager(mockKinesisProxy, mockLeaseManager, shardSyncTaskManager = new ShardSyncTaskManager(mockKinesisProxy, mockLeaseManager,
INITIAL_POSITION_IN_STREAM_EXTENDED, CLEANUP_LEASES_SHARD_COMPLETION, IGNORE_UNEXPECTED_CHILD_SHARDS, INITIAL_POSITION_IN_STREAM_EXTENDED, CLEANUP_LEASES_SHARD_COMPLETION, IGNORE_UNEXPECTED_CHILD_SHARDS,
@ -59,7 +56,8 @@ public class ShardSyncTaskManagerTest {
@Test public void testShardSyncIdempotency() throws Exception { @Test public void testShardSyncIdempotency() throws Exception {
shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>());
Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations. // Wait for ShardSyncer to be initialized.
pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await();
verify(mockShardSyncer, times(1)) verify(mockShardSyncer, times(1))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
anyBoolean(), Matchers.any()); anyBoolean(), Matchers.any());
@ -68,15 +66,18 @@ public class ShardSyncTaskManagerTest {
while (count++ < 5) { while (count++ < 5) {
shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>());
} }
// Since countDownLatch is still blocked, previous ShardSyncTask is still running, hence no new invocations. // Since blockShardSyncLatch is still blocked, previous ShardSyncTask is still running, hence no new invocations.
verify(mockShardSyncer, times(1)) verify(mockShardSyncer, times(1))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
anyBoolean(), Matchers.any()); anyBoolean(), Matchers.any());
// countdown and exit.
pausableNoOpShardSyncer.blockShardSyncLatch.countDown();
} }
@Test public void testShardSyncRerunsForPendingRequests() throws Exception { @Test public void testShardSyncRerunsForPendingRequests() throws Exception {
shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>());
Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations. // Wait for ShardSyncer to be initialized.
pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await();
verify(mockShardSyncer, times(1)) verify(mockShardSyncer, times(1))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
anyBoolean(), Matchers.any()); anyBoolean(), Matchers.any());
@ -85,8 +86,11 @@ public class ShardSyncTaskManagerTest {
while (count++ < 5) { while (count++ < 5) {
shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>()); shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>());
} }
countDownLatch.countDown(); // Will unblock pending shardSync and a new ShardSync should be triggered. pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch = new CountDownLatch(1);
Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations. // unblock pending shardSync so a new ShardSync is triggered.
pausableNoOpShardSyncer.blockShardSyncLatch.countDown();
// Wait for ShardSyncer to be initialized.
pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await();
// There should be 1 more shardSyncer invocation after the previous shardSync completes. // There should be 1 more shardSyncer invocation after the previous shardSync completes.
verify(mockShardSyncer, times(2)) verify(mockShardSyncer, times(2))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(), .checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
@ -95,10 +99,12 @@ public class ShardSyncTaskManagerTest {
private static class PausableNoOpShardSyncer implements ShardSyncer { private static class PausableNoOpShardSyncer implements ShardSyncer {
private final CountDownLatch countDownLatch; private CountDownLatch blockShardSyncLatch;
private CountDownLatch waitForShardSyncerInitializationLatch;
PausableNoOpShardSyncer(CountDownLatch countDownLatch) { PausableNoOpShardSyncer() {
this.countDownLatch = countDownLatch; this.blockShardSyncLatch = new CountDownLatch(1);
this.waitForShardSyncerInitializationLatch = new CountDownLatch(1);
} }
@Override public void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, @Override public void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
@ -107,7 +113,8 @@ public class ShardSyncTaskManagerTest {
throws DependencyException, InvalidStateException, ProvisionedThroughputException, throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException { KinesisClientLibIOException {
try { try {
countDownLatch.await(); waitForShardSyncerInitializationLatch.countDown();
blockShardSyncLatch.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
// No-OP // No-OP
} }
@ -122,5 +129,4 @@ public class ShardSyncTaskManagerTest {
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
} }
} }
} }