1. Fix for updating HashRange directly while creating leases and including HashRange in seralizing and deserializing to/from DDB

2. Fix for making LeaseCleanupManager non-singleton to avoid cross-table interference in multiple apps running in same JVM
3. Fixing updateMetaInfo method to not update other lease table fields
4. Preventing shard deletion in LeaseCleanupManager if a valid shard does not have child shards in lease table and in Kinesis Service
5. Adding childshards update support in updateMetaInfo
6. Fixing LeaseCleanupManager to call updateMetaInfo instead of update for childshard update in lease

7. Fixing unit tests to accommodate HashRange changes
This commit is contained in:
Ashwin Giridharan 2021-01-22 21:51:03 -08:00
parent 6fbfc21ad7
commit 9cb5020022
10 changed files with 77 additions and 41 deletions

View file

@ -45,6 +45,8 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.Shard;
import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.fromHashKeyRange;
/**
* Helper class to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
@ -617,7 +619,7 @@ class KinesisShardSyncer implements ShardSyncer {
}
newLease.setParentShardIds(parentShardIds);
newLease.setOwnerSwitchesSinceCheckpoint(0L);
newLease.setHashKeyRange(fromHashKeyRange(shard.getHashKeyRange()));
return newLease;
}
@ -641,6 +643,7 @@ class KinesisShardSyncer implements ShardSyncer {
newLease.setParentShardIds(parentShardIds);
newLease.setOwnerSwitchesSinceCheckpoint(0L);
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
newLease.setHashKeyRange(fromHashKeyRange(childShard.getHashKeyRange()));
return newLease;
}

View file

@ -243,7 +243,7 @@ class ShardConsumer {
this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator,
parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, kinesisDataFetcher, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
Executors.newSingleThreadScheduledExecutor(), metricsFactory, config.shouldCleanupLeasesUponShardCompletion(),
config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(),
config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords()));

View file

@ -96,9 +96,9 @@ public class Worker implements Runnable {
// Default configs for periodic shard sync
private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0;
private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL.
static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
static long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
static long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
static long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator();
@ -576,7 +576,7 @@ public class Worker implements Runnable {
this.workerStateChangeListener = workerStateChangeListener;
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager);
this.leaseCleanupManager = LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
this.leaseCleanupManager = LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion,
config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(),
config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords());

View file

@ -68,6 +68,11 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSubSequenceNumber()));
}
if(lease.getHashKeyRange() != null) {
result.put(STARTING_HASH_KEY, DynamoUtils.createAttributeValue(lease.getHashKeyRange().serializedStartingHashKey()));
result.put(ENDING_HASH_KEY, DynamoUtils.createAttributeValue(lease.getHashKeyRange().serializedEndingHashKey()));
}
return result;
}
@ -92,6 +97,12 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
);
}
final String startingHashKey, endingHashKey;
if (!Strings.isNullOrEmpty(startingHashKey = DynamoUtils.safeGetString(dynamoRecord, STARTING_HASH_KEY))
&& !Strings.isNullOrEmpty(endingHashKey = DynamoUtils.safeGetString(dynamoRecord, ENDING_HASH_KEY))) {
result.setHashKeyRange(HashKeyRangeForLease.deserialize(startingHashKey, endingHashKey));
}
return result;
}
@ -163,6 +174,11 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
result.put(CHILD_SHARD_IDS_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getChildShardIds()), AttributeAction.PUT));
}
if(lease.getHashKeyRange() != null) {
result.put(STARTING_HASH_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getHashKeyRange().serializedStartingHashKey()), AttributeAction.PUT));
result.put(ENDING_HASH_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getHashKeyRange().serializedEndingHashKey()), AttributeAction.PUT));
}
if (lease.getPendingCheckpoint() != null && !lease.getPendingCheckpoint().getSequenceNumber().isEmpty()) {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSequenceNumber()), AttributeAction.PUT));
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSubSequenceNumber()), AttributeAction.PUT));
@ -181,7 +197,10 @@ public class KinesisClientLeaseSerializer implements ILeaseSerializer<KinesisCli
switch (updateField) {
case CHILD_SHARDS:
// TODO: Implement update fields for child shards
if (!CollectionUtils.isNullOrEmpty(lease.getChildShardIds())) {
result.put(CHILD_SHARD_IDS_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(
lease.getChildShardIds()), AttributeAction.PUT));
}
break;
case HASH_KEY_RANGE:
if (lease.getHashKeyRange() != null) {

View file

@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.leases.impl;
* limitations under the License.
*/
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
@ -81,10 +82,8 @@ public class LeaseCleanupManager {
@Getter
private volatile boolean isRunning = false;
private static LeaseCleanupManager instance;
/**
* Factory method to return a singleton instance of {@link LeaseCleanupManager}.
* Method to return a new instance of {@link LeaseCleanupManager}.
* @param kinesisProxy
* @param leaseManager
* @param deletionThreadPool
@ -96,17 +95,13 @@ public class LeaseCleanupManager {
* @param maxRecords
* @return
*/
public static LeaseCleanupManager createOrGetInstance(IKinesisProxy kinesisProxy, ILeaseManager leaseManager,
ScheduledExecutorService deletionThreadPool, IMetricsFactory metricsFactory,
boolean cleanupLeasesUponShardCompletion, long leaseCleanupIntervalMillis,
long completedLeaseCleanupIntervalMillis, long garbageLeaseCleanupIntervalMillis,
int maxRecords) {
if (instance == null) {
instance = new LeaseCleanupManager(kinesisProxy, leaseManager, deletionThreadPool, metricsFactory, cleanupLeasesUponShardCompletion,
leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
}
return instance;
public static LeaseCleanupManager newInstance(IKinesisProxy kinesisProxy, ILeaseManager leaseManager,
ScheduledExecutorService deletionThreadPool, IMetricsFactory metricsFactory,
boolean cleanupLeasesUponShardCompletion, long leaseCleanupIntervalMillis,
long completedLeaseCleanupIntervalMillis, long garbageLeaseCleanupIntervalMillis,
int maxRecords) {
return new LeaseCleanupManager(kinesisProxy, leaseManager, deletionThreadPool, metricsFactory, cleanupLeasesUponShardCompletion,
leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
}
/**
@ -193,6 +188,10 @@ public class LeaseCleanupManager {
if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
LOG.error("No child shards returned from service for shard " + shardInfo.getShardId());
// If no children shard is found in DDB and from service, then do not delete the lease
throw new InvalidStateException("No child shards found for this supposedly " +
"closed shard in both local DDB and in service " + shardInfo.getShardId());
} else {
wereChildShardsPresent = true;
updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
@ -296,7 +295,7 @@ public class LeaseCleanupManager {
final KinesisClientLease updatedLease = leasePendingDeletion.lease();
updatedLease.setChildShardIds(childShardKeys);
leaseManager.updateLease(updatedLease);
leaseManager.updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS);
}
@VisibleForTesting

View file

@ -624,7 +624,6 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
request.setExpected(serializer.getDynamoExistentExpectation(lease.getLeaseKey()));
Map<String, AttributeValueUpdate> updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField);
updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease));
request.setAttributeUpdates(updates);
try {

View file

@ -93,6 +93,8 @@ public class ShardSyncerTest {
private LeaseManager<KinesisClientLease> leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE);
protected static final KinesisLeaseCleanupValidator leaseCleanupValidator = new KinesisLeaseCleanupValidator();
private static final KinesisShardSyncer shardSyncer = new KinesisShardSyncer(leaseCleanupValidator);
private static final HashKeyRange hashKeyRange = new HashKeyRange().withStartingHashKey("0").withEndingHashKey("10");
/**
* Old/Obsolete max value of a sequence number (2^128 -1).
*/
@ -154,10 +156,10 @@ public class ShardSyncerTest {
SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
String shardId0 = "shardId-0";
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange));
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange));
String shardId1 = "shardId-1";
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange));
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
@ -183,16 +185,16 @@ public class ShardSyncerTest {
SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
String shardId0 = "shardId-0";
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange));
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange));
String shardId1 = "shardId-1";
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange));
String shardId2 = "shardId-2";
shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange, hashKeyRange));
String shardIdWithLease = "shardId-3";
shards.add(ShardObjectHelper.newShard(shardIdWithLease, shardIdWithLease, null, sequenceRange));
shards.add(ShardObjectHelper.newShard(shardIdWithLease, shardIdWithLease, null, sequenceRange, hashKeyRange));
currentLeases.add(newLease(shardIdWithLease));
@ -699,9 +701,9 @@ public class ShardSyncerTest {
SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
String shardId0 = "shardId-0";
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange));
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange));
String shardId1 = "shardId-1";
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange));
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
@ -731,10 +733,10 @@ public class ShardSyncerTest {
SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
String shardId0 = "shardId-0";
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange));
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange, hashKeyRange));
String shardId1 = "shardId-1";
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange, hashKeyRange));
Set<InitialPositionInStreamExtended> initialPositions = new HashSet<InitialPositionInStreamExtended>();
initialPositions.add(INITIAL_POSITION_LATEST);
@ -769,17 +771,20 @@ public class ShardSyncerTest {
shardsWithoutLeases.add(ShardObjectHelper.newShard("shardId-0",
null,
null,
ShardObjectHelper.newSequenceNumberRange("303", "404")));
ShardObjectHelper.newSequenceNumberRange("303", "404"),
hashKeyRange));
final String lastShardId = "shardId-1";
shardsWithoutLeases.add(ShardObjectHelper.newShard(lastShardId,
null,
null,
ShardObjectHelper.newSequenceNumberRange("405", null)));
ShardObjectHelper.newSequenceNumberRange("405", null),
hashKeyRange));
shardsWithLeases.add(ShardObjectHelper.newShard("shardId-2",
null,
null,
ShardObjectHelper.newSequenceNumberRange("202", "302")));
ShardObjectHelper.newSequenceNumberRange("202", "302"),
hashKeyRange));
currentLeases.add(newLease("shardId-2"));
final List<Shard> allShards =
@ -805,12 +810,14 @@ public class ShardSyncerTest {
shards.add(ShardObjectHelper.newShard(firstShardId,
null,
null,
ShardObjectHelper.newSequenceNumberRange("303", "404")));
ShardObjectHelper.newSequenceNumberRange("303", "404"),
hashKeyRange));
final String lastShardId = "shardId-1";
shards.add(ShardObjectHelper.newShard(lastShardId,
null,
null,
ShardObjectHelper.newSequenceNumberRange("405", null)));
ShardObjectHelper.newSequenceNumberRange("405", null),
hashKeyRange));
final LeaseSynchronizer leaseSynchronizer = getLeaseSynchronizer(shards, currentLeases);
@ -1969,14 +1976,14 @@ public class ShardSyncerTest {
Map<String, Shard> kinesisShards = new HashMap<String, Shard>();
String parentShardId = "shardId-parent";
kinesisShards.put(parentShardId, ShardObjectHelper.newShard(parentShardId, null, null, null));
kinesisShards.put(parentShardId, ShardObjectHelper.newShard(parentShardId, null, null, null, hashKeyRange));
shardIdsOfCurrentLeases.add(parentShardId);
String adjacentParentShardId = "shardId-adjacentParent";
kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null));
kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null, hashKeyRange));
String shardId = "shardId-9-1";
Shard shard = ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null);
Shard shard = ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null, hashKeyRange);
kinesisShards.put(shardId, shard);
final MemoizationContext memoizationContext = new MemoizationContext();
@ -2097,6 +2104,7 @@ public class ShardSyncerTest {
String adjacentParentShardId = "shardId-adjacentParent";
shard.setParentShardId(parentShardId);
shard.setAdjacentParentShardId(adjacentParentShardId);
shard.setHashKeyRange(hashKeyRange);
KinesisClientLease lease = shardSyncer.newKCLLease(shard);
Assert.assertEquals(shardId, lease.getLeaseKey());

View file

@ -204,6 +204,10 @@ public class WorkerTest {
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(leaseCoordinator.getLeaseManager()).thenReturn(mock(ILeaseManager.class));
when(streamConfig.getStreamProxy()).thenReturn(kinesisProxy);
Worker.MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 10;
Worker.MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 50;
Worker.LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 10;
}
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES

View file

@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Set;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@ -408,11 +409,13 @@ public class KinesisLocalFileProxy implements IKinesisProxy {
ChildShard leftChild = new ChildShard();
leftChild.setShardId("shardId-1");
leftChild.setParentShards(parentShards);
leftChild.setHashKeyRange(new HashKeyRange().withStartingHashKey("0").withEndingHashKey("10"));
childShards.add(leftChild);
ChildShard rightChild = new ChildShard();
rightChild.setShardId("shardId-2");
rightChild.setParentShards(parentShards);
rightChild.setHashKeyRange(new HashKeyRange().withStartingHashKey("11").withEndingHashKey(MAX_HASHKEY_VALUE.toString()));
childShards.add(rightChild);
return childShards;
}

View file

@ -163,6 +163,7 @@ public class KinesisLocalFileDataCreator {
HashKeyRange hashKeyRange = new HashKeyRange();
hashKeyRange.setStartingHashKey(hashKeyRangeStart.toString());
hashKeyRange.setEndingHashKey(hashKeyRangeEnd.toString());
shard.setHashKeyRange(hashKeyRange);
shards.add(shard);
}