Merge pull request #83 from ashwing/ltr_1_shardsync_executor_overload_fix

Avoiding ShardSync Task sleep when we skip the shard sync due to no shards found from the service
This commit is contained in:
ashwing 2020-07-22 12:16:47 -07:00 committed by GitHub
commit e235777c17
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 18 additions and 10 deletions

View file

@ -28,7 +28,7 @@ import java.util.concurrent.ThreadPoolExecutor;
@ToString(exclude = "isThreadPoolExecutor") @ToString(exclude = "isThreadPoolExecutor")
@Slf4j @Slf4j
@KinesisClientInternalApi @KinesisClientInternalApi
class ExecutorStateEvent implements DiagnosticEvent { public class ExecutorStateEvent implements DiagnosticEvent {
private static final String MESSAGE = "Current thread pool executor state: "; private static final String MESSAGE = "Current thread pool executor state: ";
private boolean isThreadPoolExecutor; private boolean isThreadPoolExecutor;
@ -41,6 +41,11 @@ class ExecutorStateEvent implements DiagnosticEvent {
private int maximumPoolSize; private int maximumPoolSize;
ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) { ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) {
this(executor);
this.leasesOwned = leaseCoordinator.getAssignments().size();
}
public ExecutorStateEvent(ExecutorService executor) {
if (executor instanceof ThreadPoolExecutor) { if (executor instanceof ThreadPoolExecutor) {
this.isThreadPoolExecutor = true; this.isThreadPoolExecutor = true;
@ -52,8 +57,6 @@ class ExecutorStateEvent implements DiagnosticEvent {
this.largestPoolSize = ex.getLargestPoolSize(); this.largestPoolSize = ex.getLargestPoolSize();
this.maximumPoolSize = ex.getMaximumPoolSize(); this.maximumPoolSize = ex.getMaximumPoolSize();
} }
this.leasesOwned = leaseCoordinator.getAssignments().size();
} }
@Override @Override

View file

@ -97,6 +97,7 @@ public class HierarchicalShardSyncer {
/** /**
* Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards * Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards
* (e.g. at startup, or when we reach end of a shard). * (e.g. at startup, or when we reach end of a shard).
* Returns true if shard sync was performed; returns false if shard sync was skipped.
* *
* @param shardDetector * @param shardDetector
* @param leaseRefresher * @param leaseRefresher
@ -109,18 +110,18 @@ public class HierarchicalShardSyncer {
* @throws KinesisClientLibIOException * @throws KinesisClientLibIOException
*/ */
// CHECKSTYLE:OFF CyclomaticComplexity // CHECKSTYLE:OFF CyclomaticComplexity
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final MetricsScope scope, final boolean ignoreUnexpectedChildShards, final boolean isLeaseTableEmpty) final MetricsScope scope, final boolean ignoreUnexpectedChildShards, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException {
final List<Shard> latestShards = isLeaseTableEmpty ? final List<Shard> latestShards = isLeaseTableEmpty ?
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, ignoreUnexpectedChildShards, scope, return checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, ignoreUnexpectedChildShards, scope,
isLeaseTableEmpty); isLeaseTableEmpty);
} }
//Provide a pre-collected list of shards to avoid calling ListShards API //Provide a pre-collected list of shards to avoid calling ListShards API
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
List<Shard> latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty) List<Shard> latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
@ -131,7 +132,7 @@ public class HierarchicalShardSyncer {
log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size()); log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size());
} else { } else {
log.warn("Skipping shard sync for {} as no shards found from service.", streamIdentifier); log.warn("Skipping shard sync for {} as no shards found from service.", streamIdentifier);
return; return false;
} }
final Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(latestShards); final Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(latestShards);
@ -161,6 +162,7 @@ public class HierarchicalShardSyncer {
} }
final List<Lease> trackedLeases = new ArrayList<>(currentLeases); final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate); trackedLeases.addAll(newLeasesToCreate);
return true;
} }
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls /** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls

View file

@ -67,11 +67,11 @@ public class ShardSyncTask implements ConsumerTask {
boolean shardSyncSuccess = true; boolean shardSyncSuccess = true;
try { try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, boolean didPerformShardSync = hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
initialPosition, scope, ignoreUnexpectedChildShards, initialPosition, scope, ignoreUnexpectedChildShards,
leaseRefresher.isLeaseTableEmpty()); leaseRefresher.isLeaseTableEmpty());
if (shardSyncTaskIdleTimeMillis > 0) { if (didPerformShardSync && shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis); Thread.sleep(shardSyncTaskIdleTimeMillis);
} }
} catch (Exception e) { } catch (Exception e) {

View file

@ -17,7 +17,6 @@ package software.amazon.kinesis.leases;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -27,6 +26,7 @@ import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.coordinator.ExecutorStateEvent;
import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
@ -186,6 +186,9 @@ public class ShardSyncTaskManager {
metricsFactory); metricsFactory);
future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService) future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService)
.whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult)); .whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult));
log.info(new ExecutorStateEvent(executorService).message());
submittedNewTask = true; submittedNewTask = true;
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Submitted new {} task.", currentTask.taskType()); log.debug("Submitted new {} task.", currentTask.taskType());