Added Unit Test cases, code comments and other code refactoring

This commit is contained in:
Ashwin Giridharan 2020-03-13 01:36:33 -07:00
parent 8be8d7a62b
commit 726e10c49e
20 changed files with 562 additions and 82 deletions

View file

@ -3,41 +3,67 @@ package software.amazon.kinesis.common;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.Validate;
import java.util.Optional;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@RequiredArgsConstructor @EqualsAndHashCode @Getter @Accessors(fluent = true)
@EqualsAndHashCode
@Getter
@Accessors(fluent = true)
public class StreamIdentifier { public class StreamIdentifier {
private final String accountName; private final Optional<String> accountIdOptional;
private final String streamName; private final String streamName;
private final Long streamCreationEpoch; private final Optional<Long> streamCreationEpochOptional;
private static final String DEFAULT_ACCOUNT = "default";
private static final String DELIMITER = ":"; private static final String DELIMITER = ":";
private static final Pattern PATTERN = Pattern.compile(".*" + ":" + ".*" + ":" + "[0-9]*"); private static final Pattern PATTERN = Pattern.compile(".*" + ":" + ".*" + ":" + "[0-9]*");
private StreamIdentifier(Optional<String> accountIdOptional, String streamName,
Optional<Long> streamCreationEpochOptional) {
Validate.isTrue((accountIdOptional.isPresent() && streamCreationEpochOptional.isPresent()) ||
(!accountIdOptional.isPresent() && !streamCreationEpochOptional.isPresent()),
"AccountId and StreamCreationEpoch must either be present together or not");
this.accountIdOptional = accountIdOptional;
this.streamName = streamName;
this.streamCreationEpochOptional = streamCreationEpochOptional;
}
/**
* Serialize the current StreamIdentifier instance.
* @return
*/
public String serialize() {
return accountIdOptional.isPresent() ?
Joiner.on(DELIMITER).join(accountIdOptional.get(), streamName, streamCreationEpochOptional.get()) :
streamName;
}
@Override @Override
public String toString() { public String toString() {
return Joiner.on(DELIMITER).join(accountName, streamName, streamCreationEpoch); return serialize();
} }
public static StreamIdentifier fromString(String streamIdentifier) { /**
if (PATTERN.matcher(streamIdentifier).matches()) { * Create a multi stream instance for StreamIdentifier from serialized stream identifier.
final String[] split = streamIdentifier.split(DELIMITER); * @param streamIdentifierSer
return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2])); * @return StreamIdentifier
*/
public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) {
if (PATTERN.matcher(streamIdentifierSer).matches()) {
final String[] split = streamIdentifierSer.split(DELIMITER);
return new StreamIdentifier(Optional.of(split[0]), split[1], Optional.of(Long.parseLong(split[2])));
} else { } else {
throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifier); throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer);
} }
} }
public static StreamIdentifier fromStreamName(String streamName) { /**
* Create a single stream instance for StreamIdentifier from stream name.
* @param streamName
* @return StreamIdentifier
*/
public static StreamIdentifier singleStreamInstance(String streamName) {
Validate.notEmpty(streamName, "StreamName should not be empty"); Validate.notEmpty(streamName, "StreamName should not be empty");
return new StreamIdentifier(DEFAULT_ACCOUNT, streamName, 0L); return new StreamIdentifier(Optional.empty(), streamName, Optional.empty());
} }
} }

View file

@ -42,7 +42,6 @@ import lombok.NoArgsConstructor;
import lombok.NonNull; import lombok.NonNull;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.Either;
import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
@ -295,9 +294,10 @@ public class Scheduler implements Runnable {
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
// TODO: for already synced streams // TODO: for already synced streams
for(StreamIdentifier streamIdentifier : currentStreamConfigMap.keySet().stream().collect(Collectors.toList())) { for(Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
log.info("Syncing Kinesis shard info"); final StreamIdentifier streamIdentifier = streamConfigEntry.getKey();
final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); log.info("Syncing Kinesis shard info for " + streamIdentifier);
final StreamConfig streamConfig = streamConfigEntry.getValue();
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamIdentifier), ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetectorProvider.apply(streamIdentifier),
leaseRefresher, streamConfig.initialPositionInStreamExtended(), leaseRefresher, streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L,
@ -363,7 +363,7 @@ public class Scheduler implements Runnable {
} }
for (ShardInfo completedShard : completedShards) { for (ShardInfo completedShard : completedShards) {
final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifier()); final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt());
if (createOrGetShardSyncTaskManager(streamIdentifier).syncShardAndLeaseInfo()) { if (createOrGetShardSyncTaskManager(streamIdentifier).syncShardAndLeaseInfo()) {
log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString()); log.info("Found completed shard, initiated new ShardSyncTak for " + completedShard.toString());
} }
@ -635,7 +635,7 @@ public class Scheduler implements Runnable {
checkpoint); checkpoint);
// The only case where streamName is not available will be when multistreamtracker not set. In this case, // The only case where streamName is not available will be when multistreamtracker not set. In this case,
// get the default stream name for the single stream application. // get the default stream name for the single stream application.
final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifier()); final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt());
// Irrespective of single stream app or multi stream app, streamConfig should always be available. // Irrespective of single stream app or multi stream app, streamConfig should always be available.
final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
Validate.notNull(streamConfig, "StreamConfig should not be empty"); Validate.notNull(streamConfig, "StreamConfig should not be empty");
@ -712,7 +712,7 @@ public class Scheduler implements Runnable {
private StreamIdentifier getStreamIdentifier(Optional<String> streamIdentifierString) { private StreamIdentifier getStreamIdentifier(Optional<String> streamIdentifierString) {
final StreamIdentifier streamIdentifier; final StreamIdentifier streamIdentifier;
if(streamIdentifierString.isPresent()) { if(streamIdentifierString.isPresent()) {
streamIdentifier = StreamIdentifier.fromString(streamIdentifierString.get()); streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierString.get());
} else { } else {
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode"); Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
streamIdentifier = this.currentStreamConfigMap.values().iterator().next().streamIdentifier(); streamIdentifier = this.currentStreamConfigMap.values().iterator().next().streamIdentifier();

View file

@ -29,6 +29,7 @@ import java.util.function.BiFunction;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import lombok.Data; import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -162,7 +163,7 @@ public class HierarchicalShardSyncer {
throws DependencyException, ProvisionedThroughputException, InvalidStateException { throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<Lease> streamLeases = new ArrayList<>(); List<Lease> streamLeases = new ArrayList<>();
for (Lease lease : leaseRefresher.listLeases()) { for (Lease lease : leaseRefresher.listLeases()) {
if (streamIdentifier.toString().equals(((MultiStreamLease)lease).streamIdentifier())) { if (streamIdentifier.serialize().equals(((MultiStreamLease)lease).streamIdentifier())) {
streamLeases.add(lease); streamLeases.add(lease);
} }
} }
@ -761,7 +762,7 @@ public class HierarchicalShardSyncer {
private static Lease newKCLMultiStreamLease(final Shard shard, final StreamIdentifier streamIdentifier) { private static Lease newKCLMultiStreamLease(final Shard shard, final StreamIdentifier streamIdentifier) {
MultiStreamLease newLease = new MultiStreamLease(); MultiStreamLease newLease = new MultiStreamLease();
newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.toString(), shard.shardId())); newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), shard.shardId()));
List<String> parentShardIds = new ArrayList<>(2); List<String> parentShardIds = new ArrayList<>(2);
if (shard.parentShardId() != null) { if (shard.parentShardId() != null) {
parentShardIds.add(shard.parentShardId()); parentShardIds.add(shard.parentShardId());
@ -771,7 +772,7 @@ public class HierarchicalShardSyncer {
} }
newLease.parentShardIds(parentShardIds); newLease.parentShardIds(parentShardIds);
newLease.ownerSwitchesSinceCheckpoint(0L); newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.streamIdentifier(streamIdentifier.toString()); newLease.streamIdentifier(streamIdentifier.serialize());
newLease.shardId(shard.shardId()); newLease.shardId(shard.shardId());
return newLease; return newLease;
} }
@ -857,7 +858,8 @@ public class HierarchicalShardSyncer {
@Data @Data
@Accessors(fluent = true) @Accessors(fluent = true)
private static class MultiStreamArgs { @VisibleForTesting
static class MultiStreamArgs {
private final Boolean isMultiStreamMode; private final Boolean isMultiStreamMode;
private final StreamIdentifier streamIdentifier; private final StreamIdentifier streamIdentifier;
} }

View file

@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function; import java.util.function.Function;
@ -78,7 +77,7 @@ public class KinesisShardDetector implements ShardDetector {
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis, public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
int cacheMissWarningModulus) { int cacheMissWarningModulus) {
this(kinesisClient, StreamIdentifier.fromStreamName(streamName), listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, this(kinesisClient, StreamIdentifier.singleStreamInstance(streamName), listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus,
LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
} }

View file

@ -273,6 +273,12 @@ public class LeaseManagementConfig {
return hierarchicalShardSyncer; return hierarchicalShardSyncer;
} }
/**
* Vends HierarchicalShardSyncer based on MultiStreamingMode. With MultiStreamMode shard syncer creates
* leases to accommodate more than one stream.
* @param isMultiStreamingMode
* @return HierarchicalShardSyncer
*/
public HierarchicalShardSyncer hierarchicalShardSyncer(boolean isMultiStreamingMode) { public HierarchicalShardSyncer hierarchicalShardSyncer(boolean isMultiStreamingMode) {
if(hierarchicalShardSyncer == null) { if(hierarchicalShardSyncer == null) {
hierarchicalShardSyncer = new HierarchicalShardSyncer(isMultiStreamingMode); hierarchicalShardSyncer = new HierarchicalShardSyncer(isMultiStreamingMode);
@ -313,6 +319,12 @@ public class LeaseManagementConfig {
return leaseManagementFactory; return leaseManagementFactory;
} }
/**
* Vends LeaseManagementFactory that performs serde based on leaseSerializer and shard sync based on isMultiStreamingMode
* @param leaseSerializer
* @param isMultiStreamingMode
* @return LeaseManagementFactory
*/
public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer, boolean isMultiStreamingMode) { public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer, boolean isMultiStreamingMode) {
if(leaseManagementFactory == null) { if(leaseManagementFactory == null) {
leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(), leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
@ -345,17 +357,14 @@ public class LeaseManagementConfig {
return leaseManagementFactory; return leaseManagementFactory;
} }
/**
* Set leaseManagementFactory and return the current LeaseManagementConfig instance.
* @param leaseManagementFactory
* @return LeaseManagementConfig
*/
public LeaseManagementConfig leaseManagementFactory(final LeaseManagementFactory leaseManagementFactory) { public LeaseManagementConfig leaseManagementFactory(final LeaseManagementFactory leaseManagementFactory) {
this.leaseManagementFactory = leaseManagementFactory; this.leaseManagementFactory = leaseManagementFactory;
return this; return this;
} }
// private InitialPositionInStreamExtended getInitialPositionExtendedForStream(String streamName) {
// return multiStreamTracker() == null ?
// initialPositionInStream() :
// multiStreamTracker().initialPositionInStreamExtended(streamName) == null ?
// initialPositionInStream() :
// multiStreamTracker().initialPositionInStreamExtended(streamName);
// }
} }

View file

@ -37,7 +37,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ToString @ToString
public class ShardInfo { public class ShardInfo {
private final Optional<String> streamIdentifier; private final Optional<String> streamIdentifierSerOpt;
private final String shardId; private final String shardId;
private final String concurrencyToken; private final String concurrencyToken;
// Sorted list of parent shardIds. // Sorted list of parent shardIds.
@ -63,11 +63,20 @@ public class ShardInfo {
this(shardId, concurrencyToken, parentShardIds, checkpoint, null); this(shardId, concurrencyToken, parentShardIds, checkpoint, null);
} }
/**
* Creates a new ShardInfo object that has an option to pass a serialized streamIdentifier.
* The checkpoint is not part of the equality, but is used for debugging output.
* @param shardId
* @param concurrencyToken
* @param parentShardIds
* @param checkpoint
* @param streamIdentifierSer
*/
public ShardInfo(@NonNull final String shardId, public ShardInfo(@NonNull final String shardId,
final String concurrencyToken, final String concurrencyToken,
final Collection<String> parentShardIds, final Collection<String> parentShardIds,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber checkpoint,
final String streamIdentifier) { final String streamIdentifierSer) {
this.shardId = shardId; this.shardId = shardId;
this.concurrencyToken = concurrencyToken; this.concurrencyToken = concurrencyToken;
this.parentShardIds = new LinkedList<>(); this.parentShardIds = new LinkedList<>();
@ -78,7 +87,7 @@ public class ShardInfo {
// This makes it easy to check for equality in ShardInfo.equals method. // This makes it easy to check for equality in ShardInfo.equals method.
Collections.sort(this.parentShardIds); Collections.sort(this.parentShardIds);
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
this.streamIdentifier = Optional.ofNullable(streamIdentifier); this.streamIdentifierSerOpt = Optional.ofNullable(streamIdentifierSer);
} }
/** /**
@ -105,7 +114,7 @@ public class ShardInfo {
@Override @Override
public int hashCode() { public int hashCode() {
return new HashCodeBuilder() return new HashCodeBuilder()
.append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifier.orElse("")).toHashCode(); .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifierSerOpt.orElse("")).toHashCode();
} }
/** /**
@ -130,18 +139,18 @@ public class ShardInfo {
ShardInfo other = (ShardInfo) obj; ShardInfo other = (ShardInfo) obj;
return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken) return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken)
.append(parentShardIds, other.parentShardIds).append(shardId, other.shardId) .append(parentShardIds, other.parentShardIds).append(shardId, other.shardId)
.append(streamIdentifier.orElse(""), other.streamIdentifier.orElse("")).isEquals(); .append(streamIdentifierSerOpt.orElse(""), other.streamIdentifierSerOpt.orElse("")).isEquals();
} }
/** /**
* * Utility method to derive lease key from ShardInfo
* @param shardInfo * @param shardInfo
* @return * @return lease key
*/ */
public static String getLeaseKey(ShardInfo shardInfo) { public static String getLeaseKey(ShardInfo shardInfo) {
return shardInfo.streamIdentifier().isPresent() ? return shardInfo.streamIdentifierSerOpt().isPresent() ?
MultiStreamLease.getLeaseKey(shardInfo.streamIdentifier().get(), shardInfo.shardId()) : MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardInfo.shardId()) :
shardInfo.shardId(); shardInfo.shardId();
} }

View file

@ -378,7 +378,11 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
return leases.stream().map(DynamoDBLeaseCoordinator::convertLeaseToAssignment).collect(Collectors.toList()); return leases.stream().map(DynamoDBLeaseCoordinator::convertLeaseToAssignment).collect(Collectors.toList());
} }
// TODO : Halo : Check for better way /**
* Utility method to convert the basic lease or multistream lease to ShardInfo
* @param lease
* @return ShardInfo
*/
public static ShardInfo convertLeaseToAssignment(final Lease lease) { public static ShardInfo convertLeaseToAssignment(final Lease lease) {
if (lease instanceof MultiStreamLease) { if (lease instanceof MultiStreamLease) {
return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(), return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(),

View file

@ -16,9 +16,6 @@
package software.amazon.kinesis.leases.dynamodb; package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration; import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import lombok.Data; import lombok.Data;
@ -26,8 +23,6 @@ import lombok.NonNull;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamConfig;
@ -331,7 +326,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode) { Duration dynamoDbRequestTimeout, BillingMode billingMode) {
this(kinesisClient, new StreamConfig(StreamIdentifier.fromStreamName(streamName), initialPositionInStream), dynamoDBClient, tableName, this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
@ -391,6 +386,35 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
} }
/**
* Constructor.
* @param kinesisClient
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param leaseSerializer
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
@ -457,6 +481,12 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
metricsFactory); metricsFactory);
} }
/**
* Create ShardSyncTaskManager from the streamConfig passed
* @param metricsFactory
* @param streamConfig
* @return ShardSyncTaskManager
*/
@Override @Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) { public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
return new ShardSyncTaskManager(this.createShardDetector(streamConfig), return new ShardSyncTaskManager(this.createShardDetector(streamConfig),
@ -476,7 +506,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
tableCreatorCallback, dynamoDbRequestTimeout, billingMode); tableCreatorCallback, dynamoDbRequestTimeout, billingMode);
} }
@Override @Deprecated @Override
@Deprecated
public ShardDetector createShardDetector() { public ShardDetector createShardDetector() {
return new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), return new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(),
listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds,
@ -487,7 +518,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* KinesisShardDetector supports reading from service only using streamName. Support for accountId and * KinesisShardDetector supports reading from service only using streamName. Support for accountId and
* stream creation epoch is yet to be provided. * stream creation epoch is yet to be provided.
* @param streamConfig * @param streamConfig
* @return * @return ShardDetector
*/ */
@Override @Override
public ShardDetector createShardDetector(StreamConfig streamConfig) { public ShardDetector createShardDetector(StreamConfig streamConfig) {

View file

@ -12,9 +12,9 @@ import java.util.Map;
public interface MultiStreamTracker { public interface MultiStreamTracker {
/** /**
* Returns the map of streams and its associated stream specific config. * Returns the list of stream config, to be processed by the current application.
* *
* @return List of stream names * @return List of StreamConfig
*/ */
List<StreamConfig> streamConfigList(); List<StreamConfig> streamConfigList();
} }

View file

@ -29,7 +29,7 @@ public interface ShardRecordProcessorFactory {
ShardRecordProcessor shardRecordProcessor(); ShardRecordProcessor shardRecordProcessor();
/** /**
* Returns a new instance of the ShardRecordProcessor for a stream * Returns a new instance of the ShardRecordProcessor for a stream identifier
* @param streamIdentifier * @param streamIdentifier
* @return ShardRecordProcessor * @return ShardRecordProcessor
*/ */

View file

@ -94,7 +94,7 @@ public class RetrievalConfig {
@NonNull String applicationName) { @NonNull String applicationName) {
this.kinesisClient = kinesisAsyncClient; this.kinesisClient = kinesisAsyncClient;
this.appStreamTracker = Either this.appStreamTracker = Either
.right(new StreamConfig(StreamIdentifier.fromStreamName(streamName), initialPositionInStreamExtended)); .right(new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStreamExtended));
this.applicationName = applicationName; this.applicationName = applicationName;
} }

View file

@ -85,6 +85,7 @@ public class FanOutConfig implements RetrievalSpecificConfig {
return new FanOutRetrievalFactory(kinesisClient, streamName, this::getOrCreateConsumerArn); return new FanOutRetrievalFactory(kinesisClient, streamName, this::getOrCreateConsumerArn);
} }
// TODO : Halo. Need Stream Specific ConsumerArn to be passed from Customer
private String getOrCreateConsumerArn(String streamName) { private String getOrCreateConsumerArn(String streamName) {
if (consumerArn != null) { if (consumerArn != null) {
return consumerArn; return consumerArn;

View file

@ -49,10 +49,10 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
@Override @Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
final MetricsFactory metricsFactory) { final MetricsFactory metricsFactory) {
final Optional<String> streamIdentifierStr = shardInfo.streamIdentifier(); final Optional<String> streamIdentifierStr = shardInfo.streamIdentifierSerOpt();
final String streamName; final String streamName;
if(streamIdentifierStr.isPresent()) { if(streamIdentifierStr.isPresent()) {
streamName = StreamIdentifier.fromString(streamIdentifierStr.get()).streamName(); streamName = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()).streamName();
} else { } else {
streamName = defaultStreamName; streamName = defaultStreamName;
} }

View file

@ -74,10 +74,18 @@ public class KinesisDataFetcher {
@Deprecated @Deprecated
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) { public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) {
this(kinesisClient, StreamIdentifier.fromStreamName(streamName), shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT); this(kinesisClient, StreamIdentifier.singleStreamInstance(streamName), shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
} }
// Changing the constructor directly as this is an internal API /**
* Constructs KinesisDataFetcher.
* @param kinesisClient
* @param streamIdentifier
* @param shardId
* @param maxRecords
* @param metricsFactory
* @param maxFutureWait
*/
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) { public KinesisDataFetcher(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) {
this.kinesisClient = kinesisClient; this.kinesisClient = kinesisClient;
this.streamIdentifier = streamIdentifier; this.streamIdentifier = streamIdentifier;

View file

@ -63,9 +63,9 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
@Override @Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) { @NonNull final MetricsFactory metricsFactory) {
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifier().isPresent() ? final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
StreamIdentifier.fromString(shardInfo.streamIdentifier().get()) : StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
StreamIdentifier.fromStreamName(streamName); StreamIdentifier.singleStreamInstance(streamName);
return new SynchronousGetRecordsRetrievalStrategy( return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout)); new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout));
} }

View file

@ -68,9 +68,9 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory
@Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) { @NonNull final MetricsFactory metricsFactory) {
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifier().isPresent() ? final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
StreamIdentifier.fromString(shardInfo.streamIdentifier().get()) : StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
StreamIdentifier.fromStreamName(streamName); StreamIdentifier.singleStreamInstance(streamName);
return new SynchronousGetRecordsRetrievalStrategy( return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(),
maxRecords, metricsFactory, maxFutureWait)); maxRecords, metricsFactory, maxFutureWait));

View file

@ -22,8 +22,10 @@ import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -32,14 +34,20 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.atMost;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.plugins.RxJavaPlugins;
import lombok.RequiredArgsConstructor;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -52,8 +60,11 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.CheckpointFactory; import software.amazon.kinesis.checkpoint.CheckpointFactory;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.KinesisClientLibException;
import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException; import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -63,6 +74,8 @@ import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer; import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput;
@ -73,6 +86,7 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor;
@ -124,6 +138,11 @@ public class SchedulerTest {
private Checkpointer checkpoint; private Checkpointer checkpoint;
@Mock @Mock
private WorkerStateChangeListener workerStateChangeListener; private WorkerStateChangeListener workerStateChangeListener;
@Mock
private MultiStreamTracker multiStreamTracker;
private Map<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerMap = new HashMap<>();
private Map<StreamIdentifier, ShardDetector> shardDetectorMap = new HashMap<>();
@Before @Before
public void setup() { public void setup() {
@ -132,13 +151,25 @@ public class SchedulerTest {
checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory()); checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory());
coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L).workerStateChangeListener(workerStateChangeListener); coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L).workerStateChangeListener(workerStateChangeListener);
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName, leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory()); workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, false));
lifecycleConfig = new LifecycleConfig(); lifecycleConfig = new LifecycleConfig();
metricsConfig = new MetricsConfig(cloudWatchClient, namespace); metricsConfig = new MetricsConfig(cloudWatchClient, namespace);
processorConfig = new ProcessorConfig(shardRecordProcessorFactory); processorConfig = new ProcessorConfig(shardRecordProcessorFactory);
retrievalConfig = new RetrievalConfig(kinesisClient, streamName, applicationName) retrievalConfig = new RetrievalConfig(kinesisClient, streamName, applicationName)
.retrievalFactory(retrievalFactory); .retrievalFactory(retrievalFactory);
final List<StreamConfig> streamConfigList = new ArrayList<StreamConfig>() {{
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream2:2"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream2:3"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
}};
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList);
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(MetricsFactory.class))).thenReturn(recordsPublisher);
@ -245,7 +276,10 @@ public class SchedulerTest {
public final void testInitializationFailureWithRetries() throws Exception { public final void testInitializationFailureWithRetries() throws Exception {
doNothing().when(leaseCoordinator).initialize(); doNothing().when(leaseCoordinator).initialize();
when(shardDetector.listShards()).thenThrow(new RuntimeException()); when(shardDetector.listShards()).thenThrow(new RuntimeException());
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, true));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
scheduler.run(); scheduler.run();
verify(shardDetector, times(coordinatorConfig.maxInitializationAttempts())).listShards(); verify(shardDetector, times(coordinatorConfig.maxInitializationAttempts())).listShards();
@ -255,6 +289,8 @@ public class SchedulerTest {
public final void testInitializationFailureWithRetriesWithConfiguredMaxInitializationAttempts() throws Exception { public final void testInitializationFailureWithRetriesWithConfiguredMaxInitializationAttempts() throws Exception {
final int maxInitializationAttempts = 5; final int maxInitializationAttempts = 5;
coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts); coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts);
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, true));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig); metricsConfig, processorConfig, retrievalConfig);
@ -267,6 +303,76 @@ public class SchedulerTest {
verify(shardDetector, times(maxInitializationAttempts)).listShards(); verify(shardDetector, times(maxInitializationAttempts)).listShards();
} }
@Test
public final void testMultiStreamInitialization() throws ProvisionedThroughputException, DependencyException {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
scheduler.initialize();
shardDetectorMap.values().stream()
.forEach(shardDetector -> verify(shardDetector, times(1)).listShards());
}
@Test
public final void testMultiStreamInitializationWithFailures() {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient,
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(true, false));
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
scheduler.initialize();
// Note : As of today we retry for all streams in the next attempt. Hence the retry for each stream will vary.
// At the least we expect 2 retries for each stream. Since there are 4 streams, we expect at most
// the number of calls to be 5.
shardDetectorMap.values().stream()
.forEach(shardDetector -> verify(shardDetector, atLeast(2)).listShards());
shardDetectorMap.values().stream()
.forEach(shardDetector -> verify(shardDetector, atMost(5)).listShards());
}
@Test
public final void testMultiStreamConsumersAreBuiltOncePerAccountStreamShard() throws KinesisClientLibException {
final String shardId = "shardId-000000000000";
final String concurrencyToken = "concurrencyToken";
final ExtendedSequenceNumber firstSequenceNumber = ExtendedSequenceNumber.TRIM_HORIZON;
final ExtendedSequenceNumber secondSequenceNumber = new ExtendedSequenceNumber("1000");
final ExtendedSequenceNumber finalSequenceNumber = new ExtendedSequenceNumber("2000");
final List<ShardInfo> initialShardInfo = multiStreamTracker.streamConfigList().stream()
.map(sc -> new ShardInfo(shardId, concurrencyToken, null, firstSequenceNumber,
sc.streamIdentifier().serialize())).collect(Collectors.toList());
final List<ShardInfo> firstShardInfo = multiStreamTracker.streamConfigList().stream()
.map(sc -> new ShardInfo(shardId, concurrencyToken, null, secondSequenceNumber,
sc.streamIdentifier().serialize())).collect(Collectors.toList());
final List<ShardInfo> secondShardInfo = multiStreamTracker.streamConfigList().stream()
.map(sc -> new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber,
sc.streamIdentifier().serialize())).collect(Collectors.toList());
final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null);
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo);
when(checkpoint.getCheckpointObject(anyString())).thenReturn(firstCheckpoint);
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
Scheduler schedulerSpy = spy(scheduler);
schedulerSpy.runProcessLoop();
schedulerSpy.runProcessLoop();
schedulerSpy.runProcessLoop();
initialShardInfo.stream().forEach(
shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory)));
firstShardInfo.stream().forEach(
shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory)));
secondShardInfo.stream().forEach(
shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory)));
}
@Test @Test
public final void testSchedulerShutdown() { public final void testSchedulerShutdown() {
scheduler.shutdown(); scheduler.shutdown();
@ -508,7 +614,12 @@ public class SchedulerTest {
} }
@RequiredArgsConstructor
private class TestKinesisLeaseManagementFactory implements LeaseManagementFactory { private class TestKinesisLeaseManagementFactory implements LeaseManagementFactory {
private final boolean shardSyncFirstAttemptFailure;
private final boolean shouldReturnDefaultShardSyncTaskmanager;
@Override @Override
public LeaseCoordinator createLeaseCoordinator(MetricsFactory metricsFactory) { public LeaseCoordinator createLeaseCoordinator(MetricsFactory metricsFactory) {
return leaseCoordinator; return leaseCoordinator;
@ -522,6 +633,19 @@ public class SchedulerTest {
@Override @Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory,
StreamConfig streamConfig) { StreamConfig streamConfig) {
if(shouldReturnDefaultShardSyncTaskmanager) {
return shardSyncTaskManager;
}
final ShardSyncTaskManager shardSyncTaskManager = mock(ShardSyncTaskManager.class);
final ShardDetector shardDetector = mock(ShardDetector.class);
shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager);
shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
if(shardSyncFirstAttemptFailure) {
when(shardDetector.listShards())
.thenThrow(new RuntimeException("Service Exception"))
.thenReturn(Collections.EMPTY_LIST);
}
return shardSyncTaskManager; return shardSyncTaskManager;
} }
@ -537,7 +661,7 @@ public class SchedulerTest {
@Override @Override
public ShardDetector createShardDetector(StreamConfig streamConfig) { public ShardDetector createShardDetector(StreamConfig streamConfig) {
return shardDetector; return shardDetectorMap.get(streamConfig.streamIdentifier());
} }
} }

View file

@ -55,6 +55,7 @@ import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
@ -74,6 +75,10 @@ public class HierarchicalShardSyncerTest {
private static final int EXPONENT = 128; private static final int EXPONENT = 128;
private static final String LEASE_OWNER = "TestOwnere"; private static final String LEASE_OWNER = "TestOwnere";
private static final MetricsScope SCOPE = new NullMetricsScope(); private static final MetricsScope SCOPE = new NullMetricsScope();
private static final boolean MULTISTREAM_MODE_ON = true;
private static final String STREAM_IDENTIFIER = "acc:stream:1";
private static final HierarchicalShardSyncer.MultiStreamArgs MULTI_STREAM_ARGS = new HierarchicalShardSyncer.MultiStreamArgs(
MULTISTREAM_MODE_ON, StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER));
private final boolean cleanupLeasesOfCompletedShards = true; private final boolean cleanupLeasesOfCompletedShards = true;
private final boolean ignoreUnexpectedChildShards = false; private final boolean ignoreUnexpectedChildShards = false;
@ -95,6 +100,11 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer = new HierarchicalShardSyncer(); hierarchicalShardSyncer = new HierarchicalShardSyncer();
} }
private void setupMultiStream() {
hierarchicalShardSyncer = new HierarchicalShardSyncer(true);
when(shardDetector.streamIdentifier()).thenReturn(StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER));
}
/** /**
* Test determineNewLeasesToCreate() where there are no shards * Test determineNewLeasesToCreate() where there are no shards
*/ */
@ -107,6 +117,18 @@ public class HierarchicalShardSyncerTest {
equalTo(true)); equalTo(true));
} }
/**
* Test determineNewLeasesToCreate() where there are no shards for MultiStream
*/
@Test public void testDetermineNewLeasesToCreateNoShardsForMultiStream() {
final List<Shard> shards = Collections.emptyList();
final List<Lease> leases = Collections.emptyList();
assertThat(HierarchicalShardSyncer
.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS)
.isEmpty(), equalTo(true));
}
/** /**
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed
*/ */
@ -129,6 +151,29 @@ public class HierarchicalShardSyncerTest {
assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
} }
/**
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed
*/
@Test
public void testDetermineNewLeasesToCreate0Leases0ReshardsForMultiStream() {
final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1";
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1)));
assertThat(newLeases.size(), equalTo(expectedLeaseIds.size()));
assertThat(newLeaseKeys, equalTo(expectedLeaseIds));
}
/** /**
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but * Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but
* one of the shards was marked as inconsistent. * one of the shards was marked as inconsistent.
@ -155,6 +200,33 @@ public class HierarchicalShardSyncerTest {
assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
} }
/**
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but
* one of the shards was marked as inconsistent.
*/
@Test
public void testDetermineNewLeasesToCreate0Leases0Reshards1InconsistentMultiStream() {
final String shardId0 = "shardId-0";
final String shardId1 = "shardId-1";
final String shardId2 = "shardId-2";
final SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId1, null, null, sequenceRange),
ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
final List<Lease> currentLeases = Collections.emptyList();
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1)));
assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size()));
assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds));
}
/** /**
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream) * Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
*/ */
@ -208,6 +280,45 @@ public class HierarchicalShardSyncerTest {
} }
@Test
public void testCheckAndCreateLeasesForShardsIfMissingAtLatestMultiStream() throws Exception {
final List<Shard> shards = constructShardListForGraphA();
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
setupMultiStream();
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE);
final Set<String> expectedShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
final List<Lease> requestLeases = leaseCaptor.getAllValues();
final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet());
assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(requestLeaseKeys, equalTo(expectedShardIds));
assertThat(extendedSequenceNumbers.size(), equalTo(1));
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
private List<String> toMultiStreamLeaseList(List<String> shardIdBasedLeases) {
return shardIdBasedLeases.stream().map(s -> STREAM_IDENTIFIER + ":" + s)
.collect(Collectors.toList());
}
/** /**
* Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards() * Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards()
* should never be called. * should never be called.
@ -244,6 +355,42 @@ public class HierarchicalShardSyncerTest {
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
} }
/**
* Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards()
* should never be called.
*/
@Test
public void testCheckAndCreateLeasesForShardsWithShardListMultiStream() throws Exception {
final List<Shard> latestShards = constructShardListForGraphA();
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(latestShards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
setupMultiStream();
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE, latestShards);
final Set<String> expectedShardIds = new HashSet<>(
toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")));
final List<Lease> requestLeases = leaseCaptor.getAllValues();
final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet());
assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(requestLeaseKeys, equalTo(expectedShardIds));
assertThat(extendedSequenceNumbers.size(), equalTo(1));
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector, never()).listShards();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
/** /**
* Test checkAndCreateLeaseForNewShards with an empty list of shards. In this scenario, shardDetector.listShards() * Test checkAndCreateLeaseForNewShards with an empty list of shards. In this scenario, shardDetector.listShards()
* should never be called. * should never be called.
@ -306,6 +453,26 @@ public class HierarchicalShardSyncerTest {
} }
} }
@Test(expected = KinesisClientLibIOException.class)
public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenForMultiStream() throws Exception {
final List<Shard> shards = new ArrayList<>(constructShardListForGraphA());
final SequenceNumberRange range = shards.get(0).sequenceNumberRange().toBuilder().endingSequenceNumber(null)
.build();
final Shard shard = shards.get(3).toBuilder().sequenceNumberRange(range).build();
shards.remove(3);
shards.add(3, shard);
when(shardDetector.listShards()).thenReturn(shards);
setupMultiStream();
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE);
} finally {
verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, never()).listLeases();
}
}
/** /**
* Test checkAndCreateLeasesForNewShards() when a parent is open and children of open parents are being ignored. * Test checkAndCreateLeasesForNewShards() when a parent is open and children of open parents are being ignored.
*/ */
@ -354,6 +521,51 @@ public class HierarchicalShardSyncerTest {
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
} }
@Test
public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildrenMultiStream() throws Exception {
final List<Shard> shards = new ArrayList<>(constructShardListForGraphA());
final Shard shard = shards.get(5);
assertThat(shard.shardId(), equalTo("shardId-5"));
shards.remove(5);
// shardId-5 in graph A has two children (shardId-9 and shardId-10). if shardId-5
// is not closed, those children should be ignored when syncing shards, no leases
// should be obtained for them, and we should obtain a lease on the still-open
// parent.
shards.add(5,
shard.toBuilder()
.sequenceNumberRange(shard.sequenceNumberRange().toBuilder().endingSequenceNumber(null).build())
.build());
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
setupMultiStream();
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, true, SCOPE);
final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> leaseSequenceNumbers = leases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet());
final Set<String> expectedShardIds = new HashSet<>(toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-5", "shardId-8")));
assertThat(leaseKeys.size(), equalTo(expectedShardIds.size()));
assertThat(leaseKeys, equalTo(expectedShardIds));
assertThat(leaseSequenceNumbers.size(), equalTo(1));
leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@Test @Test
public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShard() throws Exception { public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShard() throws Exception {
testCheckAndCreateLeasesForNewShardsAndClosedShard(ExtendedSequenceNumber.TRIM_HORIZON, testCheckAndCreateLeasesForNewShardsAndClosedShard(ExtendedSequenceNumber.TRIM_HORIZON,
@ -711,6 +923,11 @@ public class HierarchicalShardSyncerTest {
return createLeasesFromShards(Collections.singletonList(shard), checkpoint, leaseOwner).get(0); return createLeasesFromShards(Collections.singletonList(shard), checkpoint, leaseOwner).get(0);
} }
private MultiStreamLease createMultiStreamLeaseFromShard(final Shard shard, final ExtendedSequenceNumber checkpoint,
final String leaseOwner) {
return createMultiStreamLeasesFromShards(Collections.singletonList(shard), checkpoint, leaseOwner).get(0);
}
private List<Lease> createLeasesFromShards(final List<Shard> shards, final ExtendedSequenceNumber checkpoint, private List<Lease> createLeasesFromShards(final List<Shard> shards, final ExtendedSequenceNumber checkpoint,
final String leaseOwner) { final String leaseOwner) {
return shards.stream().map(shard -> { return shards.stream().map(shard -> {
@ -726,6 +943,29 @@ public class HierarchicalShardSyncerTest {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }
private List<MultiStreamLease> createMultiStreamLeasesFromShards(final List<Shard> shards, final ExtendedSequenceNumber checkpoint,
final String leaseOwner) {
return shards.stream().map(shard -> {
final Set<String> parentShardIds = new HashSet<>();
if (StringUtils.isNotEmpty(shard.parentShardId())) {
parentShardIds.add(shard.parentShardId());
}
if (StringUtils.isNotEmpty(shard.adjacentParentShardId())) {
parentShardIds.add(shard.adjacentParentShardId());
}
final MultiStreamLease msLease = new MultiStreamLease();
msLease.shardId(shard.shardId());
msLease.leaseOwner(leaseOwner);
msLease.leaseCounter(0L);
msLease.concurrencyToken(UUID.randomUUID());
msLease.lastCounterIncrementNanos(0L);
msLease.checkpoint(checkpoint);
msLease.parentShardIds(parentShardIds);
msLease.streamIdentifier(STREAM_IDENTIFIER);
return msLease;
}).collect(Collectors.toList());
}
@Test @Test
public void testCleanUpGarbageLeaseForNonExistentShard() throws Exception { public void testCleanUpGarbageLeaseForNonExistentShard() throws Exception {
final List<Shard> shards = constructShardListForGraphA(); final List<Shard> shards = constructShardListForGraphA();
@ -755,6 +995,35 @@ public class HierarchicalShardSyncerTest {
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
} }
@Test
public void testCleanUpGarbageLeaseForNonExistentShardForMultiStream() throws Exception {
final List<Shard> shards = constructShardListForGraphA();
final String garbageShardId = "shardId-garbage-001";
final Shard garbageShard = ShardObjectHelper.newShard(garbageShardId, null, null,
ShardObjectHelper.newSequenceNumberRange("101", null));
final Lease garbageLease = createMultiStreamLeaseFromShard(garbageShard, new ExtendedSequenceNumber("99"), LEASE_OWNER);
final List<Lease> leases = new ArrayList<>(
createMultiStreamLeasesFromShards(shards, ExtendedSequenceNumber.TRIM_HORIZON, LEASE_OWNER));
leases.add(garbageLease);
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases);
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
setupMultiStream();
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
verify(shardDetector, times(2)).listShards();
verify(dynamoDBLeaseRefresher).listLeases();
verify(dynamoDBLeaseRefresher).deleteLease(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
}
private void testCheckAndCreateLeasesForShardsIfMissing(InitialPositionInStreamExtended initialPosition) private void testCheckAndCreateLeasesForShardsIfMissing(InitialPositionInStreamExtended initialPosition)
throws Exception { throws Exception {
final String shardId0 = "shardId-0"; final String shardId0 = "shardId-0";

View file

@ -57,8 +57,6 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import javax.swing.*;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class ConsumerStatesTest { public class ConsumerStatesTest {
private static final String STREAM_NAME = "TestStream"; private static final String STREAM_NAME = "TestStream";
@ -115,7 +113,7 @@ public class ConsumerStatesTest {
@Before @Before
public void setup() { public void setup() {
argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.fromStreamName(STREAM_NAME), leaseCoordinator, executorService, recordsPublisher, argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.singleStreamInstance(STREAM_NAME), leaseCoordinator, executorService, recordsPublisher,
shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis, shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis,
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,

View file

@ -31,7 +31,6 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
@ -67,7 +66,8 @@ public class FanOutConfigTest {
.streamName(TEST_STREAM_NAME); .streamName(TEST_STREAM_NAME);
RetrievalFactory retrievalFactory = config.retrievalFactory(); RetrievalFactory retrievalFactory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class); ShardInfo shardInfo = mock(ShardInfo.class);
doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier(); // doReturn(Optional.of(StreamIdentifier.singleStreamInstance(TEST_STREAM_NAME).serialize())).when(shardInfo).streamIdentifier();
doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt();
retrievalFactory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class)); retrievalFactory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class));
assertThat(retrievalFactory, not(nullValue())); assertThat(retrievalFactory, not(nullValue()));
verify(consumerRegistration).getOrCreateStreamConsumerArn(); verify(consumerRegistration).getOrCreateStreamConsumerArn();
@ -93,7 +93,7 @@ public class FanOutConfigTest {
.streamName(TEST_STREAM_NAME); .streamName(TEST_STREAM_NAME);
RetrievalFactory factory = config.retrievalFactory(); RetrievalFactory factory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class); ShardInfo shardInfo = mock(ShardInfo.class);
doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier(); doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt();
factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class)); factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class));
assertThat(factory, not(nullValue())); assertThat(factory, not(nullValue()));
@ -108,7 +108,7 @@ public class FanOutConfigTest {
.streamName(TEST_STREAM_NAME); .streamName(TEST_STREAM_NAME);
RetrievalFactory factory = config.retrievalFactory(); RetrievalFactory factory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class); ShardInfo shardInfo = mock(ShardInfo.class);
doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier(); doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt();
factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class)); factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class));
assertThat(factory, not(nullValue())); assertThat(factory, not(nullValue()));
TestingConfig testingConfig = (TestingConfig) config; TestingConfig testingConfig = (TestingConfig) config;
@ -122,7 +122,7 @@ public class FanOutConfigTest {
.consumerName(TEST_CONSUMER_NAME).streamName(TEST_STREAM_NAME); .consumerName(TEST_CONSUMER_NAME).streamName(TEST_STREAM_NAME);
RetrievalFactory factory = config.retrievalFactory(); RetrievalFactory factory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class); ShardInfo shardInfo = mock(ShardInfo.class);
doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier(); doReturn(Optional.empty()).when(shardInfo).streamIdentifierSerOpt();
factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class)); factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class));
assertThat(factory, not(nullValue())); assertThat(factory, not(nullValue()));