Added Unit Test cases, code comments and other code refactoring

This commit is contained in:
Ashwin Giridharan 2020-03-13 01:36:33 -07:00
parent 255ae932d2
commit 2e113dbd6c
20 changed files with 562 additions and 82 deletions

View file

@ -3,41 +3,67 @@ package software.amazon.kinesis.common;
import com.google.common.base.Joiner;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
import software.amazon.awssdk.utils.Validate;
import java.util.Optional;
import java.util.regex.Pattern;
@RequiredArgsConstructor
@EqualsAndHashCode
@Getter
@Accessors(fluent = true)
@EqualsAndHashCode @Getter @Accessors(fluent = true)
public class StreamIdentifier {
private final String accountName;
private final Optional<String> accountIdOptional;
private final String streamName;
private final Long streamCreationEpoch;
private final Optional<Long> streamCreationEpochOptional;
private static final String DEFAULT_ACCOUNT = "default";
private static final String DELIMITER = ":";
private static final Pattern PATTERN = Pattern.compile(".*" + ":" + ".*" + ":" + "[0-9]*");
private StreamIdentifier(Optional<String> accountIdOptional, String streamName,
Optional<Long> streamCreationEpochOptional) {
Validate.isTrue((accountIdOptional.isPresent() && streamCreationEpochOptional.isPresent()) ||
(!accountIdOptional.isPresent() && !streamCreationEpochOptional.isPresent()),
"AccountId and StreamCreationEpoch must either be present together or not");
this.accountIdOptional = accountIdOptional;
this.streamName = streamName;
this.streamCreationEpochOptional = streamCreationEpochOptional;
}
/**
* Serialize the current StreamIdentifier instance.
* @return the serialized form: accountId, streamName and streamCreationEpoch joined by ":" when the optionals are present, otherwise just the stream name
*/
public String serialize() {
return accountIdOptional.isPresent() ?
Joiner.on(DELIMITER).join(accountIdOptional.get(), streamName, streamCreationEpochOptional.get()) :
streamName;
}
@Override
public String toString() {
return Joiner.on(DELIMITER).join(accountName, streamName, streamCreationEpoch);
return serialize();
}
public static StreamIdentifier fromString(String streamIdentifier) {
if (PATTERN.matcher(streamIdentifier).matches()) {
final String[] split = streamIdentifier.split(DELIMITER);
return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2]));
/**
* Create a multi stream instance for StreamIdentifier from serialized stream identifier.
* @param streamIdentifierSer serialized stream identifier of the form accountId:streamName:streamCreationEpoch
* @return StreamIdentifier
*/
public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) {
if (PATTERN.matcher(streamIdentifierSer).matches()) {
final String[] split = streamIdentifierSer.split(DELIMITER);
return new StreamIdentifier(Optional.of(split[0]), split[1], Optional.of(Long.parseLong(split[2])));
} else {
throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifier);
throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer);
}
}
public static StreamIdentifier fromStreamName(String streamName) {
/**
* Create a single stream instance for StreamIdentifier from stream name.
* @param streamName
* @return StreamIdentifier
*/
public static StreamIdentifier singleStreamInstance(String streamName) {
Validate.notEmpty(streamName, "StreamName should not be empty");
return new StreamIdentifier(DEFAULT_ACCOUNT, streamName, 0L);
return new StreamIdentifier(Optional.empty(), streamName, Optional.empty());
}
}

View file

@ -42,7 +42,6 @@ import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.Either;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
@ -295,9 +294,10 @@ public class Scheduler implements Runnable {
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
// TODO: for already synced streams
for(StreamIdentifier streamIdentifier : currentStreamConfigMap.keySet().stream().collect(Collectors.toList())) {
log.info("Syncing Kinesis shard info");
final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
for(Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final StreamIdentifier streamIdentifier = streamConfigEntry.getKey();
log.info("Syncing Kinesis shard info for " + streamIdentifier);
final StreamConfig streamConfig = streamConfigEntry.getValue();
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamIdentifier),
leaseRefresher, streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L,
@ -363,7 +363,7 @@ public class Scheduler implements Runnable {
}
for (ShardInfo completedShard : completedShards) {
final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifier());
final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt());
if (createOrGetShardSyncTaskManager(streamIdentifier).syncShardAndLeaseInfo()) {
log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString());
}
@ -635,7 +635,7 @@ public class Scheduler implements Runnable {
checkpoint);
// The only case where streamName is not available will be when multistreamtracker not set. In this case,
// get the default stream name for the single stream application.
final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifier());
final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt());
// Irrespective of single stream app or multi stream app, streamConfig should always be available.
final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
Validate.notNull(streamConfig, "StreamConfig should not be empty");
@ -712,7 +712,7 @@ public class Scheduler implements Runnable {
private StreamIdentifier getStreamIdentifier(Optional<String> streamIdentifierString) {
final StreamIdentifier streamIdentifier;
if(streamIdentifierString.isPresent()) {
streamIdentifier = StreamIdentifier.fromString(streamIdentifierString.get());
streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierString.get());
} else {
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
streamIdentifier = this.currentStreamConfigMap.values().iterator().next().streamIdentifier();

View file

@ -29,6 +29,7 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
@ -162,7 +163,7 @@ public class HierarchicalShardSyncer {
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<Lease> streamLeases = new ArrayList<>();
for (Lease lease : leaseRefresher.listLeases()) {
if (streamIdentifier.toString().equals(((MultiStreamLease)lease).streamIdentifier())) {
if (streamIdentifier.serialize().equals(((MultiStreamLease)lease).streamIdentifier())) {
streamLeases.add(lease);
}
}
@ -761,7 +762,7 @@ public class HierarchicalShardSyncer {
private static Lease newKCLMultiStreamLease(final Shard shard, final StreamIdentifier streamIdentifier) {
MultiStreamLease newLease = new MultiStreamLease();
newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.toString(), shard.shardId()));
newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), shard.shardId()));
List<String> parentShardIds = new ArrayList<>(2);
if (shard.parentShardId() != null) {
parentShardIds.add(shard.parentShardId());
@ -771,7 +772,7 @@ public class HierarchicalShardSyncer {
}
newLease.parentShardIds(parentShardIds);
newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.streamIdentifier(streamIdentifier.toString());
newLease.streamIdentifier(streamIdentifier.serialize());
newLease.shardId(shard.shardId());
return newLease;
}
@ -857,7 +858,8 @@ public class HierarchicalShardSyncer {
@Data
@Accessors(fluent = true)
private static class MultiStreamArgs {
@VisibleForTesting
static class MultiStreamArgs {
private final Boolean isMultiStreamMode;
private final StreamIdentifier streamIdentifier;
}

View file

@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@ -78,7 +77,7 @@ public class KinesisShardDetector implements ShardDetector {
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
int cacheMissWarningModulus) {
this(kinesisClient, StreamIdentifier.fromStreamName(streamName), listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
this(kinesisClient, StreamIdentifier.singleStreamInstance(streamName), listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus,
LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
}

View file

@ -273,6 +273,12 @@ public class LeaseManagementConfig {
return hierarchicalShardSyncer;
}
/**
* Vends HierarchicalShardSyncer based on isMultiStreamingMode. In multi-stream mode the shard syncer
* creates leases that accommodate more than one stream.
* @param isMultiStreamingMode
* @return HierarchicalShardSyncer
*/
public HierarchicalShardSyncer hierarchicalShardSyncer(boolean isMultiStreamingMode) {
if(hierarchicalShardSyncer == null) {
hierarchicalShardSyncer = new HierarchicalShardSyncer(isMultiStreamingMode);
@ -313,6 +319,12 @@ public class LeaseManagementConfig {
return leaseManagementFactory;
}
/**
* Vends LeaseManagementFactory that performs serde based on leaseSerializer and shard sync based on isMultiStreamingMode
* @param leaseSerializer
* @param isMultiStreamingMode
* @return LeaseManagementFactory
*/
public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer, boolean isMultiStreamingMode) {
if(leaseManagementFactory == null) {
leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
@ -345,17 +357,14 @@ public class LeaseManagementConfig {
return leaseManagementFactory;
}
/**
* Set leaseManagementFactory and return the current LeaseManagementConfig instance.
* @param leaseManagementFactory
* @return LeaseManagementConfig
*/
public LeaseManagementConfig leaseManagementFactory(final LeaseManagementFactory leaseManagementFactory) {
this.leaseManagementFactory = leaseManagementFactory;
return this;
}
// private InitialPositionInStreamExtended getInitialPositionExtendedForStream(String streamName) {
// return multiStreamTracker() == null ?
// initialPositionInStream() :
// multiStreamTracker().initialPositionInStreamExtended(streamName) == null ?
// initialPositionInStream() :
// multiStreamTracker().initialPositionInStreamExtended(streamName);
// }
}

View file

@ -37,7 +37,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ToString
public class ShardInfo {
private final Optional<String> streamIdentifier;
private final Optional<String> streamIdentifierSerOpt;
private final String shardId;
private final String concurrencyToken;
// Sorted list of parent shardIds.
@ -63,11 +63,20 @@ public class ShardInfo {
this(shardId, concurrencyToken, parentShardIds, checkpoint, null);
}
/**
* Creates a new ShardInfo object that has an option to pass a serialized streamIdentifier.
* The checkpoint is not part of the equality, but is used for debugging output.
* @param shardId
* @param concurrencyToken
* @param parentShardIds
* @param checkpoint
* @param streamIdentifierSer
*/
public ShardInfo(@NonNull final String shardId,
final String concurrencyToken,
final Collection<String> parentShardIds,
final ExtendedSequenceNumber checkpoint,
final String streamIdentifier) {
final String streamIdentifierSer) {
this.shardId = shardId;
this.concurrencyToken = concurrencyToken;
this.parentShardIds = new LinkedList<>();
@ -78,7 +87,7 @@ public class ShardInfo {
// This makes it easy to check for equality in ShardInfo.equals method.
Collections.sort(this.parentShardIds);
this.checkpoint = checkpoint;
this.streamIdentifier = Optional.ofNullable(streamIdentifier);
this.streamIdentifierSerOpt = Optional.ofNullable(streamIdentifierSer);
}
/**
@ -105,7 +114,7 @@ public class ShardInfo {
@Override
public int hashCode() {
return new HashCodeBuilder()
.append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifier.orElse("")).toHashCode();
.append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifierSerOpt.orElse("")).toHashCode();
}
/**
@ -130,18 +139,18 @@ public class ShardInfo {
ShardInfo other = (ShardInfo) obj;
return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken)
.append(parentShardIds, other.parentShardIds).append(shardId, other.shardId)
.append(streamIdentifier.orElse(""), other.streamIdentifier.orElse("")).isEquals();
.append(streamIdentifierSerOpt.orElse(""), other.streamIdentifierSerOpt.orElse("")).isEquals();
}
/**
*
* Utility method to derive lease key from ShardInfo
* @param shardInfo
* @return
* @return lease key
*/
public static String getLeaseKey(ShardInfo shardInfo) {
return shardInfo.streamIdentifier().isPresent() ?
MultiStreamLease.getLeaseKey(shardInfo.streamIdentifier().get(), shardInfo.shardId()) :
return shardInfo.streamIdentifierSerOpt().isPresent() ?
MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardInfo.shardId()) :
shardInfo.shardId();
}

View file

@ -378,7 +378,11 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
return leases.stream().map(DynamoDBLeaseCoordinator::convertLeaseToAssignment).collect(Collectors.toList());
}
// TODO : Halo : Check for better way
/**
* Utility method to convert the basic lease or multistream lease to ShardInfo
* @param lease
* @return ShardInfo
*/
public static ShardInfo convertLeaseToAssignment(final Lease lease) {
if (lease instanceof MultiStreamLease) {
return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(),

View file

@ -16,9 +16,6 @@
package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import lombok.Data;
@ -26,8 +23,6 @@ import lombok.NonNull;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
@ -331,7 +326,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode) {
this(kinesisClient, new StreamConfig(StreamIdentifier.fromStreamName(streamName), initialPositionInStream), dynamoDBClient, tableName,
this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
@ -391,6 +386,35 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.streamConfig = streamConfig;
}
/**
* Constructor.
* @param kinesisClient
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param leaseSerializer
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
@ -457,6 +481,12 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
metricsFactory);
}
/**
* Create ShardSyncTaskManager from the streamConfig passed
* @param metricsFactory
* @param streamConfig
* @return ShardSyncTaskManager
*/
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
return new ShardSyncTaskManager(this.createShardDetector(streamConfig),
@ -476,7 +506,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
tableCreatorCallback, dynamoDbRequestTimeout, billingMode);
}
@Override @Deprecated
@Override
@Deprecated
public ShardDetector createShardDetector() {
return new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(),
listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds,
@ -487,7 +518,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* KinesisShardDetector supports reading from service only using streamName. Support for accountId and
* stream creation epoch is yet to be provided.
* @param streamConfig
* @return
* @return ShardDetector
*/
@Override
public ShardDetector createShardDetector(StreamConfig streamConfig) {

View file

@ -12,9 +12,9 @@ import java.util.Map;
public interface MultiStreamTracker {
/**
* Returns the map of streams and its associated stream specific config.
* Returns the list of stream config, to be processed by the current application.
*
* @return List of stream names
* @return List of StreamConfig
*/
List<StreamConfig> streamConfigList();
}

View file

@ -29,7 +29,7 @@ public interface ShardRecordProcessorFactory {
ShardRecordProcessor shardRecordProcessor();
/**
* Returns a new instance of the ShardRecordProcessor for a stream
* Returns a new instance of the ShardRecordProcessor for a stream identifier
* @param streamIdentifier
* @return ShardRecordProcessor
*/

View file

@ -94,7 +94,7 @@ public class RetrievalConfig {
@NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient;
this.appStreamTracker = Either
.right(new StreamConfig(StreamIdentifier.fromStreamName(streamName), initialPositionInStreamExtended));
.right(new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStreamExtended));
this.applicationName = applicationName;
}

View file

@ -85,6 +85,7 @@ public class FanOutConfig implements RetrievalSpecificConfig {
return new FanOutRetrievalFactory(kinesisClient, streamName, this::getOrCreateConsumerArn);
}
// TODO : Halo. Need Stream Specific ConsumerArn to be passed from Customer
private String getOrCreateConsumerArn(String streamName) {
if (consumerArn != null) {
return consumerArn;

View file

@ -49,10 +49,10 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
@Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
final MetricsFactory metricsFactory) {
final Optional<String> streamIdentifierStr = shardInfo.streamIdentifier();
final Optional<String> streamIdentifierStr = shardInfo.streamIdentifierSerOpt();
final String streamName;
if(streamIdentifierStr.isPresent()) {
streamName = StreamIdentifier.fromString(streamIdentifierStr.get()).streamName();
streamName = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()).streamName();
} else {
streamName = defaultStreamName;
}

View file

@ -74,10 +74,18 @@ public class KinesisDataFetcher {
@Deprecated
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) {
this(kinesisClient, StreamIdentifier.fromStreamName(streamName), shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
this(kinesisClient, StreamIdentifier.singleStreamInstance(streamName), shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
}
// Changing the constructor directly as this is an internal API
/**
* Constructs KinesisDataFetcher.
* @param kinesisClient
* @param streamIdentifier
* @param shardId
* @param maxRecords
* @param metricsFactory
* @param maxFutureWait
*/
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) {
this.kinesisClient = kinesisClient;
this.streamIdentifier = streamIdentifier;

View file

@ -63,9 +63,9 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) {
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifier().isPresent() ?
StreamIdentifier.fromString(shardInfo.streamIdentifier().get()) :
StreamIdentifier.fromStreamName(streamName);
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
StreamIdentifier.singleStreamInstance(streamName);
return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout));
}

View file

@ -68,9 +68,9 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory
@Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) {
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifier().isPresent() ?
StreamIdentifier.fromString(shardInfo.streamIdentifier().get()) :
StreamIdentifier.fromStreamName(streamName);
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
StreamIdentifier.singleStreamInstance(streamName);
return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(),
maxRecords, metricsFactory, maxFutureWait));

View file

@ -22,8 +22,10 @@ import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
@ -32,14 +34,20 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.atMost;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import io.reactivex.plugins.RxJavaPlugins;
import lombok.RequiredArgsConstructor;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -52,8 +60,11 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.CheckpointFactory;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.KinesisClientLibException;
import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -63,6 +74,8 @@ import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
@ -73,6 +86,7 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.ShardRecordProcessor;
@ -124,6 +138,11 @@ public class SchedulerTest {
private Checkpointer checkpoint;
@Mock
private WorkerStateChangeListener workerStateChangeListener;
@Mock
private MultiStreamTracker multiStreamTracker;
private Map<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerMap = new HashMap<>();
private Map<StreamIdentifier, ShardDetector> shardDetectorMap = new HashMap<>();
@Before
public void setup() {
@ -132,13 +151,25 @@ public class SchedulerTest {
checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory());
coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L).workerStateChangeListener(workerStateChangeListener);
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory());
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, false));
lifecycleConfig = new LifecycleConfig();
metricsConfig = new MetricsConfig(cloudWatchClient, namespace);
processorConfig = new ProcessorConfig(shardRecordProcessorFactory);
retrievalConfig = new RetrievalConfig(kinesisClient, streamName, applicationName)
.retrievalFactory(retrievalFactory);
final List<StreamConfig> streamConfigList = new ArrayList<StreamConfig>() {{
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream2:2"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream2:3"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
}};
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList);
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher);
@ -245,7 +276,10 @@ public class SchedulerTest {
public final void testInitializationFailureWithRetries() throws Exception {
doNothing().when(leaseCoordinator).initialize();
when(shardDetector.listShards()).thenThrow(new RuntimeException());
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, true));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
scheduler.run();
verify(shardDetector, times(coordinatorConfig.maxInitializationAttempts())).listShards();
@ -255,6 +289,8 @@ public class SchedulerTest {
public final void testInitializationFailureWithRetriesWithConfiguredMaxInitializationAttempts() throws Exception {
final int maxInitializationAttempts = 5;
coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts);
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, true));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
@ -267,6 +303,76 @@ public class SchedulerTest {
verify(shardDetector, times(maxInitializationAttempts)).listShards();
}
@Test
public final void testMultiStreamInitialization() throws ProvisionedThroughputException, DependencyException {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
scheduler.initialize();
shardDetectorMap.values().stream()
.forEach(shardDetector -> verify(shardDetector, times(1)).listShards());
}
@Test
public final void testMultiStreamInitializationWithFailures() {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, false));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
scheduler.initialize();
// Note : As of today we retry for all streams in the next attempt. Hence the retry count for each stream may vary.
// We expect at least 2 retries for each stream; since there are 4 streams, we expect the number of
// calls per stream to be at most 5.
shardDetectorMap.values().stream()
.forEach(shardDetector -> verify(shardDetector, atLeast(2)).listShards());
shardDetectorMap.values().stream()
.forEach(shardDetector -> verify(shardDetector, atMost(5)).listShards());
}
/**
 * Verifies that in multi-stream mode a consumer is built exactly once per (account, stream, shard):
 * re-delivering the same shard assignments with only a changed checkpoint must not build new consumers.
 */
@Test
public final void testMultiStreamConsumersAreBuiltOncePerAccountStreamShard() throws KinesisClientLibException {
    final String shardId = "shardId-000000000000";
    final String concurrencyToken = "concurrencyToken";
    // Three checkpoints for the same shard — only the first appearance should create a consumer.
    final ExtendedSequenceNumber firstSequenceNumber = ExtendedSequenceNumber.TRIM_HORIZON;
    final ExtendedSequenceNumber secondSequenceNumber = new ExtendedSequenceNumber("1000");
    final ExtendedSequenceNumber finalSequenceNumber = new ExtendedSequenceNumber("2000");
    // One ShardInfo per tracked stream, keyed by that stream's serialized identifier.
    final List<ShardInfo> initialShardInfo = multiStreamTracker.streamConfigList().stream()
            .map(sc -> new ShardInfo(shardId, concurrencyToken, null, firstSequenceNumber,
                    sc.streamIdentifier().serialize())).collect(Collectors.toList());
    final List<ShardInfo> firstShardInfo = multiStreamTracker.streamConfigList().stream()
            .map(sc -> new ShardInfo(shardId, concurrencyToken, null, secondSequenceNumber,
                    sc.streamIdentifier().serialize())).collect(Collectors.toList());
    final List<ShardInfo> secondShardInfo = multiStreamTracker.streamConfigList().stream()
            .map(sc -> new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber,
                    sc.streamIdentifier().serialize())).collect(Collectors.toList());
    final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null);
    // Deliver a different assignment snapshot on each of the three process-loop runs below.
    when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo);
    when(checkpoint.getCheckpointObject(anyString())).thenReturn(firstCheckpoint);
    retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
            .retrievalFactory(retrievalFactory);
    scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
            metricsConfig, processorConfig, retrievalConfig);
    Scheduler schedulerSpy = spy(scheduler);
    schedulerSpy.runProcessLoop();
    schedulerSpy.runProcessLoop();
    schedulerSpy.runProcessLoop();
    // Consumers are built only for the first appearance of each (stream, shard); later assignments
    // that differ only by checkpoint must not trigger another buildConsumer().
    initialShardInfo.stream().forEach(
            shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory)));
    firstShardInfo.stream().forEach(
            shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory)));
    secondShardInfo.stream().forEach(
            shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory)));
}
@Test
public final void testSchedulerShutdown() {
scheduler.shutdown();
@ -508,7 +614,12 @@ public class SchedulerTest {
}
@RequiredArgsConstructor
private class TestKinesisLeaseManagementFactory implements LeaseManagementFactory {
private final boolean shardSyncFirstAttemptFailure;
private final boolean shouldReturnDefaultShardSyncTaskmanager;
@Override
public LeaseCoordinator createLeaseCoordinator(MetricsFactory metricsFactory) {
return leaseCoordinator;
@ -522,6 +633,19 @@ public class SchedulerTest {
/**
 * Returns either the shared default mocked ShardSyncTaskManager or a fresh mock per stream.
 * Per-stream mocks (and their ShardDetectors) are recorded in shardSyncTaskManagerMap /
 * shardDetectorMap so tests can verify per-stream interactions. When
 * shardSyncFirstAttemptFailure is set, the first listShards() call on each per-stream
 * detector throws; subsequent calls return an empty shard list.
 */
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory,
        StreamConfig streamConfig) {
    if (shouldReturnDefaultShardSyncTaskmanager) {
        return shardSyncTaskManager;
    }
    // Renamed locals: the originals shadowed the enclosing test's shardSyncTaskManager field.
    final ShardSyncTaskManager streamTaskManager = mock(ShardSyncTaskManager.class);
    final ShardDetector streamShardDetector = mock(ShardDetector.class);
    shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), streamTaskManager);
    shardDetectorMap.put(streamConfig.streamIdentifier(), streamShardDetector);
    when(streamTaskManager.shardDetector()).thenReturn(streamShardDetector);
    if (shardSyncFirstAttemptFailure) {
        // First attempt fails, retries succeed. Use the type-safe emptyList() instead of the
        // raw-typed Collections.EMPTY_LIST constant.
        when(streamShardDetector.listShards())
                .thenThrow(new RuntimeException("Service Exception"))
                .thenReturn(Collections.emptyList());
    }
    return streamTaskManager;
}
@ -537,7 +661,7 @@ public class SchedulerTest {
@Override
public ShardDetector createShardDetector(StreamConfig streamConfig) {
return shardDetector;
return shardDetectorMap.get(streamConfig.streamIdentifier());
}
}

View file

@ -55,6 +55,7 @@ import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
@ -74,6 +75,10 @@ public class HierarchicalShardSyncerTest {
private static final int EXPONENT = 128;
private static final String LEASE_OWNER = "TestOwnere";
private static final MetricsScope SCOPE = new NullMetricsScope();
private static final boolean MULTISTREAM_MODE_ON = true;
private static final String STREAM_IDENTIFIER = "acc:stream:1";
private static final HierarchicalShardSyncer.MultiStreamArgs MULTI_STREAM_ARGS = new HierarchicalShardSyncer.MultiStreamArgs(
MULTISTREAM_MODE_ON, StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER));
private final boolean cleanupLeasesOfCompletedShards = true;
private final boolean ignoreUnexpectedChildShards = false;
@ -95,6 +100,11 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer = new HierarchicalShardSyncer();
}
// Switches the syncer under test into multi-stream mode and stubs the shard detector
// to report the test stream identifier used for multi-stream lease keys.
private void setupMultiStream() {
    hierarchicalShardSyncer = new HierarchicalShardSyncer(true);
    when(shardDetector.streamIdentifier()).thenReturn(StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER));
}
/**
* Test determineNewLeasesToCreate() where there are no shards
*/
@ -107,6 +117,18 @@ public class HierarchicalShardSyncerTest {
equalTo(true));
}
/**
 * determineNewLeasesToCreate() must return an empty result when the multi-stream source
 * has no shards and no existing leases.
 */
@Test
public void testDetermineNewLeasesToCreateNoShardsForMultiStream() {
    final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(
            Collections.emptyList(), Collections.emptyList(), INITIAL_POSITION_LATEST, new HashSet<>(),
            MULTI_STREAM_ARGS);
    assertThat(newLeases.isEmpty(), equalTo(true));
}
/**
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed
*/
@ -129,6 +151,29 @@ public class HierarchicalShardSyncerTest {
assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
}
/**
 * With no existing leases and no reshard history, every shard should get a new lease whose
 * key carries the multi-stream identifier prefix.
 */
@Test
public void testDetermineNewLeasesToCreate0Leases0ReshardsForMultiStream() {
    final String firstShard = "shardId-0";
    final String secondShard = "shardId-1";
    final SequenceNumberRange openRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
    final List<Shard> shards = Arrays.asList(
            ShardObjectHelper.newShard(firstShard, null, null, openRange),
            ShardObjectHelper.newShard(secondShard, null, null, openRange));
    final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(
            shards, Collections.emptyList(), INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS);
    final Set<String> actualLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
    final Set<String> expectedLeaseKeys = new HashSet<>(
            toMultiStreamLeaseList(Arrays.asList(firstShard, secondShard)));
    assertThat(newLeases.size(), equalTo(expectedLeaseKeys.size()));
    assertThat(actualLeaseKeys, equalTo(expectedLeaseKeys));
}
/**
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but
* one of the shards was marked as inconsistent.
@ -155,6 +200,33 @@ public class HierarchicalShardSyncerTest {
assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
}
/**
 * Test determineNewLeasesToCreate() in multi-stream mode where there are no leases and no resharding
 * operations have been performed, but one of the shards was marked as inconsistent: the inconsistent
 * shard must be excluded from the new leases.
 */
@Test
public void testDetermineNewLeasesToCreate0Leases0Reshards1InconsistentMultiStream() {
    final String shardId0 = "shardId-0";
    final String shardId1 = "shardId-1";
    final String shardId2 = "shardId-2";
    final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
    // shardId-2 is a child of shardId-1 and is flagged inconsistent below.
    final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
            ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
            ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
    final List<Lease> currentLeases = Collections.emptyList();
    final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
    final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
            INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS);
    final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
    // Expected keys carry the multi-stream identifier prefix and omit the inconsistent shard.
    final Set<String> expectedLeaseShardIds = new HashSet<>(
            toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1)));
    assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size()));
    assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
}
/**
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
*/
@ -208,6 +280,45 @@ public class HierarchicalShardSyncerTest {
}
/**
 * Test checkAndCreateLeaseForNewShards() at LATEST in multi-stream mode: leases are created only for
 * the expected subset of graph A shards (see constructShardListForGraphA), keyed with the stream
 * identifier prefix, and all checkpointed at LATEST.
 */
@Test
public void testCheckAndCreateLeasesForShardsIfMissingAtLatestMultiStream() throws Exception {
    final List<Shard> shards = constructShardListForGraphA();
    final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
    when(shardDetector.listShards()).thenReturn(shards);
    // No pre-existing leases; every createLeaseIfNotExists() call is captured and succeeds.
    when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
    when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
    setupMultiStream();
    hierarchicalShardSyncer
            .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
                    cleanupLeasesOfCompletedShards, false, SCOPE);
    final Set<String> expectedShardIds = new HashSet<>(
            toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
    final List<Lease> requestLeases = leaseCaptor.getAllValues();
    final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
    final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
            .collect(Collectors.toSet());
    assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
    assertThat(requestLeaseKeys, equalTo(expectedShardIds));
    // A single distinct checkpoint value means every lease was created at LATEST.
    assertThat(extendedSequenceNumbers.size(), equalTo(1));
    extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
    verify(shardDetector).listShards();
    verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
    verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
// Prefixes each shard id with the multi-stream identifier to produce multi-stream lease keys.
private List<String> toMultiStreamLeaseList(List<String> shardIdBasedLeases) {
    final List<String> leaseKeys = new ArrayList<>();
    for (String shardId : shardIdBasedLeases) {
        leaseKeys.add(String.join(":", STREAM_IDENTIFIER, shardId));
    }
    return leaseKeys;
}
/**
* Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards()
* should never be called.
@ -244,6 +355,42 @@ public class HierarchicalShardSyncerTest {
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
/**
 * Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards in multi-stream mode.
 * In this scenario, shardDetector.listShards() should never be called.
 */
@Test
public void testCheckAndCreateLeasesForShardsWithShardListMultiStream() throws Exception {
    final List<Shard> latestShards = constructShardListForGraphA();
    final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
    // Stubbed defensively; the pre-fetched overload below must not use it.
    when(shardDetector.listShards()).thenReturn(latestShards);
    when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
    when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
    setupMultiStream();
    // Pass latestShards explicitly so the syncer skips its own shard listing.
    hierarchicalShardSyncer
            .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
                    cleanupLeasesOfCompletedShards, false, SCOPE, latestShards);
    final Set<String> expectedShardIds = new HashSet<>(
            toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
    final List<Lease> requestLeases = leaseCaptor.getAllValues();
    final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
    final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
            .collect(Collectors.toSet());
    assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
    assertThat(requestLeaseKeys, equalTo(expectedShardIds));
    // All created leases must be checkpointed at LATEST.
    assertThat(extendedSequenceNumbers.size(), equalTo(1));
    extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
    verify(shardDetector, never()).listShards();
    verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
    verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
/**
* Test checkAndCreateLeaseForNewShards with an empty list of shards. In this scenario, shardDetector.listShards()
* should never be called.
@ -306,6 +453,26 @@ public class HierarchicalShardSyncerTest {
}
}
/**
 * checkAndCreateLeaseForNewShards() at TRIM_HORIZON in multi-stream mode must throw
 * KinesisClientLibIOException when a parent shard is still open (inconsistent shard graph),
 * before any leases are listed.
 */
@Test(expected = KinesisClientLibIOException.class)
public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenForMultiStream() throws Exception {
    final List<Shard> shards = new ArrayList<>(constructShardListForGraphA());
    // Re-open the shard at index 3 by clearing its ending sequence number, making the graph
    // inconsistent (an open shard with children).
    final SequenceNumberRange range = shards.get(0).sequenceNumberRange().toBuilder().endingSequenceNumber(null)
            .build();
    final Shard shard = shards.get(3).toBuilder().sequenceNumberRange(range).build();
    shards.remove(3);
    shards.add(3, shard);
    when(shardDetector.listShards()).thenReturn(shards);
    setupMultiStream();
    try {
        hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
                INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE);
    } finally {
        // The sync must fail fast: shards were listed once, but leases were never consulted.
        verify(shardDetector).listShards();
        verify(dynamoDBLeaseRefresher, never()).listLeases();
    }
}
/**
* Test checkAndCreateLeasesForNewShards() when a parent is open and children of open parents are being ignored.
*/
@ -354,6 +521,51 @@ public class HierarchicalShardSyncerTest {
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
/**
 * checkAndCreateLeaseForNewShards() in multi-stream mode with ignoreUnexpectedChildShards=true:
 * children of a still-open parent are ignored and a lease is obtained for the open parent instead.
 */
@Test
public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildrenMultiStream() throws Exception {
    final List<Shard> shards = new ArrayList<>(constructShardListForGraphA());
    final Shard shard = shards.get(5);
    assertThat(shard.shardId(), equalTo("shardId-5"));
    shards.remove(5);
    // shardId-5 in graph A has two children (shardId-9 and shardId-10). if shardId-5
    // is not closed, those children should be ignored when syncing shards, no leases
    // should be obtained for them, and we should obtain a lease on the still-open
    // parent.
    shards.add(5,
            shard.toBuilder()
                    .sequenceNumberRange(shard.sequenceNumberRange().toBuilder().endingSequenceNumber(null).build())
                    .build());
    final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
    when(shardDetector.listShards()).thenReturn(shards);
    when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
    when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
    setupMultiStream();
    // ignoreUnexpectedChildShards = true tolerates the open-parent inconsistency introduced above.
    hierarchicalShardSyncer
            .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
                    cleanupLeasesOfCompletedShards, true, SCOPE);
    final List<Lease> leases = leaseCaptor.getAllValues();
    final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
    final Set<ExtendedSequenceNumber> leaseSequenceNumbers = leases.stream().map(Lease::checkpoint)
            .collect(Collectors.toSet());
    // Expected: the open parent (shardId-5) plus the other open shards, prefixed with the stream id.
    final Set<String> expectedShardIds = new HashSet<>(toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-5", "shardId-8")));
    assertThat(leaseKeys.size(), equalTo(expectedShardIds.size()));
    assertThat(leaseKeys, equalTo(expectedShardIds));
    assertThat(leaseSequenceNumbers.size(), equalTo(1));
    leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
    verify(shardDetector).listShards();
    verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
    verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@Test
public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShard() throws Exception {
testCheckAndCreateLeasesForNewShardsAndClosedShard(ExtendedSequenceNumber.TRIM_HORIZON,
@ -711,6 +923,11 @@ public class HierarchicalShardSyncerTest {
return createLeasesFromShards(Collections.singletonList(shard), checkpoint, leaseOwner).get(0);
}
// Convenience wrapper: builds a multi-stream lease for a single shard by delegating
// to the list-based factory.
private MultiStreamLease createMultiStreamLeaseFromShard(final Shard shard, final ExtendedSequenceNumber checkpoint,
        final String leaseOwner) {
    final List<MultiStreamLease> leases =
            createMultiStreamLeasesFromShards(Collections.singletonList(shard), checkpoint, leaseOwner);
    return leases.get(0);
}
private List<Lease> createLeasesFromShards(final List<Shard> shards, final ExtendedSequenceNumber checkpoint,
final String leaseOwner) {
return shards.stream().map(shard -> {
@ -726,6 +943,29 @@ public class HierarchicalShardSyncerTest {
}).collect(Collectors.toList());
}
// Builds one MultiStreamLease per shard, all owned by leaseOwner, checkpointed at the given
// position, and keyed under the test STREAM_IDENTIFIER.
private List<MultiStreamLease> createMultiStreamLeasesFromShards(final List<Shard> shards, final ExtendedSequenceNumber checkpoint,
        final String leaseOwner) {
    final List<MultiStreamLease> leases = new ArrayList<>();
    for (Shard shard : shards) {
        // Parent and adjacent-parent ids, when present, become the lease's parent set.
        final Set<String> parentIds = new HashSet<>();
        if (StringUtils.isNotEmpty(shard.parentShardId())) {
            parentIds.add(shard.parentShardId());
        }
        if (StringUtils.isNotEmpty(shard.adjacentParentShardId())) {
            parentIds.add(shard.adjacentParentShardId());
        }
        final MultiStreamLease lease = new MultiStreamLease();
        lease.streamIdentifier(STREAM_IDENTIFIER);
        lease.shardId(shard.shardId());
        lease.parentShardIds(parentIds);
        lease.checkpoint(checkpoint);
        lease.leaseOwner(leaseOwner);
        lease.leaseCounter(0L);
        lease.concurrencyToken(UUID.randomUUID());
        lease.lastCounterIncrementNanos(0L);
        leases.add(lease);
    }
    return leases;
}
@Test
public void testCleanUpGarbageLeaseForNonExistentShard() throws Exception {
final List<Shard> shards = constructShardListForGraphA();
@ -755,6 +995,35 @@ public class HierarchicalShardSyncerTest {
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
}
/**
 * A lease for a shard that no longer exists in the stream (a "garbage" lease) must be deleted
 * during shard sync in multi-stream mode, while leases for existing shards are left untouched
 * and no new leases are created.
 */
@Test
public void testCleanUpGarbageLeaseForNonExistentShardForMultiStream() throws Exception {
    final List<Shard> shards = constructShardListForGraphA();
    // A shard id that does not appear in graph A — its lease should be garbage-collected.
    final String garbageShardId = "shardId-garbage-001";
    final Shard garbageShard = ShardObjectHelper.newShard(garbageShardId, null, null,
            ShardObjectHelper.newSequenceNumberRange("101", null));
    final Lease garbageLease = createMultiStreamLeaseFromShard(garbageShard, new ExtendedSequenceNumber("99"), LEASE_OWNER);
    final List<Lease> leases = new ArrayList<>(
            createMultiStreamLeasesFromShards(shards, ExtendedSequenceNumber.TRIM_HORIZON, LEASE_OWNER));
    leases.add(garbageLease);
    final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
    when(shardDetector.listShards()).thenReturn(shards);
    when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases);
    doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
    setupMultiStream();
    hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
            INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
    // Exactly one deletion, and it targets the garbage lease only.
    assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
    assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
    verify(shardDetector, times(2)).listShards();
    verify(dynamoDBLeaseRefresher).listLeases();
    verify(dynamoDBLeaseRefresher).deleteLease(any(Lease.class));
    verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
}
private void testCheckAndCreateLeasesForShardsIfMissing(InitialPositionInStreamExtended initialPosition)
throws Exception {
final String shardId0 = "shardId-0";

View file

@ -57,8 +57,6 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import javax.swing.*;
@RunWith(MockitoJUnitRunner.class)
public class ConsumerStatesTest {
private static final String STREAM_NAME = "TestStream";
@ -115,7 +113,7 @@ public class ConsumerStatesTest {
@Before
public void setup() {
argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.fromStreamName(STREAM_NAME), leaseCoordinator, executorService, recordsPublisher,
argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.singleStreamInstance(STREAM_NAME), leaseCoordinator, executorService, recordsPublisher,
shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis,
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,

View file

@ -31,7 +31,6 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.metrics.MetricsFactory;
@ -67,7 +66,8 @@ public class FanOutConfigTest {
.streamName(TEST_STREAM_NAME);
RetrievalFactory retrievalFactory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class);
doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier();
// doReturn(Optional.of(StreamIdentifier.singleStreamInstance(TEST_STREAM_NAME).serialize())).when(shardInfo).streamIdentifier();
doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt();
retrievalFactory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class));
assertThat(retrievalFactory, not(nullValue()));
verify(consumerRegistration).getOrCreateStreamConsumerArn();
@ -93,7 +93,7 @@ public class FanOutConfigTest {
.streamName(TEST_STREAM_NAME);
RetrievalFactory factory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class);
doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier();
doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt();
factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class));
assertThat(factory, not(nullValue()));
@ -108,7 +108,7 @@ public class FanOutConfigTest {
.streamName(TEST_STREAM_NAME);
RetrievalFactory factory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class);
doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier();
doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt();
factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class));
assertThat(factory, not(nullValue()));
TestingConfig testingConfig = (TestingConfig) config;
@ -122,7 +122,7 @@ public class FanOutConfigTest {
.consumerName(TEST_CONSUMER_NAME).streamName(TEST_STREAM_NAME);
RetrievalFactory factory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class);
doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier();
doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt();
factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class));
assertThat(factory, not(nullValue()));