Lease Recovery and Blocking-on-Parent bug fix

This commit is contained in:
Ashwin Giridharan 2020-04-30 14:03:39 -07:00
parent eb7a60ec62
commit 5cd40e4718
7 changed files with 164 additions and 37 deletions

View file

@ -18,20 +18,32 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* The top level orchestrator for coordinating the periodic shard sync related
@ -46,25 +58,33 @@ class PeriodicShardSyncManager {
private final String workerId;
private final LeaderDecider leaderDecider;
private final LeaseRefresher leaseRefresher;
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final ScheduledExecutorService shardSyncThreadPool;
private final boolean isMultiStreamingMode;
private boolean isRunning;
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider) {
this(workerId, leaderDecider, currentStreamConfigMap, shardSyncTaskManagerProvider, Executors.newSingleThreadScheduledExecutor());
/**
 * Convenience constructor that backs the manager with a dedicated single-threaded
 * scheduled executor for running periodic shard sync tasks.
 *
 * @param workerId identifier of this worker; used for leader checks and logging
 * @param leaderDecider decides whether this worker is the leader that should run shard syncs
 * @param leaseRefresher used to list leases when mapping streams to their leases
 * @param currentStreamConfigMap streams currently tracked by this worker, keyed by identifier
 * @param shardSyncTaskManagerProvider creates a ShardSyncTaskManager for a given stream config
 * @param isMultiStreamingMode true when the worker consumes multiple streams (multi-stream leases)
 */
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
        Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
        Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider, boolean isMultiStreamingMode) {
    // Delegate to the full constructor with a fresh single-threaded executor.
    this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider,
            Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode);
}
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider, ScheduledExecutorService shardSyncThreadPool) {
/**
 * Full constructor taking an externally supplied executor (used by tests and by the
 * delegating constructor above).
 *
 * @param workerId identifier of this worker; must be non-blank
 * @param leaderDecider decides whether this worker is the leader that should run shard syncs
 * @param leaseRefresher used to list leases when mapping streams to their leases
 * @param currentStreamConfigMap streams currently tracked by this worker, keyed by identifier
 * @param shardSyncTaskManagerProvider creates a ShardSyncTaskManager for a given stream config
 * @param shardSyncThreadPool executor on which periodic shard sync runs are scheduled
 * @param isMultiStreamingMode true when the worker consumes multiple streams (multi-stream leases)
 * @throws NullPointerException if a required collaborator is null
 * @throws IllegalArgumentException if workerId is blank
 */
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
        Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
        Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider,
        ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode) {
    Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
    Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
    // Validate the remaining required collaborators up front (fail fast, consistent with
    // the existing workerId/leaderDecider checks) instead of NPE-ing later in runShardSync().
    Validate.notNull(leaseRefresher, "LeaseRefresher is required to initialize PeriodicShardSyncManager.");
    Validate.notNull(currentStreamConfigMap, "currentStreamConfigMap is required to initialize PeriodicShardSyncManager.");
    Validate.notNull(shardSyncTaskManagerProvider, "shardSyncTaskManagerProvider is required to initialize PeriodicShardSyncManager.");
    this.workerId = workerId;
    this.leaderDecider = leaderDecider;
    this.leaseRefresher = leaseRefresher;
    this.currentStreamConfigMap = currentStreamConfigMap;
    this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider;
    this.shardSyncThreadPool = shardSyncThreadPool;
    this.isMultiStreamingMode = isMultiStreamingMode;
}
public synchronized TaskResult start() {
@ -116,24 +136,53 @@ class PeriodicShardSyncManager {
private void runShardSync() {
if (leaderDecider.isLeader(workerId)) {
try {
final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(currentStreamConfigMap.keySet());
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider.apply(streamConfigEntry.getValue());
if (!shardSyncTaskManager.syncShardAndLeaseInfo()) {
log.warn("Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.",
log.warn(
"Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName());
}
}
} catch (Exception e) {
// TODO : Log
}
} else {
log.debug(String.format("WorkerId %s is not a leader, not running the shard sync task", workerId));
}
}
/**
 * Builds a mapping from stream identifier to the leases that belong to that stream.
 *
 * @param streamIdentifiersToFilter streams of interest; leases for other streams are dropped
 * @return map of stream identifier to its leases; in single-stream mode all leases are
 *         attributed to the single tracked stream
 * @throws DependencyException if listing leases fails unexpectedly
 * @throws ProvisionedThroughputException if listing leases is throttled
 * @throws InvalidStateException if the lease table is in an invalid state
 */
private Map<StreamIdentifier, List<Lease>> getStreamToLeasesMap(final Set<StreamIdentifier> streamIdentifiersToFilter)
        throws DependencyException, ProvisionedThroughputException, InvalidStateException {
    final List<Lease> allLeases = leaseRefresher.listLeases();
    // Single-stream mode: there is exactly one tracked stream and every lease belongs to it.
    if (!isMultiStreamingMode) {
        Validate.isTrue(streamIdentifiersToFilter.size() == 1);
        final StreamIdentifier onlyStream = streamIdentifiersToFilter.iterator().next();
        return Collections.singletonMap(onlyStream, allLeases);
    }
    // Multi-stream mode: bucket each lease under its stream, keeping only tracked streams.
    final Map<StreamIdentifier, List<Lease>> leasesByStream = new HashMap<>();
    for (final Lease lease : allLeases) {
        final StreamIdentifier leaseStream = StreamIdentifier
                .multiStreamInstance(((MultiStreamLease) lease).streamIdentifier());
        if (streamIdentifiersToFilter.contains(leaseStream)) {
            leasesByStream.computeIfAbsent(leaseStream, unused -> new ArrayList<>()).add(lease);
        }
    }
    return leasesByStream;
}
/**
* Checks if the entire hash range is covered
* @return true if covered, false otherwise
*/
public boolean hashRangeCovered() {
// TODO: Implement method
return true;
/**
 * Checks whether the given leases cover the complete hash key range of a stream.
 *
 * NOTE(review): implementation is a stub — it currently returns {@code false} for
 * every input, including non-empty lease lists. The commented-out pipeline below
 * suggests SHARD_END-checkpointed leases were meant to be filtered before validating
 * coverage; confirm intended semantics before relying on this method.
 *
 * @param leases leases to examine; null/empty is treated as "not covered"
 * @return currently always false (see note above)
 */
public boolean isHashRangeComplete(List<Lease> leases) {
    // No leases at all cannot cover any hash range.
    if(CollectionUtils.isNullOrEmpty(leases)) {
        return false;
    } else {
        // leases.stream().filter(lease -> lease.checkpoint().isShardEnd())
        // TODO confirm: coverage validation over the filtered leases is not implemented yet.
        return false;
    }
}
}

View file

@ -111,7 +111,6 @@ public class Scheduler implements Runnable {
private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L;
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L;
private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker";
private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
@ -289,8 +288,8 @@ public class Scheduler implements Runnable {
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode);
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
leaseManagementConfig.workerIdentifier(), leaderDecider, currentStreamConfigMap,
shardSyncTaskManagerProvider);
leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider, isMultiStreamMode);
}
/**
@ -351,11 +350,10 @@ public class Scheduler implements Runnable {
} else {
log.info("LeaseCoordinator is already running. No need to start it.");
}
log.info("Scheduling periodicShardSync)");
log.info("Scheduling periodicShardSync");
// leaderElectedPeriodicShardSyncManager.start(shardSyncTasks);
// TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged
// TODO: Determine if waitUntilHashRangeCovered() is needed.
//waitUntilHashRangeCovered();
streamSyncWatch.start();
isDone = true;
} catch (LeasingException e) {
@ -398,18 +396,6 @@ public class Scheduler implements Runnable {
return shouldInitiateLeaseSync;
}
/**
 * Blocks the calling thread until the periodic shard sync manager reports that the
 * entire hash range is covered, polling every HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS.
 * Currently not invoked anywhere (see TODOs below).
 *
 * @throws InterruptedException if interrupted while sleeping between checks
 */
private void waitUntilHashRangeCovered() throws InterruptedException {
    // TODO: Currently this call is not in use. We may need to implement this method later. Created SIM to track the work: https://sim.amazon.com/issues/KinesisLTR-202
    // TODO: For future implementation, streamToShardSyncTaskManagerMap might not contain the most up to date snapshot of active streams.
    // Should use currentStreamConfigMap to determine the streams to check.
    while (!leaderElectedPeriodicShardSyncManager.hashRangeCovered()) {
        // wait until entire hash range is covered
        log.info("Hash range is not covered yet. Checking again in {} ms", HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS);
        Thread.sleep(HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS);
    }
}
@VisibleForTesting
void runProcessLoop() {
try {

View file

@ -45,7 +45,6 @@ import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
@ -341,7 +340,7 @@ public class HierarchicalShardSyncer {
"Stream " + streamName + " is not in ACTIVE OR UPDATING state - will retry getting the shard list.");
}
if (hashRangeOfShardsIsComplete(shards)) {
if (isHashRangeOfShardsComplete(shards)) {
return shards;
}
@ -359,7 +358,7 @@ public class HierarchicalShardSyncer {
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
}
private static boolean hashRangeOfShardsIsComplete(@NonNull List<Shard> shards) {
private static boolean isHashRangeOfShardsComplete(@NonNull List<Shard> shards) {
if (shards.isEmpty()) {
throw new IllegalStateException("No shards found when attempting to validate complete hash range.");

View file

@ -105,7 +105,7 @@ public interface LeaseRefresher {
* @throws ProvisionedThroughputException if DynamoDB get fails due to lack of capacity
* @throws DependencyException if DynamoDB get fails in an unexpected way
*
* @return lease for the specified shardId, or null if one doesn't exist
* @return lease for the specified leaseKey, or null if one doesn't exist
*/
Lease getLease(String leaseKey) throws DependencyException, InvalidStateException, ProvisionedThroughputException;

View file

@ -25,8 +25,6 @@ import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.function.Function;
/**
* Task to block until processing of all data records in the parent shard(s) is completed.
* We check if we have checkpoint(s) for the parent shard(s).

View file

@ -141,6 +141,11 @@ public class ExtendedSequenceNumber implements Comparable<ExtendedSequenceNumber
return subSequenceNumber;
}
/**
 * Tells whether this extended sequence number represents the SHARD_END sentinel
 * checkpoint, i.e. the shard has been completely consumed.
 *
 * @return true if the sequence number equals the SHARD_END sentinel string
 */
public boolean isShardEnd() {
    final String shardEndSentinel = SentinelCheckpoint.SHARD_END.toString();
    return sequenceNumber.equals(shardEndSentinel);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();

View file

@ -22,6 +22,7 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
@ -40,6 +41,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
public class BlockOnParentShardTaskTest {
private final long backoffTimeInMillis = 50L;
private final String shardId = "shardId-97";
private final String streamId = "123:stream:146";
private final String concurrencyToken = "testToken";
private final List<String> emptyParentShardIds = new ArrayList<String>();
private ShardInfo shardInfo;
@ -107,6 +109,50 @@ public class BlockOnParentShardTaskTest {
assertNull(result.getException());
}
/**
 * Verifies call() succeeds for a multi-stream shard whose 1-2 parent shards have been
 * fully processed, i.e. their leases are checkpointed at SHARD_END. In multi-stream
 * mode the lease key is looked up as "&lt;streamId&gt;:&lt;parentShardId&gt;".
 * @throws ProvisionedThroughputException
 * @throws InvalidStateException
 * @throws DependencyException
 */
@Test
public final void testCallWhenParentsHaveFinishedMultiStream()
        throws DependencyException, InvalidStateException, ProvisionedThroughputException {
    final String parent1ShardId = "shardId-1";
    final String parent2ShardId = "shardId-2";
    // Both parent leases are fully processed (SHARD_END checkpoints).
    final Lease finishedParent1 = new Lease();
    finishedParent1.checkpoint(ExtendedSequenceNumber.SHARD_END);
    final Lease finishedParent2 = new Lease();
    finishedParent2.checkpoint(ExtendedSequenceNumber.SHARD_END);
    final LeaseRefresher leaseRefresher = mock(LeaseRefresher.class);
    when(leaseRefresher.getLease(streamId + ":" + parent1ShardId)).thenReturn(finishedParent1);
    when(leaseRefresher.getLease(streamId + ":" + parent2ShardId)).thenReturn(finishedParent2);
    final List<String> parentShardIds = new ArrayList<>();
    // Single finished parent: the task should not surface an exception.
    parentShardIds.add(parent1ShardId);
    ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds,
            ExtendedSequenceNumber.TRIM_HORIZON, streamId);
    BlockOnParentShardTask task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
    TaskResult result = task.call();
    assertNull(result.getException());
    // Two finished parents: still no exception.
    parentShardIds.add(parent2ShardId);
    shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds,
            ExtendedSequenceNumber.TRIM_HORIZON, streamId);
    task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
    result = task.call();
    assertNull(result.getException());
}
/**
* Test call() when there are 1-2 parent shards that have NOT been fully processed.
* @throws ProvisionedThroughputException
@ -149,6 +195,50 @@ public class BlockOnParentShardTaskTest {
assertNotNull(result.getException());
}
/**
 * Verifies call() reports an exception for a multi-stream shard whose 1-2 parent
 * shards have NOT been fully processed (checkpoints other than SHARD_END). In
 * multi-stream mode the lease key is looked up as "&lt;streamId&gt;:&lt;parentShardId&gt;".
 * @throws ProvisionedThroughputException
 * @throws InvalidStateException
 * @throws DependencyException
 */
@Test
public final void testCallWhenParentsHaveNotFinishedMultiStream()
        throws DependencyException, InvalidStateException, ProvisionedThroughputException {
    final String parent1ShardId = "shardId-1";
    final String parent2ShardId = "shardId-2";
    // Neither parent lease is at SHARD_END: one at LATEST, one at an arbitrary sequence number.
    final Lease unfinishedParent1 = new Lease();
    unfinishedParent1.checkpoint(ExtendedSequenceNumber.LATEST);
    final Lease unfinishedParent2 = new Lease();
    // mock a sequence number checkpoint
    unfinishedParent2.checkpoint(new ExtendedSequenceNumber("98182584034"));
    final LeaseRefresher leaseRefresher = mock(LeaseRefresher.class);
    when(leaseRefresher.getLease(streamId + ":" + parent1ShardId)).thenReturn(unfinishedParent1);
    when(leaseRefresher.getLease(streamId + ":" + parent2ShardId)).thenReturn(unfinishedParent2);
    final List<String> parentShardIds = new ArrayList<>();
    // Single unfinished parent: the task must surface an exception.
    parentShardIds.add(parent1ShardId);
    ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds,
            ExtendedSequenceNumber.TRIM_HORIZON, streamId);
    BlockOnParentShardTask task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
    TaskResult result = task.call();
    assertNotNull(result.getException());
    // Two unfinished parents: still an exception.
    parentShardIds.add(parent2ShardId);
    shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds,
            ExtendedSequenceNumber.TRIM_HORIZON, streamId);
    task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
    result = task.call();
    assertNotNull(result.getException());
}
/**
* Test call() with 1 parent shard before and after it is completely processed.
* @throws ProvisionedThroughputException