Testcase fixes

This commit is contained in:
Ashwin Giridharan 2020-02-18 22:14:39 -08:00
parent 781a2339b6
commit 484b6cfcd9
5 changed files with 97 additions and 56 deletions

View file

@ -16,7 +16,6 @@
package software.amazon.kinesis.coordinator; package software.amazon.kinesis.coordinator;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -34,7 +33,6 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.plugins.RxJavaPlugins;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.Getter; import lombok.Getter;
@ -42,12 +40,10 @@ import lombok.NoArgsConstructor;
import lombok.NonNull; import lombok.NonNull;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Either; import software.amazon.awssdk.utils.Either;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
@ -124,7 +120,8 @@ public class Scheduler implements Runnable {
private final MetricsFactory metricsFactory; private final MetricsFactory metricsFactory;
private final long failoverTimeMillis; private final long failoverTimeMillis;
private final long taskBackoffTimeMillis; private final long taskBackoffTimeMillis;
private final Either<MultiStreamTracker, StreamConfig> applicationStreamTracker; private final Either<MultiStreamTracker, StreamConfig> appStreamTracker;
private final Map<String, StreamConfig> currentStreamConfigMap;
private final long listShardsBackoffTimeMillis; private final long listShardsBackoffTimeMillis;
private final int maxListShardsRetryAttempts; private final int maxListShardsRetryAttempts;
private final LeaseRefresher leaseRefresher; private final LeaseRefresher leaseRefresher;
@ -186,15 +183,20 @@ public class Scheduler implements Runnable {
this.applicationName = this.coordinatorConfig.applicationName(); this.applicationName = this.coordinatorConfig.applicationName();
final MultiStreamTracker multiStreamTracker = this.retrievalConfig.multiStreamTracker(); final MultiStreamTracker multiStreamTracker = this.retrievalConfig.multiStreamTracker();
if(multiStreamTracker == null) { if(multiStreamTracker == null) {
this.applicationStreamTracker = Either.right(new StreamConfig(this.retrievalConfig.streamName(), final StreamConfig streamConfig = new StreamConfig(this.retrievalConfig.streamName(),
this.retrievalConfig.initialPositionInStreamExtended())); this.retrievalConfig.initialPositionInStreamExtended());
this.appStreamTracker = Either.right(streamConfig);
this.currentStreamConfigMap = new HashMap<String, StreamConfig>() {{
put(streamConfig.streamName(), streamConfig);
}};
} else { } else {
this.applicationStreamTracker = Either.left(multiStreamTracker); this.appStreamTracker = Either.left(multiStreamTracker);
this.currentStreamConfigMap = multiStreamTracker.streamConfigMap();
} }
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
this.metricsFactory = this.metricsConfig.metricsFactory(); this.metricsFactory = this.metricsConfig.metricsFactory();
// Determine leaseSerializer based on availability of MultiStreamTracker. // Determine leaseSerializer based on availability of MultiStreamTracker.
final LeaseSerializer leaseSerializer = this.applicationStreamTracker.map(mst -> true, sc -> false) ? final LeaseSerializer leaseSerializer = this.appStreamTracker.map(mst -> true, sc -> false) ?
new DynamoDBMultiStreamLeaseSerializer() : new DynamoDBMultiStreamLeaseSerializer() :
new DynamoDBLeaseSerializer(); new DynamoDBLeaseSerializer();
this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer) this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer)
@ -218,8 +220,8 @@ public class Scheduler implements Runnable {
// TODO : Halo : Handle case of no StreamConfig present in streamConfigMap() for the supplied streamName. // TODO : Halo : Handle case of no StreamConfig present in streamConfigMap() for the supplied streamName.
// TODO : Pass the immutable map here instead of using mst.streamConfigMap() // TODO : Pass the immutable map here instead of using mst.streamConfigMap()
this.shardSyncTaskManagerProvider = streamName -> this.leaseManagementConfig this.shardSyncTaskManagerProvider = streamName -> this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer).createShardSyncTaskManager(this.metricsFactory, .leaseManagementFactory(leaseSerializer)
applicationStreamTracker.map(mst -> mst.streamConfigMap().get(streamName), sc -> sc)); .createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamName));
this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
this.skipShardSyncAtWorkerInitializationIfLeasesExist = this.skipShardSyncAtWorkerInitializationIfLeasesExist =
@ -245,6 +247,7 @@ public class Scheduler implements Runnable {
this.shardDetectorProvider = streamName -> createOrGetShardSyncTaskManager(streamName).shardDetector(); this.shardDetectorProvider = streamName -> createOrGetShardSyncTaskManager(streamName).shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
// TODO : Halo : Check if this needs to be per stream.
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
} }
@ -293,11 +296,9 @@ public class Scheduler implements Runnable {
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
// TODO: for already synced streams // TODO: for already synced streams
final Map<String, StreamConfig> streamConfigMap = applicationStreamTracker for(String streamName : currentStreamConfigMap.keySet().stream().collect(Collectors.toList())) {
.map(mst -> mst.streamConfigMap(), sc -> Collections.singletonMap(sc.streamName(), sc));
for(String streamName : streamConfigMap.keySet().stream().collect(Collectors.toList())) {
log.info("Syncing Kinesis shard info"); log.info("Syncing Kinesis shard info");
final StreamConfig streamConfig = streamConfigMap.get(streamName); final StreamConfig streamConfig = currentStreamConfigMap.get(streamName);
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamName), ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamName),
leaseRefresher, streamConfig.initialPositionInStreamExtended(), leaseRefresher, streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L,
@ -348,22 +349,27 @@ public class Scheduler implements Runnable {
@VisibleForTesting @VisibleForTesting
void runProcessLoop() { void runProcessLoop() {
try { try {
boolean foundCompletedShard = false;
Set<ShardInfo> assignedShards = new HashSet<>(); Set<ShardInfo> assignedShards = new HashSet<>();
final Set<ShardInfo> completedShards = new HashSet<>();
for (ShardInfo shardInfo : getShardInfoForAssignments()) { for (ShardInfo shardInfo : getShardInfoForAssignments()) {
ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo,
processorConfig.shardRecordProcessorFactory()); processorConfig.shardRecordProcessorFactory());
if (shardConsumer.isShutdown() && shardConsumer.shutdownReason().equals(ShutdownReason.SHARD_END)) { if (shardConsumer.isShutdown() && shardConsumer.shutdownReason().equals(ShutdownReason.SHARD_END)) {
foundCompletedShard = true; completedShards.add(shardInfo);
} else { } else {
shardConsumer.executeLifecycle(); shardConsumer.executeLifecycle();
} }
assignedShards.add(shardInfo); assignedShards.add(shardInfo);
} }
if (foundCompletedShard) { for (ShardInfo completedShard : completedShards) {
shardSyncTaskManager.syncShardAndLeaseInfo(); final String streamName = completedShard.streamName()
.orElse(appStreamTracker.map(mst -> null, sc -> sc.streamName()));
Validate.notEmpty(streamName, "Stream name should not be null");
if (createOrGetShardSyncTaskManager(streamName).syncShardAndLeaseInfo()) {
log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString());
}
} }
// clean up shard consumers for unassigned shards // clean up shard consumers for unassigned shards
@ -630,12 +636,20 @@ public class Scheduler implements Runnable {
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory); RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory);
ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo, ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo,
checkpoint); checkpoint);
// The only case where streamName is not available will be when multistreamtracker not set. In this case,
// get the default stream name for the single stream application.
final String streamName = shardInfo.streamName().orElse(appStreamTracker.map(mst -> null, sc -> sc.streamName()));
Validate.notEmpty(streamName, "StreamName should not be empty");
// Irrespective of single stream app or multi stream app, streamConfig should always be available.
// TODO: Halo : if not available, construct a default config ?
final StreamConfig streamConfig = currentStreamConfigMap.get(streamName);
Validate.notNull(streamConfig, "StreamConfig should not be empty");
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
shardInfo.streamName(), streamConfig.streamName(),
leaseCoordinator, leaseCoordinator,
executorService, executorService,
cache, cache,
shardRecordProcessorFactory.shardRecordProcessor(), shardRecordProcessorFactory.shardRecordProcessor(streamName),
checkpoint, checkpoint,
checkpointer, checkpointer,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
@ -645,10 +659,10 @@ public class Scheduler implements Runnable {
maxListShardsRetryAttempts, maxListShardsRetryAttempts,
processorConfig.callProcessRecordsEvenForEmptyRecordList(), processorConfig.callProcessRecordsEvenForEmptyRecordList(),
shardConsumerDispatchPollIntervalMillis, shardConsumerDispatchPollIntervalMillis,
initialPosition, streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion, cleanupLeasesUponShardCompletion,
ignoreUnexpetedChildShards, ignoreUnexpetedChildShards,
shardDetector, shardDetectorProvider.apply(streamConfig.streamName()),
aggregatorUtil, aggregatorUtil,
hierarchicalShardSyncer, hierarchicalShardSyncer,
metricsFactory); metricsFactory);

View file

@ -268,8 +268,9 @@ public class LeaseManagementConfig {
@Deprecated @Deprecated
public LeaseManagementFactory leaseManagementFactory() { public LeaseManagementFactory leaseManagementFactory() {
if (leaseManagementFactory == null) {
Validate.notEmpty(streamName(), "Stream name is empty"); Validate.notEmpty(streamName(), "Stream name is empty");
return new DynamoDBLeaseManagementFactory(kinesisClient(), leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
streamName(), streamName(),
dynamoDBClient(), dynamoDBClient(),
tableName(), tableName(),
@ -295,9 +296,12 @@ public class LeaseManagementConfig {
hierarchicalShardSyncer(), hierarchicalShardSyncer(),
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode()); tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode());
} }
return leaseManagementFactory;
}
public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer) { public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer) {
return new DynamoDBLeaseManagementFactory(kinesisClient(), if(leaseManagementFactory == null) {
leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
dynamoDBClient(), dynamoDBClient(),
tableName(), tableName(),
workerIdentifier(), workerIdentifier(),
@ -324,6 +328,13 @@ public class LeaseManagementConfig {
billingMode(), billingMode(),
leaseSerializer); leaseSerializer);
} }
return leaseManagementFactory;
}
public LeaseManagementConfig leaseManagementFactory(final LeaseManagementFactory leaseManagementFactory) {
this.leaseManagementFactory = leaseManagementFactory;
return this;
}
// private InitialPositionInStreamExtended getInitialPositionExtendedForStream(String streamName) { // private InitialPositionInStreamExtended getInitialPositionExtendedForStream(String streamName) {
// return multiStreamTracker() == null ? // return multiStreamTracker() == null ?

View file

@ -105,7 +105,7 @@ public class ShardInfo {
@Override @Override
public int hashCode() { public int hashCode() {
return new HashCodeBuilder() return new HashCodeBuilder()
.append(concurrencyToken).append(parentShardIds).append(shardId).append(streamName).toHashCode(); .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamName.orElse("")).toHashCode();
} }
/** /**
@ -130,7 +130,7 @@ public class ShardInfo {
ShardInfo other = (ShardInfo) obj; ShardInfo other = (ShardInfo) obj;
return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken) return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken)
.append(parentShardIds, other.parentShardIds).append(shardId, other.shardId) .append(parentShardIds, other.parentShardIds).append(shardId, other.shardId)
.append(streamName, other.streamName).isEquals(); .append(streamName.orElse(""), other.streamName.orElse("")).isEquals();
} }

View file

@ -24,8 +24,7 @@ public interface ShardRecordProcessorFactory {
* *
* @return * @return
*/ */
// TODO : Halo : Reenable ShardRecordProcessor shardRecordProcessor();
// ShardRecordProcessor shardRecordProcessor();
/** /**
* Returns a new instance of the ShardRecordProcessor for a stream * Returns a new instance of the ShardRecordProcessor for a stream
@ -33,6 +32,6 @@ public interface ShardRecordProcessorFactory {
* @return ShardRecordProcessor * @return ShardRecordProcessor
*/ */
default ShardRecordProcessor shardRecordProcessor(String streamName) { default ShardRecordProcessor shardRecordProcessor(String streamName) {
throw new UnsupportedOperationException(); return shardRecordProcessor();
} }
} }

View file

@ -52,6 +52,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.CheckpointFactory; import software.amazon.kinesis.checkpoint.CheckpointFactory;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException; import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -498,6 +499,11 @@ public class SchedulerTest {
} }
}; };
} }
@Override
public ShardRecordProcessor shardRecordProcessor(String streamName) {
return shardRecordProcessor();
}
} }
private class TestKinesisLeaseManagementFactory implements LeaseManagementFactory { private class TestKinesisLeaseManagementFactory implements LeaseManagementFactory {
@ -511,6 +517,12 @@ public class SchedulerTest {
return shardSyncTaskManager; return shardSyncTaskManager;
} }
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory,
StreamConfig streamConfig) {
return shardSyncTaskManager;
}
@Override @Override
public DynamoDBLeaseRefresher createLeaseRefresher() { public DynamoDBLeaseRefresher createLeaseRefresher() {
return dynamoDBLeaseRefresher; return dynamoDBLeaseRefresher;
@ -520,6 +532,11 @@ public class SchedulerTest {
public ShardDetector createShardDetector() { public ShardDetector createShardDetector() {
return shardDetector; return shardDetector;
} }
@Override
public ShardDetector createShardDetector(StreamConfig streamConfig) {
return shardDetector;
}
} }
private class TestKinesisCheckpointFactory implements CheckpointFactory { private class TestKinesisCheckpointFactory implements CheckpointFactory {