diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index f3d0d1f2..3ca940ee 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -530,8 +530,7 @@ class ConsumerStates { consumer.isIgnoreUnexpectedChildShards(), consumer.getLeaseManager(), consumer.getTaskBackoffTimeMillis(), - consumer.getGetRecordsCache(), - consumer.getShardSyncer()); + consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getShardSyncStrategy()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DeterministicShuffleShardSyncLeaderDecider.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DeterministicShuffleShardSyncLeaderDecider.java new file mode 100644 index 00000000..e52f2372 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DeterministicShuffleShardSyncLeaderDecider.java @@ -0,0 +1,153 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.time.Instant; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BooleanSupplier; +import java.util.stream.Collectors; + +import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; +import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; +import com.amazonaws.util.CollectionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * An implementation of the {@code LeaderDecider} to elect leader(s) based on workerId. + * Leases are shuffled using a predetermined constant seed so that lease ordering is + * preserved across workers. + * This reduces the probability of choosing the leader workers co-located on the same + * host in case workerId starts with a common string (e.g. IP Address). + * Hence if a host has 3 workers, IPADDRESS_Worker1, IPADDRESS_Worker2, and IPADDRESS_Worker3, + * we don't end up choosing all 3 for shard sync as a result of natural ordering of Strings. + * This ensures redundancy for shard-sync during host failures. 
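+ *
+ * Illustrative example (hypothetical worker IDs): if the distinct lease owners are
+ * {10.0.0.1_Worker1, 10.0.0.1_Worker2, 10.0.0.2_Worker1} and numPeriodicShardSyncWorkers is 2,
+ * every worker sorts the owners, shuffles them with the same fixed seed, and takes the first two
+ * entries, so all workers independently agree on the same two leaders.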
+ */
+class DeterministicShuffleShardSyncLeaderDecider implements LeaderDecider {
+    // Fixed seed so that the shuffle order is preserved across workers
+    static final int DETERMINISTIC_SHUFFLE_SEED = 1947;
+    private static final Log LOG = LogFactory.getLog(DeterministicShuffleShardSyncLeaderDecider.class);
+
+    private static final long ELECTION_INITIAL_DELAY_MILLIS = 60 * 1000;
+    private static final long ELECTION_SCHEDULING_INTERVAL_MILLIS = 5 * 60 * 1000;
+    private static final int AWAIT_TERMINATION_MILLIS = 5000;
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    private final ILeaseManager<KinesisClientLease> leaseManager;
+    private final int numPeriodicShardSyncWorkers;
+    private final ScheduledExecutorService leaderElectionThreadPool;
+
+    private volatile Set<String> leaders;
+
+    /**
+     *
+     * @param leaseManager LeaseManager instance used to fetch leases.
+     * @param leaderElectionThreadPool Thread-pool to be used for leader election.
+     * @param numPeriodicShardSyncWorkers Number of leaders that will be elected to perform periodic shard syncs.
+     */
+    DeterministicShuffleShardSyncLeaderDecider(ILeaseManager<KinesisClientLease> leaseManager,
+            ScheduledExecutorService leaderElectionThreadPool, int numPeriodicShardSyncWorkers) {
+        this.leaseManager = leaseManager;
+        this.leaderElectionThreadPool = leaderElectionThreadPool;
+        this.numPeriodicShardSyncWorkers = numPeriodicShardSyncWorkers;
+    }
+
+    /*
+     * Shuffles the lease owners deterministically and elects numPeriodicShardSyncWorkers workers
+     * as leaders (workers that will perform shard sync).
+     */
+    private void electLeaders() {
+        try {
+            LOG.debug("Started leader election at: " + Instant.now());
+            List<KinesisClientLease> leases = leaseManager.listLeases();
+            List<String> uniqueHosts = leases.stream().map(KinesisClientLease::getLeaseOwner)
+                    .filter(owner -> owner != null).distinct().sorted().collect(Collectors.toList());
+
+            Collections.shuffle(uniqueHosts, new Random(DETERMINISTIC_SHUFFLE_SEED));
+            int numShardSyncWorkers = Math.min(uniqueHosts.size(), numPeriodicShardSyncWorkers);
+            // Take the write lock only around the reference update so that in-flight readers
+            // (which hold the read lock) observe either the previous or the new leader set,
+            // and so that the unlock in the finally block always matches an acquired lock.
+            readWriteLock.writeLock().lock();
+            try {
+                leaders = new HashSet<>(uniqueHosts.subList(0, numShardSyncWorkers));
+            } finally {
+                readWriteLock.writeLock().unlock();
+            }
+            LOG.info("Elected leaders: " + String.join(", ", leaders));
+            LOG.debug("Completed leader election at: " + System.currentTimeMillis());
+        } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
+            LOG.error("Exception occurred while trying to fetch all leases for leader election", e);
+        } catch (Throwable t) {
+            LOG.error("Unknown exception during leader election.", t);
+        }
+    }
+
+    private boolean isWorkerLeaderForShardSync(String workerId) {
+        // If no leaders have been elected yet, every worker is allowed to run shard sync.
+        return CollectionUtils.isNullOrEmpty(leaders) || leaders.contains(workerId);
+    }
+
+    @Override
+    public synchronized Boolean isLeader(String workerId) {
+        // If there are no leaders yet, synchronously elect them. This happens at the first shard sync.
+        if (executeConditionCheckWithReadLock(() -> CollectionUtils.isNullOrEmpty(leaders))) {
+            electLeaders();
+            // Start a scheduled executor that periodically updates the leaders; the first run is after a minute.
+            // No jitter is needed since the task is scheduled with a fixed delay and the time taken to scan
+            // leases will differ across runs and across hosts/workers.
+ leaderElectionThreadPool.scheduleWithFixedDelay(this::electLeaders, ELECTION_INITIAL_DELAY_MILLIS, + ELECTION_SCHEDULING_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + } + return executeConditionCheckWithReadLock(() -> isWorkerLeaderForShardSync(workerId)); + } + + @Override + public synchronized void shutdown() { + try { + leaderElectionThreadPool.shutdown(); + if (leaderElectionThreadPool.awaitTermination(AWAIT_TERMINATION_MILLIS, TimeUnit.MILLISECONDS)) { + LOG.info("Successfully stopped leader election on the worker"); + } else { + leaderElectionThreadPool.shutdownNow(); + LOG.info(String.format("Stopped leader election thread after awaiting termination for %d milliseconds", + AWAIT_TERMINATION_MILLIS)); + } + + } catch (InterruptedException e) { + LOG.debug("Encountered InterruptedException while awaiting leader election threadPool termination"); + } + } + + // Execute condition checks using shared variables under a read-write lock. + private boolean executeConditionCheckWithReadLock(BooleanSupplier action) { + try { + readWriteLock.readLock().lock(); + return action.getAsBoolean(); + } finally { + readWriteLock.readLock().unlock(); + } + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java index c575842e..6cb6a706 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java @@ -20,7 +20,7 @@ import java.util.Date; * Class that houses the entities needed to specify the position in the stream from where a new application should * start. */ -class InitialPositionInStreamExtended { +public class InitialPositionInStreamExtended { private final InitialPositionInStream position; private final Date timestamp; @@ -44,7 +44,7 @@ class InitialPositionInStreamExtended { * * @return The initial position in stream. */ - protected InitialPositionInStream getInitialPositionInStream() { + public InitialPositionInStream getInitialPositionInStream() { return this.position; } @@ -54,11 +54,11 @@ class InitialPositionInStreamExtended { * * @return The timestamp from where we need to start the application. 
*/ - protected Date getTimestamp() { + public Date getTimestamp() { return this.timestamp; } - protected static InitialPositionInStreamExtended newInitialPosition(final InitialPositionInStream position) { + public static InitialPositionInStreamExtended newInitialPosition(final InitialPositionInStream position) { switch (position) { case LATEST: return new InitialPositionInStreamExtended(InitialPositionInStream.LATEST, null); @@ -69,7 +69,7 @@ class InitialPositionInStreamExtended { } } - protected static InitialPositionInStreamExtended newInitialPositionAtTimestamp(final Date timestamp) { + public static InitialPositionInStreamExtended newInitialPositionAtTimestamp(final Date timestamp) { if (timestamp == null) { throw new IllegalArgumentException("Timestamp must be specified for InitialPosition AT_TIMESTAMP"); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 4339c358..99d76b8f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -22,7 +22,6 @@ import org.apache.commons.lang3.Validate; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.regions.RegionUtils; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; @@ -167,6 +166,11 @@ public class KinesisClientLibConfiguration { */ public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false; + /** + * Default ShardSyncStrategy to be used for discovering new shards in the Stream. + */ + public static final ShardSyncStrategyType DEFAULT_SHARD_SYNC_STRATEGY_TYPE = ShardSyncStrategyType.SHARD_END; + /** * Default Shard prioritization strategy. 
*/ @@ -230,6 +234,7 @@ public class KinesisClientLibConfiguration { private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; private long shutdownGraceMillis; + private ShardSyncStrategyType shardSyncStrategyType; @Getter private Optional timeoutInSeconds = Optional.empty(); @@ -492,6 +497,7 @@ public class KinesisClientLibConfiguration { this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; + this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(); } @@ -600,6 +606,7 @@ public class KinesisClientLibConfiguration { this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; + this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.recordsFetcherFactory = recordsFetcherFactory; this.shutdownGraceMillis = shutdownGraceMillis; @@ -840,6 +847,13 @@ public class KinesisClientLibConfiguration { return skipShardSyncAtWorkerInitializationIfLeasesExist; } + /** + * @return ShardSyncStrategyType to be used by KCL to process the Stream. + */ + public ShardSyncStrategyType getShardSyncStrategyType() { + return shardSyncStrategyType; + } + /** * @return Max leases this Worker can handle at a time */ @@ -1199,6 +1213,15 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param shardSyncStrategyType ShardSyncStrategy type for KCL. 
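+     *        Illustrative usage (assuming an existing {@code config} instance, not part of this change):
+     *        {@code config.withShardSyncStrategyType(ShardSyncStrategyType.PERIODIC)} switches the worker from
+     *        the default SHARD_END strategy to periodic shard syncs.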
+ * @return {@link KinesisClientLibConfiguration} + */ + public KinesisClientLibConfiguration withShardSyncStrategyType(ShardSyncStrategyType shardSyncStrategyType) { + this.shardSyncStrategyType = shardSyncStrategyType; + return this; + } + /** * * @param regionName The region name for the service diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index bd4cc29b..bcbabcde 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -21,8 +21,15 @@ import java.util.List; import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutorService; import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator; +import com.amazonaws.services.kinesis.leases.impl.LeaseRenewer; +import com.amazonaws.services.kinesis.leases.impl.LeaseTaker; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker; import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,8 +45,6 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; -import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; -import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; @@ -66,11 +71,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator leaseManager, - String workerIdentifier, - long leaseDurationMillis, - long epsilonMillis, - LeaseSelector leaseSelector) { + public KinesisClientLibLeaseCoordinator(ILeaseManager leaseManager, String workerIdentifier, + long leaseDurationMillis, long epsilonMillis, LeaseSelector leaseSelector) { super(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis); this.leaseManager = leaseManager; } @@ -81,10 +83,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator leaseManager, - String workerIdentifier, - long leaseDurationMillis, - long epsilonMillis) { + public KinesisClientLibLeaseCoordinator(ILeaseManager leaseManager, String workerIdentifier, + long leaseDurationMillis, long epsilonMillis) { this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, DEFAULT_LEASE_SELECTOR); } @@ -97,11 +97,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator leaseManager, - LeaseSelector leaseSelector, - String workerIdentifier, - long leaseDurationMillis, - long epsilonMillis, - IMetricsFactory metricsFactory) { + LeaseSelector leaseSelector, String workerIdentifier, long leaseDurationMillis, + long epsilonMillis, IMetricsFactory metricsFactory) { super(leaseManager, 
leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis, metricsFactory); this.leaseManager = leaseManager; } @@ -117,19 +114,22 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator leaseManager, - LeaseSelector leaseSelector, - String workerIdentifier, - long leaseDurationMillis, - long epsilonMillis, - int maxLeasesForWorker, - int maxLeasesToStealAtOneTime, - int maxLeaseRenewerThreadCount, + LeaseSelector leaseSelector, String workerIdentifier, long leaseDurationMillis, + long epsilonMillis, int maxLeasesForWorker, int maxLeasesToStealAtOneTime, int maxLeaseRenewerThreadCount, IMetricsFactory metricsFactory) { super(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, metricsFactory); this.leaseManager = leaseManager; } + public KinesisClientLibLeaseCoordinator(ILeaseManager leaseManager, + ILeaseTaker leaseTaker, ILeaseRenewer leaseRenewer, + final long leaseDurationMillis, final long epsilonMillis, final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, final IMetricsFactory metricsFactory) { + super(leaseTaker, leaseRenewer, leaseDurationMillis, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, metricsFactory); + this.leaseManager = leaseManager; + } + /** * @param readCapacity The DynamoDB table used for tracking leases will be provisioned with the specified initial * read capacity @@ -175,8 +175,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator leaseManager, + InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, + KinesisClientLibIOException { + syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards); + } + + /** + * Check and create leases for any new shards (e.g. following a reshard operation). + * + * @param kinesisProxy + * @param leaseManager + * @param initialPositionInStream + * @param cleanupLeasesOfCompletedShards + * @param ignoreUnexpectedChildShards + * @throws DependencyException + * @throws InvalidStateException + * @throws ProvisionedThroughputException + * @throws KinesisClientLibIOException + */ + public synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, + ILeaseManager leaseManager, InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, + KinesisClientLibIOException { + syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); + } + + /** + * Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard). 
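+     * The sync lists all shards from Kinesis, determines the new leases to create, persists them through the
+     * lease manager, and then cleans up garbage leases and (if enabled) leases of completed shards.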
+ * + * @param kinesisProxy + * @param leaseManager + * @param initialPosition + * @param cleanupLeasesOfCompletedShards + * @param ignoreUnexpectedChildShards + * @throws DependencyException + * @throws InvalidStateException + * @throws ProvisionedThroughputException + * @throws KinesisClientLibIOException + */ + // CHECKSTYLE:OFF CyclomaticComplexity + private synchronized void syncShardLeases(IKinesisProxy kinesisProxy, + ILeaseManager leaseManager, InitialPositionInStreamExtended initialPosition, + boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, + KinesisClientLibIOException { + List shards = getShardList(kinesisProxy); + LOG.debug("Num shards: " + shards.size()); + + Map shardIdToShardMap = constructShardIdToShardMap(shards); + Map> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap); + Set inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap); + if (!ignoreUnexpectedChildShards) { + assertAllParentShardsAreClosed(inconsistentShardIds); + } + + List currentLeases = leaseManager.listLeases(); + + List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, + inconsistentShardIds); + LOG.debug("Num new leases to create: " + newLeasesToCreate.size()); + for (KinesisClientLease lease : newLeasesToCreate) { + long startTimeMillis = System.currentTimeMillis(); + boolean success = false; + try { + leaseManager.createLeaseIfNotExists(lease); + success = true; + } finally { + MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success, MetricsLevel.DETAILED); + } + } + + List trackedLeases = new ArrayList<>(); + if (currentLeases != null) { + trackedLeases.addAll(currentLeases); + } + trackedLeases.addAll(newLeasesToCreate); + cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager); + if (cleanupLeasesOfCompletedShards) { + cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, + leaseManager); + } + } + // CHECKSTYLE:ON CyclomaticComplexity + + /** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls + * and a reshard operation. + * @param inconsistentShardIds + * @throws KinesisClientLibIOException + */ + private void assertAllParentShardsAreClosed(Set inconsistentShardIds) throws KinesisClientLibIOException { + if (!inconsistentShardIds.isEmpty()) { + String ids = StringUtils.join(inconsistentShardIds, ' '); + throw new KinesisClientLibIOException(String.format("%d open child shards (%s) are inconsistent. " + + "This can happen due to a race condition between describeStream and a reshard operation.", + inconsistentShardIds.size(), ids)); + } + } + + /** + * Helper method to construct the list of inconsistent shards, which are open shards with non-closed ancestor + * parent(s). + * @param shardIdToChildShardIdsMap + * @param shardIdToShardMap + * @return Set of inconsistent open shard ids for shards having open parents. 
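+     *         A child shard id is reported as inconsistent when its parent shard is still open, i.e. the
+     *         parent has no ending sequence number.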
+ */ + private Set findInconsistentShardIds(Map> shardIdToChildShardIdsMap, + Map shardIdToShardMap) { + Set result = new HashSet(); + for (String parentShardId : shardIdToChildShardIdsMap.keySet()) { + Shard parentShard = shardIdToShardMap.get(parentShardId); + if ((parentShardId == null) || (parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null)) { + Set childShardIdsMap = shardIdToChildShardIdsMap.get(parentShardId); + result.addAll(childShardIdsMap); + } + } + return result; + } + + /** + * Helper method to create a shardId->KinesisClientLease map. + * Note: This has package level access for testing purposes only. + * @param trackedLeaseList + * @return + */ + Map constructShardIdToKCLLeaseMap(List trackedLeaseList) { + Map trackedLeasesMap = new HashMap<>(); + for (KinesisClientLease lease : trackedLeaseList) { + trackedLeasesMap.put(lease.getLeaseKey(), lease); + } + return trackedLeasesMap; + } + + /** + * Note: this has package level access for testing purposes. + * Useful for asserting that we don't have an incomplete shard list following a reshard operation. + * We verify that if the shard is present in the shard list, it is closed and its hash key range + * is covered by its child shards. + */ + synchronized void assertClosedShardsAreCoveredOrAbsent(Map shardIdToShardMap, + Map> shardIdToChildShardIdsMap, Set shardIdsOfClosedShards) + throws KinesisClientLibIOException { + String exceptionMessageSuffix = "This can happen if we constructed the list of shards " + + " while a reshard operation was in progress."; + + for (String shardId : shardIdsOfClosedShards) { + Shard shard = shardIdToShardMap.get(shardId); + if (shard == null) { + LOG.info("Shard " + shardId + " is not present in Kinesis anymore."); + continue; + } + + String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber(); + if (endingSequenceNumber == null) { + throw new KinesisClientLibIOException("Shard " + shardIdsOfClosedShards + " is not closed. " + + exceptionMessageSuffix); + } + + Set childShardIds = shardIdToChildShardIdsMap.get(shardId); + if (childShardIds == null) { + throw new KinesisClientLibIOException("Incomplete shard list: Closed shard " + shardId + + " has no children." 
+ exceptionMessageSuffix); + } + + assertHashRangeOfClosedShardIsCovered(shard, shardIdToShardMap, childShardIds); + } + } + + private synchronized void assertHashRangeOfClosedShardIsCovered(Shard closedShard, + Map shardIdToShardMap, Set childShardIds) throws KinesisClientLibIOException { + + BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getStartingHashKey()); + BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getEndingHashKey()); + BigInteger minStartingHashKeyOfChildren = null; + BigInteger maxEndingHashKeyOfChildren = null; + + for (String childShardId : childShardIds) { + Shard childShard = shardIdToShardMap.get(childShardId); + BigInteger startingHashKey = new BigInteger(childShard.getHashKeyRange().getStartingHashKey()); + if ((minStartingHashKeyOfChildren == null) || (startingHashKey.compareTo(minStartingHashKeyOfChildren) + < 0)) { + minStartingHashKeyOfChildren = startingHashKey; + } + BigInteger endingHashKey = new BigInteger(childShard.getHashKeyRange().getEndingHashKey()); + if ((maxEndingHashKeyOfChildren == null) || (endingHashKey.compareTo(maxEndingHashKeyOfChildren) > 0)) { + maxEndingHashKeyOfChildren = endingHashKey; + } + } + + if ((minStartingHashKeyOfChildren == null) || (maxEndingHashKeyOfChildren == null) || ( + minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0) || ( + maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0)) { + throw new KinesisClientLibIOException("Incomplete shard list: hash key range of shard " + closedShard + .getShardId() + " is not covered by its child shards."); + } + + } + + /** + * Helper method to construct shardId->setOfChildShardIds map. + * Note: This has package access for testing purposes only. + * @param shardIdToShardMap + * @return + */ + Map> constructShardIdToChildShardIdsMap(Map shardIdToShardMap) { + Map> shardIdToChildShardIdsMap = new HashMap<>(); + for (Map.Entry entry : shardIdToShardMap.entrySet()) { + String shardId = entry.getKey(); + Shard shard = entry.getValue(); + String parentShardId = shard.getParentShardId(); + if ((parentShardId != null) && (shardIdToShardMap.containsKey(parentShardId))) { + Set childShardIds = shardIdToChildShardIdsMap.get(parentShardId); + if (childShardIds == null) { + childShardIds = new HashSet(); + shardIdToChildShardIdsMap.put(parentShardId, childShardIds); + } + childShardIds.add(shardId); + } + + String adjacentParentShardId = shard.getAdjacentParentShardId(); + if ((adjacentParentShardId != null) && (shardIdToShardMap.containsKey(adjacentParentShardId))) { + Set childShardIds = shardIdToChildShardIdsMap.get(adjacentParentShardId); + if (childShardIds == null) { + childShardIds = new HashSet(); + shardIdToChildShardIdsMap.put(adjacentParentShardId, childShardIds); + } + childShardIds.add(shardId); + } + } + return shardIdToChildShardIdsMap; + } + + private List getShardList(IKinesisProxy kinesisProxy) throws KinesisClientLibIOException { + List shards = kinesisProxy.getShardList(); + if (shards == null) { + throw new KinesisClientLibIOException( + "Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list."); + } + return shards; + } + + /** + * Determine new leases to create and their initial checkpoint. + * Note: Package level access only for testing purposes. + * + * For each open (no ending sequence number) shard without open parents that doesn't already have a lease, + * determine if it is a descendent of any shard which is or will be processed (e.g. 
for which a lease exists): + * If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed. + * If not, set checkpoint of the shard to the initial position specified by the client. + * To check if we need to create leases for ancestors, we use the following rules: + * * If we began (or will begin) processing data for a shard, then we must reach end of that shard before + * we begin processing data from any of its descendants. + * * A shard does not start processing data until data from all its parents has been processed. + * Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create + * leases corresponding to both the parents - the parent shard which is not a descendant will have + * its checkpoint set to Latest. + * + * We assume that if there is an existing lease for a shard, then either: + * * we have previously created a lease for its parent (if it was needed), or + * * the parent shard has expired. + * + * For example: + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5 - shards till epoch 102 + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | / \ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (3, 4, 5) + * New leases to create: (2, 6, 7, 8, 9, 10) + * + * The leases returned are sorted by the starting sequence number - following the same order + * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail + * before creating all the leases. + * + * If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it + * here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very + * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only + * currently happen here if ignoreUnexpectedChildShards was true in syncShardleases. + * + * + * @param shards List of all shards in Kinesis (we'll create new leases based on this set) + * @param currentLeases List of current leases + * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that + * location in the shard (when an application starts up for the first time - and there are no checkpoints). + * @param inconsistentShardIds Set of child shard ids having open parents. + * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard + */ + List determineNewLeasesToCreate(List shards, List currentLeases, + InitialPositionInStreamExtended initialPosition, Set inconsistentShardIds) { + Map shardIdToNewLeaseMap = new HashMap(); + Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); + + Set shardIdsOfCurrentLeases = new HashSet(); + for (KinesisClientLease lease : currentLeases) { + shardIdsOfCurrentLeases.add(lease.getLeaseKey()); + LOG.debug("Existing lease: " + lease); + } + + List openShards = getOpenShards(shards); + Map memoizationContext = new HashMap<>(); + + // Iterate over the open shards and find those that don't have any lease entries. + for (Shard shard : openShards) { + String shardId = shard.getShardId(); + LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors."); + if (shardIdsOfCurrentLeases.contains(shardId)) { + LOG.debug("Lease for shardId " + shardId + " already exists. 
Not creating a lease"); + } else if (inconsistentShardIds.contains(shardId)) { + LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease"); + } else { + LOG.debug("Need to create a lease for shardId " + shardId); + KinesisClientLease newLease = newKCLLease(shard); + boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition, + shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap, + memoizationContext); + + /** + * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the + * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a + * lease just like we do for TRIM_HORIZON. However we will only return back records with server-side + * timestamp at or after the specified initial position timestamp. + * + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5 - shards till epoch 102 + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * + * Current leases: empty set + * + * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with + * timestamp value 206. We will then create new leases for all the shards (with checkpoint set to + * AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin + * processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases + * would then be deleted since they won't have records with server-side timestamp at/after 206. And + * after that we will begin processing the descendant shards with epoch at/after 206 and we will + * return the records that meet the timestamp requirement for these shards. + */ + if (isDescendant && !initialPosition.getInitialPositionInStream() + .equals(InitialPositionInStream.AT_TIMESTAMP)) { + newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + } else { + newLease.setCheckpoint(convertToCheckpoint(initialPosition)); + } + LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint()); + shardIdToNewLeaseMap.put(shardId, newLease); + } + } + + List newLeasesToCreate = new ArrayList(); + newLeasesToCreate.addAll(shardIdToNewLeaseMap.values()); + Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( + shardIdToShardMapOfAllKinesisShards); + Collections.sort(newLeasesToCreate, startingSequenceNumberComparator); + return newLeasesToCreate; + } + + /** + * Determine new leases to create and their initial checkpoint. + * Note: Package level access only for testing purposes. + */ + List determineNewLeasesToCreate(List shards, List currentLeases, + InitialPositionInStreamExtended initialPosition) { + Set inconsistentShardIds = new HashSet(); + return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds); + } + + /** + * Note: Package level access for testing purposes only. + * Check if this shard is a descendant of a shard that is (or will be) processed. + * Create leases for the ancestors of this shard as required. + * See javadoc of determineNewLeasesToCreate() for rules and example. + * + * @param shardId The shardId to check. + * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that + * location in the shard (when an application starts up for the first time - and there are no checkpoints). 
+ * @param shardIdsOfCurrentLeases The shardIds for the current leases. + * @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream. + * @param shardIdToLeaseMapOfNewShards Add lease POJOs corresponding to ancestors to this map. + * @param memoizationContext Memoization of shards that have been evaluated as part of the evaluation + * @return true if the shard is a descendant of any current shard (lease already exists) + */ + // CHECKSTYLE:OFF CyclomaticComplexity + boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId, + InitialPositionInStreamExtended initialPosition, Set shardIdsOfCurrentLeases, + Map shardIdToShardMapOfAllKinesisShards, + Map shardIdToLeaseMapOfNewShards, Map memoizationContext) { + + Boolean previousValue = memoizationContext.get(shardId); + if (previousValue != null) { + return previousValue; + } + + boolean isDescendant = false; + Shard shard; + Set parentShardIds; + Set descendantParentShardIds = new HashSet(); + + if ((shardId != null) && (shardIdToShardMapOfAllKinesisShards.containsKey(shardId))) { + if (shardIdsOfCurrentLeases.contains(shardId)) { + // This shard is a descendant of a current shard. + isDescendant = true; + // We don't need to add leases of its ancestors, + // because we'd have done it when creating a lease for this shard. + } else { + shard = shardIdToShardMapOfAllKinesisShards.get(shardId); + parentShardIds = getParentShardIds(shard, shardIdToShardMapOfAllKinesisShards); + for (String parentShardId : parentShardIds) { + // Check if the parent is a descendant, and include its ancestors. + if (checkIfDescendantAndAddNewLeasesForAncestors(parentShardId, initialPosition, + shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards, + memoizationContext)) { + isDescendant = true; + descendantParentShardIds.add(parentShardId); + LOG.debug("Parent shard " + parentShardId + " is a descendant."); + } else { + LOG.debug("Parent shard " + parentShardId + " is NOT a descendant."); + } + } + + // If this is a descendant, create leases for its parent shards (if they don't exist) + if (isDescendant) { + for (String parentShardId : parentShardIds) { + if (!shardIdsOfCurrentLeases.contains(parentShardId)) { + LOG.debug("Need to create a lease for shardId " + parentShardId); + KinesisClientLease lease = shardIdToLeaseMapOfNewShards.get(parentShardId); + if (lease == null) { + lease = newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId)); + shardIdToLeaseMapOfNewShards.put(parentShardId, lease); + } + + if (descendantParentShardIds.contains(parentShardId) && !initialPosition + .getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + } else { + lease.setCheckpoint(convertToCheckpoint(initialPosition)); + } + } + } + } else { + // This shard should be included, if the customer wants to process all records in the stream or + // if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a lease just like we do + // for TRIM_HORIZON. However we will only return back records with server-side timestamp at or + // after the specified initial position timestamp. 
+ if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON) + || initialPosition.getInitialPositionInStream() + .equals(InitialPositionInStream.AT_TIMESTAMP)) { + isDescendant = true; + } + } + + } + } + + memoizationContext.put(shardId, isDescendant); + return isDescendant; + } + // CHECKSTYLE:ON CyclomaticComplexity + + /** + * Helper method to get parent shardIds of the current shard - includes the parent shardIds if: + * a/ they are not null + * b/ if they exist in the current shard map (i.e. haven't expired) + * + * @param shard Will return parents of this shard + * @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream. + * @return Set of parentShardIds + */ + Set getParentShardIds(Shard shard, Map shardIdToShardMapOfAllKinesisShards) { + Set parentShardIds = new HashSet(2); + String parentShardId = shard.getParentShardId(); + if ((parentShardId != null) && shardIdToShardMapOfAllKinesisShards.containsKey(parentShardId)) { + parentShardIds.add(parentShardId); + } + String adjacentParentShardId = shard.getAdjacentParentShardId(); + if ((adjacentParentShardId != null) && shardIdToShardMapOfAllKinesisShards.containsKey(adjacentParentShardId)) { + parentShardIds.add(adjacentParentShardId); + } + return parentShardIds; + } + + /** + * Delete leases corresponding to shards that no longer exist in the stream. + * Current scheme: Delete a lease if: + * * the corresponding shard is not present in the list of Kinesis shards, AND + * * the parentShardIds listed in the lease are also not present in the list of Kinesis shards. + * @param shards List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state). + * @param trackedLeases List of + * @param kinesisProxy Kinesis proxy (used to get shard list) + * @param leaseManager + * @throws KinesisClientLibIOException Thrown if we couldn't get a fresh shard list from Kinesis. + * @throws ProvisionedThroughputException + * @throws InvalidStateException + * @throws DependencyException + */ + private void cleanupGarbageLeases(List shards, List trackedLeases, + IKinesisProxy kinesisProxy, ILeaseManager leaseManager) + throws KinesisClientLibIOException, DependencyException, InvalidStateException, + ProvisionedThroughputException { + Set kinesisShards = new HashSet<>(); + for (Shard shard : shards) { + kinesisShards.add(shard.getShardId()); + } + + // Check if there are leases for non-existent shards + List garbageLeases = new ArrayList<>(); + for (KinesisClientLease lease : trackedLeases) { + if (leaseCleanupValidator.isCandidateForCleanup(lease, kinesisShards)) { + garbageLeases.add(lease); + } + } + + if (!garbageLeases.isEmpty()) { + LOG.info("Found " + garbageLeases.size() + " candidate leases for cleanup. Refreshing list of" + + " Kinesis shards to pick up recent/latest shards"); + List currentShardList = getShardList(kinesisProxy); + Set currentKinesisShardIds = new HashSet<>(); + for (Shard shard : currentShardList) { + currentKinesisShardIds.add(shard.getShardId()); + } + + for (KinesisClientLease lease : garbageLeases) { + if (leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds)) { + LOG.info("Deleting lease for shard " + lease.getLeaseKey() + + " as it is not present in Kinesis stream."); + leaseManager.deleteLease(lease); + } + } + } + + } + + /** + * Private helper method. 
+ * Clean up leases for shards that meet the following criteria: + * a/ the shard has been fully processed (checkpoint is set to SHARD_END) + * b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not + * TRIM_HORIZON. + * + * @param currentLeases List of leases we evaluate for clean up + * @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards) + * @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards) + * @param trackedLeases List of all leases we are tracking. + * @param leaseManager Lease manager (will be used to delete leases) + * @throws DependencyException + * @throws InvalidStateException + * @throws ProvisionedThroughputException + * @throws KinesisClientLibIOException + */ + private synchronized void cleanupLeasesOfFinishedShards(Collection currentLeases, + Map shardIdToShardMap, Map> shardIdToChildShardIdsMap, + List trackedLeases, ILeaseManager leaseManager) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, + KinesisClientLibIOException { + Set shardIdsOfClosedShards = new HashSet<>(); + List leasesOfClosedShards = new ArrayList<>(); + for (KinesisClientLease lease : currentLeases) { + if (lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) { + shardIdsOfClosedShards.add(lease.getLeaseKey()); + leasesOfClosedShards.add(lease); + } + } + + if (!leasesOfClosedShards.isEmpty()) { + assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards); + Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( + shardIdToShardMap); + Collections.sort(leasesOfClosedShards, startingSequenceNumberComparator); + Map trackedLeaseMap = constructShardIdToKCLLeaseMap(trackedLeases); + + for (KinesisClientLease leaseOfClosedShard : leasesOfClosedShards) { + String closedShardId = leaseOfClosedShard.getLeaseKey(); + Set childShardIds = shardIdToChildShardIdsMap.get(closedShardId); + if ((closedShardId != null) && (childShardIds != null) && (!childShardIds.isEmpty())) { + cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); + } + } + } + } + + /** + * Delete lease for the closed shard. Rules for deletion are: + * a/ the checkpoint for the closed shard is SHARD_END, + * b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON + * Note: This method has package level access solely for testing purposes. 
+ * + * @param closedShardId Identifies the closed shard + * @param childShardIds ShardIds of children of the closed shard + * @param trackedLeases shardId->KinesisClientLease map with all leases we are tracking (should not be null) + * @param leaseManager + * @throws ProvisionedThroughputException + * @throws InvalidStateException + * @throws DependencyException + */ + synchronized void cleanupLeaseForClosedShard(String closedShardId, Set childShardIds, + Map trackedLeases, ILeaseManager leaseManager) + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId); + List childShardLeases = new ArrayList<>(); + + for (String childShardId : childShardIds) { + KinesisClientLease childLease = trackedLeases.get(childShardId); + if (childLease != null) { + childShardLeases.add(childLease); + } + } + + if ((leaseForClosedShard != null) && (leaseForClosedShard.getCheckpoint() + .equals(ExtendedSequenceNumber.SHARD_END)) && (childShardLeases.size() == childShardIds.size())) { + boolean okayToDelete = true; + for (KinesisClientLease lease : childShardLeases) { + if (lease.getCheckpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) { + okayToDelete = false; + break; + } + } + + if (okayToDelete) { + LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey() + + " as it has been completely processed and processing of child shards has begun."); + leaseManager.deleteLease(leaseForClosedShard); + } + } + } + + /** + * Helper method to create a new KinesisClientLease POJO for a shard. + * Note: Package level access only for testing purposes + * + * @param shard + * @return + */ + KinesisClientLease newKCLLease(Shard shard) { + KinesisClientLease newLease = new KinesisClientLease(); + newLease.setLeaseKey(shard.getShardId()); + List parentShardIds = new ArrayList(2); + if (shard.getParentShardId() != null) { + parentShardIds.add(shard.getParentShardId()); + } + if (shard.getAdjacentParentShardId() != null) { + parentShardIds.add(shard.getAdjacentParentShardId()); + } + newLease.setParentShardIds(parentShardIds); + newLease.setOwnerSwitchesSinceCheckpoint(0L); + + return newLease; + } + + /** + * Helper method to construct a shardId->Shard map for the specified list of shards. + * + * @param shards List of shards + * @return ShardId->Shard map + */ + Map constructShardIdToShardMap(List shards) { + Map shardIdToShardMap = new HashMap(); + for (Shard shard : shards) { + shardIdToShardMap.put(shard.getShardId(), shard); + } + return shardIdToShardMap; + } + + /** + * Helper method to return all the open shards for a stream. + * Note: Package level access only for testing purposes. + * + * @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list. + * @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active. 
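+     *         A shard is considered open when its sequence number range has no ending sequence number.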
+ */ + List getOpenShards(List allShards) { + List openShards = new ArrayList(); + for (Shard shard : allShards) { + String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber(); + if (endingSequenceNumber == null) { + openShards.add(shard); + LOG.debug("Found open shard: " + shard.getShardId()); + } + } + return openShards; + } + + private ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) { + ExtendedSequenceNumber checkpoint = null; + + if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) { + checkpoint = ExtendedSequenceNumber.TRIM_HORIZON; + } else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) { + checkpoint = ExtendedSequenceNumber.LATEST; + } else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { + checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP; + } + + return checkpoint; + } + + /** Helper class to compare leases based on starting sequence number of the corresponding shards. + * + */ + private static class StartingSequenceNumberAndShardIdBasedComparator implements Comparator, + Serializable { + + private static final long serialVersionUID = 1L; + + private final Map shardIdToShardMap; + + /** + * @param shardIdToShardMapOfAllKinesisShards + */ + public StartingSequenceNumberAndShardIdBasedComparator(Map shardIdToShardMapOfAllKinesisShards) { + shardIdToShardMap = shardIdToShardMapOfAllKinesisShards; + } + + /** + * Compares two leases based on the starting sequence number of corresponding shards. + * If shards are not found in the shardId->shard map supplied, we do a string comparison on the shardIds. + * We assume that lease1 and lease2 are: + * a/ not null, + * b/ shards (if found) have non-null starting sequence numbers + * + * {@inheritDoc} + */ + @Override + public int compare(KinesisClientLease lease1, KinesisClientLease lease2) { + int result = 0; + String shardId1 = lease1.getLeaseKey(); + String shardId2 = lease2.getLeaseKey(); + Shard shard1 = shardIdToShardMap.get(shardId1); + Shard shard2 = shardIdToShardMap.get(shardId2); + + // If we found shards for the two leases, use comparison of the starting sequence numbers + if ((shard1 != null) && (shard2 != null)) { + BigInteger sequenceNumber1 = new BigInteger( + shard1.getSequenceNumberRange().getStartingSequenceNumber()); + BigInteger sequenceNumber2 = new BigInteger( + shard2.getSequenceNumberRange().getStartingSequenceNumber()); + result = sequenceNumber1.compareTo(sequenceNumber2); + } + + if (result == 0) { + result = shardId1.compareTo(shardId2); + } + + return result; + } + + } + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaderDecider.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaderDecider.java new file mode 100644 index 00000000..af1fe1c1 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaderDecider.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * Used in conjunction with periodic shard sync. + * Implement this interface to allow KCL to decide if the current worker should execute shard sync. + * When periodic shard sync is enabled, PeriodicShardSyncManager periodically checks if the current + * worker is one of the leaders designated to execute shard-sync and then acts accordingly. + */ +public interface LeaderDecider { + + /** + * Method invoked to check the given workerId corresponds to one of the workers + * designated to execute shard-syncs periodically. + * + * @param workerId ID of the worker + * @return True if the worker with ID workerId can execute shard-sync. False otherwise. + */ + Boolean isLeader(String workerId); + + /** + * Can be invoked, if needed, to shutdown any clients/thread-pools + * being used in the LeaderDecider implementation. + */ + void shutdown(); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java new file mode 100644 index 00000000..f2fa165e --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java @@ -0,0 +1,90 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import org.apache.commons.lang3.Validate; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * The top level orchestrator for coordinating the periodic shard sync related + * activities. 
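+ * Once started, it runs on a fixed delay and, on each run, asks the {@link LeaderDecider} whether this worker
+ * is one of the designated leaders; only leaders execute the {@link ShardSyncTask}.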
+ */ +@Getter +@EqualsAndHashCode +class PeriodicShardSyncManager { + private static final Log LOG = LogFactory.getLog(PeriodicShardSyncManager.class); + private static final long INITIAL_DELAY = 0; + private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 1000; + + private final String workerId; + private final LeaderDecider leaderDecider; + private final ShardSyncTask shardSyncTask; + private final ScheduledExecutorService shardSyncThreadPool; + private boolean isRunning; + + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask) { + this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor()); + } + + PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool) { + Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); + Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); + Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager."); + this.workerId = workerId; + this.leaderDecider = leaderDecider; + this.shardSyncTask = shardSyncTask; + this.shardSyncThreadPool = shardSyncThreadPool; + } + + public synchronized TaskResult start() { + if (!isRunning) { + shardSyncThreadPool + .scheduleWithFixedDelay(this::runShardSync, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS, + TimeUnit.MILLISECONDS); + isRunning = true; + } + return new TaskResult(null); + } + + public void stop() { + if (isRunning) { + LOG.info(String.format("Shutting down leader decider on worker %s", workerId)); + leaderDecider.shutdown(); + LOG.info(String.format("Shutting down periodic shard sync task scheduler on worker %s", workerId)); + shardSyncThreadPool.shutdown(); + isRunning = false; + } + } + + private void runShardSync() { + try { + if (leaderDecider.isLeader(workerId)) { + LOG.debug(String.format("WorkerId %s is a leader, running the shard sync task", workerId)); + shardSyncTask.call(); + } else { + LOG.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId)); + } + } catch (Throwable t) { + LOG.error("Error during runShardSync.", t); + } + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java new file mode 100644 index 00000000..c39803ae --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java @@ -0,0 +1,43 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * An implementation of ShardSyncStrategy. 
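+ * It delegates to a {@link PeriodicShardSyncManager}: syncShards() and onWorkerInitialization() start the
+ * periodic sync scheduler, onFoundCompletedShard() and onShardConsumerShutDown() are no-ops, and
+ * onWorkerShutDown() stops the manager.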
+ */ +class PeriodicShardSyncStrategy implements ShardSyncStrategy { + + private PeriodicShardSyncManager periodicShardSyncManager; + + PeriodicShardSyncStrategy(PeriodicShardSyncManager periodicShardSyncManager) { + this.periodicShardSyncManager = periodicShardSyncManager; + } + + @Override + public ShardSyncStrategyType getStrategyType() { + return ShardSyncStrategyType.PERIODIC; + } + + @Override + public TaskResult syncShards() { + return periodicShardSyncManager.start(); + } + + @Override + public TaskResult onWorkerInitialization() { + return syncShards(); + } + + @Override + public TaskResult onFoundCompletedShard() { + return new TaskResult(null); + } + + @Override + public TaskResult onShardConsumerShutDown() { + return new TaskResult(null); + } + + @Override + public void onWorkerShutDown() { + periodicShardSyncManager.stop(); + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 3469a01d..0b81f23d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -64,6 +64,7 @@ class ShardConsumer { private ITask currentTask; private long currentTaskSubmitTime; private Future future; + private ShardSyncStrategy shardSyncStrategy; @Getter private final GetRecordsCache getRecordsCache; @@ -116,8 +117,7 @@ class ShardConsumer { IMetricsFactory metricsFactory, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - KinesisClientLibConfiguration config, - ShardSyncer shardSyncer) { + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { this(shardInfo, streamConfig, checkpoint, @@ -131,8 +131,7 @@ class ShardConsumer { skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty(), - config, - shardSyncer); + config, shardSyncer, shardSyncStrategy); } /** @@ -164,9 +163,7 @@ class ShardConsumer { boolean skipShardSyncAtWorkerInitializationIfLeasesExist, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config, - ShardSyncer shardSyncer) { - + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { this( shardInfo, streamConfig, @@ -190,8 +187,7 @@ class ShardConsumer { new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), retryGetRecordsInSeconds, maxGetRecordsThreadPool, - config, - shardSyncer + config, shardSyncer, shardSyncStrategy ); } @@ -229,8 +225,7 @@ class ShardConsumer { KinesisDataFetcher kinesisDataFetcher, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config, - ShardSyncer shardSyncer) { + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { this.shardInfo = shardInfo; this.streamConfig = streamConfig; this.checkpoint = checkpoint; @@ -249,6 +244,7 @@ class ShardConsumer { makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords()); this.shardSyncer = shardSyncer; + this.shardSyncStrategy = shardSyncStrategy; } /** @@ -512,4 +508,8 @@ class ShardConsumer { ShutdownNotification getShutdownNotification() { return shutdownNotification; } + + ShardSyncStrategy 
getShardSyncStrategy() { + return shardSyncStrategy; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java new file mode 100644 index 00000000..49adbf8b --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java @@ -0,0 +1,62 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Implementation of ShardSyncStrategy that facilitates shard syncs when a shard completes processing. + */ +class ShardEndShardSyncStrategy implements ShardSyncStrategy { + + private static final Log LOG = LogFactory.getLog(Worker.class); + private ShardSyncTaskManager shardSyncTaskManager; + + ShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) { + this.shardSyncTaskManager = shardSyncTaskManager; + } + + @Override + public ShardSyncStrategyType getStrategyType() { + return ShardSyncStrategyType.SHARD_END; + } + + @Override + public TaskResult syncShards() { + Future taskResultFuture = null; + TaskResult result = null; + while (taskResultFuture == null) { + taskResultFuture = shardSyncTaskManager.syncShardAndLeaseInfo(null); + } + try { + result = taskResultFuture.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("ShardEndShardSyncStrategy syncShards encountered exception.", e); + } + return result; + } + + @Override + public TaskResult onWorkerInitialization() { + LOG.debug(String.format("onWorkerInitialization is NoOp for ShardSyncStrategyType %s", getStrategyType().toString())); + return new TaskResult(null); + } + + @Override + public TaskResult onFoundCompletedShard() { + shardSyncTaskManager.syncShardAndLeaseInfo(null); + return new TaskResult(null); + } + + @Override + public TaskResult onShardConsumerShutDown() { + return shardSyncTaskManager.runShardSyncer(); + } + + @Override + public void onWorkerShutDown() { + LOG.debug(String.format("Stop is NoOp for ShardSyncStrategyType %s", getStrategyType().toString())); + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategy.java new file mode 100644 index 00000000..6738d2e9 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategy.java @@ -0,0 +1,48 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * Facade of methods that can be invoked at different points + * in KCL application execution to perform certain actions related to shard-sync. + */ +public interface ShardSyncStrategy { + + /** + * Can be used to provide a custom name for the implemented strategy. + * + * @return Name of the strategy. + */ + ShardSyncStrategyType getStrategyType(); + + /** + * Invoked when the KCL application wants to execute shard-sync. + * + * @return TaskResult + */ + TaskResult syncShards(); + + /** + * Invoked at worker initialization + * + * @return + */ + TaskResult onWorkerInitialization(); + + /** + * Invoked when a completed shard is found. + * + * @return + */ + TaskResult onFoundCompletedShard(); + + /** + * Invoked when ShardConsumer is shutdown. 
+ * + * @return + */ + TaskResult onShardConsumerShutDown(); + + /** + * Invoked when worker is shutdown. + */ + void onWorkerShutDown(); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategyType.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategyType.java new file mode 100644 index 00000000..855d812e --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncStrategyType.java @@ -0,0 +1,12 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * Types of ShardSyncStrategy} implemented in KCL. + */ +public enum ShardSyncStrategyType { + + /* Shard sync are performed periodically */ + PERIODIC, + /* Shard syncs are performed when processing of a shard completes */ + SHARD_END +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index b8c9663b..fb994f50 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -19,6 +19,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import lombok.Getter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,6 +33,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; * Kinesis shards, remove obsolete leases). We'll have at most one outstanding sync task at any time. * Worker will use this class to kick off a sync task when it finds shards which have been completely processed. */ +@Getter class ShardSyncTaskManager { private static final Log LOG = LogFactory.getLog(ShardSyncTaskManager.class); @@ -83,12 +85,12 @@ class ShardSyncTaskManager { this.shardSyncer = shardSyncer; } - synchronized boolean syncShardAndLeaseInfo(Set closedShardIds) { + synchronized Future syncShardAndLeaseInfo(Set closedShardIds) { return checkAndSubmitNextTask(closedShardIds); } - private synchronized boolean checkAndSubmitNextTask(Set closedShardIds) { - boolean submittedNewTask = false; + private synchronized Future checkAndSubmitNextTask(Set closedShardIds) { + Future submittedTaskFuture = null; if ((future == null) || future.isCancelled() || future.isDone()) { if ((future != null) && future.isDone()) { try { @@ -111,17 +113,32 @@ class ShardSyncTaskManager { shardSyncIdleTimeMillis, shardSyncer), metricsFactory); future = executorService.submit(currentTask); - submittedNewTask = true; if (LOG.isDebugEnabled()) { LOG.debug("Submitted new " + currentTask.getTaskType() + " task."); } + submittedTaskFuture = future; } else { if (LOG.isDebugEnabled()) { LOG.debug("Previous " + currentTask.getTaskType() + " task still pending. 
Not submitting new task."); } } - - return submittedNewTask; + return submittedTaskFuture; } + synchronized TaskResult runShardSyncer() { + Exception exception = null; + + try { + shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, + leaseManager, + initialPositionInStream, + cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards); + } catch (Exception e) { + LOG.error("Caught exception while sync'ing Kinesis shards and leases", e); + exception = e; + } + + return new TaskResult(exception); + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java index 665bd5d2..2821dd2d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java @@ -14,830 +14,19 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.io.Serializable; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.lang3.StringUtils; - import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; -import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; -import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; -import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; -import com.amazonaws.services.kinesis.model.Shard; -/** - * Helper class to sync leases with shards of the Kinesis stream. - * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). - * It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it - * and begun processing it's child shards. 
- */ -class ShardSyncer { +public interface ShardSyncer { - private static final Log LOG = LogFactory.getLog(ShardSyncer.class); - private final LeaseCleanupValidator leaseCleanupValidator; - - public ShardSyncer(final LeaseCleanupValidator leaseCleanupValidator) { - this.leaseCleanupValidator = leaseCleanupValidator; - } - - synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy, - ILeaseManager leaseManager, - InitialPositionInStreamExtended initialPositionInStream, - boolean cleanupLeasesOfCompletedShards, + void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, + InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards) - throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards); - } - - /** - * Check and create leases for any new shards (e.g. following a reshard operation). - * - * @param kinesisProxy - * @param leaseManager - * @param initialPositionInStream - * @param cleanupLeasesOfCompletedShards - * @param ignoreUnexpectedChildShards - * @throws DependencyException - * @throws InvalidStateException - * @throws ProvisionedThroughputException - * @throws KinesisClientLibIOException - */ - synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, - ILeaseManager leaseManager, - InitialPositionInStreamExtended initialPositionInStream, - boolean cleanupLeasesOfCompletedShards, - boolean ignoreUnexpectedChildShards) - throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); - } - - /** - * Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard). 
- * - * @param kinesisProxy - * @param leaseManager - * @param initialPosition - * @param cleanupLeasesOfCompletedShards - * @param ignoreUnexpectedChildShards - * @throws DependencyException - * @throws InvalidStateException - * @throws ProvisionedThroughputException - * @throws KinesisClientLibIOException - */ - // CHECKSTYLE:OFF CyclomaticComplexity - private synchronized void syncShardLeases(IKinesisProxy kinesisProxy, - ILeaseManager leaseManager, - InitialPositionInStreamExtended initialPosition, - boolean cleanupLeasesOfCompletedShards, - boolean ignoreUnexpectedChildShards) - throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - List shards = getShardList(kinesisProxy); - LOG.debug("Num shards: " + shards.size()); - - Map shardIdToShardMap = constructShardIdToShardMap(shards); - Map> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap); - Set inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap); - if (!ignoreUnexpectedChildShards) { - assertAllParentShardsAreClosed(inconsistentShardIds); - } - - List currentLeases = leaseManager.listLeases(); - - List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, - inconsistentShardIds); - LOG.debug("Num new leases to create: " + newLeasesToCreate.size()); - for (KinesisClientLease lease : newLeasesToCreate) { - long startTimeMillis = System.currentTimeMillis(); - boolean success = false; - try { - leaseManager.createLeaseIfNotExists(lease); - success = true; - } finally { - MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success, MetricsLevel.DETAILED); - } - } - - List trackedLeases = new ArrayList<>(); - if (currentLeases != null) { - trackedLeases.addAll(currentLeases); - } - trackedLeases.addAll(newLeasesToCreate); - cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager); - if (cleanupLeasesOfCompletedShards) { - cleanupLeasesOfFinishedShards(currentLeases, - shardIdToShardMap, - shardIdToChildShardIdsMap, - trackedLeases, - leaseManager); - } - } - // CHECKSTYLE:ON CyclomaticComplexity - - /** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls - * and a reshard operation. - * @param inconsistentShardIds - * @throws KinesisClientLibIOException - */ - private void assertAllParentShardsAreClosed(Set inconsistentShardIds) - throws KinesisClientLibIOException { - if (!inconsistentShardIds.isEmpty()) { - String ids = StringUtils.join(inconsistentShardIds, ' '); - throw new KinesisClientLibIOException(String.format("%d open child shards (%s) are inconsistent. " - + "This can happen due to a race condition between describeStream and a reshard operation.", - inconsistentShardIds.size(), ids)); - } - } - - /** - * Helper method to construct the list of inconsistent shards, which are open shards with non-closed ancestor - * parent(s). - * @param shardIdToChildShardIdsMap - * @param shardIdToShardMap - * @return Set of inconsistent open shard ids for shards having open parents. 
- */ - private Set findInconsistentShardIds(Map> shardIdToChildShardIdsMap, - Map shardIdToShardMap) { - Set result = new HashSet(); - for (String parentShardId : shardIdToChildShardIdsMap.keySet()) { - Shard parentShard = shardIdToShardMap.get(parentShardId); - if ((parentShardId == null) || (parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null)) { - Set childShardIdsMap = shardIdToChildShardIdsMap.get(parentShardId); - result.addAll(childShardIdsMap); - } - } - return result; - } - - /** - * Helper method to create a shardId->KinesisClientLease map. - * Note: This has package level access for testing purposes only. - * @param trackedLeaseList - * @return - */ - Map constructShardIdToKCLLeaseMap(List trackedLeaseList) { - Map trackedLeasesMap = new HashMap<>(); - for (KinesisClientLease lease : trackedLeaseList) { - trackedLeasesMap.put(lease.getLeaseKey(), lease); - } - return trackedLeasesMap; - } - - /** - * Note: this has package level access for testing purposes. - * Useful for asserting that we don't have an incomplete shard list following a reshard operation. - * We verify that if the shard is present in the shard list, it is closed and its hash key range - * is covered by its child shards. - */ - synchronized void assertClosedShardsAreCoveredOrAbsent(Map shardIdToShardMap, - Map> shardIdToChildShardIdsMap, - Set shardIdsOfClosedShards) throws KinesisClientLibIOException { - String exceptionMessageSuffix = "This can happen if we constructed the list of shards " - + " while a reshard operation was in progress."; - - for (String shardId : shardIdsOfClosedShards) { - Shard shard = shardIdToShardMap.get(shardId); - if (shard == null) { - LOG.info("Shard " + shardId + " is not present in Kinesis anymore."); - continue; - } - - String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber(); - if (endingSequenceNumber == null) { - throw new KinesisClientLibIOException("Shard " + shardIdsOfClosedShards - + " is not closed. " + exceptionMessageSuffix); - } - - Set childShardIds = shardIdToChildShardIdsMap.get(shardId); - if (childShardIds == null) { - throw new KinesisClientLibIOException("Incomplete shard list: Closed shard " + shardId - + " has no children." 
+ exceptionMessageSuffix); - } - - assertHashRangeOfClosedShardIsCovered(shard, shardIdToShardMap, childShardIds); - } - } - - private synchronized void assertHashRangeOfClosedShardIsCovered(Shard closedShard, - Map shardIdToShardMap, - Set childShardIds) throws KinesisClientLibIOException { - - BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getStartingHashKey()); - BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getEndingHashKey()); - BigInteger minStartingHashKeyOfChildren = null; - BigInteger maxEndingHashKeyOfChildren = null; - - for (String childShardId : childShardIds) { - Shard childShard = shardIdToShardMap.get(childShardId); - BigInteger startingHashKey = new BigInteger(childShard.getHashKeyRange().getStartingHashKey()); - if ((minStartingHashKeyOfChildren == null) - || (startingHashKey.compareTo(minStartingHashKeyOfChildren) < 0)) { - minStartingHashKeyOfChildren = startingHashKey; - } - BigInteger endingHashKey = new BigInteger(childShard.getHashKeyRange().getEndingHashKey()); - if ((maxEndingHashKeyOfChildren == null) - || (endingHashKey.compareTo(maxEndingHashKeyOfChildren) > 0)) { - maxEndingHashKeyOfChildren = endingHashKey; - } - } - - if ((minStartingHashKeyOfChildren == null) || (maxEndingHashKeyOfChildren == null) - || (minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0) - || (maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0)) { - throw new KinesisClientLibIOException("Incomplete shard list: hash key range of shard " - + closedShard.getShardId() + " is not covered by its child shards."); - } - - } - - /** - * Helper method to construct shardId->setOfChildShardIds map. - * Note: This has package access for testing purposes only. - * @param shardIdToShardMap - * @return - */ - Map> constructShardIdToChildShardIdsMap( - Map shardIdToShardMap) { - Map> shardIdToChildShardIdsMap = new HashMap<>(); - for (Map.Entry entry : shardIdToShardMap.entrySet()) { - String shardId = entry.getKey(); - Shard shard = entry.getValue(); - String parentShardId = shard.getParentShardId(); - if ((parentShardId != null) && (shardIdToShardMap.containsKey(parentShardId))) { - Set childShardIds = shardIdToChildShardIdsMap.get(parentShardId); - if (childShardIds == null) { - childShardIds = new HashSet(); - shardIdToChildShardIdsMap.put(parentShardId, childShardIds); - } - childShardIds.add(shardId); - } - - String adjacentParentShardId = shard.getAdjacentParentShardId(); - if ((adjacentParentShardId != null) && (shardIdToShardMap.containsKey(adjacentParentShardId))) { - Set childShardIds = shardIdToChildShardIdsMap.get(adjacentParentShardId); - if (childShardIds == null) { - childShardIds = new HashSet(); - shardIdToChildShardIdsMap.put(adjacentParentShardId, childShardIds); - } - childShardIds.add(shardId); - } - } - return shardIdToChildShardIdsMap; - } - - private List getShardList(IKinesisProxy kinesisProxy) throws KinesisClientLibIOException { - List shards = kinesisProxy.getShardList(); - if (shards == null) { - throw new KinesisClientLibIOException( - "Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list."); - } - return shards; - } - - /** - * Determine new leases to create and their initial checkpoint. - * Note: Package level access only for testing purposes. 
- * - * For each open (no ending sequence number) shard without open parents that doesn't already have a lease, - * determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists): - * If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed. - * If not, set checkpoint of the shard to the initial position specified by the client. - * To check if we need to create leases for ancestors, we use the following rules: - * * If we began (or will begin) processing data for a shard, then we must reach end of that shard before - * we begin processing data from any of its descendants. - * * A shard does not start processing data until data from all its parents has been processed. - * Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create - * leases corresponding to both the parents - the parent shard which is not a descendant will have - * its checkpoint set to Latest. - * - * We assume that if there is an existing lease for a shard, then either: - * * we have previously created a lease for its parent (if it was needed), or - * * the parent shard has expired. - * - * For example: - * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5 - shards till epoch 102 - * \ / \ / | | - * 6 7 4 5 - shards from epoch 103 - 205 - * \ / | / \ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * Current leases: (3, 4, 5) - * New leases to create: (2, 6, 7, 8, 9, 10) - * - * The leases returned are sorted by the starting sequence number - following the same order - * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail - * before creating all the leases. - * - * If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it - * here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very - * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only - * currently happen here if ignoreUnexpectedChildShards was true in syncShardleases. - * - * - * @param shards List of all shards in Kinesis (we'll create new leases based on this set) - * @param currentLeases List of current leases - * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that - * location in the shard (when an application starts up for the first time - and there are no checkpoints). - * @param inconsistentShardIds Set of child shard ids having open parents. - * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard - */ - List determineNewLeasesToCreate(List shards, - List currentLeases, - InitialPositionInStreamExtended initialPosition, - Set inconsistentShardIds) { - Map shardIdToNewLeaseMap = new HashMap(); - Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); - - Set shardIdsOfCurrentLeases = new HashSet(); - for (KinesisClientLease lease : currentLeases) { - shardIdsOfCurrentLeases.add(lease.getLeaseKey()); - LOG.debug("Existing lease: " + lease); - } - - List openShards = getOpenShards(shards); - Map memoizationContext = new HashMap<>(); - - // Iterate over the open shards and find those that don't have any lease entries. 
- for (Shard shard : openShards) { - String shardId = shard.getShardId(); - LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors."); - if (shardIdsOfCurrentLeases.contains(shardId)) { - LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease"); - } else if (inconsistentShardIds.contains(shardId)) { - LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease"); - } else { - LOG.debug("Need to create a lease for shardId " + shardId); - KinesisClientLease newLease = newKCLLease(shard); - boolean isDescendant = - checkIfDescendantAndAddNewLeasesForAncestors(shardId, - initialPosition, - shardIdsOfCurrentLeases, - shardIdToShardMapOfAllKinesisShards, - shardIdToNewLeaseMap, - memoizationContext); - - /** - * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the - * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a - * lease just like we do for TRIM_HORIZON. However we will only return back records with server-side - * timestamp at or after the specified initial position timestamp. - * - * Shard structure (each level depicts a stream segment): - * 0 1 2 3 4 5 - shards till epoch 102 - * \ / \ / | | - * 6 7 4 5 - shards from epoch 103 - 205 - * \ / | /\ - * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) - * - * Current leases: empty set - * - * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with - * timestamp value 206. We will then create new leases for all the shards (with checkpoint set to - * AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin - * processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases - * would then be deleted since they won't have records with server-side timestamp at/after 206. And - * after that we will begin processing the descendant shards with epoch at/after 206 and we will - * return the records that meet the timestamp requirement for these shards. - */ - if (isDescendant && !initialPosition.getInitialPositionInStream() - .equals(InitialPositionInStream.AT_TIMESTAMP)) { - newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); - } else { - newLease.setCheckpoint(convertToCheckpoint(initialPosition)); - } - LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint()); - shardIdToNewLeaseMap.put(shardId, newLease); - } - } - - List newLeasesToCreate = new ArrayList(); - newLeasesToCreate.addAll(shardIdToNewLeaseMap.values()); - Comparator startingSequenceNumberComparator = - new StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMapOfAllKinesisShards); - Collections.sort(newLeasesToCreate, startingSequenceNumberComparator); - return newLeasesToCreate; - } - - /** - * Determine new leases to create and their initial checkpoint. - * Note: Package level access only for testing purposes. - */ - List determineNewLeasesToCreate(List shards, - List currentLeases, - InitialPositionInStreamExtended initialPosition) { - Set inconsistentShardIds = new HashSet(); - return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds); - } - - /** - * Note: Package level access for testing purposes only. - * Check if this shard is a descendant of a shard that is (or will be) processed. - * Create leases for the ancestors of this shard as required. - * See javadoc of determineNewLeasesToCreate() for rules and example. 
- * - * @param shardId The shardId to check. - * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that - * location in the shard (when an application starts up for the first time - and there are no checkpoints). - * @param shardIdsOfCurrentLeases The shardIds for the current leases. - * @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream. - * @param shardIdToLeaseMapOfNewShards Add lease POJOs corresponding to ancestors to this map. - * @param memoizationContext Memoization of shards that have been evaluated as part of the evaluation - * @return true if the shard is a descendant of any current shard (lease already exists) - */ - // CHECKSTYLE:OFF CyclomaticComplexity - boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId, - InitialPositionInStreamExtended initialPosition, - Set shardIdsOfCurrentLeases, - Map shardIdToShardMapOfAllKinesisShards, - Map shardIdToLeaseMapOfNewShards, - Map memoizationContext) { - - Boolean previousValue = memoizationContext.get(shardId); - if (previousValue != null) { - return previousValue; - } - - boolean isDescendant = false; - Shard shard; - Set parentShardIds; - Set descendantParentShardIds = new HashSet(); - - if ((shardId != null) && (shardIdToShardMapOfAllKinesisShards.containsKey(shardId))) { - if (shardIdsOfCurrentLeases.contains(shardId)) { - // This shard is a descendant of a current shard. - isDescendant = true; - // We don't need to add leases of its ancestors, - // because we'd have done it when creating a lease for this shard. - } else { - shard = shardIdToShardMapOfAllKinesisShards.get(shardId); - parentShardIds = getParentShardIds(shard, shardIdToShardMapOfAllKinesisShards); - for (String parentShardId : parentShardIds) { - // Check if the parent is a descendant, and include its ancestors. - if (checkIfDescendantAndAddNewLeasesForAncestors(parentShardId, - initialPosition, - shardIdsOfCurrentLeases, - shardIdToShardMapOfAllKinesisShards, - shardIdToLeaseMapOfNewShards, - memoizationContext)) { - isDescendant = true; - descendantParentShardIds.add(parentShardId); - LOG.debug("Parent shard " + parentShardId + " is a descendant."); - } else { - LOG.debug("Parent shard " + parentShardId + " is NOT a descendant."); - } - } - - // If this is a descendant, create leases for its parent shards (if they don't exist) - if (isDescendant) { - for (String parentShardId : parentShardIds) { - if (!shardIdsOfCurrentLeases.contains(parentShardId)) { - LOG.debug("Need to create a lease for shardId " + parentShardId); - KinesisClientLease lease = shardIdToLeaseMapOfNewShards.get(parentShardId); - if (lease == null) { - lease = newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId)); - shardIdToLeaseMapOfNewShards.put(parentShardId, lease); - } - - if (descendantParentShardIds.contains(parentShardId) - && !initialPosition.getInitialPositionInStream() - .equals(InitialPositionInStream.AT_TIMESTAMP)) { - lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); - } else { - lease.setCheckpoint(convertToCheckpoint(initialPosition)); - } - } - } - } else { - // This shard should be included, if the customer wants to process all records in the stream or - // if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a lease just like we do - // for TRIM_HORIZON. However we will only return back records with server-side timestamp at or - // after the specified initial position timestamp. 
- if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON) - || initialPosition.getInitialPositionInStream() - .equals(InitialPositionInStream.AT_TIMESTAMP)) { - isDescendant = true; - } - } - - } - } - - memoizationContext.put(shardId, isDescendant); - return isDescendant; - } - // CHECKSTYLE:ON CyclomaticComplexity - - /** - * Helper method to get parent shardIds of the current shard - includes the parent shardIds if: - * a/ they are not null - * b/ if they exist in the current shard map (i.e. haven't expired) - * - * @param shard Will return parents of this shard - * @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream. - * @return Set of parentShardIds - */ - Set getParentShardIds(Shard shard, Map shardIdToShardMapOfAllKinesisShards) { - Set parentShardIds = new HashSet(2); - String parentShardId = shard.getParentShardId(); - if ((parentShardId != null) && shardIdToShardMapOfAllKinesisShards.containsKey(parentShardId)) { - parentShardIds.add(parentShardId); - } - String adjacentParentShardId = shard.getAdjacentParentShardId(); - if ((adjacentParentShardId != null) && shardIdToShardMapOfAllKinesisShards.containsKey(adjacentParentShardId)) { - parentShardIds.add(adjacentParentShardId); - } - return parentShardIds; - } - - /** - * Delete leases corresponding to shards that no longer exist in the stream. - * Current scheme: Delete a lease if: - * * the corresponding shard is not present in the list of Kinesis shards, AND - * * the parentShardIds listed in the lease are also not present in the list of Kinesis shards. - * @param shards List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state). - * @param trackedLeases List of - * @param kinesisProxy Kinesis proxy (used to get shard list) - * @param leaseManager - * @throws KinesisClientLibIOException Thrown if we couldn't get a fresh shard list from Kinesis. - * @throws ProvisionedThroughputException - * @throws InvalidStateException - * @throws DependencyException - */ - private void cleanupGarbageLeases(List shards, - List trackedLeases, - IKinesisProxy kinesisProxy, - ILeaseManager leaseManager) - throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException { - Set kinesisShards = new HashSet<>(); - for (Shard shard : shards) { - kinesisShards.add(shard.getShardId()); - } - - // Check if there are leases for non-existent shards - List garbageLeases = new ArrayList<>(); - for (KinesisClientLease lease : trackedLeases) { - if (leaseCleanupValidator.isCandidateForCleanup(lease, kinesisShards)) { - garbageLeases.add(lease); - } - } - - if (!garbageLeases.isEmpty()) { - LOG.info("Found " + garbageLeases.size() - + " candidate leases for cleanup. Refreshing list of" - + " Kinesis shards to pick up recent/latest shards"); - List currentShardList = getShardList(kinesisProxy); - Set currentKinesisShardIds = new HashSet<>(); - for (Shard shard : currentShardList) { - currentKinesisShardIds.add(shard.getShardId()); - } - - for (KinesisClientLease lease : garbageLeases) { - if (leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds)) { - LOG.info("Deleting lease for shard " + lease.getLeaseKey() - + " as it is not present in Kinesis stream."); - leaseManager.deleteLease(lease); - } - } - } - - } - - /** - * Private helper method. 
- * Clean up leases for shards that meet the following criteria: - * a/ the shard has been fully processed (checkpoint is set to SHARD_END) - * b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not - * TRIM_HORIZON. - * - * @param currentLeases List of leases we evaluate for clean up - * @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards) - * @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards) - * @param trackedLeases List of all leases we are tracking. - * @param leaseManager Lease manager (will be used to delete leases) - * @throws DependencyException - * @throws InvalidStateException - * @throws ProvisionedThroughputException - * @throws KinesisClientLibIOException - */ - private synchronized void cleanupLeasesOfFinishedShards(Collection currentLeases, - Map shardIdToShardMap, - Map> shardIdToChildShardIdsMap, - List trackedLeases, - ILeaseManager leaseManager) - throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - Set shardIdsOfClosedShards = new HashSet<>(); - List leasesOfClosedShards = new ArrayList<>(); - for (KinesisClientLease lease : currentLeases) { - if (lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) { - shardIdsOfClosedShards.add(lease.getLeaseKey()); - leasesOfClosedShards.add(lease); - } - } - - if (!leasesOfClosedShards.isEmpty()) { - assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, - shardIdToChildShardIdsMap, - shardIdsOfClosedShards); - Comparator startingSequenceNumberComparator = - new StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMap); - Collections.sort(leasesOfClosedShards, startingSequenceNumberComparator); - Map trackedLeaseMap = constructShardIdToKCLLeaseMap(trackedLeases); - - for (KinesisClientLease leaseOfClosedShard : leasesOfClosedShards) { - String closedShardId = leaseOfClosedShard.getLeaseKey(); - Set childShardIds = shardIdToChildShardIdsMap.get(closedShardId); - if ((closedShardId != null) && (childShardIds != null) && (!childShardIds.isEmpty())) { - cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - } - } - } - } - - /** - * Delete lease for the closed shard. Rules for deletion are: - * a/ the checkpoint for the closed shard is SHARD_END, - * b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON - * Note: This method has package level access solely for testing purposes. 
- * - * @param closedShardId Identifies the closed shard - * @param childShardIds ShardIds of children of the closed shard - * @param trackedLeases shardId->KinesisClientLease map with all leases we are tracking (should not be null) - * @param leaseManager - * @throws ProvisionedThroughputException - * @throws InvalidStateException - * @throws DependencyException - */ - synchronized void cleanupLeaseForClosedShard(String closedShardId, - Set childShardIds, - Map trackedLeases, - ILeaseManager leaseManager) - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId); - List childShardLeases = new ArrayList<>(); - - for (String childShardId : childShardIds) { - KinesisClientLease childLease = trackedLeases.get(childShardId); - if (childLease != null) { - childShardLeases.add(childLease); - } - } - - if ((leaseForClosedShard != null) - && (leaseForClosedShard.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) - && (childShardLeases.size() == childShardIds.size())) { - boolean okayToDelete = true; - for (KinesisClientLease lease : childShardLeases) { - if (lease.getCheckpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) { - okayToDelete = false; - break; - } - } - - if (okayToDelete) { - LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey() - + " as it has been completely processed and processing of child shards has begun."); - leaseManager.deleteLease(leaseForClosedShard); - } - } - } - - /** - * Helper method to create a new KinesisClientLease POJO for a shard. - * Note: Package level access only for testing purposes - * - * @param shard - * @return - */ - KinesisClientLease newKCLLease(Shard shard) { - KinesisClientLease newLease = new KinesisClientLease(); - newLease.setLeaseKey(shard.getShardId()); - List parentShardIds = new ArrayList(2); - if (shard.getParentShardId() != null) { - parentShardIds.add(shard.getParentShardId()); - } - if (shard.getAdjacentParentShardId() != null) { - parentShardIds.add(shard.getAdjacentParentShardId()); - } - newLease.setParentShardIds(parentShardIds); - newLease.setOwnerSwitchesSinceCheckpoint(0L); - - return newLease; - } - - /** - * Helper method to construct a shardId->Shard map for the specified list of shards. - * - * @param shards List of shards - * @return ShardId->Shard map - */ - Map constructShardIdToShardMap(List shards) { - Map shardIdToShardMap = new HashMap(); - for (Shard shard : shards) { - shardIdToShardMap.put(shard.getShardId(), shard); - } - return shardIdToShardMap; - } - - /** - * Helper method to return all the open shards for a stream. - * Note: Package level access only for testing purposes. - * - * @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list. - * @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active. 
- */ - List getOpenShards(List allShards) { - List openShards = new ArrayList(); - for (Shard shard : allShards) { - String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber(); - if (endingSequenceNumber == null) { - openShards.add(shard); - LOG.debug("Found open shard: " + shard.getShardId()); - } - } - return openShards; - } - - private ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) { - ExtendedSequenceNumber checkpoint = null; - - if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) { - checkpoint = ExtendedSequenceNumber.TRIM_HORIZON; - } else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) { - checkpoint = ExtendedSequenceNumber.LATEST; - } else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { - checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP; - } - - return checkpoint; - } - - /** Helper class to compare leases based on starting sequence number of the corresponding shards. - * - */ - private static class StartingSequenceNumberAndShardIdBasedComparator implements Comparator, - Serializable { - - private static final long serialVersionUID = 1L; - - private final Map shardIdToShardMap; - - /** - * @param shardIdToShardMapOfAllKinesisShards - */ - public StartingSequenceNumberAndShardIdBasedComparator(Map shardIdToShardMapOfAllKinesisShards) { - shardIdToShardMap = shardIdToShardMapOfAllKinesisShards; - } - - /** - * Compares two leases based on the starting sequence number of corresponding shards. - * If shards are not found in the shardId->shard map supplied, we do a string comparison on the shardIds. - * We assume that lease1 and lease2 are: - * a/ not null, - * b/ shards (if found) have non-null starting sequence numbers - * - * {@inheritDoc} - */ - @Override - public int compare(KinesisClientLease lease1, KinesisClientLease lease2) { - int result = 0; - String shardId1 = lease1.getLeaseKey(); - String shardId2 = lease2.getLeaseKey(); - Shard shard1 = shardIdToShardMap.get(shardId1); - Shard shard2 = shardIdToShardMap.get(shardId2); - - // If we found shards for the two leases, use comparison of the starting sequence numbers - if ((shard1 != null) && (shard2 != null)) { - BigInteger sequenceNumber1 = - new BigInteger(shard1.getSequenceNumberRange().getStartingSequenceNumber()); - BigInteger sequenceNumber2 = - new BigInteger(shard2.getSequenceNumberRange().getStartingSequenceNumber()); - result = sequenceNumber1.compareTo(sequenceNumber2); - } - - if (result == 0) { - result = shardId1.compareTo(shardId2); - } - - return result; - } - - } - + throws DependencyException, InvalidStateException, ProvisionedThroughputException, + KinesisClientLibIOException; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index 135cfa74..f1e15ae7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -49,6 +49,7 @@ class ShutdownTask implements ITask { private final long backoffTimeMillis; private final GetRecordsCache getRecordsCache; private final ShardSyncer shardSyncer; + private final ShardSyncStrategy shardSyncStrategy; /** * Constructor. 
@@ -64,8 +65,7 @@ class ShutdownTask implements ITask { boolean ignoreUnexpectedChildShards, ILeaseManager leaseManager, long backoffTimeMillis, - GetRecordsCache getRecordsCache, - ShardSyncer shardSyncer) { + GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -78,6 +78,7 @@ class ShutdownTask implements ITask { this.backoffTimeMillis = backoffTimeMillis; this.getRecordsCache = getRecordsCache; this.shardSyncer = shardSyncer; + this.shardSyncStrategy = shardSyncStrategy; } /* @@ -130,11 +131,12 @@ class ShutdownTask implements ITask { if (reason == ShutdownReason.TERMINATE) { LOG.debug("Looking for child shards of shard " + shardInfo.getShardId()); // create leases for the child shards - shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, - leaseManager, - initialPositionInStream, - cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards); + TaskResult result = shardSyncStrategy.onShardConsumerShutDown(); + if (result.getException() != null) { + LOG.debug("Exception while trying to sync shards on the shutdown of shard: " + shardInfo + .getShardId()); + throw result.getException(); + } LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId()); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index e7edb3c4..64179b3c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -34,6 +35,11 @@ import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector; +import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator; +import com.amazonaws.services.kinesis.leases.impl.LeaseRenewer; +import com.amazonaws.services.kinesis.leases.impl.LeaseTaker; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker; import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; @@ -82,7 +88,9 @@ public class Worker implements Runnable { private static final Log LOG = LogFactory.getLog(Worker.class); + private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0; private static final int MAX_INITIALIZATION_ATTEMPTS = 20; + private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL. 
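The ShutdownTask change above routes the TERMINATE path through the strategy instead of calling the ShardSyncer directly; under the default SHARD_END strategy the behaviour is intended to stay equivalent to the old code. A sketch of the resulting call chain, using only names from this change:

// ShutdownTask, on ShutdownReason.TERMINATE, with ShardSyncStrategyType.SHARD_END:
TaskResult result = shardSyncStrategy.onShardConsumerShutDown();
// ShardEndShardSyncStrategy.onShardConsumerShutDown()
//   -> shardSyncTaskManager.runShardSyncer()
//   -> shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream,
//          cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards)
if (result.getException() != null) {
    throw result.getException(); // surfaces the same exceptions the previous direct call would have thrown
}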
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator(); private static final LeaseSelector DEFAULT_LEASE_SELECTOR = new GenericLeaseSelector(); @@ -136,6 +144,10 @@ public class Worker implements Runnable { private WorkerStateChangeListener workerStateChangeListener; + // Periodic Shard Sync related fields + private LeaderDecider leaderDecider; + private ShardSyncStrategy shardSyncStrategy; + /** * Constructor. * @@ -380,25 +392,10 @@ public class Worker implements Runnable { KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), - config, - new StreamConfig( - new KinesisProxy(config, kinesisClient), - config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), - config.shouldCallProcessRecordsEvenForEmptyRecordList(), - config.shouldValidateSequenceNumberBeforeCheckpointing(), - config.getInitialPositionInStreamExtended()), + config, getStreamConfig(config, kinesisClient), config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(), config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null, - new KinesisClientLibLeaseCoordinator( - new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), - DEFAULT_LEASE_SELECTOR, - config.getWorkerIdentifier(), - config.getFailoverTimeMillis(), - config.getEpsilonMillis(), - config.getMaxLeasesForWorker(), - config.getMaxLeasesToStealAtOneTime(), - config.getMaxLeaseRenewalThreads(), - metricsFactory) + getLeaseCoordinator(config, dynamoDBClient, metricsFactory) .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), execService, @@ -409,8 +406,7 @@ public class Worker implements Runnable { config.getShardPrioritizationStrategy(), config.getRetryGetRecordsInSeconds(), config.getMaxGetRecordsThreadPool(), - DEFAULT_WORKER_STATE_CHANGE_LISTENER, - DEFAULT_LEASE_CLEANUP_VALIDATOR ); + DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR, null /* leaderDecider */); // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. 
if (config.getRegionName() != null) { @@ -470,7 +466,8 @@ public class Worker implements Runnable { this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, - shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR ); + shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER, + DEFAULT_LEASE_CLEANUP_VALIDATOR, null); } /** @@ -520,7 +517,23 @@ public class Worker implements Runnable { IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, - LeaseCleanupValidator leaseCleanupValidator) { + LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider) { + this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, + parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, + leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds, + maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator), + leaderDecider); + } + + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, + long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, + ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, + Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, + WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -533,7 +546,7 @@ public class Worker implements Runnable { this.executorService = execService; this.leaseCoordinator = leaseCoordinator; this.metricsFactory = metricsFactory; - this.shardSyncer = new ShardSyncer(leaseCleanupValidator); + this.shardSyncer = shardSyncer; this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(), shardSyncIdleTimeMillis, metricsFactory, executorService, shardSyncer); @@ -545,6 +558,34 @@ public class Worker implements Runnable { this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; this.workerStateChangeListener = workerStateChangeListener; workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); + this.leaderDecider = leaderDecider; + this.shardSyncStrategy = 
createShardSyncStrategy(config.getShardSyncStrategyType()); + LOG.info(String.format("Shard sync strategy determined as %s.", shardSyncStrategy.getStrategyType().toString())); + } + + private ShardSyncStrategy createShardSyncStrategy(ShardSyncStrategyType strategyType) { + switch (strategyType) { + case PERIODIC: + return createPeriodicShardSyncStrategy(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager()); + case SHARD_END: + default: + return createShardEndShardSyncStrategy(controlServer); + } + } + + private static KinesisClientLibLeaseCoordinator getLeaseCoordinator(KinesisClientLibConfiguration config, + AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory) { + return new KinesisClientLibLeaseCoordinator( + new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), DEFAULT_LEASE_SELECTOR, + config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), + config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(), + config.getMaxLeaseRenewalThreads(), metricsFactory); + } + + private static StreamConfig getStreamConfig(KinesisClientLibConfiguration config, AmazonKinesis kinesisClient) { + return new StreamConfig(new KinesisProxy(config, kinesisClient), config.getMaxRecords(), + config.getIdleTimeBetweenReadsInMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(), + config.shouldValidateSequenceNumberBeforeCheckpointing(), config.getInitialPositionInStreamExtended()); } /** @@ -601,7 +642,7 @@ public class Worker implements Runnable { } if (foundCompletedShard) { - controlServer.syncShardAndLeaseInfo(null); + shardSyncStrategy.onFoundCompletedShard(); } // clean up shard consumers for unassigned shards @@ -651,6 +692,7 @@ public class Worker implements Runnable { } else { LOG.info("LeaseCoordinator is already running. No need to start it."); } + shardSyncStrategy.onWorkerInitialization(); isDone = true; } else { lastException = result.getException(); @@ -897,7 +939,7 @@ public class Worker implements Runnable { * the worker itself. *
 * Call to start shutdown invoked
- * Lease coordinator told to stop taking leases, and to drop existing leases.
+ * Lease coordinator told to onWorkerShutDown taking leases, and to drop existing leases.
 * Worker discovers record processors that no longer have leases.
 * Worker triggers shutdown with state {@link ShutdownReason#ZOMBIE}.
 * Once all record processors are shutdown, worker terminates owned resources.
 *
@@ -919,6 +961,10 @@ public class Worker implements Runnable { // Lost leases will force Worker to begin shutdown process for all shard consumers in // Worker.run(). leaseCoordinator.stop(); + // Stop the periodicShardSyncManager for the worker + if (shardSyncStrategy != null) { + shardSyncStrategy.onWorkerShutDown(); + } workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); } @@ -1005,9 +1051,7 @@ public class Worker implements Runnable { skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool, - config, - shardSyncer); - + config, shardSyncer, shardSyncStrategy); } /** @@ -1116,6 +1160,20 @@ public class Worker implements Runnable { } } + private PeriodicShardSyncStrategy createPeriodicShardSyncStrategy(IKinesisProxy kinesisProxy, + ILeaseManager leaseManager) { + return new PeriodicShardSyncStrategy( + new PeriodicShardSyncManager(config.getWorkerIdentifier(), leaderDecider, + new ShardSyncTask(kinesisProxy, leaseManager, config.getInitialPositionInStreamExtended(), + config.shouldCleanupLeasesUponShardCompletion(), + config.shouldIgnoreUnexpectedChildShards(), SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, + shardSyncer))); + } + + private ShardEndShardSyncStrategy createShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) { + return new ShardEndShardSyncStrategy(shardSyncTaskManager); + } + /** * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not. * Visible and non-final only for testing. @@ -1172,6 +1230,15 @@ public class Worker implements Runnable { private LeaseCleanupValidator leaseCleanupValidator; @Setter @Accessors(fluent = true) private LeaseSelector leaseSelector; + @Setter @Accessors(fluent = true) + private LeaderDecider leaderDecider; + @Setter @Accessors(fluent = true) + private ILeaseTaker leaseTaker; + @Setter @Accessors(fluent = true) + private ILeaseRenewer leaseRenewer; + @Setter @Accessors(fluent = true) + private ShardSyncer shardSyncer; + @VisibleForTesting AmazonKinesis getKinesisClient() { @@ -1269,15 +1336,19 @@ public class Worker implements Runnable { if (config.getKinesisEndpoint() != null) { setField(kinesisClient, "endpoint", kinesisClient::setEndpoint, config.getKinesisEndpoint()); } + if (metricsFactory == null) { metricsFactory = getMetricsFactory(cloudWatchClient, config); } + if (leaseManager == null) { leaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient); } + if (shardPrioritization == null) { shardPrioritization = new ParentsFirstShardPrioritization(1); } + if (kinesisProxy == null) { kinesisProxy = new KinesisProxy(config, kinesisClient); } @@ -1286,14 +1357,35 @@ public class Worker implements Runnable { workerStateChangeListener = DEFAULT_WORKER_STATE_CHANGE_LISTENER; } - if(leaseCleanupValidator == null) { + if (leaseCleanupValidator == null) { leaseCleanupValidator = DEFAULT_LEASE_CLEANUP_VALIDATOR; } + if (shardSyncer == null) { + shardSyncer = new KinesisShardSyncer(leaseCleanupValidator); + } + if(leaseSelector == null) { leaseSelector = DEFAULT_LEASE_SELECTOR; } + if (leaseTaker == null) { + leaseTaker = new LeaseTaker<>(leaseManager, leaseSelector, config.getWorkerIdentifier(), config.getFailoverTimeMillis()) + .withMaxLeasesForWorker(config.getMaxLeasesForWorker()) + .withMaxLeasesToStealAtOneTime(config.getMaxLeasesToStealAtOneTime()); + } + + // We expect users to either inject both LeaseRenewer and the corresponding thread-pool, or neither of them
(DEFAULT). + if (leaseRenewer == null){ + ExecutorService leaseRenewerThreadPool = LeaseCoordinator.getDefaultLeaseRenewalExecutorService(config.getMaxLeaseRenewalThreads()); + leaseRenewer = new LeaseRenewer<>(leaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), leaseRenewerThreadPool); + } + + if (leaderDecider == null) { + leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager, + Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); + } + return new Worker(config.getApplicationName(), recordProcessorFactory, config, @@ -1308,14 +1400,11 @@ public class Worker implements Runnable { config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null, - new KinesisClientLibLeaseCoordinator(leaseManager, - leaseSelector, - config.getWorkerIdentifier(), + new KinesisClientLibLeaseCoordinator(leaseManager, leaseTaker, leaseRenewer, config.getFailoverTimeMillis(), config.getEpsilonMillis(), config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(), - config.getMaxLeaseRenewalThreads(), metricsFactory) .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), @@ -1327,8 +1416,7 @@ public class Worker implements Runnable { shardPrioritization, config.getRetryGetRecordsInSeconds(), config.getMaxGetRecordsThreadPool(), - workerStateChangeListener, - leaseCleanupValidator); + workerStateChangeListener, shardSyncer, leaderDecider); } > R createClient(final T builder, diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java index 089ddc3a..c64acf99 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java @@ -82,7 +82,6 @@ public class LeaseCoordinator { protected final IMetricsFactory metricsFactory; private ScheduledExecutorService leaseCoordinatorThreadPool; - private final ExecutorService leaseRenewalThreadpool; private volatile boolean running = false; private ScheduledFuture takerFuture; @@ -187,43 +186,67 @@ public class LeaseCoordinator { } /** - * Constructor. * * @param leaseManager LeaseManager instance to use - * @param leaseSelector LeaseSelector instance to use + * @param leaseSelector LeaseSelector instance to be used for filtering leases during LeaseTaker execution. * @param workerIdentifier Identifies the worker (e.g. 
useful to track lease ownership) * @param leaseDurationMillis Duration of a lease * @param epsilonMillis Allow for some variance when calculating lease expirations * @param maxLeasesForWorker Max leases this Worker can handle at a time * @param maxLeasesToStealAtOneTime Steal up to these many leases at a time (for load balancing) + * @param maxLeaseRenewerThreadCount Number of threads to use for lease renewal calls + * @param metricsFactory Used to publish metrics about lease operations + */ + public LeaseCoordinator(ILeaseManager leaseManager, + LeaseSelector leaseSelector, + String workerIdentifier, + long leaseDurationMillis, + long epsilonMillis, + int maxLeasesForWorker, + int maxLeasesToStealAtOneTime, + int maxLeaseRenewerThreadCount, + IMetricsFactory metricsFactory) { + this(new LeaseTaker<>(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis) + .withMaxLeasesForWorker(maxLeasesForWorker) + .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime), + new LeaseRenewer<>(leaseManager, workerIdentifier, leaseDurationMillis, getDefaultLeaseRenewalExecutorService(maxLeaseRenewerThreadCount)), + leaseDurationMillis, + epsilonMillis, + maxLeasesForWorker, + maxLeasesToStealAtOneTime, + metricsFactory); + } + + /** + * + * @param leaseTaker LeaseTaker instance to be used. + * @param leaseRenewer LeaseRenewer instance to be used. + * @param leaseDurationMillis Duration of a lease + * @param epsilonMillis Allow for some variance when calculating lease expirations + * @param maxLeasesForWorker Max leases this Worker can handle at a time + * @param maxLeasesToStealAtOneTime Steal up to these many leases at a time (for load balancing) * @param metricsFactory Used to publish metrics about lease operations */ - public LeaseCoordinator(ILeaseManager leaseManager, - LeaseSelector leaseSelector, - String workerIdentifier, - long leaseDurationMillis, - long epsilonMillis, - int maxLeasesForWorker, - int maxLeasesToStealAtOneTime, - int maxLeaseRenewerThreadCount, - IMetricsFactory metricsFactory) { - this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount); - this.leaseTaker = new LeaseTaker(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis) - .withMaxLeasesForWorker(maxLeasesForWorker) - .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime); - this.leaseRenewer = new LeaseRenewer( - leaseManager, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool); + public LeaseCoordinator(final ILeaseTaker leaseTaker, + final ILeaseRenewer leaseRenewer, + final long leaseDurationMillis, + final long epsilonMillis, + final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, + final IMetricsFactory metricsFactory) { + this.leaseTaker = leaseTaker; + this.leaseRenewer = leaseRenewer; + this.metricsFactory = metricsFactory; this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis; this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; - this.metricsFactory = metricsFactory; LOG.info(String.format( "With failover time %d ms and epsilon %d ms, LeaseCoordinator will renew leases every %d ms, take" + "leases every %d ms, process maximum of %d leases and steal %d lease(s) at a time.", leaseDurationMillis, epsilonMillis, - renewerIntervalMillis, - takerIntervalMillis, + this.renewerIntervalMillis, + this.takerIntervalMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime)); } @@ -380,7 +403,7 @@ public class LeaseCoordinator { LOG.debug("Threadpool was null, no need to shutdown/terminate threadpool."); } - 
leaseRenewalThreadpool.shutdownNow(); + leaseRenewer.shutdown(); synchronized (shutdownLock) { leaseRenewer.clearCurrentlyHeldLeases(); running = false; @@ -437,10 +460,10 @@ public class LeaseCoordinator { * @param maximumPoolSize Maximum allowed thread pool size * @return Executor service that should be used for lease renewal. */ - private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) { + public static ExecutorService getDefaultLeaseRenewalExecutorService(int maximumPoolSize) { int coreLeaseCount = Math.max(maximumPoolSize / 4, 2); return new ThreadPoolExecutor(coreLeaseCount, maximumPoolSize, 60, TimeUnit.SECONDS, - new LinkedTransferQueue(), LEASE_RENEWAL_THREAD_FACTORY); + new LinkedTransferQueue<>(), LEASE_RENEWAL_THREAD_FACTORY); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java index dd951755..eb3cf1ac 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java @@ -404,6 +404,10 @@ public class LeaseRenewer implements ILeaseRenewer { addLeasesToRenew(myLeases); } + public void shutdown() { + executorService.shutdownNow(); + } + private void verifyNotNull(Object object, String message) { if (object == null) { throw new IllegalArgumentException(message); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java index f66841ee..8d348e21 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java @@ -36,7 +36,7 @@ public interface ILeaseRenewer { * @throws InvalidStateException if lease table doesn't exist * @throws ProvisionedThroughputException if DynamoDB reads fail due to insufficient capacity */ - public void initialize() throws DependencyException, InvalidStateException, ProvisionedThroughputException; + void initialize() throws DependencyException, InvalidStateException, ProvisionedThroughputException; /** * Attempt to renew all currently held leases. @@ -44,21 +44,21 @@ public interface ILeaseRenewer { * @throws DependencyException on unexpected DynamoDB failures * @throws InvalidStateException if lease table does not exist */ - public void renewLeases() throws DependencyException, InvalidStateException; + void renewLeases() throws DependencyException, InvalidStateException; /** * @return currently held leases. Key is shardId, value is corresponding Lease object. A lease is currently held if * we successfully renewed it on the last run of renewLeases(). Lease objects returned are deep copies - * their lease counters will not tick. */ - public Map getCurrentlyHeldLeases(); + Map getCurrentlyHeldLeases(); /** * @param leaseKey key of the lease to retrieve * * @return a deep copy of a currently held lease, or null if we don't hold the lease */ - public T getCurrentlyHeldLease(String leaseKey); + T getCurrentlyHeldLease(String leaseKey); /** * Adds leases to this LeaseRenewer's set of currently held leases. Leases must have lastRenewalNanos set to the @@ -66,12 +66,12 @@ public interface ILeaseRenewer { * * @param newLeases new leases. 
*/ - public void addLeasesToRenew(Collection newLeases); + void addLeasesToRenew(Collection newLeases); /** * Clears this LeaseRenewer's set of currently held leases. */ - public void clearCurrentlyHeldLeases(); + void clearCurrentlyHeldLeases(); /** * Stops the lease renewer from continunig to maintain the given lease. @@ -97,4 +97,11 @@ public interface ILeaseRenewer { boolean updateLease(T lease, UUID concurrencyToken) throws DependencyException, InvalidStateException, ProvisionedThroughputException; + /** + * Shutdown any clients and thread-pools. + */ + default void shutdown() { + // Does nothing. + } + } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DeterministicShuffleShardSyncLeaderDeciderTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DeterministicShuffleShardSyncLeaderDeciderTest.java new file mode 100644 index 00000000..25c36817 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DeterministicShuffleShardSyncLeaderDeciderTest.java @@ -0,0 +1,125 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.DeterministicShuffleShardSyncLeaderDecider.DETERMINISTIC_SHUFFLE_SEED; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; + +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class DeterministicShuffleShardSyncLeaderDeciderTest { + private static final String LEASE_KEY = "lease_key"; + private static final String LEASE_OWNER = "lease_owner"; + private static final String WORKER_ID = "worker-id"; + + private DeterministicShuffleShardSyncLeaderDecider leaderDecider; + + @Mock + private ILeaseManager leaseManager; + + @Mock + private ScheduledExecutorService scheduledExecutorService; + + private int numShardSyncWorkers; + + @Before + public void setup() { + numShardSyncWorkers = 1; + leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager, scheduledExecutorService, numShardSyncWorkers); + } + + @Test + public void testLeaderElectionWithNullLeases() { + boolean isLeader = leaderDecider.isLeader(WORKER_ID); + assertTrue("IsLeader should return true if leaders is 
null", isLeader); + } + + @Test + public void testLeaderElectionWithEmptyLeases() throws Exception{ + when(leaseManager.listLeases()).thenReturn(new ArrayList<>()); + boolean isLeader = leaderDecider.isLeader(WORKER_ID); + assertTrue("IsLeader should return true if no leases are returned", isLeader); + } + + @Test + public void testElectedLeadersAsPerExpectedShufflingOrder() throws Exception { + List leases = getLeases(5, false /* duplicateLeaseOwner */, true /* activeLeases */); + when(leaseManager.listLeases()).thenReturn(leases); + Set expectedLeaders = getExpectedLeaders(leases); + for (String leader : expectedLeaders) { + assertTrue(leaderDecider.isLeader(leader)); + } + for (KinesisClientLease lease : leases) { + if (!expectedLeaders.contains(lease.getLeaseOwner())) { + assertFalse(leaderDecider.isLeader(lease.getLeaseOwner())); + } + } + } + + @Test + public void testElectedLeadersAsPerExpectedShufflingOrderWhenUniqueWorkersLessThanMaxLeaders() { + this.numShardSyncWorkers = 5; // More than number of unique lease owners + leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager, scheduledExecutorService, numShardSyncWorkers); + List leases = getLeases(3, false /* duplicateLeaseOwner */, true /* activeLeases */); + Set expectedLeaders = getExpectedLeaders(leases); + // All lease owners should be present in expected leaders set, and they should all be leaders. + for (KinesisClientLease lease : leases) { + assertTrue(leaderDecider.isLeader(lease.getLeaseOwner())); + assertTrue(expectedLeaders.contains(lease.getLeaseOwner())); + } + } + + private List getLeases(int count, boolean duplicateLeaseOwner, boolean activeLeases) { + List leases = new ArrayList<>(); + for (int i=0;i getExpectedLeaders(List leases) { + List uniqueHosts = leases.stream().filter(lease -> lease.getLeaseOwner() != null) + .map(KinesisClientLease::getLeaseOwner).distinct().sorted().collect(Collectors.toList()); + + Collections.shuffle(uniqueHosts, new Random(DETERMINISTIC_SHUFFLE_SEED)); + int numWorkers = Math.min(uniqueHosts.size(), this.numShardSyncWorkers); + return new HashSet<>(uniqueHosts.subList(0, numWorkers)); + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 783d72ab..ff7aef75 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -97,7 +97,7 @@ public class ShardConsumerTest { private final boolean skipCheckpointValidationValue = false; private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); - private static final ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator()); + private static final ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator()); // Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is // ... a non-final public class, and so can be mocked and spied. 
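Reviewer note: the election order that DeterministicShuffleShardSyncLeaderDeciderTest asserts above can be reproduced in isolation. The sketch below is illustrative only; the class and method names are made up, and only the fixed seed of 1947, the sort-then-seeded-shuffle, and the subList(0, min(...)) cut taken from the decider and from getExpectedLeaders() are from this change. Its point is that every worker derives the same leader set from the lease table, whatever order it reads the leases in, so no extra coordination is needed to agree on who runs periodic shard syncs.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

public class DeterministicShuffleElectionSketch {
    // Same fixed seed as DeterministicShuffleShardSyncLeaderDecider.DETERMINISTIC_SHUFFLE_SEED.
    private static final int SEED = 1947;

    // Sort the distinct, non-null lease owners, shuffle them with the fixed seed,
    // and keep the first numLeaders entries; this mirrors what getExpectedLeaders()
    // in the test above computes.
    static Set<String> electLeaders(List<String> leaseOwners, int numLeaders) {
        List<String> uniqueHosts = leaseOwners.stream()
                .filter(Objects::nonNull)
                .distinct()
                .sorted()
                .collect(Collectors.toList());
        Collections.shuffle(uniqueHosts, new Random(SEED));
        return new HashSet<>(uniqueHosts.subList(0, Math.min(uniqueHosts.size(), numLeaders)));
    }

    public static void main(String[] args) {
        List<String> owners = Arrays.asList(
                "lease_owner0", "lease_owner1", "lease_owner2", "lease_owner3", "lease_owner4");
        List<String> reversedView = new ArrayList<>(owners);
        Collections.reverse(reversedView);
        // Both calls print the same leader set, because the owners are sorted
        // before the seeded shuffle, so input order does not matter.
        System.out.println(electLeaders(owners, 1));
        System.out.println(electLeaders(reversedView, 1));
    }
}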
@@ -120,6 +120,8 @@ public class ShardConsumerTest { private ICheckpoint checkpoint; @Mock private ShutdownNotification shutdownNotification; + @Mock + ShardSyncStrategy shardSyncStrategy; @Before public void setup() { @@ -163,8 +165,8 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, config, - shardSyncer); - + shardSyncer, + shardSyncStrategy); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); @@ -212,7 +214,8 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, config, - shardSyncer); + shardSyncer, + shardSyncStrategy); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -255,7 +258,8 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, config, - shardSyncer); + shardSyncer, + shardSyncStrategy); final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null; @@ -375,7 +379,8 @@ public class ShardConsumerTest { Optional.empty(), Optional.empty(), config, - shardSyncer); + shardSyncer, + shardSyncStrategy); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -520,7 +525,10 @@ public class ShardConsumerTest { Optional.empty(), Optional.empty(), config, - shardSyncer); + shardSyncer, + shardSyncStrategy); + + when(shardSyncStrategy.onShardConsumerShutDown()).thenReturn(new TaskResult(null)); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -658,7 +666,8 @@ public class ShardConsumerTest { Optional.empty(), Optional.empty(), config, - shardSyncer); + shardSyncer, + shardSyncStrategy); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -729,7 +738,8 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, config, - shardSyncer); + shardSyncer, + shardSyncStrategy); GetRecordsCache getRecordsCache = spy(consumer.getGetRecordsCache()); @@ -783,7 +793,8 @@ public class ShardConsumerTest { Optional.empty(), Optional.empty(), config, - shardSyncer); + shardSyncer, + shardSyncStrategy); assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(), SynchronousGetRecordsRetrievalStrategy.class); @@ -814,7 +825,8 @@ public class ShardConsumerTest { Optional.of(1), Optional.of(2), config, - shardSyncer); + shardSyncer, + shardSyncStrategy); assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(), AsynchronousGetRecordsRetrievalStrategy.class); @@ -854,8 +866,8 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, config, - shardSyncer); - + shardSyncer, + shardSyncStrategy); shardConsumer.consumeShard(); Thread.sleep(sleepTime); @@ -906,4 +918,4 @@ 
public class ShardConsumerTest { } }; } -} +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java index a4db819f..26a47079 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java @@ -52,7 +52,7 @@ public class ShardSyncTaskIntegrationTest { private static AWSCredentialsProvider credentialsProvider; private IKinesisClientLeaseManager leaseManager; private IKinesisProxy kinesisProxy; - private final ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator()); + private final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator()); /** * @throws java.lang.Exception diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index 6325dc52..9c156f58 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -59,7 +59,7 @@ import junit.framework.Assert; */ // CHECKSTYLE:IGNORE JavaNCSS FOR NEXT 800 LINES public class ShardSyncerTest { - private static final Log LOG = LogFactory.getLog(ShardSyncer.class); + private static final Log LOG = LogFactory.getLog(KinesisShardSyncer.class); private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = @@ -71,7 +71,7 @@ public class ShardSyncerTest { LeaseManager leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient); private static final int EXPONENT = 128; protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator(); - private static final ShardSyncer shardSyncer = new ShardSyncer(leaseCleanupValidator); + private static final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(leaseCleanupValidator); /** * Old/Obsolete max value of a sequence number (2^128 -1). 
*/ @@ -398,7 +398,7 @@ public class ShardSyncerTest { for (int c = 1; c <= maxCallingCount; c = c + 2) { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( ExceptionThrowingLeaseManagerMethods.DELETELEASE, c, INITIAL_POSITION_TRIM_HORIZON); - // Need to clean up lease manager every time after calling ShardSyncer + // Need to clean up lease manager every time after calling KinesisShardSyncer leaseManager.deleteAll(); } } @@ -420,7 +420,7 @@ public class ShardSyncerTest { for (int c = 1; c <= maxCallingCount; c = c + 2) { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( ExceptionThrowingLeaseManagerMethods.LISTLEASES, c, INITIAL_POSITION_TRIM_HORIZON); - // Need to clean up lease manager every time after calling ShardSyncer + // Need to clean up lease manager every time after calling KinesisShardSyncer leaseManager.deleteAll(); } } @@ -442,7 +442,7 @@ public class ShardSyncerTest { for (int c = 1; c <= maxCallingCount; c = c + 2) { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c,INITIAL_POSITION_TRIM_HORIZON); - // Need to clean up lease manager every time after calling ShardSyncer + // Need to clean up lease manager every time after calling KinesisShardSyncer leaseManager.deleteAll(); } } @@ -472,7 +472,7 @@ public class ShardSyncerTest { } catch (LeasingException e) { LOG.debug("Catch leasing exception", e); } - // Clear throwing exception scenario every time after calling ShardSyncer + // Clear throwing exception scenario every time after calling KinesisShardSyncer exceptionThrowingLeaseManager.clearLeaseManagerThrowingExceptionScenario(); } } else { @@ -517,7 +517,7 @@ public class ShardSyncerTest { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( ExceptionThrowingLeaseManagerMethods.DELETELEASE, c, INITIAL_POSITION_AT_TIMESTAMP); - // Need to clean up lease manager every time after calling ShardSyncer + // Need to clean up lease manager every time after calling KinesisShardSyncer leaseManager.deleteAll(); } } @@ -540,7 +540,7 @@ public class ShardSyncerTest { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( ExceptionThrowingLeaseManagerMethods.LISTLEASES, c, INITIAL_POSITION_AT_TIMESTAMP); - // Need to clean up lease manager every time after calling ShardSyncer + // Need to clean up lease manager every time after calling KinesisShardSyncer leaseManager.deleteAll(); } } @@ -563,7 +563,7 @@ public class ShardSyncerTest { testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c, INITIAL_POSITION_AT_TIMESTAMP); - // Need to clean up lease manager every time after calling ShardSyncer + // Need to clean up lease manager every time after calling KinesisShardSyncer leaseManager.deleteAll(); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 36f9eed0..029a1efe 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -21,7 +21,9 @@ import static org.mockito.Mockito.when; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ExecutorService; +import 
com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -57,10 +59,13 @@ public class ShutdownTaskTest { defaultParentShardIds, ExtendedSequenceNumber.LATEST); IRecordProcessor defaultRecordProcessor = new TestStreamlet(); - ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator()); + ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator()); + @Mock private GetRecordsCache getRecordsCache; + @Mock + private ShardSyncStrategy shardSyncStrategy; /** * @throws java.lang.Exception @@ -113,7 +118,8 @@ public class ShutdownTaskTest { leaseManager, TASK_BACKOFF_TIME_MILLIS, getRecordsCache, - shardSyncer); + shardSyncer, + shardSyncStrategy); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof IllegalArgumentException); @@ -131,6 +137,7 @@ public class ShutdownTaskTest { ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; + when(shardSyncStrategy.onShardConsumerShutDown()).thenReturn(new TaskResult(new KinesisClientLibIOException(""))); ShutdownTask task = new ShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, @@ -142,8 +149,10 @@ public class ShutdownTaskTest { leaseManager, TASK_BACKOFF_TIME_MILLIS, getRecordsCache, - shardSyncer); + shardSyncer, + shardSyncStrategy); TaskResult result = task.call(); + verify(shardSyncStrategy).onShardConsumerShutDown(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException); verify(getRecordsCache).shutdown(); @@ -154,8 +163,8 @@ public class ShutdownTaskTest { */ @Test public final void testGetTaskType() { - ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache, shardSyncer); + ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache, shardSyncer, shardSyncStrategy); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } -} +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 4b377cbb..23c91269 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -52,6 +53,7 @@ import java.util.HashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; @@ -161,7 +163,7 @@ public class WorkerTest { private RecordsFetcherFactory recordsFetcherFactory; private KinesisClientLibConfiguration config; - private ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator()); + private KinesisShardSyncer shardSyncer = new 
KinesisShardSyncer(new KinesisLeaseCleanupValidator()); @Mock private KinesisClientLibLeaseCoordinator leaseCoordinator; @@ -189,6 +191,8 @@ public class WorkerTest { private TaskResult taskResult; @Mock private WorkerStateChangeListener workerStateChangeListener; + @Mock + private ShardSyncStrategy shardSyncStrategy; @Before public void setup() { @@ -521,7 +525,7 @@ public class WorkerTest { final int threadPoolSize = 2; final int numberOfRecordsPerShard = 10; List shardList = createShardListWithOneSplit(); - List initialLeases = new ArrayList(); + List initialLeases = new ArrayList<>(); KinesisClientLease lease = shardSyncer.newKCLLease(shardList.get(0)); lease.setCheckpoint(new ExtendedSequenceNumber("2")); initialLeases.add(lease); @@ -2308,4 +2312,4 @@ public class WorkerTest { return worker; } } -} +} \ No newline at end of file
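For anyone consuming this change downstream, the net effect of the Worker.Builder and LeaseCoordinator edits above is that the lease taker and renewer become injectable seams instead of objects the coordinator always builds for itself. The following is a minimal sketch of the equivalent manual wiring. It is not part of the change: the sketch's class and method names are hypothetical, the import locations are assumed from the file paths in this diff, and only the constructor and setter shapes (LeaseTaker/LeaseRenewer construction, the now-public LeaseCoordinator.getDefaultLeaseRenewalExecutorService, and the new KinesisClientLibLeaseCoordinator constructor) are taken from the hunks above.

package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import java.util.concurrent.ExecutorService;

import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
import com.amazonaws.services.kinesis.leases.impl.LeaseRenewer;
import com.amazonaws.services.kinesis.leases.impl.LeaseTaker;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;

// Illustrative sketch only: reproduces, outside of Worker.Builder, the default wiring the
// builder now performs when no LeaseTaker/LeaseRenewer is injected.
final class LeaseCoordinatorWiringSketch {

    static KinesisClientLibLeaseCoordinator defaultWiring(KinesisClientLibConfiguration config,
                                                          ILeaseManager<KinesisClientLease> leaseManager,
                                                          LeaseSelector<KinesisClientLease> leaseSelector,
                                                          IMetricsFactory metricsFactory) {
        // Taker: the same construction the builder uses when leaseTaker == null.
        ILeaseTaker<KinesisClientLease> leaseTaker =
                new LeaseTaker<>(leaseManager, leaseSelector, config.getWorkerIdentifier(), config.getFailoverTimeMillis())
                        .withMaxLeasesForWorker(config.getMaxLeasesForWorker())
                        .withMaxLeasesToStealAtOneTime(config.getMaxLeasesToStealAtOneTime());

        // Renewer: the thread pool comes from the now-public factory method, so a caller that
        // injects its own renewer also owns that executor; LeaseCoordinator.stop() now calls
        // leaseRenewer.shutdown() rather than tearing down a pool it created itself.
        ExecutorService renewalThreadPool =
                LeaseCoordinator.getDefaultLeaseRenewalExecutorService(config.getMaxLeaseRenewalThreads());
        ILeaseRenewer<KinesisClientLease> leaseRenewer =
                new LeaseRenewer<>(leaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), renewalThreadPool);

        // New coordinator constructor that accepts the pre-built taker and renewer.
        return new KinesisClientLibLeaseCoordinator(leaseManager, leaseTaker, leaseRenewer,
                config.getFailoverTimeMillis(), config.getEpsilonMillis(),
                config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(), metricsFactory);
    }
}

An application that wants the periodic strategy would additionally select ShardSyncStrategyType.PERIODIC through whatever KinesisClientLibConfiguration setter backs the getShardSyncStrategyType() call used in createShardSyncStrategy (that setter is not shown in this diff), and may supply its own LeaderDecider through the builder's new leaderDecider(...) setter; otherwise the builder falls back to DeterministicShuffleShardSyncLeaderDecider with a single-threaded scheduler, as shown above.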