From 85b6b9f1510875a539bfcdf4920f534635d709dc Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Wed, 17 Aug 2016 07:45:33 -0700 Subject: [PATCH] Allow Prioritization of Parent Shard Tasks Added a new interface that allows the worker to prioritize which lease assignment it will work on next. When using the ParentsFirstshardprioritization the worker will select parents for processing before selecting children. This will prevent ShardConsumers from spending time sleeping in the WAITING_ON_PARENT_SHARDS state. --- .../worker/KinesisClientLibConfiguration.java | 25 +++ .../KinesisClientLibLeaseCoordinator.java | 3 +- .../lib/worker/NoOpShardPrioritization.java | 21 +++ .../ParentsFirstShardPrioritization.java | 135 +++++++++++++++ .../lib/worker/ShardConsumer.java | 39 ++--- .../clientlibrary/lib/worker/ShardInfo.java | 64 ++++++- .../lib/worker/ShardPrioritization.java | 19 ++ .../clientlibrary/lib/worker/Worker.java | 38 +++- .../worker/BlockOnParentShardTaskTest.java | 15 +- .../lib/worker/KinesisDataFetcherTest.java | 2 +- ...rentsFirstShardPrioritizationUnitTest.java | 162 ++++++++++++++++++ .../lib/worker/ProcessTaskTest.java | 2 +- .../RecordProcessorCheckpointerTest.java | 16 +- .../lib/worker/ShardConsumerTest.java | 10 +- .../lib/worker/ShardInfoTest.java | 33 ++-- .../lib/worker/ShutdownTaskTest.java | 6 +- .../clientlibrary/lib/worker/WorkerTest.java | 23 ++- 17 files changed, 536 insertions(+), 77 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpShardPrioritization.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java create mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 9a6fb4f5..590382b4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -154,6 +154,10 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; + /** + * Default Shard prioritization strategy. + */ + public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization(); private String applicationName; private String tableName; @@ -187,6 +191,7 @@ public class KinesisClientLibConfiguration { private int initialLeaseTableReadCapacity; private int initialLeaseTableWriteCapacity; private InitialPositionInStreamExtended initialPositionInStreamExtended; + private ShardPrioritization shardPrioritization; /** * Constructor. @@ -333,6 +338,7 @@ public class KinesisClientLibConfiguration { this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); + this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; } // Check if value is positive, otherwise throw an exception @@ -599,6 +605,13 @@ public class KinesisClientLibConfiguration { return initialPositionInStreamExtended.getTimestamp(); } + /** + * @return Shard prioritization strategy. + */ + public ShardPrioritization getShardPrioritizationStrategy() { + return shardPrioritization; + } + // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES /** * @param tableName name of the lease table in DynamoDB @@ -913,4 +926,16 @@ public class KinesisClientLibConfiguration { this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; return this; } + + /** + * @param shardPrioritization Implementation of ShardPrioritization interface that should be used during processing. + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withShardPrioritizationStrategy(ShardPrioritization shardPrioritization) { + if (shardPrioritization == null) { + throw new IllegalArgumentException("shardPrioritization cannot be null"); + } + this.shardPrioritization = shardPrioritization; + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index 20ca4d90..0119dcc3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -209,7 +209,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator prioritize(List original) { + return original; + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java new file mode 100644 index 00000000..dbacbd98 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java @@ -0,0 +1,135 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Shard Prioritization that prioritizes parent shards first. + * It also limits number of shards that will be available for initialization based on their depth. + * It doesn't make a lot of sense to work on a shard that has too many unfinished parents. + */ +public class ParentsFirstShardPrioritization implements + ShardPrioritization { + private static final SortingNode PROCESSING_NODE = new SortingNode(null, Integer.MIN_VALUE); + + private final int maxDepth; + + /** + * Creates ParentFirst prioritization with filtering based on depth of the shard. + * Shards that have depth > maxDepth will be ignored and will not be returned by this prioritization. + * + * @param maxDepth any shard that is deeper than max depth, will be excluded from processing + */ + public ParentsFirstShardPrioritization(int maxDepth) { + /* Depth 0 means that shard is completed or cannot be found, + * it is impossible to process such shards. + */ + if (maxDepth <= 0) { + throw new IllegalArgumentException("Max depth cannot be negative or zero. Provided value: " + maxDepth); + } + this.maxDepth = maxDepth; + } + + @Override + public List prioritize(List original) { + Map shards = new HashMap<>(); + for (ShardInfo shardInfo : original) { + shards.put(shardInfo.getShardId(), + shardInfo); + } + + Map processedNodes = new HashMap<>(); + + for (ShardInfo shardInfo : original) { + populateDepth(shardInfo.getShardId(), + shards, + processedNodes); + } + + List orderedInfos = new ArrayList<>(original.size()); + + List orderedNodes = new ArrayList<>(processedNodes.values()); + Collections.sort(orderedNodes); + + for (SortingNode sortingTreeNode : orderedNodes) { + // don't process shards with depth > maxDepth + if (sortingTreeNode.getDepth() <= maxDepth) { + orderedInfos.add(sortingTreeNode.shardInfo); + } + } + return orderedInfos; + } + + private int populateDepth(String shardId, + Map shards, + Map processedNodes) { + SortingNode processed = processedNodes.get(shardId); + if (processed != null) { + if (processed == PROCESSING_NODE) { + throw new IllegalArgumentException("Circular dependency detected. Shard Id " + + shardId + " is processed twice"); + } + return processed.getDepth(); + } + + ShardInfo shardInfo = shards.get(shardId); + if (shardInfo == null) { + // parent doesn't exist in our list, so this shard is root-level node + return 0; + } + + if (shardInfo.isCompleted()) { + // we treat completed shards as 0-level + return 0; + } + + // storing processing node to make sure we track progress and avoid circular dependencies + processedNodes.put(shardId, PROCESSING_NODE); + + int maxParentDepth = 0; + for (String parentId : shardInfo.getParentShardIds()) { + maxParentDepth = Math.max(maxParentDepth, + populateDepth(parentId, + shards, + processedNodes)); + } + + int currentNodeLevel = maxParentDepth + 1; + SortingNode previousValue = processedNodes.put(shardId, + new SortingNode(shardInfo, + currentNodeLevel)); + if (previousValue != PROCESSING_NODE) { + throw new IllegalStateException("Validation failed. Depth for shardId " + shardId + " was populated twice"); + } + + return currentNodeLevel; + } + + /** + * Class to store depth of shards during prioritization. + */ + private static class SortingNode implements + Comparable { + private final ShardInfo shardInfo; + private final int depth; + + public SortingNode(ShardInfo shardInfo, + int depth) { + this.shardInfo = shardInfo; + this.depth = depth; + } + + public int getDepth() { + return depth; + } + + @Override + public int compareTo(SortingNode o) { + return Integer.compare(depth, + o.depth); + } + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 10dacc04..b6cc76aa 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -297,36 +297,35 @@ class ShardConsumer { */ // CHECKSTYLE:OFF CyclomaticComplexity void updateState(boolean taskCompletedSuccessfully) { + if (currentState == ShardConsumerState.SHUTDOWN_COMPLETE) { + // Shutdown was completed and there nothing we can do after that + return; + } + if ((currentTask == null) && beginShutdown) { + // Shard didn't start any tasks and can be shutdown fast + currentState = ShardConsumerState.SHUTDOWN_COMPLETE; + return; + } + if (beginShutdown && currentState != ShardConsumerState.SHUTTING_DOWN) { + // Shard received signal to start shutdown. + // Whatever task we were working on should be stopped and shutdown task should be executed + currentState = ShardConsumerState.SHUTTING_DOWN; + return; + } switch (currentState) { case WAITING_ON_PARENT_SHARDS: if (taskCompletedSuccessfully && TaskType.BLOCK_ON_PARENT_SHARDS.equals(currentTask.getTaskType())) { - if (beginShutdown) { - currentState = ShardConsumerState.SHUTTING_DOWN; - } else { - currentState = ShardConsumerState.INITIALIZING; - } - } else if ((currentTask == null) && beginShutdown) { - currentState = ShardConsumerState.SHUTDOWN_COMPLETE; + currentState = ShardConsumerState.INITIALIZING; } break; case INITIALIZING: if (taskCompletedSuccessfully && TaskType.INITIALIZE.equals(currentTask.getTaskType())) { - if (beginShutdown) { - currentState = ShardConsumerState.SHUTTING_DOWN; - } else { - currentState = ShardConsumerState.PROCESSING; - } - } else if ((currentTask == null) && beginShutdown) { - currentState = ShardConsumerState.SHUTDOWN_COMPLETE; + currentState = ShardConsumerState.PROCESSING; } break; case PROCESSING: if (taskCompletedSuccessfully && TaskType.PROCESS.equals(currentTask.getTaskType())) { - if (beginShutdown) { - currentState = ShardConsumerState.SHUTTING_DOWN; - } else { - currentState = ShardConsumerState.PROCESSING; - } + currentState = ShardConsumerState.PROCESSING; } break; case SHUTTING_DOWN: @@ -335,8 +334,6 @@ class ShardConsumer { currentState = ShardConsumerState.SHUTDOWN_COMPLETE; } break; - case SHUTDOWN_COMPLETE: - break; default: LOG.error("Unexpected state: " + currentState); break; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java index 54f64568..9890d02f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java @@ -19,6 +19,8 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; + /** * Used to pass shard related info among different classes and as a key to the map of shard consumers. */ @@ -28,13 +30,18 @@ class ShardInfo { private final String concurrencyToken; // Sorted list of parent shardIds. private final List parentShardIds; + private final ExtendedSequenceNumber checkpoint; /** * @param shardId Kinesis shardId * @param concurrencyToken Used to differentiate between lost and reclaimed leases * @param parentShardIds Parent shards of the shard identified by Kinesis shardId + * @param checkpoint the latest checkpoint from lease */ - public ShardInfo(String shardId, String concurrencyToken, Collection parentShardIds) { + public ShardInfo(String shardId, + String concurrencyToken, + Collection parentShardIds, + ExtendedSequenceNumber checkpoint) { this.shardId = shardId; this.concurrencyToken = concurrencyToken; this.parentShardIds = new LinkedList(); @@ -44,6 +51,7 @@ class ShardInfo { // ShardInfo stores parent shard Ids in canonical order in the parentShardIds list. // This makes it easy to check for equality in ShardInfo.equals method. Collections.sort(this.parentShardIds); + this.checkpoint = checkpoint; } /** @@ -67,6 +75,13 @@ class ShardInfo { return new LinkedList(parentShardIds); } + /** + * @return completion status of the shard + */ + protected boolean isCompleted() { + return ExtendedSequenceNumber.SHARD_END.equals(checkpoint); + } + /** * {@inheritDoc} */ @@ -77,6 +92,7 @@ class ShardInfo { result = prime * result + ((concurrencyToken == null) ? 0 : concurrencyToken.hashCode()); result = prime * result + ((parentShardIds == null) ? 0 : parentShardIds.hashCode()); result = prime * result + ((shardId == null) ? 0 : shardId.hashCode()); + result = prime * result + ((checkpoint == null) ? 0 : checkpoint.hashCode()); return result; } @@ -126,6 +142,13 @@ class ShardInfo { } else if (!shardId.equals(other.shardId)) { return false; } + if (checkpoint == null) { + if (other.checkpoint != null) { + return false; + } + } else if (!checkpoint.equals(other.checkpoint)) { + return false; + } return true; } @@ -135,7 +158,44 @@ class ShardInfo { @Override public String toString() { return "ShardInfo [shardId=" + shardId + ", concurrencyToken=" + concurrencyToken + ", parentShardIds=" - + parentShardIds + "]"; + + parentShardIds + ", checkpoint=" + checkpoint + "]"; + } + + /** + * Builder class for ShardInfo. + */ + public static class Builder { + private String shardId; + private String concurrencyToken; + private List parentShardIds = Collections.emptyList(); + private ExtendedSequenceNumber checkpoint = ExtendedSequenceNumber.LATEST; + + public Builder() { + } + + public Builder withShardId(String shardId) { + this.shardId = shardId; + return this; + } + + public Builder withConcurrencyToken(String concurrencyToken) { + this.concurrencyToken = concurrencyToken; + return this; + } + + public Builder withParentShards(List parentShardIds) { + this.parentShardIds = parentShardIds; + return this; + } + + public Builder withCheckpoint(ExtendedSequenceNumber checkpoint) { + this.checkpoint = checkpoint; + return this; + } + + public ShardInfo build() { + return new ShardInfo(shardId, concurrencyToken, parentShardIds, checkpoint); + } } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java new file mode 100644 index 00000000..54f7517d --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java @@ -0,0 +1,19 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.List; + +/** + * Provides logic to prioritize or filter shards before their execution. + */ +public interface ShardPrioritization { + + /** + * Returns new list of shards ordered based on their priority. + * Resulted list may have fewer shards compared to original list + * + * @param original + * list of shards needed to be prioritized + * @return new list that contains only shards that should be processed + */ + List prioritize(List original); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 50861c29..3da1f2cd 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -38,6 +38,7 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; @@ -80,6 +81,8 @@ public class Worker implements Runnable { private final KinesisClientLibLeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager controlServer; + private final ShardPrioritization shardPrioritization; + private volatile boolean shutdown; private volatile long shutdownStartTimeMillis; @@ -231,7 +234,8 @@ public class Worker implements Runnable { execService, metricsFactory, config.getTaskBackoffTimeMillis(), - config.getFailoverTimeMillis()); + config.getFailoverTimeMillis(), + config.getShardPrioritizationStrategy()); // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { Region region = RegionUtils.getRegion(config.getRegionName()); @@ -271,6 +275,7 @@ public class Worker implements Runnable { * consumption) * @param metricsFactory Metrics factory used to emit metrics * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param shardPrioritization Provides prioritization logic to decide which available shards process first */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES @@ -286,7 +291,8 @@ public class Worker implements Runnable { ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, - long failoverTimeMillis) { + long failoverTimeMillis, + ShardPrioritization shardPrioritization) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.streamConfig = streamConfig; @@ -308,6 +314,7 @@ public class Worker implements Runnable { executorService); this.taskBackoffTimeMillis = taskBackoffTimeMillis; this.failoverTimeMillis = failoverTimeMillis; + this.shardPrioritization = shardPrioritization; } /** @@ -449,12 +456,13 @@ public class Worker implements Runnable { private List getShardInfoForAssignments() { List assignedStreamShards = leaseCoordinator.getCurrentAssignments(); + List prioritizedShards = shardPrioritization.prioritize(assignedStreamShards); - if ((assignedStreamShards != null) && (!assignedStreamShards.isEmpty())) { + if ((prioritizedShards != null) && (!prioritizedShards.isEmpty())) { if (wlog.isInfoEnabled()) { StringBuilder builder = new StringBuilder(); boolean firstItem = true; - for (ShardInfo shardInfo : assignedStreamShards) { + for (ShardInfo shardInfo : prioritizedShards) { if (!firstItem) { builder.append(", "); } @@ -467,7 +475,7 @@ public class Worker implements Runnable { wlog.info("No activities assigned"); } - return assignedStreamShards; + return prioritizedShards; } /** @@ -780,6 +788,7 @@ public class Worker implements Runnable { private AmazonCloudWatch cloudWatchClient; private IMetricsFactory metricsFactory; private ExecutorService execService; + private ShardPrioritization shardPrioritization; /** * Default constructor. @@ -879,6 +888,19 @@ public class Worker implements Runnable { return this; } + /** + * Provides logic how to prioritize shard processing. + * + * @param shardPrioritization + * shardPrioritization is responsible to order shards before processing + * + * @return A reference to this updated object so that method calls can be chained together. + */ + public Builder shardPrioritization(ShardPrioritization shardPrioritization) { + this.shardPrioritization = shardPrioritization; + return this; + } + /** * Build the Worker instance. * @@ -937,6 +959,9 @@ public class Worker implements Runnable { if (metricsFactory == null) { metricsFactory = getMetricsFactory(cloudWatchClient, config); } + if (shardPrioritization == null) { + shardPrioritization = new ParentsFirstShardPrioritization(1); + } return new Worker(config.getApplicationName(), recordProcessorFactory, @@ -965,7 +990,8 @@ public class Worker implements Runnable { execService, metricsFactory, config.getTaskBackoffTimeMillis(), - config.getFailoverTimeMillis()); + config.getFailoverTimeMillis(), + shardPrioritization); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTaskTest.java index 4c98701a..a42e0683 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTaskTest.java @@ -20,12 +20,11 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.List; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -47,7 +46,7 @@ public class BlockOnParentShardTaskTest { private final String shardId = "shardId-97"; private final String concurrencyToken = "testToken"; private final List emptyParentShardIds = new ArrayList(); - ShardInfo defaultShardInfo = new ShardInfo(shardId, concurrencyToken, emptyParentShardIds); + ShardInfo defaultShardInfo = new ShardInfo(shardId, concurrencyToken, emptyParentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); /** * @throws java.lang.Exception @@ -122,14 +121,14 @@ public class BlockOnParentShardTaskTest { // test single parent parentShardIds.add(parent1ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); task = new BlockOnParentShardTask(shardInfo, leaseManager, backoffTimeInMillis); result = task.call(); Assert.assertNull(result.getException()); // test two parents parentShardIds.add(parent2ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); task = new BlockOnParentShardTask(shardInfo, leaseManager, backoffTimeInMillis); result = task.call(); Assert.assertNull(result.getException()); @@ -164,14 +163,14 @@ public class BlockOnParentShardTaskTest { // test single parent parentShardIds.add(parent1ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); task = new BlockOnParentShardTask(shardInfo, leaseManager, backoffTimeInMillis); result = task.call(); Assert.assertNotNull(result.getException()); // test two parents parentShardIds.add(parent2ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); task = new BlockOnParentShardTask(shardInfo, leaseManager, backoffTimeInMillis); result = task.call(); Assert.assertNotNull(result.getException()); @@ -191,7 +190,7 @@ public class BlockOnParentShardTaskTest { String parentShardId = "shardId-1"; List parentShardIds = new ArrayList<>(); parentShardIds.add(parentShardId); - ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds); + ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); TaskResult result = null; KinesisClientLease parentLease = new KinesisClientLease(); ILeaseManager leaseManager = mock(ILeaseManager.class); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java index 556a1e0e..dd56a256 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java @@ -48,7 +48,7 @@ public class KinesisDataFetcherTest { private static final int MAX_RECORDS = 1; private static final String SHARD_ID = "shardId-1"; private static final String AT_SEQUENCE_NUMBER = ShardIteratorType.AT_SEQUENCE_NUMBER.toString(); - private static final ShardInfo SHARD_INFO = new ShardInfo(SHARD_ID, null, null); + private static final ShardInfo SHARD_INFO = new ShardInfo(SHARD_ID, null, null, null); private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java new file mode 100644 index 00000000..35b56b32 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java @@ -0,0 +1,162 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import org.junit.Test; + +public class ParentsFirstShardPrioritizationUnitTest { + + @Test(expected = IllegalArgumentException.class) + public void testMaxDepthNegativeShouldFail() { + new ParentsFirstShardPrioritization(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void testMaxDepthZeroShouldFail() { + new ParentsFirstShardPrioritization(0); + } + + @Test + public void testMaxDepthPositiveShouldNotFail() { + new ParentsFirstShardPrioritization(1); + } + + @Test + public void testSorting() { + Random random = new Random(987654); + int numberOfShards = 7; + + List shardIdsDependencies = new ArrayList<>(); + shardIdsDependencies.add("unknown"); + List original = new ArrayList<>(); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + String shardId = shardId(shardNumber); + original.add(shardInfo(shardId, shardIdsDependencies)); + shardIdsDependencies.add(shardId); + } + + ParentsFirstShardPrioritization ordering = new ParentsFirstShardPrioritization(Integer.MAX_VALUE); + + // shuffle original list as it is already ordered in right way + Collections.shuffle(original, random); + List ordered = ordering.prioritize(original); + + assertEquals(numberOfShards, ordered.size()); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + String shardId = shardId(shardNumber); + assertEquals(shardId, ordered.get(shardNumber).getShardId()); + } + } + + @Test + public void testSortingAndFiltering() { + Random random = new Random(45677); + int numberOfShards = 10; + + List shardIdsDependencies = new ArrayList<>(); + shardIdsDependencies.add("unknown"); + List original = new ArrayList<>(); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + String shardId = shardId(shardNumber); + original.add(shardInfo(shardId, shardIdsDependencies)); + shardIdsDependencies.add(shardId); + } + + int maxDepth = 3; + ParentsFirstShardPrioritization ordering = new ParentsFirstShardPrioritization(maxDepth); + + // shuffle original list as it is already ordered in right way + Collections.shuffle(original, random); + List ordered = ordering.prioritize(original); + // in this case every shard has its own level, so we don't expect to + // have more shards than max depth + assertEquals(maxDepth, ordered.size()); + + for (int shardNumber = 0; shardNumber < maxDepth; shardNumber++) { + String shardId = shardId(shardNumber); + assertEquals(shardId, ordered.get(shardNumber).getShardId()); + } + } + + @Test + public void testSimpleOrdering() { + Random random = new Random(1234); + int numberOfShards = 10; + + String parentId = "unknown"; + List original = new ArrayList<>(); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + String shardId = shardId(shardNumber); + original.add(shardInfo(shardId, parentId)); + parentId = shardId; + } + + ParentsFirstShardPrioritization ordering = new ParentsFirstShardPrioritization(Integer.MAX_VALUE); + + // shuffle original list as it is already ordered in right way + Collections.shuffle(original, random); + List ordered = ordering.prioritize(original); + assertEquals(numberOfShards, ordered.size()); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + String shardId = shardId(shardNumber); + assertEquals(shardId, ordered.get(shardNumber).getShardId()); + } + } + + /** + * This should be impossible as shards don't have circular dependencies, + * but this code should handle it properly and fail + */ + @Test + public void testCircularDependencyBetweenShards() { + Random random = new Random(13468798); + int numberOfShards = 10; + + // shard-0 will point in middle shard (shard-5) in current test + String parentId = shardId(numberOfShards / 2); + List original = new ArrayList<>(); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + String shardId = shardId(shardNumber); + original.add(shardInfo(shardId, parentId)); + parentId = shardId; + } + + ParentsFirstShardPrioritization ordering = new ParentsFirstShardPrioritization(Integer.MAX_VALUE); + + // shuffle original list as it is already ordered in right way + Collections.shuffle(original, random); + try { + ordering.prioritize(original); + fail("Processing should fail in case we have circular dependency"); + } catch (IllegalArgumentException expected) { + + } + } + + private String shardId(int shardNumber) { + return "shardId-" + shardNumber; + } + + private static ShardInfo shardInfo(String shardId, List parentShardIds) { + // copy into new list just in case ShardInfo will stop doing it + List newParentShardIds = new ArrayList<>(parentShardIds); + return new ShardInfo.Builder() + .withShardId(shardId) + .withParentShards(newParentShardIds) + .build(); + } + + private static ShardInfo shardInfo(String shardId, String... parentShardIds) { + return new ShardInfo.Builder() + .withShardId(shardId) + .withParentShards(Arrays.asList(parentShardIds)) + .build(); + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index 6576e47f..f1d908f0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -87,7 +87,7 @@ public class ProcessTaskTest { new StreamConfig(null, maxRecords, idleTimeMillis, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - final ShardInfo shardInfo = new ShardInfo(shardId, null, null); + final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); processTask = new ProcessTask( shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java index 4741ea14..d5f6b53f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java @@ -75,7 +75,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpoint() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); // First call to checkpoint RecordProcessorCheckpointer processingCheckpointer = @@ -98,7 +98,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpointRecord() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator(null, shardId, false); RecordProcessorCheckpointer processingCheckpointer = @@ -117,7 +117,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpointSubRecord() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator(null, shardId, false); RecordProcessorCheckpointer processingCheckpointer = @@ -137,7 +137,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpointSequenceNumber() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator(null, shardId, false); RecordProcessorCheckpointer processingCheckpointer = @@ -155,7 +155,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpointExtendedSequenceNumber() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator(null, shardId, false); RecordProcessorCheckpointer processingCheckpointer = @@ -173,7 +173,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testUpdate() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null); @@ -193,7 +193,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testClientSpecifiedCheckpoint() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); SequenceNumberValidator validator = mock(SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); @@ -290,7 +290,7 @@ public class RecordProcessorCheckpointerTest { @SuppressWarnings("serial") @Test public final void testMixedCheckpointCalls() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); SequenceNumberValidator validator = mock(SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index d8d39377..26337381 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -92,7 +92,7 @@ public class ShardConsumerTest { @SuppressWarnings("unchecked") @Test public final void testInitializationStateUponFailure() throws Exception { - ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null); + ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); ICheckpoint checkpoint = mock(ICheckpoint.class); when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class); @@ -141,7 +141,7 @@ public class ShardConsumerTest { @SuppressWarnings("unchecked") @Test public final void testInitializationStateUponSubmissionFailure() throws Exception { - ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null); + ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); ICheckpoint checkpoint = mock(ICheckpoint.class); ExecutorService spyExecutorService = spy(executorService); @@ -189,7 +189,7 @@ public class ShardConsumerTest { @SuppressWarnings("unchecked") @Test public final void testRecordProcessorThrowable() throws Exception { - ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null); + ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); ICheckpoint checkpoint = mock(ICheckpoint.class); IRecordProcessor processor = mock(IRecordProcessor.class); IKinesisProxy streamProxy = mock(IKinesisProxy.class); @@ -289,7 +289,7 @@ public class ShardConsumerTest { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); ShardConsumer consumer = new ShardConsumer(shardInfo, streamConfig, @@ -379,7 +379,7 @@ public class ShardConsumerTest { skipCheckpointValidationValue, atTimestamp); - ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardConsumer consumer = new ShardConsumer(shardInfo, streamConfig, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfoTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfoTest.java index d62d880d..a2434d69 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfoTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfoTest.java @@ -20,11 +20,12 @@ import java.util.List; import java.util.Set; import java.util.UUID; -import junit.framework.Assert; - +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; + public class ShardInfoTest { private static final String CONCURRENCY_TOKEN = UUID.randomUUID().toString(); private static final String SHARD_ID = "shardId-test"; @@ -37,12 +38,12 @@ public class ShardInfoTest { parentShardIds.add("shard-1"); parentShardIds.add("shard-2"); - testShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds); + testShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST); } @Test public void testPacboyShardInfoEqualsWithSameArgs() { - ShardInfo equalShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds); + ShardInfo equalShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertTrue("Equal should return true for arguments all the same", testShardInfo.equals(equalShardInfo)); } @@ -53,18 +54,18 @@ public class ShardInfoTest { @Test public void testPacboyShardInfoEqualsForShardId() { - ShardInfo diffShardInfo = new ShardInfo("shardId-diff", CONCURRENCY_TOKEN, parentShardIds); + ShardInfo diffShardInfo = new ShardInfo("shardId-diff", CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertFalse("Equal should return false with different shard id", diffShardInfo.equals(testShardInfo)); - diffShardInfo = new ShardInfo(null, CONCURRENCY_TOKEN, parentShardIds); + diffShardInfo = new ShardInfo(null, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertFalse("Equal should return false with null shard id", diffShardInfo.equals(testShardInfo)); } @Test public void testPacboyShardInfoEqualsForfToken() { - ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, UUID.randomUUID().toString(), parentShardIds); + ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, UUID.randomUUID().toString(), parentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertFalse("Equal should return false with different concurrency token", diffShardInfo.equals(testShardInfo)); - diffShardInfo = new ShardInfo(SHARD_ID, null, parentShardIds); + diffShardInfo = new ShardInfo(SHARD_ID, null, parentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertFalse("Equal should return false for null concurrency token", diffShardInfo.equals(testShardInfo)); } @@ -74,7 +75,7 @@ public class ShardInfoTest { differentlyOrderedParentShardIds.add("shard-2"); differentlyOrderedParentShardIds.add("shard-1"); ShardInfo shardInfoWithDifferentlyOrderedParentShardIds = - new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, differentlyOrderedParentShardIds); + new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, differentlyOrderedParentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertTrue("Equal should return true even with parent shard Ids reordered", shardInfoWithDifferentlyOrderedParentShardIds.equals(testShardInfo)); } @@ -84,16 +85,24 @@ public class ShardInfoTest { Set diffParentIds = new HashSet<>(); diffParentIds.add("shard-3"); diffParentIds.add("shard-4"); - ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, diffParentIds); + ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, diffParentIds, ExtendedSequenceNumber.LATEST); Assert.assertFalse("Equal should return false with different parent shard Ids", diffShardInfo.equals(testShardInfo)); - diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, null); + diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, null, ExtendedSequenceNumber.LATEST); Assert.assertFalse("Equal should return false with null parent shard Ids", diffShardInfo.equals(testShardInfo)); } + @Test + public void testPacboyShardInfoEqualsForCheckpoint() { + ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.SHARD_END); + Assert.assertFalse("Equal should return false with different checkpoint", diffShardInfo.equals(testShardInfo)); + diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, null); + Assert.assertFalse("Equal should return false with null checkpoint", diffShardInfo.equals(testShardInfo)); + } + @Test public void testPacboyShardInfoSameHashCode() { - ShardInfo equalShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds); + ShardInfo equalShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertTrue("Shard info objects should have same hashCode for the same arguments", equalShardInfo.hashCode() == testShardInfo.hashCode()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index a2302ad0..67b42a0e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -20,10 +20,9 @@ import static org.mockito.Mockito.when; import java.util.HashSet; import java.util.Set; -import junit.framework.Assert; - import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -50,7 +49,8 @@ public class ShutdownTaskTest { String defaultShardId = "shardId-0000397840"; ShardInfo defaultShardInfo = new ShardInfo(defaultShardId, defaultConcurrencyToken, - defaultParentShardIds); + defaultParentShardIds, + ExtendedSequenceNumber.LATEST); IRecordProcessor defaultRecordProcessor = new TestStreamlet(); /** diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index f0b42671..a68c229b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -105,6 +105,7 @@ public class WorkerTest { InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private final ShardPrioritization shardPrioritization = new NoOpShardPrioritization(); // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = @@ -192,14 +193,15 @@ public class WorkerTest { execService, nullMetricsFactory, taskBackoffTimeMillis, - failoverTimeMillis); - ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null); + failoverTimeMillis, + shardPrioritization); + ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); Assert.assertNotNull(consumer); ShardConsumer consumer2 = worker.createOrGetShardConsumer(shardInfo, streamletFactory); Assert.assertSame(consumer, consumer2); ShardInfo shardInfoWithSameShardIdButDifferentConcurrencyToken = - new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null); + new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardConsumer consumer3 = worker.createOrGetShardConsumer(shardInfoWithSameShardIdButDifferentConcurrencyToken, streamletFactory); Assert.assertNotNull(consumer3); @@ -241,12 +243,13 @@ public class WorkerTest { execService, nullMetricsFactory, taskBackoffTimeMillis, - failoverTimeMillis); + failoverTimeMillis, + shardPrioritization); - ShardInfo shardInfo1 = new ShardInfo(dummyKinesisShardId, concurrencyToken, null); + ShardInfo shardInfo1 = new ShardInfo(dummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo duplicateOfShardInfo1ButWithAnotherConcurrencyToken = - new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null); - ShardInfo shardInfo2 = new ShardInfo(anotherDummyKinesisShardId, concurrencyToken, null); + new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); + ShardInfo shardInfo2 = new ShardInfo(anotherDummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardConsumer consumerOfShardInfo1 = worker.createOrGetShardConsumer(shardInfo1, streamletFactory); ShardConsumer consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken = @@ -297,7 +300,8 @@ public class WorkerTest { execService, nullMetricsFactory, taskBackoffTimeMillis, - failoverTimeMillis); + failoverTimeMillis, + shardPrioritization); worker.run(); Assert.assertTrue(count > 0); } @@ -745,7 +749,8 @@ public class WorkerTest { executorService, metricsFactory, taskBackoffTimeMillis, - failoverTimeMillis); + failoverTimeMillis, + shardPrioritization); WorkerThread workerThread = new WorkerThread(worker); workerThread.start();