Merge pull request #659 from parijatsinha/bug-fix

Ensure ShardSyncTask invocation from ShardSyncTaskManager for pending ShardEnd
This commit is contained in:
ychunxue 2019-12-26 09:44:37 -08:00 committed by GitHub
commit bd59461c2c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 195 additions and 33 deletions

View file

@ -15,10 +15,12 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import com.amazonaws.services.kinesis.model.Shard;
import lombok.Getter;
@ -35,13 +37,12 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
* Kinesis shards, remove obsolete leases). We'll have at most one outstanding sync task at any time.
* Worker will use this class to kick off a sync task when it finds shards which have been completely processed.
*/
@Getter
class ShardSyncTaskManager {
@Getter class ShardSyncTaskManager {
private static final Log LOG = LogFactory.getLog(ShardSyncTaskManager.class);
private ITask currentTask;
private Future<TaskResult> future;
private CompletableFuture<TaskResult> future;
private final IKinesisProxy kinesisProxy;
private final ILeaseManager<KinesisClientLease> leaseManager;
private final IMetricsFactory metricsFactory;
@ -51,30 +52,28 @@ class ShardSyncTaskManager {
private boolean ignoreUnexpectedChildShards;
private final long shardSyncIdleTimeMillis;
private final ShardSyncer shardSyncer;
private final ReentrantLock lock;
private AtomicBoolean shardSyncRequestPending;
/**
* Constructor.
*
* @param kinesisProxy Proxy used to fetch streamInfo (shards)
* @param leaseManager Lease manager (used to list and create leases for shards)
* @param initialPositionInStream Initial position in stream
* @param kinesisProxy Proxy used to fetch streamInfo (shards)
* @param leaseManager Lease manager (used to list and create leases for shards)
* @param initialPositionInStream Initial position in stream
* @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait
* until they expire)
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param metricsFactory Metrics factory
* @param executorService ExecutorService to execute the shard sync tasks
* @param shardSyncer shardSyncer instance used to check and create new leases
* until they expire)
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param metricsFactory Metrics factory
* @param executorService ExecutorService to execute the shard sync tasks
* @param shardSyncer shardSyncer instance used to check and create new leases
*/
ShardSyncTaskManager(final IKinesisProxy kinesisProxy,
final ILeaseManager<KinesisClientLease> leaseManager,
ShardSyncTaskManager(final IKinesisProxy kinesisProxy, final ILeaseManager<KinesisClientLease> leaseManager,
final InitialPositionInStreamExtended initialPositionInStream,
final boolean cleanupLeasesUponShardCompletion,
final boolean ignoreUnexpectedChildShards,
final long shardSyncIdleTimeMillis,
final IMetricsFactory metricsFactory,
ExecutorService executorService,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIdleTimeMillis, final IMetricsFactory metricsFactory, ExecutorService executorService,
ShardSyncer shardSyncer) {
this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager;
@ -85,13 +84,20 @@ class ShardSyncTaskManager {
this.executorService = executorService;
this.initialPositionInStream = initialPositionInStream;
this.shardSyncer = shardSyncer;
this.shardSyncRequestPending = new AtomicBoolean(false);
this.lock = new ReentrantLock();
}
synchronized Future<TaskResult> syncShardAndLeaseInfo(List<Shard> latestShards) {
return checkAndSubmitNextTask(latestShards);
Future<TaskResult> syncShardAndLeaseInfo(List<Shard> latestShards) {
try {
lock.lock();
return checkAndSubmitNextTask(latestShards);
} finally {
lock.unlock();
}
}
private synchronized Future<TaskResult> checkAndSubmitNextTask(List<Shard> latestShards) {
private Future<TaskResult> checkAndSubmitNextTask(List<Shard> latestShards) {
Future<TaskResult> submittedTaskFuture = null;
if ((future == null) || future.isCancelled() || future.isDone()) {
if ((future != null) && future.isDone()) {
@ -106,24 +112,48 @@ class ShardSyncTaskManager {
}
}
currentTask =
new MetricsCollectingTaskDecorator(new ShardSyncTask(kinesisProxy,
leaseManager,
initialPositionInStream,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis,
currentTask = new MetricsCollectingTaskDecorator(
new ShardSyncTask(kinesisProxy, leaseManager, initialPositionInStream,
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis,
shardSyncer, latestShards), metricsFactory);
future = executorService.submit(currentTask);
future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService)
.whenComplete((taskResult, exception) -> handlePendingShardSyncs(latestShards, exception));
if (LOG.isDebugEnabled()) {
LOG.debug("Submitted new " + currentTask.getTaskType() + " task.");
}
submittedTaskFuture = future;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Previous " + currentTask.getTaskType() + " task still pending. Not submitting new task.");
LOG.debug("Previous " + currentTask.getTaskType() + " task still pending. Not submitting new task. "
+ "Enqueued a request that will be executed when the current request completes.");
}
shardSyncRequestPending.compareAndSet(false /*expected*/, true /*update*/);
}
return submittedTaskFuture;
}
}
private void handlePendingShardSyncs(List<Shard> latestShards, Throwable exception) {
if (exception != null) {
LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception);
}
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and
// syncShardAndLeaseInfo is invoked, before completion stage exits (future completes)
// but right after the value of shardSyncRequestPending is checked, it will result in
// shardSyncRequestPending being set to true, but no pending futures to trigger the next
// ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the
// previous task is in this completion stage, checkAndSubmitNextTask is not invoked
// until this completionStage exits.
try {
lock.lock();
if (shardSyncRequestPending.get()) {
shardSyncRequestPending.set(false);
// reset future to null, so next call creates a new one
// without trying to get results from the old future.
future = null;
checkAndSubmitNextTask(latestShards);
}
} finally {
lock.unlock();
}
}
}

View file

@ -0,0 +1,132 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.model.Shard;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ShardSyncTaskManagerTest {
private static final InitialPositionInStreamExtended INITIAL_POSITION_IN_STREAM_EXTENDED = InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
private static final boolean CLEANUP_LEASES_SHARD_COMPLETION = Boolean.TRUE;
private static final boolean IGNORE_UNEXPECTED_CHILD_SHARDS = Boolean.TRUE;
private static final long SHARD_SYNC_IDLE_TIME_MILLIS = 0;
@Mock private IKinesisProxy mockKinesisProxy;
@Mock private ILeaseManager<KinesisClientLease> mockLeaseManager;
@Mock private IMetricsFactory mockMetricsFactory;
@Mock private IMetricsScope mockMetricsScope;
private ShardSyncTaskManager shardSyncTaskManager;
private PausableNoOpShardSyncer pausableNoOpShardSyncer;
private ShardSyncer mockShardSyncer;
@Before public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
when(mockMetricsFactory.createMetrics()).thenReturn(mockMetricsScope);
pausableNoOpShardSyncer = new PausableNoOpShardSyncer();
mockShardSyncer = mock(ShardSyncer.class, delegatesTo(pausableNoOpShardSyncer));
shardSyncTaskManager = new ShardSyncTaskManager(mockKinesisProxy, mockLeaseManager,
INITIAL_POSITION_IN_STREAM_EXTENDED, CLEANUP_LEASES_SHARD_COMPLETION, IGNORE_UNEXPECTED_CHILD_SHARDS,
SHARD_SYNC_IDLE_TIME_MILLIS, mockMetricsFactory, Executors.newSingleThreadExecutor(), mockShardSyncer);
}
@Test public void testShardSyncIdempotency() throws Exception {
shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>());
// Wait for ShardSyncer to be initialized.
pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await();
verify(mockShardSyncer, times(1))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
anyBoolean(), Matchers.any());
// Invoke a few more times. This would flip shardSyncRequestPending to true in ShardSyncTaskManager.
int count = 0;
while (count++ < 5) {
shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>());
}
// Since blockShardSyncLatch is still blocked, previous ShardSyncTask is still running, hence no new invocations.
verify(mockShardSyncer, times(1))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
anyBoolean(), Matchers.any());
// countdown and exit.
pausableNoOpShardSyncer.blockShardSyncLatch.countDown();
}
@Test public void testShardSyncRerunsForPendingRequests() throws Exception {
shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>());
// Wait for ShardSyncer to be initialized.
pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await();
verify(mockShardSyncer, times(1))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
anyBoolean(), Matchers.any());
// Invoke a few more times. This would flip shardSyncRequestPending to true in ShardSyncTaskManager.
int count = 0;
while (count++ < 5) {
shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>());
}
pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch = new CountDownLatch(1);
// unblock pending shardSync so a new ShardSync is triggered.
pausableNoOpShardSyncer.blockShardSyncLatch.countDown();
// Wait for ShardSyncer to be initialized.
pausableNoOpShardSyncer.waitForShardSyncerInitializationLatch.await();
// There should be 1 more shardSyncer invocation after the previous shardSync completes.
verify(mockShardSyncer, times(2))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
anyBoolean(), Matchers.any());
}
private static class PausableNoOpShardSyncer implements ShardSyncer {
private CountDownLatch blockShardSyncLatch;
private CountDownLatch waitForShardSyncerInitializationLatch;
PausableNoOpShardSyncer() {
this.blockShardSyncLatch = new CountDownLatch(1);
this.waitForShardSyncerInitializationLatch = new CountDownLatch(1);
}
@Override public void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
try {
waitForShardSyncerInitializationLatch.countDown();
blockShardSyncLatch.await();
} catch (InterruptedException e) {
// No-OP
}
}
@Override public void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List<Shard> latestShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
this.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
}
}
}