2.2.5 changes to master

This commit is contained in:
Chunxue Yang 2019-10-23 13:01:13 -07:00
commit 4a5a42f835
9 changed files with 164 additions and 54 deletions

View file

@ -588,7 +588,7 @@ public class Scheduler implements Runnable {
checkpoint); checkpoint);
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
streamName, streamName,
leaseRefresher, leaseCoordinator,
executorService, executorService,
cache, cache,
shardRecordProcessorFactory.shardRecordProcessor(), shardRecordProcessorFactory.shardRecordProcessor(),

View file

@ -78,22 +78,21 @@ public class HierarchicalShardSyncer {
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException, final MetricsScope scope) throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException { ProvisionedThroughputException, KinesisClientLibIOException {
final List<Shard> shards = getShardList(shardDetector); final List<Shard> latestShards = getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, scope); ignoreUnexpectedChildShards, scope, latestShards);
} }
//Provide a pre-collcted list of shards to avoid calling ListShards API //Provide a pre-collcted list of shards to avoid calling ListShards API
public synchronized void checkAndCreateLeaseForNewShards(List<Shard> shards, @NonNull final ShardDetector shardDetector, public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards,
final boolean ignoreUnexpectedChildShards, final MetricsScope scope)throws DependencyException, InvalidStateException, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List<Shard> latestShards)throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException { ProvisionedThroughputException, KinesisClientLibIOException {
if(CollectionUtils.isNullOrEmpty(shards)) { if (!CollectionUtils.isNullOrEmpty(latestShards)) {
shards = getShardList(shardDetector); log.debug("Num shards: {}", latestShards.size());
} }
log.debug("Num shards: {}", shards.size());
final Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards); final Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(latestShards);
final Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap( final Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(
shardIdToShardMap); shardIdToShardMap);
final Set<String> inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap); final Set<String> inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap);
@ -103,8 +102,7 @@ public class HierarchicalShardSyncer {
final List<Lease> currentLeases = leaseRefresher.listLeases(); final List<Lease> currentLeases = leaseRefresher.listLeases();
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, inconsistentShardIds);
inconsistentShardIds);
log.debug("Num new leases to create: {}", newLeasesToCreate.size()); log.debug("Num new leases to create: {}", newLeasesToCreate.size());
for (Lease lease : newLeasesToCreate) { for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
@ -116,13 +114,11 @@ public class HierarchicalShardSyncer {
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
} }
} }
final List<Lease> trackedLeases = new ArrayList<>(currentLeases); final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate); trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher); cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher);
if (cleanupLeasesOfCompletedShards) { if (cleanupLeasesOfCompletedShards) {
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, leaseRefresher);
leaseRefresher);
} }
} }

View file

@ -135,7 +135,7 @@ class ConsumerStates {
@Override @Override
public ConsumerTask createTask(ShardConsumerArgument consumerArgument, ShardConsumer consumer, ProcessRecordsInput input) { public ConsumerTask createTask(ShardConsumerArgument consumerArgument, ShardConsumer consumer, ProcessRecordsInput input) {
return new BlockOnParentShardTask(consumerArgument.shardInfo(), return new BlockOnParentShardTask(consumerArgument.shardInfo(),
consumerArgument.leaseRefresher(), consumerArgument.leaseCoordinator().leaseRefresher(),
consumerArgument.parentShardPollIntervalMillis()); consumerArgument.parentShardPollIntervalMillis());
} }
@ -492,7 +492,7 @@ class ConsumerStates {
argument.initialPositionInStream(), argument.initialPositionInStream(),
argument.cleanupLeasesOfCompletedShards(), argument.cleanupLeasesOfCompletedShards(),
argument.ignoreUnexpectedChildShards(), argument.ignoreUnexpectedChildShards(),
argument.leaseRefresher(), argument.leaseCoordinator(),
argument.taskBackoffTimeMillis(), argument.taskBackoffTimeMillis(),
argument.recordsPublisher(), argument.recordsPublisher(),
argument.hierarchicalShardSyncer(), argument.hierarchicalShardSyncer(),

View file

@ -21,6 +21,7 @@ import lombok.experimental.Accessors;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
@ -42,7 +43,7 @@ public class ShardConsumerArgument {
@NonNull @NonNull
private final String streamName; private final String streamName;
@NonNull @NonNull
private final LeaseRefresher leaseRefresher; private final LeaseCoordinator leaseCoordinator;
@NonNull @NonNull
private final ExecutorService executorService; private final ExecutorService executorService;
@NonNull @NonNull

View file

@ -20,10 +20,14 @@ import com.sun.org.apache.bcel.internal.generic.LUSHR;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
@ -39,6 +43,7 @@ import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
/** /**
@ -60,13 +65,13 @@ public class ShutdownTask implements ConsumerTask {
@NonNull @NonNull
private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer; private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer;
@NonNull @NonNull
private ShutdownReason reason; private final ShutdownReason reason;
@NonNull @NonNull
private final InitialPositionInStreamExtended initialPositionInStream; private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesOfCompletedShards; private final boolean cleanupLeasesOfCompletedShards;
private final boolean ignoreUnexpectedChildShards; private final boolean ignoreUnexpectedChildShards;
@NonNull @NonNull
private final LeaseRefresher leaseRefresher; private final LeaseCoordinator leaseCoordinator;
private final long backoffTimeMillis; private final long backoffTimeMillis;
@NonNull @NonNull
private final RecordsPublisher recordsPublisher; private final RecordsPublisher recordsPublisher;
@ -93,29 +98,38 @@ public class ShutdownTask implements ConsumerTask {
try { try {
try { try {
List<Shard> allShards = new ArrayList<>(); ShutdownReason localReason = reason;
if(reason == ShutdownReason.SHARD_END) { List<Shard> latestShards = null;
allShards = shardDetector.listShards(); /*
* Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END
* If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows
* active workers to contend for the lease of this shard.
*/
if (localReason == ShutdownReason.SHARD_END) {
latestShards = shardDetector.listShards();
if(!isRealShardEnd(allShards)) { //If latestShards is empty, should also shutdown the ShardConsumer without checkpoint with SHARD_END
reason = ShutdownReason.LEASE_LOST; if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) {
localReason = ShutdownReason.LEASE_LOST;
dropLease();
log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId());
} }
} }
// If we reached end of the shard, set sequence number to SHARD_END. // If we reached end of the shard, set sequence number to SHARD_END.
if (reason == ShutdownReason.SHARD_END) { if (localReason == ShutdownReason.SHARD_END) {
recordProcessorCheckpointer recordProcessorCheckpointer
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
} }
log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}", log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}",
shardInfo.shardId(), shardInfo.concurrencyToken(), reason); shardInfo.shardId(), shardInfo.concurrencyToken(), localReason);
final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(reason) final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(localReason)
.checkpointer(recordProcessorCheckpointer).build(); .checkpointer(recordProcessorCheckpointer).build();
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
try { try {
if (reason == ShutdownReason.SHARD_END) { if (localReason == ShutdownReason.SHARD_END) {
shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue(); ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue();
if (lastCheckpointValue == null if (lastCheckpointValue == null
@ -137,11 +151,11 @@ public class ShutdownTask implements ConsumerTask {
MetricsUtil.addLatency(scope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY); MetricsUtil.addLatency(scope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
} }
if (reason == ShutdownReason.SHARD_END) { if (localReason == ShutdownReason.SHARD_END) {
log.debug("Looking for child shards of shard {}", shardInfo.shardId()); log.debug("Looking for child shards of shard {}", shardInfo.shardId());
// create leases for the child shards // create leases for the child shards
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(allShards, shardDetector, leaseRefresher, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(),
initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
} }
@ -183,17 +197,26 @@ public class ShutdownTask implements ConsumerTask {
return reason; return reason;
} }
private boolean isRealShardEnd(List<Shard> shards) { private boolean isShardInContextParentOfAny(List<Shard> shards) {
boolean realShardEnd = false;
for(Shard shard : shards) { for(Shard shard : shards) {
if(shard.parentShardId() != null && shard.parentShardId().equals(shardInfo.shardId()) if (isChildShardOfShardInContext(shard)) {
|| shard.adjacentParentShardId() != null && shard.adjacentParentShardId().equals(shardInfo.shardId())) { return true;
realShardEnd = true;
break;
} }
} }
return realShardEnd; return false;
}
private boolean isChildShardOfShardInContext(Shard shard) {
return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId())
|| StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId()));
}
private void dropLease() {
Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId());
leaseCoordinator.dropLease(currentLease);
if(currentLease != null) {
log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey());
}
} }
} }

View file

@ -171,6 +171,9 @@ public class HierarchicalShardSyncerTest {
testCheckAndCreateLeasesForShardsIfMissing(INITIAL_POSITION_LATEST); testCheckAndCreateLeasesForShardsIfMissing(INITIAL_POSITION_LATEST);
} }
/**
* Test checkAndCreateLeaseForNewShards while not providing a pre-fetched list of shards
*/
@Test @Test
public void testCheckAndCreateLeasesForShardsIfMissingAtLatest() throws Exception { public void testCheckAndCreateLeasesForShardsIfMissingAtLatest() throws Exception {
final List<Shard> shards = constructShardListForGraphA(); final List<Shard> shards = constructShardListForGraphA();
@ -205,6 +208,74 @@ public class HierarchicalShardSyncerTest {
} }
/**
* Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards()
* should never be called.
*/
@Test
public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception {
final List<Shard> latestShards = constructShardListForGraphA();
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(latestShards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE, latestShards);
final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
final List<Lease> requestLeases = leaseCaptor.getAllValues();
final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet());
assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(requestLeaseKeys, equalTo(expectedShardIds));
assertThat(extendedSequenceNumbers.size(), equalTo(1));
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector, never()).listShards();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
/**
* Test checkAndCreateLeaseForNewShards with an empty list of shards. In this scenario, shardDetector.listShards()
* should never be called.
*/
@Test
public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Exception {
final List<Shard> shards = constructShardListForGraphA();
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList<Shard>());
final Set<String> expectedShardIds = new HashSet<>();
final List<Lease> requestLeases = leaseCaptor.getAllValues();
final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet());
assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(extendedSequenceNumbers.size(), equalTo(0));
verify(shardDetector, never()).listShards();
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@Test @Test
public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception { public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception {
final List<Shard> shards = constructShardListForGraphA(); final List<Shard> shards = constructShardListForGraphA();

View file

@ -43,6 +43,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
@ -55,6 +56,8 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import javax.swing.*;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class ConsumerStatesTest { public class ConsumerStatesTest {
private static final String STREAM_NAME = "TestStream"; private static final String STREAM_NAME = "TestStream";
@ -73,6 +76,8 @@ public class ConsumerStatesTest {
@Mock @Mock
private ShardInfo shardInfo; private ShardInfo shardInfo;
@Mock @Mock
private LeaseCoordinator leaseCoordinator;
@Mock
private LeaseRefresher leaseRefresher; private LeaseRefresher leaseRefresher;
@Mock @Mock
private Checkpointer checkpointer; private Checkpointer checkpointer;
@ -109,7 +114,7 @@ public class ConsumerStatesTest {
@Before @Before
public void setup() { public void setup() {
argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseRefresher, executorService, recordsPublisher, argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseCoordinator, executorService, recordsPublisher,
shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis, shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis,
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
@ -127,6 +132,7 @@ public class ConsumerStatesTest {
@Test @Test
public void blockOnParentStateTest() { public void blockOnParentStateTest() {
ConsumerState state = ShardConsumerState.WAITING_ON_PARENT_SHARDS.consumerState(); ConsumerState state = ShardConsumerState.WAITING_ON_PARENT_SHARDS.consumerState();
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
ConsumerTask task = state.createTask(argument, consumer, null); ConsumerTask task = state.createTask(argument, consumer, null);
@ -309,7 +315,7 @@ public class ConsumerStatesTest {
assertThat(task, shutdownTask(ShardRecordProcessorCheckpointer.class, "recordProcessorCheckpointer", assertThat(task, shutdownTask(ShardRecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer))); equalTo(recordProcessorCheckpointer)));
assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason))); assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason)));
assertThat(task, shutdownTask(LEASE_REFRESHER_CLASS, "leaseRefresher", equalTo(leaseRefresher))); assertThat(task, shutdownTask(LeaseCoordinator.class, "leaseCoordinator", equalTo(leaseCoordinator)));
assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream", assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream",
equalTo(initialPositionInStream))); equalTo(initialPositionInStream)));
assertThat(task, assertThat(task,

View file

@ -42,6 +42,8 @@ import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
@ -83,6 +85,8 @@ public class ShutdownTaskTest {
@Mock @Mock
private LeaseRefresher leaseRefresher; private LeaseRefresher leaseRefresher;
@Mock @Mock
private LeaseCoordinator leaseCoordinator;
@Mock
private ShardDetector shardDetector; private ShardDetector shardDetector;
@Mock @Mock
private HierarchicalShardSyncer hierarchicalShardSyncer; private HierarchicalShardSyncer hierarchicalShardSyncer;
@ -99,12 +103,13 @@ public class ShutdownTaskTest {
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY); hierarchicalShardSyncer, NULL_METRICS_FACTORY);
} }
/** /**
* Test method for {@link ShutdownTask#call()}. * Test method for {@link ShutdownTask#call()}.
* This test is for the scenario that customer doesn't implement checkpoint in their implementation
*/ */
@Test @Test
public final void testCallWhenApplicationDoesNotCheckpoint() { public final void testCallWhenApplicationDoesNotCheckpoint() {
@ -118,19 +123,21 @@ public class ShutdownTaskTest {
/** /**
* Test method for {@link ShutdownTask#call()}. * Test method for {@link ShutdownTask#call()}.
* This test is for the scenario that checkAndCreateLeaseForNewShards throws an exception.
*/ */
@Test @Test
public final void testCallWhenSyncingShardsThrows() throws Exception { public final void testCallWhenSyncingShardsThrows() throws Exception {
List<Shard> shards = constructShardListGraphA(); List<Shard> latestShards = constructShardListGraphA();
when(shardDetector.listShards()).thenReturn(shards); when(shardDetector.listShards()).thenReturn(latestShards);
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
doAnswer((invocation) -> { doAnswer((invocation) -> {
throw new KinesisClientLibIOException("KinesisClientLibIOException"); throw new KinesisClientLibIOException("KinesisClientLibIOException");
}).when(hierarchicalShardSyncer) }).when(hierarchicalShardSyncer)
.checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
NULL_METRICS_FACTORY.createMetrics()); NULL_METRICS_FACTORY.createMetrics(), latestShards);
final TaskResult result = task.call(); final TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());
@ -142,6 +149,7 @@ public class ShutdownTaskTest {
/** /**
* Test method for {@link ShutdownTask#call()}. * Test method for {@link ShutdownTask#call()}.
* This test is for the scenario that ShutdownTask is created for ShardConsumer reaching the Shard End.
*/ */
@Test @Test
public final void testCallWhenTrueShardEnd() { public final void testCallWhenTrueShardEnd() {
@ -149,7 +157,7 @@ public class ShutdownTaskTest {
ExtendedSequenceNumber.LATEST); ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY); hierarchicalShardSyncer, NULL_METRICS_FACTORY);
when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
@ -161,10 +169,12 @@ public class ShutdownTaskTest {
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(shardDetector, times(1)).listShards(); verify(shardDetector, times(1)).listShards();
verify(leaseCoordinator, never()).getAssignments();
} }
/** /**
* Test method for {@link ShutdownTask#call()}. * Test method for {@link ShutdownTask#call()}.
* This test is for the scenario that a ShutdownTask is created for detecting a false Shard End.
*/ */
@Test @Test
public final void testCallWhenFalseShardEnd() { public final void testCallWhenFalseShardEnd() {
@ -172,7 +182,7 @@ public class ShutdownTaskTest {
ExtendedSequenceNumber.LATEST); ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY); hierarchicalShardSyncer, NULL_METRICS_FACTORY);
when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
@ -183,10 +193,12 @@ public class ShutdownTaskTest {
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
verify(shardDetector, times(1)).listShards(); verify(shardDetector, times(1)).listShards();
verify(leaseCoordinator).getCurrentlyHeldLease(shardInfo.shardId());
} }
/** /**
* Test method for {@link ShutdownTask#call()}. * Test method for {@link ShutdownTask#call()}.
* This test is for the scenario that a ShutdownTask is created for the ShardConsumer losing the lease.
*/ */
@Test @Test
public final void testCallWhenLeaseLost() { public final void testCallWhenLeaseLost() {
@ -194,7 +206,7 @@ public class ShutdownTaskTest {
ExtendedSequenceNumber.LATEST); ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY); hierarchicalShardSyncer, NULL_METRICS_FACTORY);
when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
@ -205,6 +217,7 @@ public class ShutdownTaskTest {
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
verify(shardDetector, never()).listShards(); verify(shardDetector, never()).listShards();
verify(leaseCoordinator, never()).getAssignments();
} }
/** /**

View file

@ -63,11 +63,11 @@
<distributionManagement> <distributionManagement>
<snapshotRepository> <snapshotRepository>
<id>ossrh</id> <id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url> <url>https://aws.oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository> </snapshotRepository>
<repository> <repository>
<id>ossrh</id> <id>ossrh</id>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url> <url>https://aws.oss.sonatype.org/service/local/staging/deploy/maven2/</url>
</repository> </repository>
</distributionManagement> </distributionManagement>
@ -114,7 +114,7 @@
<extensions>true</extensions> <extensions>true</extensions>
<configuration> <configuration>
<serverId>sonatype-nexus-staging</serverId> <serverId>sonatype-nexus-staging</serverId>
<nexusUrl>https://oss.sonatype.org</nexusUrl> <nexusUrl>https://aws.oss.sonatype.org</nexusUrl>
<autoReleaseAfterClose>false</autoReleaseAfterClose> <autoReleaseAfterClose>false</autoReleaseAfterClose>
</configuration> </configuration>
</plugin> </plugin>