Moved a lot of the checkpoint and lease classes now

This commit is contained in:
Pfifer, Justin 2018-03-14 11:15:13 -07:00
parent dbbcae9db6
commit 636bda22c7
51 changed files with 456 additions and 480 deletions

View file

@ -18,6 +18,7 @@ import java.io.Serializable;
import java.math.BigInteger;
import java.util.Comparator;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.SentinelCheckpoint;
/**
@ -87,7 +88,7 @@ class CheckpointValueComparator implements Comparator<String>, Serializable {
* @return a BigInteger value representation of the checkpointValue
*/
private static BigInteger bigIntegerValue(String checkpointValue) {
if (SequenceNumberValidator.isDigits(checkpointValue)) {
if (Checkpoint.SequenceNumberValidator.isDigits(checkpointValue)) {
return new BigInteger(checkpointValue);
} else if (SentinelCheckpoint.LATEST.toString().equals(checkpointValue)) {
return LATEST_BIG_INTEGER_VALUE;
@ -106,7 +107,7 @@ class CheckpointValueComparator implements Comparator<String>, Serializable {
* @return true if and only if the string is all digits or one of the SentinelCheckpoint values
*/
private static boolean isDigitsOrSentinelValue(String string) {
return SequenceNumberValidator.isDigits(string) || isSentinelValue(string);
return Checkpoint.SequenceNumberValidator.isDigits(string) || isSentinelValue(string);
}
/**

View file

@ -23,6 +23,8 @@ import org.apache.commons.lang.Validate;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.RegionUtils;
import software.amazon.kinesis.leases.NoOpShardPrioritization;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ProcessTask;
import software.amazon.kinesis.lifecycle.ShardConsumer;

View file

@ -19,6 +19,10 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.DoesNothingPreparedCheckpointer;
import software.amazon.kinesis.checkpoint.PreparedCheckpointer;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IPreparedCheckpointer;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
@ -47,7 +51,7 @@ public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer
private ShardInfo shardInfo;
private SequenceNumberValidator sequenceNumberValidator;
private Checkpoint.SequenceNumberValidator sequenceNumberValidator;
private ExtendedSequenceNumber sequenceNumberAtShardEnd;
@ -61,7 +65,7 @@ public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer
*/
public RecordProcessorCheckpointer(ShardInfo shardInfo,
ICheckpoint checkpoint,
SequenceNumberValidator validator,
Checkpoint.SequenceNumberValidator validator,
IMetricsFactory metricsFactory) {
this.shardInfo = shardInfo;
this.checkpoint = checkpoint;

View file

@ -1,127 +0,0 @@
/*
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import lombok.extern.slf4j.Slf4j;
/**
* This class provides some methods for validating sequence numbers. It provides a method
* {@link #validateSequenceNumber(String)} which validates a sequence number by attempting to get an iterator from
* Amazon Kinesis for that sequence number. (e.g. Before checkpointing a client provided sequence number in
* {@link RecordProcessorCheckpointer#checkpoint(String)} to prevent invalid sequence numbers from being checkpointed,
* which could prevent another shard consumer instance from processing the shard later on). This class also provides a
* utility function {@link #isDigits(String)} which is used to check whether a string is all digits
*/
@Slf4j
public class SequenceNumberValidator {
private IKinesisProxy proxy;
private String shardId;
private boolean validateWithGetIterator;
private static final int SERVER_SIDE_ERROR_CODE = 500;
/**
* Constructor.
*
* @param proxy Kinesis proxy to be used for getIterator call
* @param shardId ShardId to check with sequence numbers
* @param validateWithGetIterator Whether to attempt to get an iterator for this shard id and the sequence numbers
* being validated
*/
public SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) {
this.proxy = proxy;
this.shardId = shardId;
this.validateWithGetIterator = validateWithGetIterator;
}
/**
* Validates the sequence number by attempting to get an iterator from Amazon Kinesis. Repackages exceptions from
* Amazon Kinesis into the appropriate KCL exception to allow clients to determine exception handling strategies
*
* @param sequenceNumber The sequence number to be validated. Must be a numeric string
* @throws IllegalArgumentException Thrown when sequence number validation fails.
* @throws ThrottlingException Thrown when GetShardIterator returns a ProvisionedThroughputExceededException which
* indicates that too many getIterator calls are being made for this shard.
* @throws KinesisClientLibDependencyException Thrown when a service side error is received. This way clients have
* the option of retrying
*/
void validateSequenceNumber(String sequenceNumber)
throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException {
boolean atShardEnd = ExtendedSequenceNumber.SHARD_END.getSequenceNumber().equals(sequenceNumber);
if (!atShardEnd && !isDigits(sequenceNumber)) {
log.info("Sequence number must be numeric, but was {}", sequenceNumber);
throw new IllegalArgumentException("Sequence number must be numeric, but was " + sequenceNumber);
}
try {
if (!atShardEnd &&validateWithGetIterator) {
proxy.getIterator(shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), sequenceNumber);
log.info("Validated sequence number {} with shard id {}", sequenceNumber, shardId);
}
} catch (InvalidArgumentException e) {
log.info("Sequence number {} is invalid for shard {}", sequenceNumber, shardId, e);
throw new IllegalArgumentException("Sequence number " + sequenceNumber + " is invalid for shard "
+ shardId, e);
} catch (ProvisionedThroughputExceededException e) {
// clients should have back off logic in their checkpoint logic
log.info("Exceeded throughput while getting an iterator for shard {}", shardId, e);
throw new ThrottlingException("Exceeded throughput while getting an iterator for shard " + shardId, e);
} catch (AmazonServiceException e) {
log.info("Encountered service exception while getting an iterator for shard {}", shardId, e);
if (e.getStatusCode() >= SERVER_SIDE_ERROR_CODE) {
// clients can choose whether to retry in their checkpoint logic
throw new KinesisClientLibDependencyException("Encountered service exception while getting an iterator"
+ " for shard " + shardId, e);
}
// Just throw any other exceptions, e.g. 400 errors caused by the client
throw e;
}
}
void validateSequenceNumber(ExtendedSequenceNumber checkpoint)
throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException {
validateSequenceNumber(checkpoint.getSequenceNumber());
if (checkpoint.getSubSequenceNumber() < 0) {
throw new IllegalArgumentException("SubSequence number must be non-negative, but was "
+ checkpoint.getSubSequenceNumber());
}
}
/**
* Checks if the string is composed of only digits.
*
* @param string
* @return true for a string of all digits, false otherwise (including false for null and empty string)
*/
static boolean isDigits(String string) {
if (string == null || string.length() == 0) {
return false;
}
for (int i = 0; i < string.length(); i++) {
if (!Character.isDigit(string.charAt(i))) {
return false;
}
}
return true;
}
}

View file

@ -1,38 +0,0 @@
/*
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import software.amazon.kinesis.processor.v2.IRecordProcessorFactory;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
/**
* Adapts a V1 {@link software.amazon.kinesis.processor.IRecordProcessorFactory
* IRecordProcessorFactory} to V2
* {@link IRecordProcessorFactory IRecordProcessorFactory}.
*/
class V1ToV2RecordProcessorFactoryAdapter implements IRecordProcessorFactory {
private software.amazon.kinesis.processor.IRecordProcessorFactory factory;
V1ToV2RecordProcessorFactoryAdapter(
software.amazon.kinesis.processor.IRecordProcessorFactory factory) {
this.factory = factory;
}
@Override
public IRecordProcessor createProcessor() {
return new V1ToV2RecordProcessorAdapter(factory.createProcessor());
}
}

View file

@ -40,10 +40,18 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.leases.ParentsFirstShardPrioritization;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.V1ToV2RecordProcessorFactoryAdapter;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.v2.IRecordProcessorFactory;
import software.amazon.kinesis.processor.v2.IShutdownNotificationAware;

View file

@ -14,6 +14,15 @@
*/
package software.amazon.kinesis.checkpoint;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import lombok.Data;
@ -38,4 +47,105 @@ import lombok.Data;
this.checkpoint = checkpoint;
this.pendingCheckpoint = pendingCheckpoint;
}
/**
* This class provides some methods for validating sequence numbers. It provides a method
* {@link #validateSequenceNumber(String)} which validates a sequence number by attempting to get an iterator from
* Amazon Kinesis for that sequence number. (e.g. Before checkpointing a client provided sequence number in
* {@link RecordProcessorCheckpointer#checkpoint(String)} to prevent invalid sequence numbers from being checkpointed,
* which could prevent another shard consumer instance from processing the shard later on). This class also provides a
* utility function {@link #isDigits(String)} which is used to check whether a string is all digits
*/
@Slf4j
public static class SequenceNumberValidator {
private IKinesisProxy proxy;
private String shardId;
private boolean validateWithGetIterator;
private static final int SERVER_SIDE_ERROR_CODE = 500;
/**
* Constructor.
*
* @param proxy Kinesis proxy to be used for getIterator call
* @param shardId ShardId to check with sequence numbers
* @param validateWithGetIterator Whether to attempt to get an iterator for this shard id and the sequence numbers
* being validated
*/
public SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) {
this.proxy = proxy;
this.shardId = shardId;
this.validateWithGetIterator = validateWithGetIterator;
}
/**
* Validates the sequence number by attempting to get an iterator from Amazon Kinesis. Repackages exceptions from
* Amazon Kinesis into the appropriate KCL exception to allow clients to determine exception handling strategies
*
* @param sequenceNumber The sequence number to be validated. Must be a numeric string
* @throws IllegalArgumentException Thrown when sequence number validation fails.
* @throws ThrottlingException Thrown when GetShardIterator returns a ProvisionedThroughputExceededException which
* indicates that too many getIterator calls are being made for this shard.
* @throws KinesisClientLibDependencyException Thrown when a service side error is received. This way clients have
* the option of retrying
*/
public void validateSequenceNumber(String sequenceNumber)
throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException {
boolean atShardEnd = ExtendedSequenceNumber.SHARD_END.getSequenceNumber().equals(sequenceNumber);
if (!atShardEnd && !isDigits(sequenceNumber)) {
SequenceNumberValidator.log.info("Sequence number must be numeric, but was {}", sequenceNumber);
throw new IllegalArgumentException("Sequence number must be numeric, but was " + sequenceNumber);
}
try {
if (!atShardEnd &&validateWithGetIterator) {
proxy.getIterator(shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), sequenceNumber);
SequenceNumberValidator.log.info("Validated sequence number {} with shard id {}", sequenceNumber, shardId);
}
} catch (InvalidArgumentException e) {
SequenceNumberValidator.log.info("Sequence number {} is invalid for shard {}", sequenceNumber, shardId, e);
throw new IllegalArgumentException("Sequence number " + sequenceNumber + " is invalid for shard "
+ shardId, e);
} catch (ProvisionedThroughputExceededException e) {
// clients should have back off logic in their checkpoint logic
SequenceNumberValidator.log.info("Exceeded throughput while getting an iterator for shard {}", shardId, e);
throw new ThrottlingException("Exceeded throughput while getting an iterator for shard " + shardId, e);
} catch (AmazonServiceException e) {
SequenceNumberValidator.log.info("Encountered service exception while getting an iterator for shard {}", shardId, e);
if (e.getStatusCode() >= SERVER_SIDE_ERROR_CODE) {
// clients can choose whether to retry in their checkpoint logic
throw new KinesisClientLibDependencyException("Encountered service exception while getting an iterator"
+ " for shard " + shardId, e);
}
// Just throw any other exceptions, e.g. 400 errors caused by the client
throw e;
}
}
void validateSequenceNumber(ExtendedSequenceNumber checkpoint)
throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException {
validateSequenceNumber(checkpoint.getSequenceNumber());
if (checkpoint.getSubSequenceNumber() < 0) {
throw new IllegalArgumentException("SubSequence number must be non-negative, but was "
+ checkpoint.getSubSequenceNumber());
}
}
/**
* Checks if the string is composed of only digits.
*
* @param string
* @return true for a string of all digits, false otherwise (including false for null and empty string)
*/
public static boolean isDigits(String string) {
if (string == null || string.length() == 0) {
return false;
}
for (int i = 0; i < string.length(); i++) {
if (!Character.isDigit(string.charAt(i))) {
return false;
}
}
return true;
}
}
}

View file

@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.checkpoint;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;

View file

@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.checkpoint;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import java.util.ArrayList;
import java.util.Collection;
@ -33,9 +33,6 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.metrics.IMetricsFactory;
import lombok.extern.slf4j.Slf4j;
@ -44,7 +41,7 @@ import lombok.extern.slf4j.Slf4j;
* This class is used to coordinate/manage leases owned by this worker process and to get/set checkpoints.
*/
@Slf4j
class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLease> implements ICheckpoint {
public class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLease> implements ICheckpoint {
private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
@ -144,7 +141,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
boolean setCheckpoint(String shardId, ExtendedSequenceNumber checkpoint, UUID concurrencyToken)
public boolean setCheckpoint(String shardId, ExtendedSequenceNumber checkpoint, UUID concurrencyToken)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
KinesisClientLease lease = getCurrentlyHeldLease(shardId);
if (lease == null) {
@ -295,7 +292,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @throws DependencyException
* @throws ProvisionedThroughputException
*/
void initialize() throws ProvisionedThroughputException, DependencyException, IllegalStateException {
public void initialize() throws ProvisionedThroughputException, DependencyException, IllegalStateException {
final boolean newTableCreated =
leaseManager.createLeaseTableIfNotExists(initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity);
if (newTableCreated) {
@ -317,7 +314,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @throws DependencyException
* @throws InvalidStateException
*/
void runLeaseTaker() throws DependencyException, InvalidStateException {
public void runLeaseTaker() throws DependencyException, InvalidStateException {
super.runTaker();
}
@ -327,7 +324,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @throws DependencyException
* @throws InvalidStateException
*/
void runLeaseRenewer() throws DependencyException, InvalidStateException {
public void runLeaseRenewer() throws DependencyException, InvalidStateException {
super.runRenewer();
}
@ -337,7 +334,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
*
* @return LeaseManager
*/
ILeaseManager<KinesisClientLease> getLeaseManager() {
public ILeaseManager<KinesisClientLease> getLeaseManager() {
return leaseManager;
}

View file

@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import java.util.List;

View file

@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import java.util.ArrayList;
import java.util.Collections;

View file

@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import java.util.Collection;
import java.util.Collections;
@ -95,7 +95,7 @@ public class ShardInfo {
*
* @return completion status of the shard
*/
protected boolean isCompleted() {
public boolean isCompleted() {
return ExtendedSequenceNumber.SHARD_END.equals(checkpoint);
}

View file

@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import java.util.List;

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,14 +12,13 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ILeaseManager;
import lombok.extern.slf4j.Slf4j;
@ -30,7 +29,7 @@ import lombok.extern.slf4j.Slf4j;
* cleanupLeasesUponShardCompletion is true).
*/
@Slf4j
class ShardSyncTask implements ITask {
public class ShardSyncTask implements ITask {
private final IKinesisProxy kinesisProxy;
private final ILeaseManager<KinesisClientLease> leaseManager;
private InitialPositionInStreamExtended initialPosition;
@ -46,7 +45,7 @@ class ShardSyncTask implements ITask {
* start processing records from this point in the stream (when an application starts up for the first time)
* except for shards that already have a checkpoint (and their descendant shards).
*/
ShardSyncTask(IKinesisProxy kinesisProxy,
public ShardSyncTask(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesUponShardCompletion,

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,18 +12,18 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ILeaseManager;
import software.amazon.kinesis.metrics.IMetricsFactory;
import lombok.extern.slf4j.Slf4j;
@ -34,7 +34,7 @@ import lombok.extern.slf4j.Slf4j;
* Worker will use this class to kick off a sync task when it finds shards which have been completely processed.
*/
@Slf4j
class ShardSyncTaskManager {
public class ShardSyncTaskManager {
private ITask currentTask;
private Future<TaskResult> future;
private final IKinesisProxy kinesisProxy;
@ -60,7 +60,7 @@ class ShardSyncTaskManager {
* @param metricsFactory Metrics factory
* @param executorService ExecutorService to execute the shard sync tasks
*/
ShardSyncTaskManager(final IKinesisProxy kinesisProxy,
public ShardSyncTaskManager(final IKinesisProxy kinesisProxy,
final ILeaseManager<KinesisClientLease> leaseManager,
final InitialPositionInStreamExtended initialPositionInStream,
final boolean cleanupLeasesUponShardCompletion,
@ -78,7 +78,7 @@ class ShardSyncTaskManager {
this.initialPositionInStream = initialPositionInStream;
}
synchronized boolean syncShardAndLeaseInfo(Set<String> closedShardIds) {
public synchronized boolean syncShardAndLeaseInfo(Set<String> closedShardIds) {
return checkAndSubmitNextTask(closedShardIds);
}

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import java.io.Serializable;
import java.math.BigInteger;
@ -26,6 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import org.apache.commons.lang.StringUtils;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
@ -771,7 +773,7 @@ public class ShardSyncer {
* @param shard
* @return
*/
static KinesisClientLease newKCLLease(Shard shard) {
public static KinesisClientLease newKCLLease(Shard shard) {
KinesisClientLease newLease = new KinesisClientLease();
newLease.setLeaseKey(shard.getShardId());
List<String> parentShardIds = new ArrayList<String>(2);

View file

@ -15,7 +15,7 @@
package software.amazon.kinesis.lifecycle;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ILeaseManager;

View file

@ -15,7 +15,7 @@
package software.amazon.kinesis.lifecycle;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.v2.IRecordProcessor;

View file

@ -20,7 +20,7 @@ import java.util.ListIterator;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ThrottlingReporter;
import software.amazon.kinesis.processor.v2.IRecordProcessor;

View file

@ -22,10 +22,10 @@ import java.util.concurrent.RejectedExecutionException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SequenceNumberValidator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import software.amazon.kinesis.processor.ICheckpoint;
@ -175,7 +175,7 @@ public class ShardConsumer {
new RecordProcessorCheckpointer(
shardInfo,
checkpoint,
new SequenceNumberValidator(
new Checkpoint.SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),

View file

@ -14,7 +14,7 @@
*/
package software.amazon.kinesis.lifecycle;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.v2.IRecordProcessor;

View file

@ -16,8 +16,8 @@ package software.amazon.kinesis.lifecycle;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardSyncer;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.IKinesisProxy;

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.metrics;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.TaskResult;
@ -71,7 +71,7 @@ public class MetricsCollectingTaskDecorator implements ITask {
return this.getClass().getName() + "<" + other.getTaskType() + ">(" + other + ")";
}
ITask getOther() {
public ITask getOther() {
return other;
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.processor;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.lifecycle.InitializationInput;

View file

@ -0,0 +1,38 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.processor;
import software.amazon.kinesis.processor.v2.IRecordProcessorFactory;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
/**
* Adapts a V1 {@link software.amazon.kinesis.processor.IRecordProcessorFactory
* IRecordProcessorFactory} to V2
* {@link IRecordProcessorFactory IRecordProcessorFactory}.
*/
public class V1ToV2RecordProcessorFactoryAdapter implements IRecordProcessorFactory {
private software.amazon.kinesis.processor.IRecordProcessorFactory factory;
public V1ToV2RecordProcessorFactoryAdapter(
software.amazon.kinesis.processor.IRecordProcessorFactory factory) {
this.factory = factory;
}
@Override
public IRecordProcessor createProcessor() {
return new V1ToV2RecordProcessorAdapter(factory.createProcessor());
}
}

View file

@ -18,13 +18,10 @@ import java.util.Collections;
import java.util.Date;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import org.apache.commons.lang.StringUtils;
import software.amazon.kinesis.checkpoint.SentinelCheckpoint;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.MetricsCollectingKinesisProxyDecorator;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;

View file

@ -36,6 +36,7 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.verification.VerificationMode;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.ShardConsumer;
@RunWith(MockitoJUnitRunner.class)

View file

@ -26,6 +26,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import software.amazon.kinesis.leases.ShardSequenceVerifier;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.v2.IRecordProcessor;

View file

@ -18,6 +18,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import software.amazon.kinesis.leases.ShardSequenceVerifier;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.v2.IRecordProcessorFactory;

View file

@ -82,6 +82,13 @@ import org.mockito.stubbing.Answer;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.leases.NoOpShardPrioritization;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardObjectHelper;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSequenceVerifier;
import software.amazon.kinesis.leases.ShardSyncer;
import software.amazon.kinesis.lifecycle.BlockOnParentShardTask;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.InitializeTask;
@ -91,8 +98,10 @@ import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.ShutdownTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.V1ToV2RecordProcessorFactoryAdapter;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.processor.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory;

View file

@ -1,5 +1,21 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.checkpoint;
import software.amazon.kinesis.checkpoint.DoesNothingPreparedCheckpointer;
import software.amazon.kinesis.checkpoint.PreparedCheckpointer;
import software.amazon.kinesis.processor.IPreparedCheckpointer;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.checkpoint;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@ -26,6 +26,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map.Entry;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.IMetricsScope;
import org.junit.After;
import org.junit.Assert;
@ -39,7 +41,6 @@ import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IPreparedCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl;
import software.amazon.kinesis.checkpoint.SentinelCheckpoint;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.retrieval.kpl.UserRecord;
import software.amazon.kinesis.metrics.MetricsHelper;
@ -57,7 +58,7 @@ public class RecordProcessorCheckpointerTest {
private String testConcurrencyToken = "testToken";
private ICheckpoint checkpoint;
private ShardInfo shardInfo;
private SequenceNumberValidator sequenceNumberValidator;
private Checkpoint.SequenceNumberValidator sequenceNumberValidator;
private String shardId = "shardId-123";
@Mock
@ -74,7 +75,7 @@ public class RecordProcessorCheckpointerTest {
Assert.assertEquals(this.startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
sequenceNumberValidator = new SequenceNumberValidator(null, shardId, false);
sequenceNumberValidator = new Checkpoint.SequenceNumberValidator(null, shardId, false);
}
/**
@ -429,7 +430,7 @@ public class RecordProcessorCheckpointerTest {
*/
@Test
public final void testClientSpecifiedCheckpoint() throws Exception {
SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Checkpoint.SequenceNumberValidator validator = mock(Checkpoint.SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory);
@ -517,7 +518,7 @@ public class RecordProcessorCheckpointerTest {
*/
@Test
public final void testClientSpecifiedTwoPhaseCheckpoint() throws Exception {
SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Checkpoint.SequenceNumberValidator validator = mock(Checkpoint.SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, validator, metricsFactory);
@ -643,7 +644,7 @@ public class RecordProcessorCheckpointerTest {
@SuppressWarnings("serial")
@Test
public final void testMixedCheckpointCalls() throws Exception {
SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Checkpoint.SequenceNumberValidator validator = mock(Checkpoint.SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
@ -663,7 +664,7 @@ public class RecordProcessorCheckpointerTest {
@SuppressWarnings("serial")
@Test
public final void testMixedTwoPhaseCheckpointCalls() throws Exception {
SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Checkpoint.SequenceNumberValidator validator = mock(Checkpoint.SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {
@ -684,7 +685,7 @@ public class RecordProcessorCheckpointerTest {
@SuppressWarnings("serial")
@Test
public final void testMixedTwoPhaseCheckpointCalls2() throws Exception {
SequenceNumberValidator validator = mock(SequenceNumberValidator.class);
Checkpoint.SequenceNumberValidator validator = mock(Checkpoint.SequenceNumberValidator.class);
Mockito.doNothing().when(validator).validateSequenceNumber(anyString());
for (LinkedHashMap<String, CheckpointAction> testPlan : getMixedCallsTestPlan()) {

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.checkpoint;
import junit.framework.Assert;
@ -21,6 +21,7 @@ import org.mockito.Mockito;
import static org.junit.Assert.fail;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.SentinelCheckpoint;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
@ -36,7 +37,7 @@ public class SequenceNumberValidatorTest {
IKinesisProxy proxy = Mockito.mock(IKinesisProxy.class);
SequenceNumberValidator validator = new SequenceNumberValidator(proxy, shardId, validateWithGetIterator);
Checkpoint.SequenceNumberValidator validator = new Checkpoint.SequenceNumberValidator(proxy, shardId, validateWithGetIterator);
String goodSequence = "456";
String iterator = "happyiterator";
@ -69,7 +70,7 @@ public class SequenceNumberValidatorTest {
public final void testNoValidation() {
IKinesisProxy proxy = Mockito.mock(IKinesisProxy.class);
String shardId = "shardid-123";
SequenceNumberValidator validator = new SequenceNumberValidator(proxy, shardId, !validateWithGetIterator);
Checkpoint.SequenceNumberValidator validator = new Checkpoint.SequenceNumberValidator(proxy, shardId, !validateWithGetIterator);
String goodSequence = "456";
// Just checking that the false flag for validating against getIterator is honored
@ -82,7 +83,7 @@ public class SequenceNumberValidatorTest {
nonNumericValueValidationTest(validator, proxy, !validateWithGetIterator);
}
private void nonNumericValueValidationTest(SequenceNumberValidator validator,
private void nonNumericValueValidationTest(Checkpoint.SequenceNumberValidator validator,
IKinesisProxy proxy,
boolean validateWithGetIterator) {
@ -115,7 +116,7 @@ public class SequenceNumberValidatorTest {
};
for (String digits : stringsOfDigits) {
Assert.assertTrue("Expected that " + digits + " would be considered a string of digits.",
SequenceNumberValidator.isDigits(digits));
Checkpoint.SequenceNumberValidator.isDigits(digits));
}
// Check things that are not all digits
String[] stringsWithNonDigits = {
@ -133,7 +134,7 @@ public class SequenceNumberValidatorTest {
};
for (String notAllDigits : stringsWithNonDigits) {
Assert.assertFalse("Expected that " + notAllDigits + " would not be considered a string of digits.",
SequenceNumberValidator.isDigits(notAllDigits));
Checkpoint.SequenceNumberValidator.isDigits(notAllDigits));
}
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import java.util.Arrays;
import java.util.List;
@ -30,7 +30,7 @@ import lombok.extern.slf4j.Slf4j;
*
*/
@Slf4j
class ExceptionThrowingLeaseManager implements ILeaseManager<KinesisClientLease> {
public class ExceptionThrowingLeaseManager implements ILeaseManager<KinesisClientLease> {
private static final Throwable EXCEPTION_MSG = new Throwable("Test Exception");
// Use array below to control in what situations we want to throw exceptions.
@ -39,7 +39,7 @@ class ExceptionThrowingLeaseManager implements ILeaseManager<KinesisClientLease>
/**
* Methods which we support (simulate exceptions).
*/
enum ExceptionThrowingLeaseManagerMethods {
public enum ExceptionThrowingLeaseManagerMethods {
CREATELEASETABLEIFNOTEXISTS(0),
LEASETABLEEXISTS(1),
WAITUNTILLEASETABLEEXISTS(2),

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import java.util.ArrayList;
import java.util.Collections;
@ -35,10 +35,6 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.KinesisClientLeaseManager;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.ILeaseRenewer;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn;
@ -28,6 +28,7 @@ import org.mockito.MockitoAnnotations;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import software.amazon.kinesis.leases.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;

View file

@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@ -28,6 +28,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
public class ShardInfoTest {

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import java.math.BigInteger;
import java.util.ArrayList;
@ -25,7 +25,7 @@ import com.amazonaws.services.kinesis.model.Shard;
/**
* Helper class to create Shard, SequenceRange and related objects.
*/
class ShardObjectHelper {
public class ShardObjectHelper {
private static final int EXPONENT = 128;
@ -42,7 +42,7 @@ class ShardObjectHelper {
/**
* Max value of a hash key (2^128 -1). Useful for defining hash key range for a shard.
*/
static final String MAX_HASH_KEY = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString();
public static final String MAX_HASH_KEY = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString();
/**
* Min value of a hash key (0). Useful for defining sequence number range for a shard.
@ -78,7 +78,7 @@ class ShardObjectHelper {
* @param hashKeyRange
* @return
*/
static Shard newShard(String shardId,
public static Shard newShard(String shardId,
String parentShardId,
String adjacentParentShardId,
SequenceNumberRange sequenceNumberRange,
@ -98,7 +98,7 @@ class ShardObjectHelper {
* @param endingSequenceNumber
* @return
*/
static SequenceNumberRange newSequenceNumberRange(String startingSequenceNumber, String endingSequenceNumber) {
public static SequenceNumberRange newSequenceNumberRange(String startingSequenceNumber, String endingSequenceNumber) {
SequenceNumberRange range = new SequenceNumberRange();
range.setStartingSequenceNumber(startingSequenceNumber);
range.setEndingSequenceNumber(endingSequenceNumber);
@ -110,7 +110,7 @@ class ShardObjectHelper {
* @param endingHashKey
* @return
*/
static HashKeyRange newHashKeyRange(String startingHashKey, String endingHashKey) {
public static HashKeyRange newHashKeyRange(String startingHashKey, String endingHashKey) {
HashKeyRange range = new HashKeyRange();
range.setStartingHashKey(startingHashKey);
range.setEndingHashKey(endingHashKey);

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import java.util.ArrayList;
import java.util.Collections;
@ -25,6 +25,7 @@ import com.amazonaws.services.kinesis.model.Shard;
import junit.framework.Assert;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.leases.ShardObjectHelper;
import software.amazon.kinesis.lifecycle.ShutdownReason;
/**
@ -32,7 +33,7 @@ import software.amazon.kinesis.lifecycle.ShutdownReason;
* Verifies that parent shard processors were shutdown before child shard processor was initialized.
*/
@Slf4j
class ShardSequenceVerifier {
public class ShardSequenceVerifier {
private Map<String, Shard> shardIdToShards = new HashMap<String, Shard>();
private ConcurrentSkipListSet<String> initializedShards = new ConcurrentSkipListSet<>();
private ConcurrentSkipListSet<String> shutdownShards = new ConcurrentSkipListSet<>();
@ -41,13 +42,13 @@ class ShardSequenceVerifier {
/**
* Constructor with the shard list for the stream.
*/
ShardSequenceVerifier(List<Shard> shardList) {
public ShardSequenceVerifier(List<Shard> shardList) {
for (Shard shard : shardList) {
shardIdToShards.put(shard.getShardId(), shard);
}
}
void registerInitialization(String shardId) {
public void registerInitialization(String shardId) {
List<String> parentShardIds = ShardObjectHelper.getParentShardIds(shardIdToShards.get(shardId));
for (String parentShardId : parentShardIds) {
if (initializedShards.contains(parentShardId)) {
@ -62,13 +63,13 @@ class ShardSequenceVerifier {
initializedShards.add(shardId);
}
void registerShutdown(String shardId, ShutdownReason reason) {
public void registerShutdown(String shardId, ShutdownReason reason) {
if (reason.equals(ShutdownReason.TERMINATE)) {
shutdownShards.add(shardId);
}
}
void verify() {
public void verify() {
for (String message : validationFailures) {
log.error(message);
}

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,12 +12,14 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -31,6 +33,7 @@ import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.KinesisProxy;
import software.amazon.kinesis.leases.exceptions.DependencyException;

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
package software.amazon.kinesis.leases;
import java.io.File;
import java.io.IOException;
@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -34,7 +36,7 @@ import org.junit.Test;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ExceptionThrowingLeaseManager.ExceptionThrowingLeaseManagerMethods;
import software.amazon.kinesis.leases.ExceptionThrowingLeaseManager.ExceptionThrowingLeaseManagerMethods;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
@ -43,9 +45,6 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.KinesisClientLeaseManager;
import software.amazon.kinesis.leases.LeaseManager;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;

View file

@ -20,7 +20,7 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -28,9 +28,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import software.amazon.kinesis.lifecycle.BlockOnParentShardTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;

View file

@ -31,7 +31,7 @@ import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import org.hamcrest.Condition;
@ -44,17 +44,6 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.lifecycle.BlockOnParentShardTask;
import software.amazon.kinesis.lifecycle.ConsumerStates;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.InitializeTask;
import software.amazon.kinesis.lifecycle.ProcessTask;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShutdownNotificationTask;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.ShutdownTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.IRecordProcessorCheckpointer;
import software.amazon.kinesis.processor.v2.IRecordProcessor;

View file

@ -41,7 +41,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ThrottlingReporter;
import org.junit.Before;
@ -50,15 +50,12 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import software.amazon.kinesis.lifecycle.ProcessTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.KinesisDataFetcher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.retrieval.kpl.Messages;
import software.amazon.kinesis.retrieval.kpl.Messages.AggregatedRecord;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.kpl.UserRecord;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;

View file

@ -54,8 +54,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SequenceNumberValidator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotification;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TestStreamlet;
@ -68,12 +67,6 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.lifecycle.ConsumerStates;
import software.amazon.kinesis.lifecycle.ITask;
import software.amazon.kinesis.lifecycle.InitializeTask;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.checkpoint.Checkpoint;
@ -90,8 +83,6 @@ import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.SimpleRecordsFetcherFactory;
import software.amazon.kinesis.retrieval.SynchronousGetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.lifecycle.InitializationInput;
import software.amazon.kinesis.lifecycle.ShutdownInput;
import software.amazon.kinesis.retrieval.kpl.UserRecord;
import software.amazon.kinesis.leases.KinesisClientLease;
import software.amazon.kinesis.leases.ILeaseManager;
@ -359,7 +350,7 @@ public class ShardConsumerTest {
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo,
checkpoint,
new SequenceNumberValidator(
new Checkpoint.SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
@ -511,7 +502,7 @@ public class ShardConsumerTest {
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo,
checkpoint,
new SequenceNumberValidator(
new Checkpoint.SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
@ -640,7 +631,7 @@ public class ShardConsumerTest {
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo,
checkpoint,
new SequenceNumberValidator(
new Checkpoint.SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()

View file

@ -25,7 +25,7 @@ import java.util.Set;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TestStreamlet;
import org.junit.After;
import org.junit.AfterClass;
@ -35,10 +35,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.ShutdownTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.processor.v2.IRecordProcessor;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.IKinesisProxy;

View file

@ -38,7 +38,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import org.junit.After;
import org.junit.Before;
@ -49,13 +49,9 @@ import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.retrieval.AsynchronousGetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.mockito.stubbing.Answer;
import software.amazon.kinesis.retrieval.KinesisDataFetcher;
@RunWith(MockitoJUnitRunner.class)
public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {

View file

@ -38,7 +38,7 @@ import java.util.List;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@ -49,12 +49,6 @@ import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
import software.amazon.kinesis.processor.ICheckpoint;
import software.amazon.kinesis.checkpoint.SentinelCheckpoint;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.retrieval.KinesisDataFetcher;
import software.amazon.kinesis.retrieval.KinesisProxy;
import software.amazon.kinesis.retrieval.SynchronousGetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.metrics.MetricsHelper;
import software.amazon.kinesis.metrics.NullMetricsFactory;

View file

@ -33,7 +33,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.leases.ShardInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -43,11 +43,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import software.amazon.kinesis.retrieval.AsynchronousGetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsCache;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.IKinesisProxy;
import software.amazon.kinesis.lifecycle.ProcessRecordsInput;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
@ -55,9 +50,6 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import lombok.extern.apachecommons.CommonsLog;
import software.amazon.kinesis.retrieval.KinesisDataFetcher;
import software.amazon.kinesis.retrieval.PrefetchGetRecordsCache;
import software.amazon.kinesis.retrieval.SynchronousGetRecordsRetrievalStrategy;
/**
* These are the integration tests for the PrefetchGetRecordsCache class.