Merge branch 'master' into listshards
This commit is contained in:
commit
4b9b5bff1a
19 changed files with 395 additions and 84 deletions
|
|
@ -2,7 +2,7 @@ Manifest-Version: 1.0
|
|||
Bundle-ManifestVersion: 2
|
||||
Bundle-Name: Amazon Kinesis Client Library for Java
|
||||
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
|
||||
Bundle-Version: 1.8.8
|
||||
Bundle-Version: 1.8.9
|
||||
Bundle-Vendor: Amazon Technologies, Inc
|
||||
Bundle-RequiredExecutionEnvironment: JavaSE-1.8
|
||||
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
|
||||
|
|
|
|||
2
pom.xml
2
pom.xml
|
|
@ -25,7 +25,7 @@
|
|||
</licenses>
|
||||
|
||||
<properties>
|
||||
<aws-java-sdk.version>1.11.218</aws-java-sdk.version>
|
||||
<aws-java-sdk.version>1.11.271</aws-java-sdk.version>
|
||||
<sqlite4java.version>1.0.392</sqlite4java.version>
|
||||
<sqlite4java.native>libsqlite4java</sqlite4java.native>
|
||||
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>
|
||||
|
|
|
|||
|
|
@ -527,6 +527,7 @@ class ConsumerStates {
|
|||
consumer.getStreamConfig().getStreamProxy(),
|
||||
consumer.getStreamConfig().getInitialPositionInStream(),
|
||||
consumer.isCleanupLeasesOfCompletedShards(),
|
||||
consumer.isIgnoreUnexpectedChildShards(),
|
||||
consumer.getLeaseManager(),
|
||||
consumer.getTaskBackoffTimeMillis(),
|
||||
consumer.getGetRecordsCache());
|
||||
|
|
|
|||
|
|
@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration {
|
|||
/**
|
||||
* User agent set when Amazon Kinesis Client Library makes AWS requests.
|
||||
*/
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.8";
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.10";
|
||||
|
||||
/**
|
||||
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
|
||||
|
|
@ -210,6 +210,7 @@ public class KinesisClientLibConfiguration {
|
|||
private boolean callProcessRecordsEvenForEmptyRecordList;
|
||||
private long parentShardPollIntervalMillis;
|
||||
private boolean cleanupLeasesUponShardCompletion;
|
||||
private boolean ignoreUnexpectedChildShards;
|
||||
private ClientConfiguration kinesisClientConfig;
|
||||
private ClientConfiguration dynamoDBClientConfig;
|
||||
private ClientConfiguration cloudWatchClientConfig;
|
||||
|
|
@ -818,6 +819,13 @@ public class KinesisClientLibConfiguration {
|
|||
return cleanupLeasesUponShardCompletion;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if we should ignore child shards which have open parents
|
||||
*/
|
||||
public boolean shouldIgnoreUnexpectedChildShards() {
|
||||
return ignoreUnexpectedChildShards;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if KCL should validate client provided sequence numbers with a call to Amazon Kinesis before
|
||||
* checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)}
|
||||
|
|
@ -1038,6 +1046,16 @@ public class KinesisClientLibConfiguration {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param ignoreUnexpectedChildShards Ignore child shards with open parents.
|
||||
* @return KinesisClientLibConfiguration
|
||||
*/
|
||||
public KinesisClientLibConfiguration withIgnoreUnexpectedChildShards(
|
||||
boolean ignoreUnexpectedChildShards) {
|
||||
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param clientConfig Common client configuration used by Kinesis/DynamoDB/CloudWatch client
|
||||
* @return KinesisClientLibConfiguration
|
||||
|
|
|
|||
|
|
@ -14,6 +14,9 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
|
@ -50,6 +53,8 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
private SequenceNumberValidator sequenceNumberValidator;
|
||||
|
||||
private ExtendedSequenceNumber sequenceNumberAtShardEnd;
|
||||
|
||||
private IMetricsFactory metricsFactory;
|
||||
|
||||
/**
|
||||
* Only has package level access, since only the Amazon Kinesis Client Library should be creating these.
|
||||
|
|
@ -59,10 +64,12 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
*/
|
||||
RecordProcessorCheckpointer(ShardInfo shardInfo,
|
||||
ICheckpoint checkpoint,
|
||||
SequenceNumberValidator validator) {
|
||||
SequenceNumberValidator validator,
|
||||
IMetricsFactory metricsFactory) {
|
||||
this.shardInfo = shardInfo;
|
||||
this.checkpoint = checkpoint;
|
||||
this.sequenceNumberValidator = validator;
|
||||
this.metricsFactory = metricsFactory;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -283,21 +290,33 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
// just checkpoint at SHARD_END
|
||||
checkpointToRecord = ExtendedSequenceNumber.SHARD_END;
|
||||
}
|
||||
|
||||
boolean unsetMetrics = false;
|
||||
// Don't checkpoint a value we already successfully checkpointed
|
||||
if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) {
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Setting " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken()
|
||||
+ " checkpoint to " + checkpointToRecord);
|
||||
try {
|
||||
if (!MetricsHelper.isMetricsScopePresent()) {
|
||||
MetricsHelper.setMetricsScope(new ThreadSafeMetricsDelegatingScope(metricsFactory.createMetrics()));
|
||||
unsetMetrics = true;
|
||||
}
|
||||
if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) {
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Setting " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken()
|
||||
+ " checkpoint to " + checkpointToRecord);
|
||||
}
|
||||
checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointToRecord, shardInfo.getConcurrencyToken());
|
||||
lastCheckpointValue = checkpointToRecord;
|
||||
} catch (ThrottlingException | ShutdownException | InvalidStateException
|
||||
| KinesisClientLibDependencyException e) {
|
||||
throw e;
|
||||
} catch (KinesisClientLibException e) {
|
||||
LOG.warn("Caught exception setting checkpoint.", e);
|
||||
throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e);
|
||||
}
|
||||
checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointToRecord, shardInfo.getConcurrencyToken());
|
||||
lastCheckpointValue = checkpointToRecord;
|
||||
} catch (ThrottlingException | ShutdownException | InvalidStateException
|
||||
| KinesisClientLibDependencyException e) {
|
||||
throw e;
|
||||
} catch (KinesisClientLibException e) {
|
||||
LOG.warn("Caught exception setting checkpoint.", e);
|
||||
throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e);
|
||||
}
|
||||
} finally {
|
||||
if (unsetMetrics) {
|
||||
MetricsHelper.unsetMetricsScope();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -170,7 +170,8 @@ class ShardConsumer {
|
|||
new SequenceNumberValidator(
|
||||
streamConfig.getStreamProxy(),
|
||||
shardInfo.getShardId(),
|
||||
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())),
|
||||
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),
|
||||
metricsFactory),
|
||||
leaseManager,
|
||||
parentShardPollIntervalMillis,
|
||||
cleanupLeasesOfCompletedShards,
|
||||
|
|
@ -484,6 +485,10 @@ class ShardConsumer {
|
|||
return cleanupLeasesOfCompletedShards;
|
||||
}
|
||||
|
||||
boolean isIgnoreUnexpectedChildShards() {
|
||||
return config.shouldIgnoreUnexpectedChildShards();
|
||||
}
|
||||
|
||||
long getTaskBackoffTimeMillis() {
|
||||
return taskBackoffTimeMillis;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ class ShardSyncTask implements ITask {
|
|||
private final ILeaseManager<KinesisClientLease> leaseManager;
|
||||
private InitialPositionInStreamExtended initialPosition;
|
||||
private final boolean cleanupLeasesUponShardCompletion;
|
||||
private final boolean ignoreUnexpectedChildShards;
|
||||
private final long shardSyncTaskIdleTimeMillis;
|
||||
private final TaskType taskType = TaskType.SHARDSYNC;
|
||||
|
||||
|
|
@ -49,11 +50,13 @@ class ShardSyncTask implements ITask {
|
|||
ILeaseManager<KinesisClientLease> leaseManager,
|
||||
InitialPositionInStreamExtended initialPositionInStream,
|
||||
boolean cleanupLeasesUponShardCompletion,
|
||||
boolean ignoreUnexpectedChildShards,
|
||||
long shardSyncTaskIdleTimeMillis) {
|
||||
this.kinesisProxy = kinesisProxy;
|
||||
this.leaseManager = leaseManager;
|
||||
this.initialPosition = initialPositionInStream;
|
||||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
||||
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||
this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis;
|
||||
}
|
||||
|
||||
|
|
@ -68,7 +71,8 @@ class ShardSyncTask implements ITask {
|
|||
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
|
||||
leaseManager,
|
||||
initialPosition,
|
||||
cleanupLeasesUponShardCompletion);
|
||||
cleanupLeasesUponShardCompletion,
|
||||
ignoreUnexpectedChildShards);
|
||||
if (shardSyncTaskIdleTimeMillis > 0) {
|
||||
Thread.sleep(shardSyncTaskIdleTimeMillis);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ class ShardSyncTaskManager {
|
|||
private final ExecutorService executorService;
|
||||
private final InitialPositionInStreamExtended initialPositionInStream;
|
||||
private boolean cleanupLeasesUponShardCompletion;
|
||||
private boolean ignoreUnexpectedChildShards;
|
||||
private final long shardSyncIdleTimeMillis;
|
||||
|
||||
|
||||
|
|
@ -55,6 +56,7 @@ class ShardSyncTaskManager {
|
|||
* @param initialPositionInStream Initial position in stream
|
||||
* @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait
|
||||
* until they expire)
|
||||
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
|
||||
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
|
||||
* @param metricsFactory Metrics factory
|
||||
* @param executorService ExecutorService to execute the shard sync tasks
|
||||
|
|
@ -63,6 +65,7 @@ class ShardSyncTaskManager {
|
|||
final ILeaseManager<KinesisClientLease> leaseManager,
|
||||
final InitialPositionInStreamExtended initialPositionInStream,
|
||||
final boolean cleanupLeasesUponShardCompletion,
|
||||
final boolean ignoreUnexpectedChildShards,
|
||||
final long shardSyncIdleTimeMillis,
|
||||
final IMetricsFactory metricsFactory,
|
||||
ExecutorService executorService) {
|
||||
|
|
@ -70,6 +73,7 @@ class ShardSyncTaskManager {
|
|||
this.leaseManager = leaseManager;
|
||||
this.metricsFactory = metricsFactory;
|
||||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
||||
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
|
||||
this.executorService = executorService;
|
||||
this.initialPositionInStream = initialPositionInStream;
|
||||
|
|
@ -99,6 +103,7 @@ class ShardSyncTaskManager {
|
|||
leaseManager,
|
||||
initialPositionInStream,
|
||||
cleanupLeasesUponShardCompletion,
|
||||
ignoreUnexpectedChildShards,
|
||||
shardSyncIdleTimeMillis), metricsFactory);
|
||||
future = executorService.submit(currentTask);
|
||||
submittedNewTask = true;
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import java.util.Set;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
|
|
@ -60,9 +61,11 @@ class ShardSyncer {
|
|||
static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
|
||||
ILeaseManager<KinesisClientLease> leaseManager,
|
||||
InitialPositionInStreamExtended initialPositionInStream,
|
||||
boolean cleanupLeasesOfCompletedShards)
|
||||
boolean cleanupLeasesOfCompletedShards,
|
||||
boolean ignoreUnexpectedChildShards)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
|
||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
|
||||
ignoreUnexpectedChildShards);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -71,21 +74,28 @@ class ShardSyncer {
|
|||
* @param kinesisProxy
|
||||
* @param leaseManager
|
||||
* @param initialPositionInStream
|
||||
* @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis
|
||||
* shows this shard to be closed (e.g. parent shard must be closed after a reshard operation).
|
||||
* If it is open, we assume this is an race condition around a reshard event and throw
|
||||
* a KinesisClientLibIOException so client can backoff and retry later.
|
||||
* @param cleanupLeasesOfCompletedShards
|
||||
* @param ignoreUnexpectedChildShards
|
||||
* @throws DependencyException
|
||||
* @throws InvalidStateException
|
||||
* @throws ProvisionedThroughputException
|
||||
* @throws KinesisClientLibIOException
|
||||
*/
|
||||
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
|
||||
ILeaseManager<KinesisClientLease> leaseManager,
|
||||
InitialPositionInStreamExtended initialPositionInStream,
|
||||
boolean cleanupLeasesOfCompletedShards,
|
||||
boolean ignoreUnexpectedChildShards)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
|
||||
}
|
||||
|
||||
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
|
||||
ILeaseManager<KinesisClientLease> leaseManager,
|
||||
InitialPositionInStreamExtended initialPositionInStream,
|
||||
boolean cleanupLeasesOfCompletedShards)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
|
||||
checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -93,11 +103,9 @@ class ShardSyncer {
|
|||
*
|
||||
* @param kinesisProxy
|
||||
* @param leaseManager
|
||||
* @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis
|
||||
* does not show this shard to be open (e.g. parent shard must be closed after a reshard operation).
|
||||
* If it is still open, we assume this is a race condition around a reshard event and
|
||||
* throw a KinesisClientLibIOException so client can backoff and retry later. If the shard doesn't exist in
|
||||
* Kinesis at all, we assume this is an old/expired shard and continue with the sync operation.
|
||||
* @param initialPosition
|
||||
* @param cleanupLeasesOfCompletedShards
|
||||
* @param ignoreUnexpectedChildShards
|
||||
* @throws DependencyException
|
||||
* @throws InvalidStateException
|
||||
* @throws ProvisionedThroughputException
|
||||
|
|
@ -107,18 +115,23 @@ class ShardSyncer {
|
|||
private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
|
||||
ILeaseManager<KinesisClientLease> leaseManager,
|
||||
InitialPositionInStreamExtended initialPosition,
|
||||
boolean cleanupLeasesOfCompletedShards)
|
||||
boolean cleanupLeasesOfCompletedShards,
|
||||
boolean ignoreUnexpectedChildShards)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||
List<Shard> shards = getShardList(kinesisProxy);
|
||||
LOG.debug("Num shards: " + shards.size());
|
||||
|
||||
Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards);
|
||||
Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap);
|
||||
assertAllParentShardsAreClosed(shardIdToChildShardIdsMap, shardIdToShardMap);
|
||||
|
||||
Set<String> inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap);
|
||||
if (!ignoreUnexpectedChildShards) {
|
||||
assertAllParentShardsAreClosed(inconsistentShardIds);
|
||||
}
|
||||
|
||||
List<KinesisClientLease> currentLeases = leaseManager.listLeases();
|
||||
|
||||
List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition);
|
||||
|
||||
List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
|
||||
inconsistentShardIds);
|
||||
LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
|
||||
for (KinesisClientLease lease : newLeasesToCreate) {
|
||||
long startTimeMillis = System.currentTimeMillis();
|
||||
|
|
@ -149,19 +162,37 @@ class ShardSyncer {
|
|||
|
||||
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
|
||||
* and a reshard operation.
|
||||
* @param shardIdToChildShardIdsMap
|
||||
* @param shardIdToShardMap
|
||||
* @param inconsistentShardIds
|
||||
* @throws KinesisClientLibIOException
|
||||
*/
|
||||
private static void assertAllParentShardsAreClosed(Map<String, Set<String>> shardIdToChildShardIdsMap,
|
||||
Map<String, Shard> shardIdToShardMap) throws KinesisClientLibIOException {
|
||||
private static void assertAllParentShardsAreClosed(Set<String> inconsistentShardIds)
|
||||
throws KinesisClientLibIOException {
|
||||
if (!inconsistentShardIds.isEmpty()) {
|
||||
String ids = StringUtils.join(inconsistentShardIds, ' ');
|
||||
throw new KinesisClientLibIOException(String.format("%d open child shards (%s) are inconsistent. "
|
||||
+ "This can happen due to a race condition between describeStream and a reshard operation.",
|
||||
inconsistentShardIds.size(), ids));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to construct the list of inconsistent shards, which are open shards with non-closed ancestor
|
||||
* parent(s).
|
||||
* @param shardIdToChildShardIdsMap
|
||||
* @param shardIdToShardMap
|
||||
* @return Set of inconsistent open shard ids for shards having open parents.
|
||||
*/
|
||||
private static Set<String> findInconsistentShardIds(Map<String, Set<String>> shardIdToChildShardIdsMap,
|
||||
Map<String, Shard> shardIdToShardMap) {
|
||||
Set<String> result = new HashSet<String>();
|
||||
for (String parentShardId : shardIdToChildShardIdsMap.keySet()) {
|
||||
Shard parentShard = shardIdToShardMap.get(parentShardId);
|
||||
if ((parentShardId == null) || (parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null)) {
|
||||
throw new KinesisClientLibIOException("Parent shardId " + parentShardId + " is not closed. "
|
||||
+ "This can happen due to a race condition between describeStream and a reshard operation.");
|
||||
Set<String> childShardIdsMap = shardIdToChildShardIdsMap.get(parentShardId);
|
||||
result.addAll(childShardIdsMap);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -296,8 +327,8 @@ class ShardSyncer {
|
|||
/**
|
||||
* Determine new leases to create and their initial checkpoint.
|
||||
* Note: Package level access only for testing purposes.
|
||||
*
|
||||
* For each open (no ending sequence number) shard that doesn't already have a lease,
|
||||
*
|
||||
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
|
||||
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
|
||||
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
|
||||
* If not, set checkpoint of the shard to the initial position specified by the client.
|
||||
|
|
@ -315,27 +346,35 @@ class ShardSyncer {
|
|||
*
|
||||
* For example:
|
||||
* Shard structure (each level depicts a stream segment):
|
||||
* 0 1 2 3 4 5- shards till epoch 102
|
||||
* \ / \ / | |
|
||||
* 6 7 4 5- shards from epoch 103 - 205
|
||||
* \ / | /\
|
||||
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
|
||||
* 0 1 2 3 4 5 - shards till epoch 102
|
||||
* \ / \ / | |
|
||||
* 6 7 4 5 - shards from epoch 103 - 205
|
||||
* \ / | / \
|
||||
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
|
||||
* Current leases: (3, 4, 5)
|
||||
* New leases to create: (2, 6, 7, 8, 9, 10)
|
||||
*
|
||||
* The leases returned are sorted by the starting sequence number - following the same order
|
||||
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
|
||||
* before creating all the leases.
|
||||
*
|
||||
* If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
|
||||
* here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
|
||||
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
|
||||
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
|
||||
*
|
||||
*
|
||||
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
|
||||
* @param currentLeases List of current leases
|
||||
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
|
||||
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
|
||||
* @param inconsistentShardIds Set of child shard ids having open parents.
|
||||
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
|
||||
*/
|
||||
static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
|
||||
List<KinesisClientLease> currentLeases,
|
||||
InitialPositionInStreamExtended initialPosition) {
|
||||
InitialPositionInStreamExtended initialPosition,
|
||||
Set<String> inconsistentShardIds) {
|
||||
Map<String, KinesisClientLease> shardIdToNewLeaseMap = new HashMap<String, KinesisClientLease>();
|
||||
Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
|
||||
|
||||
|
|
@ -354,6 +393,8 @@ class ShardSyncer {
|
|||
LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors.");
|
||||
if (shardIdsOfCurrentLeases.contains(shardId)) {
|
||||
LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease");
|
||||
} else if (inconsistentShardIds.contains(shardId)) {
|
||||
LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease");
|
||||
} else {
|
||||
LOG.debug("Need to create a lease for shardId " + shardId);
|
||||
KinesisClientLease newLease = newKCLLease(shard);
|
||||
|
|
@ -407,6 +448,17 @@ class ShardSyncer {
|
|||
return newLeasesToCreate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine new leases to create and their initial checkpoint.
|
||||
* Note: Package level access only for testing purposes.
|
||||
*/
|
||||
static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
|
||||
List<KinesisClientLease> currentLeases,
|
||||
InitialPositionInStreamExtended initialPosition) {
|
||||
Set<String> inconsistentShardIds = new HashSet<String>();
|
||||
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds);
|
||||
}
|
||||
|
||||
/**
|
||||
* Note: Package level access for testing purposes only.
|
||||
* Check if this shard is a descendant of a shard that is (or will be) processed.
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ class ShutdownTask implements ITask {
|
|||
private final ILeaseManager<KinesisClientLease> leaseManager;
|
||||
private final InitialPositionInStreamExtended initialPositionInStream;
|
||||
private final boolean cleanupLeasesOfCompletedShards;
|
||||
private final boolean ignoreUnexpectedChildShards;
|
||||
private final TaskType taskType = TaskType.SHUTDOWN;
|
||||
private final long backoffTimeMillis;
|
||||
private final GetRecordsCache getRecordsCache;
|
||||
|
|
@ -59,6 +60,7 @@ class ShutdownTask implements ITask {
|
|||
IKinesisProxy kinesisProxy,
|
||||
InitialPositionInStreamExtended initialPositionInStream,
|
||||
boolean cleanupLeasesOfCompletedShards,
|
||||
boolean ignoreUnexpectedChildShards,
|
||||
ILeaseManager<KinesisClientLease> leaseManager,
|
||||
long backoffTimeMillis,
|
||||
GetRecordsCache getRecordsCache) {
|
||||
|
|
@ -69,6 +71,7 @@ class ShutdownTask implements ITask {
|
|||
this.kinesisProxy = kinesisProxy;
|
||||
this.initialPositionInStream = initialPositionInStream;
|
||||
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
|
||||
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||
this.leaseManager = leaseManager;
|
||||
this.backoffTimeMillis = backoffTimeMillis;
|
||||
this.getRecordsCache = getRecordsCache;
|
||||
|
|
@ -127,7 +130,8 @@ class ShutdownTask implements ITask {
|
|||
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
|
||||
leaseManager,
|
||||
initialPositionInStream,
|
||||
cleanupLeasesOfCompletedShards);
|
||||
cleanupLeasesOfCompletedShards,
|
||||
ignoreUnexpectedChildShards);
|
||||
LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
|
@ -406,8 +407,8 @@ public class Worker implements Runnable {
|
|||
this.leaseCoordinator = leaseCoordinator;
|
||||
this.metricsFactory = metricsFactory;
|
||||
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
|
||||
initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory,
|
||||
executorService);
|
||||
initialPositionInStream, cleanupLeasesUponShardCompletion, config.shouldIgnoreUnexpectedChildShards(),
|
||||
shardSyncIdleTimeMillis, metricsFactory, executorService);
|
||||
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
||||
this.failoverTimeMillis = failoverTimeMillis;
|
||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||
|
|
@ -498,7 +499,8 @@ public class Worker implements Runnable {
|
|||
|| leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) {
|
||||
LOG.info("Syncing Kinesis shard info");
|
||||
ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
|
||||
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, 0L);
|
||||
leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion,
|
||||
config.shouldIgnoreUnexpectedChildShards(), 0L);
|
||||
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
|
||||
} else {
|
||||
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
|
||||
|
|
@ -994,6 +996,11 @@ public class Worker implements Runnable {
|
|||
metricsFactory, execService);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
StreamConfig getStreamConfig() {
|
||||
return streamConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given configuration, returns appropriate metrics factory.
|
||||
*
|
||||
|
|
@ -1071,6 +1078,7 @@ public class Worker implements Runnable {
|
|||
private IMetricsFactory metricsFactory;
|
||||
private ExecutorService execService;
|
||||
private ShardPrioritization shardPrioritization;
|
||||
private IKinesisProxy kinesisProxy;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
|
|
@ -1190,6 +1198,19 @@ public class Worker implements Runnable {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set KinesisProxy for the worker.
|
||||
*
|
||||
* @param kinesisProxy
|
||||
* Sets an implementation of IKinesisProxy.
|
||||
*
|
||||
* @return A reference to this updated object so that method calls can be chained together.
|
||||
*/
|
||||
public Builder kinesisProxy(IKinesisProxy kinesisProxy) {
|
||||
this.kinesisProxy = kinesisProxy;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the Worker instance.
|
||||
*
|
||||
|
|
@ -1255,12 +1276,14 @@ public class Worker implements Runnable {
|
|||
if (shardPrioritization == null) {
|
||||
shardPrioritization = new ParentsFirstShardPrioritization(1);
|
||||
}
|
||||
|
||||
if (kinesisProxy == null) {
|
||||
kinesisProxy = new KinesisProxy(config, kinesisClient);
|
||||
}
|
||||
|
||||
return new Worker(config.getApplicationName(),
|
||||
recordProcessorFactory,
|
||||
config,
|
||||
new StreamConfig(new KinesisProxy(config, kinesisClient),
|
||||
new StreamConfig(kinesisProxy,
|
||||
config.getMaxRecords(),
|
||||
config.getIdleTimeBetweenReadsInMillis(),
|
||||
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
|
||||
|
|
|
|||
|
|
@ -72,13 +72,22 @@ public class MetricsHelper {
|
|||
* @param scope
|
||||
*/
|
||||
public static void setMetricsScope(IMetricsScope scope) {
|
||||
if (currentScope.get() != null) {
|
||||
if (isMetricsScopePresent()) {
|
||||
throw new RuntimeException(String.format(
|
||||
"Metrics scope is already set for the current thread %s", Thread.currentThread().getName()));
|
||||
}
|
||||
currentScope.set(scope);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if current metricsscope is present or not.
|
||||
*
|
||||
* @return true if metrics scope is present, else returns false
|
||||
*/
|
||||
public static boolean isMetricsScopePresent() {
|
||||
return currentScope.get() != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsets the metrics scope for the current thread.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.Date;
|
||||
|
|
@ -407,4 +408,14 @@ public class KinesisClientLibConfigurationTest {
|
|||
fail("Should not have thrown");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKCLConfigurationWithIgnoreUnexpectedChildShards() {
|
||||
KinesisClientLibConfiguration config =
|
||||
new KinesisClientLibConfiguration("TestApplication", "TestStream", null, "TestWorker");
|
||||
// By default, unexpected child shards should not be ignored.
|
||||
assertFalse(config.shouldIgnoreUnexpectedChildShards());
|
||||
config = config.withIgnoreUnexpectedChildShards(true);
|
||||
assertTrue(config.shouldIgnoreUnexpectedChildShards());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,16 +14,27 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IPreparedCheckpointer;
|
||||
|
|
@ -31,15 +42,15 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheck
|
|||
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsScope;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class RecordProcessorCheckpointerTest {
|
||||
private String startingSequenceNumber = "13";
|
||||
private ExtendedSequenceNumber startingExtendedSequenceNumber = new ExtendedSequenceNumber(startingSequenceNumber);
|
||||
|
|
@ -48,6 +59,9 @@ public class RecordProcessorCheckpointerTest {
|
|||
private ShardInfo shardInfo;
|
||||
private SequenceNumberValidator sequenceNumberValidator;
|
||||
private String shardId = "shardId-123";
|
||||
|
||||
@Mock
|
||||
IMetricsFactory metricsFactory;
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
|
|
@ -78,7 +92,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
public final void testCheckpoint() throws Exception {
|
||||
// First call to checkpoint
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, null);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
|
||||
processingCheckpointer.setLargestPermittedCheckpointValue(startingExtendedSequenceNumber);
|
||||
processingCheckpointer.checkpoint();
|
||||
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
|
||||
|
|
@ -98,7 +112,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
@Test
|
||||
public final void testCheckpointRecord() throws Exception {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
|
||||
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
|
||||
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025");
|
||||
Record record = new Record().withSequenceNumber("5025");
|
||||
|
|
@ -114,7 +128,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
@Test
|
||||
public final void testCheckpointSubRecord() throws Exception {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
|
||||
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
|
||||
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030");
|
||||
Record record = new Record().withSequenceNumber("5030");
|
||||
|
|
@ -131,7 +145,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
@Test
|
||||
public final void testCheckpointSequenceNumber() throws Exception {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
|
||||
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
|
||||
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035");
|
||||
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
|
||||
|
|
@ -146,7 +160,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
@Test
|
||||
public final void testCheckpointExtendedSequenceNumber() throws Exception {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
|
||||
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
|
||||
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040");
|
||||
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
|
||||
|
|
@ -162,7 +176,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
public final void testPrepareCheckpoint() throws Exception {
|
||||
// First call to checkpoint
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
|
||||
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
|
||||
|
||||
ExtendedSequenceNumber sequenceNumber1 = new ExtendedSequenceNumber("5001");
|
||||
|
|
@ -193,7 +207,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
@Test
|
||||
public final void testPrepareCheckpointRecord() throws Exception {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
|
||||
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
|
||||
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5025");
|
||||
Record record = new Record().withSequenceNumber("5025");
|
||||
|
|
@ -218,7 +232,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
@Test
|
||||
public final void testPrepareCheckpointSubRecord() throws Exception {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
|
||||
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
|
||||
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5030");
|
||||
Record record = new Record().withSequenceNumber("5030");
|
||||
|
|
@ -244,7 +258,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
@Test
|
||||
public final void testPrepareCheckpointSequenceNumber() throws Exception {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
|
||||
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
|
||||
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5035");
|
||||
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
|
||||
|
|
@ -268,7 +282,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
@Test
|
||||
public final void testPrepareCheckpointExtendedSequenceNumber() throws Exception {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
|
||||
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
|
||||
ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber("5040");
|
||||
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
|
||||
|
|
@ -291,7 +305,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
@Test
|
||||
public final void testMultipleOutstandingCheckpointersHappyCase() throws Exception {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
|
||||
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
|
||||
processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("6040"));
|
||||
|
||||
|
|
@ -323,7 +337,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
@Test
|
||||
public final void testMultipleOutstandingCheckpointersOutOfOrder() throws Exception {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
|
||||
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
|
||||
processingCheckpointer.setLargestPermittedCheckpointValue(new ExtendedSequenceNumber("7040"));
|
||||
|
||||
|
|
@ -358,7 +372,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
*/
|
||||
@Test
|
||||
public final void testUpdate() throws Exception {
|
||||
RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null);
|
||||
RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
|
||||
|
||||
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("10");
|
||||
checkpointer.setLargestPermittedCheckpointValue(sequenceNumber);
|
||||
|
|
@ -379,7 +393,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
|
||||
Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory);
|
||||
|
||||
// Several checkpoints we're gonna hit
|
||||
ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2");
|
||||
|
|
@ -467,7 +481,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
|
||||
Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory);
|
||||
|
||||
// Several checkpoints we're gonna hit
|
||||
ExtendedSequenceNumber tooSmall = new ExtendedSequenceNumber("2");
|
||||
|
|
@ -595,7 +609,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
|
||||
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory);
|
||||
testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.CHECKPOINTER);
|
||||
}
|
||||
}
|
||||
|
|
@ -615,7 +629,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
|
||||
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory);
|
||||
testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARED_CHECKPOINTER);
|
||||
}
|
||||
}
|
||||
|
|
@ -636,7 +650,7 @@ public class RecordProcessorCheckpointerTest {
|
|||
|
||||
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator);
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory);
|
||||
testMixedCheckpointCalls(processingCheckpointer, testPlan, CheckpointerType.PREPARE_THEN_CHECKPOINTER);
|
||||
}
|
||||
}
|
||||
|
|
@ -785,4 +799,47 @@ public class RecordProcessorCheckpointerTest {
|
|||
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public final void testUnsetMetricsScopeDuringCheckpointing() throws Exception {
|
||||
// First call to checkpoint
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
|
||||
IMetricsScope scope = null;
|
||||
if (MetricsHelper.isMetricsScopePresent()) {
|
||||
scope = MetricsHelper.getMetricsScope();
|
||||
MetricsHelper.unsetMetricsScope();
|
||||
}
|
||||
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019");
|
||||
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber);
|
||||
processingCheckpointer.checkpoint();
|
||||
Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId));
|
||||
verify(metricsFactory).createMetrics();
|
||||
Assert.assertFalse(MetricsHelper.isMetricsScopePresent());
|
||||
if (scope != null) {
|
||||
MetricsHelper.setMetricsScope(scope);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public final void testSetMetricsScopeDuringCheckpointing() throws Exception {
|
||||
// First call to checkpoint
|
||||
RecordProcessorCheckpointer processingCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo, checkpoint, null, metricsFactory);
|
||||
boolean shouldUnset = false;
|
||||
if (!MetricsHelper.isMetricsScopePresent()) {
|
||||
shouldUnset = true;
|
||||
MetricsHelper.setMetricsScope(new NullMetricsScope());
|
||||
}
|
||||
ExtendedSequenceNumber sequenceNumber = new ExtendedSequenceNumber("5019");
|
||||
processingCheckpointer.setLargestPermittedCheckpointValue(sequenceNumber);
|
||||
processingCheckpointer.checkpoint();
|
||||
Assert.assertEquals(sequenceNumber, checkpoint.getCheckpoint(shardId));
|
||||
verify(metricsFactory, never()).createMetrics();
|
||||
Assert.assertTrue(MetricsHelper.isMetricsScopePresent());
|
||||
assertEquals(NullMetricsScope.class, MetricsHelper.getMetricsScope().getClass());
|
||||
if (shouldUnset) {
|
||||
MetricsHelper.unsetMetricsScope();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -342,7 +342,8 @@ public class ShardConsumerTest {
|
|||
streamConfig.getStreamProxy(),
|
||||
shardInfo.getShardId(),
|
||||
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
|
||||
)
|
||||
),
|
||||
metricsFactory
|
||||
);
|
||||
|
||||
dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
||||
|
|
@ -493,7 +494,8 @@ public class ShardConsumerTest {
|
|||
streamConfig.getStreamProxy(),
|
||||
shardInfo.getShardId(),
|
||||
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
|
||||
)
|
||||
),
|
||||
metricsFactory
|
||||
);
|
||||
|
||||
ShardConsumer consumer =
|
||||
|
|
@ -621,7 +623,8 @@ public class ShardConsumerTest {
|
|||
streamConfig.getStreamProxy(),
|
||||
shardInfo.getShardId(),
|
||||
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
|
||||
)
|
||||
),
|
||||
metricsFactory
|
||||
);
|
||||
|
||||
dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
||||
|
|
|
|||
|
|
@ -127,6 +127,7 @@ public class ShardSyncTaskIntegrationTest {
|
|||
leaseManager,
|
||||
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
|
||||
false,
|
||||
false,
|
||||
0L);
|
||||
syncTask.call();
|
||||
List<KinesisClientLease> leases = leaseManager.listLeases();
|
||||
|
|
|
|||
|
|
@ -146,6 +146,39 @@ public class ShardSyncerTest {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test determineNewLeasesToCreate() where there are no leases and no resharding operations have been performed, but one of
|
||||
* the shards was marked as inconsistent.
|
||||
*/
|
||||
@Test
|
||||
public final void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() {
|
||||
List<Shard> shards = new ArrayList<Shard>();
|
||||
List<KinesisClientLease> currentLeases = new ArrayList<KinesisClientLease>();
|
||||
SequenceNumberRange sequenceRange = ShardObjectHelper.newSequenceNumberRange("342980", null);
|
||||
|
||||
String shardId0 = "shardId-0";
|
||||
shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange));
|
||||
|
||||
String shardId1 = "shardId-1";
|
||||
shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
|
||||
|
||||
String shardId2 = "shardId-2";
|
||||
shards.add(ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange));
|
||||
|
||||
Set<String> inconsistentShardIds = new HashSet<String>();
|
||||
inconsistentShardIds.add(shardId2);
|
||||
|
||||
List<KinesisClientLease> newLeases =
|
||||
ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds);
|
||||
Assert.assertEquals(2, newLeases.size());
|
||||
Set<String> expectedLeaseShardIds = new HashSet<String>();
|
||||
expectedLeaseShardIds.add(shardId0);
|
||||
expectedLeaseShardIds.add(shardId1);
|
||||
for (KinesisClientLease lease : newLeases) {
|
||||
Assert.assertTrue(expectedLeaseShardIds.contains(lease.getLeaseKey()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test bootstrapShardLeases() starting at TRIM_HORIZON ("beginning" of stream)
|
||||
*
|
||||
|
|
@ -296,6 +329,41 @@ public class ShardSyncerTest {
|
|||
dataFile.delete();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test checkAndCreateLeasesForNewShards() when a parent is open and children of open parents are being ignored.
|
||||
*/
|
||||
@Test
|
||||
public final void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildren()
|
||||
throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException,
|
||||
IOException {
|
||||
List<Shard> shards = constructShardListForGraphA();
|
||||
Shard shard = shards.get(5);
|
||||
Assert.assertEquals("shardId-5", shard.getShardId());
|
||||
SequenceNumberRange range = shard.getSequenceNumberRange();
|
||||
// shardId-5 in graph A has two children (shardId-9 and shardId-10). if shardId-5
|
||||
// is not closed, those children should be ignored when syncing shards, no leases
|
||||
// should be obtained for them, and we should obtain a lease on the still-open
|
||||
// parent.
|
||||
range.setEndingSequenceNumber(null);
|
||||
shard.setSequenceNumberRange(range);
|
||||
File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1");
|
||||
dataFile.deleteOnExit();
|
||||
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
|
||||
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST,
|
||||
cleanupLeasesOfCompletedShards, true);
|
||||
List<KinesisClientLease> newLeases = leaseManager.listLeases();
|
||||
Set<String> expectedLeaseShardIds = new HashSet<String>();
|
||||
expectedLeaseShardIds.add("shardId-4");
|
||||
expectedLeaseShardIds.add("shardId-5");
|
||||
expectedLeaseShardIds.add("shardId-8");
|
||||
Assert.assertEquals(expectedLeaseShardIds.size(), newLeases.size());
|
||||
for (KinesisClientLease lease1 : newLeases) {
|
||||
Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey()));
|
||||
Assert.assertEquals(ExtendedSequenceNumber.LATEST, lease1.getCheckpoint());
|
||||
}
|
||||
dataFile.delete();
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws KinesisClientLibIOException
|
||||
* @throws DependencyException
|
||||
|
|
@ -586,7 +654,8 @@ public class ShardSyncerTest {
|
|||
dataFile.deleteOnExit();
|
||||
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
|
||||
|
||||
ShardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards);
|
||||
ShardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards,
|
||||
false);
|
||||
List<KinesisClientLease> newLeases = leaseManager.listLeases();
|
||||
Assert.assertEquals(2, newLeases.size());
|
||||
Set<String> expectedLeaseShardIds = new HashSet<String>();
|
||||
|
|
|
|||
|
|
@ -100,6 +100,7 @@ public class ShutdownTaskTest {
|
|||
IKinesisProxy kinesisProxy = mock(IKinesisProxy.class);
|
||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||
boolean cleanupLeasesOfCompletedShards = false;
|
||||
boolean ignoreUnexpectedChildShards = false;
|
||||
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
||||
defaultRecordProcessor,
|
||||
checkpointer,
|
||||
|
|
@ -107,6 +108,7 @@ public class ShutdownTaskTest {
|
|||
kinesisProxy,
|
||||
INITIAL_POSITION_TRIM_HORIZON,
|
||||
cleanupLeasesOfCompletedShards,
|
||||
ignoreUnexpectedChildShards,
|
||||
leaseManager,
|
||||
TASK_BACKOFF_TIME_MILLIS,
|
||||
getRecordsCache);
|
||||
|
|
@ -126,6 +128,7 @@ public class ShutdownTaskTest {
|
|||
when(kinesisProxy.getShardList()).thenReturn(null);
|
||||
ILeaseManager<KinesisClientLease> leaseManager = mock(KinesisClientLeaseManager.class);
|
||||
boolean cleanupLeasesOfCompletedShards = false;
|
||||
boolean ignoreUnexpectedChildShards = false;
|
||||
ShutdownTask task = new ShutdownTask(defaultShardInfo,
|
||||
defaultRecordProcessor,
|
||||
checkpointer,
|
||||
|
|
@ -133,6 +136,7 @@ public class ShutdownTaskTest {
|
|||
kinesisProxy,
|
||||
INITIAL_POSITION_TRIM_HORIZON,
|
||||
cleanupLeasesOfCompletedShards,
|
||||
ignoreUnexpectedChildShards,
|
||||
leaseManager,
|
||||
TASK_BACKOFF_TIME_MILLIS,
|
||||
getRecordsCache);
|
||||
|
|
@ -147,7 +151,7 @@ public class ShutdownTaskTest {
|
|||
*/
|
||||
@Test
|
||||
public final void testGetTaskType() {
|
||||
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsCache);
|
||||
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, null, 0, getRecordsCache);
|
||||
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -90,6 +90,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess
|
|||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
|
|
@ -1474,6 +1475,31 @@ public class WorkerTest {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuilderWithDefaultKinesisProxy() {
|
||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||
Worker worker = new Worker.Builder()
|
||||
.recordProcessorFactory(recordProcessorFactory)
|
||||
.config(config)
|
||||
.build();
|
||||
Assert.assertNotNull(worker.getStreamConfig().getStreamProxy());
|
||||
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisProxy);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuilderWhenKinesisProxyIsSet() {
|
||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
||||
// Create an instance of KinesisLocalFileProxy for injection and validation
|
||||
IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class);
|
||||
Worker worker = new Worker.Builder()
|
||||
.recordProcessorFactory(recordProcessorFactory)
|
||||
.config(config)
|
||||
.kinesisProxy(kinesisProxy)
|
||||
.build();
|
||||
Assert.assertNotNull(worker.getStreamConfig().getStreamProxy());
|
||||
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy);
|
||||
}
|
||||
|
||||
private abstract class InjectableWorker extends Worker {
|
||||
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config, StreamConfig streamConfig,
|
||||
|
|
|
|||
Loading…
Reference in a new issue