diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java index 4235bad9..af2b7c36 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java @@ -45,6 +45,8 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.model.Shard; +import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.fromHashKeyRange; + /** * Helper class to sync leases with shards of the Kinesis stream. * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). @@ -617,7 +619,7 @@ class KinesisShardSyncer implements ShardSyncer { } newLease.setParentShardIds(parentShardIds); newLease.setOwnerSwitchesSinceCheckpoint(0L); - + newLease.setHashKeyRange(fromHashKeyRange(shard.getHashKeyRange())); return newLease; } @@ -641,6 +643,7 @@ class KinesisShardSyncer implements ShardSyncer { newLease.setParentShardIds(parentShardIds); newLease.setOwnerSwitchesSinceCheckpoint(0L); newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + newLease.setHashKeyRange(fromHashKeyRange(childShard.getHashKeyRange())); return newLease; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 29a95ac4..9e73aad9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -243,7 +243,7 @@ class ShardConsumer { this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, kinesisDataFetcher, retryGetRecordsInSeconds, - maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), + maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), Executors.newSingleThreadScheduledExecutor(), metricsFactory, config.shouldCleanupLeasesUponShardCompletion(), config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords())); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index f8c66181..e7e725f8 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -96,9 +96,9 @@ public class Worker implements Runnable { // Default configs for periodic shard sync private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0; private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL. - static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; - static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; - static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; + static long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; + static long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; + static long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator(); @@ -576,7 +576,7 @@ public class Worker implements Runnable { this.workerStateChangeListener = workerStateChangeListener; workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager); - this.leaseCleanupManager = LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), + this.leaseCleanupManager = LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion, config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords()); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java index 310edb67..5a7fb9b2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java @@ -68,6 +68,11 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer childShardKeys = leaseFromDDB.getChildShardIds(); if (CollectionUtils.isNullOrEmpty(childShardKeys)) { try { + // throws ResourceNotFoundException childShardKeys = getChildShardsFromService(shardInfo); - if (CollectionUtils.isNullOrEmpty(childShardKeys)) { LOG.error("No child shards returned from service for shard " + shardInfo.getShardId()); + // If no children shard is found in DDB and from service, then do not delete the lease + throw new InvalidStateException("No child shards found for this supposedly " + + "closed shard in both local DDB and in service " + shardInfo.getShardId()); } else { wereChildShardsPresent = true; updateLeaseWithChildShards(leasePendingDeletion, childShardKeys); } - } catch (ResourceNotFoundException e) { - throw e; } finally { + // We rely on resource presence in service for garbage collection. Since we already + // made a call to getChildShardsFromService we would be coming to know if the resource + // is present of not. In latter case, we would throw ResourceNotFoundException, which is + // handled in catch block. alreadyCheckedForGarbageCollection = true; } } else { wereChildShardsPresent = true; } try { - cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, childShardKeys); + final CompletedShardResult completedShardResult = cleanupLeaseForCompletedShard(lease, childShardKeys); + cleanedUpCompletedLease = completedShardResult.cleanedUp(); + cleanupFailureReason = completedShardResult.failureMsg(); } catch (Exception e) { // Suppressing the exception here, so that we can attempt for garbage cleanup. - LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId()); + LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId() + " due to " + e.getMessage()); } } else { LOG.info("Lease not present in lease table while cleaning the shard " + shardInfo.getShardId()); cleanedUpCompletedLease = true; } } - - if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { - try { - wereChildShardsPresent = !CollectionUtils + if (!cleanedUpCompletedLease && !alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { + // throws ResourceNotFoundException + wereChildShardsPresent = !CollectionUtils .isNullOrEmpty(getChildShardsFromService(shardInfo)); - } catch (ResourceNotFoundException e) { - throw e; - } } } catch (ResourceNotFoundException e) { wasResourceNotFound = true; cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease); + cleanupFailureReason = cleanedUpGarbageLease ? "" : "DDB Lease Deletion Failed"; + } catch (Exception e) { + LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId() + " : " + e.getMessage()); + cleanupFailureReason = e.getMessage(); } - return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent, - wasResourceNotFound); + wasResourceNotFound, cleanupFailureReason); } private Set getChildShardsFromService(ShardInfo shardInfo) { @@ -239,12 +241,16 @@ public class LeaseCleanupManager { return kinesisProxy.get(iterator, maxRecords).getChildShards().stream().map(c -> c.getShardId()).collect(Collectors.toSet()); } - // A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the // stream (known explicitly from ResourceNotFound being thrown when processing this shard), private boolean cleanupLeaseForGarbageShard(KinesisClientLease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException { LOG.info("Deleting lease " + lease.getLeaseKey() + " as it is not present in the stream."); - leaseManager.deleteLease(lease); + try { + leaseManager.deleteLease(lease); + } catch (Exception e) { + LOG.warn("Lease deletion failed for " + lease.getLeaseKey() + " due to " + e.getMessage()); + return false; + } return true; } @@ -264,8 +270,9 @@ public class LeaseCleanupManager { // We should only be deleting the current shard's lease if // 1. All of its children are currently being processed, i.e their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP. // 2. Its parent shard lease(s) have already been deleted. - private boolean cleanupLeaseForCompletedShard(KinesisClientLease lease, Set childShardLeaseKeys) + private CompletedShardResult cleanupLeaseForCompletedShard(KinesisClientLease lease, Set childShardLeaseKeys) throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException { + final Set processedChildShardLeaseKeys = new HashSet<>(); for (String childShardLeaseKey : childShardLeaseKeys) { @@ -281,14 +288,17 @@ public class LeaseCleanupManager { } } - if (!allParentShardLeasesDeleted(lease) || !Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys)) { - return false; + boolean parentShardsDeleted = allParentShardLeasesDeleted(lease); + boolean childrenStartedProcessing = Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys); + + if (!parentShardsDeleted || !childrenStartedProcessing) { + return new CompletedShardResult(false, !parentShardsDeleted ? "Parent shard(s) not deleted yet" : "Child shard(s) yet to begin processing"); } LOG.info("Deleting lease " + lease.getLeaseKey() + " as it has been completely processed and processing of child shard(s) has begun."); leaseManager.deleteLease(lease); - return true; + return new CompletedShardResult(true, ""); } private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set childShardKeys) @@ -296,7 +306,7 @@ public class LeaseCleanupManager { final KinesisClientLease updatedLease = leasePendingDeletion.lease(); updatedLease.setChildShardIds(childShardKeys); - leaseManager.updateLease(updatedLease); + leaseManager.updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS); } @VisibleForTesting @@ -364,9 +374,17 @@ public class LeaseCleanupManager { boolean cleanedUpGarbageLease; boolean wereChildShardsPresent; boolean wasResourceNotFound; + String cleanupFailureReason; public boolean leaseCleanedUp() { return cleanedUpCompletedLease | cleanedUpGarbageLease; } } + + @Value + @Accessors(fluent = true) + private static class CompletedShardResult { + boolean cleanedUp; + String failureMsg; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index 00a3e755..d813e627 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -624,7 +624,6 @@ public class LeaseManager implements ILeaseManager { request.setExpected(serializer.getDynamoExistentExpectation(lease.getLeaseKey())); Map updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); - updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease)); request.setAttributeUpdates(updates); try { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index 9e9870d3..731cfdd3 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -93,6 +93,8 @@ public class ShardSyncerTest { private LeaseManager leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE); protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator(); private static final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(leaseCleanupValidator); + private static final HashKeyRange hashKeyRange = new HashKeyRange().withStartingHashKey("0").withEndingHashKey("10"); + /** * Old/Obsolete max value of a sequence number (2^128 -1). */ @@ -154,10 +156,10 @@ public class ShardSyncerTest { SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); String shardId0 = "shardId-0"; - shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange)); String shardId1 = "shardId-1"; - shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange)); final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); @@ -183,16 +185,16 @@ public class ShardSyncerTest { SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); String shardId0 = "shardId-0"; - shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange)); String shardId1 = "shardId-1"; - shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange)); String shardId2 = "shardId-2"; - shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange, hashKeyRange)); String shardIdWithLease = "shardId-3"; - shards.add(ShardObjectHelper.newShard(shardIdWithLease, shardIdWithLease, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardIdWithLease, shardIdWithLease, null, sequenceRange, hashKeyRange)); currentLeases.add(newLease(shardIdWithLease)); @@ -699,9 +701,9 @@ public class ShardSyncerTest { SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); String shardId0 = "shardId-0"; - shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange)); String shardId1 = "shardId-1"; - shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange)); File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1"); dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); @@ -731,10 +733,10 @@ public class ShardSyncerTest { SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); String shardId0 = "shardId-0"; - shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange)); String shardId1 = "shardId-1"; - shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange)); Set initialPositions = new HashSet(); initialPositions.add(INITIAL_POSITION_LATEST); @@ -769,17 +771,20 @@ public class ShardSyncerTest { shardsWithoutLeases.add(ShardObjectHelper.newShard("shardId-0", null, null, - ShardObjectHelper.newSequenceNumberRange("303", "404"))); + ShardObjectHelper.newSequenceNumberRange("303", "404"), + hashKeyRange)); final String lastShardId = "shardId-1"; shardsWithoutLeases.add(ShardObjectHelper.newShard(lastShardId, null, null, - ShardObjectHelper.newSequenceNumberRange("405", null))); + ShardObjectHelper.newSequenceNumberRange("405", null), + hashKeyRange)); shardsWithLeases.add(ShardObjectHelper.newShard("shardId-2", null, null, - ShardObjectHelper.newSequenceNumberRange("202", "302"))); + ShardObjectHelper.newSequenceNumberRange("202", "302"), + hashKeyRange)); currentLeases.add(newLease("shardId-2")); final List allShards = @@ -805,12 +810,14 @@ public class ShardSyncerTest { shards.add(ShardObjectHelper.newShard(firstShardId, null, null, - ShardObjectHelper.newSequenceNumberRange("303", "404"))); + ShardObjectHelper.newSequenceNumberRange("303", "404"), + hashKeyRange)); final String lastShardId = "shardId-1"; shards.add(ShardObjectHelper.newShard(lastShardId, null, null, - ShardObjectHelper.newSequenceNumberRange("405", null))); + ShardObjectHelper.newSequenceNumberRange("405", null), + hashKeyRange)); final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); @@ -1969,14 +1976,14 @@ public class ShardSyncerTest { Map kinesisShards = new HashMap(); String parentShardId = "shardId-parent"; - kinesisShards.put(parentShardId, ShardObjectHelper.newShard(parentShardId, null, null, null)); + kinesisShards.put(parentShardId, ShardObjectHelper.newShard(parentShardId, null, null, null, hashKeyRange)); shardIdsOfCurrentLeases.add(parentShardId); String adjacentParentShardId = "shardId-adjacentParent"; - kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null)); + kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null, hashKeyRange)); String shardId = "shardId-9-1"; - Shard shard = ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null); + Shard shard = ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null, hashKeyRange); kinesisShards.put(shardId, shard); final MemoizationContext memoizationContext = new MemoizationContext(); @@ -2097,6 +2104,7 @@ public class ShardSyncerTest { String adjacentParentShardId = "shardId-adjacentParent"; shard.setParentShardId(parentShardId); shard.setAdjacentParentShardId(adjacentParentShardId); + shard.setHashKeyRange(hashKeyRange); KinesisClientLease lease = shardSyncer.newKCLLease(shard); Assert.assertEquals(shardId, lease.getLeaseKey()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index a7ca1151..50a3aa9b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -204,6 +204,10 @@ public class WorkerTest { when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); when(leaseCoordinator.getLeaseManager()).thenReturn(mock(ILeaseManager.class)); when(streamConfig.getStreamProxy()).thenReturn(kinesisProxy); + Worker.MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 10; + Worker.MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 50; + Worker.LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 10; + } // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java index fe922520..1e3ff6de 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Set; import com.amazonaws.services.kinesis.model.ChildShard; +import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.util.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -408,11 +409,13 @@ public class KinesisLocalFileProxy implements IKinesisProxy { ChildShard leftChild = new ChildShard(); leftChild.setShardId("shardId-1"); leftChild.setParentShards(parentShards); + leftChild.setHashKeyRange(new HashKeyRange().withStartingHashKey("0").withEndingHashKey("10")); childShards.add(leftChild); ChildShard rightChild = new ChildShard(); rightChild.setShardId("shardId-2"); rightChild.setParentShards(parentShards); + rightChild.setHashKeyRange(new HashKeyRange().withStartingHashKey("11").withEndingHashKey(MAX_HASHKEY_VALUE.toString())); childShards.add(rightChild); return childShards; } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java index b13d0233..2baac1ca 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java @@ -163,6 +163,7 @@ public class KinesisLocalFileDataCreator { HashKeyRange hashKeyRange = new HashKeyRange(); hashKeyRange.setStartingHashKey(hashKeyRangeStart.toString()); hashKeyRange.setEndingHashKey(hashKeyRangeEnd.toString()); + shard.setHashKeyRange(hashKeyRange); shards.add(shard); }