From 484b6cfcd9e75edc85254023334dd2f6216a6f2b Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 18 Feb 2020 22:14:39 -0800 Subject: [PATCH] Testcase fixes --- .../amazon/kinesis/coordinator/Scheduler.java | 62 +++++++++++------- .../kinesis/leases/LeaseManagementConfig.java | 65 +++++++++++-------- .../amazon/kinesis/leases/ShardInfo.java | 4 +- .../ShardRecordProcessorFactory.java | 5 +- .../kinesis/coordinator/SchedulerTest.java | 17 +++++ 5 files changed, 97 insertions(+), 56 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index fffc88a6..f564f47b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -16,7 +16,6 @@ package software.amazon.kinesis.coordinator; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -34,7 +33,6 @@ import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import io.reactivex.plugins.RxJavaPlugins; import lombok.AccessLevel; import lombok.Getter; @@ -42,12 +40,10 @@ import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.Validate; -import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.Either; +import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; @@ -124,7 +120,8 @@ public class Scheduler implements Runnable { private final MetricsFactory metricsFactory; private final long failoverTimeMillis; private final long taskBackoffTimeMillis; - private final Either applicationStreamTracker; + private final Either appStreamTracker; + private final Map currentStreamConfigMap; private final long listShardsBackoffTimeMillis; private final int maxListShardsRetryAttempts; private final LeaseRefresher leaseRefresher; @@ -186,15 +183,20 @@ public class Scheduler implements Runnable { this.applicationName = this.coordinatorConfig.applicationName(); final MultiStreamTracker multiStreamTracker = this.retrievalConfig.multiStreamTracker(); if(multiStreamTracker == null) { - this.applicationStreamTracker = Either.right(new StreamConfig(this.retrievalConfig.streamName(), - this.retrievalConfig.initialPositionInStreamExtended())); + final StreamConfig streamConfig = new StreamConfig(this.retrievalConfig.streamName(), + this.retrievalConfig.initialPositionInStreamExtended()); + this.appStreamTracker = Either.right(streamConfig); + this.currentStreamConfigMap = new HashMap() {{ + put(streamConfig.streamName(), streamConfig); + }}; } else { - this.applicationStreamTracker = Either.left(multiStreamTracker); + this.appStreamTracker = Either.left(multiStreamTracker); + this.currentStreamConfigMap = multiStreamTracker.streamConfigMap(); } this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts(); this.metricsFactory = this.metricsConfig.metricsFactory(); // Determine leaseSerializer based on availability of MultiStreamTracker. - final LeaseSerializer leaseSerializer = this.applicationStreamTracker.map(mst -> true, sc -> false) ? + final LeaseSerializer leaseSerializer = this.appStreamTracker.map(mst -> true, sc -> false) ? new DynamoDBMultiStreamLeaseSerializer() : new DynamoDBLeaseSerializer(); this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer) @@ -218,8 +220,8 @@ public class Scheduler implements Runnable { // TODO : Halo : Handle case of no StreamConfig present in streamConfigMap() for the supplied streamName. // TODO : Pass the immutable map here instead of using mst.streamConfigMap() this.shardSyncTaskManagerProvider = streamName -> this.leaseManagementConfig - .leaseManagementFactory(leaseSerializer).createShardSyncTaskManager(this.metricsFactory, - applicationStreamTracker.map(mst -> mst.streamConfigMap().get(streamName), sc -> sc)); + .leaseManagementFactory(leaseSerializer) + .createShardSyncTaskManager(this.metricsFactory, this.currentStreamConfigMap.get(streamName)); this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.skipShardSyncAtWorkerInitializationIfLeasesExist = @@ -245,6 +247,7 @@ public class Scheduler implements Runnable { this.shardDetectorProvider = streamName -> createOrGetShardSyncTaskManager(streamName).shardDetector(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); + // TODO : Halo : Check if this needs to be per stream. this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); } @@ -293,11 +296,9 @@ public class Scheduler implements Runnable { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: for already synced streams - final Map streamConfigMap = applicationStreamTracker - .map(mst -> mst.streamConfigMap(), sc -> Collections.singletonMap(sc.streamName(), sc)); - for(String streamName : streamConfigMap.keySet().stream().collect(Collectors.toList())) { + for(String streamName : currentStreamConfigMap.keySet().stream().collect(Collectors.toList())) { log.info("Syncing Kinesis shard info"); - final StreamConfig streamConfig = streamConfigMap.get(streamName); + final StreamConfig streamConfig = currentStreamConfigMap.get(streamName); ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamName), leaseRefresher, streamConfig.initialPositionInStreamExtended(), cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, @@ -348,22 +349,27 @@ public class Scheduler implements Runnable { @VisibleForTesting void runProcessLoop() { try { - boolean foundCompletedShard = false; Set assignedShards = new HashSet<>(); + final Set completedShards = new HashSet<>(); for (ShardInfo shardInfo : getShardInfoForAssignments()) { ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, processorConfig.shardRecordProcessorFactory()); if (shardConsumer.isShutdown() && shardConsumer.shutdownReason().equals(ShutdownReason.SHARD_END)) { - foundCompletedShard = true; + completedShards.add(shardInfo); } else { shardConsumer.executeLifecycle(); } assignedShards.add(shardInfo); } - if (foundCompletedShard) { - shardSyncTaskManager.syncShardAndLeaseInfo(); + for (ShardInfo completedShard : completedShards) { + final String streamName = completedShard.streamName() + .orElse(appStreamTracker.map(mst -> null, sc -> sc.streamName())); + Validate.notEmpty(streamName, "Stream name should not be null"); + if (createOrGetShardSyncTaskManager(streamName).syncShardAndLeaseInfo()) { + log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString()); + } } // clean up shard consumers for unassigned shards @@ -630,12 +636,20 @@ public class Scheduler implements Runnable { RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, metricsFactory); ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo, checkpoint); + // The only case where streamName is not available will be when multistreamtracker not set. In this case, + // get the default stream name for the single stream application. + final String streamName = shardInfo.streamName().orElse(appStreamTracker.map(mst -> null, sc -> sc.streamName())); + Validate.notEmpty(streamName, "StreamName should not be empty"); + // Irrespective of single stream app or multi stream app, streamConfig should always be available. + // TODO: Halo : if not available, construct a default config ? + final StreamConfig streamConfig = currentStreamConfigMap.get(streamName); + Validate.notNull(streamConfig, "StreamConfig should not be empty"); ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, - shardInfo.streamName(), + streamConfig.streamName(), leaseCoordinator, executorService, cache, - shardRecordProcessorFactory.shardRecordProcessor(), + shardRecordProcessorFactory.shardRecordProcessor(streamName), checkpoint, checkpointer, parentShardPollIntervalMillis, @@ -645,10 +659,10 @@ public class Scheduler implements Runnable { maxListShardsRetryAttempts, processorConfig.callProcessRecordsEvenForEmptyRecordList(), shardConsumerDispatchPollIntervalMillis, - initialPosition, + streamConfig.initialPositionInStreamExtended(), cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, - shardDetector, + shardDetectorProvider.apply(streamConfig.streamName()), aggregatorUtil, hierarchicalShardSyncer, metricsFactory); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 3c71934b..6504bb24 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -268,36 +268,40 @@ public class LeaseManagementConfig { @Deprecated public LeaseManagementFactory leaseManagementFactory() { - Validate.notEmpty(streamName(), "Stream name is empty"); - return new DynamoDBLeaseManagementFactory(kinesisClient(), - streamName(), - dynamoDBClient(), - tableName(), - workerIdentifier(), - executorService(), - initialPositionInStream(), - failoverTimeMillis(), - epsilonMillis(), - maxLeasesForWorker(), - maxLeasesToStealAtOneTime(), - maxLeaseRenewalThreads(), - cleanupLeasesUponShardCompletion(), - ignoreUnexpectedChildShards(), - shardSyncIntervalMillis(), - consistentReads(), - listShardsBackoffTimeInMillis(), - maxListShardsRetryAttempts(), - maxCacheMissesBeforeReload(), - listShardsCacheAllowedAgeInSeconds(), - cacheMissWarningModulus(), - initialLeaseTableReadCapacity(), - initialLeaseTableWriteCapacity(), - hierarchicalShardSyncer(), - tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode()); + if (leaseManagementFactory == null) { + Validate.notEmpty(streamName(), "Stream name is empty"); + leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(), + streamName(), + dynamoDBClient(), + tableName(), + workerIdentifier(), + executorService(), + initialPositionInStream(), + failoverTimeMillis(), + epsilonMillis(), + maxLeasesForWorker(), + maxLeasesToStealAtOneTime(), + maxLeaseRenewalThreads(), + cleanupLeasesUponShardCompletion(), + ignoreUnexpectedChildShards(), + shardSyncIntervalMillis(), + consistentReads(), + listShardsBackoffTimeInMillis(), + maxListShardsRetryAttempts(), + maxCacheMissesBeforeReload(), + listShardsCacheAllowedAgeInSeconds(), + cacheMissWarningModulus(), + initialLeaseTableReadCapacity(), + initialLeaseTableWriteCapacity(), + hierarchicalShardSyncer(), + tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode()); + } + return leaseManagementFactory; } public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer) { - return new DynamoDBLeaseManagementFactory(kinesisClient(), + if(leaseManagementFactory == null) { + leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(), dynamoDBClient(), tableName(), workerIdentifier(), @@ -323,6 +327,13 @@ public class LeaseManagementConfig { dynamoDbRequestTimeout(), billingMode(), leaseSerializer); + } + return leaseManagementFactory; + } + + public LeaseManagementConfig leaseManagementFactory(final LeaseManagementFactory leaseManagementFactory) { + this.leaseManagementFactory = leaseManagementFactory; + return this; } // private InitialPositionInStreamExtended getInitialPositionExtendedForStream(String streamName) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java index 6a6c73b7..2d51c0bc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java @@ -105,7 +105,7 @@ public class ShardInfo { @Override public int hashCode() { return new HashCodeBuilder() - .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamName).toHashCode(); + .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamName.orElse("")).toHashCode(); } /** @@ -130,7 +130,7 @@ public class ShardInfo { ShardInfo other = (ShardInfo) obj; return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken) .append(parentShardIds, other.parentShardIds).append(shardId, other.shardId) - .append(streamName, other.streamName).isEquals(); + .append(streamName.orElse(""), other.streamName.orElse("")).isEquals(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java index 973b0393..4b691401 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ShardRecordProcessorFactory.java @@ -24,8 +24,7 @@ public interface ShardRecordProcessorFactory { * * @return */ - // TODO : Halo : Reenable -// ShardRecordProcessor shardRecordProcessor(); + ShardRecordProcessor shardRecordProcessor(); /** * Returns a new instance of the ShardRecordProcessor for a stream @@ -33,6 +32,6 @@ public interface ShardRecordProcessorFactory { * @return ShardRecordProcessor */ default ShardRecordProcessor shardRecordProcessor(String streamName) { - throw new UnsupportedOperationException(); + return shardRecordProcessor(); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 8be1bb8f..a17ad4cb 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -52,6 +52,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointFactory; +import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; @@ -498,6 +499,11 @@ public class SchedulerTest { } }; } + + @Override + public ShardRecordProcessor shardRecordProcessor(String streamName) { + return shardRecordProcessor(); + } } private class TestKinesisLeaseManagementFactory implements LeaseManagementFactory { @@ -511,6 +517,12 @@ public class SchedulerTest { return shardSyncTaskManager; } + @Override + public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, + StreamConfig streamConfig) { + return shardSyncTaskManager; + } + @Override public DynamoDBLeaseRefresher createLeaseRefresher() { return dynamoDBLeaseRefresher; @@ -520,6 +532,11 @@ public class SchedulerTest { public ShardDetector createShardDetector() { return shardDetector; } + + @Override + public ShardDetector createShardDetector(StreamConfig streamConfig) { + return shardDetector; + } } private class TestKinesisCheckpointFactory implements CheckpointFactory {