remove unnecessary whitespace changes

This commit is contained in:
Mike Watters 2017-11-06 15:42:31 -07:00
parent 2afb0bd00b
commit 22e1750f97
8 changed files with 87 additions and 86 deletions

View file

@ -21,7 +21,7 @@ import java.util.Optional;
* and state transitions is contained within the {@link ConsumerState} objects. * and state transitions is contained within the {@link ConsumerState} objects.
* *
* <h2>State Diagram</h2> * <h2>State Diagram</h2>
* *
* <pre> * <pre>
* +-------------------+ * +-------------------+
* | Waiting on Parent | +------------------+ * | Waiting on Parent | +------------------+
@ -96,14 +96,14 @@ class ConsumerStates {
/** /**
* Represents a the current state of the consumer. This handles the creation of tasks for the consumer, and what to * Represents a the current state of the consumer. This handles the creation of tasks for the consumer, and what to
* do when a transition occurs. * do when a transition occurs.
* *
*/ */
interface ConsumerState { interface ConsumerState {
/** /**
* Creates a new task for this state using the passed in consumer to build the task. If there is no task * Creates a new task for this state using the passed in consumer to build the task. If there is no task
* required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the * required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the
* consumer during the execution of this method. * consumer during the execution of this method.
* *
* @param consumer * @param consumer
* the consumer to use build the task, or execute state. * the consumer to use build the task, or execute state.
* @return a valid task for this state or null if there is no task required. * @return a valid task for this state or null if there is no task required.
@ -113,7 +113,7 @@ class ConsumerStates {
/** /**
* Provides the next state of the consumer upon success of the task return by * Provides the next state of the consumer upon success of the task return by
* {@link ConsumerState#createTask(ShardConsumer)}. * {@link ConsumerState#createTask(ShardConsumer)}.
* *
* @return the next state that the consumer should transition to, this may be the same object as the current * @return the next state that the consumer should transition to, this may be the same object as the current
* state. * state.
*/ */
@ -122,7 +122,7 @@ class ConsumerStates {
/** /**
* Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent * Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent
* on the current state, and the shutdown reason. * on the current state, and the shutdown reason.
* *
* @param shutdownReason * @param shutdownReason
* the reason that a shutdown was requested * the reason that a shutdown was requested
* @return the next state that the consumer should transition to, this may be the same object as the current * @return the next state that the consumer should transition to, this may be the same object as the current
@ -133,7 +133,7 @@ class ConsumerStates {
/** /**
* The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state * The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state
* even if createTask would return a null value. * even if createTask would return a null value.
* *
* @return the type of task that this state represents. * @return the type of task that this state represents.
*/ */
TaskType getTaskType(); TaskType getTaskType();
@ -141,7 +141,7 @@ class ConsumerStates {
/** /**
* An enumeration represent the type of this state. Different consumer states may return the same * An enumeration represent the type of this state. Different consumer states may return the same
* {@link ShardConsumerState}. * {@link ShardConsumerState}.
* *
* @return the type of consumer state this represents. * @return the type of consumer state this represents.
*/ */
ShardConsumerState getState(); ShardConsumerState getState();

View file

@ -169,7 +169,7 @@ class ShardConsumer {
/** /**
* No-op if current task is pending, otherwise submits next task for this shard. * No-op if current task is pending, otherwise submits next task for this shard.
* This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state. * This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state.
* *
* @return true if a new process task was submitted, false otherwise * @return true if a new process task was submitted, false otherwise
*/ */
synchronized boolean consumeShard() { synchronized boolean consumeShard() {
@ -264,7 +264,7 @@ class ShardConsumer {
/** /**
* Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint * Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint
* before being shutdown. * before being shutdown.
* *
* @param shutdownNotification used to signal that the record processor has been given the chance to shutdown. * @param shutdownNotification used to signal that the record processor has been given the chance to shutdown.
*/ */
void notifyShutdownRequested(ShutdownNotification shutdownNotification) { void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
@ -275,7 +275,7 @@ class ShardConsumer {
/** /**
* Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API). * Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API).
* This is called by Worker when it loses responsibility for a shard. * This is called by Worker when it loses responsibility for a shard.
* *
* @return true if shutdown is complete (false if shutdown is still in progress) * @return true if shutdown is complete (false if shutdown is still in progress)
*/ */
synchronized boolean beginShutdown() { synchronized boolean beginShutdown() {
@ -295,7 +295,7 @@ class ShardConsumer {
/** /**
* Used (by Worker) to check if this ShardConsumer instance has been shutdown * Used (by Worker) to check if this ShardConsumer instance has been shutdown
* RecordProcessor shutdown() has been invoked, as appropriate. * RecordProcessor shutdown() has been invoked, as appropriate.
* *
* @return true if shutdown is complete * @return true if shutdown is complete
*/ */
boolean isShutdown() { boolean isShutdown() {
@ -311,7 +311,7 @@ class ShardConsumer {
/** /**
* Figure out next task to run based on current state, task, and shutdown context. * Figure out next task to run based on current state, task, and shutdown context.
* *
* @return Return next task to run * @return Return next task to run
*/ */
private ITask getNextTask() { private ITask getNextTask() {
@ -327,7 +327,7 @@ class ShardConsumer {
/** /**
* Note: This is a private/internal method with package level access solely for testing purposes. * Note: This is a private/internal method with package level access solely for testing purposes.
* Update state based on information about: task success, current state, and shutdown info. * Update state based on information about: task success, current state, and shutdown info.
* *
* @param taskOutcome The outcome of the last task * @param taskOutcome The outcome of the last task
*/ */
void updateState(TaskOutcome taskOutcome) { void updateState(TaskOutcome taskOutcome) {
@ -359,7 +359,7 @@ class ShardConsumer {
/** /**
* Private/Internal method - has package level access solely for testing purposes. * Private/Internal method - has package level access solely for testing purposes.
* *
* @return the currentState * @return the currentState
*/ */
ConsumerStates.ShardConsumerState getCurrentState() { ConsumerStates.ShardConsumerState getCurrentState() {

View file

@ -50,7 +50,7 @@ class ShardSyncTaskManager {
/** /**
* Constructor. * Constructor.
* *
* @param kinesisProxy Proxy used to fetch streamInfo (shards) * @param kinesisProxy Proxy used to fetch streamInfo (shards)
* @param leaseManager Lease manager (used to list and create leases for shards) * @param leaseManager Lease manager (used to list and create leases for shards)
* @param initialPositionInStream Initial position in stream * @param initialPositionInStream Initial position in stream

View file

@ -67,7 +67,7 @@ class ShardSyncer {
/** /**
* Check and create leases for any new shards (e.g. following a reshard operation). * Check and create leases for any new shards (e.g. following a reshard operation).
* *
* @param kinesisProxy * @param kinesisProxy
* @param leaseManager * @param leaseManager
* @param initialPositionInStream * @param initialPositionInStream
@ -97,7 +97,7 @@ class ShardSyncer {
/** /**
* Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard). * Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard).
* *
* @param kinesisProxy * @param kinesisProxy
* @param leaseManager * @param leaseManager
* @param initialPosition * @param initialPosition
@ -140,10 +140,10 @@ class ShardSyncer {
MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success, MetricsLevel.DETAILED); MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success, MetricsLevel.DETAILED);
} }
} }
List<KinesisClientLease> trackedLeases = new ArrayList<>(); List<KinesisClientLease> trackedLeases = new ArrayList<>();
if (currentLeases != null) { if (currentLeases != null) {
trackedLeases.addAll(currentLeases); trackedLeases.addAll(currentLeases);
} }
trackedLeases.addAll(newLeasesToCreate); trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager); cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager);
@ -209,7 +209,7 @@ class ShardSyncer {
} }
/** /**
* Note: this has package level access for testing purposes. * Note: this has package level access for testing purposes.
* Useful for asserting that we don't have an incomplete shard list following a reshard operation. * Useful for asserting that we don't have an incomplete shard list following a reshard operation.
* We verify that if the shard is present in the shard list, it is closed and its hash key range * We verify that if the shard is present in the shard list, it is closed and its hash key range
* is covered by its child shards. * is covered by its child shards.
@ -220,17 +220,17 @@ class ShardSyncer {
*/ */
static synchronized void assertClosedShardsAreCoveredOrAbsent(Map<String, Shard> shardIdToShardMap, static synchronized void assertClosedShardsAreCoveredOrAbsent(Map<String, Shard> shardIdToShardMap,
Map<String, Set<String>> shardIdToChildShardIdsMap, Map<String, Set<String>> shardIdToChildShardIdsMap,
Set<String> shardIdsOfClosedShards) throws KinesisClientLibIOException { Set<String> shardIdsOfClosedShards) throws KinesisClientLibIOException {
String exceptionMessageSuffix = "This can happen if we constructed the list of shards " String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
+ " while a reshard operation was in progress."; + " while a reshard operation was in progress.";
for (String shardId : shardIdsOfClosedShards) { for (String shardId : shardIdsOfClosedShards) {
Shard shard = shardIdToShardMap.get(shardId); Shard shard = shardIdToShardMap.get(shardId);
if (shard == null) { if (shard == null) {
LOG.info("Shard " + shardId + " is not present in Kinesis anymore."); LOG.info("Shard " + shardId + " is not present in Kinesis anymore.");
continue; continue;
} }
String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber(); String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
if (endingSequenceNumber == null) { if (endingSequenceNumber == null) {
throw new KinesisClientLibIOException("Shard " + shardIdsOfClosedShards throw new KinesisClientLibIOException("Shard " + shardIdsOfClosedShards
@ -250,7 +250,7 @@ class ShardSyncer {
private static synchronized void assertHashRangeOfClosedShardIsCovered(Shard closedShard, private static synchronized void assertHashRangeOfClosedShardIsCovered(Shard closedShard,
Map<String, Shard> shardIdToShardMap, Map<String, Shard> shardIdToShardMap,
Set<String> childShardIds) throws KinesisClientLibIOException { Set<String> childShardIds) throws KinesisClientLibIOException {
BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getStartingHashKey()); BigInteger startingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getStartingHashKey());
BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getEndingHashKey()); BigInteger endingHashKeyOfClosedShard = new BigInteger(closedShard.getHashKeyRange().getEndingHashKey());
BigInteger minStartingHashKeyOfChildren = null; BigInteger minStartingHashKeyOfChildren = null;
@ -269,16 +269,16 @@ class ShardSyncer {
maxEndingHashKeyOfChildren = endingHashKey; maxEndingHashKeyOfChildren = endingHashKey;
} }
} }
if ((minStartingHashKeyOfChildren == null) || (maxEndingHashKeyOfChildren == null) if ((minStartingHashKeyOfChildren == null) || (maxEndingHashKeyOfChildren == null)
|| (minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0) || (minStartingHashKeyOfChildren.compareTo(startingHashKeyOfClosedShard) > 0)
|| (maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0)) { || (maxEndingHashKeyOfChildren.compareTo(endingHashKeyOfClosedShard) < 0)) {
throw new KinesisClientLibIOException("Incomplete shard list: hash key range of shard " throw new KinesisClientLibIOException("Incomplete shard list: hash key range of shard "
+ closedShard.getShardId() + " is not covered by its child shards."); + closedShard.getShardId() + " is not covered by its child shards.");
} }
} }
/** /**
* Helper method to construct shardId->setOfChildShardIds map. * Helper method to construct shardId->setOfChildShardIds map.
* Note: This has package access for testing purposes only. * Note: This has package access for testing purposes only.
@ -300,7 +300,7 @@ class ShardSyncer {
} }
childShardIds.add(shardId); childShardIds.add(shardId);
} }
String adjacentParentShardId = shard.getAdjacentParentShardId(); String adjacentParentShardId = shard.getAdjacentParentShardId();
if ((adjacentParentShardId != null) && (shardIdToShardMap.containsKey(adjacentParentShardId))) { if ((adjacentParentShardId != null) && (shardIdToShardMap.containsKey(adjacentParentShardId))) {
Set<String> childShardIds = shardIdToChildShardIdsMap.get(adjacentParentShardId); Set<String> childShardIds = shardIdToChildShardIdsMap.get(adjacentParentShardId);
@ -336,13 +336,13 @@ class ShardSyncer {
* we begin processing data from any of its descendants. * we begin processing data from any of its descendants.
* * A shard does not start processing data until data from all its parents has been processed. * * A shard does not start processing data until data from all its parents has been processed.
* Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create * Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
* leases corresponding to both the parents - the parent shard which is not a descendant will have * leases corresponding to both the parents - the parent shard which is not a descendant will have
* its checkpoint set to Latest. * its checkpoint set to Latest.
* *
* We assume that if there is an existing lease for a shard, then either: * We assume that if there is an existing lease for a shard, then either:
* * we have previously created a lease for its parent (if it was needed), or * * we have previously created a lease for its parent (if it was needed), or
* * the parent shard has expired. * * the parent shard has expired.
* *
* For example: * For example:
* Shard structure (each level depicts a stream segment): * Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102 * 0 1 2 3 4 5 - shards till epoch 102
@ -352,7 +352,7 @@ class ShardSyncer {
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5) * Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10) * New leases to create: (2, 6, 7, 8, 9, 10)
* *
* The leases returned are sorted by the starting sequence number - following the same order * The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases. * before creating all the leases.
@ -362,6 +362,7 @@ class ShardSyncer {
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases. * currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
* *
*
* @param shards List of all shards in Kinesis (we'll create new leases based on this set) * @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases * @param currentLeases List of current leases
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
@ -462,7 +463,7 @@ class ShardSyncer {
* Check if this shard is a descendant of a shard that is (or will be) processed. * Check if this shard is a descendant of a shard that is (or will be) processed.
* Create leases for the ancestors of this shard as required. * Create leases for the ancestors of this shard as required.
* See javadoc of determineNewLeasesToCreate() for rules and example. * See javadoc of determineNewLeasesToCreate() for rules and example.
* *
* @param shardId The shardId to check. * @param shardId The shardId to check.
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints). * location in the shard (when an application starts up for the first time - and there are no checkpoints).
@ -479,7 +480,7 @@ class ShardSyncer {
Map<String, Shard> shardIdToShardMapOfAllKinesisShards, Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
Map<String, KinesisClientLease> shardIdToLeaseMapOfNewShards, Map<String, KinesisClientLease> shardIdToLeaseMapOfNewShards,
Map<String, Boolean> memoizationContext) { Map<String, Boolean> memoizationContext) {
Boolean previousValue = memoizationContext.get(shardId); Boolean previousValue = memoizationContext.get(shardId);
if (previousValue != null) { if (previousValue != null) {
return previousValue; return previousValue;
@ -559,7 +560,7 @@ class ShardSyncer {
* Helper method to get parent shardIds of the current shard - includes the parent shardIds if: * Helper method to get parent shardIds of the current shard - includes the parent shardIds if:
* a/ they are not null * a/ they are not null
* b/ if they exist in the current shard map (i.e. haven't expired) * b/ if they exist in the current shard map (i.e. haven't expired)
* *
* @param shard Will return parents of this shard * @param shard Will return parents of this shard
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream. * @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @return Set of parentShardIds * @return Set of parentShardIds
@ -578,18 +579,18 @@ class ShardSyncer {
} }
/** /**
* Delete leases corresponding to shards that no longer exist in the stream. * Delete leases corresponding to shards that no longer exist in the stream.
* Current scheme: Delete a lease if: * Current scheme: Delete a lease if:
* * the corresponding shard is not present in the list of Kinesis shards, AND * * the corresponding shard is not present in the list of Kinesis shards, AND
* * the parentShardIds listed in the lease are also not present in the list of Kinesis shards. * * the parentShardIds listed in the lease are also not present in the list of Kinesis shards.
* @param shards List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state). * @param shards List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state).
* @param trackedLeases List of * @param trackedLeases List of
* @param kinesisProxy Kinesis proxy (used to get shard list) * @param kinesisProxy Kinesis proxy (used to get shard list)
* @param leaseManager * @param leaseManager
* @throws KinesisClientLibIOException Thrown if we couldn't get a fresh shard list from Kinesis. * @throws KinesisClientLibIOException Thrown if we couldn't get a fresh shard list from Kinesis.
* @throws ProvisionedThroughputException * @throws ProvisionedThroughputException
* @throws InvalidStateException * @throws InvalidStateException
* @throws DependencyException * @throws DependencyException
*/ */
private static void cleanupGarbageLeases(List<Shard> shards, private static void cleanupGarbageLeases(List<Shard> shards,
List<KinesisClientLease> trackedLeases, List<KinesisClientLease> trackedLeases,
@ -600,7 +601,7 @@ class ShardSyncer {
for (Shard shard : shards) { for (Shard shard : shards) {
kinesisShards.add(shard.getShardId()); kinesisShards.add(shard.getShardId());
} }
// Check if there are leases for non-existent shards // Check if there are leases for non-existent shards
List<KinesisClientLease> garbageLeases = new ArrayList<>(); List<KinesisClientLease> garbageLeases = new ArrayList<>();
for (KinesisClientLease lease : trackedLeases) { for (KinesisClientLease lease : trackedLeases) {
@ -608,10 +609,10 @@ class ShardSyncer {
garbageLeases.add(lease); garbageLeases.add(lease);
} }
} }
if (!garbageLeases.isEmpty()) { if (!garbageLeases.isEmpty()) {
LOG.info("Found " + garbageLeases.size() LOG.info("Found " + garbageLeases.size()
+ " candidate leases for cleanup. Refreshing list of" + " candidate leases for cleanup. Refreshing list of"
+ " Kinesis shards to pick up recent/latest shards"); + " Kinesis shards to pick up recent/latest shards");
List<Shard> currentShardList = getShardList(kinesisProxy); List<Shard> currentShardList = getShardList(kinesisProxy);
Set<String> currentKinesisShardIds = new HashSet<>(); Set<String> currentKinesisShardIds = new HashSet<>();
@ -627,12 +628,12 @@ class ShardSyncer {
} }
} }
} }
} }
/** /**
* Note: This method has package level access, solely for testing purposes. * Note: This method has package level access, solely for testing purposes.
* *
* @param lease Candidate shard we are considering for deletion. * @param lease Candidate shard we are considering for deletion.
* @param currentKinesisShardIds * @param currentKinesisShardIds
* @return true if neither the shard (corresponding to the lease), nor its parents are present in * @return true if neither the shard (corresponding to the lease), nor its parents are present in
@ -643,16 +644,16 @@ class ShardSyncer {
static boolean isCandidateForCleanup(KinesisClientLease lease, Set<String> currentKinesisShardIds) static boolean isCandidateForCleanup(KinesisClientLease lease, Set<String> currentKinesisShardIds)
throws KinesisClientLibIOException { throws KinesisClientLibIOException {
boolean isCandidateForCleanup = true; boolean isCandidateForCleanup = true;
if (currentKinesisShardIds.contains(lease.getLeaseKey())) { if (currentKinesisShardIds.contains(lease.getLeaseKey())) {
isCandidateForCleanup = false; isCandidateForCleanup = false;
} else { } else {
LOG.info("Found lease for non-existent shard: " + lease.getLeaseKey() + ". Checking its parent shards"); LOG.info("Found lease for non-existent shard: " + lease.getLeaseKey() + ". Checking its parent shards");
Set<String> parentShardIds = lease.getParentShardIds(); Set<String> parentShardIds = lease.getParentShardIds();
for (String parentShardId : parentShardIds) { for (String parentShardId : parentShardIds) {
// Throw an exception if the parent shard exists (but the child does not). // Throw an exception if the parent shard exists (but the child does not).
// This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards. // This may be a (rare) race condition between fetching the shard list and Kinesis expiring shards.
if (currentKinesisShardIds.contains(parentShardId)) { if (currentKinesisShardIds.contains(parentShardId)) {
String message = String message =
"Parent shard " + parentShardId + " exists but not the child shard " "Parent shard " + parentShardId + " exists but not the child shard "
@ -665,14 +666,14 @@ class ShardSyncer {
return isCandidateForCleanup; return isCandidateForCleanup;
} }
/** /**
* Private helper method. * Private helper method.
* Clean up leases for shards that meet the following criteria: * Clean up leases for shards that meet the following criteria:
* a/ the shard has been fully processed (checkpoint is set to SHARD_END) * a/ the shard has been fully processed (checkpoint is set to SHARD_END)
* b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not * b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not
* TRIM_HORIZON. * TRIM_HORIZON.
* *
* @param currentLeases List of leases we evaluate for clean up * @param currentLeases List of leases we evaluate for clean up
* @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards) * @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards)
* @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards) * @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards)
@ -714,22 +715,22 @@ class ShardSyncer {
cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
} }
} }
} }
} }
/** /**
* Delete lease for the closed shard. Rules for deletion are: * Delete lease for the closed shard. Rules for deletion are:
* a/ the checkpoint for the closed shard is SHARD_END, * a/ the checkpoint for the closed shard is SHARD_END,
* b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON * b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON
* Note: This method has package level access solely for testing purposes. * Note: This method has package level access solely for testing purposes.
* *
* @param closedShardId Identifies the closed shard * @param closedShardId Identifies the closed shard
* @param childShardIds ShardIds of children of the closed shard * @param childShardIds ShardIds of children of the closed shard
* @param trackedLeases shardId->KinesisClientLease map with all leases we are tracking (should not be null) * @param trackedLeases shardId->KinesisClientLease map with all leases we are tracking (should not be null)
* @param leaseManager * @param leaseManager
* @throws ProvisionedThroughputException * @throws ProvisionedThroughputException
* @throws InvalidStateException * @throws InvalidStateException
* @throws DependencyException * @throws DependencyException
*/ */
static synchronized void cleanupLeaseForClosedShard(String closedShardId, static synchronized void cleanupLeaseForClosedShard(String closedShardId,
Set<String> childShardIds, Set<String> childShardIds,
@ -738,14 +739,14 @@ class ShardSyncer {
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId); KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId);
List<KinesisClientLease> childShardLeases = new ArrayList<>(); List<KinesisClientLease> childShardLeases = new ArrayList<>();
for (String childShardId : childShardIds) { for (String childShardId : childShardIds) {
KinesisClientLease childLease = trackedLeases.get(childShardId); KinesisClientLease childLease = trackedLeases.get(childShardId);
if (childLease != null) { if (childLease != null) {
childShardLeases.add(childLease); childShardLeases.add(childLease);
} }
} }
if ((leaseForClosedShard != null) if ((leaseForClosedShard != null)
&& (leaseForClosedShard.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) && (leaseForClosedShard.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END))
&& (childShardLeases.size() == childShardIds.size())) { && (childShardLeases.size() == childShardIds.size())) {
@ -756,7 +757,7 @@ class ShardSyncer {
break; break;
} }
} }
if (okayToDelete) { if (okayToDelete) {
LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey() LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey()
+ " as it has been completely processed and processing of child shards has begun."); + " as it has been completely processed and processing of child shards has begun.");
@ -768,7 +769,7 @@ class ShardSyncer {
/** /**
* Helper method to create a new KinesisClientLease POJO for a shard. * Helper method to create a new KinesisClientLease POJO for a shard.
* Note: Package level access only for testing purposes * Note: Package level access only for testing purposes
* *
* @param shard * @param shard
* @return * @return
*/ */
@ -790,7 +791,7 @@ class ShardSyncer {
/** /**
* Helper method to construct a shardId->Shard map for the specified list of shards. * Helper method to construct a shardId->Shard map for the specified list of shards.
* *
* @param shards List of shards * @param shards List of shards
* @return ShardId->Shard map * @return ShardId->Shard map
*/ */
@ -805,7 +806,7 @@ class ShardSyncer {
/** /**
* Helper method to return all the open shards for a stream. * Helper method to return all the open shards for a stream.
* Note: Package level access only for testing purposes. * Note: Package level access only for testing purposes.
* *
* @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list. * @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list.
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active. * @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
*/ */
@ -823,7 +824,7 @@ class ShardSyncer {
private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) { private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) {
ExtendedSequenceNumber checkpoint = null; ExtendedSequenceNumber checkpoint = null;
if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) { if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
checkpoint = ExtendedSequenceNumber.TRIM_HORIZON; checkpoint = ExtendedSequenceNumber.TRIM_HORIZON;
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) { } else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) {
@ -831,10 +832,10 @@ class ShardSyncer {
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { } else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP; checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP;
} }
return checkpoint; return checkpoint;
} }
/** Helper class to compare leases based on starting sequence number of the corresponding shards. /** Helper class to compare leases based on starting sequence number of the corresponding shards.
* *
*/ */
@ -844,7 +845,7 @@ class ShardSyncer {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private final Map<String, Shard> shardIdToShardMap; private final Map<String, Shard> shardIdToShardMap;
/** /**
* @param shardIdToShardMapOfAllKinesisShards * @param shardIdToShardMapOfAllKinesisShards
*/ */
@ -858,7 +859,7 @@ class ShardSyncer {
* We assume that lease1 and lease2 are: * We assume that lease1 and lease2 are:
* a/ not null, * a/ not null,
* b/ shards (if found) have non-null starting sequence numbers * b/ shards (if found) have non-null starting sequence numbers
* *
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
@ -868,23 +869,23 @@ class ShardSyncer {
String shardId2 = lease2.getLeaseKey(); String shardId2 = lease2.getLeaseKey();
Shard shard1 = shardIdToShardMap.get(shardId1); Shard shard1 = shardIdToShardMap.get(shardId1);
Shard shard2 = shardIdToShardMap.get(shardId2); Shard shard2 = shardIdToShardMap.get(shardId2);
// If we found shards for the two leases, use comparison of the starting sequence numbers // If we found shards for the two leases, use comparison of the starting sequence numbers
if ((shard1 != null) && (shard2 != null)) { if ((shard1 != null) && (shard2 != null)) {
BigInteger sequenceNumber1 = BigInteger sequenceNumber1 =
new BigInteger(shard1.getSequenceNumberRange().getStartingSequenceNumber()); new BigInteger(shard1.getSequenceNumberRange().getStartingSequenceNumber());
BigInteger sequenceNumber2 = BigInteger sequenceNumber2 =
new BigInteger(shard2.getSequenceNumberRange().getStartingSequenceNumber()); new BigInteger(shard2.getSequenceNumberRange().getStartingSequenceNumber());
result = sequenceNumber1.compareTo(sequenceNumber2); result = sequenceNumber1.compareTo(sequenceNumber2);
} }
if (result == 0) { if (result == 0) {
result = shardId1.compareTo(shardId2); result = shardId1.compareTo(shardId2);
} }
return result; return result;
} }
} }
} }

View file

@ -62,7 +62,7 @@ class ShutdownTask implements ITask {
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards, boolean ignoreUnexpectedChildShards,
ILeaseManager<KinesisClientLease> leaseManager, ILeaseManager<KinesisClientLease> leaseManager,
long backoffTimeMillis, long backoffTimeMillis,
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
@ -80,7 +80,7 @@ class ShutdownTask implements ITask {
/* /*
* Invokes RecordProcessor shutdown() API. * Invokes RecordProcessor shutdown() API.
* (non-Javadoc) * (non-Javadoc)
* *
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call() * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
*/ */
@Override @Override
@ -156,7 +156,7 @@ class ShutdownTask implements ITask {
/* /*
* (non-Javadoc) * (non-Javadoc)
* *
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#getTaskType() * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#getTaskType()
*/ */
@Override @Override

View file

@ -800,7 +800,7 @@ public class Worker implements Runnable {
/** /**
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()} * Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings. * method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
* *
* @return Whether worker should shutdown immediately. * @return Whether worker should shutdown immediately.
*/ */
@VisibleForTesting @VisibleForTesting
@ -1172,10 +1172,10 @@ public class Worker implements Runnable {
/** /**
* Provides logic how to prioritize shard processing. * Provides logic how to prioritize shard processing.
* *
* @param shardPrioritization * @param shardPrioritization
* shardPrioritization is responsible to order shards before processing * shardPrioritization is responsible to order shards before processing
* *
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder shardPrioritization(ShardPrioritization shardPrioritization) { public Builder shardPrioritization(ShardPrioritization shardPrioritization) {

View file

@ -105,7 +105,7 @@ public class ShardSyncTaskIntegrationTest {
/** /**
* Test method for call(). * Test method for call().
* *
* @throws CapacityExceededException * @throws CapacityExceededException
* @throws DependencyException * @throws DependencyException
* @throws InvalidStateException * @throws InvalidStateException

View file

@ -157,7 +157,7 @@ public class WorkerTest {
private TaskResult taskResult; private TaskResult taskResult;
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() { new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() {
@Override @Override
@ -190,8 +190,8 @@ public class WorkerTest {
}; };
} }
}; };
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 =
new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY); new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY);
@ -608,7 +608,7 @@ public class WorkerTest {
* This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of * This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of
* {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads. * {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads.
* This behavior makes the test a bit racy, since we need to ensure a specific order of events. * This behavior makes the test a bit racy, since we need to ensure a specific order of events.
* *
* @throws Exception * @throws Exception
*/ */
@Test @Test
@ -1678,7 +1678,7 @@ public class WorkerTest {
failoverTimeMillis, failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization); shardPrioritization);
WorkerThread workerThread = new WorkerThread(worker); WorkerThread workerThread = new WorkerThread(worker);
workerThread.start(); workerThread.start();
return workerThread; return workerThread;