Testcase fixes

This commit is contained in:
Ashwin Giridharan 2020-02-18 22:14:39 -08:00
parent fd0e96a5d1
commit 11c0ee7556
5 changed files with 97 additions and 56 deletions

View file

@ -16,7 +16,6 @@
package software.amazon.kinesis.coordinator;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -34,7 +33,6 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.reactivex.plugins.RxJavaPlugins;
import lombok.AccessLevel;
import lombok.Getter;
@ -42,12 +40,10 @@ import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Either;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
@ -124,7 +120,8 @@ public class Scheduler implements Runnable {
private final MetricsFactory metricsFactory;
private final long failoverTimeMillis;
private final long taskBackoffTimeMillis;
private final Either<MultiStreamTracker, StreamConfig> applicationStreamTracker;
private final Either<MultiStreamTracker, StreamConfig> appStreamTracker;
private final Map<String, StreamConfig> currentStreamConfigMap;
private final long listShardsBackoffTimeMillis;
private final int maxListShardsRetryAttempts;
private final LeaseRefresher leaseRefresher;
@ -186,15 +183,20 @@ public class Scheduler implements Runnable {
this.applicationName = this.coordinatorConfig.applicationName();
final MultiStreamTracker multiStreamTracker = this.retrievalConfig.multiStreamTracker();
if(multiStreamTracker == null) {
this.applicationStreamTracker = Either.right(new StreamConfig(this.retrievalConfig.streamName(),
this.retrievalConfig.initialPositionInStreamExtended()));
final StreamConfig streamConfig = new StreamConfig(this.retrievalConfig.streamName(),
this.retrievalConfig.initialPositionInStreamExtended());
this.appStreamTracker = Either.right(streamConfig);
this.currentStreamConfigMap = new HashMap<String, StreamConfig>() {{
put(streamConfig.streamName(), streamConfig);
}};
} else {
this.applicationStreamTracker = Either.left(multiStreamTracker);
this.appStreamTracker = Either.left(multiStreamTracker);
this.currentStreamConfigMap = multiStreamTracker.streamConfigMap();
}
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
this.metricsFactory = this.metricsConfig.metricsFactory();
// Determine leaseSerializer based on availability of MultiStreamTracker.
final LeaseSerializer leaseSerializer = this.applicationStreamTracker.map(mst -> true, sc -> false) ?
final LeaseSerializer leaseSerializer = this.appStreamTracker.map(mst -> true, sc -> false) ?
new DynamoDBMultiStreamLeaseSerializer() :
new DynamoDBLeaseSerializer();
this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer)
@ -218,8 +220,8 @@ public class Scheduler implements Runnable {
// TODO : Halo : Handle case of no StreamConfig present in streamConfigMap() for the supplied streamName.
// TODO : Pass the immutable map here instead of using mst.streamConfigMap()
this.shardSyncTaskManagerProvider = streamName -> this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer).createShardSyncTaskManager(this.metricsFactory,
applicationStreamTracker.map(mst -> mst.streamConfigMap().get(streamName), sc -> sc));
.leaseManagementFactory(leaseSerializer)
.createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamName));
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
this.skipShardSyncAtWorkerInitializationIfLeasesExist =
@ -245,6 +247,7 @@ public class Scheduler implements Runnable {
this.shardDetectorProvider = streamName -> createOrGetShardSyncTaskManager(streamName).shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
// TODO : Halo : Check if this needs to be per stream.
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
}
@ -293,11 +296,9 @@ public class Scheduler implements Runnable {
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
// TODO: for already synced streams
final Map<String, StreamConfig> streamConfigMap = applicationStreamTracker
.map(mst -> mst.streamConfigMap(), sc -> Collections.singletonMap(sc.streamName(), sc));
for(String streamName : streamConfigMap.keySet().stream().collect(Collectors.toList())) {
for(String streamName : currentStreamConfigMap.keySet().stream().collect(Collectors.toList())) {
log.info("Syncing Kinesis shard info");
final StreamConfig streamConfig = streamConfigMap.get(streamName);
final StreamConfig streamConfig = currentStreamConfigMap.get(streamName);
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamName),
leaseRefresher, streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L,
@ -348,22 +349,27 @@ public class Scheduler implements Runnable {
@VisibleForTesting
void runProcessLoop() {
try {
boolean foundCompletedShard = false;
Set<ShardInfo> assignedShards = new HashSet<>();
final Set<ShardInfo> completedShards = new HashSet<>();
for (ShardInfo shardInfo : getShardInfoForAssignments()) {
ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo,
processorConfig.shardRecordProcessorFactory());
if (shardConsumer.isShutdown() && shardConsumer.shutdownReason().equals(ShutdownReason.SHARD_END)) {
foundCompletedShard = true;
completedShards.add(shardInfo);
} else {
shardConsumer.executeLifecycle();
}
assignedShards.add(shardInfo);
}
if (foundCompletedShard) {
shardSyncTaskManager.syncShardAndLeaseInfo();
for (ShardInfo completedShard : completedShards) {
final String streamName = completedShard.streamName()
.orElse(appStreamTracker.map(mst -> null, sc -> sc.streamName()));
Validate.notEmpty(streamName, "Stream name should not be null");
if (createOrGetShardSyncTaskManager(streamName).syncShardAndLeaseInfo()) {
log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString());
}
}
// clean up shard consumers for unassigned shards
@ -630,12 +636,20 @@ public class Scheduler implements Runnable {
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory);
ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo,
checkpoint);
// The only case where streamName is not available will be when multistreamtracker not set. In this case,
// get the default stream name for the single stream application.
final String streamName = shardInfo.streamName().orElse(appStreamTracker.map(mst -> null, sc -> sc.streamName()));
Validate.notEmpty(streamName, "StreamName should not be empty");
// Irrespective of single stream app or multi stream app, streamConfig should always be available.
// TODO: Halo : if not available, construct a default config ?
final StreamConfig streamConfig = currentStreamConfigMap.get(streamName);
Validate.notNull(streamConfig, "StreamConfig should not be empty");
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
shardInfo.streamName(),
streamConfig.streamName(),
leaseCoordinator,
executorService,
cache,
shardRecordProcessorFactory.shardRecordProcessor(),
shardRecordProcessorFactory.shardRecordProcessor(streamName),
checkpoint,
checkpointer,
parentShardPollIntervalMillis,
@ -645,10 +659,10 @@ public class Scheduler implements Runnable {
maxListShardsRetryAttempts,
processorConfig.callProcessRecordsEvenForEmptyRecordList(),
shardConsumerDispatchPollIntervalMillis,
initialPosition,
streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion,
ignoreUnexpetedChildShards,
shardDetector,
shardDetectorProvider.apply(streamConfig.streamName()),
aggregatorUtil,
hierarchicalShardSyncer,
metricsFactory);

View file

@ -268,36 +268,40 @@ public class LeaseManagementConfig {
@Deprecated
public LeaseManagementFactory leaseManagementFactory() {
Validate.notEmpty(streamName(), "Stream name is empty");
return new DynamoDBLeaseManagementFactory(kinesisClient(),
streamName(),
dynamoDBClient(),
tableName(),
workerIdentifier(),
executorService(),
initialPositionInStream(),
failoverTimeMillis(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
maxLeaseRenewalThreads(),
cleanupLeasesUponShardCompletion(),
ignoreUnexpectedChildShards(),
shardSyncIntervalMillis(),
consistentReads(),
listShardsBackoffTimeInMillis(),
maxListShardsRetryAttempts(),
maxCacheMissesBeforeReload(),
listShardsCacheAllowedAgeInSeconds(),
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode());
if (leaseManagementFactory == null) {
Validate.notEmpty(streamName(), "Stream name is empty");
leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
streamName(),
dynamoDBClient(),
tableName(),
workerIdentifier(),
executorService(),
initialPositionInStream(),
failoverTimeMillis(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
maxLeaseRenewalThreads(),
cleanupLeasesUponShardCompletion(),
ignoreUnexpectedChildShards(),
shardSyncIntervalMillis(),
consistentReads(),
listShardsBackoffTimeInMillis(),
maxListShardsRetryAttempts(),
maxCacheMissesBeforeReload(),
listShardsCacheAllowedAgeInSeconds(),
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode());
}
return leaseManagementFactory;
}
public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer) {
return new DynamoDBLeaseManagementFactory(kinesisClient(),
if(leaseManagementFactory == null) {
leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
dynamoDBClient(),
tableName(),
workerIdentifier(),
@ -323,6 +327,13 @@ public class LeaseManagementConfig {
dynamoDbRequestTimeout(),
billingMode(),
leaseSerializer);
}
return leaseManagementFactory;
}
public LeaseManagementConfig leaseManagementFactory(final LeaseManagementFactory leaseManagementFactory) {
this.leaseManagementFactory = leaseManagementFactory;
return this;
}
// private InitialPositionInStreamExtended getInitialPositionExtendedForStream(String streamName) {

View file

@ -105,7 +105,7 @@ public class ShardInfo {
@Override
public int hashCode() {
return new HashCodeBuilder()
.append(concurrencyToken).append(parentShardIds).append(shardId).append(streamName).toHashCode();
.append(concurrencyToken).append(parentShardIds).append(shardId).append(streamName.orElse("")).toHashCode();
}
/**
@ -130,7 +130,7 @@ public class ShardInfo {
ShardInfo other = (ShardInfo) obj;
return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken)
.append(parentShardIds, other.parentShardIds).append(shardId, other.shardId)
.append(streamName, other.streamName).isEquals();
.append(streamName.orElse(""), other.streamName.orElse("")).isEquals();
}

View file

@ -24,8 +24,7 @@ public interface ShardRecordProcessorFactory {
*
* @return
*/
// TODO : Halo : Reenable
// ShardRecordProcessor shardRecordProcessor();
ShardRecordProcessor shardRecordProcessor();
/**
* Returns a new instance of the ShardRecordProcessor for a stream
@ -33,6 +32,6 @@ public interface ShardRecordProcessorFactory {
* @return ShardRecordProcessor
*/
default ShardRecordProcessor shardRecordProcessor(String streamName) {
throw new UnsupportedOperationException();
return shardRecordProcessor();
}
}

View file

@ -52,6 +52,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.CheckpointFactory;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -498,6 +499,11 @@ public class SchedulerTest {
}
};
}
@Override
public ShardRecordProcessor shardRecordProcessor(String streamName) {
return shardRecordProcessor();
}
}
private class TestKinesisLeaseManagementFactory implements LeaseManagementFactory {
@ -511,6 +517,12 @@ public class SchedulerTest {
return shardSyncTaskManager;
}
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory,
StreamConfig streamConfig) {
return shardSyncTaskManager;
}
@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
return dynamoDBLeaseRefresher;
@ -520,6 +532,11 @@ public class SchedulerTest {
public ShardDetector createShardDetector() {
return shardDetector;
}
@Override
public ShardDetector createShardDetector(StreamConfig streamConfig) {
return shardDetector;
}
}
private class TestKinesisCheckpointFactory implements CheckpointFactory {