Checkstyle: tightened LineLength restriction from 170 to 150. (#1158)
This commit is contained in:
parent
42eb753d62
commit
8d1ee6b5e1
22 changed files with 157 additions and 167 deletions
|
|
@ -32,7 +32,8 @@ public class V2CredentialWrapper implements AwsCredentialsProvider {
|
|||
public AwsCredentials resolveCredentials() {
|
||||
AWSCredentials current = oldCredentialsProvider.getCredentials();
|
||||
if (current instanceof AWSSessionCredentials) {
|
||||
return AwsSessionCredentials.create(current.getAWSAccessKeyId(), current.getAWSSecretKey(), ((AWSSessionCredentials) current).getSessionToken());
|
||||
return AwsSessionCredentials.create(current.getAWSAccessKeyId(), current.getAWSSecretKey(),
|
||||
((AWSSessionCredentials) current).getSessionToken());
|
||||
}
|
||||
return new AwsCredentials() {
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -36,8 +36,8 @@ public class AWSCredentialsProviderPropertyValueDecoderTest {
|
|||
private static final String TEST_ACCESS_KEY_ID = "123";
|
||||
private static final String TEST_SECRET_KEY = "456";
|
||||
|
||||
private String credentialName1 = "software.amazon.kinesis.multilang.config.AWSCredentialsProviderPropertyValueDecoderTest$AlwaysSucceedCredentialsProvider";
|
||||
private String credentialName2 = "software.amazon.kinesis.multilang.config.AWSCredentialsProviderPropertyValueDecoderTest$ConstructorCredentialsProvider";
|
||||
private final String credentialName1 = AlwaysSucceedCredentialsProvider.class.getName();
|
||||
private final String credentialName2 = ConstructorCredentialsProvider.class.getName();
|
||||
private AWSCredentialsProviderPropertyValueDecoder decoder = new AWSCredentialsProviderPropertyValueDecoder();
|
||||
|
||||
@ToString
|
||||
|
|
|
|||
|
|
@ -36,35 +36,25 @@ import com.amazonaws.auth.AWSCredentialsProvider;
|
|||
import com.amazonaws.auth.BasicAWSCredentials;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
||||
import software.amazon.kinesis.common.InitialPositionInStream;
|
||||
import software.amazon.kinesis.metrics.MetricsLevel;
|
||||
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class KinesisClientLibConfiguratorTest {
|
||||
|
||||
private String credentialName1 = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProvider";
|
||||
private String credentialName2 = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysFailCredentialsProvider";
|
||||
private String credentialNameKinesis = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderKinesis";
|
||||
private String credentialNameDynamoDB = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderDynamoDB";
|
||||
private String credentialNameCloudWatch = "software.amazon.kinesis.multilang.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderCloudWatch";
|
||||
private KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator();
|
||||
|
||||
@Rule
|
||||
public final ExpectedException thrown = ExpectedException.none();
|
||||
|
||||
@Mock
|
||||
private ShardRecordProcessorFactory shardRecordProcessorFactory;
|
||||
private final String credentialName1 = AlwaysSucceedCredentialsProvider.class.getName();
|
||||
private final String credentialName2 = AlwaysFailCredentialsProvider.class.getName();
|
||||
private final String credentialNameKinesis = AlwaysSucceedCredentialsProviderKinesis.class.getName();
|
||||
private final String credentialNameDynamoDB = AlwaysSucceedCredentialsProviderDynamoDB.class.getName();
|
||||
private final String credentialNameCloudWatch = AlwaysSucceedCredentialsProviderCloudWatch.class.getName();
|
||||
private final KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator();
|
||||
|
||||
@Test
|
||||
public void testWithBasicSetup() {
|
||||
|
|
@ -241,52 +231,32 @@ public class KinesisClientLibConfiguratorTest {
|
|||
"AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
|
||||
"initialPositionInStream = TriM_Horizon", "maxGetRecordsThreadPool = 0",
|
||||
"retryGetRecordsInSeconds = 0" }, '\n');
|
||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||
|
||||
try {
|
||||
configurator.getConfiguration(input);
|
||||
} catch (Exception e) {
|
||||
fail("Don't expect to fail on invalid variable value");
|
||||
|
||||
}
|
||||
getConfiguration(test);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithInvalidIntValue() {
|
||||
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b",
|
||||
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100nf" }, '\n');
|
||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||
|
||||
try {
|
||||
configurator.getConfiguration(input);
|
||||
} catch (Exception e) {
|
||||
fail("Don't expect to fail on invalid variable value");
|
||||
}
|
||||
getConfiguration(test);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithNegativeIntValue() {
|
||||
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b",
|
||||
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = -12" }, '\n');
|
||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||
|
||||
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
|
||||
try {
|
||||
configurator.getConfiguration(input);
|
||||
} catch (Exception e) {
|
||||
fail("Don't expect to fail on invalid variable value");
|
||||
}
|
||||
getConfiguration(test);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testWithMissingCredentialsProvider() {
|
||||
|
||||
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b", "workerId = 123",
|
||||
"failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" }, '\n');
|
||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||
|
||||
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
|
||||
configurator.getConfiguration(input);
|
||||
getConfiguration(test);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -295,8 +265,7 @@ public class KinesisClientLibConfiguratorTest {
|
|||
new String[] { "streamName = a", "applicationName = b", "AWSCredentialsProvider = " + credentialName1,
|
||||
"failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" },
|
||||
'\n');
|
||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||
MultiLangDaemonConfiguration config = configurator.getConfiguration(input);
|
||||
MultiLangDaemonConfiguration config = getConfiguration(test);
|
||||
|
||||
// if workerId is not provided, configurator should assign one for it automatically
|
||||
assertNotNull(config.getWorkerIdentifier());
|
||||
|
|
@ -311,14 +280,11 @@ public class KinesisClientLibConfiguratorTest {
|
|||
"workerId = 123",
|
||||
"failoverTimeMillis = 100" },
|
||||
'\n');
|
||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||
|
||||
configurator.getConfiguration(input);
|
||||
getConfiguration(test);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testWithEmptyStreamNameAndMissingStreamArn() {
|
||||
|
||||
String test = StringUtils.join(new String[] {
|
||||
"applicationName = b",
|
||||
"AWSCredentialsProvider = " + credentialName1,
|
||||
|
|
@ -327,18 +293,14 @@ public class KinesisClientLibConfiguratorTest {
|
|||
"streamName = ",
|
||||
"streamArn = "},
|
||||
'\n');
|
||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||
|
||||
configurator.getConfiguration(input);
|
||||
getConfiguration(test);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void testWithMissingApplicationName() {
|
||||
|
||||
String test = StringUtils.join(new String[] { "streamName = a", "AWSCredentialsProvider = " + credentialName1,
|
||||
"workerId = 123", "failoverTimeMillis = 100" }, '\n');
|
||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||
configurator.getConfiguration(input);
|
||||
getConfiguration(test);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -347,11 +309,10 @@ public class KinesisClientLibConfiguratorTest {
|
|||
new String[] { "streamName = a", "applicationName = b", "AWSCredentialsProvider = " + credentialName2,
|
||||
"failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" },
|
||||
'\n');
|
||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||
MultiLangDaemonConfiguration config = getConfiguration(test);
|
||||
|
||||
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
|
||||
try {
|
||||
MultiLangDaemonConfiguration config = configurator.getConfiguration(input);
|
||||
config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
|
||||
fail("expect failure with wrong credentials provider");
|
||||
} catch (Exception e) {
|
||||
|
|
@ -367,25 +328,12 @@ public class KinesisClientLibConfiguratorTest {
|
|||
"AWSCredentialsProviderDynamoDB = " + credentialNameDynamoDB,
|
||||
"AWSCredentialsProviderCloudWatch = " + credentialNameCloudWatch, "failoverTimeMillis = 100",
|
||||
"shardSyncIntervalMillis = 500" }, '\n');
|
||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||
|
||||
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
|
||||
MultiLangDaemonConfiguration config = configurator.getConfiguration(input);
|
||||
try {
|
||||
config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
|
||||
} catch (Exception e) {
|
||||
fail("Kinesis credential providers should not fail.");
|
||||
}
|
||||
try {
|
||||
config.getDynamoDBCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
|
||||
} catch (Exception e) {
|
||||
fail("DynamoDB credential providers should not fail.");
|
||||
}
|
||||
try {
|
||||
config.getCloudWatchCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
|
||||
} catch (Exception e) {
|
||||
fail("CloudWatch credential providers should not fail.");
|
||||
}
|
||||
final MultiLangDaemonConfiguration config = getConfiguration(test);
|
||||
config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
|
||||
config.getDynamoDBCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
|
||||
config.getCloudWatchCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
|
||||
}
|
||||
|
||||
// TODO: fix this test
|
||||
|
|
@ -396,17 +344,10 @@ public class KinesisClientLibConfiguratorTest {
|
|||
"AWSCredentialsProviderDynamoDB = " + credentialName2,
|
||||
"AWSCredentialsProviderCloudWatch = " + credentialName2, "failoverTimeMillis = 100",
|
||||
"shardSyncIntervalMillis = 500" }, '\n');
|
||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||
|
||||
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
|
||||
|
||||
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
|
||||
MultiLangDaemonConfiguration config = configurator.getConfiguration(input);
|
||||
try {
|
||||
config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
|
||||
} catch (Exception e) {
|
||||
fail("Kinesis credential providers should not fail.");
|
||||
}
|
||||
final MultiLangDaemonConfiguration config = getConfiguration(test);
|
||||
config.getKinesisCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
|
||||
try {
|
||||
config.getDynamoDBCredentialsProvider().build(AwsCredentialsProvider.class).resolveCredentials();
|
||||
fail("DynamoDB credential providers should fail.");
|
||||
|
|
@ -503,7 +444,6 @@ public class KinesisClientLibConfiguratorTest {
|
|||
|
||||
private MultiLangDaemonConfiguration getConfiguration(String configString) {
|
||||
InputStream input = new ByteArrayInputStream(configString.getBytes());
|
||||
MultiLangDaemonConfiguration config = configurator.getConfiguration(input);
|
||||
return config;
|
||||
return configurator.getConfiguration(input);
|
||||
}
|
||||
}
|
||||
|
|
@ -97,6 +97,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig;
|
|||
import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
|
||||
|
||||
import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType;
|
||||
import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION;
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
@ -489,7 +490,7 @@ public class Scheduler implements Runnable {
|
|||
}
|
||||
};
|
||||
|
||||
if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) {
|
||||
if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) {
|
||||
// Now, we are identifying the stale/old streams and enqueuing it for deferred deletion.
|
||||
// It is assumed that all the workers will always have the latest and consistent snapshot of streams
|
||||
// from the multiStreamTracker.
|
||||
|
|
@ -521,7 +522,8 @@ public class Scheduler implements Runnable {
|
|||
if (!newStreamConfigMap.containsKey(streamIdentifier)) {
|
||||
if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
|
||||
log.info(
|
||||
"Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams.", streamIdentifier);
|
||||
"Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams.",
|
||||
streamIdentifier);
|
||||
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(
|
||||
currentStreamConfigMap.get(streamIdentifier));
|
||||
shardSyncTaskManager.submitShardSyncTask();
|
||||
|
|
@ -541,10 +543,13 @@ public class Scheduler implements Runnable {
|
|||
// Now let's scan the streamIdentifiersForLeaseCleanup eligible for deferred deletion and delete them.
|
||||
// StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and
|
||||
// the streamIdentifiersForLeaseCleanup are not present in the latest snapshot.
|
||||
final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors
|
||||
.partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet()));
|
||||
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->
|
||||
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis())
|
||||
final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap =
|
||||
staleStreamDeletionMap.keySet().stream().collect(
|
||||
Collectors.partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet()));
|
||||
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false)
|
||||
.stream().filter(streamIdentifier ->
|
||||
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now())
|
||||
.toMillis() >= waitPeriodToDeleteOldStreams.toMillis())
|
||||
.collect(Collectors.toSet());
|
||||
// These are the streams which are deleted in Kinesis and we encounter resource not found during
|
||||
// shardSyncTask. This is applicable in MultiStreamMode only, in case of SingleStreamMode, store will
|
||||
|
|
|
|||
|
|
@ -92,7 +92,8 @@ public class HierarchicalShardSyncer {
|
|||
this(isMultiStreamMode, streamIdentifier, null);
|
||||
}
|
||||
|
||||
public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier, final DeletedStreamListProvider deletedStreamListProvider) {
|
||||
public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier,
|
||||
final DeletedStreamListProvider deletedStreamListProvider) {
|
||||
this.isMultiStreamMode = isMultiStreamMode;
|
||||
this.streamIdentifier = streamIdentifier;
|
||||
this.deletedStreamListProvider = deletedStreamListProvider;
|
||||
|
|
@ -191,7 +192,9 @@ public class HierarchicalShardSyncer {
|
|||
if (!CollectionUtils.isNullOrEmpty(inconsistentShardIds)) {
|
||||
final String ids = StringUtils.join(inconsistentShardIds, ' ');
|
||||
throw new KinesisClientLibIOException(String.format(
|
||||
// CHECKSTYLE.OFF: LineLength
|
||||
"%d open child shards (%s) are inconsistent. This can happen due to a race condition between describeStream and a reshard operation.",
|
||||
// CHECKSTYLE.ON: LineLength
|
||||
inconsistentShardIds.size(), ids));
|
||||
}
|
||||
}
|
||||
|
|
@ -564,7 +567,8 @@ public class HierarchicalShardSyncer {
|
|||
return parentShardIds;
|
||||
}
|
||||
|
||||
public synchronized Lease createLeaseForChildShard(final ChildShard childShard, final StreamIdentifier streamIdentifier) throws InvalidStateException {
|
||||
public synchronized Lease createLeaseForChildShard(final ChildShard childShard,
|
||||
final StreamIdentifier streamIdentifier) throws InvalidStateException {
|
||||
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, streamIdentifier);
|
||||
|
||||
return multiStreamArgs.isMultiStreamMode() ? newKCLMultiStreamLeaseForChildShard(childShard, streamIdentifier)
|
||||
|
|
@ -583,7 +587,8 @@ public class HierarchicalShardSyncer {
|
|||
if (!CollectionUtils.isNullOrEmpty(childShard.parentShards())) {
|
||||
newLease.parentShardIds(childShard.parentShards());
|
||||
} else {
|
||||
throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + "because parent shards cannot be found.");
|
||||
throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId()
|
||||
+ " because parent shards cannot be found.");
|
||||
}
|
||||
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
newLease.ownerSwitchesSinceCheckpoint(0L);
|
||||
|
|
@ -591,13 +596,15 @@ public class HierarchicalShardSyncer {
|
|||
return newLease;
|
||||
}
|
||||
|
||||
private static Lease newKCLMultiStreamLeaseForChildShard(final ChildShard childShard, final StreamIdentifier streamIdentifier) throws InvalidStateException {
|
||||
private static Lease newKCLMultiStreamLeaseForChildShard(final ChildShard childShard,
|
||||
final StreamIdentifier streamIdentifier) throws InvalidStateException {
|
||||
MultiStreamLease newLease = new MultiStreamLease();
|
||||
newLease.leaseKey(MultiStreamLease.getLeaseKey(streamIdentifier.serialize(), childShard.shardId()));
|
||||
if (!CollectionUtils.isNullOrEmpty(childShard.parentShards())) {
|
||||
newLease.parentShardIds(childShard.parentShards());
|
||||
} else {
|
||||
throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId() + "because parent shards cannot be found.");
|
||||
throw new InvalidStateException("Unable to populate new lease for child shard " + childShard.shardId()
|
||||
+ " because parent shards cannot be found.");
|
||||
}
|
||||
newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
newLease.ownerSwitchesSinceCheckpoint(0L);
|
||||
|
|
@ -612,7 +619,6 @@ public class HierarchicalShardSyncer {
|
|||
* Note: Package level access only for testing purposes
|
||||
*
|
||||
* @param shard
|
||||
* @return
|
||||
*/
|
||||
private static Lease newKCLLease(final Shard shard) {
|
||||
Lease newLease = new Lease();
|
||||
|
|
|
|||
|
|
@ -208,7 +208,8 @@ public class LeaseCleanupManager {
|
|||
log.warn("Unable to cleanup lease for shard {} in {}", shardInfo.shardId(), streamIdentifier.streamName(), e);
|
||||
}
|
||||
} else {
|
||||
log.info("Lease not present in lease table while cleaning the shard {} of {}", shardInfo.shardId(), streamIdentifier.streamName());
|
||||
log.info("Lease not present in lease table while cleaning the shard {} of {}",
|
||||
shardInfo.shardId(), streamIdentifier.streamName());
|
||||
cleanedUpCompletedLease = true;
|
||||
}
|
||||
}
|
||||
|
|
@ -232,14 +233,17 @@ public class LeaseCleanupManager {
|
|||
|
||||
// A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the
|
||||
// stream (known explicitly from ResourceNotFound being thrown when processing this shard),
|
||||
private boolean cleanupLeaseForGarbageShard(Lease lease, Throwable e) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
private boolean cleanupLeaseForGarbageShard(Lease lease, Throwable e)
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
log.warn("Deleting lease {} as it is not present in the stream.", lease, e);
|
||||
leaseCoordinator.leaseRefresher().deleteLease(lease);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the all of the parent shards for a given lease have an ongoing lease. If any one parent still has a lease, return false. Otherwise return true
|
||||
* Check if the all of the parent shards for a given lease have an ongoing lease. If any one parent still has a
|
||||
* lease, return false. Otherwise return true
|
||||
*
|
||||
* @param lease
|
||||
* @param shardInfo
|
||||
* @return
|
||||
|
|
@ -247,7 +251,8 @@ public class LeaseCleanupManager {
|
|||
* @throws ProvisionedThroughputException
|
||||
* @throws InvalidStateException
|
||||
*/
|
||||
private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo)
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
for (String parentShard : lease.parentShardIds()) {
|
||||
final Lease parentLease = leaseCoordinator.leaseRefresher().getLease(ShardInfo.getLeaseKey(shardInfo, parentShard));
|
||||
|
||||
|
|
|
|||
|
|
@ -152,7 +152,8 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
|
|||
final LeaseSerializer serializer, final boolean consistentReads,
|
||||
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout,
|
||||
final BillingMode billingMode) {
|
||||
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, DefaultSdkAutoConstructList.getInstance());
|
||||
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout,
|
||||
billingMode, DefaultSdkAutoConstructList.getInstance());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -224,11 +224,13 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
|
|||
public Map<String, AttributeValueUpdate> getDynamoTakeLeaseUpdate(final Lease lease, String owner) {
|
||||
Map<String, AttributeValueUpdate> result = new HashMap<>();
|
||||
|
||||
result.put(LEASE_OWNER_KEY, AttributeValueUpdate.builder().value(DynamoUtils.createAttributeValue(owner)).action(AttributeAction.PUT).build());
|
||||
result.put(LEASE_OWNER_KEY, AttributeValueUpdate.builder().value(DynamoUtils.createAttributeValue(owner))
|
||||
.action(AttributeAction.PUT).build());
|
||||
|
||||
String oldOwner = lease.leaseOwner();
|
||||
if (oldOwner != null && !oldOwner.equals(owner)) {
|
||||
result.put(OWNER_SWITCHES_KEY, AttributeValueUpdate.builder().value(DynamoUtils.createAttributeValue(1L)).action(AttributeAction.ADD).build());
|
||||
result.put(OWNER_SWITCHES_KEY, AttributeValueUpdate.builder().value(DynamoUtils.createAttributeValue(1L))
|
||||
.action(AttributeAction.ADD).build());
|
||||
}
|
||||
|
||||
return result;
|
||||
|
|
@ -257,7 +259,8 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
|
|||
|
||||
if (lease.pendingCheckpoint() != null && !lease.pendingCheckpoint().sequenceNumber().isEmpty()) {
|
||||
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.pendingCheckpoint().sequenceNumber())));
|
||||
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.pendingCheckpoint().subSequenceNumber())));
|
||||
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, putUpdate(DynamoUtils.createAttributeValue(
|
||||
lease.pendingCheckpoint().subSequenceNumber())));
|
||||
} else {
|
||||
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
|
||||
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
|
||||
|
|
|
|||
|
|
@ -180,7 +180,8 @@ public class ShutdownTask implements ConsumerTask {
|
|||
// Create new lease for the child shards if they don't exist.
|
||||
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
|
||||
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
|
||||
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
|
||||
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a
|
||||
// shutdown task with empty list of childShards.
|
||||
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
|
||||
if (currentShardLease == null) {
|
||||
throw new InvalidStateException(leaseKey
|
||||
|
|
@ -286,7 +287,8 @@ public class ShutdownTask implements ConsumerTask {
|
|||
for (ChildShard childShard : childShards) {
|
||||
final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
|
||||
if (leaseRefresher.getLease(leaseKey) == null) {
|
||||
log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey);
|
||||
log.debug("{} - Shard {} - Attempting to create lease for child shard {}",
|
||||
shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey);
|
||||
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
|
||||
final long startTime = System.currentTimeMillis();
|
||||
boolean success = false;
|
||||
|
|
@ -296,7 +298,8 @@ public class ShutdownTask implements ConsumerTask {
|
|||
} finally {
|
||||
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
|
||||
if (leaseToCreate.checkpoint() != null) {
|
||||
final String metricName = leaseToCreate.checkpoint().isSentinelCheckpoint() ? leaseToCreate.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER";
|
||||
final String metricName = leaseToCreate.checkpoint().isSentinelCheckpoint() ?
|
||||
leaseToCreate.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER";
|
||||
MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -230,8 +230,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
subscriber.onNext(recordsRetrieved);
|
||||
}
|
||||
} catch (IllegalStateException e) {
|
||||
|
||||
// CHECKSTYLE.OFF: LineLength
|
||||
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful request details -- {}",
|
||||
// CHECKSTYLE.ON: LineLength
|
||||
streamAndShardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails);
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
|
|
@ -382,7 +383,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
// Clear any lingering records in the queue.
|
||||
if (!recordsDeliveryQueue.isEmpty()) {
|
||||
log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of "
|
||||
+ "previous subscription - {}. Last successful request details -- {}", streamAndShardId, subscribeToShardId, lastSuccessfulRequestDetails);
|
||||
+ "previous subscription - {}. Last successful request details -- {}",
|
||||
streamAndShardId, subscribeToShardId, lastSuccessfulRequestDetails);
|
||||
recordsDeliveryQueue.clear();
|
||||
}
|
||||
}
|
||||
|
|
@ -402,7 +404,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
// The ack received for this onNext event will be ignored by the publisher as the global flow object should
|
||||
// be either null or renewed when the ack's flow identifier is evaluated.
|
||||
FanoutRecordsRetrieved response = new FanoutRecordsRetrieved(
|
||||
ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).childShards(Collections.emptyList()).build(), null,
|
||||
ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true)
|
||||
.childShards(Collections.emptyList()).build(), null,
|
||||
triggeringFlow != null ? triggeringFlow.getSubscribeToShardId() : shardId + "-no-flow-found");
|
||||
subscriber.onNext(response);
|
||||
subscriber.onComplete();
|
||||
|
|
@ -515,7 +518,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) {
|
||||
if (availableQueueSpace <= 0) {
|
||||
log.debug(
|
||||
// CHECKSTYLE.OFF: LineLength
|
||||
"{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0",
|
||||
// CHECKSTYLE.ON: LineLength
|
||||
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
|
||||
} else {
|
||||
availableQueueSpace--;
|
||||
|
|
@ -544,7 +549,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
|
||||
if (!isActiveFlow(triggeringFlow)) {
|
||||
log.debug(
|
||||
// CHECKSTYLE.OFF: LineLength
|
||||
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {} -- Received spurious onComplete from unexpected flow. Ignoring.",
|
||||
// CHECKSTYLE.ON: LineLength
|
||||
streamAndShardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);
|
||||
return;
|
||||
}
|
||||
|
|
@ -801,7 +808,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
throwable.getMessage());
|
||||
if (this.isDisposed) {
|
||||
log.debug(
|
||||
// CHECKSTYLE.OFF: LineLength
|
||||
"{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- This flow has been disposed, not dispatching error. {}: {}",
|
||||
// CHECKSTYLE.ON: LineLength
|
||||
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, throwable.getClass().getName(),
|
||||
throwable.getMessage());
|
||||
this.isErrorDispatched = true;
|
||||
|
|
@ -891,7 +900,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
subscription.cancel();
|
||||
} catch (Throwable t) {
|
||||
log.error(
|
||||
// CHECKSTYLE.OFF: LineLength
|
||||
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Exception while trying to cancel failed subscription: {}",
|
||||
// CHECKSTYLE.ON: LineLength
|
||||
parent.streamAndShardId, connectionStartedAt, subscribeToShardId, t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
|
|
@ -953,12 +964,16 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
|||
if (flow.shouldSubscriptionCancel()) {
|
||||
if (flow.isCancelled) {
|
||||
log.debug(
|
||||
// CHECKSTYLE.OFF: LineLength
|
||||
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Subscription was cancelled before onSubscribe",
|
||||
// CHECKSTYLE.ON: LineLength
|
||||
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
|
||||
}
|
||||
if (flow.isDisposed) {
|
||||
log.debug(
|
||||
// CHECKSTYLE.OFF: LineLength
|
||||
"{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- RecordFlow has been disposed cancelling subscribe",
|
||||
// CHECKSTYLE.ON: LineLength
|
||||
parent.streamAndShardId, connectionStartedAt, subscribeToShardId);
|
||||
}
|
||||
log.debug(
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ import static org.mockito.internal.verification.VerificationModeFactory.atMost;
|
|||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
|
|
@ -176,7 +177,8 @@ public class SchedulerTest {
|
|||
shardRecordProcessorFactory = new TestShardRecordProcessorFactory();
|
||||
|
||||
checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory());
|
||||
coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L).workerStateChangeListener(workerStateChangeListener);
|
||||
coordinatorConfig = new CoordinatorConfig(applicationName).parentShardPollIntervalMillis(100L)
|
||||
.workerStateChangeListener(workerStateChangeListener);
|
||||
leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName,
|
||||
workerIdentifier).leaseManagementFactory(new TestKinesisLeaseManagementFactory(false, false));
|
||||
lifecycleConfig = new LifecycleConfig();
|
||||
|
|
@ -188,7 +190,8 @@ public class SchedulerTest {
|
|||
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
|
||||
when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(new HierarchicalShardSyncer());
|
||||
when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null));
|
||||
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(StreamConfig.class), any(MetricsFactory.class))).thenReturn(recordsPublisher);
|
||||
when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(StreamConfig.class),
|
||||
any(MetricsFactory.class))).thenReturn(recordsPublisher);
|
||||
when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class));
|
||||
|
||||
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
||||
|
|
@ -645,7 +648,8 @@ public class SchedulerTest {
|
|||
testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(true, null);
|
||||
}
|
||||
|
||||
private final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(boolean expectSyncedStreams, Set<StreamConfig> currentStreamConfigMapOverride)
|
||||
private void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(boolean expectSyncedStreams,
|
||||
Set<StreamConfig> currentStreamConfigMapOverride)
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
|
||||
StreamIdentifier.multiStreamInstance(
|
||||
|
|
@ -1294,21 +1298,20 @@ public class SchedulerTest {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: Upgrade to mockito >= 2.7.13, and use Spy on MultiStreamTracker to directly access the default methods without implementing TestMultiStreamTracker class
|
||||
// TODO: Upgrade to mockito >= 2.7.13, and use Spy on MultiStreamTracker to directly access the default methods
|
||||
// without implementing TestMultiStreamTracker class
|
||||
@NoArgsConstructor
|
||||
private class TestMultiStreamTracker implements MultiStreamTracker {
|
||||
@Override
|
||||
public List<StreamConfig> streamConfigList(){
|
||||
return new ArrayList<StreamConfig>() {{
|
||||
add(new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
|
||||
InitialPositionInStream.LATEST)));
|
||||
add(new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream2:2"), InitialPositionInStreamExtended.newInitialPosition(
|
||||
InitialPositionInStream.LATEST)));
|
||||
add(new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
|
||||
InitialPositionInStream.LATEST)));
|
||||
add(new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream2:3"), InitialPositionInStreamExtended.newInitialPosition(
|
||||
InitialPositionInStream.LATEST)));
|
||||
}};
|
||||
public List<StreamConfig> streamConfigList() {
|
||||
final InitialPositionInStreamExtended latest =
|
||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
|
||||
|
||||
return Arrays.asList(
|
||||
new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream1:1"), latest),
|
||||
new StreamConfig(StreamIdentifier.multiStreamInstance("123456789012:stream2:2"), latest),
|
||||
new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream1:1"), latest),
|
||||
new StreamConfig(StreamIdentifier.multiStreamInstance("210987654321:stream2:3"), latest));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -141,7 +141,8 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher {
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<Lease> listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
public List<Lease> listLeasesForStream(StreamIdentifier streamIdentifier)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
throwExceptions("listLeasesForStream", ExceptionThrowingLeaseRefresherMethods.LISTLEASESFORSTREAM);
|
||||
|
||||
return leaseRefresher.listLeasesForStream(streamIdentifier);
|
||||
|
|
|
|||
|
|
@ -2257,10 +2257,12 @@ public class HierarchicalShardSyncerTest {
|
|||
testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_AT_TIMESTAMP, shardFilter);
|
||||
}
|
||||
|
||||
public void testEmptyLeaseTableBootstrapUsesListShardsWithFilter(InitialPositionInStreamExtended initialPosition, ShardFilter shardFilter) throws Exception {
|
||||
public void testEmptyLeaseTableBootstrapUsesListShardsWithFilter(InitialPositionInStreamExtended initialPosition,
|
||||
ShardFilter shardFilter) throws Exception {
|
||||
final String shardId0 = "shardId-0";
|
||||
final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null,
|
||||
ShardObjectHelper.newSequenceNumberRange("1", null), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, ShardObjectHelper.MAX_HASH_KEY)));
|
||||
ShardObjectHelper.newSequenceNumberRange("1", null),
|
||||
ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, ShardObjectHelper.MAX_HASH_KEY)));
|
||||
|
||||
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
|
||||
when(shardDetector.listShardsWithFilter(shardFilter)).thenReturn(shards);
|
||||
|
|
@ -2278,8 +2280,10 @@ public class HierarchicalShardSyncerTest {
|
|||
final String shardId0 = "shardId-0";
|
||||
final String shardId1 = "shardId-1";
|
||||
|
||||
final List<Shard> shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, ShardObjectHelper.newSequenceNumberRange("1", "2")));
|
||||
final List<Shard> shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId1, null, null, ShardObjectHelper.newSequenceNumberRange("3", "4")));
|
||||
final List<Shard> shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null,
|
||||
ShardObjectHelper.newSequenceNumberRange("1", "2")));
|
||||
final List<Shard> shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId1, null, null,
|
||||
ShardObjectHelper.newSequenceNumberRange("3", "4")));
|
||||
|
||||
final List<Lease> currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo");
|
||||
|
||||
|
|
@ -2301,8 +2305,10 @@ public class HierarchicalShardSyncerTest {
|
|||
@Test(expected = KinesisClientLibIOException.class)
|
||||
public void testEmptyLeaseTableThrowsExceptionWhenHashRangeIsStillIncompleteAfterRetries() throws Exception {
|
||||
final List<Shard> shardsWithIncompleteHashRange = Arrays.asList(
|
||||
ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("0", "1")),
|
||||
ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("2", "3"))
|
||||
ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
|
||||
ShardObjectHelper.newHashKeyRange("0", "1")),
|
||||
ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"),
|
||||
ShardObjectHelper.newHashKeyRange("2", "3"))
|
||||
);
|
||||
|
||||
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
|
||||
|
|
|
|||
|
|
@ -67,7 +67,8 @@ public class ShardObjectHelper {
|
|||
String parentShardId,
|
||||
String adjacentParentShardId,
|
||||
SequenceNumberRange sequenceNumberRange) {
|
||||
return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange, HashKeyRange.builder().startingHashKey("1").endingHashKey("100").build());
|
||||
return newShard(shardId, parentShardId, adjacentParentShardId, sequenceNumberRange,
|
||||
HashKeyRange.builder().startingHashKey("1").endingHashKey("100").build());
|
||||
}
|
||||
|
||||
/** Helper method to create a new shard object.
|
||||
|
|
|
|||
|
|
@ -141,7 +141,8 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
|
|||
.withLease("5", "foo")
|
||||
.build();
|
||||
|
||||
// In the current DynamoDBLeaseTaker implementation getAllLeases() gets leases from an internal cache that is built during takeLeases() operation
|
||||
// In the current DynamoDBLeaseTaker implementation getAllLeases() gets leases from an internal cache that is
|
||||
// built during takeLeases() operation
|
||||
assertThat(taker.allLeases().size(), equalTo(0));
|
||||
|
||||
taker.takeLeases();
|
||||
|
|
|
|||
|
|
@ -40,7 +40,6 @@ import org.junit.runner.RunWith;
|
|||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.awssdk.services.kinesis.model.ChildShard;
|
||||
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
||||
import software.amazon.kinesis.common.InitialPositionInStream;
|
||||
|
|
@ -88,12 +87,8 @@ public class ConsumerStatesTest {
|
|||
@Mock
|
||||
private ShutdownNotification shutdownNotification;
|
||||
@Mock
|
||||
private InitialPositionInStreamExtended initialPositionInStream;
|
||||
@Mock
|
||||
private RecordsPublisher recordsPublisher;
|
||||
@Mock
|
||||
private KinesisAsyncClient kinesisClient;
|
||||
@Mock
|
||||
private ShardDetector shardDetector;
|
||||
@Mock
|
||||
private HierarchicalShardSyncer hierarchicalShardSyncer;
|
||||
|
|
@ -121,7 +116,8 @@ public class ConsumerStatesTest {
|
|||
|
||||
@Before
|
||||
public void setup() {
|
||||
argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.singleStreamInstance(STREAM_NAME), leaseCoordinator, executorService, recordsPublisher,
|
||||
argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.singleStreamInstance(STREAM_NAME),
|
||||
leaseCoordinator, executorService, recordsPublisher,
|
||||
shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis,
|
||||
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
|
||||
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
|
||||
|
|
@ -134,7 +130,7 @@ public class ConsumerStatesTest {
|
|||
when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
|
||||
}
|
||||
|
||||
private static final Class<LeaseRefresher> LEASE_REFRESHER_CLASS = (Class<LeaseRefresher>) (Class<?>) LeaseRefresher.class;
|
||||
private static final Class<LeaseRefresher> LEASE_REFRESHER_CLASS = LeaseRefresher.class;
|
||||
|
||||
@Test
|
||||
public void blockOnParentStateTest() {
|
||||
|
|
@ -431,7 +427,6 @@ public class ConsumerStatesTest {
|
|||
}
|
||||
}
|
||||
this.matchingField = matching;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -101,7 +101,6 @@ public class ProcessTaskTest {
|
|||
@Mock
|
||||
private GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer;
|
||||
|
||||
|
||||
private static final byte[] TEST_DATA = new byte[] { 1, 2, 3, 4 };
|
||||
|
||||
private final String shardId = "shard-test";
|
||||
|
|
@ -116,7 +115,6 @@ public class ProcessTaskTest {
|
|||
|
||||
private ProcessTask processTask;
|
||||
|
||||
|
||||
@Before
|
||||
public void setUpProcessTask() {
|
||||
when(checkpointer.checkpointer()).thenReturn(mock(Checkpointer.class));
|
||||
|
|
@ -130,7 +128,8 @@ public class ProcessTaskTest {
|
|||
}
|
||||
|
||||
private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, GlueSchemaRegistryDeserializer deserializer) {
|
||||
return makeProcessTask(processRecordsInput, new AggregatorUtil(), skipShardSyncAtWorkerInitializationIfLeasesExist, new SchemaRegistryDecoder(deserializer));
|
||||
return makeProcessTask(processRecordsInput, new AggregatorUtil(), skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
new SchemaRegistryDecoder(deserializer));
|
||||
}
|
||||
|
||||
private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, AggregatorUtil aggregatorUtil,
|
||||
|
|
@ -149,11 +148,8 @@ public class ProcessTaskTest {
|
|||
);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testProcessTaskWithShardEndReached() {
|
||||
|
||||
processTask = makeProcessTask(processRecordsInput);
|
||||
when(processRecordsInput.isAtShardEnd()).thenReturn(true);
|
||||
|
||||
|
|
@ -429,7 +425,8 @@ public class ProcessTaskTest {
|
|||
|
||||
when(processRecordsInput.records()).thenReturn(rawRecords);
|
||||
ProcessTask processTask = makeProcessTask(processRecordsInput, aggregatorUtil, false);
|
||||
ShardRecordProcessorOutcome outcome = testWithRecords(processTask, new ExtendedSequenceNumber(sequenceNumber.subtract(BigInteger.valueOf(100)).toString(), 0L),
|
||||
ShardRecordProcessorOutcome outcome = testWithRecords(processTask,
|
||||
new ExtendedSequenceNumber(sequenceNumber.subtract(BigInteger.valueOf(100)).toString(), 0L),
|
||||
new ExtendedSequenceNumber(sequenceNumber.toString(), 0L));
|
||||
|
||||
assertThat(outcome.processRecordsCall.records(), equalTo(expectedRecords));
|
||||
|
|
@ -645,12 +642,12 @@ public class ProcessTaskTest {
|
|||
}
|
||||
|
||||
private ShardRecordProcessorOutcome testWithRecords(List<KinesisClientRecord> records,
|
||||
ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) {
|
||||
ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) {
|
||||
return testWithRecords(records, lastCheckpointValue, largestPermittedCheckpointValue, new AggregatorUtil());
|
||||
}
|
||||
|
||||
private ShardRecordProcessorOutcome testWithRecords(List<KinesisClientRecord> records, ExtendedSequenceNumber lastCheckpointValue,
|
||||
ExtendedSequenceNumber largestPermittedCheckpointValue, AggregatorUtil aggregatorUtil) {
|
||||
ExtendedSequenceNumber largestPermittedCheckpointValue, AggregatorUtil aggregatorUtil) {
|
||||
when(processRecordsInput.records()).thenReturn(records);
|
||||
return testWithRecords(
|
||||
makeProcessTask(processRecordsInput, aggregatorUtil, skipShardSyncAtWorkerInitializationIfLeasesExist),
|
||||
|
|
@ -658,7 +655,7 @@ public class ProcessTaskTest {
|
|||
}
|
||||
|
||||
private ShardRecordProcessorOutcome testWithRecords(ProcessTask processTask, ExtendedSequenceNumber lastCheckpointValue,
|
||||
ExtendedSequenceNumber largestPermittedCheckpointValue) {
|
||||
ExtendedSequenceNumber largestPermittedCheckpointValue) {
|
||||
when(checkpointer.lastCheckpointValue()).thenReturn(lastCheckpointValue);
|
||||
when(checkpointer.largestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
|
||||
processTask.call();
|
||||
|
|
|
|||
|
|
@ -103,7 +103,7 @@ public class FanOutRecordsPublisherTest {
|
|||
private SubscribeToShardEvent batchEvent;
|
||||
|
||||
@Test
|
||||
public void simpleTest() throws Exception {
|
||||
public void testSimple() {
|
||||
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
|
||||
|
||||
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
|
||||
|
|
@ -218,8 +218,10 @@ public class FanOutRecordsPublisherTest {
|
|||
List<KinesisClientRecordMatcher> matchers = records.stream().map(KinesisClientRecordMatcher::new)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
batchEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records).continuationSequenceNumber(CONTINUATION_SEQUENCE_NUMBER).build();
|
||||
SubscribeToShardEvent invalidEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records).childShards(Collections.emptyList()).build();
|
||||
batchEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L).records(records)
|
||||
.continuationSequenceNumber(CONTINUATION_SEQUENCE_NUMBER).build();
|
||||
SubscribeToShardEvent invalidEvent = SubscribeToShardEvent.builder().millisBehindLatest(100L)
|
||||
.records(records).childShards(Collections.emptyList()).build();
|
||||
|
||||
captor.getValue().onNext(batchEvent);
|
||||
captor.getValue().onNext(invalidEvent);
|
||||
|
|
@ -238,7 +240,7 @@ public class FanOutRecordsPublisherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testIfAllEventsReceivedWhenNoTasksRejectedByExecutor() throws Exception {
|
||||
public void testIfAllEventsReceivedWhenNoTasksRejectedByExecutor() {
|
||||
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
|
||||
|
||||
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
|
||||
|
|
@ -414,7 +416,8 @@ public class FanOutRecordsPublisherTest {
|
|||
int totalServicePublisherEvents = 1000;
|
||||
int initialDemand = 0;
|
||||
BackpressureAdheringServicePublisher servicePublisher =
|
||||
new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand);
|
||||
new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents,
|
||||
servicePublisherTaskCompletionLatch, initialDemand);
|
||||
|
||||
doNothing().when(publisher).subscribe(captor.capture());
|
||||
|
||||
|
|
@ -848,7 +851,8 @@ public class FanOutRecordsPublisherTest {
|
|||
int totalServicePublisherEvents = 1000;
|
||||
int initialDemand = 9;
|
||||
BackpressureAdheringServicePublisher servicePublisher =
|
||||
new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand);
|
||||
new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents,
|
||||
servicePublisherTaskCompletionLatch, initialDemand);
|
||||
|
||||
doNothing().when(publisher).subscribe(captor.capture());
|
||||
|
||||
|
|
@ -941,7 +945,8 @@ public class FanOutRecordsPublisherTest {
|
|||
int totalServicePublisherEvents = 1000;
|
||||
int initialDemand = 11;
|
||||
BackpressureAdheringServicePublisher servicePublisher =
|
||||
new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand);
|
||||
new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents,
|
||||
servicePublisherTaskCompletionLatch, initialDemand);
|
||||
|
||||
doNothing().when(publisher).subscribe(captor.capture());
|
||||
|
||||
|
|
|
|||
|
|
@ -126,7 +126,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
|
|||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testStrategyIsShutdown() throws Exception {
|
||||
public void testStrategyIsShutdown() {
|
||||
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
|
||||
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
|
||||
|
||||
|
|
@ -141,7 +141,8 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
|
|||
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
|
||||
|
||||
when(executorService.isShutdown()).thenReturn(false);
|
||||
when(completionService.submit(any())).thenReturn(blockedFuture).thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture);
|
||||
when(completionService.submit(any())).thenReturn(blockedFuture)
|
||||
.thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture);
|
||||
when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(null).thenReturn(successfulFuture);
|
||||
when(successfulFuture.get()).thenReturn(dataFetcherResult);
|
||||
when(successfulFuture.cancel(anyBoolean())).thenReturn(false);
|
||||
|
|
@ -156,7 +157,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
|
|||
verify(successfulFuture).cancel(eq(true));
|
||||
verify(blockedFuture).cancel(eq(true));
|
||||
|
||||
|
||||
assertThat(actualResult, equalTo(expectedResponses));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -162,7 +162,8 @@ public class PrefetchRecordsPublisherIntegrationTest {
|
|||
@Test
|
||||
public void testDifferentShardCaches() {
|
||||
final ExecutorService executorService2 = spy(Executors.newFixedThreadPool(1));
|
||||
final KinesisDataFetcher kinesisDataFetcher = spy(new KinesisDataFetcher(kinesisClient, streamName, shardId, MAX_RECORDS_PER_CALL, NULL_METRICS_FACTORY));
|
||||
final KinesisDataFetcher kinesisDataFetcher = spy(new KinesisDataFetcher(kinesisClient, streamName, shardId,
|
||||
MAX_RECORDS_PER_CALL, NULL_METRICS_FACTORY));
|
||||
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy2 =
|
||||
spy(new AsynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher, 5 , 5, shardId));
|
||||
final PrefetchRecordsPublisher recordsPublisher2 = new PrefetchRecordsPublisher(
|
||||
|
|
|
|||
|
|
@ -133,7 +133,8 @@ public class PrefetchRecordsPublisherTest {
|
|||
getRecordsCache = createPrefetchRecordsPublisher(0L);
|
||||
spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue());
|
||||
records = spy(new ArrayList<>());
|
||||
getRecordsResponse = GetRecordsResponse.builder().records(records).nextShardIterator(NEXT_SHARD_ITERATOR).childShards(new ArrayList<>()).build();
|
||||
getRecordsResponse = GetRecordsResponse.builder().records(records).nextShardIterator(NEXT_SHARD_ITERATOR)
|
||||
.childShards(Collections.emptyList()).build();
|
||||
|
||||
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResponse);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@
|
|||
|
||||
<module name="LineLength">
|
||||
<property name="fileExtensions" value="java"/>
|
||||
<property name="max" value="170"/>
|
||||
<property name="max" value="150"/>
|
||||
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
|
||||
</module>
|
||||
|
||||
|
|
@ -22,8 +22,8 @@
|
|||
</module>
|
||||
|
||||
<module name="TreeWalker">
|
||||
<module name="AvoidStarImport"/>
|
||||
<module name="ArrayTrailingComma"/>
|
||||
<module name="AvoidStarImport"/>
|
||||
<module name="ConstantName"/>
|
||||
<module name="CovariantEquals"/>
|
||||
<module name="EmptyStatement"/>
|
||||
|
|
|
|||
Loading…
Reference in a new issue