Merged the lastest from upstream

This commit is contained in:
Joshua Morris 2016-10-14 15:26:36 +00:00
commit b05e882105
13 changed files with 297 additions and 32 deletions

View file

@ -50,7 +50,9 @@ public class KinesisClientLibConfigurator {
// Required properties
private static final String PROP_APP_NAME = "applicationName";
private static final String PROP_STREAM_NAME = "streamName";
private static final String PROP_CREDENTIALS_PROVIDER = "AWSCredentialsProvider";
private static final String PROP_CREDENTIALS_PROVIDER_KINESIS = "AWSCredentialsProvider";
private static final String PROP_CREDENTIALS_PROVIDER_DYNAMODB = "AWSCredentialsProviderDynamoDB";
private static final String PROP_CREDENTIALS_PROVIDER_CLOUDWATCH = "AWSCredentialsProviderCloudWatch";
private static final String PROP_WORKER_ID = "workerId";
private Map<Class<?>, IPropertyValueDecoder<?>> classToDecoder;
@ -107,7 +109,7 @@ public class KinesisClientLibConfigurator {
String applicationName = stringValueDecoder.decodeValue(properties.getProperty(PROP_APP_NAME));
String streamName = stringValueDecoder.decodeValue(properties.getProperty(PROP_STREAM_NAME));
AWSCredentialsProvider provider =
awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER));
awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER_KINESIS));
if (applicationName == null || applicationName.isEmpty()) {
throw new IllegalArgumentException("Value of applicationName should be explicitly provided.");
@ -116,6 +118,24 @@ public class KinesisClientLibConfigurator {
throw new IllegalArgumentException("Value of streamName should be explicitly provided.");
}
// Decode the DynamoDB credentials provider if it exists. If not use the Kinesis credentials provider.
AWSCredentialsProvider providerDynamoDB;
String propCredentialsProviderDynamoDBValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_DYNAMODB);
if (propCredentialsProviderDynamoDBValue == null) {
providerDynamoDB = provider;
} else {
providerDynamoDB = awsCPPropGetter.decodeValue(propCredentialsProviderDynamoDBValue);
}
// Decode the CloudWatch credentials provider if it exists. If not use the Kinesis credentials provider.
AWSCredentialsProvider providerCloudWatch;
String propCredentialsProviderCloudWatchValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_CLOUDWATCH);
if (propCredentialsProviderCloudWatchValue == null) {
providerCloudWatch = provider;
} else {
providerCloudWatch = awsCPPropGetter.decodeValue(propCredentialsProviderCloudWatchValue);
}
// Allow customer not to provide workerId or to provide empty worker id.
String workerId = stringValueDecoder.decodeValue(properties.getProperty(PROP_WORKER_ID));
if (workerId == null || workerId.isEmpty()) {
@ -125,13 +145,13 @@ public class KinesisClientLibConfigurator {
}
KinesisClientLibConfiguration config =
new KinesisClientLibConfiguration(applicationName, streamName, provider, workerId);
new KinesisClientLibConfiguration(applicationName, streamName, provider, providerDynamoDB, providerCloudWatch, workerId);
Set<String> requiredNames =
new HashSet<String>(Arrays.asList(PROP_STREAM_NAME,
PROP_APP_NAME,
PROP_WORKER_ID,
PROP_CREDENTIALS_PROVIDER));
PROP_CREDENTIALS_PROVIDER_KINESIS));
// Set all the variables that are not used for constructor.
for (Object keyObject : properties.keySet()) {

View file

@ -154,6 +154,13 @@ public class KinesisClientLibConfiguration {
*/
public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10;
/*
* The Worker will skip shard sync during initialization if there are one or more leases in the lease table.
* This assumes that the shards and leases are in-sync.
* This enables customers to choose faster startup times (e.g. during incremental deployments of an application).
*/
public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false;
/**
* Default Shard prioritization strategy.
*/
@ -192,6 +199,8 @@ public class KinesisClientLibConfiguration {
private int initialLeaseTableReadCapacity;
private int initialLeaseTableWriteCapacity;
private InitialPositionInStreamExtended initialPositionInStreamExtended;
// This is useful for optimizing deployments to large fleets working on a stable stream.
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ShardPrioritization shardPrioritization;
/**
@ -410,6 +419,7 @@ public class KinesisClientLibConfiguration {
this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
this.initialPositionInStreamExtended =
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
}
@ -640,6 +650,13 @@ public class KinesisClientLibConfiguration {
return regionName;
}
/**
* @return true if Worker should skip syncing shards and leases at startup if leases are present
*/
public boolean getSkipShardSyncAtWorkerInitializationIfLeasesExist() {
return skipShardSyncAtWorkerInitializationIfLeasesExist;
}
/**
* @return Max leases this Worker can handle at a time
*/
@ -950,6 +967,21 @@ public class KinesisClientLibConfiguration {
return this;
}
/**
* If set to true, the Worker will not sync shards and leases during initialization if there are one or more leases
* in the lease table. This assumes that the shards and leases are in-sync.
* This enables customers to choose faster startup times (e.g. during incremental deployments of an application).
*
* @param skipShardSyncAtStartupIfLeasesExist Should Worker skip syncing shards and leases at startup (Worker
* initialization).
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withSkipShardSyncAtStartupIfLeasesExist(
boolean skipShardSyncAtStartupIfLeasesExist) {
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtStartupIfLeasesExist;
return this;
}
/**
*
* @param regionName The region name for the service

View file

@ -73,7 +73,8 @@ class ProcessTask implements ITask {
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher,
long backoffTimeMillis) {
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
super();
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
@ -82,13 +83,19 @@ class ProcessTask implements ITask {
this.streamConfig = streamConfig;
this.backoffTimeMillis = backoffTimeMillis;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
if (kinesisProxy instanceof IKinesisProxyExtended) {
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
// not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if
// KPL is used for ingestion and KPL's aggregation feature is used.
if (!skipShardSyncAtWorkerInitializationIfLeasesExist && kinesisProxy instanceof IKinesisProxyExtended) {
this.shard = ((IKinesisProxyExtended) kinesisProxy).getShard(this.shardInfo.getShardId());
} else {
this.shard = null;
}
if (this.shard == null && !skipShardSyncAtWorkerInitializationIfLeasesExist) {
LOG.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records "
+ "in the event of resharding will not be dropped during deaggregation of Amazon "
+ "Kinesis records.");
this.shard = null;
}
}

View file

@ -58,6 +58,7 @@ class ShardConsumer {
private final long parentShardPollIntervalMillis;
private final boolean cleanupLeasesOfCompletedShards;
private final long taskBackoffTimeMillis;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ITask currentTask;
private long currentTaskSubmitTime;
@ -96,7 +97,8 @@ class ShardConsumer {
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis) {
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
this.streamConfig = streamConfig;
this.recordProcessor = recordProcessor;
this.executorService = executorService;
@ -114,6 +116,7 @@ class ShardConsumer {
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.taskBackoffTimeMillis = backoffTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
}
/**
@ -262,7 +265,8 @@ class ShardConsumer {
recordProcessor,
recordProcessorCheckpointer,
dataFetcher,
taskBackoffTimeMillis);
taskBackoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist);
break;
case SHUTTING_DOWN:
nextTask =

View file

@ -70,7 +70,7 @@ public class Worker implements Runnable {
private final long idleTimeInMilliseconds;
// Backoff time when polling to check if application has finished processing
// parent shards
private final long parentShardPollIntervalMillis;
private final long parentShardPollIntervalMillis;
private final ExecutorService executorService;
private final IMetricsFactory metricsFactory;
// Backoff time when running tasks if they encounter exceptions
@ -91,6 +91,8 @@ public class Worker implements Runnable {
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap =
new ConcurrentHashMap<ShardInfo, ShardConsumer>();
private final boolean cleanupLeasesUponShardCompletion;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
/**
* Constructor.
@ -208,7 +210,7 @@ public class Worker implements Runnable {
ExecutorService execService) {
this(
config.getApplicationName(),
new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
new StreamConfig(
new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
.getProxy(config.getStreamName()),
@ -235,7 +237,9 @@ public class Worker implements Runnable {
metricsFactory,
config.getTaskBackoffTimeMillis(),
config.getFailoverTimeMillis(),
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
config.getShardPrioritizationStrategy());
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) {
Region region = RegionUtils.getRegion(config.getRegionName());
@ -297,6 +301,7 @@ public class Worker implements Runnable {
IMetricsFactory metricsFactory,
long taskBackoffTimeMillis,
long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ShardPrioritization shardPrioritization) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
@ -318,7 +323,8 @@ public class Worker implements Runnable {
metricsFactory,
executorService);
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.failoverTimeMillis = failoverTimeMillis;
this.failoverTimeMillis = failoverTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.shardPrioritization = shardPrioritization;
}
@ -400,16 +406,22 @@ public class Worker implements Runnable {
LOG.info("Initializing LeaseCoordinator");
leaseCoordinator.initialize();
LOG.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask =
new ShardSyncTask(streamConfig.getStreamProxy(),
leaseCoordinator.getLeaseManager(),
initialPosition,
cleanupLeasesUponShardCompletion,
0L);
TaskResult result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
TaskResult result = null;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist
|| leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) {
LOG.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask =
new ShardSyncTask(streamConfig.getStreamProxy(),
leaseCoordinator.getLeaseManager(),
initialPosition,
cleanupLeasesUponShardCompletion,
0L);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
}
if (result.getException() == null) {
if (result == null || result.getException() == null) {
if (!leaseCoordinator.isRunning()) {
LOG.info("Starting LeaseCoordinator");
leaseCoordinator.start();
@ -572,7 +584,7 @@ public class Worker implements Runnable {
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor,
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion,
executorService, metricsFactory, taskBackoffTimeMillis);
executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist);
}
@ -996,7 +1008,9 @@ public class Worker implements Runnable {
metricsFactory,
config.getTaskBackoffTimeMillis(),
config.getFailoverTimeMillis(),
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
shardPrioritization);
}
}

View file

@ -70,6 +70,7 @@ public class LeaseCoordinator<T extends Lease> {
// Package level access for testing.
static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20;
private final ILeaseRenewer<T> leaseRenewer;
private final ILeaseTaker<T> leaseTaker;
private final long renewerIntervalMillis;

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -214,6 +214,14 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
return list(null);
}
/**
* {@inheritDoc}
*/
@Override
public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return list(1).isEmpty();
}
/**
* List with the given page size. Package access for integration testing.
*

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -180,4 +180,15 @@ public interface ILeaseManager<T extends Lease> {
public boolean updateLease(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Check (synchronously) if there are any leases in the lease table.
*
* @return true if there are no leases in the lease table
*
* @throws DependencyException if DynamoDB scan fails in an unexpected way
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
*/
public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
}

View file

@ -40,6 +40,12 @@ public class KinesisClientLibConfiguratorTest {
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProvider";
private String credentialName2 =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysFailCredentialsProvider";
private String credentialNameKinesis =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderKinesis";
private String credentialNameDynamoDB =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderDynamoDB";
private String credentialNameCloudWatch =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderCloudWatch";
private KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator();
@Test
@ -329,6 +335,74 @@ public class KinesisClientLibConfiguratorTest {
}
}
@Test
public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatch() {
String test = StringUtils.join(new String[] {
"streamName = a",
"applicationName = b",
"AWSCredentialsProvider = " + credentialNameKinesis,
"AWSCredentialsProviderDynamoDB = " + credentialNameDynamoDB,
"AWSCredentialsProviderCloudWatch = " + credentialNameCloudWatch,
"failoverTimeMillis = 100",
"shardSyncIntervalMillis = 500"
}, '\n');
InputStream input = new ByteArrayInputStream(test.getBytes());
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
KinesisClientLibConfiguration config = configurator.getConfiguration(input);
try {
config.getKinesisCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("Kinesis credential providers should not fail.");
}
try {
config.getDynamoDBCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("DynamoDB credential providers should not fail.");
}
try {
config.getCloudWatchCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("CloudWatch credential providers should not fail.");
}
}
@Test
public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatchFailed() {
String test = StringUtils.join(new String[] {
"streamName = a",
"applicationName = b",
"AWSCredentialsProvider = " + credentialNameKinesis,
"AWSCredentialsProviderDynamoDB = " + credentialName1,
"AWSCredentialsProviderCloudWatch = " + credentialName1,
"failoverTimeMillis = 100",
"shardSyncIntervalMillis = 500"
}, '\n');
InputStream input = new ByteArrayInputStream(test.getBytes());
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
KinesisClientLibConfiguration config = configurator.getConfiguration(input);
try {
config.getKinesisCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("Kinesis credential providers should not fail.");
}
try {
config.getDynamoDBCredentialsProvider().getCredentials();
fail("DynamoDB credential providers should fail.");
} catch (Exception e) {
// succeed
}
try {
config.getCloudWatchCredentialsProvider().getCredentials();
fail("CloudWatch credential providers should fail.");
} catch (Exception e) {
// succeed
}
}
/**
* This credentials provider will always succeed
*/
@ -345,6 +419,84 @@ public class KinesisClientLibConfiguratorTest {
}
/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProviderKinesis implements AWSCredentialsProvider {
@Override
public AWSCredentials getCredentials() {
return new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return "";
}
@Override
public String getAWSSecretKey() {
return "";
}
};
}
@Override
public void refresh() {
}
}
/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProviderDynamoDB implements AWSCredentialsProvider {
@Override
public AWSCredentials getCredentials() {
return new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return "";
}
@Override
public String getAWSSecretKey() {
return "";
}
};
}
@Override
public void refresh() {
}
}
/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProviderCloudWatch implements AWSCredentialsProvider {
@Override
public AWSCredentials getCredentials() {
return new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return "";
}
@Override
public String getAWSSecretKey() {
return "";
}
};
}
@Override
public void refresh() {
}
}
/**
* This credentials provider will always fail
*/

View file

@ -212,4 +212,10 @@ class ExceptionThrowingLeaseManager implements ILeaseManager<KinesisClientLease>
leaseManager.deleteAll();
}
@Override
public boolean isLeaseTableEmpty() throws DependencyException,
InvalidStateException, ProvisionedThroughputException {
return false;
}
}

View file

@ -89,7 +89,8 @@ public class ProcessTaskTest {
INITIAL_POSITION_LATEST);
final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null);
processTask = new ProcessTask(
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis);
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
}
@Test

View file

@ -117,7 +117,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis);
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
@ -167,7 +168,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards,
spyExecutorService,
metricsFactory,
taskBackoffTimeMillis);
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
@ -211,7 +213,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis);
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
when(leaseManager.getLease(anyString())).thenReturn(null);
when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123"));
@ -300,7 +303,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis);
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
@ -390,7 +394,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis);
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards

View file

@ -224,6 +224,7 @@ public class WorkerTest {
nullMetricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization);
ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory);
@ -263,7 +264,7 @@ public class WorkerTest {
Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint,
leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
shardPrioritization);
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization);
Worker workerSpy = spy(worker);
@ -327,6 +328,7 @@ public class WorkerTest {
nullMetricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization);
ShardInfo shardInfo1 = new ShardInfo(dummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
@ -380,6 +382,7 @@ public class WorkerTest {
nullMetricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization);
worker.run();
Assert.assertTrue(count > 0);
@ -823,8 +826,9 @@ public class WorkerTest {
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization);
WorkerThread workerThread = new WorkerThread(worker);
workerThread.start();
return workerThread;