Adding account and stream epoch support. Checkpoint 2

This commit is contained in:
Ashwin Giridharan 2020-03-04 02:52:20 -08:00
parent 2b507342d8
commit 8e8f6ed352
14 changed files with 94 additions and 46 deletions

View file

@ -3,11 +3,13 @@ package software.amazon.kinesis.common;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
import software.amazon.awssdk.utils.Validate;
@RequiredArgsConstructor
@EqualsAndHashCode
@Getter
@Accessors(fluent = true)
public class StreamIdentifier {
private final String accountName;
private final String streamName;

View file

@ -41,6 +41,7 @@ import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
@ -120,9 +121,9 @@ public class HierarchicalShardSyncer {
assertAllParentShardsAreClosed(inconsistentShardIds);
}
final List<Lease> currentLeases = isMultiStreamMode ?
getLeasesForStream(shardDetector.streamName(), leaseRefresher) :
getLeasesForStream(shardDetector.streamIdentifier(), leaseRefresher) :
leaseRefresher.listLeases();
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamName());
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier());
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition,
inconsistentShardIds, multiStreamArgs);
log.debug("Num new leases to create: {}", newLeasesToCreate.size());
@ -149,19 +150,19 @@ public class HierarchicalShardSyncer {
/** Note: This method has package level access solely for testing purposes.
*
* @param streamName We'll use this stream name to filter leases
* @param streamIdentifier We'll use this stream identifier to filter leases
* @param leaseRefresher Used to fetch leases
* @return Return list of leases (corresponding to shards) of the specified stream.
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
*/
static List<Lease> getLeasesForStream(String streamName,
static List<Lease> getLeasesForStream(StreamIdentifier streamIdentifier,
LeaseRefresher leaseRefresher)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<Lease> streamLeases = new ArrayList<>();
for (Lease lease : leaseRefresher.listLeases()) {
if (streamName.equals(((MultiStreamLease)lease).streamIdentifier())) {
if (streamIdentifier.toString().equals(((MultiStreamLease)lease).streamIdentifier())) {
streamLeases.add(lease);
}
}
@ -378,7 +379,7 @@ public class HierarchicalShardSyncer {
} else {
log.debug("Need to create a lease for shardId {}", shardId);
final Lease newLease = multiStreamArgs.isMultiStreamMode() ?
newKCLMultiStreamLease(shard, multiStreamArgs.streamName()) :
newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) :
newKCLLease(shard);
final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition,
shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap,
@ -502,7 +503,7 @@ public class HierarchicalShardSyncer {
if (lease == null) {
lease = multiStreamArgs.isMultiStreamMode() ?
newKCLMultiStreamLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId),
multiStreamArgs.streamName()) :
multiStreamArgs.streamIdentifier()) :
newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId));
shardIdToLeaseMapOfNewShards.put(parentShardId, lease);
}
@ -758,9 +759,9 @@ public class HierarchicalShardSyncer {
return newLease;
}
private static Lease newKCLMultiStreamLease(final Shard shard, final String streamName) {
private static Lease newKCLMultiStreamLease(final Shard shard, final StreamIdentifier streamIdentifier) {
MultiStreamLease newLease = new MultiStreamLease();
newLease.leaseKey(MultiStreamLease.getLeaseKey(streamName, shard.shardId()));
newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.toString(), shard.shardId()));
List<String> parentShardIds = new ArrayList<>(2);
if (shard.parentShardId() != null) {
parentShardIds.add(shard.parentShardId());
@ -770,7 +771,7 @@ public class HierarchicalShardSyncer {
}
newLease.parentShardIds(parentShardIds);
newLease.ownerSwitchesSinceCheckpoint(0L);
newLease.streamIdentifier(streamName);
newLease.streamIdentifier(streamIdentifier.toString());
newLease.shardId(shard.shardId());
return newLease;
}
@ -858,7 +859,7 @@ public class HierarchicalShardSyncer {
@Accessors(fluent = true)
private static class MultiStreamArgs {
private final Boolean isMultiStreamMode;
private final String streamName;
private final StreamIdentifier streamIdentifier;
}
}

View file

@ -47,6 +47,7 @@ import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
/**
@ -60,7 +61,7 @@ public class KinesisShardDetector implements ShardDetector {
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull @Getter
private final String streamName;
private final StreamIdentifier streamIdentifier;
private final long listShardsBackoffTimeInMillis;
private final int maxListShardsRetryAttempts;
private final long listShardsCacheAllowedAgeInSeconds;
@ -77,16 +78,16 @@ public class KinesisShardDetector implements ShardDetector {
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
int cacheMissWarningModulus) {
this(kinesisClient, streamName, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
this(kinesisClient, StreamIdentifier.fromStreamName(streamName), listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus,
LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
}
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
public KinesisShardDetector(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
int cacheMissWarningModulus, Duration kinesisRequestTimeout) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.streamIdentifier = streamIdentifier;
this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds;
@ -180,7 +181,7 @@ public class KinesisShardDetector implements ShardDetector {
ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder();
if (StringUtils.isEmpty(nextToken)) {
request = request.streamName(streamName);
request = request.streamName(streamIdentifier.streamName());
} else {
request = request.nextToken(nextToken);
}
@ -205,12 +206,12 @@ public class KinesisShardDetector implements ShardDetector {
+ " Active or Updating)");
return null;
} catch (LimitExceededException e) {
log.info("Got LimitExceededException when listing shards {}. Backing off for {} millis.", streamName,
log.info("Got LimitExceededException when listing shards {}. Backing off for {} millis.", streamIdentifier,
listShardsBackoffTimeInMillis);
try {
Thread.sleep(listShardsBackoffTimeInMillis);
} catch (InterruptedException ie) {
log.debug("Stream {} : Sleep was interrupted ", streamName, ie);
log.debug("Stream {} : Sleep was interrupted ", streamIdentifier, ie);
}
lastException = e;
} catch (TimeoutException te) {

View file

@ -35,10 +35,10 @@ public class MultiStreamLease extends Lease {
shardId(casted.shardId);
}
public static String getLeaseKey(String streamName, String shardId) {
verifyNotNull(streamName, "streamName should not be null");
public static String getLeaseKey(String streamIdentifier, String shardId) {
verifyNotNull(streamIdentifier, "streamIdentifier should not be null");
verifyNotNull(shardId, "shardId should not be null");
return streamName + ":" + shardId;
return streamIdentifier + ":" + shardId;
}
@Override

View file

@ -16,6 +16,7 @@
package software.amazon.kinesis.leases;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.common.StreamIdentifier;
import java.util.List;
@ -27,7 +28,7 @@ public interface ShardDetector {
List<Shard> listShards();
default String streamName() {
default StreamIdentifier streamIdentifier() {
throw new UnsupportedOperationException("StreamName not available");
}

View file

@ -31,6 +31,7 @@ import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector;
import software.amazon.kinesis.leases.LeaseCoordinator;
@ -330,7 +331,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode) {
this(kinesisClient, new StreamConfig(streamName, initialPositionInStream), dynamoDBClient, tableName,
this(kinesisClient, new StreamConfig(StreamIdentifier.fromStreamName(streamName), initialPositionInStream), dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
@ -477,14 +478,20 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@Override @Deprecated
public ShardDetector createShardDetector() {
return new KinesisShardDetector(kinesisClient, streamConfig.streamName(),
return new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(),
listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds,
maxCacheMissesBeforeReload, cacheMissWarningModulus, dynamoDbRequestTimeout);
}
/**
* KinesisShardDetector supports reading from service only using streamName. Support for accountId and
* stream creation epoch is yet to be provided.
* @param streamConfig
* @return
*/
@Override
public ShardDetector createShardDetector(StreamConfig streamConfig) {
return new KinesisShardDetector(kinesisClient, streamConfig.streamName(), listShardsBackoffTimeMillis,
return new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload,
cacheMissWarningModulus, dynamoDbRequestTimeout);
}

View file

@ -14,14 +14,15 @@ import static software.amazon.kinesis.leases.MultiStreamLease.validateAndCast;
@NoArgsConstructor
public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer {
private static final String STREAM_NAME_KEY = "streamName";
// Keeping the stream id as "streamName" for legacy reasons.
private static final String STREAM_ID_KEY = "streamName";
private static final String SHARD_ID_KEY = "shardId";
@Override
public Map<String, AttributeValue> toDynamoRecord(Lease lease) {
final MultiStreamLease multiStreamLease = validateAndCast(lease);
final Map<String, AttributeValue> result = super.toDynamoRecord(multiStreamLease);
result.put(STREAM_NAME_KEY, DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier()));
result.put(STREAM_ID_KEY, DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier()));
result.put(SHARD_ID_KEY, DynamoUtils.createAttributeValue(multiStreamLease.shardId()));
return result;
}
@ -30,7 +31,7 @@ public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer
public MultiStreamLease fromDynamoRecord(Map<String, AttributeValue> dynamoRecord) {
final MultiStreamLease multiStreamLease = (MultiStreamLease) super
.fromDynamoRecord(dynamoRecord, new MultiStreamLease());
multiStreamLease.streamIdentifier(DynamoUtils.safeGetString(dynamoRecord, STREAM_NAME_KEY));
multiStreamLease.streamIdentifier(DynamoUtils.safeGetString(dynamoRecord, STREAM_ID_KEY));
multiStreamLease.shardId(DynamoUtils.safeGetString(dynamoRecord, SHARD_ID_KEY));
return multiStreamLease;
}
@ -40,7 +41,7 @@ public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer
public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(Lease lease) {
final MultiStreamLease multiStreamLease = validateAndCast(lease);
final Map<String, AttributeValueUpdate> result = super.getDynamoUpdateLeaseUpdate(multiStreamLease);
result.put(STREAM_NAME_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier())));
result.put(STREAM_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier())));
result.put(SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.shardId())));
return result;
}

View file

@ -19,6 +19,7 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@ -27,6 +28,7 @@ import software.amazon.kinesis.retrieval.RetrievalFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
@RequiredArgsConstructor
@ -36,7 +38,7 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
private final KinesisAsyncClient kinesisClient;
private final String defaultStreamName;
private final Function<String, String> consumerArnProvider;
private Map<String,String> streamToconsumerArnMap = new HashMap<>();
private Map<String,String> streamToConsumerArnMap = new HashMap<>();
@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo,
@ -47,8 +49,14 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
@Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
final MetricsFactory metricsFactory) {
final String streamName = shardInfo.streamIdentifier().orElse(defaultStreamName);
final Optional<String> streamIdentifierStr = shardInfo.streamIdentifier();
final String streamName;
if(streamIdentifierStr.isPresent()) {
streamName = StreamIdentifier.fromString(streamIdentifierStr.get()).streamName();
} else {
streamName = defaultStreamName;
}
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
streamToconsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply));
streamToConsumerArnMap.computeIfAbsent(streamName, consumerArnProvider::apply));
}
}

View file

@ -40,6 +40,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
@ -63,7 +64,7 @@ public class KinesisDataFetcher {
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull
private final String streamName;
private final StreamIdentifier streamIdentifier;
@NonNull
private final String shardId;
private final int maxRecords;
@ -73,12 +74,13 @@ public class KinesisDataFetcher {
@Deprecated
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory) {
this(kinesisClient, streamName, shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
this(kinesisClient, StreamIdentifier.fromStreamName(streamName), shardId, maxRecords, metricsFactory, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
}
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) {
// Changing the constructor directly as this is an internal API
public KinesisDataFetcher(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, String shardId, int maxRecords, MetricsFactory metricsFactory, Duration maxFutureWait) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.streamIdentifier = streamIdentifier;
this.shardId = shardId;
this.maxRecords = maxRecords;
this.metricsFactory = metricsFactory;
@ -199,7 +201,7 @@ public class KinesisDataFetcher {
final AWSExceptionManager exceptionManager = createExceptionManager();
GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamName).shardId(shardId);
.streamName(streamIdentifier.streamName()).shardId(shardId);
GetShardIteratorRequest request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream)
.build();

View file

@ -19,6 +19,7 @@ import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@ -62,8 +63,11 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) {
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifier().isPresent() ?
StreamIdentifier.fromString(shardInfo.streamIdentifier().get()) :
StreamIdentifier.fromStreamName(streamName);
return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, shardInfo.streamIdentifier().orElse(streamName), shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout));
new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(), maxRecords, metricsFactory, kinesisRequestTimeout));
}
@Override

View file

@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
@ -67,8 +68,11 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory
@Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) {
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifier().isPresent() ?
StreamIdentifier.fromString(shardInfo.streamIdentifier().get()) :
StreamIdentifier.fromStreamName(streamName);
return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, shardInfo.streamIdentifier().orElse(streamName), shardInfo.shardId(),
new KinesisDataFetcher(kinesisClient, streamIdentifier, shardInfo.shardId(),
maxRecords, metricsFactory, maxFutureWait));
}

View file

@ -53,6 +53,7 @@ import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.CheckpointFactory;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -501,9 +502,10 @@ public class SchedulerTest {
}
@Override
public ShardRecordProcessor shardRecordProcessor(String streamName) {
public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) {
return shardRecordProcessor();
}
}
private class TestKinesisLeaseManagementFactory implements LeaseManagementFactory {

View file

@ -43,6 +43,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
@ -114,7 +115,7 @@ public class ConsumerStatesTest {
@Before
public void setup() {
argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseCoordinator, executorService, recordsPublisher,
argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.fromStreamName(STREAM_NAME), leaseCoordinator, executorService, recordsPublisher,
shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis,
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,

View file

@ -19,6 +19,8 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -29,9 +31,14 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import java.util.Optional;
@RunWith(MockitoJUnitRunner.class)
public class FanOutConfigTest {
@ -59,7 +66,9 @@ public class FanOutConfigTest {
FanOutConfig config = new TestingConfig(kinesisClient).applicationName(TEST_APPLICATION_NAME)
.streamName(TEST_STREAM_NAME);
RetrievalFactory retrievalFactory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class);
doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier();
retrievalFactory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class));
assertThat(retrievalFactory, not(nullValue()));
verify(consumerRegistration).getOrCreateStreamConsumerArn();
}
@ -83,7 +92,9 @@ public class FanOutConfigTest {
FanOutConfig config = new TestingConfig(kinesisClient).applicationName(TEST_APPLICATION_NAME)
.streamName(TEST_STREAM_NAME);
RetrievalFactory factory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class);
doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier();
factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class));
assertThat(factory, not(nullValue()));
TestingConfig testingConfig = (TestingConfig) config;
@ -96,9 +107,10 @@ public class FanOutConfigTest {
FanOutConfig config = new TestingConfig(kinesisClient).consumerName(TEST_CONSUMER_NAME)
.streamName(TEST_STREAM_NAME);
RetrievalFactory factory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class);
doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier();
factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class));
assertThat(factory, not(nullValue()));
TestingConfig testingConfig = (TestingConfig) config;
assertThat(testingConfig.stream, equalTo(TEST_STREAM_NAME));
assertThat(testingConfig.consumerToCreate, equalTo(TEST_CONSUMER_NAME));
@ -109,7 +121,9 @@ public class FanOutConfigTest {
FanOutConfig config = new TestingConfig(kinesisClient).applicationName(TEST_APPLICATION_NAME)
.consumerName(TEST_CONSUMER_NAME).streamName(TEST_STREAM_NAME);
RetrievalFactory factory = config.retrievalFactory();
ShardInfo shardInfo = mock(ShardInfo.class);
doReturn(Optional.of(StreamIdentifier.fromStreamName(TEST_STREAM_NAME).toString())).when(shardInfo).streamIdentifier();
factory.createGetRecordsCache(shardInfo, mock(MetricsFactory.class));
assertThat(factory, not(nullValue()));
TestingConfig testingConfig = (TestingConfig) config;