Introducing support for periodic shard sync (#579)

* Changes to support injection of ShardSyncer, LeaseTaker, and LeaseRenewer into KCL Worker

* Additional checks around injection of LeaseRenewer and LeaseRenewerThreadPool

* Changed accessor on InitialPositionInStreamExtended to public to allow ShardSyncer injection

* Changed ShardSyncer to a public interface. Renamed implementation to KinesisShardSyncer.

* Removed wild card imports introduced in previous commit

* Minor refactoring in Worker Builder

* Added license info to ShardSyncer interface. Minor refactoring

* Changes to chain constructor in LeaseCoordinator

* Changed accessor on InitialPositionInStreamExtended factory methods. Minor changes in Worker builder.

* Changes to support periodic shard sync

* Patching changes left out in merge

* Overriding shard-sync idle time to 0 for periodic shard-sync

* Addressed PR feedback

* Addressed PR #579 review comments

* Modified constructor for DeterministicShuffleShardSyncLeaderDecider

* Addressed PR comments

* Fixed failing test

* Removed redundant member variable
Parijat Sinha 2019-08-07 15:37:20 -07:00 committed by Sahil Palvia
parent 5e4888f431
commit 7a1d3031c5
26 changed files with 1735 additions and 977 deletions

View file

@ -530,8 +530,7 @@ class ConsumerStates {
consumer.isIgnoreUnexpectedChildShards(),
consumer.getLeaseManager(),
consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsCache(),
consumer.getShardSyncer());
consumer.getGetRecordsCache(), consumer.getShardSyncer(), consumer.getShardSyncStrategy());
}
@Override

View file

@ -0,0 +1,153 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* An implementation of the {@code LeaderDecider} to elect leader(s) based on workerId.
* Leases are shuffled using a predetermined constant seed so that lease ordering is
* preserved across workers.
* This reduces the probability of choosing the leader workers co-located on the same
* host in case workerId starts with a common string (e.g. IP Address).
* Hence if a host has 3 workers, IPADDRESS_Worker1, IPADDRESS_Worker2, and IPADDRESS_Worker3,
* we don't end up choosing all 3 for shard sync as a result of natural ordering of Strings.
* This ensures redundancy for shard-sync during host failures.
*/
class DeterministicShuffleShardSyncLeaderDecider implements LeaderDecider {
// Fixed seed so that the shuffle order is preserved across workers
static final int DETERMINISTIC_SHUFFLE_SEED = 1947;
private static final Log LOG = LogFactory.getLog(DeterministicShuffleShardSyncLeaderDecider.class);
private static final long ELECTION_INITIAL_DELAY_MILLIS = 60 * 1000;
private static final long ELECTION_SCHEDULING_INTERVAL_MILLIS = 5 * 60 * 1000;
private static final int AWAIT_TERMINATION_MILLIS = 5000;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final ILeaseManager<KinesisClientLease> leaseManager;
private final int numPeriodicShardSyncWorkers;
private final ScheduledExecutorService leaderElectionThreadPool;
private volatile Set<String> leaders;
/**
*
* @param leaseManager LeaseManager instance used to fetch leases.
* @param leaderElectionThreadPool Thread-pool to be used for leaderElection.
* @param numPeriodicShardSyncWorkers Number of leaders that will be elected to perform periodic shard syncs.
*/
DeterministicShuffleShardSyncLeaderDecider(ILeaseManager<KinesisClientLease> leaseManager, ScheduledExecutorService leaderElectionThreadPool,
int numPeriodicShardSyncWorkers) {
this.leaseManager = leaseManager;
this.leaderElectionThreadPool = leaderElectionThreadPool;
this.numPeriodicShardSyncWorkers = numPeriodicShardSyncWorkers;
}
/*
* Shuffles the leases deterministically and elects numPeriodicShardSyncWorkers number of workers
* as leaders (workers that will perform shard sync).
*/
private void electLeaders() {
try {
LOG.debug("Started leader election at: " + Instant.now());
List<KinesisClientLease> leases = leaseManager.listLeases();
List<String> uniqueHosts = leases.stream().map(KinesisClientLease::getLeaseOwner)
.filter(owner -> owner != null).distinct().sorted().collect(Collectors.toList());
Collections.shuffle(uniqueHosts, new Random(DETERMINISTIC_SHUFFLE_SEED));
int numShardSyncWorkers = Math.min(uniqueHosts.size(), numPeriodicShardSyncWorkers);
// In case value is currently being read, we wait for reading to complete before updating the variable.
// This is to prevent any ConcurrentModificationException exceptions.
readWriteLock.writeLock().lock();
leaders = new HashSet<>(uniqueHosts.subList(0, numShardSyncWorkers));
LOG.info("Elected leaders: " + String.join(", ", leaders));
LOG.debug("Completed leader election at: " + System.currentTimeMillis());
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
LOG.error("Exception occurred while trying to fetch all leases for leader election", e);
} catch (Throwable t) {
LOG.error("Unknown exception during leader election.", t);
} finally {
readWriteLock.writeLock().unlock();
}
}
private boolean isWorkerLeaderForShardSync(String workerId) {
return CollectionUtils.isNullOrEmpty(leaders) ||
(!CollectionUtils.isNullOrEmpty(leaders) && leaders.contains(workerId));
}
@Override
public synchronized Boolean isLeader(String workerId) {
// if no leaders yet, synchronously get leaders. This will happen at first Shard Sync.
if (executeConditionCheckWithReadLock(() -> CollectionUtils.isNullOrEmpty(leaders))) {
electLeaders();
// start a scheduled executor that will periodically update leaders.
// The first run will be after a minute.
// We don't need jitter since it is scheduled with a fixed delay and time taken to scan leases
// will be different at different times and on different hosts/workers.
leaderElectionThreadPool.scheduleWithFixedDelay(this::electLeaders, ELECTION_INITIAL_DELAY_MILLIS,
ELECTION_SCHEDULING_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}
return executeConditionCheckWithReadLock(() -> isWorkerLeaderForShardSync(workerId));
}
@Override
public synchronized void shutdown() {
try {
leaderElectionThreadPool.shutdown();
if (leaderElectionThreadPool.awaitTermination(AWAIT_TERMINATION_MILLIS, TimeUnit.MILLISECONDS)) {
LOG.info("Successfully stopped leader election on the worker");
} else {
leaderElectionThreadPool.shutdownNow();
LOG.info(String.format("Stopped leader election thread after awaiting termination for %d milliseconds",
AWAIT_TERMINATION_MILLIS));
}
} catch (InterruptedException e) {
LOG.debug("Encountered InterruptedException while awaiting leader election threadPool termination");
}
}
// Execute condition checks using shared variables under a read-write lock.
private boolean executeConditionCheckWithReadLock(BooleanSupplier action) {
try {
readWriteLock.readLock().lock();
return action.getAsBoolean();
} finally {
readWriteLock.readLock().unlock();
}
}
}
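
The election above reduces to a deterministic shuffle: every worker sorts the distinct lease owners, shuffles them with the same fixed seed, and takes the first numPeriodicShardSyncWorkers entries, so all workers derive the same leader set without coordinating. A minimal standalone sketch of just that step (host names are hypothetical; the seed mirrors DETERMINISTIC_SHUFFLE_SEED):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

public class ShuffleElectionSketch {
    public static void main(String[] args) {
        // Distinct lease owners, sorted first so the shuffle input is identical on every worker.
        List<String> uniqueHosts = new ArrayList<>(Arrays.asList(
                "10.0.0.1_Worker1", "10.0.0.1_Worker2", "10.0.0.2_Worker1", "10.0.0.3_Worker1"));
        Collections.sort(uniqueHosts);
        // Fixed seed => identical shuffle order on every worker.
        Collections.shuffle(uniqueHosts, new Random(1947));
        int numPeriodicShardSyncWorkers = 2;
        Set<String> leaders = new HashSet<>(
                uniqueHosts.subList(0, Math.min(uniqueHosts.size(), numPeriodicShardSyncWorkers)));
        System.out.println("Elected leaders: " + leaders);
    }
}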

View file

@ -20,7 +20,7 @@ import java.util.Date;
* Class that houses the entities needed to specify the position in the stream from where a new application should
* start.
*/
class InitialPositionInStreamExtended {
public class InitialPositionInStreamExtended {
private final InitialPositionInStream position;
private final Date timestamp;
@ -44,7 +44,7 @@ class InitialPositionInStreamExtended {
*
* @return The initial position in stream.
*/
protected InitialPositionInStream getInitialPositionInStream() {
public InitialPositionInStream getInitialPositionInStream() {
return this.position;
}
@ -54,11 +54,11 @@ class InitialPositionInStreamExtended {
*
* @return The timestamp from where we need to start the application.
*/
protected Date getTimestamp() {
public Date getTimestamp() {
return this.timestamp;
}
protected static InitialPositionInStreamExtended newInitialPosition(final InitialPositionInStream position) {
public static InitialPositionInStreamExtended newInitialPosition(final InitialPositionInStream position) {
switch (position) {
case LATEST:
return new InitialPositionInStreamExtended(InitialPositionInStream.LATEST, null);
@ -69,7 +69,7 @@ class InitialPositionInStreamExtended {
}
}
protected static InitialPositionInStreamExtended newInitialPositionAtTimestamp(final Date timestamp) {
public static InitialPositionInStreamExtended newInitialPositionAtTimestamp(final Date timestamp) {
if (timestamp == null) {
throw new IllegalArgumentException("Timestamp must be specified for InitialPosition AT_TIMESTAMP");
}
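
Making these factory methods public lets code outside the worker package build an initial position, for example when wiring in an injected ShardSyncer. A small usage sketch:

import java.util.Date;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;

public class InitialPositionSketch {
    public static void main(String[] args) {
        InitialPositionInStreamExtended latest =
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
        // AT_TIMESTAMP requires a non-null timestamp; an arbitrary date is used here.
        InitialPositionInStreamExtended atTimestamp =
                InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1546300800000L));
        System.out.println(latest.getInitialPositionInStream() + " / " + atTimestamp.getTimestamp());
    }
}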

View file

@ -22,7 +22,6 @@ import org.apache.commons.lang3.Validate;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
@ -167,6 +166,11 @@ public class KinesisClientLibConfiguration {
*/
public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false;
/**
* Default ShardSyncStrategy to be used for discovering new shards in the Stream.
*/
public static final ShardSyncStrategyType DEFAULT_SHARD_SYNC_STRATEGY_TYPE = ShardSyncStrategyType.SHARD_END;
/**
* Default Shard prioritization strategy.
*/
@ -230,6 +234,7 @@ public class KinesisClientLibConfiguration {
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ShardPrioritization shardPrioritization;
private long shutdownGraceMillis;
private ShardSyncStrategyType shardSyncStrategyType;
@Getter
private Optional<Integer> timeoutInSeconds = Optional.empty();
@ -492,6 +497,7 @@ public class KinesisClientLibConfiguration {
this.initialPositionInStreamExtended =
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory();
}
@ -600,6 +606,7 @@ public class KinesisClientLibConfiguration {
this.initialPositionInStreamExtended =
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = recordsFetcherFactory;
this.shutdownGraceMillis = shutdownGraceMillis;
@ -840,6 +847,13 @@ public class KinesisClientLibConfiguration {
return skipShardSyncAtWorkerInitializationIfLeasesExist;
}
/**
* @return ShardSyncStrategyType to be used by KCL to process the Stream.
*/
public ShardSyncStrategyType getShardSyncStrategyType() {
return shardSyncStrategyType;
}
/**
* @return Max leases this Worker can handle at a time
*/
@ -1199,6 +1213,15 @@ public class KinesisClientLibConfiguration {
return this;
}
/**
* @param shardSyncStrategyType ShardSyncStrategy type for KCL.
* @return {@link KinesisClientLibConfiguration}
*/
public KinesisClientLibConfiguration withShardSyncStrategyType(ShardSyncStrategyType shardSyncStrategyType) {
this.shardSyncStrategyType = shardSyncStrategyType;
return this;
}
/**
*
* @param regionName The region name for the service
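
A sketch of opting into the new strategy from application configuration. It assumes the standard four-argument KinesisClientLibConfiguration constructor and that ShardSyncStrategyType lives in the same worker package; PERIODIC is the value returned by PeriodicShardSyncStrategy below, and SHARD_END remains the default:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncStrategyType;

public class ShardSyncConfigSketch {
    public static void main(String[] args) {
        KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
                "my-app", "my-stream", new DefaultAWSCredentialsProviderChain(), "worker-1")
                // Default is ShardSyncStrategyType.SHARD_END; PERIODIC enables periodic shard sync.
                .withShardSyncStrategyType(ShardSyncStrategyType.PERIODIC);
        System.out.println("Shard sync strategy: " + config.getShardSyncStrategyType());
    }
}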

View file

@ -21,8 +21,15 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
import com.amazonaws.services.kinesis.leases.impl.LeaseRenewer;
import com.amazonaws.services.kinesis.leases.impl.LeaseTaker;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -38,8 +45,6 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
@ -66,11 +71,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @param epsilonMillis Delta for timing operations (e.g. checking lease expiry)
* @param leaseSelector Lease selector which decides which leases to take
*/
public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> leaseManager,
String workerIdentifier,
long leaseDurationMillis,
long epsilonMillis,
LeaseSelector<KinesisClientLease> leaseSelector) {
public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> leaseManager, String workerIdentifier,
long leaseDurationMillis, long epsilonMillis, LeaseSelector<KinesisClientLease> leaseSelector) {
super(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis);
this.leaseManager = leaseManager;
}
@ -81,10 +83,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @param leaseDurationMillis Duration of a lease in milliseconds
* @param epsilonMillis Delta for timing operations (e.g. checking lease expiry)
*/
public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> leaseManager,
String workerIdentifier,
long leaseDurationMillis,
long epsilonMillis) {
public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> leaseManager, String workerIdentifier,
long leaseDurationMillis, long epsilonMillis) {
this(leaseManager, workerIdentifier, leaseDurationMillis, epsilonMillis, DEFAULT_LEASE_SELECTOR);
}
@ -97,11 +97,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @param metricsFactory Metrics factory used to emit metrics
*/
public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> leaseManager,
LeaseSelector<KinesisClientLease> leaseSelector,
String workerIdentifier,
long leaseDurationMillis,
long epsilonMillis,
IMetricsFactory metricsFactory) {
LeaseSelector<KinesisClientLease> leaseSelector, String workerIdentifier, long leaseDurationMillis,
long epsilonMillis, IMetricsFactory metricsFactory) {
super(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis, metricsFactory);
this.leaseManager = leaseManager;
}
@ -117,19 +114,22 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @param metricsFactory Metrics factory used to emit metrics
*/
public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> leaseManager,
LeaseSelector<KinesisClientLease> leaseSelector,
String workerIdentifier,
long leaseDurationMillis,
long epsilonMillis,
int maxLeasesForWorker,
int maxLeasesToStealAtOneTime,
int maxLeaseRenewerThreadCount,
LeaseSelector<KinesisClientLease> leaseSelector, String workerIdentifier, long leaseDurationMillis,
long epsilonMillis, int maxLeasesForWorker, int maxLeasesToStealAtOneTime, int maxLeaseRenewerThreadCount,
IMetricsFactory metricsFactory) {
super(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, metricsFactory);
this.leaseManager = leaseManager;
}
public KinesisClientLibLeaseCoordinator(ILeaseManager<KinesisClientLease> leaseManager,
ILeaseTaker<KinesisClientLease> leaseTaker, ILeaseRenewer<KinesisClientLease> leaseRenewer,
final long leaseDurationMillis, final long epsilonMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final IMetricsFactory metricsFactory) {
super(leaseTaker, leaseRenewer, leaseDurationMillis, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, metricsFactory);
this.leaseManager = leaseManager;
}
/**
* @param readCapacity The DynamoDB table used for tracking leases will be provisioned with the specified initial
* read capacity
@ -175,8 +175,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
if (lease == null) {
LOG.info(String.format(
"Worker %s could not update checkpoint for shard %s because it does not hold the lease",
getWorkerIdentifier(),
shardId));
getWorkerIdentifier(), shardId));
return false;
}
@ -242,8 +241,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
if (lease == null) {
LOG.info(String.format(
"Worker %s could not prepare checkpoint for shard %s because it does not hold the lease",
getWorkerIdentifier(),
shardId));
getWorkerIdentifier(), shardId));
return false;
}
@ -256,12 +254,11 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* {@inheritDoc}
*/
@Override
public void prepareCheckpoint(String shardId,
ExtendedSequenceNumber pendingCheckpointValue,
public void prepareCheckpoint(String shardId, ExtendedSequenceNumber pendingCheckpointValue,
String concurrencyToken) throws KinesisClientLibException {
try {
boolean wasSuccessful =
prepareCheckpoint(shardId, pendingCheckpointValue, UUID.fromString(concurrencyToken));
boolean wasSuccessful = prepareCheckpoint(shardId, pendingCheckpointValue,
UUID.fromString(concurrencyToken));
if (!wasSuccessful) {
throw new ShutdownException(
"Can't prepare checkpoint - instance doesn't hold the lease for this shard");
@ -328,8 +325,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @throws ProvisionedThroughputException
*/
void initialize() throws ProvisionedThroughputException, DependencyException, IllegalStateException {
final boolean newTableCreated =
leaseManager.createLeaseTableIfNotExists(initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity);
final boolean newTableCreated = leaseManager
.createLeaseTableIfNotExists(initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity);
if (newTableCreated) {
LOG.info(String.format(
"Created new lease table for coordinator with initial read capacity of %d and write capacity of %d.",

View file

@ -0,0 +1,812 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang3.StringUtils;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.Shard;
/**
* Helper class to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
* It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
* and begun processing its child shards.
*/
class KinesisShardSyncer implements ShardSyncer {
private static final Log LOG = LogFactory.getLog(KinesisShardSyncer.class);
private final LeaseCleanupValidator leaseCleanupValidator;
public KinesisShardSyncer(final LeaseCleanupValidator leaseCleanupValidator) {
this.leaseCleanupValidator = leaseCleanupValidator;
}
synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy, ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards);
}
/**
* Check and create leases for any new shards (e.g. following a reshard operation).
*
* @param kinesisProxy
* @param leaseManager
* @param initialPositionInStream
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
public synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
}
/**
* Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard).
*
* @param kinesisProxy
* @param leaseManager
* @param initialPosition
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
// CHECKSTYLE:OFF CyclomaticComplexity
private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager, InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
List<Shard> shards = getShardList(kinesisProxy);
LOG.debug("Num shards: " + shards.size());
Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards);
Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap);
Set<String> inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap);
if (!ignoreUnexpectedChildShards) {
assertAllParentShardsAreClosed(inconsistentShardIds);
}
List<KinesisClientLease> currentLeases = leaseManager.listLeases();
List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
inconsistentShardIds);
LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
for (KinesisClientLease lease : newLeasesToCreate) {
long startTimeMillis = System.currentTimeMillis();
boolean success = false;
try {
leaseManager.createLeaseIfNotExists(lease);
success = true;
} finally {
MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success, MetricsLevel.DETAILED);
}
}
List<KinesisClientLease> trackedLeases = new ArrayList<>();
if (currentLeases != null) {
trackedLeases.addAll(currentLeases);
}
trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager);
if (cleanupLeasesOfCompletedShards) {
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases,
leaseManager);
}
}
// CHECKSTYLE:ON CyclomaticComplexity
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
* and a reshard operation.
* @param inconsistentShardIds
* @throws KinesisClientLibIOException
*/
private void assertAllParentShardsAreClosed(Set<String> inconsistentShardIds) throws KinesisClientLibIOException {
if (!inconsistentShardIds.isEmpty()) {
String ids = StringUtils.join(inconsistentShardIds, ' ');
throw new KinesisClientLibIOException(String.format("%d open child shards (%s) are inconsistent. "
+ "This can happen due to a race condition between describeStream and a reshard operation.",
inconsistentShardIds.size(), ids));
}
}
/**
* Helper method to construct the list of inconsistent shards, which are open shards with non-closed ancestor
* parent(s).
* @param shardIdToChildShardIdsMap
* @param shardIdToShardMap
* @return Set of inconsistent open shard ids for shards having open parents.
*/
private Set<String> findInconsistentShardIds(Map<String, Set<String>> shardIdToChildShardIdsMap,
Map<String, Shard> shardIdToShardMap) {
Set<String> result = new HashSet<String>();
for (String parentShardId : shardIdToChildShardIdsMap.keySet()) {
Shard parentShard = shardIdToShardMap.get(parentShardId);
if ((parentShardId == null) || (parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null)) {
Set<String> childShardIdsMap = shardIdToChildShardIdsMap.get(parentShardId);
result.addAll(childShardIdsMap);
}
}
return result;
}
/**
* Helper method to create a shardId->KinesisClientLease map.
* Note: This has package level access for testing purposes only.
* @param trackedLeaseList
* @return
*/
Map<String, KinesisClientLease> constructShardIdToKCLLeaseMap(List<KinesisClientLease> trackedLeaseList) {
Map<String, KinesisClientLease> trackedLeasesMap = new HashMap<>();
for (KinesisClientLease lease : trackedLeaseList) {
trackedLeasesMap.put(lease.getLeaseKey(), lease);
}
return trackedLeasesMap;
}
/**
* Note: this has package level access for testing purposes.
* Useful for asserting that we don't have an incomplete shard list following a reshard operation.
* We verify that if the shard is present in the shard list, it is closed and its hash key range
* is covered by its child shards.
*/
synchronized void assertClosedShardsAreCoveredOrAbsent(Map<String, Shard> shardIdToShardMap,
Map<String, Set<String>> shardIdToChildShardIdsMap, Set<String> shardIdsOfClosedShards)
throws KinesisClientLibIOException {
String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
+ " while a reshard operation was in progress.";
for (String shardId : shardIdsOfClosedShards) {
Shard shard = shardIdToShardMap.get(shardId);
if (shard == null) {
LOG.info("Shard " + shardId + " is not present in Kinesis anymore.");
continue;
}
String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
if (endingSequenceNumber == null) {
throw new KinesisClientLibIOException("Shard " + shardIdsOfClosedShards + " is not closed. "
+ exceptionMessageSuffix);
}
Set<String> childShardIds = shardIdToChildShardIdsMap.get(shardId);
if (childShardIds == null) {
throw new KinesisClientLibIOException("Incomplete shard list: Closed shard " + shardId
+ " has no children." + exceptionMessageSuffix);
}
assertHashRangeOfClosedShardIsCovered(shard, shardIdToShardMap, childShardIds);
}
}
private synchronized void assertHashRangeOfClosedShardIsCovered(Shard closedShard,
Map<String, Shard> shardIdToShardMap, Set<String> childShardIds) throws KinesisClientLibIOException {
BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getStartingHashKey());
BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getEndingHashKey());
BigInteger minStartingHashKeyOfChildren = null;
BigInteger maxEndingHashKeyOfChildren = null;
for (String childShardId : childShardIds) {
Shard childShard = shardIdToShardMap.get(childShardId);
BigInteger startingHashKey = new BigInteger(childShard.getHashKeyRange().getStartingHashKey());
if ((minStartingHashKeyOfChildren == null) || (startingHashKey.compareTo(minStartingHashKeyOfChildren)
< 0)) {
minStartingHashKeyOfChildren = startingHashKey;
}
BigInteger endingHashKey = new BigInteger(childShard.getHashKeyRange().getEndingHashKey());
if ((maxEndingHashKeyOfChildren == null) || (endingHashKey.compareTo(maxEndingHashKeyOfChildren) > 0)) {
maxEndingHashKeyOfChildren = endingHashKey;
}
}
if ((minStartingHashKeyOfChildren == null) || (maxEndingHashKeyOfChildren == null) || (
minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0) || (
maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0)) {
throw new KinesisClientLibIOException("Incomplete shard list: hash key range of shard " + closedShard
.getShardId() + " is not covered by its child shards.");
}
}
/**
* Helper method to construct shardId->setOfChildShardIds map.
* Note: This has package access for testing purposes only.
* @param shardIdToShardMap
* @return
*/
Map<String, Set<String>> constructShardIdToChildShardIdsMap(Map<String, Shard> shardIdToShardMap) {
Map<String, Set<String>> shardIdToChildShardIdsMap = new HashMap<>();
for (Map.Entry<String, Shard> entry : shardIdToShardMap.entrySet()) {
String shardId = entry.getKey();
Shard shard = entry.getValue();
String parentShardId = shard.getParentShardId();
if ((parentShardId != null) && (shardIdToShardMap.containsKey(parentShardId))) {
Set<String> childShardIds = shardIdToChildShardIdsMap.get(parentShardId);
if (childShardIds == null) {
childShardIds = new HashSet<String>();
shardIdToChildShardIdsMap.put(parentShardId, childShardIds);
}
childShardIds.add(shardId);
}
String adjacentParentShardId = shard.getAdjacentParentShardId();
if ((adjacentParentShardId != null) && (shardIdToShardMap.containsKey(adjacentParentShardId))) {
Set<String> childShardIds = shardIdToChildShardIdsMap.get(adjacentParentShardId);
if (childShardIds == null) {
childShardIds = new HashSet<String>();
shardIdToChildShardIdsMap.put(adjacentParentShardId, childShardIds);
}
childShardIds.add(shardId);
}
}
return shardIdToChildShardIdsMap;
}
private List<Shard> getShardList(IKinesisProxy kinesisProxy) throws KinesisClientLibIOException {
List<Shard> shards = kinesisProxy.getShardList();
if (shards == null) {
throw new KinesisClientLibIOException(
"Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list.");
}
return shards;
}
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
* determine if it is a descendant of any shard which is or will be processed (e.g. for which a lease exists):
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
* If not, set checkpoint of the shard to the initial position specified by the client.
* To check if we need to create leases for ancestors, we use the following rules:
* * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
* we begin processing data from any of its descendants.
* * A shard does not start processing data until data from all its parents has been processed.
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
* leases corresponding to both the parents - the parent shard which is not a descendant will have
* its checkpoint set to Latest.
*
* We assume that if there is an existing lease for a shard, then either:
* * we have previously created a lease for its parent (if it was needed), or
* * the parent shard has expired.
*
* For example:
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10)
*
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
*
* If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
* here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
* currently happen here if ignoreUnexpectedChildShards was true in syncShardLeases.
*
*
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
* @param inconsistentShardIds Set of child shard ids having open parents.
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards, List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition, Set<String> inconsistentShardIds) {
Map<String, KinesisClientLease> shardIdToNewLeaseMap = new HashMap<String, KinesisClientLease>();
Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
Set<String> shardIdsOfCurrentLeases = new HashSet<String>();
for (KinesisClientLease lease : currentLeases) {
shardIdsOfCurrentLeases.add(lease.getLeaseKey());
LOG.debug("Existing lease: " + lease);
}
List<Shard> openShards = getOpenShards(shards);
Map<String, Boolean> memoizationContext = new HashMap<>();
// Iterate over the open shards and find those that don't have any lease entries.
for (Shard shard : openShards) {
String shardId = shard.getShardId();
LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors.");
if (shardIdsOfCurrentLeases.contains(shardId)) {
LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease");
} else if (inconsistentShardIds.contains(shardId)) {
LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease");
} else {
LOG.debug("Need to create a lease for shardId " + shardId);
KinesisClientLease newLease = newKCLLease(shard);
boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition,
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap,
memoizationContext);
/**
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
* lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
* timestamp at or after the specified initial position timestamp.
*
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: empty set
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
* AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
* processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
* would then be deleted since they won't have records with server-side timestamp at/after 206. And
* after that we will begin processing the descendant shards with epoch at/after 206 and we will
* return the records that meet the timestamp requirement for these shards.
*/
if (isDescendant && !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
newLease.setCheckpoint(convertToCheckpoint(initialPosition));
}
LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
}
}
List<KinesisClientLease> newLeasesToCreate = new ArrayList<KinesisClientLease>();
newLeasesToCreate.addAll(shardIdToNewLeaseMap.values());
Comparator<? super KinesisClientLease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMapOfAllKinesisShards);
Collections.sort(newLeasesToCreate, startingSequenceNumberComparator);
return newLeasesToCreate;
}
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*/
List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards, List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition) {
Set<String> inconsistentShardIds = new HashSet<String>();
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds);
}
/**
* Note: Package level access for testing purposes only.
* Check if this shard is a descendant of a shard that is (or will be) processed.
* Create leases for the ancestors of this shard as required.
* See javadoc of determineNewLeasesToCreate() for rules and example.
*
* @param shardId The shardId to check.
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
* @param shardIdsOfCurrentLeases The shardIds for the current leases.
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @param shardIdToLeaseMapOfNewShards Add lease POJOs corresponding to ancestors to this map.
* @param memoizationContext Memoization of shards that have been evaluated as part of the evaluation
* @return true if the shard is a descendant of any current shard (lease already exists)
*/
// CHECKSTYLE:OFF CyclomaticComplexity
boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId,
InitialPositionInStreamExtended initialPosition, Set<String> shardIdsOfCurrentLeases,
Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
Map<String, KinesisClientLease> shardIdToLeaseMapOfNewShards, Map<String, Boolean> memoizationContext) {
Boolean previousValue = memoizationContext.get(shardId);
if (previousValue != null) {
return previousValue;
}
boolean isDescendant = false;
Shard shard;
Set<String> parentShardIds;
Set<String> descendantParentShardIds = new HashSet<String>();
if ((shardId != null) && (shardIdToShardMapOfAllKinesisShards.containsKey(shardId))) {
if (shardIdsOfCurrentLeases.contains(shardId)) {
// This shard is a descendant of a current shard.
isDescendant = true;
// We don't need to add leases of its ancestors,
// because we'd have done it when creating a lease for this shard.
} else {
shard = shardIdToShardMapOfAllKinesisShards.get(shardId);
parentShardIds = getParentShardIds(shard, shardIdToShardMapOfAllKinesisShards);
for (String parentShardId : parentShardIds) {
// Check if the parent is a descendant, and include its ancestors.
if (checkIfDescendantAndAddNewLeasesForAncestors(parentShardId, initialPosition,
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToLeaseMapOfNewShards,
memoizationContext)) {
isDescendant = true;
descendantParentShardIds.add(parentShardId);
LOG.debug("Parent shard " + parentShardId + " is a descendant.");
} else {
LOG.debug("Parent shard " + parentShardId + " is NOT a descendant.");
}
}
// If this is a descendant, create leases for its parent shards (if they don't exist)
if (isDescendant) {
for (String parentShardId : parentShardIds) {
if (!shardIdsOfCurrentLeases.contains(parentShardId)) {
LOG.debug("Need to create a lease for shardId " + parentShardId);
KinesisClientLease lease = shardIdToLeaseMapOfNewShards.get(parentShardId);
if (lease == null) {
lease = newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId));
shardIdToLeaseMapOfNewShards.put(parentShardId, lease);
}
if (descendantParentShardIds.contains(parentShardId) && !initialPosition
.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
lease.setCheckpoint(convertToCheckpoint(initialPosition));
}
}
}
} else {
// This shard should be included, if the customer wants to process all records in the stream or
// if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a lease just like we do
// for TRIM_HORIZON. However we will only return back records with server-side timestamp at or
// after the specified initial position timestamp.
if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)
|| initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
isDescendant = true;
}
}
}
}
memoizationContext.put(shardId, isDescendant);
return isDescendant;
}
// CHECKSTYLE:ON CyclomaticComplexity
/**
* Helper method to get parent shardIds of the current shard - includes the parent shardIds if:
* a/ they are not null
* b/ if they exist in the current shard map (i.e. haven't expired)
*
* @param shard Will return parents of this shard
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @return Set of parentShardIds
*/
Set<String> getParentShardIds(Shard shard, Map<String, Shard> shardIdToShardMapOfAllKinesisShards) {
Set<String> parentShardIds = new HashSet<String>(2);
String parentShardId = shard.getParentShardId();
if ((parentShardId != null) && shardIdToShardMapOfAllKinesisShards.containsKey(parentShardId)) {
parentShardIds.add(parentShardId);
}
String adjacentParentShardId = shard.getAdjacentParentShardId();
if ((adjacentParentShardId != null) && shardIdToShardMapOfAllKinesisShards.containsKey(adjacentParentShardId)) {
parentShardIds.add(adjacentParentShardId);
}
return parentShardIds;
}
/**
* Delete leases corresponding to shards that no longer exist in the stream.
* Current scheme: Delete a lease if:
* * the corresponding shard is not present in the list of Kinesis shards, AND
* * the parentShardIds listed in the lease are also not present in the list of Kinesis shards.
* @param shards List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state).
* @param trackedLeases List of all leases we are tracking.
* @param kinesisProxy Kinesis proxy (used to get shard list)
* @param leaseManager
* @throws KinesisClientLibIOException Thrown if we couldn't get a fresh shard list from Kinesis.
* @throws ProvisionedThroughputException
* @throws InvalidStateException
* @throws DependencyException
*/
private void cleanupGarbageLeases(List<Shard> shards, List<KinesisClientLease> trackedLeases,
IKinesisProxy kinesisProxy, ILeaseManager<KinesisClientLease> leaseManager)
throws KinesisClientLibIOException, DependencyException, InvalidStateException,
ProvisionedThroughputException {
Set<String> kinesisShards = new HashSet<>();
for (Shard shard : shards) {
kinesisShards.add(shard.getShardId());
}
// Check if there are leases for non-existent shards
List<KinesisClientLease> garbageLeases = new ArrayList<>();
for (KinesisClientLease lease : trackedLeases) {
if (leaseCleanupValidator.isCandidateForCleanup(lease, kinesisShards)) {
garbageLeases.add(lease);
}
}
if (!garbageLeases.isEmpty()) {
LOG.info("Found " + garbageLeases.size() + " candidate leases for cleanup. Refreshing list of"
+ " Kinesis shards to pick up recent/latest shards");
List<Shard> currentShardList = getShardList(kinesisProxy);
Set<String> currentKinesisShardIds = new HashSet<>();
for (Shard shard : currentShardList) {
currentKinesisShardIds.add(shard.getShardId());
}
for (KinesisClientLease lease : garbageLeases) {
if (leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds)) {
LOG.info("Deleting lease for shard " + lease.getLeaseKey()
+ " as it is not present in Kinesis stream.");
leaseManager.deleteLease(lease);
}
}
}
}
/**
* Private helper method.
* Clean up leases for shards that meet the following criteria:
* a/ the shard has been fully processed (checkpoint is set to SHARD_END)
* b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not
* TRIM_HORIZON.
*
* @param currentLeases List of leases we evaluate for clean up
* @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards)
* @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards)
* @param trackedLeases List of all leases we are tracking.
* @param leaseManager Lease manager (will be used to delete leases)
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
private synchronized void cleanupLeasesOfFinishedShards(Collection<KinesisClientLease> currentLeases,
Map<String, Shard> shardIdToShardMap, Map<String, Set<String>> shardIdToChildShardIdsMap,
List<KinesisClientLease> trackedLeases, ILeaseManager<KinesisClientLease> leaseManager)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
Set<String> shardIdsOfClosedShards = new HashSet<>();
List<KinesisClientLease> leasesOfClosedShards = new ArrayList<>();
for (KinesisClientLease lease : currentLeases) {
if (lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
shardIdsOfClosedShards.add(lease.getLeaseKey());
leasesOfClosedShards.add(lease);
}
}
if (!leasesOfClosedShards.isEmpty()) {
assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards);
Comparator<? super KinesisClientLease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
shardIdToShardMap);
Collections.sort(leasesOfClosedShards, startingSequenceNumberComparator);
Map<String, KinesisClientLease> trackedLeaseMap = constructShardIdToKCLLeaseMap(trackedLeases);
for (KinesisClientLease leaseOfClosedShard : leasesOfClosedShards) {
String closedShardId = leaseOfClosedShard.getLeaseKey();
Set<String> childShardIds = shardIdToChildShardIdsMap.get(closedShardId);
if ((closedShardId != null) && (childShardIds != null) && (!childShardIds.isEmpty())) {
cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
}
}
}
}
/**
* Delete lease for the closed shard. Rules for deletion are:
* a/ the checkpoint for the closed shard is SHARD_END,
* b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON
* Note: This method has package level access solely for testing purposes.
*
* @param closedShardId Identifies the closed shard
* @param childShardIds ShardIds of children of the closed shard
* @param trackedLeases shardId->KinesisClientLease map with all leases we are tracking (should not be null)
* @param leaseManager
* @throws ProvisionedThroughputException
* @throws InvalidStateException
* @throws DependencyException
*/
synchronized void cleanupLeaseForClosedShard(String closedShardId, Set<String> childShardIds,
Map<String, KinesisClientLease> trackedLeases, ILeaseManager<KinesisClientLease> leaseManager)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId);
List<KinesisClientLease> childShardLeases = new ArrayList<>();
for (String childShardId : childShardIds) {
KinesisClientLease childLease = trackedLeases.get(childShardId);
if (childLease != null) {
childShardLeases.add(childLease);
}
}
if ((leaseForClosedShard != null) && (leaseForClosedShard.getCheckpoint()
.equals(ExtendedSequenceNumber.SHARD_END)) && (childShardLeases.size() == childShardIds.size())) {
boolean okayToDelete = true;
for (KinesisClientLease lease : childShardLeases) {
if (lease.getCheckpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) {
okayToDelete = false;
break;
}
}
if (okayToDelete) {
LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey()
+ " as it has been completely processed and processing of child shards has begun.");
leaseManager.deleteLease(leaseForClosedShard);
}
}
}
/**
* Helper method to create a new KinesisClientLease POJO for a shard.
* Note: Package level access only for testing purposes
*
* @param shard
* @return
*/
KinesisClientLease newKCLLease(Shard shard) {
KinesisClientLease newLease = new KinesisClientLease();
newLease.setLeaseKey(shard.getShardId());
List<String> parentShardIds = new ArrayList<String>(2);
if (shard.getParentShardId() != null) {
parentShardIds.add(shard.getParentShardId());
}
if (shard.getAdjacentParentShardId() != null) {
parentShardIds.add(shard.getAdjacentParentShardId());
}
newLease.setParentShardIds(parentShardIds);
newLease.setOwnerSwitchesSinceCheckpoint(0L);
return newLease;
}
/**
* Helper method to construct a shardId->Shard map for the specified list of shards.
*
* @param shards List of shards
* @return ShardId->Shard map
*/
Map<String, Shard> constructShardIdToShardMap(List<Shard> shards) {
Map<String, Shard> shardIdToShardMap = new HashMap<String, Shard>();
for (Shard shard : shards) {
shardIdToShardMap.put(shard.getShardId(), shard);
}
return shardIdToShardMap;
}
/**
* Helper method to return all the open shards for a stream.
* Note: Package level access only for testing purposes.
*
* @param allShards All shards retrieved via DescribeStream. We assume this to represent a consistent shard list.
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
*/
List<Shard> getOpenShards(List<Shard> allShards) {
List<Shard> openShards = new ArrayList<Shard>();
for (Shard shard : allShards) {
String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
if (endingSequenceNumber == null) {
openShards.add(shard);
LOG.debug("Found open shard: " + shard.getShardId());
}
}
return openShards;
}
private ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
ExtendedSequenceNumber checkpoint = null;
if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
checkpoint = ExtendedSequenceNumber.TRIM_HORIZON;
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) {
checkpoint = ExtendedSequenceNumber.LATEST;
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP;
}
return checkpoint;
}
/** Helper class to compare leases based on starting sequence number of the corresponding shards.
*
*/
private static class StartingSequenceNumberAndShardIdBasedComparator implements Comparator<KinesisClientLease>,
Serializable {
private static final long serialVersionUID = 1L;
private final Map<String, Shard> shardIdToShardMap;
/**
* @param shardIdToShardMapOfAllKinesisShards
*/
public StartingSequenceNumberAndShardIdBasedComparator(Map<String, Shard> shardIdToShardMapOfAllKinesisShards) {
shardIdToShardMap = shardIdToShardMapOfAllKinesisShards;
}
/**
* Compares two leases based on the starting sequence number of corresponding shards.
* If shards are not found in the shardId->shard map supplied, we do a string comparison on the shardIds.
* We assume that lease1 and lease2 are:
* a/ not null,
* b/ shards (if found) have non-null starting sequence numbers
*
* {@inheritDoc}
*/
@Override
public int compare(KinesisClientLease lease1, KinesisClientLease lease2) {
int result = 0;
String shardId1 = lease1.getLeaseKey();
String shardId2 = lease2.getLeaseKey();
Shard shard1 = shardIdToShardMap.get(shardId1);
Shard shard2 = shardIdToShardMap.get(shardId2);
// If we found shards for the two leases, use comparison of the starting sequence numbers
if ((shard1 != null) && (shard2 != null)) {
BigInteger sequenceNumber1 = new BigInteger(
shard1.getSequenceNumberRange().getStartingSequenceNumber());
BigInteger sequenceNumber2 = new BigInteger(
shard2.getSequenceNumberRange().getStartingSequenceNumber());
result = sequenceNumber1.compareTo(sequenceNumber2);
}
if (result == 0) {
result = shardId1.compareTo(shardId2);
}
return result;
}
}
}
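
A sketch of driving a one-off sync through the renamed implementation, assuming an IKinesisProxy, an ILeaseManager, and a LeaseCleanupValidator are already available and the caller sits in the same package (KinesisShardSyncer is package-private):

import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;

class ShardSyncSketch {
    static void syncOnce(IKinesisProxy kinesisProxy, ILeaseManager<KinesisClientLease> leaseManager,
            LeaseCleanupValidator leaseCleanupValidator) throws Exception {
        KinesisShardSyncer shardSyncer = new KinesisShardSyncer(leaseCleanupValidator);
        // Creates leases for any newly discovered shards (starting non-descendants at LATEST)
        // and cleans up leases for shards that have been fully processed.
        shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
                true /* cleanupLeasesOfCompletedShards */,
                false /* ignoreUnexpectedChildShards */);
    }
}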

View file

@ -0,0 +1,39 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* Used in conjunction with periodic shard sync.
* Implement this interface to allow KCL to decide if the current worker should execute shard sync.
* When periodic shard sync is enabled, PeriodicShardSyncManager periodically checks if the current
* worker is one of the leaders designated to execute shard-sync and then acts accordingly.
*/
public interface LeaderDecider {
/**
* Method invoked to check whether the given workerId corresponds to one of the workers
* designated to execute shard-syncs periodically.
*
* @param workerId ID of the worker
* @return True if the worker with ID workerId can execute shard-sync. False otherwise.
*/
Boolean isLeader(String workerId);
/**
* Can be invoked, if needed, to shutdown any clients/thread-pools
* being used in the LeaderDecider implementation.
*/
void shutdown();
}
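
A trivial sketch of a custom implementation, pinning shard-sync leadership to a single fixed worker (hypothetical; the shipped default is the DeterministicShuffleShardSyncLeaderDecider above):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.LeaderDecider;

public class FixedWorkerLeaderDecider implements LeaderDecider {
    private final String leaderWorkerId;

    public FixedWorkerLeaderDecider(String leaderWorkerId) {
        this.leaderWorkerId = leaderWorkerId;
    }

    @Override
    public Boolean isLeader(String workerId) {
        // Only the designated worker runs periodic shard syncs.
        return leaderWorkerId.equals(workerId);
    }

    @Override
    public void shutdown() {
        // No clients or thread pools to release.
    }
}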

View file

@ -0,0 +1,90 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.commons.lang3.Validate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* The top level orchestrator for coordinating the periodic shard sync related
* activities.
*/
@Getter
@EqualsAndHashCode
class PeriodicShardSyncManager {
private static final Log LOG = LogFactory.getLog(PeriodicShardSyncManager.class);
private static final long INITIAL_DELAY = 0;
private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 1000;
private final String workerId;
private final LeaderDecider leaderDecider;
private final ShardSyncTask shardSyncTask;
private final ScheduledExecutorService shardSyncThreadPool;
private boolean isRunning;
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask) {
this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor());
}
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, ShardSyncTask shardSyncTask, ScheduledExecutorService shardSyncThreadPool) {
Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager.");
this.workerId = workerId;
this.leaderDecider = leaderDecider;
this.shardSyncTask = shardSyncTask;
this.shardSyncThreadPool = shardSyncThreadPool;
}
public synchronized TaskResult start() {
if (!isRunning) {
shardSyncThreadPool
.scheduleWithFixedDelay(this::runShardSync, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS,
TimeUnit.MILLISECONDS);
isRunning = true;
}
return new TaskResult(null);
}
public void stop() {
if (isRunning) {
LOG.info(String.format("Shutting down leader decider on worker %s", workerId));
leaderDecider.shutdown();
LOG.info(String.format("Shutting down periodic shard sync task scheduler on worker %s", workerId));
shardSyncThreadPool.shutdown();
isRunning = false;
}
}
private void runShardSync() {
try {
if (leaderDecider.isLeader(workerId)) {
LOG.debug(String.format("WorkerId %s is a leader, running the shard sync task", workerId));
shardSyncTask.call();
} else {
LOG.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId));
}
} catch (Throwable t) {
LOG.error("Error during runShardSync.", t);
}
}
}
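Because the manager uses scheduleWithFixedDelay, a slow shard sync cannot overlap the next one: the one-second interval is measured from the end of the previous run. A standalone sketch of that scheduling behaviour (not part of this commit; the class name is invented):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelayDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Mirrors INITIAL_DELAY = 0 and PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 1000 above.
        scheduler.scheduleWithFixedDelay(
                () -> System.out.println("shard sync tick at " + System.currentTimeMillis()),
                0, 1000, TimeUnit.MILLISECONDS);
        Thread.sleep(3500);   // long enough for roughly four ticks
        scheduler.shutdown(); // same call PeriodicShardSyncManager.stop() makes on its pool
    }
}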


@ -0,0 +1,43 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* An implementation of ShardSyncStrategy.
*/
class PeriodicShardSyncStrategy implements ShardSyncStrategy {
private PeriodicShardSyncManager periodicShardSyncManager;
PeriodicShardSyncStrategy(PeriodicShardSyncManager periodicShardSyncManager) {
this.periodicShardSyncManager = periodicShardSyncManager;
}
@Override
public ShardSyncStrategyType getStrategyType() {
return ShardSyncStrategyType.PERIODIC;
}
@Override
public TaskResult syncShards() {
return periodicShardSyncManager.start();
}
@Override
public TaskResult onWorkerInitialization() {
return syncShards();
}
@Override
public TaskResult onFoundCompletedShard() {
return new TaskResult(null);
}
@Override
public TaskResult onShardConsumerShutDown() {
return new TaskResult(null);
}
@Override
public void onWorkerShutDown() {
periodicShardSyncManager.stop();
}
}


@ -64,6 +64,7 @@ class ShardConsumer {
private ITask currentTask;
private long currentTaskSubmitTime;
private Future<TaskResult> future;
private ShardSyncStrategy shardSyncStrategy;
@Getter
private final GetRecordsCache getRecordsCache;
@ -116,8 +117,7 @@ class ShardConsumer {
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisClientLibConfiguration config,
ShardSyncer shardSyncer) {
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this(shardInfo,
streamConfig,
checkpoint,
@ -131,8 +131,7 @@ class ShardConsumer {
skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional.empty(),
Optional.empty(),
config,
shardSyncer);
config, shardSyncer, shardSyncStrategy);
}
/**
@ -164,9 +163,7 @@ class ShardConsumer {
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config,
ShardSyncer shardSyncer) {
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this(
shardInfo,
streamConfig,
@ -190,8 +187,7 @@ class ShardConsumer {
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo),
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config,
shardSyncer
config, shardSyncer, shardSyncStrategy
);
}
@ -229,8 +225,7 @@ class ShardConsumer {
KinesisDataFetcher kinesisDataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config,
ShardSyncer shardSyncer) {
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this.shardInfo = shardInfo;
this.streamConfig = streamConfig;
this.checkpoint = checkpoint;
@ -249,6 +244,7 @@ class ShardConsumer {
makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo),
this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords());
this.shardSyncer = shardSyncer;
this.shardSyncStrategy = shardSyncStrategy;
}
/**
@ -512,4 +508,8 @@ class ShardConsumer {
ShutdownNotification getShutdownNotification() {
return shutdownNotification;
}
ShardSyncStrategy getShardSyncStrategy() {
return shardSyncStrategy;
}
}


@ -0,0 +1,62 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Implementation of ShardSyncStrategy that facilitates shard syncs when a shard completes processing.
*/
class ShardEndShardSyncStrategy implements ShardSyncStrategy {
private static final Log LOG = LogFactory.getLog(ShardEndShardSyncStrategy.class);
private ShardSyncTaskManager shardSyncTaskManager;
ShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) {
this.shardSyncTaskManager = shardSyncTaskManager;
}
@Override
public ShardSyncStrategyType getStrategyType() {
return ShardSyncStrategyType.SHARD_END;
}
@Override
public TaskResult syncShards() {
Future<TaskResult> taskResultFuture = null;
TaskResult result = null;
while (taskResultFuture == null) {
taskResultFuture = shardSyncTaskManager.syncShardAndLeaseInfo(null);
}
try {
result = taskResultFuture.get();
} catch (InterruptedException | ExecutionException e) {
LOG.warn("ShardEndShardSyncStrategy syncShards encountered exception.", e);
}
return result;
}
@Override
public TaskResult onWorkerInitialization() {
LOG.debug(String.format("onWorkerInitialization is NoOp for ShardSyncStrategyType %s", getStrategyType().toString()));
return new TaskResult(null);
}
@Override
public TaskResult onFoundCompletedShard() {
shardSyncTaskManager.syncShardAndLeaseInfo(null);
return new TaskResult(null);
}
@Override
public TaskResult onShardConsumerShutDown() {
return shardSyncTaskManager.runShardSyncer();
}
@Override
public void onWorkerShutDown() {
LOG.debug(String.format("Stop is NoOp for ShardSyncStrategyType %s", getStrategyType().toString()));
}
}


@ -0,0 +1,48 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* Facade of methods that can be invoked at different points
* in KCL application execution to perform certain actions related to shard-sync.
*/
public interface ShardSyncStrategy {
/**
* Returns the type of the implemented strategy.
*
* @return Type of the strategy.
*/
ShardSyncStrategyType getStrategyType();
/**
* Invoked when the KCL application wants to execute shard-sync.
*
* @return TaskResult
*/
TaskResult syncShards();
/**
* Invoked at worker initialization.
*
* @return TaskResult of the shard-sync performed at worker initialization, if any.
*/
TaskResult onWorkerInitialization();
/**
* Invoked when a completed shard is found.
*
* @return TaskResult of the shard-sync triggered by the completed shard, if any.
*/
TaskResult onFoundCompletedShard();
/**
* Invoked when ShardConsumer is shut down.
*
* @return TaskResult of the shard-sync performed during consumer shutdown, if any.
*/
TaskResult onShardConsumerShutDown();
/**
* Invoked when worker is shutdown.
*/
void onWorkerShutDown();
}


@ -0,0 +1,12 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* Types of {@link ShardSyncStrategy} implemented in KCL.
*/
public enum ShardSyncStrategyType {
/* Shard syncs are performed periodically */
PERIODIC,
/* Shard syncs are performed when processing of a shard completes */
SHARD_END
}


@ -19,6 +19,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import lombok.Getter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -32,6 +33,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
* Kinesis shards, remove obsolete leases). We'll have at most one outstanding sync task at any time.
* Worker will use this class to kick off a sync task when it finds shards which have been completely processed.
*/
@Getter
class ShardSyncTaskManager {
private static final Log LOG = LogFactory.getLog(ShardSyncTaskManager.class);
@ -83,12 +85,12 @@ class ShardSyncTaskManager {
this.shardSyncer = shardSyncer;
}
synchronized boolean syncShardAndLeaseInfo(Set<String> closedShardIds) {
synchronized Future<TaskResult> syncShardAndLeaseInfo(Set<String> closedShardIds) {
return checkAndSubmitNextTask(closedShardIds);
}
private synchronized boolean checkAndSubmitNextTask(Set<String> closedShardIds) {
boolean submittedNewTask = false;
private synchronized Future<TaskResult> checkAndSubmitNextTask(Set<String> closedShardIds) {
Future<TaskResult> submittedTaskFuture = null;
if ((future == null) || future.isCancelled() || future.isDone()) {
if ((future != null) && future.isDone()) {
try {
@ -111,17 +113,32 @@ class ShardSyncTaskManager {
shardSyncIdleTimeMillis,
shardSyncer), metricsFactory);
future = executorService.submit(currentTask);
submittedNewTask = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Submitted new " + currentTask.getTaskType() + " task.");
}
submittedTaskFuture = future;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Previous " + currentTask.getTaskType() + " task still pending. Not submitting new task.");
}
}
return submittedNewTask;
return submittedTaskFuture;
}
synchronized TaskResult runShardSyncer() {
Exception exception = null;
try {
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
initialPositionInStream,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards);
} catch (Exception e) {
LOG.error("Caught exception while sync'ing Kinesis shards and leases", e);
exception = e;
}
return new TaskResult(exception);
}
}
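syncShardAndLeaseInfo now returns the Future of the submitted sync task (or null when a previous task is still pending) instead of a boolean, which is what allows ShardEndShardSyncStrategy.syncShards above to block until the sync finishes. A standalone sketch of that submit-then-wait pattern (not part of this commit; names are invented and a plain Exception stands in for TaskResult):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitAndWaitDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Like runShardSyncer() above, the task reports failure through its return value
        // rather than by throwing.
        Callable<Exception> shardSyncTask = () -> {
            // ... sync shards and leases here ...
            return null; // null means the sync succeeded
        };
        Future<Exception> future = executor.submit(shardSyncTask);
        Exception failure = future.get(); // the caller blocks until the sync completes
        System.out.println("sync failed? " + (failure != null));
        executor.shutdown();
    }
}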


@ -14,830 +14,19 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang3.StringUtils;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.Shard;
/**
* Helper class to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
* It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
* and begun processing its child shards.
*/
class ShardSyncer {
public interface ShardSyncer {
private static final Log LOG = LogFactory.getLog(ShardSyncer.class);
private final LeaseCleanupValidator leaseCleanupValidator;
public ShardSyncer(final LeaseCleanupValidator leaseCleanupValidator) {
this.leaseCleanupValidator = leaseCleanupValidator;
}
synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards);
}
/**
* Check and create leases for any new shards (e.g. following a reshard operation).
*
* @param kinesisProxy
* @param leaseManager
* @param initialPositionInStream
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
}
/**
* Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard).
*
* @param kinesisProxy
* @param leaseManager
* @param initialPosition
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
// CHECKSTYLE:OFF CyclomaticComplexity
private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
List<Shard> shards = getShardList(kinesisProxy);
LOG.debug("Num shards: " + shards.size());
Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards);
Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap);
Set<String> inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap);
if (!ignoreUnexpectedChildShards) {
assertAllParentShardsAreClosed(inconsistentShardIds);
}
List<KinesisClientLease> currentLeases = leaseManager.listLeases();
List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
inconsistentShardIds);
LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
for (KinesisClientLease lease : newLeasesToCreate) {
long startTimeMillis = System.currentTimeMillis();
boolean success = false;
try {
leaseManager.createLeaseIfNotExists(lease);
success = true;
} finally {
MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success, MetricsLevel.DETAILED);
}
}
List<KinesisClientLease> trackedLeases = new ArrayList<>();
if (currentLeases != null) {
trackedLeases.addAll(currentLeases);
}
trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager);
if (cleanupLeasesOfCompletedShards) {
cleanupLeasesOfFinishedShards(currentLeases,
shardIdToShardMap,
shardIdToChildShardIdsMap,
trackedLeases,
leaseManager);
}
}
// CHECKSTYLE:ON CyclomaticComplexity
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
* and a reshard operation.
* @param inconsistentShardIds
* @throws KinesisClientLibIOException
*/
private void assertAllParentShardsAreClosed(Set<String> inconsistentShardIds)
throws KinesisClientLibIOException {
if (!inconsistentShardIds.isEmpty()) {
String ids = StringUtils.join(inconsistentShardIds, ' ');
throw new KinesisClientLibIOException(String.format("%d open child shards (%s) are inconsistent. "
+ "This can happen due to a race condition between describeStream and a reshard operation.",
inconsistentShardIds.size(), ids));
}
}
/**
* Helper method to construct the list of inconsistent shards, which are open shards with non-closed ancestor
* parent(s).
* @param shardIdToChildShardIdsMap
* @param shardIdToShardMap
* @return Set of inconsistent open shard ids for shards having open parents.
*/
private Set<String> findInconsistentShardIds(Map<String, Set<String>> shardIdToChildShardIdsMap,
Map<String, Shard> shardIdToShardMap) {
Set<String> result = new HashSet<String>();
for (String parentShardId : shardIdToChildShardIdsMap.keySet()) {
Shard parentShard = shardIdToShardMap.get(parentShardId);
if ((parentShardId == null) || (parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null)) {
Set<String> childShardIdsMap = shardIdToChildShardIdsMap.get(parentShardId);
result.addAll(childShardIdsMap);
}
}
return result;
}
/**
* Helper method to create a shardId->KinesisClientLease map.
* Note: This has package level access for testing purposes only.
* @param trackedLeaseList
* @return
*/
Map<String, KinesisClientLease> constructShardIdToKCLLeaseMap(List<KinesisClientLease> trackedLeaseList) {
Map<String, KinesisClientLease> trackedLeasesMap = new HashMap<>();
for (KinesisClientLease lease : trackedLeaseList) {
trackedLeasesMap.put(lease.getLeaseKey(), lease);
}
return trackedLeasesMap;
}
/**
* Note: this has package level access for testing purposes.
* Useful for asserting that we don't have an incomplete shard list following a reshard operation.
* We verify that if the shard is present in the shard list, it is closed and its hash key range
* is covered by its child shards.
*/
synchronized void assertClosedShardsAreCoveredOrAbsent(Map<String, Shard> shardIdToShardMap,
Map<String, Set<String>> shardIdToChildShardIdsMap,
Set<String> shardIdsOfClosedShards) throws KinesisClientLibIOException {
String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
+ " while a reshard operation was in progress.";
for (String shardId : shardIdsOfClosedShards) {
Shard shard = shardIdToShardMap.get(shardId);
if (shard == null) {
LOG.info("Shard " + shardId + " is not present in Kinesis anymore.");
continue;
}
String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
if (endingSequenceNumber == null) {
throw new KinesisClientLibIOException("Shard " + shardIdsOfClosedShards
+ " is not closed. " + exceptionMessageSuffix);
}
Set<String> childShardIds = shardIdToChildShardIdsMap.get(shardId);
if (childShardIds == null) {
throw new KinesisClientLibIOException("Incomplete shard list: Closed shard " + shardId
+ " has no children." + exceptionMessageSuffix);
}
assertHashRangeOfClosedShardIsCovered(shard, shardIdToShardMap, childShardIds);
}
}
private synchronized void assertHashRangeOfClosedShardIsCovered(Shard closedShard,
Map<String, Shard> shardIdToShardMap,
Set<String> childShardIds) throws KinesisClientLibIOException {
BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getStartingHashKey());
BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getEndingHashKey());
BigInteger minStartingHashKeyOfChildren = null;
BigInteger maxEndingHashKeyOfChildren = null;
for (String childShardId : childShardIds) {
Shard childShard = shardIdToShardMap.get(childShardId);
BigInteger startingHashKey = new BigInteger(childShard.getHashKeyRange().getStartingHashKey());
if ((minStartingHashKeyOfChildren == null)
|| (startingHashKey.compareTo(minStartingHashKeyOfChildren) < 0)) {
minStartingHashKeyOfChildren = startingHashKey;
}
BigInteger endingHashKey = new BigInteger(childShard.getHashKeyRange().getEndingHashKey());
if ((maxEndingHashKeyOfChildren == null)
|| (endingHashKey.compareTo(maxEndingHashKeyOfChildren) > 0)) {
maxEndingHashKeyOfChildren = endingHashKey;
}
}
if ((minStartingHashKeyOfChildren == null) || (maxEndingHashKeyOfChildren == null)
|| (minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0)
|| (maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0)) {
throw new KinesisClientLibIOException("Incomplete shard list: hash key range of shard "
+ closedShard.getShardId() + " is not covered by its child shards.");
}
}
/**
* Helper method to construct shardId->setOfChildShardIds map.
* Note: This has package access for testing purposes only.
* @param shardIdToShardMap
* @return
*/
Map<String, Set<String>> constructShardIdToChildShardIdsMap(
Map<String, Shard> shardIdToShardMap) {
Map<String, Set<String>> shardIdToChildShardIdsMap = new HashMap<>();
for (Map.Entry<String, Shard> entry : shardIdToShardMap.entrySet()) {
String shardId = entry.getKey();
Shard shard = entry.getValue();
String parentShardId = shard.getParentShardId();
if ((parentShardId != null) && (shardIdToShardMap.containsKey(parentShardId))) {
Set<String> childShardIds = shardIdToChildShardIdsMap.get(parentShardId);
if (childShardIds == null) {
childShardIds = new HashSet<String>();
shardIdToChildShardIdsMap.put(parentShardId, childShardIds);
}
childShardIds.add(shardId);
}
String adjacentParentShardId = shard.getAdjacentParentShardId();
if ((adjacentParentShardId != null) && (shardIdToShardMap.containsKey(adjacentParentShardId))) {
Set<String> childShardIds = shardIdToChildShardIdsMap.get(adjacentParentShardId);
if (childShardIds == null) {
childShardIds = new HashSet<String>();
shardIdToChildShardIdsMap.put(adjacentParentShardId, childShardIds);
}
childShardIds.add(shardId);
}
}
return shardIdToChildShardIdsMap;
}
private List<Shard> getShardList(IKinesisProxy kinesisProxy) throws KinesisClientLibIOException {
List<Shard> shards = kinesisProxy.getShardList();
if (shards == null) {
throw new KinesisClientLibIOException(
"Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list.");
}
return shards;
}
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
* If not, set checkpoint of the shard to the initial position specified by the client.
* To check if we need to create leases for ancestors, we use the following rules:
* * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
* we begin processing data from any of its descendants.
* * A shard does not start processing data until data from all its parents has been processed.
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
* leases corresponding to both the parents - the parent shard which is not a descendant will have
* its checkpoint set to Latest.
*
* We assume that if there is an existing lease for a shard, then either:
* * we have previously created a lease for its parent (if it was needed), or
* * the parent shard has expired.
*
* For example:
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10)
*
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
*
* If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
* here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
*
*
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
* @param inconsistentShardIds Set of child shard ids having open parents.
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition,
Set<String> inconsistentShardIds) {
Map<String, KinesisClientLease> shardIdToNewLeaseMap = new HashMap<String, KinesisClientLease>();
Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
Set<String> shardIdsOfCurrentLeases = new HashSet<String>();
for (KinesisClientLease lease : currentLeases) {
shardIdsOfCurrentLeases.add(lease.getLeaseKey());
LOG.debug("Existing lease: " + lease);
}
List<Shard> openShards = getOpenShards(shards);
Map<String, Boolean> memoizationContext = new HashMap<>();
// Iterate over the open shards and find those that don't have any lease entries.
for (Shard shard : openShards) {
String shardId = shard.getShardId();
LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors.");
if (shardIdsOfCurrentLeases.contains(shardId)) {
LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease");
} else if (inconsistentShardIds.contains(shardId)) {
LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease");
} else {
LOG.debug("Need to create a lease for shardId " + shardId);
KinesisClientLease newLease = newKCLLease(shard);
boolean isDescendant =
checkIfDescendantAndAddNewLeasesForAncestors(shardId,
initialPosition,
shardIdsOfCurrentLeases,
shardIdToShardMapOfAllKinesisShards,
shardIdToNewLeaseMap,
memoizationContext);
/**
* If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
* checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a
* lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
* timestamp at or after the specified initial position timestamp.
*
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: empty set
*
* For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
* timestamp value 206. We will then create new leases for all the shards (with checkpoint set to
* AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin
* processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases
* would then be deleted since they won't have records with server-side timestamp at/after 206. And
* after that we will begin processing the descendant shards with epoch at/after 206 and we will
* return the records that meet the timestamp requirement for these shards.
*/
if (isDescendant && !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
newLease.setCheckpoint(convertToCheckpoint(initialPosition));
}
LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint());
shardIdToNewLeaseMap.put(shardId, newLease);
}
}
List<KinesisClientLease> newLeasesToCreate = new ArrayList<KinesisClientLease>();
newLeasesToCreate.addAll(shardIdToNewLeaseMap.values());
Comparator<? super KinesisClientLease> startingSequenceNumberComparator =
new StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMapOfAllKinesisShards);
Collections.sort(newLeasesToCreate, startingSequenceNumberComparator);
return newLeasesToCreate;
}
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*/
List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition) {
Set<String> inconsistentShardIds = new HashSet<String>();
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds);
}
/**
* Note: Package level access for testing purposes only.
* Check if this shard is a descendant of a shard that is (or will be) processed.
* Create leases for the ancestors of this shard as required.
* See javadoc of determineNewLeasesToCreate() for rules and example.
*
* @param shardId The shardId to check.
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
* @param shardIdsOfCurrentLeases The shardIds for the current leases.
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @param shardIdToLeaseMapOfNewShards Add lease POJOs corresponding to ancestors to this map.
* @param memoizationContext Memoization of shards that have been evaluated as part of the evaluation
* @return true if the shard is a descendant of any current shard (lease already exists)
*/
// CHECKSTYLE:OFF CyclomaticComplexity
boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId,
InitialPositionInStreamExtended initialPosition,
Set<String> shardIdsOfCurrentLeases,
Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
Map<String, KinesisClientLease> shardIdToLeaseMapOfNewShards,
Map<String, Boolean> memoizationContext) {
Boolean previousValue = memoizationContext.get(shardId);
if (previousValue != null) {
return previousValue;
}
boolean isDescendant = false;
Shard shard;
Set<String> parentShardIds;
Set<String> descendantParentShardIds = new HashSet<String>();
if ((shardId != null) && (shardIdToShardMapOfAllKinesisShards.containsKey(shardId))) {
if (shardIdsOfCurrentLeases.contains(shardId)) {
// This shard is a descendant of a current shard.
isDescendant = true;
// We don't need to add leases of its ancestors,
// because we'd have done it when creating a lease for this shard.
} else {
shard = shardIdToShardMapOfAllKinesisShards.get(shardId);
parentShardIds = getParentShardIds(shard, shardIdToShardMapOfAllKinesisShards);
for (String parentShardId : parentShardIds) {
// Check if the parent is a descendant, and include its ancestors.
if (checkIfDescendantAndAddNewLeasesForAncestors(parentShardId,
initialPosition,
shardIdsOfCurrentLeases,
shardIdToShardMapOfAllKinesisShards,
shardIdToLeaseMapOfNewShards,
memoizationContext)) {
isDescendant = true;
descendantParentShardIds.add(parentShardId);
LOG.debug("Parent shard " + parentShardId + " is a descendant.");
} else {
LOG.debug("Parent shard " + parentShardId + " is NOT a descendant.");
}
}
// If this is a descendant, create leases for its parent shards (if they don't exist)
if (isDescendant) {
for (String parentShardId : parentShardIds) {
if (!shardIdsOfCurrentLeases.contains(parentShardId)) {
LOG.debug("Need to create a lease for shardId " + parentShardId);
KinesisClientLease lease = shardIdToLeaseMapOfNewShards.get(parentShardId);
if (lease == null) {
lease = newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId));
shardIdToLeaseMapOfNewShards.put(parentShardId, lease);
}
if (descendantParentShardIds.contains(parentShardId)
&& !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
lease.setCheckpoint(convertToCheckpoint(initialPosition));
}
}
}
} else {
// This shard should be included, if the customer wants to process all records in the stream or
// if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a lease just like we do
// for TRIM_HORIZON. However we will only return back records with server-side timestamp at or
// after the specified initial position timestamp.
if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)
|| initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
isDescendant = true;
}
}
}
}
memoizationContext.put(shardId, isDescendant);
return isDescendant;
}
// CHECKSTYLE:ON CyclomaticComplexity
/**
* Helper method to get parent shardIds of the current shard - includes the parent shardIds if:
* a/ they are not null
* b/ if they exist in the current shard map (i.e. haven't expired)
*
* @param shard Will return parents of this shard
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @return Set of parentShardIds
*/
Set<String> getParentShardIds(Shard shard, Map<String, Shard> shardIdToShardMapOfAllKinesisShards) {
Set<String> parentShardIds = new HashSet<String>(2);
String parentShardId = shard.getParentShardId();
if ((parentShardId != null) && shardIdToShardMapOfAllKinesisShards.containsKey(parentShardId)) {
parentShardIds.add(parentShardId);
}
String adjacentParentShardId = shard.getAdjacentParentShardId();
if ((adjacentParentShardId != null) && shardIdToShardMapOfAllKinesisShards.containsKey(adjacentParentShardId)) {
parentShardIds.add(adjacentParentShardId);
}
return parentShardIds;
}
/**
* Delete leases corresponding to shards that no longer exist in the stream.
* Current scheme: Delete a lease if:
* * the corresponding shard is not present in the list of Kinesis shards, AND
* * the parentShardIds listed in the lease are also not present in the list of Kinesis shards.
* @param shards List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state).
* @param trackedLeases List of
* @param kinesisProxy Kinesis proxy (used to get shard list)
* @param leaseManager
* @throws KinesisClientLibIOException Thrown if we couldn't get a fresh shard list from Kinesis.
* @throws ProvisionedThroughputException
* @throws InvalidStateException
* @throws DependencyException
*/
private void cleanupGarbageLeases(List<Shard> shards,
List<KinesisClientLease> trackedLeases,
IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager)
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
Set<String> kinesisShards = new HashSet<>();
for (Shard shard : shards) {
kinesisShards.add(shard.getShardId());
}
// Check if there are leases for non-existent shards
List<KinesisClientLease> garbageLeases = new ArrayList<>();
for (KinesisClientLease lease : trackedLeases) {
if (leaseCleanupValidator.isCandidateForCleanup(lease, kinesisShards)) {
garbageLeases.add(lease);
}
}
if (!garbageLeases.isEmpty()) {
LOG.info("Found " + garbageLeases.size()
+ " candidate leases for cleanup. Refreshing list of"
+ " Kinesis shards to pick up recent/latest shards");
List<Shard> currentShardList = getShardList(kinesisProxy);
Set<String> currentKinesisShardIds = new HashSet<>();
for (Shard shard : currentShardList) {
currentKinesisShardIds.add(shard.getShardId());
}
for (KinesisClientLease lease : garbageLeases) {
if (leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds)) {
LOG.info("Deleting lease for shard " + lease.getLeaseKey()
+ " as it is not present in Kinesis stream.");
leaseManager.deleteLease(lease);
}
}
}
}
/**
* Private helper method.
* Clean up leases for shards that meet the following criteria:
* a/ the shard has been fully processed (checkpoint is set to SHARD_END)
* b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not
* TRIM_HORIZON.
*
* @param currentLeases List of leases we evaluate for clean up
* @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards)
* @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards)
* @param trackedLeases List of all leases we are tracking.
* @param leaseManager Lease manager (will be used to delete leases)
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
private synchronized void cleanupLeasesOfFinishedShards(Collection<KinesisClientLease> currentLeases,
Map<String, Shard> shardIdToShardMap,
Map<String, Set<String>> shardIdToChildShardIdsMap,
List<KinesisClientLease> trackedLeases,
ILeaseManager<KinesisClientLease> leaseManager)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
Set<String> shardIdsOfClosedShards = new HashSet<>();
List<KinesisClientLease> leasesOfClosedShards = new ArrayList<>();
for (KinesisClientLease lease : currentLeases) {
if (lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
shardIdsOfClosedShards.add(lease.getLeaseKey());
leasesOfClosedShards.add(lease);
}
}
if (!leasesOfClosedShards.isEmpty()) {
assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap,
shardIdToChildShardIdsMap,
shardIdsOfClosedShards);
Comparator<? super KinesisClientLease> startingSequenceNumberComparator =
new StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMap);
Collections.sort(leasesOfClosedShards, startingSequenceNumberComparator);
Map<String, KinesisClientLease> trackedLeaseMap = constructShardIdToKCLLeaseMap(trackedLeases);
for (KinesisClientLease leaseOfClosedShard : leasesOfClosedShards) {
String closedShardId = leaseOfClosedShard.getLeaseKey();
Set<String> childShardIds = shardIdToChildShardIdsMap.get(closedShardId);
if ((closedShardId != null) && (childShardIds != null) && (!childShardIds.isEmpty())) {
cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
}
}
}
}
/**
* Delete lease for the closed shard. Rules for deletion are:
* a/ the checkpoint for the closed shard is SHARD_END,
* b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON
* Note: This method has package level access solely for testing purposes.
*
* @param closedShardId Identifies the closed shard
* @param childShardIds ShardIds of children of the closed shard
* @param trackedLeases shardId->KinesisClientLease map with all leases we are tracking (should not be null)
* @param leaseManager
* @throws ProvisionedThroughputException
* @throws InvalidStateException
* @throws DependencyException
*/
synchronized void cleanupLeaseForClosedShard(String closedShardId,
Set<String> childShardIds,
Map<String, KinesisClientLease> trackedLeases,
ILeaseManager<KinesisClientLease> leaseManager)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId);
List<KinesisClientLease> childShardLeases = new ArrayList<>();
for (String childShardId : childShardIds) {
KinesisClientLease childLease = trackedLeases.get(childShardId);
if (childLease != null) {
childShardLeases.add(childLease);
}
}
if ((leaseForClosedShard != null)
&& (leaseForClosedShard.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END))
&& (childShardLeases.size() == childShardIds.size())) {
boolean okayToDelete = true;
for (KinesisClientLease lease : childShardLeases) {
if (lease.getCheckpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) {
okayToDelete = false;
break;
}
}
if (okayToDelete) {
LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey()
+ " as it has been completely processed and processing of child shards has begun.");
leaseManager.deleteLease(leaseForClosedShard);
}
}
}
/**
* Helper method to create a new KinesisClientLease POJO for a shard.
* Note: Package level access only for testing purposes
*
* @param shard
* @return
*/
KinesisClientLease newKCLLease(Shard shard) {
KinesisClientLease newLease = new KinesisClientLease();
newLease.setLeaseKey(shard.getShardId());
List<String> parentShardIds = new ArrayList<String>(2);
if (shard.getParentShardId() != null) {
parentShardIds.add(shard.getParentShardId());
}
if (shard.getAdjacentParentShardId() != null) {
parentShardIds.add(shard.getAdjacentParentShardId());
}
newLease.setParentShardIds(parentShardIds);
newLease.setOwnerSwitchesSinceCheckpoint(0L);
return newLease;
}
/**
* Helper method to construct a shardId->Shard map for the specified list of shards.
*
* @param shards List of shards
* @return ShardId->Shard map
*/
Map<String, Shard> constructShardIdToShardMap(List<Shard> shards) {
Map<String, Shard> shardIdToShardMap = new HashMap<String, Shard>();
for (Shard shard : shards) {
shardIdToShardMap.put(shard.getShardId(), shard);
}
return shardIdToShardMap;
}
/**
* Helper method to return all the open shards for a stream.
* Note: Package level access only for testing purposes.
*
* @param allShards All shards retrieved via DescribeStream. We assume this to represent a consistent shard list.
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
*/
List<Shard> getOpenShards(List<Shard> allShards) {
List<Shard> openShards = new ArrayList<Shard>();
for (Shard shard : allShards) {
String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
if (endingSequenceNumber == null) {
openShards.add(shard);
LOG.debug("Found open shard: " + shard.getShardId());
}
}
return openShards;
}
private ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
ExtendedSequenceNumber checkpoint = null;
if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
checkpoint = ExtendedSequenceNumber.TRIM_HORIZON;
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) {
checkpoint = ExtendedSequenceNumber.LATEST;
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP;
}
return checkpoint;
}
/** Helper class to compare leases based on starting sequence number of the corresponding shards.
*
*/
private static class StartingSequenceNumberAndShardIdBasedComparator implements Comparator<KinesisClientLease>,
Serializable {
private static final long serialVersionUID = 1L;
private final Map<String, Shard> shardIdToShardMap;
/**
* @param shardIdToShardMapOfAllKinesisShards
*/
public StartingSequenceNumberAndShardIdBasedComparator(Map<String, Shard> shardIdToShardMapOfAllKinesisShards) {
shardIdToShardMap = shardIdToShardMapOfAllKinesisShards;
}
/**
* Compares two leases based on the starting sequence number of corresponding shards.
* If shards are not found in the shardId->shard map supplied, we do a string comparison on the shardIds.
* We assume that lease1 and lease2 are:
* a/ not null,
* b/ shards (if found) have non-null starting sequence numbers
*
* {@inheritDoc}
*/
@Override
public int compare(KinesisClientLease lease1, KinesisClientLease lease2) {
int result = 0;
String shardId1 = lease1.getLeaseKey();
String shardId2 = lease2.getLeaseKey();
Shard shard1 = shardIdToShardMap.get(shardId1);
Shard shard2 = shardIdToShardMap.get(shardId2);
// If we found shards for the two leases, use comparison of the starting sequence numbers
if ((shard1 != null) && (shard2 != null)) {
BigInteger sequenceNumber1 =
new BigInteger(shard1.getSequenceNumberRange().getStartingSequenceNumber());
BigInteger sequenceNumber2 =
new BigInteger(shard2.getSequenceNumberRange().getStartingSequenceNumber());
result = sequenceNumber1.compareTo(sequenceNumber2);
}
if (result == 0) {
result = shardId1.compareTo(shardId2);
}
return result;
}
}
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException;
}
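With ShardSyncer now a public interface (its previous implementation moves to KinesisShardSyncer), alternative implementations can be injected through the Worker builder's shardSyncer(...) setter shown further below. A minimal hypothetical implementation, assuming checkAndCreateLeasesForNewShards is the interface's only method as the visible hunk suggests:

package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;

// Hypothetical no-op implementation, shown only to illustrate the shape of the new
// interface; real applications would delegate to KinesisShardSyncer or supply their own logic.
public class NoOpShardSyncer implements ShardSyncer {
    @Override
    public void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
            ILeaseManager<KinesisClientLease> leaseManager,
            InitialPositionInStreamExtended initialPositionInStream,
            boolean cleanupLeasesOfCompletedShards,
            boolean ignoreUnexpectedChildShards)
            throws DependencyException, InvalidStateException, ProvisionedThroughputException,
            KinesisClientLibIOException {
        // Intentionally does nothing: no leases are created or cleaned up.
    }
}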


@ -49,6 +49,7 @@ class ShutdownTask implements ITask {
private final long backoffTimeMillis;
private final GetRecordsCache getRecordsCache;
private final ShardSyncer shardSyncer;
private final ShardSyncStrategy shardSyncStrategy;
/**
* Constructor.
@ -64,8 +65,7 @@ class ShutdownTask implements ITask {
boolean ignoreUnexpectedChildShards,
ILeaseManager<KinesisClientLease> leaseManager,
long backoffTimeMillis,
GetRecordsCache getRecordsCache,
ShardSyncer shardSyncer) {
GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
@ -78,6 +78,7 @@ class ShutdownTask implements ITask {
this.backoffTimeMillis = backoffTimeMillis;
this.getRecordsCache = getRecordsCache;
this.shardSyncer = shardSyncer;
this.shardSyncStrategy = shardSyncStrategy;
}
/*
@ -130,11 +131,12 @@ class ShutdownTask implements ITask {
if (reason == ShutdownReason.TERMINATE) {
LOG.debug("Looking for child shards of shard " + shardInfo.getShardId());
// create leases for the child shards
shardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
initialPositionInStream,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards);
TaskResult result = shardSyncStrategy.onShardConsumerShutDown();
if (result.getException() != null) {
LOG.debug("Exception while trying to sync shards on the shutdown of shard: " + shardInfo
.getShardId());
throw result.getException();
}
LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId());
}


@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@ -34,6 +35,11 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
import com.amazonaws.services.kinesis.leases.impl.LeaseRenewer;
import com.amazonaws.services.kinesis.leases.impl.LeaseTaker;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker;
import com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
@ -82,7 +88,9 @@ public class Worker implements Runnable {
private static final Log LOG = LogFactory.getLog(Worker.class);
private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0;
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL.
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator();
private static final LeaseSelector<KinesisClientLease> DEFAULT_LEASE_SELECTOR = new GenericLeaseSelector<KinesisClientLease>();
@ -136,6 +144,10 @@ public class Worker implements Runnable {
private WorkerStateChangeListener workerStateChangeListener;
// Periodic Shard Sync related fields
private LeaderDecider leaderDecider;
private ShardSyncStrategy shardSyncStrategy;
/**
* Constructor.
*
@ -380,25 +392,10 @@ public class Worker implements Runnable {
KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
IMetricsFactory metricsFactory, ExecutorService execService) {
this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
config,
new StreamConfig(
new KinesisProxy(config, kinesisClient),
config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing(),
config.getInitialPositionInStreamExtended()),
config, getStreamConfig(config, kinesisClient),
config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(),
config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null,
new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient),
DEFAULT_LEASE_SELECTOR,
config.getWorkerIdentifier(),
config.getFailoverTimeMillis(),
config.getEpsilonMillis(),
config.getMaxLeasesForWorker(),
config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(),
metricsFactory)
getLeaseCoordinator(config, dynamoDBClient, metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
execService,
@ -409,8 +406,7 @@ public class Worker implements Runnable {
config.getShardPrioritizationStrategy(),
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(),
DEFAULT_WORKER_STATE_CHANGE_LISTENER,
DEFAULT_LEASE_CLEANUP_VALIDATOR );
DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR, null /* leaderDecider */);
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) {
@ -470,7 +466,8 @@ public class Worker implements Runnable {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_LEASE_CLEANUP_VALIDATOR );
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER,
DEFAULT_LEASE_CLEANUP_VALIDATOR, null);
}
/**
@ -520,7 +517,23 @@ public class Worker implements Runnable {
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
LeaseCleanupValidator leaseCleanupValidator) {
LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider) {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint,
leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator),
leaderDecider);
}
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion,
ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool,
WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
this.config = config;
@ -533,7 +546,7 @@ public class Worker implements Runnable {
this.executorService = execService;
this.leaseCoordinator = leaseCoordinator;
this.metricsFactory = metricsFactory;
this.shardSyncer = new ShardSyncer(leaseCleanupValidator);
this.shardSyncer = shardSyncer;
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(),
shardSyncIdleTimeMillis, metricsFactory, executorService, shardSyncer);
@ -545,6 +558,34 @@ public class Worker implements Runnable {
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
this.workerStateChangeListener = workerStateChangeListener;
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
this.leaderDecider = leaderDecider;
this.shardSyncStrategy = createShardSyncStrategy(config.getShardSyncStrategyType());
LOG.info(String.format("Shard sync strategy determined as %s.", shardSyncStrategy.getStrategyType().toString()));
}
private ShardSyncStrategy createShardSyncStrategy(ShardSyncStrategyType strategyType) {
switch (strategyType) {
case PERIODIC:
return createPeriodicShardSyncStrategy(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager());
case SHARD_END:
default:
return createShardEndShardSyncStrategy(controlServer);
}
}
private static KinesisClientLibLeaseCoordinator getLeaseCoordinator(KinesisClientLibConfiguration config,
AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory) {
return new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), DEFAULT_LEASE_SELECTOR,
config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(),
config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(), metricsFactory);
}
private static StreamConfig getStreamConfig(KinesisClientLibConfiguration config, AmazonKinesis kinesisClient) {
return new StreamConfig(new KinesisProxy(config, kinesisClient), config.getMaxRecords(),
config.getIdleTimeBetweenReadsInMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing(), config.getInitialPositionInStreamExtended());
}
/**
@ -601,7 +642,7 @@ public class Worker implements Runnable {
}
if (foundCompletedShard) {
controlServer.syncShardAndLeaseInfo(null);
shardSyncStrategy.onFoundCompletedShard();
}
// clean up shard consumers for unassigned shards
@ -651,6 +692,7 @@ public class Worker implements Runnable {
} else {
LOG.info("LeaseCoordinator is already running. No need to start it.");
}
shardSyncStrategy.onWorkerInitialization();
isDone = true;
} else {
lastException = result.getException();
@ -897,7 +939,7 @@ public class Worker implements Runnable {
* the worker itself.
* <ol>
* <li>Call to start shutdown invoked</li>
* <li>Lease coordinator told to stop taking leases, and to drop existing leases.</li>
* <li>Lease coordinator told to stop taking new leases and to drop existing leases.</li>
* <li>Worker discovers record processors that no longer have leases.</li>
* <li>Worker triggers shutdown with state {@link ShutdownReason#ZOMBIE}.</li>
* <li>Once all record processors are shutdown, worker terminates owned resources.</li>
@ -919,6 +961,10 @@ public class Worker implements Runnable {
// Lost leases will force Worker to begin shutdown process for all shard consumers in
// Worker.run().
leaseCoordinator.stop();
// Notify the shard sync strategy that the worker is shutting down (stops the periodic shard sync manager, if any)
if (shardSyncStrategy != null) {
shardSyncStrategy.onWorkerShutDown();
}
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
}
@ -1005,9 +1051,7 @@ public class Worker implements Runnable {
skipShardSyncAtWorkerInitializationIfLeasesExist,
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config,
shardSyncer);
config, shardSyncer, shardSyncStrategy);
}
/**
@ -1116,6 +1160,20 @@ public class Worker implements Runnable {
}
}
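// Periodic shard-sync strategy: a PeriodicShardSyncManager, gated by the LeaderDecider, runs
// ShardSyncTask periodically on the workers elected as leaders.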
private PeriodicShardSyncStrategy createPeriodicShardSyncStrategy(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager) {
return new PeriodicShardSyncStrategy(
new PeriodicShardSyncManager(config.getWorkerIdentifier(), leaderDecider,
new ShardSyncTask(kinesisProxy, leaseManager, config.getInitialPositionInStreamExtended(),
config.shouldCleanupLeasesUponShardCompletion(),
config.shouldIgnoreUnexpectedChildShards(), SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC,
shardSyncer)));
}
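// Shard-end strategy (default): shard syncs are still driven through the ShardSyncTaskManager
// when a consumer reaches the end of a shard.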
private ShardEndShardSyncStrategy createShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager) {
return new ShardEndShardSyncStrategy(shardSyncTaskManager);
}
/**
* Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not.
* Visible and non-final only for testing.
@ -1172,6 +1230,15 @@ public class Worker implements Runnable {
private LeaseCleanupValidator leaseCleanupValidator;
@Setter @Accessors(fluent = true)
private LeaseSelector<KinesisClientLease> leaseSelector;
@Setter @Accessors(fluent = true)
private LeaderDecider leaderDecider;
@Setter @Accessors(fluent = true)
private ILeaseTaker<KinesisClientLease> leaseTaker;
@Setter @Accessors(fluent = true)
private ILeaseRenewer<KinesisClientLease> leaseRenewer;
@Setter @Accessors(fluent = true)
private ShardSyncer shardSyncer;
@VisibleForTesting
AmazonKinesis getKinesisClient() {
@ -1269,15 +1336,19 @@ public class Worker implements Runnable {
if (config.getKinesisEndpoint() != null) {
setField(kinesisClient, "endpoint", kinesisClient::setEndpoint, config.getKinesisEndpoint());
}
if (metricsFactory == null) {
metricsFactory = getMetricsFactory(cloudWatchClient, config);
}
if (leaseManager == null) {
leaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient);
}
if (shardPrioritization == null) {
shardPrioritization = new ParentsFirstShardPrioritization(1);
}
if (kinesisProxy == null) {
kinesisProxy = new KinesisProxy(config, kinesisClient);
}
@ -1286,14 +1357,35 @@ public class Worker implements Runnable {
workerStateChangeListener = DEFAULT_WORKER_STATE_CHANGE_LISTENER;
}
if(leaseCleanupValidator == null) {
if (leaseCleanupValidator == null) {
leaseCleanupValidator = DEFAULT_LEASE_CLEANUP_VALIDATOR;
}
if (shardSyncer == null) {
shardSyncer = new KinesisShardSyncer(leaseCleanupValidator);
}
if (leaseSelector == null) {
leaseSelector = DEFAULT_LEASE_SELECTOR;
}
if (leaseTaker == null) {
leaseTaker = new LeaseTaker<>(leaseManager, leaseSelector, config.getWorkerIdentifier(), config.getFailoverTimeMillis())
.withMaxLeasesForWorker(config.getMaxLeasesForWorker())
.withMaxLeasesToStealAtOneTime(config.getMaxLeasesToStealAtOneTime());
}
// We expect users to inject both the LeaseRenewer and its corresponding thread pool, or neither (in which case defaults are created).
if (leaseRenewer == null) {
ExecutorService leaseRenewerThreadPool = LeaseCoordinator.getDefaultLeaseRenewalExecutorService(config.getMaxLeaseRenewalThreads());
leaseRenewer = new LeaseRenewer<>(leaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), leaseRenewerThreadPool);
}
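// Default leader election: lease owners are shuffled with a fixed seed and up to
// PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT of them are treated as leaders for periodic shard sync.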
if (leaderDecider == null) {
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager,
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
}
return new Worker(config.getApplicationName(),
recordProcessorFactory,
config,
@ -1308,14 +1400,11 @@ public class Worker implements Runnable {
config.getShardSyncIntervalMillis(),
config.shouldCleanupLeasesUponShardCompletion(),
null,
new KinesisClientLibLeaseCoordinator(leaseManager,
leaseSelector,
config.getWorkerIdentifier(),
new KinesisClientLibLeaseCoordinator(leaseManager, leaseTaker, leaseRenewer,
config.getFailoverTimeMillis(),
config.getEpsilonMillis(),
config.getMaxLeasesForWorker(),
config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(),
metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
@ -1327,8 +1416,7 @@ public class Worker implements Runnable {
shardPrioritization,
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(),
workerStateChangeListener,
leaseCleanupValidator);
workerStateChangeListener, shardSyncer, leaderDecider);
}
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,

View file

@ -82,7 +82,6 @@ public class LeaseCoordinator<T extends Lease> {
protected final IMetricsFactory metricsFactory;
private ScheduledExecutorService leaseCoordinatorThreadPool;
private final ExecutorService leaseRenewalThreadpool;
private volatile boolean running = false;
private ScheduledFuture<?> takerFuture;
@ -187,15 +186,15 @@ public class LeaseCoordinator<T extends Lease> {
}
/**
* Constructor.
*
* @param leaseManager LeaseManager instance to use
* @param leaseSelector LeaseSelector instance to use
* @param leaseSelector LeaseSelector instance to be used for filtering leases during LeaseTaker execution.
* @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership)
* @param leaseDurationMillis Duration of a lease
* @param epsilonMillis Allow for some variance when calculating lease expirations
* @param maxLeasesForWorker Max leases this Worker can handle at a time
* @param maxLeasesToStealAtOneTime Steal up to this many leases at a time (for load balancing)
* @param maxLeaseRenewerThreadCount Number of threads to use for lease renewal calls
* @param metricsFactory Used to publish metrics about lease operations
*/
public LeaseCoordinator(ILeaseManager<T> leaseManager,
@ -207,23 +206,47 @@ public class LeaseCoordinator<T extends Lease> {
int maxLeasesToStealAtOneTime,
int maxLeaseRenewerThreadCount,
IMetricsFactory metricsFactory) {
this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount);
this.leaseTaker = new LeaseTaker<T>(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis)
this(new LeaseTaker<>(leaseManager, leaseSelector, workerIdentifier, leaseDurationMillis)
.withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime);
this.leaseRenewer = new LeaseRenewer<T>(
leaseManager, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool);
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime),
new LeaseRenewer<>(leaseManager, workerIdentifier, leaseDurationMillis, getDefaultLeaseRenewalExecutorService(maxLeaseRenewerThreadCount)),
leaseDurationMillis,
epsilonMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime,
metricsFactory);
}
/**
* Constructor allowing injection of pre-built LeaseTaker and LeaseRenewer instances.
* @param leaseTaker LeaseTaker instance to be used.
* @param leaseRenewer LeaseRenewer instance to be used.
* @param leaseDurationMillis Duration of a lease
* @param epsilonMillis Allow for some variance when calculating lease expirations
* @param maxLeasesForWorker Max leases this Worker can handle at a time
* @param maxLeasesToStealAtOneTime Steal up to this many leases at a time (for load balancing)
* @param metricsFactory Used to publish metrics about lease operations
*/
public LeaseCoordinator(final ILeaseTaker<T> leaseTaker,
final ILeaseRenewer<T> leaseRenewer,
final long leaseDurationMillis,
final long epsilonMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final IMetricsFactory metricsFactory) {
this.leaseTaker = leaseTaker;
this.leaseRenewer = leaseRenewer;
this.metricsFactory = metricsFactory;
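// Renew roughly three times per lease duration, and take leases only about every two lease
// durations (plus epsilon) so that expired leases can be detected before they are taken.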
this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis;
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
this.metricsFactory = metricsFactory;
LOG.info(String.format(
"With failover time %d ms and epsilon %d ms, LeaseCoordinator will renew leases every %d ms, take" +
"leases every %d ms, process maximum of %d leases and steal %d lease(s) at a time.",
leaseDurationMillis,
epsilonMillis,
renewerIntervalMillis,
takerIntervalMillis,
this.renewerIntervalMillis,
this.takerIntervalMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime));
}
@ -380,7 +403,7 @@ public class LeaseCoordinator<T extends Lease> {
LOG.debug("Threadpool was null, no need to shutdown/terminate threadpool.");
}
leaseRenewalThreadpool.shutdownNow();
leaseRenewer.shutdown();
synchronized (shutdownLock) {
leaseRenewer.clearCurrentlyHeldLeases();
running = false;
@ -437,10 +460,10 @@ public class LeaseCoordinator<T extends Lease> {
* @param maximumPoolSize Maximum allowed thread pool size
* @return Executor service that should be used for lease renewal.
*/
private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) {
public static ExecutorService getDefaultLeaseRenewalExecutorService(int maximumPoolSize) {
int coreLeaseCount = Math.max(maximumPoolSize / 4, 2);
return new ThreadPoolExecutor(coreLeaseCount, maximumPoolSize, 60, TimeUnit.SECONDS,
new LinkedTransferQueue<Runnable>(), LEASE_RENEWAL_THREAD_FACTORY);
new LinkedTransferQueue<>(), LEASE_RENEWAL_THREAD_FACTORY);
}
}

View file

@ -404,6 +404,10 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
addLeasesToRenew(myLeases);
}
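/**
* Shuts down the lease renewal thread pool; in-flight renewal tasks may be interrupted.
*/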
public void shutdown() {
executorService.shutdownNow();
}
private void verifyNotNull(Object object, String message) {
if (object == null) {
throw new IllegalArgumentException(message);

View file

@ -36,7 +36,7 @@ public interface ILeaseRenewer<T extends Lease> {
* @throws InvalidStateException if lease table doesn't exist
* @throws ProvisionedThroughputException if DynamoDB reads fail due to insufficient capacity
*/
public void initialize() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
void initialize() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Attempt to renew all currently held leases.
@ -44,21 +44,21 @@ public interface ILeaseRenewer<T extends Lease> {
* @throws DependencyException on unexpected DynamoDB failures
* @throws InvalidStateException if lease table does not exist
*/
public void renewLeases() throws DependencyException, InvalidStateException;
void renewLeases() throws DependencyException, InvalidStateException;
/**
* @return currently held leases. Key is shardId, value is corresponding Lease object. A lease is currently held if
* we successfully renewed it on the last run of renewLeases(). Lease objects returned are deep copies -
* their lease counters will not tick.
*/
public Map<String, T> getCurrentlyHeldLeases();
Map<String, T> getCurrentlyHeldLeases();
/**
* @param leaseKey key of the lease to retrieve
*
* @return a deep copy of a currently held lease, or null if we don't hold the lease
*/
public T getCurrentlyHeldLease(String leaseKey);
T getCurrentlyHeldLease(String leaseKey);
/**
* Adds leases to this LeaseRenewer's set of currently held leases. Leases must have lastRenewalNanos set to the
@ -66,12 +66,12 @@ public interface ILeaseRenewer<T extends Lease> {
*
* @param newLeases new leases.
*/
public void addLeasesToRenew(Collection<T> newLeases);
void addLeasesToRenew(Collection<T> newLeases);
/**
* Clears this LeaseRenewer's set of currently held leases.
*/
public void clearCurrentlyHeldLeases();
void clearCurrentlyHeldLeases();
/**
* Stops the lease renewer from continuing to maintain the given lease.
@ -97,4 +97,11 @@ public interface ILeaseRenewer<T extends Lease> {
boolean updateLease(T lease, UUID concurrencyToken)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Shuts down any clients and thread pools held by this renewer.
*/
default void shutdown() {
// Does nothing.
}
}

View file

@ -0,0 +1,125 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.DeterministicShuffleShardSyncLeaderDecider.DETERMINISTIC_SHUFFLE_SEED;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class DeterministicShuffleShardSyncLeaderDeciderTest {
private static final String LEASE_KEY = "lease_key";
private static final String LEASE_OWNER = "lease_owner";
private static final String WORKER_ID = "worker-id";
private DeterministicShuffleShardSyncLeaderDecider leaderDecider;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@Mock
private ScheduledExecutorService scheduledExecutorService;
private int numShardSyncWorkers;
@Before
public void setup() {
numShardSyncWorkers = 1;
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager, scheduledExecutorService, numShardSyncWorkers);
}
@Test
public void testLeaderElectionWithNullLeases() {
boolean isLeader = leaderDecider.isLeader(WORKER_ID);
assertTrue("IsLeader should return true if leaders is null", isLeader);
}
@Test
public void testLeaderElectionWithEmptyLeases() throws Exception {
when(leaseManager.listLeases()).thenReturn(new ArrayList<>());
boolean isLeader = leaderDecider.isLeader(WORKER_ID);
assertTrue("IsLeader should return true if no leases are returned", isLeader);
}
@Test
public void testElectedLeadersAsPerExpectedShufflingOrder() throws Exception {
List<KinesisClientLease> leases = getLeases(5, false /* duplicateLeaseOwner */, true /* activeLeases */);
when(leaseManager.listLeases()).thenReturn(leases);
Set<String> expectedLeaders = getExpectedLeaders(leases);
for (String leader : expectedLeaders) {
assertTrue(leaderDecider.isLeader(leader));
}
for (KinesisClientLease lease : leases) {
if (!expectedLeaders.contains(lease.getLeaseOwner())) {
assertFalse(leaderDecider.isLeader(lease.getLeaseOwner()));
}
}
}
@Test
public void testElectedLeadersAsPerExpectedShufflingOrderWhenUniqueWorkersLessThanMaxLeaders() throws Exception {
this.numShardSyncWorkers = 5; // More than the number of unique lease owners
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager, scheduledExecutorService, numShardSyncWorkers);
List<KinesisClientLease> leases = getLeases(3, false /* duplicateLeaseOwner */, true /* activeLeases */);
when(leaseManager.listLeases()).thenReturn(leases);
Set<String> expectedLeaders = getExpectedLeaders(leases);
// All lease owners should be present in expected leaders set, and they should all be leaders.
for (KinesisClientLease lease : leases) {
assertTrue(leaderDecider.isLeader(lease.getLeaseOwner()));
assertTrue(expectedLeaders.contains(lease.getLeaseOwner()));
}
}
private List<KinesisClientLease> getLeases(int count, boolean duplicateLeaseOwner, boolean activeLeases) {
List<KinesisClientLease> leases = new ArrayList<>();
for (int i = 0; i < count; i++) {
KinesisClientLease lease = new KinesisClientLease();
lease.setLeaseKey(LEASE_KEY + i);
lease.setCheckpoint(activeLeases ? ExtendedSequenceNumber.LATEST : ExtendedSequenceNumber.SHARD_END);
lease.setLeaseCounter(new Random().nextLong());
lease.setLeaseOwner(LEASE_OWNER + (duplicateLeaseOwner ? "" : i));
leases.add(lease);
}
return leases;
}
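// Mirrors the decider's algorithm: collect the unique lease owners, sort them, shuffle them with
// the constant seed, and take the first numShardSyncWorkers entries as the expected leaders.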
private Set<String> getExpectedLeaders(List<KinesisClientLease> leases) {
List<String> uniqueHosts = leases.stream().filter(lease -> lease.getLeaseOwner() != null)
.map(KinesisClientLease::getLeaseOwner).distinct().sorted().collect(Collectors.toList());
Collections.shuffle(uniqueHosts, new Random(DETERMINISTIC_SHUFFLE_SEED));
int numWorkers = Math.min(uniqueHosts.size(), this.numShardSyncWorkers);
return new HashSet<>(uniqueHosts.subList(0, numWorkers));
}
}

View file

@ -97,7 +97,7 @@ public class ShardConsumerTest {
private final boolean skipCheckpointValidationValue = false;
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
private static final ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator());
private static final ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator());
// Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is
// ... a non-final public class, and so can be mocked and spied.
@ -120,6 +120,8 @@ public class ShardConsumerTest {
private ICheckpoint checkpoint;
@Mock
private ShutdownNotification shutdownNotification;
@Mock
ShardSyncStrategy shardSyncStrategy;
@Before
public void setup() {
@ -163,8 +165,8 @@ public class ShardConsumerTest {
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config,
shardSyncer);
shardSyncer,
shardSyncStrategy);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
@ -212,7 +214,8 @@ public class ShardConsumerTest {
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config,
shardSyncer);
shardSyncer,
shardSyncStrategy);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
@ -255,7 +258,8 @@ public class ShardConsumerTest {
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config,
shardSyncer);
shardSyncer,
shardSyncStrategy);
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null;
@ -375,7 +379,8 @@ public class ShardConsumerTest {
Optional.empty(),
Optional.empty(),
config,
shardSyncer);
shardSyncer,
shardSyncStrategy);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@ -520,7 +525,10 @@ public class ShardConsumerTest {
Optional.empty(),
Optional.empty(),
config,
shardSyncer);
shardSyncer,
shardSyncStrategy);
when(shardSyncStrategy.onShardConsumerShutDown()).thenReturn(new TaskResult(null));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@ -658,7 +666,8 @@ public class ShardConsumerTest {
Optional.empty(),
Optional.empty(),
config,
shardSyncer);
shardSyncer,
shardSyncStrategy);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@ -729,7 +738,8 @@ public class ShardConsumerTest {
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config,
shardSyncer);
shardSyncer,
shardSyncStrategy);
GetRecordsCache getRecordsCache = spy(consumer.getGetRecordsCache());
@ -783,7 +793,8 @@ public class ShardConsumerTest {
Optional.empty(),
Optional.empty(),
config,
shardSyncer);
shardSyncer,
shardSyncStrategy);
assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(),
SynchronousGetRecordsRetrievalStrategy.class);
@ -814,7 +825,8 @@ public class ShardConsumerTest {
Optional.of(1),
Optional.of(2),
config,
shardSyncer);
shardSyncer,
shardSyncStrategy);
assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(),
AsynchronousGetRecordsRetrievalStrategy.class);
@ -854,8 +866,8 @@ public class ShardConsumerTest {
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config,
shardSyncer);
shardSyncer,
shardSyncStrategy);
shardConsumer.consumeShard();
Thread.sleep(sleepTime);

View file

@ -52,7 +52,7 @@ public class ShardSyncTaskIntegrationTest {
private static AWSCredentialsProvider credentialsProvider;
private IKinesisClientLeaseManager leaseManager;
private IKinesisProxy kinesisProxy;
private final ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator());
private final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator());
/**
* @throws java.lang.Exception

View file

@ -59,7 +59,7 @@ import junit.framework.Assert;
*/
// CHECKSTYLE:IGNORE JavaNCSS FOR NEXT 800 LINES
public class ShardSyncerTest {
private static final Log LOG = LogFactory.getLog(ShardSyncer.class);
private static final Log LOG = LogFactory.getLog(KinesisShardSyncer.class);
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
@ -71,7 +71,7 @@ public class ShardSyncerTest {
LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient);
private static final int EXPONENT = 128;
protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator();
private static final ShardSyncer shardSyncer = new ShardSyncer(leaseCleanupValidator);
private static final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(leaseCleanupValidator);
/**
* Old/Obsolete max value of a sequence number (2^128 -1).
*/
@ -398,7 +398,7 @@ public class ShardSyncerTest {
for (int c = 1; c <= maxCallingCount; c = c + 2) {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.DELETELEASE, c, INITIAL_POSITION_TRIM_HORIZON);
// Need to clean up lease manager every time after calling ShardSyncer
// Need to clean up lease manager every time after calling KinesisShardSyncer
leaseManager.deleteAll();
}
}
@ -420,7 +420,7 @@ public class ShardSyncerTest {
for (int c = 1; c <= maxCallingCount; c = c + 2) {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.LISTLEASES, c, INITIAL_POSITION_TRIM_HORIZON);
// Need to clean up lease manager every time after calling ShardSyncer
// Need to clean up lease manager every time after calling KinesisShardSyncer
leaseManager.deleteAll();
}
}
@ -442,7 +442,7 @@ public class ShardSyncerTest {
for (int c = 1; c <= maxCallingCount; c = c + 2) {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c,INITIAL_POSITION_TRIM_HORIZON);
// Need to clean up lease manager every time after calling ShardSyncer
// Need to clean up lease manager every time after calling KinesisShardSyncer
leaseManager.deleteAll();
}
}
@ -472,7 +472,7 @@ public class ShardSyncerTest {
} catch (LeasingException e) {
LOG.debug("Catch leasing exception", e);
}
// Clear throwing exception scenario every time after calling ShardSyncer
// Clear throwing exception scenario every time after calling KinesisShardSyncer
exceptionThrowingLeaseManager.clearLeaseManagerThrowingExceptionScenario();
}
} else {
@ -517,7 +517,7 @@ public class ShardSyncerTest {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.DELETELEASE,
c, INITIAL_POSITION_AT_TIMESTAMP);
// Need to clean up lease manager every time after calling ShardSyncer
// Need to clean up lease manager every time after calling KinesisShardSyncer
leaseManager.deleteAll();
}
}
@ -540,7 +540,7 @@ public class ShardSyncerTest {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.LISTLEASES,
c, INITIAL_POSITION_AT_TIMESTAMP);
// Need to clean up lease manager every time after calling ShardSyncer
// Need to clean up lease manager every time after calling KinesisShardSyncer
leaseManager.deleteAll();
}
}
@ -563,7 +563,7 @@ public class ShardSyncerTest {
testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(
ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS,
c, INITIAL_POSITION_AT_TIMESTAMP);
// Need to clean up lease manager every time after calling ShardSyncer
// Need to clean up lease manager every time after calling KinesisShardSyncer
leaseManager.deleteAll();
}
}

View file

@ -21,7 +21,9 @@ import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -57,10 +59,13 @@ public class ShutdownTaskTest {
defaultParentShardIds,
ExtendedSequenceNumber.LATEST);
IRecordProcessor defaultRecordProcessor = new TestStreamlet();
ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator());
ShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator());
@Mock
private GetRecordsCache getRecordsCache;
@Mock
private ShardSyncStrategy shardSyncStrategy;
/**
* @throws java.lang.Exception
@ -113,7 +118,8 @@ public class ShutdownTaskTest {
leaseManager,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer);
shardSyncer,
shardSyncStrategy);
TaskResult result = task.call();
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
@ -131,6 +137,7 @@ public class ShutdownTaskTest {
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
when(shardSyncStrategy.onShardConsumerShutDown()).thenReturn(new TaskResult(new KinesisClientLibIOException("")));
ShutdownTask task = new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
@ -142,8 +149,10 @@ public class ShutdownTaskTest {
leaseManager,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer);
shardSyncer,
shardSyncStrategy);
TaskResult result = task.call();
verify(shardSyncStrategy).onShardConsumerShutDown();
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException);
verify(getRecordsCache).shutdown();
@ -154,7 +163,7 @@ public class ShutdownTaskTest {
*/
@Test
public final void testGetTaskType() {
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache, shardSyncer);
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache, shardSyncer, shardSyncStrategy);
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
}

View file

@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
@ -52,6 +53,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
@ -161,7 +163,7 @@ public class WorkerTest {
private RecordsFetcherFactory recordsFetcherFactory;
private KinesisClientLibConfiguration config;
private ShardSyncer shardSyncer = new ShardSyncer(new KinesisLeaseCleanupValidator());
private KinesisShardSyncer shardSyncer = new KinesisShardSyncer(new KinesisLeaseCleanupValidator());
@Mock
private KinesisClientLibLeaseCoordinator leaseCoordinator;
@ -189,6 +191,8 @@ public class WorkerTest {
private TaskResult taskResult;
@Mock
private WorkerStateChangeListener workerStateChangeListener;
@Mock
private ShardSyncStrategy shardSyncStrategy;
@Before
public void setup() {
@ -521,7 +525,7 @@ public class WorkerTest {
final int threadPoolSize = 2;
final int numberOfRecordsPerShard = 10;
List<Shard> shardList = createShardListWithOneSplit();
List<KinesisClientLease> initialLeases = new ArrayList<KinesisClientLease>();
List<KinesisClientLease> initialLeases = new ArrayList<>();
KinesisClientLease lease = shardSyncer.newKCLLease(shardList.get(0));
lease.setCheckpoint(new ExtendedSequenceNumber("2"));
initialLeases.add(lease);