diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java index 4235bad9..af2b7c36 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java @@ -45,6 +45,8 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.model.Shard; +import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.fromHashKeyRange; + /** * Helper class to sync leases with shards of the Kinesis stream. * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). @@ -617,7 +619,7 @@ class KinesisShardSyncer implements ShardSyncer { } newLease.setParentShardIds(parentShardIds); newLease.setOwnerSwitchesSinceCheckpoint(0L); - + newLease.setHashKeyRange(fromHashKeyRange(shard.getHashKeyRange())); return newLease; } @@ -641,6 +643,7 @@ class KinesisShardSyncer implements ShardSyncer { newLease.setParentShardIds(parentShardIds); newLease.setOwnerSwitchesSinceCheckpoint(0L); newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + newLease.setHashKeyRange(fromHashKeyRange(childShard.getHashKeyRange())); return newLease; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 29a95ac4..9e73aad9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -243,7 +243,7 @@ class ShardConsumer { this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, kinesisDataFetcher, retryGetRecordsInSeconds, - maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), + maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), Executors.newSingleThreadScheduledExecutor(), metricsFactory, config.shouldCleanupLeasesUponShardCompletion(), config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords())); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index f8c66181..e7e725f8 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -96,9 +96,9 @@ public class Worker implements Runnable { // Default configs for periodic shard sync private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0; private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL. - static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; - static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; - static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; + static long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; + static long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; + static long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator(); @@ -576,7 +576,7 @@ public class Worker implements Runnable { this.workerStateChangeListener = workerStateChangeListener; workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager); - this.leaseCleanupManager = LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), + this.leaseCleanupManager = LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion, config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords()); diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java index 310edb67..5a7fb9b2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseSerializer.java @@ -68,6 +68,11 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer implements ILeaseManager { request.setExpected(serializer.getDynamoExistentExpectation(lease.getLeaseKey())); Map updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField); - updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease)); request.setAttributeUpdates(updates); try { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index 9e9870d3..731cfdd3 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -93,6 +93,8 @@ public class ShardSyncerTest { private LeaseManager leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE); protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator(); private static final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(leaseCleanupValidator); + private static final HashKeyRange hashKeyRange = new HashKeyRange().withStartingHashKey("0").withEndingHashKey("10"); + /** * Old/Obsolete max value of a sequence number (2^128 -1). */ @@ -154,10 +156,10 @@ public class ShardSyncerTest { SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); String shardId0 = "shardId-0"; - shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange)); String shardId1 = "shardId-1"; - shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange)); final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); @@ -183,16 +185,16 @@ public class ShardSyncerTest { SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); String shardId0 = "shardId-0"; - shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange)); String shardId1 = "shardId-1"; - shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange)); String shardId2 = "shardId-2"; - shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange, hashKeyRange)); String shardIdWithLease = "shardId-3"; - shards.add(ShardObjectHelper.newShard(shardIdWithLease, shardIdWithLease, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardIdWithLease, shardIdWithLease, null, sequenceRange, hashKeyRange)); currentLeases.add(newLease(shardIdWithLease)); @@ -699,9 +701,9 @@ public class ShardSyncerTest { SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); String shardId0 = "shardId-0"; - shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange)); String shardId1 = "shardId-1"; - shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange)); File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1"); dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); @@ -731,10 +733,10 @@ public class ShardSyncerTest { SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null); String shardId0 = "shardId-0"; - shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange)); String shardId1 = "shardId-1"; - shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); + shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange)); Set initialPositions = new HashSet(); initialPositions.add(INITIAL_POSITION_LATEST); @@ -769,17 +771,20 @@ public class ShardSyncerTest { shardsWithoutLeases.add(ShardObjectHelper.newShard("shardId-0", null, null, - ShardObjectHelper.newSequenceNumberRange("303", "404"))); + ShardObjectHelper.newSequenceNumberRange("303", "404"), + hashKeyRange)); final String lastShardId = "shardId-1"; shardsWithoutLeases.add(ShardObjectHelper.newShard(lastShardId, null, null, - ShardObjectHelper.newSequenceNumberRange("405", null))); + ShardObjectHelper.newSequenceNumberRange("405", null), + hashKeyRange)); shardsWithLeases.add(ShardObjectHelper.newShard("shardId-2", null, null, - ShardObjectHelper.newSequenceNumberRange("202", "302"))); + ShardObjectHelper.newSequenceNumberRange("202", "302"), + hashKeyRange)); currentLeases.add(newLease("shardId-2")); final List allShards = @@ -805,12 +810,14 @@ public class ShardSyncerTest { shards.add(ShardObjectHelper.newShard(firstShardId, null, null, - ShardObjectHelper.newSequenceNumberRange("303", "404"))); + ShardObjectHelper.newSequenceNumberRange("303", "404"), + hashKeyRange)); final String lastShardId = "shardId-1"; shards.add(ShardObjectHelper.newShard(lastShardId, null, null, - ShardObjectHelper.newSequenceNumberRange("405", null))); + ShardObjectHelper.newSequenceNumberRange("405", null), + hashKeyRange)); final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases); @@ -1969,14 +1976,14 @@ public class ShardSyncerTest { Map kinesisShards = new HashMap(); String parentShardId = "shardId-parent"; - kinesisShards.put(parentShardId, ShardObjectHelper.newShard(parentShardId, null, null, null)); + kinesisShards.put(parentShardId, ShardObjectHelper.newShard(parentShardId, null, null, null, hashKeyRange)); shardIdsOfCurrentLeases.add(parentShardId); String adjacentParentShardId = "shardId-adjacentParent"; - kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null)); + kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null, hashKeyRange)); String shardId = "shardId-9-1"; - Shard shard = ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null); + Shard shard = ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null, hashKeyRange); kinesisShards.put(shardId, shard); final MemoizationContext memoizationContext = new MemoizationContext(); @@ -2097,6 +2104,7 @@ public class ShardSyncerTest { String adjacentParentShardId = "shardId-adjacentParent"; shard.setParentShardId(parentShardId); shard.setAdjacentParentShardId(adjacentParentShardId); + shard.setHashKeyRange(hashKeyRange); KinesisClientLease lease = shardSyncer.newKCLLease(shard); Assert.assertEquals(shardId, lease.getLeaseKey()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index a7ca1151..50a3aa9b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -204,6 +204,10 @@ public class WorkerTest { when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); when(leaseCoordinator.getLeaseManager()).thenReturn(mock(ILeaseManager.class)); when(streamConfig.getStreamProxy()).thenReturn(kinesisProxy); + Worker.MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 10; + Worker.MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 50; + Worker.LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 10; + } // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java index fe922520..1e3ff6de 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Set; import com.amazonaws.services.kinesis.model.ChildShard; +import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.util.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -408,11 +409,13 @@ public class KinesisLocalFileProxy implements IKinesisProxy { ChildShard leftChild = new ChildShard(); leftChild.setShardId("shardId-1"); leftChild.setParentShards(parentShards); + leftChild.setHashKeyRange(new HashKeyRange().withStartingHashKey("0").withEndingHashKey("10")); childShards.add(leftChild); ChildShard rightChild = new ChildShard(); rightChild.setShardId("shardId-2"); rightChild.setParentShards(parentShards); + rightChild.setHashKeyRange(new HashKeyRange().withStartingHashKey("11").withEndingHashKey(MAX_HASHKEY_VALUE.toString())); childShards.add(rightChild); return childShards; } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java index b13d0233..2baac1ca 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java @@ -163,6 +163,7 @@ public class KinesisLocalFileDataCreator { HashKeyRange hashKeyRange = new HashKeyRange(); hashKeyRange.setStartingHashKey(hashKeyRangeStart.toString()); hashKeyRange.setEndingHashKey(hashKeyRangeEnd.toString()); + shard.setHashKeyRange(hashKeyRange); shards.add(shard); }