diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 9a6fb4f5..590382b4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -154,6 +154,10 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; + /** + * Default Shard prioritization strategy. + */ + public static final ShardPrioritization DEFAULT_SHARD_PRIORITIZATION = new NoOpShardPrioritization(); private String applicationName; private String tableName; @@ -187,6 +191,7 @@ public class KinesisClientLibConfiguration { private int initialLeaseTableReadCapacity; private int initialLeaseTableWriteCapacity; private InitialPositionInStreamExtended initialPositionInStreamExtended; + private ShardPrioritization shardPrioritization; /** * Constructor. @@ -333,6 +338,7 @@ public class KinesisClientLibConfiguration { this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); + this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; } // Check if value is positive, otherwise throw an exception @@ -599,6 +605,13 @@ public class KinesisClientLibConfiguration { return initialPositionInStreamExtended.getTimestamp(); } + /** + * @return Shard prioritization strategy. + */ + public ShardPrioritization getShardPrioritizationStrategy() { + return shardPrioritization; + } + // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES /** * @param tableName name of the lease table in DynamoDB @@ -913,4 +926,16 @@ public class KinesisClientLibConfiguration { this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; return this; } + + /** + * @param shardPrioritization Implementation of ShardPrioritization interface that should be used during processing. + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withShardPrioritizationStrategy(ShardPrioritization shardPrioritization) { + if (shardPrioritization == null) { + throw new IllegalArgumentException("shardPrioritization cannot be null"); + } + this.shardPrioritization = shardPrioritization; + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index 20ca4d90..0119dcc3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -209,7 +209,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator prioritize(List original) { + return original; + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java new file mode 100644 index 00000000..dbacbd98 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java @@ -0,0 +1,135 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Shard Prioritization that prioritizes parent shards first. + * It also limits number of shards that will be available for initialization based on their depth. + * It doesn't make a lot of sense to work on a shard that has too many unfinished parents. + */ +public class ParentsFirstShardPrioritization implements + ShardPrioritization { + private static final SortingNode PROCESSING_NODE = new SortingNode(null, Integer.MIN_VALUE); + + private final int maxDepth; + + /** + * Creates ParentFirst prioritization with filtering based on depth of the shard. + * Shards that have depth > maxDepth will be ignored and will not be returned by this prioritization. + * + * @param maxDepth any shard that is deeper than max depth, will be excluded from processing + */ + public ParentsFirstShardPrioritization(int maxDepth) { + /* Depth 0 means that shard is completed or cannot be found, + * it is impossible to process such shards. + */ + if (maxDepth <= 0) { + throw new IllegalArgumentException("Max depth cannot be negative or zero. Provided value: " + maxDepth); + } + this.maxDepth = maxDepth; + } + + @Override + public List prioritize(List original) { + Map shards = new HashMap<>(); + for (ShardInfo shardInfo : original) { + shards.put(shardInfo.getShardId(), + shardInfo); + } + + Map processedNodes = new HashMap<>(); + + for (ShardInfo shardInfo : original) { + populateDepth(shardInfo.getShardId(), + shards, + processedNodes); + } + + List orderedInfos = new ArrayList<>(original.size()); + + List orderedNodes = new ArrayList<>(processedNodes.values()); + Collections.sort(orderedNodes); + + for (SortingNode sortingTreeNode : orderedNodes) { + // don't process shards with depth > maxDepth + if (sortingTreeNode.getDepth() <= maxDepth) { + orderedInfos.add(sortingTreeNode.shardInfo); + } + } + return orderedInfos; + } + + private int populateDepth(String shardId, + Map shards, + Map processedNodes) { + SortingNode processed = processedNodes.get(shardId); + if (processed != null) { + if (processed == PROCESSING_NODE) { + throw new IllegalArgumentException("Circular dependency detected. Shard Id " + + shardId + " is processed twice"); + } + return processed.getDepth(); + } + + ShardInfo shardInfo = shards.get(shardId); + if (shardInfo == null) { + // parent doesn't exist in our list, so this shard is root-level node + return 0; + } + + if (shardInfo.isCompleted()) { + // we treat completed shards as 0-level + return 0; + } + + // storing processing node to make sure we track progress and avoid circular dependencies + processedNodes.put(shardId, PROCESSING_NODE); + + int maxParentDepth = 0; + for (String parentId : shardInfo.getParentShardIds()) { + maxParentDepth = Math.max(maxParentDepth, + populateDepth(parentId, + shards, + processedNodes)); + } + + int currentNodeLevel = maxParentDepth + 1; + SortingNode previousValue = processedNodes.put(shardId, + new SortingNode(shardInfo, + currentNodeLevel)); + if (previousValue != PROCESSING_NODE) { + throw new IllegalStateException("Validation failed. Depth for shardId " + shardId + " was populated twice"); + } + + return currentNodeLevel; + } + + /** + * Class to store depth of shards during prioritization. + */ + private static class SortingNode implements + Comparable { + private final ShardInfo shardInfo; + private final int depth; + + public SortingNode(ShardInfo shardInfo, + int depth) { + this.shardInfo = shardInfo; + this.depth = depth; + } + + public int getDepth() { + return depth; + } + + @Override + public int compareTo(SortingNode o) { + return Integer.compare(depth, + o.depth); + } + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 10dacc04..b6cc76aa 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -297,36 +297,35 @@ class ShardConsumer { */ // CHECKSTYLE:OFF CyclomaticComplexity void updateState(boolean taskCompletedSuccessfully) { + if (currentState == ShardConsumerState.SHUTDOWN_COMPLETE) { + // Shutdown was completed and there nothing we can do after that + return; + } + if ((currentTask == null) && beginShutdown) { + // Shard didn't start any tasks and can be shutdown fast + currentState = ShardConsumerState.SHUTDOWN_COMPLETE; + return; + } + if (beginShutdown && currentState != ShardConsumerState.SHUTTING_DOWN) { + // Shard received signal to start shutdown. + // Whatever task we were working on should be stopped and shutdown task should be executed + currentState = ShardConsumerState.SHUTTING_DOWN; + return; + } switch (currentState) { case WAITING_ON_PARENT_SHARDS: if (taskCompletedSuccessfully && TaskType.BLOCK_ON_PARENT_SHARDS.equals(currentTask.getTaskType())) { - if (beginShutdown) { - currentState = ShardConsumerState.SHUTTING_DOWN; - } else { - currentState = ShardConsumerState.INITIALIZING; - } - } else if ((currentTask == null) && beginShutdown) { - currentState = ShardConsumerState.SHUTDOWN_COMPLETE; + currentState = ShardConsumerState.INITIALIZING; } break; case INITIALIZING: if (taskCompletedSuccessfully && TaskType.INITIALIZE.equals(currentTask.getTaskType())) { - if (beginShutdown) { - currentState = ShardConsumerState.SHUTTING_DOWN; - } else { - currentState = ShardConsumerState.PROCESSING; - } - } else if ((currentTask == null) && beginShutdown) { - currentState = ShardConsumerState.SHUTDOWN_COMPLETE; + currentState = ShardConsumerState.PROCESSING; } break; case PROCESSING: if (taskCompletedSuccessfully && TaskType.PROCESS.equals(currentTask.getTaskType())) { - if (beginShutdown) { - currentState = ShardConsumerState.SHUTTING_DOWN; - } else { - currentState = ShardConsumerState.PROCESSING; - } + currentState = ShardConsumerState.PROCESSING; } break; case SHUTTING_DOWN: @@ -335,8 +334,6 @@ class ShardConsumer { currentState = ShardConsumerState.SHUTDOWN_COMPLETE; } break; - case SHUTDOWN_COMPLETE: - break; default: LOG.error("Unexpected state: " + currentState); break; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java index 54f64568..9890d02f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java @@ -19,6 +19,8 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; + /** * Used to pass shard related info among different classes and as a key to the map of shard consumers. */ @@ -28,13 +30,18 @@ class ShardInfo { private final String concurrencyToken; // Sorted list of parent shardIds. private final List parentShardIds; + private final ExtendedSequenceNumber checkpoint; /** * @param shardId Kinesis shardId * @param concurrencyToken Used to differentiate between lost and reclaimed leases * @param parentShardIds Parent shards of the shard identified by Kinesis shardId + * @param checkpoint the latest checkpoint from lease */ - public ShardInfo(String shardId, String concurrencyToken, Collection parentShardIds) { + public ShardInfo(String shardId, + String concurrencyToken, + Collection parentShardIds, + ExtendedSequenceNumber checkpoint) { this.shardId = shardId; this.concurrencyToken = concurrencyToken; this.parentShardIds = new LinkedList(); @@ -44,6 +51,7 @@ class ShardInfo { // ShardInfo stores parent shard Ids in canonical order in the parentShardIds list. // This makes it easy to check for equality in ShardInfo.equals method. Collections.sort(this.parentShardIds); + this.checkpoint = checkpoint; } /** @@ -67,6 +75,13 @@ class ShardInfo { return new LinkedList(parentShardIds); } + /** + * @return completion status of the shard + */ + protected boolean isCompleted() { + return ExtendedSequenceNumber.SHARD_END.equals(checkpoint); + } + /** * {@inheritDoc} */ @@ -77,6 +92,7 @@ class ShardInfo { result = prime * result + ((concurrencyToken == null) ? 0 : concurrencyToken.hashCode()); result = prime * result + ((parentShardIds == null) ? 0 : parentShardIds.hashCode()); result = prime * result + ((shardId == null) ? 0 : shardId.hashCode()); + result = prime * result + ((checkpoint == null) ? 0 : checkpoint.hashCode()); return result; } @@ -126,6 +142,13 @@ class ShardInfo { } else if (!shardId.equals(other.shardId)) { return false; } + if (checkpoint == null) { + if (other.checkpoint != null) { + return false; + } + } else if (!checkpoint.equals(other.checkpoint)) { + return false; + } return true; } @@ -135,7 +158,44 @@ class ShardInfo { @Override public String toString() { return "ShardInfo [shardId=" + shardId + ", concurrencyToken=" + concurrencyToken + ", parentShardIds=" - + parentShardIds + "]"; + + parentShardIds + ", checkpoint=" + checkpoint + "]"; + } + + /** + * Builder class for ShardInfo. + */ + public static class Builder { + private String shardId; + private String concurrencyToken; + private List parentShardIds = Collections.emptyList(); + private ExtendedSequenceNumber checkpoint = ExtendedSequenceNumber.LATEST; + + public Builder() { + } + + public Builder withShardId(String shardId) { + this.shardId = shardId; + return this; + } + + public Builder withConcurrencyToken(String concurrencyToken) { + this.concurrencyToken = concurrencyToken; + return this; + } + + public Builder withParentShards(List parentShardIds) { + this.parentShardIds = parentShardIds; + return this; + } + + public Builder withCheckpoint(ExtendedSequenceNumber checkpoint) { + this.checkpoint = checkpoint; + return this; + } + + public ShardInfo build() { + return new ShardInfo(shardId, concurrencyToken, parentShardIds, checkpoint); + } } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java new file mode 100644 index 00000000..54f7517d --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java @@ -0,0 +1,19 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.List; + +/** + * Provides logic to prioritize or filter shards before their execution. + */ +public interface ShardPrioritization { + + /** + * Returns new list of shards ordered based on their priority. + * Resulted list may have fewer shards compared to original list + * + * @param original + * list of shards needed to be prioritized + * @return new list that contains only shards that should be processed + */ + List prioritize(List original); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 50861c29..3da1f2cd 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -38,6 +38,7 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; @@ -80,6 +81,8 @@ public class Worker implements Runnable { private final KinesisClientLibLeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager controlServer; + private final ShardPrioritization shardPrioritization; + private volatile boolean shutdown; private volatile long shutdownStartTimeMillis; @@ -231,7 +234,8 @@ public class Worker implements Runnable { execService, metricsFactory, config.getTaskBackoffTimeMillis(), - config.getFailoverTimeMillis()); + config.getFailoverTimeMillis(), + config.getShardPrioritizationStrategy()); // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { Region region = RegionUtils.getRegion(config.getRegionName()); @@ -271,6 +275,7 @@ public class Worker implements Runnable { * consumption) * @param metricsFactory Metrics factory used to emit metrics * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param shardPrioritization Provides prioritization logic to decide which available shards process first */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES @@ -286,7 +291,8 @@ public class Worker implements Runnable { ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, - long failoverTimeMillis) { + long failoverTimeMillis, + ShardPrioritization shardPrioritization) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.streamConfig = streamConfig; @@ -308,6 +314,7 @@ public class Worker implements Runnable { executorService); this.taskBackoffTimeMillis = taskBackoffTimeMillis; this.failoverTimeMillis = failoverTimeMillis; + this.shardPrioritization = shardPrioritization; } /** @@ -449,12 +456,13 @@ public class Worker implements Runnable { private List getShardInfoForAssignments() { List assignedStreamShards = leaseCoordinator.getCurrentAssignments(); + List prioritizedShards = shardPrioritization.prioritize(assignedStreamShards); - if ((assignedStreamShards != null) && (!assignedStreamShards.isEmpty())) { + if ((prioritizedShards != null) && (!prioritizedShards.isEmpty())) { if (wlog.isInfoEnabled()) { StringBuilder builder = new StringBuilder(); boolean firstItem = true; - for (ShardInfo shardInfo : assignedStreamShards) { + for (ShardInfo shardInfo : prioritizedShards) { if (!firstItem) { builder.append(", "); } @@ -467,7 +475,7 @@ public class Worker implements Runnable { wlog.info("No activities assigned"); } - return assignedStreamShards; + return prioritizedShards; } /** @@ -780,6 +788,7 @@ public class Worker implements Runnable { private AmazonCloudWatch cloudWatchClient; private IMetricsFactory metricsFactory; private ExecutorService execService; + private ShardPrioritization shardPrioritization; /** * Default constructor. @@ -879,6 +888,19 @@ public class Worker implements Runnable { return this; } + /** + * Provides logic how to prioritize shard processing. + * + * @param shardPrioritization + * shardPrioritization is responsible to order shards before processing + * + * @return A reference to this updated object so that method calls can be chained together. + */ + public Builder shardPrioritization(ShardPrioritization shardPrioritization) { + this.shardPrioritization = shardPrioritization; + return this; + } + /** * Build the Worker instance. * @@ -937,6 +959,9 @@ public class Worker implements Runnable { if (metricsFactory == null) { metricsFactory = getMetricsFactory(cloudWatchClient, config); } + if (shardPrioritization == null) { + shardPrioritization = new ParentsFirstShardPrioritization(1); + } return new Worker(config.getApplicationName(), recordProcessorFactory, @@ -965,7 +990,8 @@ public class Worker implements Runnable { execService, metricsFactory, config.getTaskBackoffTimeMillis(), - config.getFailoverTimeMillis()); + config.getFailoverTimeMillis(), + shardPrioritization); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTaskTest.java index 4c98701a..a42e0683 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTaskTest.java @@ -20,12 +20,11 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.List; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -47,7 +46,7 @@ public class BlockOnParentShardTaskTest { private final String shardId = "shardId-97"; private final String concurrencyToken = "testToken"; private final List emptyParentShardIds = new ArrayList(); - ShardInfo defaultShardInfo = new ShardInfo(shardId, concurrencyToken, emptyParentShardIds); + ShardInfo defaultShardInfo = new ShardInfo(shardId, concurrencyToken, emptyParentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); /** * @throws java.lang.Exception @@ -122,14 +121,14 @@ public class BlockOnParentShardTaskTest { // test single parent parentShardIds.add(parent1ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); task = new BlockOnParentShardTask(shardInfo, leaseManager, backoffTimeInMillis); result = task.call(); Assert.assertNull(result.getException()); // test two parents parentShardIds.add(parent2ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); task = new BlockOnParentShardTask(shardInfo, leaseManager, backoffTimeInMillis); result = task.call(); Assert.assertNull(result.getException()); @@ -164,14 +163,14 @@ public class BlockOnParentShardTaskTest { // test single parent parentShardIds.add(parent1ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); task = new BlockOnParentShardTask(shardInfo, leaseManager, backoffTimeInMillis); result = task.call(); Assert.assertNotNull(result.getException()); // test two parents parentShardIds.add(parent2ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); task = new BlockOnParentShardTask(shardInfo, leaseManager, backoffTimeInMillis); result = task.call(); Assert.assertNotNull(result.getException()); @@ -191,7 +190,7 @@ public class BlockOnParentShardTaskTest { String parentShardId = "shardId-1"; List parentShardIds = new ArrayList<>(); parentShardIds.add(parentShardId); - ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds); + ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); TaskResult result = null; KinesisClientLease parentLease = new KinesisClientLease(); ILeaseManager leaseManager = mock(ILeaseManager.class); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java index 556a1e0e..dd56a256 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java @@ -48,7 +48,7 @@ public class KinesisDataFetcherTest { private static final int MAX_RECORDS = 1; private static final String SHARD_ID = "shardId-1"; private static final String AT_SEQUENCE_NUMBER = ShardIteratorType.AT_SEQUENCE_NUMBER.toString(); - private static final ShardInfo SHARD_INFO = new ShardInfo(SHARD_ID, null, null); + private static final ShardInfo SHARD_INFO = new ShardInfo(SHARD_ID, null, null, null); private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java new file mode 100644 index 00000000..35b56b32 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java @@ -0,0 +1,162 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import org.junit.Test; + +public class ParentsFirstShardPrioritizationUnitTest { + + @Test(expected = IllegalArgumentException.class) + public void testMaxDepthNegativeShouldFail() { + new ParentsFirstShardPrioritization(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void testMaxDepthZeroShouldFail() { + new ParentsFirstShardPrioritization(0); + } + + @Test + public void testMaxDepthPositiveShouldNotFail() { + new ParentsFirstShardPrioritization(1); + } + + @Test + public void testSorting() { + Random random = new Random(987654); + int numberOfShards = 7; + + List shardIdsDependencies = new ArrayList<>(); + shardIdsDependencies.add("unknown"); + List original = new ArrayList<>(); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + String shardId = shardId(shardNumber); + original.add(shardInfo(shardId, shardIdsDependencies)); + shardIdsDependencies.add(shardId); + } + + ParentsFirstShardPrioritization ordering = new ParentsFirstShardPrioritization(Integer.MAX_VALUE); + + // shuffle original list as it is already ordered in right way + Collections.shuffle(original, random); + List ordered = ordering.prioritize(original); + + assertEquals(numberOfShards, ordered.size()); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + String shardId = shardId(shardNumber); + assertEquals(shardId, ordered.get(shardNumber).getShardId()); + } + } + + @Test + public void testSortingAndFiltering() { + Random random = new Random(45677); + int numberOfShards = 10; + + List shardIdsDependencies = new ArrayList<>(); + shardIdsDependencies.add("unknown"); + List original = new ArrayList<>(); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + String shardId = shardId(shardNumber); + original.add(shardInfo(shardId, shardIdsDependencies)); + shardIdsDependencies.add(shardId); + } + + int maxDepth = 3; + ParentsFirstShardPrioritization ordering = new ParentsFirstShardPrioritization(maxDepth); + + // shuffle original list as it is already ordered in right way + Collections.shuffle(original, random); + List ordered = ordering.prioritize(original); + // in this case every shard has its own level, so we don't expect to + // have more shards than max depth + assertEquals(maxDepth, ordered.size()); + + for (int shardNumber = 0; shardNumber < maxDepth; shardNumber++) { + String shardId = shardId(shardNumber); + assertEquals(shardId, ordered.get(shardNumber).getShardId()); + } + } + + @Test + public void testSimpleOrdering() { + Random random = new Random(1234); + int numberOfShards = 10; + + String parentId = "unknown"; + List original = new ArrayList<>(); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + String shardId = shardId(shardNumber); + original.add(shardInfo(shardId, parentId)); + parentId = shardId; + } + + ParentsFirstShardPrioritization ordering = new ParentsFirstShardPrioritization(Integer.MAX_VALUE); + + // shuffle original list as it is already ordered in right way + Collections.shuffle(original, random); + List ordered = ordering.prioritize(original); + assertEquals(numberOfShards, ordered.size()); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + String shardId = shardId(shardNumber); + assertEquals(shardId, ordered.get(shardNumber).getShardId()); + } + } + + /** + * This should be impossible as shards don't have circular dependencies, + * but this code should handle it properly and fail + */ + @Test + public void testCircularDependencyBetweenShards() { + Random random = new Random(13468798); + int numberOfShards = 10; + + // shard-0 will point in middle shard (shard-5) in current test + String parentId = shardId(numberOfShards / 2); + List original = new ArrayList<>(); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + String shardId = shardId(shardNumber); + original.add(shardInfo(shardId, parentId)); + parentId = shardId; + } + + ParentsFirstShardPrioritization ordering = new ParentsFirstShardPrioritization(Integer.MAX_VALUE); + + // shuffle original list as it is already ordered in right way + Collections.shuffle(original, random); + try { + ordering.prioritize(original); + fail("Processing should fail in case we have circular dependency"); + } catch (IllegalArgumentException expected) { + + } + } + + private String shardId(int shardNumber) { + return "shardId-" + shardNumber; + } + + private static ShardInfo shardInfo(String shardId, List parentShardIds) { + // copy into new list just in case ShardInfo will stop doing it + List newParentShardIds = new ArrayList<>(parentShardIds); + return new ShardInfo.Builder() + .withShardId(shardId) + .withParentShards(newParentShardIds) + .build(); + } + + private static ShardInfo shardInfo(String shardId, String... parentShardIds) { + return new ShardInfo.Builder() + .withShardId(shardId) + .withParentShards(Arrays.asList(parentShardIds)) + .build(); + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index 6576e47f..f1d908f0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -87,7 +87,7 @@ public class ProcessTaskTest { new StreamConfig(null, maxRecords, idleTimeMillis, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - final ShardInfo shardInfo = new ShardInfo(shardId, null, null); + final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); processTask = new ProcessTask( shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java index 4741ea14..d5f6b53f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java @@ -75,7 +75,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpoint() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); // First call to checkpoint RecordProcessorCheckpointer processingCheckpointer = @@ -98,7 +98,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpointRecord() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator(null, shardId, false); RecordProcessorCheckpointer processingCheckpointer = @@ -117,7 +117,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpointSubRecord() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator(null, shardId, false); RecordProcessorCheckpointer processingCheckpointer = @@ -137,7 +137,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpointSequenceNumber() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator(null, shardId, false); RecordProcessorCheckpointer processingCheckpointer = @@ -155,7 +155,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testCheckpointExtendedSequenceNumber() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator(null, shardId, false); RecordProcessorCheckpointer processingCheckpointer = @@ -173,7 +173,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testUpdate() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, null); @@ -193,7 +193,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testClientSpecifiedCheckpoint() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); SequenceNumberValidator validator = mock(SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); @@ -290,7 +290,7 @@ public class RecordProcessorCheckpointerTest { @SuppressWarnings("serial") @Test public final void testMixedCheckpointCalls() throws Exception { - ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); SequenceNumberValidator validator = mock(SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index d8d39377..26337381 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -92,7 +92,7 @@ public class ShardConsumerTest { @SuppressWarnings("unchecked") @Test public final void testInitializationStateUponFailure() throws Exception { - ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null); + ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); ICheckpoint checkpoint = mock(ICheckpoint.class); when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class); @@ -141,7 +141,7 @@ public class ShardConsumerTest { @SuppressWarnings("unchecked") @Test public final void testInitializationStateUponSubmissionFailure() throws Exception { - ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null); + ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); ICheckpoint checkpoint = mock(ICheckpoint.class); ExecutorService spyExecutorService = spy(executorService); @@ -189,7 +189,7 @@ public class ShardConsumerTest { @SuppressWarnings("unchecked") @Test public final void testRecordProcessorThrowable() throws Exception { - ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null); + ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); ICheckpoint checkpoint = mock(ICheckpoint.class); IRecordProcessor processor = mock(IRecordProcessor.class); IKinesisProxy streamProxy = mock(IKinesisProxy.class); @@ -289,7 +289,7 @@ public class ShardConsumerTest { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); ShardConsumer consumer = new ShardConsumer(shardInfo, streamConfig, @@ -379,7 +379,7 @@ public class ShardConsumerTest { skipCheckpointValidationValue, atTimestamp); - ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null); + ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardConsumer consumer = new ShardConsumer(shardInfo, streamConfig, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfoTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfoTest.java index d62d880d..a2434d69 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfoTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfoTest.java @@ -20,11 +20,12 @@ import java.util.List; import java.util.Set; import java.util.UUID; -import junit.framework.Assert; - +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; + public class ShardInfoTest { private static final String CONCURRENCY_TOKEN = UUID.randomUUID().toString(); private static final String SHARD_ID = "shardId-test"; @@ -37,12 +38,12 @@ public class ShardInfoTest { parentShardIds.add("shard-1"); parentShardIds.add("shard-2"); - testShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds); + testShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST); } @Test public void testPacboyShardInfoEqualsWithSameArgs() { - ShardInfo equalShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds); + ShardInfo equalShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertTrue("Equal should return true for arguments all the same", testShardInfo.equals(equalShardInfo)); } @@ -53,18 +54,18 @@ public class ShardInfoTest { @Test public void testPacboyShardInfoEqualsForShardId() { - ShardInfo diffShardInfo = new ShardInfo("shardId-diff", CONCURRENCY_TOKEN, parentShardIds); + ShardInfo diffShardInfo = new ShardInfo("shardId-diff", CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertFalse("Equal should return false with different shard id", diffShardInfo.equals(testShardInfo)); - diffShardInfo = new ShardInfo(null, CONCURRENCY_TOKEN, parentShardIds); + diffShardInfo = new ShardInfo(null, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertFalse("Equal should return false with null shard id", diffShardInfo.equals(testShardInfo)); } @Test public void testPacboyShardInfoEqualsForfToken() { - ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, UUID.randomUUID().toString(), parentShardIds); + ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, UUID.randomUUID().toString(), parentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertFalse("Equal should return false with different concurrency token", diffShardInfo.equals(testShardInfo)); - diffShardInfo = new ShardInfo(SHARD_ID, null, parentShardIds); + diffShardInfo = new ShardInfo(SHARD_ID, null, parentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertFalse("Equal should return false for null concurrency token", diffShardInfo.equals(testShardInfo)); } @@ -74,7 +75,7 @@ public class ShardInfoTest { differentlyOrderedParentShardIds.add("shard-2"); differentlyOrderedParentShardIds.add("shard-1"); ShardInfo shardInfoWithDifferentlyOrderedParentShardIds = - new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, differentlyOrderedParentShardIds); + new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, differentlyOrderedParentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertTrue("Equal should return true even with parent shard Ids reordered", shardInfoWithDifferentlyOrderedParentShardIds.equals(testShardInfo)); } @@ -84,16 +85,24 @@ public class ShardInfoTest { Set diffParentIds = new HashSet<>(); diffParentIds.add("shard-3"); diffParentIds.add("shard-4"); - ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, diffParentIds); + ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, diffParentIds, ExtendedSequenceNumber.LATEST); Assert.assertFalse("Equal should return false with different parent shard Ids", diffShardInfo.equals(testShardInfo)); - diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, null); + diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, null, ExtendedSequenceNumber.LATEST); Assert.assertFalse("Equal should return false with null parent shard Ids", diffShardInfo.equals(testShardInfo)); } + @Test + public void testPacboyShardInfoEqualsForCheckpoint() { + ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.SHARD_END); + Assert.assertFalse("Equal should return false with different checkpoint", diffShardInfo.equals(testShardInfo)); + diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, null); + Assert.assertFalse("Equal should return false with null checkpoint", diffShardInfo.equals(testShardInfo)); + } + @Test public void testPacboyShardInfoSameHashCode() { - ShardInfo equalShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds); + ShardInfo equalShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST); Assert.assertTrue("Shard info objects should have same hashCode for the same arguments", equalShardInfo.hashCode() == testShardInfo.hashCode()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index a2302ad0..67b42a0e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -20,10 +20,9 @@ import static org.mockito.Mockito.when; import java.util.HashSet; import java.util.Set; -import junit.framework.Assert; - import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -50,7 +49,8 @@ public class ShutdownTaskTest { String defaultShardId = "shardId-0000397840"; ShardInfo defaultShardInfo = new ShardInfo(defaultShardId, defaultConcurrencyToken, - defaultParentShardIds); + defaultParentShardIds, + ExtendedSequenceNumber.LATEST); IRecordProcessor defaultRecordProcessor = new TestStreamlet(); /** diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index f0b42671..a68c229b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -105,6 +105,7 @@ public class WorkerTest { InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private final ShardPrioritization shardPrioritization = new NoOpShardPrioritization(); // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = @@ -192,14 +193,15 @@ public class WorkerTest { execService, nullMetricsFactory, taskBackoffTimeMillis, - failoverTimeMillis); - ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null); + failoverTimeMillis, + shardPrioritization); + ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); Assert.assertNotNull(consumer); ShardConsumer consumer2 = worker.createOrGetShardConsumer(shardInfo, streamletFactory); Assert.assertSame(consumer, consumer2); ShardInfo shardInfoWithSameShardIdButDifferentConcurrencyToken = - new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null); + new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardConsumer consumer3 = worker.createOrGetShardConsumer(shardInfoWithSameShardIdButDifferentConcurrencyToken, streamletFactory); Assert.assertNotNull(consumer3); @@ -241,12 +243,13 @@ public class WorkerTest { execService, nullMetricsFactory, taskBackoffTimeMillis, - failoverTimeMillis); + failoverTimeMillis, + shardPrioritization); - ShardInfo shardInfo1 = new ShardInfo(dummyKinesisShardId, concurrencyToken, null); + ShardInfo shardInfo1 = new ShardInfo(dummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo duplicateOfShardInfo1ButWithAnotherConcurrencyToken = - new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null); - ShardInfo shardInfo2 = new ShardInfo(anotherDummyKinesisShardId, concurrencyToken, null); + new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); + ShardInfo shardInfo2 = new ShardInfo(anotherDummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardConsumer consumerOfShardInfo1 = worker.createOrGetShardConsumer(shardInfo1, streamletFactory); ShardConsumer consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken = @@ -297,7 +300,8 @@ public class WorkerTest { execService, nullMetricsFactory, taskBackoffTimeMillis, - failoverTimeMillis); + failoverTimeMillis, + shardPrioritization); worker.run(); Assert.assertTrue(count > 0); } @@ -745,7 +749,8 @@ public class WorkerTest { executorService, metricsFactory, taskBackoffTimeMillis, - failoverTimeMillis); + failoverTimeMillis, + shardPrioritization); WorkerThread workerThread = new WorkerThread(worker); workerThread.start();