Merge pull request #776 from awslabs/shard_end_corruption_issues

KCL 1.X Fix for ShardEnd corruption and preventing lease table interference in multi-app JVM
This commit is contained in:
ashwing 2021-01-25 23:16:24 -08:00 committed by GitHub
commit e873c999cd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 116 additions and 61 deletions

View file

@ -45,6 +45,8 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.Shard;
import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.fromHashKeyRange;
/** /**
* Helper class to sync leases with shards of the Kinesis stream. * Helper class to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
@ -617,7 +619,7 @@ class KinesisShardSyncer implements ShardSyncer {
} }
newLease.setParentShardIds(parentShardIds); newLease.setParentShardIds(parentShardIds);
newLease.setOwnerSwitchesSinceCheckpoint(0L); newLease.setOwnerSwitchesSinceCheckpoint(0L);
newLease.setHashKeyRange(fromHashKeyRange(shard.getHashKeyRange()));
return newLease; return newLease;
} }
@ -641,6 +643,7 @@ class KinesisShardSyncer implements ShardSyncer {
newLease.setParentShardIds(parentShardIds); newLease.setParentShardIds(parentShardIds);
newLease.setOwnerSwitchesSinceCheckpoint(0L); newLease.setOwnerSwitchesSinceCheckpoint(0L);
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
newLease.setHashKeyRange(fromHashKeyRange(childShard.getHashKeyRange()));
return newLease; return newLease;
} }

View file

@ -243,7 +243,7 @@ class ShardConsumer {
this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator, this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator,
parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, kinesisDataFetcher, retryGetRecordsInSeconds, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, kinesisDataFetcher, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
Executors.newSingleThreadScheduledExecutor(), metricsFactory, config.shouldCleanupLeasesUponShardCompletion(), Executors.newSingleThreadScheduledExecutor(), metricsFactory, config.shouldCleanupLeasesUponShardCompletion(),
config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(),
config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords())); config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords()));

View file

@ -96,9 +96,9 @@ public class Worker implements Runnable {
// Default configs for periodic shard sync // Default configs for periodic shard sync
private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0; private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0;
private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL. private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL.
static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; static long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; static long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; static long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator(); private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator();
@ -576,7 +576,7 @@ public class Worker implements Runnable {
this.workerStateChangeListener = workerStateChangeListener; this.workerStateChangeListener = workerStateChangeListener;
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager); createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager);
this.leaseCleanupManager = LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), this.leaseCleanupManager = LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion, Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion,
config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(),
config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords()); config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords());

View file

@ -68,6 +68,11 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSubSequenceNumber())); result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSubSequenceNumber()));
} }
if(lease.getHashKeyRange() != null) {
result.put(STARTING_HASH_KEY, DynamoUtils.createAttributeValue(lease.getHashKeyRange().serializedStartingHashKey()));
result.put(ENDING_HASH_KEY, DynamoUtils.createAttributeValue(lease.getHashKeyRange().serializedEndingHashKey()));
}
return result; return result;
} }
@ -92,6 +97,12 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
); );
} }
final String startingHashKey, endingHashKey;
if (!Strings.isNullOrEmpty(startingHashKey = DynamoUtils.safeGetString(dynamoRecord, STARTING_HASH_KEY))
&& !Strings.isNullOrEmpty(endingHashKey = DynamoUtils.safeGetString(dynamoRecord, ENDING_HASH_KEY))) {
result.setHashKeyRange(HashKeyRangeForLease.deserialize(startingHashKey, endingHashKey));
}
return result; return result;
} }
@ -163,6 +174,11 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
result.put(CHILD_SHARD_IDS_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getChildShardIds()), AttributeAction.PUT)); result.put(CHILD_SHARD_IDS_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getChildShardIds()), AttributeAction.PUT));
} }
if(lease.getHashKeyRange() != null) {
result.put(STARTING_HASH_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getHashKeyRange().serializedStartingHashKey()), AttributeAction.PUT));
result.put(ENDING_HASH_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getHashKeyRange().serializedEndingHashKey()), AttributeAction.PUT));
}
if (lease.getPendingCheckpoint() != null && !lease.getPendingCheckpoint().getSequenceNumber().isEmpty()) { if (lease.getPendingCheckpoint() != null && !lease.getPendingCheckpoint().getSequenceNumber().isEmpty()) {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSequenceNumber()), AttributeAction.PUT)); result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSequenceNumber()), AttributeAction.PUT));
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSubSequenceNumber()), AttributeAction.PUT)); result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSubSequenceNumber()), AttributeAction.PUT));
@ -181,7 +197,10 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
switch (updateField) { switch (updateField) {
case CHILD_SHARDS: case CHILD_SHARDS:
// TODO: Implement update fields for child shards if (!CollectionUtils.isNullOrEmpty(lease.getChildShardIds())) {
result.put(CHILD_SHARD_IDS_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(
lease.getChildShardIds()), AttributeAction.PUT));
}
break; break;
case HASH_KEY_RANGE: case HASH_KEY_RANGE:
if (lease.getHashKeyRange() != null) { if (lease.getHashKeyRange() != null) {

View file

@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.leases.impl;
* limitations under the License. * limitations under the License.
*/ */
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
@ -81,10 +82,8 @@ public class LeaseCleanupManager {
@Getter @Getter
private volatile boolean isRunning = false; private volatile boolean isRunning = false;
private static LeaseCleanupManager instance;
/** /**
* Factory method to return a singleton instance of {@link LeaseCleanupManager}. * Method to return a new instance of {@link LeaseCleanupManager}.
* @param kinesisProxy * @param kinesisProxy
* @param leaseManager * @param leaseManager
* @param deletionThreadPool * @param deletionThreadPool
@ -96,17 +95,13 @@ public class LeaseCleanupManager {
* @param maxRecords * @param maxRecords
* @return * @return
*/ */
public static LeaseCleanupManager createOrGetInstance(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, public static LeaseCleanupManager newInstance(IKinesisProxy kinesisProxy, ILeaseManager leaseManager,
ScheduledExecutorService deletionThreadPool, IMetricsFactory metricsFactory, ScheduledExecutorService deletionThreadPool, IMetricsFactory metricsFactory,
boolean cleanupLeasesUponShardCompletion, long leaseCleanupIntervalMillis, boolean cleanupLeasesUponShardCompletion, long leaseCleanupIntervalMillis,
long completedLeaseCleanupIntervalMillis, long garbageLeaseCleanupIntervalMillis, long completedLeaseCleanupIntervalMillis, long garbageLeaseCleanupIntervalMillis,
int maxRecords) { int maxRecords) {
if (instance == null) { return new LeaseCleanupManager(kinesisProxy, leaseManager, deletionThreadPool, metricsFactory, cleanupLeasesUponShardCompletion,
instance = new LeaseCleanupManager(kinesisProxy, leaseManager, deletionThreadPool, metricsFactory, cleanupLeasesUponShardCompletion, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
}
return instance;
} }
/** /**
@ -181,6 +176,7 @@ public class LeaseCleanupManager {
boolean alreadyCheckedForGarbageCollection = false; boolean alreadyCheckedForGarbageCollection = false;
boolean wereChildShardsPresent = false; boolean wereChildShardsPresent = false;
boolean wasResourceNotFound = false; boolean wasResourceNotFound = false;
String cleanupFailureReason = "";
try { try {
if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) { if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
@ -189,49 +185,55 @@ public class LeaseCleanupManager {
Set<String> childShardKeys = leaseFromDDB.getChildShardIds(); Set<String> childShardKeys = leaseFromDDB.getChildShardIds();
if (CollectionUtils.isNullOrEmpty(childShardKeys)) { if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
try { try {
// throws ResourceNotFoundException
childShardKeys = getChildShardsFromService(shardInfo); childShardKeys = getChildShardsFromService(shardInfo);
if (CollectionUtils.isNullOrEmpty(childShardKeys)) { if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
LOG.error("No child shards returned from service for shard " + shardInfo.getShardId()); LOG.error("No child shards returned from service for shard " + shardInfo.getShardId());
// If no children shard is found in DDB and from service, then do not delete the lease
throw new InvalidStateException("No child shards found for this supposedly " +
"closed shard in both local DDB and in service " + shardInfo.getShardId());
} else { } else {
wereChildShardsPresent = true; wereChildShardsPresent = true;
updateLeaseWithChildShards(leasePendingDeletion, childShardKeys); updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
} }
} catch (ResourceNotFoundException e) {
throw e;
} finally { } finally {
// We rely on resource presence in service for garbage collection. Since we already
// made a call to getChildShardsFromService we would be coming to know if the resource
// is present of not. In latter case, we would throw ResourceNotFoundException, which is
// handled in catch block.
alreadyCheckedForGarbageCollection = true; alreadyCheckedForGarbageCollection = true;
} }
} else { } else {
wereChildShardsPresent = true; wereChildShardsPresent = true;
} }
try { try {
cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, childShardKeys); final CompletedShardResult completedShardResult = cleanupLeaseForCompletedShard(lease, childShardKeys);
cleanedUpCompletedLease = completedShardResult.cleanedUp();
cleanupFailureReason = completedShardResult.failureMsg();
} catch (Exception e) { } catch (Exception e) {
// Suppressing the exception here, so that we can attempt for garbage cleanup. // Suppressing the exception here, so that we can attempt for garbage cleanup.
LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId()); LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId() + " due to " + e.getMessage());
} }
} else { } else {
LOG.info("Lease not present in lease table while cleaning the shard " + shardInfo.getShardId()); LOG.info("Lease not present in lease table while cleaning the shard " + shardInfo.getShardId());
cleanedUpCompletedLease = true; cleanedUpCompletedLease = true;
} }
} }
if (!cleanedUpCompletedLease && !alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { // throws ResourceNotFoundException
try { wereChildShardsPresent = !CollectionUtils
wereChildShardsPresent = !CollectionUtils
.isNullOrEmpty(getChildShardsFromService(shardInfo)); .isNullOrEmpty(getChildShardsFromService(shardInfo));
} catch (ResourceNotFoundException e) {
throw e;
}
} }
} catch (ResourceNotFoundException e) { } catch (ResourceNotFoundException e) {
wasResourceNotFound = true; wasResourceNotFound = true;
cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease); cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease);
cleanupFailureReason = cleanedUpGarbageLease ? "" : "DDB Lease Deletion Failed";
} catch (Exception e) {
LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId() + " : " + e.getMessage());
cleanupFailureReason = e.getMessage();
} }
return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent, return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent,
wasResourceNotFound); wasResourceNotFound, cleanupFailureReason);
} }
private Set<String> getChildShardsFromService(ShardInfo shardInfo) { private Set<String> getChildShardsFromService(ShardInfo shardInfo) {
@ -239,12 +241,16 @@ public class LeaseCleanupManager {
return kinesisProxy.get(iterator, maxRecords).getChildShards().stream().map(c -> c.getShardId()).collect(Collectors.toSet()); return kinesisProxy.get(iterator, maxRecords).getChildShards().stream().map(c -> c.getShardId()).collect(Collectors.toSet());
} }
// A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the // A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the
// stream (known explicitly from ResourceNotFound being thrown when processing this shard), // stream (known explicitly from ResourceNotFound being thrown when processing this shard),
private boolean cleanupLeaseForGarbageShard(KinesisClientLease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException { private boolean cleanupLeaseForGarbageShard(KinesisClientLease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
LOG.info("Deleting lease " + lease.getLeaseKey() + " as it is not present in the stream."); LOG.info("Deleting lease " + lease.getLeaseKey() + " as it is not present in the stream.");
leaseManager.deleteLease(lease); try {
leaseManager.deleteLease(lease);
} catch (Exception e) {
LOG.warn("Lease deletion failed for " + lease.getLeaseKey() + " due to " + e.getMessage());
return false;
}
return true; return true;
} }
@ -264,8 +270,9 @@ public class LeaseCleanupManager {
// We should only be deleting the current shard's lease if // We should only be deleting the current shard's lease if
// 1. All of its children are currently being processed, i.e their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP. // 1. All of its children are currently being processed, i.e their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP.
// 2. Its parent shard lease(s) have already been deleted. // 2. Its parent shard lease(s) have already been deleted.
private boolean cleanupLeaseForCompletedShard(KinesisClientLease lease, Set<String> childShardLeaseKeys) private CompletedShardResult cleanupLeaseForCompletedShard(KinesisClientLease lease, Set<String> childShardLeaseKeys)
throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException { throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException {
final Set<String> processedChildShardLeaseKeys = new HashSet<>(); final Set<String> processedChildShardLeaseKeys = new HashSet<>();
for (String childShardLeaseKey : childShardLeaseKeys) { for (String childShardLeaseKey : childShardLeaseKeys) {
@ -281,14 +288,17 @@ public class LeaseCleanupManager {
} }
} }
if (!allParentShardLeasesDeleted(lease) || !Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys)) { boolean parentShardsDeleted = allParentShardLeasesDeleted(lease);
return false; boolean childrenStartedProcessing = Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys);
if (!parentShardsDeleted || !childrenStartedProcessing) {
return new CompletedShardResult(false, !parentShardsDeleted ? "Parent shard(s) not deleted yet" : "Child shard(s) yet to begin processing");
} }
LOG.info("Deleting lease " + lease.getLeaseKey() + " as it has been completely processed and processing of child shard(s) has begun."); LOG.info("Deleting lease " + lease.getLeaseKey() + " as it has been completely processed and processing of child shard(s) has begun.");
leaseManager.deleteLease(lease); leaseManager.deleteLease(lease);
return true; return new CompletedShardResult(true, "");
} }
private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set<String> childShardKeys) private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set<String> childShardKeys)
@ -296,7 +306,7 @@ public class LeaseCleanupManager {
final KinesisClientLease updatedLease = leasePendingDeletion.lease(); final KinesisClientLease updatedLease = leasePendingDeletion.lease();
updatedLease.setChildShardIds(childShardKeys); updatedLease.setChildShardIds(childShardKeys);
leaseManager.updateLease(updatedLease); leaseManager.updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS);
} }
@VisibleForTesting @VisibleForTesting
@ -364,9 +374,17 @@ public class LeaseCleanupManager {
boolean cleanedUpGarbageLease; boolean cleanedUpGarbageLease;
boolean wereChildShardsPresent; boolean wereChildShardsPresent;
boolean wasResourceNotFound; boolean wasResourceNotFound;
String cleanupFailureReason;
public boolean leaseCleanedUp() { public boolean leaseCleanedUp() {
return cleanedUpCompletedLease | cleanedUpGarbageLease; return cleanedUpCompletedLease | cleanedUpGarbageLease;
} }
} }
@Value
@Accessors(fluent = true)
private static class CompletedShardResult {
boolean cleanedUp;
String failureMsg;
}
} }

View file

@ -624,7 +624,6 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
request.setExpected(serializer.getDynamoExistentExpectation(lease.getLeaseKey())); request.setExpected(serializer.getDynamoExistentExpectation(lease.getLeaseKey()));
Map<String, AttributeValueUpdate> updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); Map<String, AttributeValueUpdate> updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField);
updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease));
request.setAttributeUpdates(updates); request.setAttributeUpdates(updates);
try { try {

View file

@ -93,6 +93,8 @@ public class ShardSyncerTest {
private LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE); private LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE);
protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator(); protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator();
private static final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(leaseCleanupValidator); private static final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(leaseCleanupValidator);
private static final HashKeyRange hashKeyRange = new HashKeyRange().withStartingHashKey("0").withEndingHashKey("10");
/** /**
* Old/Obsolete max value of a sequence number (2^128 -1). * Old/Obsolete max value of a sequence number (2^128 -1).
*/ */
@ -154,10 +156,10 @@ public class ShardSyncerTest {
SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
String shardId0 = "shardId-0"; String shardId0 = "shardId-0";
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange));
String shardId1 = "shardId-1"; String shardId1 = "shardId-1";
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange));
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
@ -183,16 +185,16 @@ public class ShardSyncerTest {
SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
String shardId0 = "shardId-0"; String shardId0 = "shardId-0";
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange));
String shardId1 = "shardId-1"; String shardId1 = "shardId-1";
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange));
String shardId2 = "shardId-2"; String shardId2 = "shardId-2";
shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange, hashKeyRange));
String shardIdWithLease = "shardId-3"; String shardIdWithLease = "shardId-3";
shards.add(ShardObjectHelper.newShard(shardIdWithLease, shardIdWithLease, null, sequenceRange)); shards.add(ShardObjectHelper.newShard(shardIdWithLease, shardIdWithLease, null, sequenceRange, hashKeyRange));
currentLeases.add(newLease(shardIdWithLease)); currentLeases.add(newLease(shardIdWithLease));
@ -699,9 +701,9 @@ public class ShardSyncerTest {
SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
String shardId0 = "shardId-0"; String shardId0 = "shardId-0";
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange));
String shardId1 = "shardId-1"; String shardId1 = "shardId-1";
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange));
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1"); File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit(); dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
@ -731,10 +733,10 @@ public class ShardSyncerTest {
SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
String shardId0 = "shardId-0"; String shardId0 = "shardId-0";
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange));
String shardId1 = "shardId-1"; String shardId1 = "shardId-1";
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange));
Set<InitialPositionInStreamExtended> initialPositions = new HashSet<InitialPositionInStreamExtended>(); Set<InitialPositionInStreamExtended> initialPositions = new HashSet<InitialPositionInStreamExtended>();
initialPositions.add(INITIAL_POSITION_LATEST); initialPositions.add(INITIAL_POSITION_LATEST);
@ -769,17 +771,20 @@ public class ShardSyncerTest {
shardsWithoutLeases.add(ShardObjectHelper.newShard("shardId-0", shardsWithoutLeases.add(ShardObjectHelper.newShard("shardId-0",
null, null,
null, null,
ShardObjectHelper.newSequenceNumberRange("303", "404"))); ShardObjectHelper.newSequenceNumberRange("303", "404"),
hashKeyRange));
final String lastShardId = "shardId-1"; final String lastShardId = "shardId-1";
shardsWithoutLeases.add(ShardObjectHelper.newShard(lastShardId, shardsWithoutLeases.add(ShardObjectHelper.newShard(lastShardId,
null, null,
null, null,
ShardObjectHelper.newSequenceNumberRange("405", null))); ShardObjectHelper.newSequenceNumberRange("405", null),
hashKeyRange));
shardsWithLeases.add(ShardObjectHelper.newShard("shardId-2", shardsWithLeases.add(ShardObjectHelper.newShard("shardId-2",
null, null,
null, null,
ShardObjectHelper.newSequenceNumberRange("202", "302"))); ShardObjectHelper.newSequenceNumberRange("202", "302"),
hashKeyRange));
currentLeases.add(newLease("shardId-2")); currentLeases.add(newLease("shardId-2"));
final List<Shard> allShards = final List<Shard> allShards =
@ -805,12 +810,14 @@ public class ShardSyncerTest {
shards.add(ShardObjectHelper.newShard(firstShardId, shards.add(ShardObjectHelper.newShard(firstShardId,
null, null,
null, null,
ShardObjectHelper.newSequenceNumberRange("303", "404"))); ShardObjectHelper.newSequenceNumberRange("303", "404"),
hashKeyRange));
final String lastShardId = "shardId-1"; final String lastShardId = "shardId-1";
shards.add(ShardObjectHelper.newShard(lastShardId, shards.add(ShardObjectHelper.newShard(lastShardId,
null, null,
null, null,
ShardObjectHelper.newSequenceNumberRange("405", null))); ShardObjectHelper.newSequenceNumberRange("405", null),
hashKeyRange));
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
@ -1969,14 +1976,14 @@ public class ShardSyncerTest {
Map<String, Shard> kinesisShards = new HashMap<String, Shard>(); Map<String, Shard> kinesisShards = new HashMap<String, Shard>();
String parentShardId = "shardId-parent"; String parentShardId = "shardId-parent";
kinesisShards.put(parentShardId, ShardObjectHelper.newShard(parentShardId, null, null, null)); kinesisShards.put(parentShardId, ShardObjectHelper.newShard(parentShardId, null, null, null, hashKeyRange));
shardIdsOfCurrentLeases.add(parentShardId); shardIdsOfCurrentLeases.add(parentShardId);
String adjacentParentShardId = "shardId-adjacentParent"; String adjacentParentShardId = "shardId-adjacentParent";
kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null)); kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null, hashKeyRange));
String shardId = "shardId-9-1"; String shardId = "shardId-9-1";
Shard shard = ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null); Shard shard = ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null, hashKeyRange);
kinesisShards.put(shardId, shard); kinesisShards.put(shardId, shard);
final MemoizationContext memoizationContext = new MemoizationContext(); final MemoizationContext memoizationContext = new MemoizationContext();
@ -2097,6 +2104,7 @@ public class ShardSyncerTest {
String adjacentParentShardId = "shardId-adjacentParent"; String adjacentParentShardId = "shardId-adjacentParent";
shard.setParentShardId(parentShardId); shard.setParentShardId(parentShardId);
shard.setAdjacentParentShardId(adjacentParentShardId); shard.setAdjacentParentShardId(adjacentParentShardId);
shard.setHashKeyRange(hashKeyRange);
KinesisClientLease lease = shardSyncer.newKCLLease(shard); KinesisClientLease lease = shardSyncer.newKCLLease(shard);
Assert.assertEquals(shardId, lease.getLeaseKey()); Assert.assertEquals(shardId, lease.getLeaseKey());

View file

@ -204,6 +204,10 @@ public class WorkerTest {
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(leaseCoordinator.getLeaseManager()).thenReturn(mock(ILeaseManager.class)); when(leaseCoordinator.getLeaseManager()).thenReturn(mock(ILeaseManager.class));
when(streamConfig.getStreamProxy()).thenReturn(kinesisProxy); when(streamConfig.getStreamProxy()).thenReturn(kinesisProxy);
Worker.MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 10;
Worker.MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 50;
Worker.LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 10;
} }
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES

View file

@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.util.CollectionUtils; import com.amazonaws.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -408,11 +409,13 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
ChildShard leftChild = new ChildShard(); ChildShard leftChild = new ChildShard();
leftChild.setShardId("shardId-1"); leftChild.setShardId("shardId-1");
leftChild.setParentShards(parentShards); leftChild.setParentShards(parentShards);
leftChild.setHashKeyRange(new HashKeyRange().withStartingHashKey("0").withEndingHashKey("10"));
childShards.add(leftChild); childShards.add(leftChild);
ChildShard rightChild = new ChildShard(); ChildShard rightChild = new ChildShard();
rightChild.setShardId("shardId-2"); rightChild.setShardId("shardId-2");
rightChild.setParentShards(parentShards); rightChild.setParentShards(parentShards);
rightChild.setHashKeyRange(new HashKeyRange().withStartingHashKey("11").withEndingHashKey(MAX_HASHKEY_VALUE.toString()));
childShards.add(rightChild); childShards.add(rightChild);
return childShards; return childShards;
} }

View file

@ -163,6 +163,7 @@ public class KinesisLocalFileDataCreator {
HashKeyRange hashKeyRange = new HashKeyRange(); HashKeyRange hashKeyRange = new HashKeyRange();
hashKeyRange.setStartingHashKey(hashKeyRangeStart.toString()); hashKeyRange.setStartingHashKey(hashKeyRangeStart.toString());
hashKeyRange.setEndingHashKey(hashKeyRangeEnd.toString()); hashKeyRange.setEndingHashKey(hashKeyRangeEnd.toString());
shard.setHashKeyRange(hashKeyRange);
shards.add(shard); shards.add(shard);
} }