diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/CheckpointValueComparator.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/CheckpointValueComparator.java index 4238f313..f8bafbff 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/CheckpointValueComparator.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/CheckpointValueComparator.java @@ -18,6 +18,7 @@ import java.io.Serializable; import java.math.BigInteger; import java.util.Comparator; +import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.SentinelCheckpoint; /** @@ -87,7 +88,7 @@ class CheckpointValueComparator implements Comparator, Serializable { * @return a BigInteger value representation of the checkpointValue */ private static BigInteger bigIntegerValue(String checkpointValue) { - if (SequenceNumberValidator.isDigits(checkpointValue)) { + if (Checkpoint.SequenceNumberValidator.isDigits(checkpointValue)) { return new BigInteger(checkpointValue); } else if (SentinelCheckpoint.LATEST.toString().equals(checkpointValue)) { return LATEST_BIG_INTEGER_VALUE; @@ -106,7 +107,7 @@ class CheckpointValueComparator implements Comparator, Serializable { * @return true if and only if the string is all digits or one of the SentinelCheckpoint values */ private static boolean isDigitsOrSentinelValue(String string) { - return SequenceNumberValidator.isDigits(string) || isSentinelValue(string); + return Checkpoint.SequenceNumberValidator.isDigits(string) || isSentinelValue(string); } /** diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 072c0e4f..afa85119 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -23,6 +23,8 @@ import org.apache.commons.lang.Validate; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.regions.RegionUtils; +import software.amazon.kinesis.leases.NoOpShardPrioritization; +import software.amazon.kinesis.leases.ShardPrioritization; import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.ProcessTask; import software.amazon.kinesis.lifecycle.ShardConsumer; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java index 4df3e0ff..1dc5f869 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java @@ -19,6 +19,10 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; +import software.amazon.kinesis.checkpoint.Checkpoint; +import software.amazon.kinesis.checkpoint.DoesNothingPreparedCheckpointer; +import software.amazon.kinesis.checkpoint.PreparedCheckpointer; +import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.IPreparedCheckpointer; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; @@ -47,7 +51,7 @@ public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer private ShardInfo shardInfo; - private SequenceNumberValidator sequenceNumberValidator; + private Checkpoint.SequenceNumberValidator sequenceNumberValidator; private ExtendedSequenceNumber sequenceNumberAtShardEnd; @@ -61,7 +65,7 @@ public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer */ public RecordProcessorCheckpointer(ShardInfo shardInfo, ICheckpoint checkpoint, - SequenceNumberValidator validator, + Checkpoint.SequenceNumberValidator validator, IMetricsFactory metricsFactory) { this.shardInfo = shardInfo; this.checkpoint = checkpoint; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java deleted file mode 100644 index 6e680696..00000000 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - -import com.amazonaws.AmazonServiceException; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; -import software.amazon.kinesis.retrieval.IKinesisProxy; -import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import com.amazonaws.services.kinesis.model.InvalidArgumentException; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; -import com.amazonaws.services.kinesis.model.ShardIteratorType; - -import lombok.extern.slf4j.Slf4j; - -/** - * This class provides some methods for validating sequence numbers. It provides a method - * {@link #validateSequenceNumber(String)} which validates a sequence number by attempting to get an iterator from - * Amazon Kinesis for that sequence number. (e.g. Before checkpointing a client provided sequence number in - * {@link RecordProcessorCheckpointer#checkpoint(String)} to prevent invalid sequence numbers from being checkpointed, - * which could prevent another shard consumer instance from processing the shard later on). This class also provides a - * utility function {@link #isDigits(String)} which is used to check whether a string is all digits - */ -@Slf4j -public class SequenceNumberValidator { - private IKinesisProxy proxy; - private String shardId; - private boolean validateWithGetIterator; - private static final int SERVER_SIDE_ERROR_CODE = 500; - - /** - * Constructor. - * - * @param proxy Kinesis proxy to be used for getIterator call - * @param shardId ShardId to check with sequence numbers - * @param validateWithGetIterator Whether to attempt to get an iterator for this shard id and the sequence numbers - * being validated - */ - public SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) { - this.proxy = proxy; - this.shardId = shardId; - this.validateWithGetIterator = validateWithGetIterator; - } - - /** - * Validates the sequence number by attempting to get an iterator from Amazon Kinesis. Repackages exceptions from - * Amazon Kinesis into the appropriate KCL exception to allow clients to determine exception handling strategies - * - * @param sequenceNumber The sequence number to be validated. Must be a numeric string - * @throws IllegalArgumentException Thrown when sequence number validation fails. - * @throws ThrottlingException Thrown when GetShardIterator returns a ProvisionedThroughputExceededException which - * indicates that too many getIterator calls are being made for this shard. - * @throws KinesisClientLibDependencyException Thrown when a service side error is received. This way clients have - * the option of retrying - */ - void validateSequenceNumber(String sequenceNumber) - throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException { - boolean atShardEnd = ExtendedSequenceNumber.SHARD_END.getSequenceNumber().equals(sequenceNumber); - - if (!atShardEnd && !isDigits(sequenceNumber)) { - log.info("Sequence number must be numeric, but was {}", sequenceNumber); - throw new IllegalArgumentException("Sequence number must be numeric, but was " + sequenceNumber); - } - try { - if (!atShardEnd &&validateWithGetIterator) { - proxy.getIterator(shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), sequenceNumber); - log.info("Validated sequence number {} with shard id {}", sequenceNumber, shardId); - } - } catch (InvalidArgumentException e) { - log.info("Sequence number {} is invalid for shard {}", sequenceNumber, shardId, e); - throw new IllegalArgumentException("Sequence number " + sequenceNumber + " is invalid for shard " - + shardId, e); - } catch (ProvisionedThroughputExceededException e) { - // clients should have back off logic in their checkpoint logic - log.info("Exceeded throughput while getting an iterator for shard {}", shardId, e); - throw new ThrottlingException("Exceeded throughput while getting an iterator for shard " + shardId, e); - } catch (AmazonServiceException e) { - log.info("Encountered service exception while getting an iterator for shard {}", shardId, e); - if (e.getStatusCode() >= SERVER_SIDE_ERROR_CODE) { - // clients can choose whether to retry in their checkpoint logic - throw new KinesisClientLibDependencyException("Encountered service exception while getting an iterator" - + " for shard " + shardId, e); - } - // Just throw any other exceptions, e.g. 400 errors caused by the client - throw e; - } - } - - void validateSequenceNumber(ExtendedSequenceNumber checkpoint) - throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException { - validateSequenceNumber(checkpoint.getSequenceNumber()); - if (checkpoint.getSubSequenceNumber() < 0) { - throw new IllegalArgumentException("SubSequence number must be non-negative, but was " - + checkpoint.getSubSequenceNumber()); - } - } - - /** - * Checks if the string is composed of only digits. - * - * @param string - * @return true for a string of all digits, false otherwise (including false for null and empty string) - */ - static boolean isDigits(String string) { - if (string == null || string.length() == 0) { - return false; - } - for (int i = 0; i < string.length(); i++) { - if (!Character.isDigit(string.charAt(i))) { - return false; - } - } - return true; - } -} diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorFactoryAdapter.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorFactoryAdapter.java deleted file mode 100644 index fa1cbcd1..00000000 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorFactoryAdapter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - -import software.amazon.kinesis.processor.v2.IRecordProcessorFactory; -import software.amazon.kinesis.processor.v2.IRecordProcessor; - -/** - * Adapts a V1 {@link software.amazon.kinesis.processor.IRecordProcessorFactory - * IRecordProcessorFactory} to V2 - * {@link IRecordProcessorFactory IRecordProcessorFactory}. - */ -class V1ToV2RecordProcessorFactoryAdapter implements IRecordProcessorFactory { - - private software.amazon.kinesis.processor.IRecordProcessorFactory factory; - - V1ToV2RecordProcessorFactoryAdapter( - software.amazon.kinesis.processor.IRecordProcessorFactory factory) { - this.factory = factory; - } - - @Override - public IRecordProcessor createProcessor() { - return new V1ToV2RecordProcessorAdapter(factory.createProcessor()); - } -} diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index dbd999d3..4b6ac7d6 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -40,10 +40,18 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; +import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; +import software.amazon.kinesis.leases.ParentsFirstShardPrioritization; +import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.leases.ShardPrioritization; +import software.amazon.kinesis.leases.ShardSyncTask; +import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.lifecycle.ShardConsumer; import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.lifecycle.TaskResult; +import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.processor.ICheckpoint; +import software.amazon.kinesis.processor.V1ToV2RecordProcessorFactoryAdapter; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.v2.IShutdownNotificationAware; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java index 8fdb9aab..363db9a0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/Checkpoint.java @@ -14,6 +14,15 @@ */ package software.amazon.kinesis.checkpoint; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.model.InvalidArgumentException; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import lombok.Data; @@ -38,4 +47,105 @@ import lombok.Data; this.checkpoint = checkpoint; this.pendingCheckpoint = pendingCheckpoint; } + + /** + * This class provides some methods for validating sequence numbers. It provides a method + * {@link #validateSequenceNumber(String)} which validates a sequence number by attempting to get an iterator from + * Amazon Kinesis for that sequence number. (e.g. Before checkpointing a client provided sequence number in + * {@link RecordProcessorCheckpointer#checkpoint(String)} to prevent invalid sequence numbers from being checkpointed, + * which could prevent another shard consumer instance from processing the shard later on). This class also provides a + * utility function {@link #isDigits(String)} which is used to check whether a string is all digits + */ + @Slf4j + public static class SequenceNumberValidator { + private IKinesisProxy proxy; + private String shardId; + private boolean validateWithGetIterator; + private static final int SERVER_SIDE_ERROR_CODE = 500; + + /** + * Constructor. + * + * @param proxy Kinesis proxy to be used for getIterator call + * @param shardId ShardId to check with sequence numbers + * @param validateWithGetIterator Whether to attempt to get an iterator for this shard id and the sequence numbers + * being validated + */ + public SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) { + this.proxy = proxy; + this.shardId = shardId; + this.validateWithGetIterator = validateWithGetIterator; + } + + /** + * Validates the sequence number by attempting to get an iterator from Amazon Kinesis. Repackages exceptions from + * Amazon Kinesis into the appropriate KCL exception to allow clients to determine exception handling strategies + * + * @param sequenceNumber The sequence number to be validated. Must be a numeric string + * @throws IllegalArgumentException Thrown when sequence number validation fails. + * @throws ThrottlingException Thrown when GetShardIterator returns a ProvisionedThroughputExceededException which + * indicates that too many getIterator calls are being made for this shard. + * @throws KinesisClientLibDependencyException Thrown when a service side error is received. This way clients have + * the option of retrying + */ + public void validateSequenceNumber(String sequenceNumber) + throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException { + boolean atShardEnd = ExtendedSequenceNumber.SHARD_END.getSequenceNumber().equals(sequenceNumber); + + if (!atShardEnd && !isDigits(sequenceNumber)) { + SequenceNumberValidator.log.info("Sequence number must be numeric, but was {}", sequenceNumber); + throw new IllegalArgumentException("Sequence number must be numeric, but was " + sequenceNumber); + } + try { + if (!atShardEnd &&validateWithGetIterator) { + proxy.getIterator(shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), sequenceNumber); + SequenceNumberValidator.log.info("Validated sequence number {} with shard id {}", sequenceNumber, shardId); + } + } catch (InvalidArgumentException e) { + SequenceNumberValidator.log.info("Sequence number {} is invalid for shard {}", sequenceNumber, shardId, e); + throw new IllegalArgumentException("Sequence number " + sequenceNumber + " is invalid for shard " + + shardId, e); + } catch (ProvisionedThroughputExceededException e) { + // clients should have back off logic in their checkpoint logic + SequenceNumberValidator.log.info("Exceeded throughput while getting an iterator for shard {}", shardId, e); + throw new ThrottlingException("Exceeded throughput while getting an iterator for shard " + shardId, e); + } catch (AmazonServiceException e) { + SequenceNumberValidator.log.info("Encountered service exception while getting an iterator for shard {}", shardId, e); + if (e.getStatusCode() >= SERVER_SIDE_ERROR_CODE) { + // clients can choose whether to retry in their checkpoint logic + throw new KinesisClientLibDependencyException("Encountered service exception while getting an iterator" + + " for shard " + shardId, e); + } + // Just throw any other exceptions, e.g. 400 errors caused by the client + throw e; + } + } + + void validateSequenceNumber(ExtendedSequenceNumber checkpoint) + throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException { + validateSequenceNumber(checkpoint.getSequenceNumber()); + if (checkpoint.getSubSequenceNumber() < 0) { + throw new IllegalArgumentException("SubSequence number must be non-negative, but was " + + checkpoint.getSubSequenceNumber()); + } + } + + /** + * Checks if the string is composed of only digits. + * + * @param string + * @return true for a string of all digits, false otherwise (including false for null and empty string) + */ + public static boolean isDigits(String string) { + if (string == null || string.length() == 0) { + return false; + } + for (int i = 0; i < string.length(); i++) { + if (!Character.isDigit(string.charAt(i))) { + return false; + } + } + return true; + } + } } diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DoesNothingPreparedCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DoesNothingPreparedCheckpointer.java similarity index 97% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DoesNothingPreparedCheckpointer.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DoesNothingPreparedCheckpointer.java index a6083bc7..e26386e1 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DoesNothingPreparedCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/DoesNothingPreparedCheckpointer.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.checkpoint; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/PreparedCheckpointer.java similarity index 97% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointer.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/PreparedCheckpointer.java index 3e9544df..91c92c52 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/PreparedCheckpointer.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.checkpoint; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinator.java similarity index 91% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinator.java index 4d0bf6dc..6151d83a 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinator.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import java.util.ArrayList; import java.util.Collection; @@ -33,9 +33,6 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; -import software.amazon.kinesis.leases.KinesisClientLease; -import software.amazon.kinesis.leases.LeaseCoordinator; -import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.metrics.IMetricsFactory; import lombok.extern.slf4j.Slf4j; @@ -44,7 +41,7 @@ import lombok.extern.slf4j.Slf4j; * This class is used to coordinate/manage leases owned by this worker process and to get/set checkpoints. */ @Slf4j -class KinesisClientLibLeaseCoordinator extends LeaseCoordinator implements ICheckpoint { +public class KinesisClientLibLeaseCoordinator extends LeaseCoordinator implements ICheckpoint { private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; @@ -144,7 +141,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator getLeaseManager() { + public ILeaseManager getLeaseManager() { return leaseManager; } diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpShardPrioritization.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/NoOpShardPrioritization.java similarity index 94% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpShardPrioritization.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/NoOpShardPrioritization.java index 59a42199..ec93d764 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpShardPrioritization.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/NoOpShardPrioritization.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import java.util.List; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritization.java similarity index 98% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritization.java index 8e211eef..d4794a30 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritization.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritization.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import java.util.ArrayList; import java.util.Collections; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java similarity index 97% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java index e15b920a..0ed97993 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import java.util.Collection; import java.util.Collections; @@ -95,7 +95,7 @@ public class ShardInfo { * * @return completion status of the shard */ - protected boolean isCompleted() { + public boolean isCompleted() { return ExtendedSequenceNumber.SHARD_END.equals(checkpoint); } diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardPrioritization.java similarity index 94% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardPrioritization.java index 442c37dd..11b7586a 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardPrioritization.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardPrioritization.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import java.util.List; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java similarity index 71% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index e8de2aa5..af4e43a3 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -1,25 +1,24 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import software.amazon.kinesis.lifecycle.ITask; import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.retrieval.IKinesisProxy; -import software.amazon.kinesis.leases.KinesisClientLease; -import software.amazon.kinesis.leases.ILeaseManager; import lombok.extern.slf4j.Slf4j; @@ -30,7 +29,7 @@ import lombok.extern.slf4j.Slf4j; * cleanupLeasesUponShardCompletion is true). */ @Slf4j -class ShardSyncTask implements ITask { +public class ShardSyncTask implements ITask { private final IKinesisProxy kinesisProxy; private final ILeaseManager leaseManager; private InitialPositionInStreamExtended initialPosition; @@ -46,12 +45,12 @@ class ShardSyncTask implements ITask { * start processing records from this point in the stream (when an application starts up for the first time) * except for shards that already have a checkpoint (and their descendant shards). */ - ShardSyncTask(IKinesisProxy kinesisProxy, - ILeaseManager leaseManager, - InitialPositionInStreamExtended initialPositionInStream, - boolean cleanupLeasesUponShardCompletion, - boolean ignoreUnexpectedChildShards, - long shardSyncTaskIdleTimeMillis) { + public ShardSyncTask(IKinesisProxy kinesisProxy, + ILeaseManager leaseManager, + InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesUponShardCompletion, + boolean ignoreUnexpectedChildShards, + long shardSyncTaskIdleTimeMillis) { this.kinesisProxy = kinesisProxy; this.leaseManager = leaseManager; this.initialPosition = initialPositionInStream; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java similarity index 73% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index ccdb02ee..e086a543 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -1,29 +1,29 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import software.amazon.kinesis.lifecycle.ITask; import software.amazon.kinesis.lifecycle.TaskResult; +import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.retrieval.IKinesisProxy; -import software.amazon.kinesis.leases.KinesisClientLease; -import software.amazon.kinesis.leases.ILeaseManager; import software.amazon.kinesis.metrics.IMetricsFactory; import lombok.extern.slf4j.Slf4j; @@ -34,7 +34,7 @@ import lombok.extern.slf4j.Slf4j; * Worker will use this class to kick off a sync task when it finds shards which have been completely processed. */ @Slf4j -class ShardSyncTaskManager { +public class ShardSyncTaskManager { private ITask currentTask; private Future future; private final IKinesisProxy kinesisProxy; @@ -60,14 +60,14 @@ class ShardSyncTaskManager { * @param metricsFactory Metrics factory * @param executorService ExecutorService to execute the shard sync tasks */ - ShardSyncTaskManager(final IKinesisProxy kinesisProxy, - final ILeaseManager leaseManager, - final InitialPositionInStreamExtended initialPositionInStream, - final boolean cleanupLeasesUponShardCompletion, - final boolean ignoreUnexpectedChildShards, - final long shardSyncIdleTimeMillis, - final IMetricsFactory metricsFactory, - ExecutorService executorService) { + public ShardSyncTaskManager(final IKinesisProxy kinesisProxy, + final ILeaseManager leaseManager, + final InitialPositionInStreamExtended initialPositionInStream, + final boolean cleanupLeasesUponShardCompletion, + final boolean ignoreUnexpectedChildShards, + final long shardSyncIdleTimeMillis, + final IMetricsFactory metricsFactory, + ExecutorService executorService) { this.kinesisProxy = kinesisProxy; this.leaseManager = leaseManager; this.metricsFactory = metricsFactory; @@ -78,7 +78,7 @@ class ShardSyncTaskManager { this.initialPositionInStream = initialPositionInStream; } - synchronized boolean syncShardAndLeaseInfo(Set closedShardIds) { + public synchronized boolean syncShardAndLeaseInfo(Set closedShardIds) { return checkAndSubmitNextTask(closedShardIds); } diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncer.java similarity index 98% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncer.java index 899830b3..20a2f624 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncer.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import java.io.Serializable; import java.math.BigInteger; @@ -26,6 +26,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import org.apache.commons.lang.StringUtils; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; @@ -771,7 +773,7 @@ public class ShardSyncer { * @param shard * @return */ - static KinesisClientLease newKCLLease(Shard shard) { + public static KinesisClientLease newKCLLease(Shard shard) { KinesisClientLease newLease = new KinesisClientLease(); newLease.setLeaseKey(shard.getShardId()); List parentShardIds = new ArrayList(2); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java index 56ed6699..3848ec61 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java @@ -15,7 +15,7 @@ package software.amazon.kinesis.lifecycle; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.ILeaseManager; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java index 8bb45739..09dcc3d3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java @@ -15,7 +15,7 @@ package software.amazon.kinesis.lifecycle; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.v2.IRecordProcessor; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index 2661b831..5e3a7b39 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -20,7 +20,7 @@ import java.util.ListIterator; import com.amazonaws.services.cloudwatch.model.StandardUnit; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ThrottlingReporter; import software.amazon.kinesis.processor.v2.IRecordProcessor; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index f629783f..ee923dfe 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -22,10 +22,10 @@ import java.util.concurrent.RejectedExecutionException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator; +import software.amazon.kinesis.checkpoint.Checkpoint; +import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SequenceNumberValidator; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig; import software.amazon.kinesis.processor.ICheckpoint; @@ -175,7 +175,7 @@ public class ShardConsumer { new RecordProcessorCheckpointer( shardInfo, checkpoint, - new SequenceNumberValidator( + new Checkpoint.SequenceNumberValidator( streamConfig.getStreamProxy(), shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java index 7379687a..34c1171f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownNotificationTask.java @@ -14,7 +14,7 @@ */ package software.amazon.kinesis.lifecycle; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.v2.IRecordProcessor; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index ed7d6956..81002552 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -16,8 +16,8 @@ package software.amazon.kinesis.lifecycle; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer; +import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.leases.ShardSyncer; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.IKinesisProxy; diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsCollectingTaskDecorator.java similarity index 72% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsCollectingTaskDecorator.java index 9a171e55..2e95acfb 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsCollectingTaskDecorator.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.metrics; import software.amazon.kinesis.lifecycle.ITask; import software.amazon.kinesis.lifecycle.TaskResult; @@ -71,7 +71,7 @@ public class MetricsCollectingTaskDecorator implements ITask { return this.getClass().getName() + "<" + other.getTaskType() + ">(" + other + ")"; } - ITask getOther() { + public ITask getOther() { return other; } } diff --git a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorAdapter.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorAdapter.java similarity index 67% rename from amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorAdapter.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorAdapter.java index 840ea50a..7ebbd9d1 100644 --- a/amazon-kinesis-client/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorAdapter.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorAdapter.java @@ -1,18 +1,18 @@ /* - * Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.processor; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.lifecycle.InitializationInput; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorFactoryAdapter.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorFactoryAdapter.java new file mode 100644 index 00000000..99a4df85 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/V1ToV2RecordProcessorFactoryAdapter.java @@ -0,0 +1,38 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.processor; + +import software.amazon.kinesis.processor.v2.IRecordProcessorFactory; +import software.amazon.kinesis.processor.v2.IRecordProcessor; + +/** + * Adapts a V1 {@link software.amazon.kinesis.processor.IRecordProcessorFactory + * IRecordProcessorFactory} to V2 + * {@link IRecordProcessorFactory IRecordProcessorFactory}. + */ +public class V1ToV2RecordProcessorFactoryAdapter implements IRecordProcessorFactory { + + private software.amazon.kinesis.processor.IRecordProcessorFactory factory; + + public V1ToV2RecordProcessorFactoryAdapter( + software.amazon.kinesis.processor.IRecordProcessorFactory factory) { + this.factory = factory; + } + + @Override + public IRecordProcessor createProcessor() { + return new V1ToV2RecordProcessorAdapter(factory.createProcessor()); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcher.java index 122cebdc..0459df68 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisDataFetcher.java @@ -18,13 +18,10 @@ import java.util.Collections; import java.util.Date; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import org.apache.commons.lang.StringUtils; import software.amazon.kinesis.checkpoint.SentinelCheckpoint; -import software.amazon.kinesis.retrieval.DataFetcherResult; -import software.amazon.kinesis.retrieval.IKinesisProxy; -import software.amazon.kinesis.retrieval.MetricsCollectingKinesisProxyDecorator; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java index daee9980..6207f967 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java +++ b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java @@ -36,6 +36,7 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.verification.VerificationMode; +import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.ShardConsumer; @RunWith(MockitoJUnitRunner.class) diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java index 60f60ed7..af834448 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java +++ b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java @@ -26,6 +26,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; +import software.amazon.kinesis.leases.ShardSequenceVerifier; import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.v2.IRecordProcessor; diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamletFactory.java b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamletFactory.java index 113c267b..ef18a077 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamletFactory.java +++ b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamletFactory.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Semaphore; +import software.amazon.kinesis.leases.ShardSequenceVerifier; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessorFactory; diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 7cf1d7b0..ec4e4324 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -82,6 +82,13 @@ import org.mockito.stubbing.Answer; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; +import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; +import software.amazon.kinesis.leases.NoOpShardPrioritization; +import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.leases.ShardObjectHelper; +import software.amazon.kinesis.leases.ShardPrioritization; +import software.amazon.kinesis.leases.ShardSequenceVerifier; +import software.amazon.kinesis.leases.ShardSyncer; import software.amazon.kinesis.lifecycle.BlockOnParentShardTask; import software.amazon.kinesis.lifecycle.ITask; import software.amazon.kinesis.lifecycle.InitializeTask; @@ -91,8 +98,10 @@ import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.lifecycle.ShutdownTask; import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskType; +import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; +import software.amazon.kinesis.processor.V1ToV2RecordProcessorFactoryAdapter; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.processor.v2.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory; diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/PreparedCheckpointerTest.java similarity index 71% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointerTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/PreparedCheckpointerTest.java index 799e39c9..99441a6d 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PreparedCheckpointerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/PreparedCheckpointerTest.java @@ -1,5 +1,21 @@ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package software.amazon.kinesis.checkpoint; +import software.amazon.kinesis.checkpoint.DoesNothingPreparedCheckpointer; +import software.amazon.kinesis.checkpoint.PreparedCheckpointer; import software.amazon.kinesis.processor.IPreparedCheckpointer; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/RecordProcessorCheckpointerTest.java similarity index 97% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/RecordProcessorCheckpointerTest.java index eb897975..396fd1a4 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/RecordProcessorCheckpointerTest.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.checkpoint; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -26,6 +26,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map.Entry; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; +import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.IMetricsScope; import org.junit.After; import org.junit.Assert; @@ -39,7 +41,6 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.IPreparedCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl; -import software.amazon.kinesis.checkpoint.SentinelCheckpoint; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.UserRecord; import software.amazon.kinesis.metrics.MetricsHelper; @@ -57,7 +58,7 @@ public class RecordProcessorCheckpointerTest { private String testConcurrencyToken = "testToken"; private ICheckpoint checkpoint; private ShardInfo shardInfo; - private SequenceNumberValidator sequenceNumberValidator; + private Checkpoint.SequenceNumberValidator sequenceNumberValidator; private String shardId = "shardId-123"; @Mock @@ -74,7 +75,7 @@ public class RecordProcessorCheckpointerTest { Assert.assertEquals(this.startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - sequenceNumberValidator = new SequenceNumberValidator(null, shardId, false); + sequenceNumberValidator = new Checkpoint.SequenceNumberValidator(null, shardId, false); } /** @@ -429,7 +430,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testClientSpecifiedCheckpoint() throws Exception { - SequenceNumberValidator validator = mock(SequenceNumberValidator.class); + Checkpoint.SequenceNumberValidator validator = mock(Checkpoint.SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); RecordProcessorCheckpointer processingCheckpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); @@ -517,7 +518,7 @@ public class RecordProcessorCheckpointerTest { */ @Test public final void testClientSpecifiedTwoPhaseCheckpoint() throws Exception { - SequenceNumberValidator validator = mock(SequenceNumberValidator.class); + Checkpoint.SequenceNumberValidator validator = mock(Checkpoint.SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); RecordProcessorCheckpointer processingCheckpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory); @@ -643,7 +644,7 @@ public class RecordProcessorCheckpointerTest { @SuppressWarnings("serial") @Test public final void testMixedCheckpointCalls() throws Exception { - SequenceNumberValidator validator = mock(SequenceNumberValidator.class); + Checkpoint.SequenceNumberValidator validator = mock(Checkpoint.SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); for (LinkedHashMap testPlan : getMixedCallsTestPlan()) { @@ -663,7 +664,7 @@ public class RecordProcessorCheckpointerTest { @SuppressWarnings("serial") @Test public final void testMixedTwoPhaseCheckpointCalls() throws Exception { - SequenceNumberValidator validator = mock(SequenceNumberValidator.class); + Checkpoint.SequenceNumberValidator validator = mock(Checkpoint.SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); for (LinkedHashMap testPlan : getMixedCallsTestPlan()) { @@ -684,7 +685,7 @@ public class RecordProcessorCheckpointerTest { @SuppressWarnings("serial") @Test public final void testMixedTwoPhaseCheckpointCalls2() throws Exception { - SequenceNumberValidator validator = mock(SequenceNumberValidator.class); + Checkpoint.SequenceNumberValidator validator = mock(Checkpoint.SequenceNumberValidator.class); Mockito.doNothing().when(validator).validateSequenceNumber(anyString()); for (LinkedHashMap testPlan : getMixedCallsTestPlan()) { diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/SequenceNumberValidatorTest.java similarity index 79% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/SequenceNumberValidatorTest.java index 7817b2ec..10b6f51a 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/SequenceNumberValidatorTest.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.checkpoint; import junit.framework.Assert; @@ -21,6 +21,7 @@ import org.mockito.Mockito; import static org.junit.Assert.fail; +import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.SentinelCheckpoint; import software.amazon.kinesis.retrieval.IKinesisProxy; import com.amazonaws.services.kinesis.model.InvalidArgumentException; @@ -36,7 +37,7 @@ public class SequenceNumberValidatorTest { IKinesisProxy proxy = Mockito.mock(IKinesisProxy.class); - SequenceNumberValidator validator = new SequenceNumberValidator(proxy, shardId, validateWithGetIterator); + Checkpoint.SequenceNumberValidator validator = new Checkpoint.SequenceNumberValidator(proxy, shardId, validateWithGetIterator); String goodSequence = "456"; String iterator = "happyiterator"; @@ -69,7 +70,7 @@ public class SequenceNumberValidatorTest { public final void testNoValidation() { IKinesisProxy proxy = Mockito.mock(IKinesisProxy.class); String shardId = "shardid-123"; - SequenceNumberValidator validator = new SequenceNumberValidator(proxy, shardId, !validateWithGetIterator); + Checkpoint.SequenceNumberValidator validator = new Checkpoint.SequenceNumberValidator(proxy, shardId, !validateWithGetIterator); String goodSequence = "456"; // Just checking that the false flag for validating against getIterator is honored @@ -82,7 +83,7 @@ public class SequenceNumberValidatorTest { nonNumericValueValidationTest(validator, proxy, !validateWithGetIterator); } - private void nonNumericValueValidationTest(SequenceNumberValidator validator, + private void nonNumericValueValidationTest(Checkpoint.SequenceNumberValidator validator, IKinesisProxy proxy, boolean validateWithGetIterator) { @@ -115,7 +116,7 @@ public class SequenceNumberValidatorTest { }; for (String digits : stringsOfDigits) { Assert.assertTrue("Expected that " + digits + " would be considered a string of digits.", - SequenceNumberValidator.isDigits(digits)); + Checkpoint.SequenceNumberValidator.isDigits(digits)); } // Check things that are not all digits String[] stringsWithNonDigits = { @@ -133,7 +134,7 @@ public class SequenceNumberValidatorTest { }; for (String notAllDigits : stringsWithNonDigits) { Assert.assertFalse("Expected that " + notAllDigits + " would not be considered a string of digits.", - SequenceNumberValidator.isDigits(notAllDigits)); + Checkpoint.SequenceNumberValidator.isDigits(notAllDigits)); } } } diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseManager.java similarity index 90% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseManager.java index 16b4a29a..891bebd7 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseManager.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import java.util.Arrays; import java.util.List; @@ -30,7 +30,7 @@ import lombok.extern.slf4j.Slf4j; * */ @Slf4j -class ExceptionThrowingLeaseManager implements ILeaseManager { +public class ExceptionThrowingLeaseManager implements ILeaseManager { private static final Throwable EXCEPTION_MSG = new Throwable("Test Exception"); // Use array below to control in what situations we want to throw exceptions. @@ -39,7 +39,7 @@ class ExceptionThrowingLeaseManager implements ILeaseManager /** * Methods which we support (simulate exceptions). */ - enum ExceptionThrowingLeaseManagerMethods { + public enum ExceptionThrowingLeaseManagerMethods { CREATELEASETABLEIFNOTEXISTS(0), LEASETABLEEXISTS(1), WAITUNTILLEASETABLEEXISTS(2), diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorIntegrationTest.java similarity index 91% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorIntegrationTest.java index 1cc286c7..889b1932 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorIntegrationTest.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import java.util.ArrayList; import java.util.Collections; @@ -35,10 +35,6 @@ import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; -import software.amazon.kinesis.leases.KinesisClientLease; -import software.amazon.kinesis.leases.KinesisClientLeaseManager; -import software.amazon.kinesis.leases.Lease; -import software.amazon.kinesis.leases.ILeaseRenewer; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorTest.java similarity index 79% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorTest.java index 095e6647..5a231f80 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisClientLibLeaseCoordinatorTest.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doReturn; @@ -28,6 +28,7 @@ import org.mockito.MockitoAnnotations; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritizationUnitTest.java similarity index 99% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritizationUnitTest.java index e0ab7efe..9ca5df70 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ParentsFirstShardPrioritizationUnitTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritizationUnitTest.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfoTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java similarity index 89% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfoTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java index 7a4c91ea..87facf0e 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfoTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -28,6 +28,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; public class ShardInfoTest { diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardObjectHelper.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java similarity index 71% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardObjectHelper.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java index f154119a..406f5bb6 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardObjectHelper.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardObjectHelper.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import java.math.BigInteger; import java.util.ArrayList; @@ -25,7 +25,7 @@ import com.amazonaws.services.kinesis.model.Shard; /** * Helper class to create Shard, SequenceRange and related objects. */ -class ShardObjectHelper { +public class ShardObjectHelper { private static final int EXPONENT = 128; @@ -42,7 +42,7 @@ class ShardObjectHelper { /** * Max value of a hash key (2^128 -1). Useful for defining hash key range for a shard. */ - static final String MAX_HASH_KEY = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString(); + public static final String MAX_HASH_KEY = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString(); /** * Min value of a hash key (0). Useful for defining sequence number range for a shard. @@ -78,11 +78,11 @@ class ShardObjectHelper { * @param hashKeyRange * @return */ - static Shard newShard(String shardId, - String parentShardId, - String adjacentParentShardId, - SequenceNumberRange sequenceNumberRange, - HashKeyRange hashKeyRange) { + public static Shard newShard(String shardId, + String parentShardId, + String adjacentParentShardId, + SequenceNumberRange sequenceNumberRange, + HashKeyRange hashKeyRange) { Shard shard = new Shard(); shard.setShardId(shardId); shard.setParentShardId(parentShardId); @@ -98,7 +98,7 @@ class ShardObjectHelper { * @param endingSequenceNumber * @return */ - static SequenceNumberRange newSequenceNumberRange(String startingSequenceNumber, String endingSequenceNumber) { + public static SequenceNumberRange newSequenceNumberRange(String startingSequenceNumber, String endingSequenceNumber) { SequenceNumberRange range = new SequenceNumberRange(); range.setStartingSequenceNumber(startingSequenceNumber); range.setEndingSequenceNumber(endingSequenceNumber); @@ -110,7 +110,7 @@ class ShardObjectHelper { * @param endingHashKey * @return */ - static HashKeyRange newHashKeyRange(String startingHashKey, String endingHashKey) { + public static HashKeyRange newHashKeyRange(String startingHashKey, String endingHashKey) { HashKeyRange range = new HashKeyRange(); range.setStartingHashKey(startingHashKey); range.setEndingHashKey(endingHashKey); diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSequenceVerifier.java similarity index 69% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSequenceVerifier.java index 4176575c..89e90983 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSequenceVerifier.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import java.util.ArrayList; import java.util.Collections; @@ -25,6 +25,7 @@ import com.amazonaws.services.kinesis.model.Shard; import junit.framework.Assert; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.leases.ShardObjectHelper; import software.amazon.kinesis.lifecycle.ShutdownReason; /** @@ -32,7 +33,7 @@ import software.amazon.kinesis.lifecycle.ShutdownReason; * Verifies that parent shard processors were shutdown before child shard processor was initialized. */ @Slf4j -class ShardSequenceVerifier { +public class ShardSequenceVerifier { private Map shardIdToShards = new HashMap(); private ConcurrentSkipListSet initializedShards = new ConcurrentSkipListSet<>(); private ConcurrentSkipListSet shutdownShards = new ConcurrentSkipListSet<>(); @@ -41,13 +42,13 @@ class ShardSequenceVerifier { /** * Constructor with the shard list for the stream. */ - ShardSequenceVerifier(List shardList) { + public ShardSequenceVerifier(List shardList) { for (Shard shard : shardList) { shardIdToShards.put(shard.getShardId(), shard); } } - void registerInitialization(String shardId) { + public void registerInitialization(String shardId) { List parentShardIds = ShardObjectHelper.getParentShardIds(shardIdToShards.get(shardId)); for (String parentShardId : parentShardIds) { if (initializedShards.contains(parentShardId)) { @@ -62,13 +63,13 @@ class ShardSequenceVerifier { initializedShards.add(shardId); } - void registerShutdown(String shardId, ShutdownReason reason) { + public void registerShutdown(String shardId, ShutdownReason reason) { if (reason.equals(ShutdownReason.TERMINATE)) { shutdownShards.add(shardId); } } - void verify() { + public void verify() { for (String message : validationFailures) { log.error(message); } diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java similarity index 84% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java index dc7ed966..610ec419 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java @@ -1,23 +1,25 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import java.util.HashSet; import java.util.List; import java.util.Set; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -31,6 +33,7 @@ import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; +import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.retrieval.KinesisProxy; import software.amazon.kinesis.leases.exceptions.DependencyException; diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncerTest.java similarity index 98% rename from amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncerTest.java index 345ad008..7ccae4e1 100644 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncerTest.java @@ -1,18 +1,18 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +package software.amazon.kinesis.leases; import java.io.File; import java.io.IOException; @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -34,7 +36,7 @@ import org.junit.Test; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ExceptionThrowingLeaseManager.ExceptionThrowingLeaseManagerMethods; +import software.amazon.kinesis.leases.ExceptionThrowingLeaseManager.ExceptionThrowingLeaseManagerMethods; import software.amazon.kinesis.retrieval.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; @@ -43,9 +45,6 @@ import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; -import software.amazon.kinesis.leases.KinesisClientLease; -import software.amazon.kinesis.leases.KinesisClientLeaseManager; -import software.amazon.kinesis.leases.LeaseManager; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java index fb2bbf7f..c8885077 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java @@ -20,7 +20,7 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.List; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -28,9 +28,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import software.amazon.kinesis.lifecycle.BlockOnParentShardTask; -import software.amazon.kinesis.lifecycle.TaskResult; -import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index 988569d8..38cd09b4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -31,7 +31,7 @@ import java.util.concurrent.Future; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig; import org.hamcrest.Condition; @@ -44,17 +44,6 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import software.amazon.kinesis.lifecycle.BlockOnParentShardTask; -import software.amazon.kinesis.lifecycle.ConsumerStates; -import software.amazon.kinesis.lifecycle.ITask; -import software.amazon.kinesis.lifecycle.InitializeTask; -import software.amazon.kinesis.lifecycle.ProcessTask; -import software.amazon.kinesis.lifecycle.ShardConsumer; -import software.amazon.kinesis.lifecycle.ShutdownNotificationTask; -import software.amazon.kinesis.lifecycle.ShutdownReason; -import software.amazon.kinesis.lifecycle.ShutdownTask; -import software.amazon.kinesis.lifecycle.TaskResult; -import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.IRecordProcessorCheckpointer; import software.amazon.kinesis.processor.v2.IRecordProcessor; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java index e3ff7e46..44544e36 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java @@ -41,7 +41,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ThrottlingReporter; import org.junit.Before; @@ -50,15 +50,12 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import software.amazon.kinesis.lifecycle.ProcessTask; -import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.KinesisDataFetcher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.Messages; import software.amazon.kinesis.retrieval.kpl.Messages.AggregatedRecord; -import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.retrieval.kpl.UserRecord; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.Record; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 2c8ab0f1..b87aa8fe 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -54,8 +54,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SequenceNumberValidator; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TestStreamlet; @@ -68,12 +67,6 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import software.amazon.kinesis.lifecycle.ConsumerStates; -import software.amazon.kinesis.lifecycle.ITask; -import software.amazon.kinesis.lifecycle.InitializeTask; -import software.amazon.kinesis.lifecycle.ShardConsumer; -import software.amazon.kinesis.lifecycle.ShutdownReason; -import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.checkpoint.Checkpoint; @@ -90,8 +83,6 @@ import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.SimpleRecordsFetcherFactory; import software.amazon.kinesis.retrieval.SynchronousGetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import software.amazon.kinesis.lifecycle.InitializationInput; -import software.amazon.kinesis.lifecycle.ShutdownInput; import software.amazon.kinesis.retrieval.kpl.UserRecord; import software.amazon.kinesis.leases.KinesisClientLease; import software.amazon.kinesis.leases.ILeaseManager; @@ -359,7 +350,7 @@ public class ShardConsumerTest { RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( shardInfo, checkpoint, - new SequenceNumberValidator( + new Checkpoint.SequenceNumberValidator( streamConfig.getStreamProxy(), shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() @@ -511,7 +502,7 @@ public class ShardConsumerTest { RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( shardInfo, checkpoint, - new SequenceNumberValidator( + new Checkpoint.SequenceNumberValidator( streamConfig.getStreamProxy(), shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() @@ -640,7 +631,7 @@ public class ShardConsumerTest { RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( shardInfo, checkpoint, - new SequenceNumberValidator( + new Checkpoint.SequenceNumberValidator( streamConfig.getStreamProxy(), shardInfo.getShardId(), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 315a6f56..fde3c213 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -25,7 +25,7 @@ import java.util.Set; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TestStreamlet; import org.junit.After; import org.junit.AfterClass; @@ -35,10 +35,6 @@ import org.junit.BeforeClass; import org.junit.Test; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException; -import software.amazon.kinesis.lifecycle.ShutdownReason; -import software.amazon.kinesis.lifecycle.ShutdownTask; -import software.amazon.kinesis.lifecycle.TaskResult; -import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.processor.v2.IRecordProcessor; import software.amazon.kinesis.retrieval.GetRecordsCache; import software.amazon.kinesis.retrieval.IKinesisProxy; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java index 76de1507..642ce2c5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java @@ -38,7 +38,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import org.junit.After; import org.junit.Before; @@ -49,13 +49,9 @@ import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; -import software.amazon.kinesis.retrieval.AsynchronousGetRecordsRetrievalStrategy; -import software.amazon.kinesis.retrieval.DataFetcherResult; -import software.amazon.kinesis.retrieval.IKinesisProxy; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.mockito.stubbing.Answer; -import software.amazon.kinesis.retrieval.KinesisDataFetcher; @RunWith(MockitoJUnitRunner.class) public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/KinesisDataFetcherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/KinesisDataFetcherTest.java index 94677f31..e8c14e8a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/KinesisDataFetcherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/KinesisDataFetcherTest.java @@ -38,7 +38,7 @@ import java.util.List; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -49,12 +49,6 @@ import org.mockito.runners.MockitoJUnitRunner; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; import software.amazon.kinesis.processor.ICheckpoint; import software.amazon.kinesis.checkpoint.SentinelCheckpoint; -import software.amazon.kinesis.retrieval.DataFetcherResult; -import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; -import software.amazon.kinesis.retrieval.IKinesisProxy; -import software.amazon.kinesis.retrieval.KinesisDataFetcher; -import software.amazon.kinesis.retrieval.KinesisProxy; -import software.amazon.kinesis.retrieval.SynchronousGetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.metrics.MetricsHelper; import software.amazon.kinesis.metrics.NullMetricsFactory; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheIntegrationTest.java index 34d53a38..4f5fe955 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/PrefetchGetRecordsCacheIntegrationTest.java @@ -33,7 +33,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import software.amazon.kinesis.leases.ShardInfo; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -43,11 +43,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; -import software.amazon.kinesis.retrieval.AsynchronousGetRecordsRetrievalStrategy; -import software.amazon.kinesis.retrieval.DataFetcherResult; -import software.amazon.kinesis.retrieval.GetRecordsCache; -import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; -import software.amazon.kinesis.retrieval.IKinesisProxy; import software.amazon.kinesis.lifecycle.ProcessRecordsInput; import software.amazon.kinesis.metrics.NullMetricsFactory; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; @@ -55,9 +50,6 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.Record; import lombok.extern.apachecommons.CommonsLog; -import software.amazon.kinesis.retrieval.KinesisDataFetcher; -import software.amazon.kinesis.retrieval.PrefetchGetRecordsCache; -import software.amazon.kinesis.retrieval.SynchronousGetRecordsRetrievalStrategy; /** * These are the integration tests for the PrefetchGetRecordsCache class.