Cleaning up tests (#408)

* Deleting all unused tests for KinesisProxy
This commit is contained in:
Sahil Palvia 2018-09-20 03:30:35 +05:30 committed by Justin Pfifer
parent a893da6942
commit 131b1e4b0f
4 changed files with 0 additions and 1118 deletions

View file

@ -1,433 +0,0 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.proxies;
import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
/**
* This is a (temporary) test utility class, to mimic Kinesis without having to integrate with Alpha.
* In future, we should consider moving this to the Kinesis client/sampleApp package (if useful to
* other Kinesis clients).
*/
@Slf4j
public class KinesisLocalFileProxy {
/*
*//**
* Fields in the local file and their position in a line.
*//*
public enum LocalFileFields {
*//** Shard identifier. *//*
SHARD_ID(0),
*//** Sequence number (assumed unique across shards. *//*
SEQUENCE_NUMBER(1),
*//** Partition key associated with data record. *//*
PARTITION_KEY(2),
*//** Data. *//*
DATA(3),
*//** Approximate arrival timestamp. *//*
APPROXIMATE_ARRIVAL_TIMESTAMP(4);
private final int position;
LocalFileFields(int position) {
this.position = position;
}
*//**
* @return Position of the field in the line.
*//*
public int getPosition() {
return position;
}
};
private static final String ITERATOR_DELIMITER = ":";
private static final int NUM_FIELDS_IN_FILE = LocalFileFields.values().length;
private final Map<String, List<Record>> shardedDataRecords = new HashMap<String, List<Record>>();
private List<Shard> shardList;
// Ids of shards that are closed - used to return a null iterator in getRecords after the last record
private Set<String> closedShards = new HashSet<String>();
private static final int EXPONENT = 128;
*//**
* Max value of the hashed partition key (2^128-1). Useful for constructing shards for a stream.
*//*
public static final BigInteger MAX_HASHKEY_VALUE = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE);
*//**
* Max value of a sequence number (2^128 -1). Useful for defining sequence number range for a shard.
*//*
public static final BigInteger MAX_SEQUENCE_NUMBER = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE);
*//**
* @param fileName File with data records (one per line).
* File format (shardId, sequenceNumber, partitionKey, dataRecord).
* @throws IOException IOException
*//*
public KinesisLocalFileProxy(String fileName) throws IOException {
super();
populateDataRecordsFromFile(fileName);
}
private void populateDataRecordsFromFile(String file) throws IOException {
try (BufferedReader in = new BufferedReader(
new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {
Charset charset = Charset.forName("UTF-8");
CharsetEncoder encoder = charset.newEncoder();
String str;
str = in.readLine();
if (str != null) {
ObjectMapper objectMapper = new ObjectMapper();
SerializedShardList shards = objectMapper.readValue(str, SerializedShardList.class);
shardList = shards.getShardList();
}
if (shardList == null) {
shardList = new ArrayList<Shard>();
}
// Populate shardIds of shards that have an ending sequence number (and which != maxSeqNum).
// GetRecords will return a null iterator for these after all data has been returned.
for (Shard shard : shardList) {
SequenceNumberRange range = shard.getSequenceNumberRange();
if ((range != null) && (range.getEndingSequenceNumber() != null)) {
BigInteger endingSequenceNumber = new BigInteger(range.getEndingSequenceNumber());
if (endingSequenceNumber.compareTo(MAX_SEQUENCE_NUMBER) != 0) {
closedShards.add(shard.shardId());
}
}
shardedDataRecords.put(shard.shardId(), new ArrayList<Record>());
}
while ((str = in.readLine()) != null) {
String[] strArr = str.split(",");
if (strArr.length != NUM_FIELDS_IN_FILE) {
throw new InvalidArgumentException("Unexpected input in file."
+ "Expected format (shardId, sequenceNumber, partitionKey, dataRecord, timestamp)");
}
String shardId = strArr[LocalFileFields.SHARD_ID.getPosition()];
Record record = new Record();
record.setSequenceNumber(strArr[LocalFileFields.SEQUENCE_NUMBER.getPosition()]);
record.setPartitionKey(strArr[LocalFileFields.PARTITION_KEY.getPosition()]);
ByteBuffer byteBuffer = encoder.encode(CharBuffer.wrap(strArr[LocalFileFields.DATA.getPosition()]));
record.setData(byteBuffer);
Date timestamp =
new Date(Long.parseLong(strArr[LocalFileFields.APPROXIMATE_ARRIVAL_TIMESTAMP.getPosition()]));
record.setApproximateArrivalTimestamp(timestamp);
List<Record> shardRecords = shardedDataRecords.get(shardId);
if (shardRecords == null) {
shardRecords = new ArrayList<Record>();
}
shardRecords.add(record);
shardedDataRecords.put(shardId, shardRecords);
}
}
}
*//*
* (non-Javadoc)
*
* @see com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy#getStreamInfo()
*//*
@Override
public DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException {
assert false : "getStreamInfo is not implemented.";
return null;
}
@Override
public Set<String> getAllShardIds() throws ResourceNotFoundException {
Set<String> shardIds = new HashSet<String>();
if (shardedDataRecords != null) {
shardIds.addAll(shardedDataRecords.keySet());
}
return shardIds;
}
*//**
* Note, this method has package level access solely for testing purposes.
*//*
static String serializeIterator(String shardId, String sequenceNumber) {
return String.format("%s%s%s", shardId, ITERATOR_DELIMITER, sequenceNumber);
}
*//**
* Container class for the return tuple of deserializeIterator.
*//*
// CHECKSTYLE:IGNORE VisibilityModifier FOR NEXT 10 LINES
static class IteratorInfo {
public String shardId;
public String sequenceNumber;
public IteratorInfo(String shardId, String sequenceNumber) {
this.shardId = shardId;
this.sequenceNumber = sequenceNumber;
}
}
*//**
* Deserialize our iterator - used by test cases to inspect returned iterators.
*
* @param iterator
* @return iteratorInfo
*//*
static IteratorInfo deserializeIterator(String iterator) {
String[] splits = iterator.split(ITERATOR_DELIMITER);
return new IteratorInfo(splits[0], splits[1]);
}
*//**
* {@inheritDoc}
*//*
@Override
public String getIterator(String shardId, String iteratorEnum, String sequenceNumber)
throws ResourceNotFoundException, InvalidArgumentException {
*//*
* If we don't have records in this shard, any iterator will return the empty list. Using a
* sequence number of 1 on an empty shard will give this behavior.
*//*
List<Record> shardRecords = shardedDataRecords.get(shardId);
if (shardRecords == null) {
throw new ResourceNotFoundException(shardId + " does not exist");
}
if (shardRecords.isEmpty()) {
return serializeIterator(shardId, "1");
}
if (ShardIteratorType.LATEST.toString().equals(iteratorEnum)) {
*//*
* If we do have records, LATEST should return an iterator that can be used to read the
* last record. Our iterators are inclusive for convenience.
*//*
Record last = shardRecords.get(shardRecords.size() - 1);
return serializeIterator(shardId, last.sequenceNumber());
} else if (ShardIteratorType.TRIM_HORIZON.toString().equals(iteratorEnum)) {
return serializeIterator(shardId, shardRecords.get(0).sequenceNumber());
} else if (ShardIteratorType.AT_SEQUENCE_NUMBER.toString().equals(iteratorEnum)) {
return serializeIterator(shardId, sequenceNumber);
} else if (ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString().equals(iteratorEnum)) {
BigInteger num = new BigInteger(sequenceNumber);
num = num.add(BigInteger.ONE);
return serializeIterator(shardId, num.toString());
} else {
throw new IllegalArgumentException("IteratorEnum value was invalid: " + iteratorEnum);
}
}
*//**
* {@inheritDoc}
*//*
@Override
public String getIterator(String shardId, String iteratorEnum)
throws ResourceNotFoundException, InvalidArgumentException {
*//*
* If we don't have records in this shard, any iterator will return the empty list. Using a
* sequence number of 1 on an empty shard will give this behavior.
*//*
List<Record> shardRecords = shardedDataRecords.get(shardId);
if (shardRecords == null) {
throw new ResourceNotFoundException(shardId + " does not exist");
}
if (shardRecords.isEmpty()) {
return serializeIterator(shardId, "1");
}
final String serializedIterator;
if (ShardIteratorType.LATEST.toString().equals(iteratorEnum)) {
*//*
* If we do have records, LATEST should return an iterator that can be used to read the
* last record. Our iterators are inclusive for convenience.
*//*
Record last = shardRecords.get(shardRecords.size() - 1);
serializedIterator = serializeIterator(shardId, last.sequenceNumber());
} else if (ShardIteratorType.TRIM_HORIZON.toString().equals(iteratorEnum)) {
serializedIterator = serializeIterator(shardId, shardRecords.get(0).sequenceNumber());
} else {
throw new IllegalArgumentException("IteratorEnum value was invalid: " + iteratorEnum);
}
return serializedIterator;
}
*//**
* {@inheritDoc}
*//*
@Override
public String getIterator(String shardId, Date timestamp)
throws ResourceNotFoundException, InvalidArgumentException {
*//*
* If we don't have records in this shard, any iterator will return the empty list. Using a
* sequence number of 1 on an empty shard will give this behavior.
*//*
List<Record> shardRecords = shardedDataRecords.get(shardId);
if (shardRecords == null) {
throw new ResourceNotFoundException(shardId + " does not exist");
}
if (shardRecords.isEmpty()) {
return serializeIterator(shardId, "1");
}
final String serializedIterator;
if (timestamp != null) {
String seqNumAtTimestamp = findSequenceNumberAtTimestamp(shardRecords, timestamp);
serializedIterator = serializeIterator(shardId, seqNumAtTimestamp);
} else {
throw new IllegalArgumentException("Timestamp must be specified for AT_TIMESTAMP iterator");
}
return serializedIterator;
}
private String findSequenceNumberAtTimestamp(final List<Record> shardRecords, final Date timestamp) {
for (Record rec : shardRecords) {
if (rec.getApproximateArrivalTimestamp().getTime() >= timestamp.getTime()) {
return rec.sequenceNumber();
}
}
return null;
}
*//*
* (non-Javadoc)
*
* @see com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy#get(java.nio.ByteBuffer, int)
*//*
@Override
public GetRecordsResult get(String serializedKinesisIterator, int maxRecords)
throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException {
IteratorInfo iterator = deserializeIterator(serializedKinesisIterator);
BigInteger startingPosition = new BigInteger(iterator.sequenceNumber);
BigInteger lastRecordsSeqNo = BigInteger.ONE;
List<Record> recordsToReturn = new ArrayList<Record>();
List<Record> shardRecords = shardedDataRecords.get(iterator.shardId);
if (shardRecords == null) {
throw new ResourceNotFoundException(iterator.shardId + " does not exist");
}
boolean isHasMoreShards = false;
for (int i = 0; i < shardRecords.size(); i++) {
Record record = shardRecords.get(i);
BigInteger recordSequenceNumber = new BigInteger(record.sequenceNumber());
// update lastRecordsSeqNo so if we return no records, it will be the seqNo of the last record.
lastRecordsSeqNo = recordSequenceNumber;
if (recordSequenceNumber.compareTo(startingPosition) >= 0) {
// Set endIndex (of sublist) to cap at either maxRecords or end of list.
int endIndex = Math.min(i + maxRecords, shardRecords.size());
recordsToReturn.addAll(shardRecords.subList(i, endIndex));
lastRecordsSeqNo = new BigInteger(shardRecords.get(endIndex - 1).sequenceNumber());
if (endIndex < shardRecords.size()) {
isHasMoreShards = true;
}
break;
}
}
GetRecordsResult response = new GetRecordsResult();
response.setRecords(recordsToReturn);
// Set iterator only if the shard is not closed.
if (isHasMoreShards || (!closedShards.contains(iterator.shardId))) {
*//*
* Use the sequence number of the last record returned + 1 to compute the next iterator.
*//*
response.setNextShardIterator(serializeIterator(iterator.shardId, lastRecordsSeqNo.add(BigInteger.ONE)
.toString()));
log.debug("Returning a non null iterator for shard {}", iterator.shardId);
} else {
log.info("Returning null iterator for shard {}", iterator.shardId);
}
return response;
}
*//**
* {@inheritDoc}
*//*
@Override
public PutRecordResult put(String exclusiveMinimumSequenceNumber,
String explicitHashKey,
String partitionKey,
ByteBuffer data) throws ResourceNotFoundException, InvalidArgumentException {
PutRecordResult output = new PutRecordResult();
BigInteger startingPosition = BigInteger.ONE;
if (exclusiveMinimumSequenceNumber != null) {
startingPosition = new BigInteger(exclusiveMinimumSequenceNumber).add(BigInteger.ONE);
}
output.setSequenceNumber(startingPosition.toString());
return output;
}
*//**
* {@inheritDoc}
*//*
@Override
public List<Shard> getShardList() throws ResourceNotFoundException {
List<Shard> shards = new LinkedList<Shard>();
shards.addAll(shardList);
return shards;
}
*//**
* Used for serializing/deserializing the shard list to the file.
*//*
public static class SerializedShardList {
private List<Shard> shardList = new LinkedList<Shard>();
*//**
* Public to enable Jackson object mapper serialization.
*//*
public SerializedShardList() {
}
*//**
* @param shardList List of shards for the stream.
*//*
public SerializedShardList(List<Shard> shardList) {
this.shardList.addAll(shardList);
}
*//**
* public to enable Jackson object mapper serialization.
*
* @return shardList
*//*
public List<Shard> getShardList() {
return shardList;
}
*//**
* public to enable Jackson object mapper deserialization.
*
* @param shardList List of shards
*//*
public void setShardList(List<Shard> shardList) {
this.shardList = shardList;
}
}*/
}

View file

@ -1,58 +0,0 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.proxies;
import java.io.IOException;
/** Factory for KinesisProxy objects that use a local file for data. Useful for testing purposes.
*
*/
public class KinesisLocalFileProxyFactory {
/*private static final int DEFAULT_NUM_SHARDS = 3;
private static final String DEFAULT_SHARD_ID_PREFIX = "ShardId-";
private static final int DEFAULT_NUM_RECORDS_PER_SHARD = 10;
private static final BigInteger DEFAULT_STARTING_SEQUENCE_NUMBER = BigInteger.ZERO;
private static final String DEFAULT_TEST_PROXY_FILE = "defaultKinesisProxyLocalFile";
private IKinesisProxy testKinesisProxy;
*//**
* @param fileName File to be used for stream data.
* If the file exists then it is expected to contain information for creating a test proxy object.
* If the file does not exist then a temporary file containing default values for a test proxy object
* will be created and used.
* @throws IOException This will be thrown if we can't read/create the data file.
*//*
public KinesisLocalFileProxyFactory(String fileName) throws IOException {
File f = new File(fileName);
if (!f.exists()) {
f = KinesisLocalFileDataCreator.generateTempDataFile(
DEFAULT_NUM_SHARDS, DEFAULT_SHARD_ID_PREFIX, DEFAULT_NUM_RECORDS_PER_SHARD,
DEFAULT_STARTING_SEQUENCE_NUMBER, DEFAULT_TEST_PROXY_FILE);
}
testKinesisProxy = new KinesisLocalFileProxy(f.getAbsolutePath());
}
*//* (non-Javadoc)
* @see com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxyFactory#getProxy(java.lang.String)
*//*
@Override
public IKinesisProxy getProxy(String streamARN) {
return testKinesisProxy;
}*/
}

View file

@ -1,408 +0,0 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.proxies;
public class KinesisProxyTest {
/*private static final String TEST_STRING = "TestString";
private static final long DESCRIBE_STREAM_BACKOFF_TIME = 10L;
private static final long LIST_SHARDS_BACKOFF_TIME = 10L;
private static final int DESCRIBE_STREAM_RETRY_TIMES = 3;
private static final int LIST_SHARDS_RETRY_TIMES = 3;
private static final String NEXT_TOKEN = "NextToken";
@Mock
private AmazonKinesis mockClient;
@Mock
private AmazonDynamoDBStreamsAdapterClient mockDDBStreamClient;
@Mock
private AmazonDynamoDBStreamsAdapterClientChild mockDDBChildClient;
@Mock
private AWSCredentialsProvider mockCredentialsProvider;
@Mock
private GetShardIteratorResult shardIteratorResult;
@Mock
private DescribeStreamResult describeStreamResult;
@Mock
private StreamDescription streamDescription;
@Mock
private Shard shard;
@Mock
private KinesisClientLibConfiguration config;
private KinesisProxy proxy;
private KinesisProxy ddbProxy;
private KinesisProxy ddbChildProxy;
// Test shards for verifying.
private Set<String> shardIdSet;
private List<Shard> shards;
@Before
public void setUpTest() {
// Set up kinesis ddbProxy
when(config.getStreamName()).thenReturn(TEST_STRING);
when(config.getListShardsBackoffTimeInMillis()).thenReturn(LIST_SHARDS_BACKOFF_TIME);
when(config.getMaxListShardsRetryAttempts()).thenReturn(LIST_SHARDS_RETRY_TIMES);
when(config.getKinesisCredentialsProvider()).thenReturn(mockCredentialsProvider);
proxy = new KinesisProxy(config, mockClient);
ddbProxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockDDBStreamClient,
DESCRIBE_STREAM_BACKOFF_TIME, DESCRIBE_STREAM_RETRY_TIMES, LIST_SHARDS_BACKOFF_TIME,
LIST_SHARDS_RETRY_TIMES);
ddbChildProxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockDDBChildClient,
DESCRIBE_STREAM_BACKOFF_TIME, DESCRIBE_STREAM_RETRY_TIMES, LIST_SHARDS_BACKOFF_TIME,
LIST_SHARDS_RETRY_TIMES);
// Set up test shards
List<String> shardIds = Arrays.asList("shard-1", "shard-2", "shard-3", "shard-4");
shardIdSet = new HashSet<>(shardIds);
shards = shardIds.stream().map(shardId -> new Shard().shardId(shardId)).collect(Collectors.toList());
}
@Test
public void testGetShardListWithMoreDataAvailable() {
// Set up mock :
// First call describeStream returning response with first two shards in the list;
// Second call describeStream returning response with rest shards.
DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true);
DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false);
doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(new IsRequestWithStartShardId(null)));
doReturn(responseFinal).when(mockDDBStreamClient)
.describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).shardId())));
Set<String> resultShardIdSets = ddbProxy.getAllShardIds();
assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets));
}
@Test
public void testGetShardListWithLimitExceededException() {
// Set up mock :
// First call describeStream throwing LimitExceededException;
// Second call describeStream returning shards list.
DescribeStreamResult response = createGetStreamInfoResponse(shards, false);
doThrow(new LimitExceededException("Test Exception")).doReturn(response).when(mockDDBStreamClient)
.describeStream(argThat(new OldIsRequestWithStartShardId(null)));
Set<String> resultShardIdSet = ddbProxy.getAllShardIds();
assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSet));
}
@Test
public void testValidShardIteratorType() {
when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult);
String expectedShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString();
ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
.and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType)))));
}
@Test
public void testInvalidShardIteratorIsntChanged() {
when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult);
String expectedShardIteratorType = ShardIteratorType.AT_TIMESTAMP.toString();
ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
.and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType)))));
}
@Test(expected = AmazonServiceException.class)
public void testNullShardIteratorType() throws Exception {
when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(new AmazonServiceException("expected null"));
String expectedShardIteratorType = null;
ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
.and(hasProperty("shardIteratorType", nullValue(String.class)))));
}
@Test(expected = AmazonServiceException.class)
public void testGetStreamInfoFails() {
when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new AmazonServiceException("Test"));
try {
ddbProxy.getShardList();
} finally {
verify(mockDDBStreamClient).describeStream(any(DescribeStreamRequest.class));
}
}
@Test
public void testGetStreamInfoThrottledOnce() throws Exception {
when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test"))
.thenReturn(describeStreamResult);
when(describeStreamResult.getStreamDescription()).thenReturn(streamDescription);
when(streamDescription.getHasMoreShards()).thenReturn(false);
when(streamDescription.getStreamStatus()).thenReturn(StreamStatus.ACTIVE.name());
List<Shard> expectedShards = Collections.singletonList(shard);
when(streamDescription.getShards()).thenReturn(expectedShards);
List<Shard> actualShards = ddbProxy.getShardList();
assertThat(actualShards, equalTo(expectedShards));
verify(mockDDBStreamClient, times(2)).describeStream(any(DescribeStreamRequest.class));
verify(describeStreamResult, times(3)).getStreamDescription();
verify(streamDescription).getStreamStatus();
verify(streamDescription).isHasMoreShards();
}
@Test(expected = LimitExceededException.class)
public void testGetStreamInfoThrottledAll() throws Exception {
when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test"));
ddbProxy.getShardList();
}
@Test
public void testGetStreamInfoStoresOffset() throws Exception {
when(describeStreamResult.getStreamDescription()).thenReturn(streamDescription);
when(streamDescription.getStreamStatus()).thenReturn(StreamStatus.ACTIVE.name());
Shard shard1 = mock(Shard.class);
Shard shard2 = mock(Shard.class);
Shard shard3 = mock(Shard.class);
List<Shard> shardList1 = Collections.singletonList(shard1);
List<Shard> shardList2 = Collections.singletonList(shard2);
List<Shard> shardList3 = Collections.singletonList(shard3);
String shardId1 = "ShardId-0001";
String shardId2 = "ShardId-0002";
String shardId3 = "ShardId-0003";
when(shard1.shardId()).thenReturn(shardId1);
when(shard2.shardId()).thenReturn(shardId2);
when(shard3.shardId()).thenReturn(shardId3);
when(streamDescription.getShards()).thenReturn(shardList1).thenReturn(shardList2).thenReturn(shardList3);
when(streamDescription.isHasMoreShards()).thenReturn(true, true, false);
when(mockDDBStreamClient.describeStream(argThat(describeWithoutShardId()))).thenReturn(describeStreamResult);
when(mockDDBStreamClient.describeStream(argThat(describeWithShardId(shardId1))))
.thenThrow(new LimitExceededException("1"), new LimitExceededException("2"),
new LimitExceededException("3"))
.thenReturn(describeStreamResult);
when(mockDDBStreamClient.describeStream(argThat(describeWithShardId(shardId2)))).thenReturn(describeStreamResult);
boolean limitExceeded = false;
try {
ddbProxy.getShardList();
} catch (LimitExceededException le) {
limitExceeded = true;
}
assertThat(limitExceeded, equalTo(true));
List<Shard> actualShards = ddbProxy.getShardList();
List<Shard> expectedShards = Arrays.asList(shard1, shard2, shard3);
assertThat(actualShards, equalTo(expectedShards));
verify(mockDDBStreamClient).describeStream(argThat(describeWithoutShardId()));
verify(mockDDBStreamClient, times(4)).describeStream(argThat(describeWithShardId(shardId1)));
verify(mockDDBStreamClient).describeStream(argThat(describeWithShardId(shardId2)));
}
@Test
public void testListShardsWithMoreDataAvailable() {
ListShardsResult responseWithMoreData = new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
ListShardsResult responseFinal = new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null);
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenReturn(responseWithMoreData);
when(mockClient.listShards(argThat(listShardsNextToken(NEXT_TOKEN)))).thenReturn(responseFinal);
Set<String> resultShardIdSets = proxy.getAllShardIds();
assertEquals(shardIdSet, resultShardIdSets);
}
@Test
public void testListShardsWithLimiteExceededException() {
ListShardsResult result = new ListShardsResult().withShards(shards);
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class).thenReturn(result);
Set <String> resultShardIdSet = proxy.getAllShardIds();
assertEquals(shardIdSet, resultShardIdSet);
}
@Test(expected = AmazonServiceException.class)
public void testListShardsFails() {
when(mockClient.listShards(any(ListShardsRequest.class))).thenThrow(AmazonServiceException.class);
try {
proxy.getShardList();
} finally {
verify(mockClient).listShards(any(ListShardsRequest.class));
}
}
@Test
public void testListShardsThrottledOnce() {
List<Shard> expected = Collections.singletonList(shard);
ListShardsResult result = new ListShardsResult().withShards(expected);
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class)
.thenReturn(result);
List<Shard> actualShards = proxy.getShardList();
assertEquals(expected, actualShards);
verify(mockClient, times(2)).listShards(argThat(initialListShardsRequestMatcher()));
}
@Test(expected = LimitExceededException.class)
public void testListShardsThrottledAll() {
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class);
proxy.getShardList();
}
@Test
public void testStreamNotInCorrectStatus() {
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(ResourceInUseException.class);
assertNull(proxy.getShardList());
}
@Test
public void testGetShardListWithDDBChildClient() {
DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true);
DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false);
doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(new IsRequestWithStartShardId(null)));
doReturn(responseFinal).when(mockDDBChildClient)
.describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).shardId())));
Set<String> resultShardIdSets = ddbChildProxy.getAllShardIds();
assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets));
}
private DescribeStreamResult createGetStreamInfoResponse(List<Shard> shards1, boolean isHasMoreShards) {
// Create stream description
StreamDescription description = new StreamDescription();
description.setHasMoreShards(isHasMoreShards);
description.setShards(shards1);
description.setStreamStatus(StreamStatus.ACTIVE);
// Create Describe Stream Result
DescribeStreamResult response = new DescribeStreamResult();
response.setStreamDescription(description);
return response;
}
private IsRequestWithStartShardId describeWithoutShardId() {
return describeWithShardId(null);
}
private IsRequestWithStartShardId describeWithShardId(String shardId) {
return new IsRequestWithStartShardId(shardId);
}
private static class IsRequestWithStartShardId extends TypeSafeDiagnosingMatcher<DescribeStreamRequest> {
private final String shardId;
public IsRequestWithStartShardId(String shardId) {
this.shardId = shardId;
}
@Override
protected boolean matchesSafely(DescribeStreamRequest item, Description mismatchDescription) {
if (shardId == null) {
if (item.getExclusiveStartShardId() != null) {
mismatchDescription.appendText("Expected starting shard id of null, but was ")
.appendValue(item.getExclusiveStartShardId());
return false;
}
} else if (!shardId.equals(item.getExclusiveStartShardId())) {
mismatchDescription.appendValue(shardId).appendText(" doesn't match expected ")
.appendValue(item.getExclusiveStartShardId());
return false;
}
return true;
}
@Override
public void describeTo(Description description) {
description.appendText("A DescribeStreamRequest with a starting shard if of ").appendValue(shardId);
}
}
// Matcher for testing describe stream request with specific start shard ID.
private static class OldIsRequestWithStartShardId extends ArgumentMatcher<DescribeStreamRequest> {
private final String shardId;
public OldIsRequestWithStartShardId(String shardId) {
this.shardId = shardId;
}
@Override
public boolean matches(Object request) {
String startShardId = ((DescribeStreamRequest) request).getExclusiveStartShardId();
// If startShardId equals to null, shardId should also be null.
if (startShardId == null) {
return shardId == null;
}
return startShardId.equals(shardId);
}
}
private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
return new ListShardsRequestMatcher(null, null);
}
private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
return new ListShardsRequestMatcher(null, nextToken);
}
@AllArgsConstructor
private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest> {
private final String shardId;
private final String nextToken;
@Override
protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) {
if (shardId == null) {
if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
description.appendText("Expected ExclusiveStartShardId to be null, but was ")
.appendValue(listShardsRequest.getExclusiveStartShardId());
return false;
}
} else {
if (!shardId.equals(listShardsRequest.getExclusiveStartShardId())) {
description.appendText("Expected shardId: ").appendValue(shardId)
.appendText(" doesn't match actual shardId: ")
.appendValue(listShardsRequest.getExclusiveStartShardId());
return false;
}
}
if (StringUtils.isNotEmpty(listShardsRequest.getNextToken())) {
if (StringUtils.isNotEmpty(listShardsRequest.getStreamName()) || StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
return false;
}
if (!listShardsRequest.getNextToken().equals(nextToken)) {
description.appendText("Found nextToken: ").appendValue(listShardsRequest.getNextToken())
.appendText(" when it was supposed to be null.");
return false;
}
} else {
return nextToken == null;
}
return true;
}
@Override
public void describeTo(final Description description) {
description.appendText("A ListShardsRequest with a shardId: ").appendValue(shardId)
.appendText(" and empty nextToken");
}
}*/
}

View file

@ -1,219 +0,0 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.proxies.util;
import software.amazon.awssdk.services.kinesis.model.Shard;
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.List;
/**
* Temporary util class for generating data in a local file (used by KinesisLocalFileProxy).
*/
public class KinesisLocalFileDataCreator {
/**
* Partition key prefix - also referenced in KinesisLocalFileProxyTest.
*/
// public static final String PARTITION_KEY_PREFIX = "PK_";
// private static final String FILE_NAME_SUFFIX = ".dat";
// private static final long RAND_SEED_VALUE = 1092387456L;
// Used to cap the size of the random "hole" in sequence numbers.
// private static final int NUM_BITS = 3;
// private static Random randomGenerator = new Random(RAND_SEED_VALUE);
// private static final int PARTITION_KEY_LENGTH = 10;
// private static final int DATA_LENGTH = 40;
/**
* Starting timestamp - also referenced in KinesisLocalFileProxyTest.
*/
// public static final long STARTING_TIMESTAMP = 1462345678910L;
/**
* This is used to allow few records to have the same timestamps (to mimic real life scenarios).
* Records 5n-1 and 5n will have the same timestamp (n > 0).
*/
// private static final int DIVISOR = 5;
private KinesisLocalFileDataCreator() {
}
/** Creates a temp file (in default temp file location) with fake Kinesis data records.
* This method does not support resharding use cases.
* @param numShards Number of shards
* @param shardIdPrefix Prefix for shardIds (1, 2, ..., N will be added at the end to create shardIds)
* @param numRecordsPerShard Number of records to generate per shard
* @param startingSequenceNumber Sequence numbers in the generated data will be >= this number
* @param fileNamePrefix Prefix of the filename
* @return File created with the fake Kinesis records.
* @throws IOException Thrown if there are issues creating the file.
*/
public static File generateTempDataFile(
int numShards,
String shardIdPrefix,
int numRecordsPerShard,
BigInteger startingSequenceNumber,
String fileNamePrefix)
throws IOException {
List<Shard> shardList = createShardList(numShards, shardIdPrefix, startingSequenceNumber);
return generateTempDataFile(shardList, numRecordsPerShard, fileNamePrefix);
}
/**
* Creates a temp file (in default temp file location) with fake Kinesis data records.
* Records will be put in all shards.
* @param fileNamePrefix Prefix for the name of the temp file
* @param shardList List of shards (we use the shardId and sequenceNumberRange fields)
* @param numRecordsPerShard Num records per shard (the shard sequenceNumberRange should be large enough
* for us to allow these many records with some "holes")
* @return File with stream data filled in
* @throws IOException Thrown if there are issues creating/updating the file
*/
public static File generateTempDataFile(List<Shard> shardList, int numRecordsPerShard, String fileNamePrefix)
throws IOException {
// File file = File.createTempFile(fileNamePrefix, FILE_NAME_SUFFIX);
// try (BufferedWriter fileWriter = new BufferedWriter(
// new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8))) {
// ObjectMapper objectMapper = new ObjectMapper();
// String serializedShardList =
// objectMapper.writeValueAsString(new KinesisLocalFileProxy.SerializedShardList(shardList));
// fileWriter.write(serializedShardList);
// fileWriter.newLine();
// BigInteger sequenceNumberIncrement = new BigInteger("0");
// long timestamp = STARTING_TIMESTAMP;
// for (int i = 0; i < numRecordsPerShard; i++) {
// for (Shard shard : shardList) {
// BigInteger sequenceNumber =
// new BigInteger(shard.getSequenceNumberRange().getStartingSequenceNumber()).add(
// sequenceNumberIncrement);
// String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
// BigInteger maxSequenceNumber = KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER;
// if (endingSequenceNumber != null) {
// maxSequenceNumber = new BigInteger(endingSequenceNumber);
// }
// if (maxSequenceNumber.compareTo(sequenceNumber) != 1) {
// throw new IllegalArgumentException("Not enough space in shard");
// }
// String partitionKey =
// PARTITION_KEY_PREFIX + shard.getShardId() + generateRandomString(PARTITION_KEY_LENGTH);
// String data = generateRandomString(DATA_LENGTH);
// Allow few records to have the same timestamps (to mimic real life scenarios).
// timestamp = (i % DIVISOR == 0) ? timestamp : timestamp + 1;
// String line = shard.getShardId() + "," + sequenceNumber + "," + partitionKey + "," + data + ","
// + timestamp;
//
// fileWriter.write(line);
// fileWriter.newLine();
// sequenceNumberIncrement = sequenceNumberIncrement.add(BigInteger.ONE);
// sequenceNumberIncrement = sequenceNumberIncrement.add(new BigInteger(NUM_BITS, randomGenerator));
// }
// }
// }
return null;
}
/** Helper method to create a list of shards (which can then be used to generate data files).
* @param numShards Number of shards
* @param shardIdPrefix Prefix for the shardIds
* @param startingSequenceNumber Starting sequence number for all the shards
* @return List of shards (with no reshard events).
*/
public static List<Shard> createShardList(int numShards, String shardIdPrefix, BigInteger startingSequenceNumber) {
// List<Shard> shards = new ArrayList<Shard>(numShards);
//
// SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
// sequenceNumberRange.setStartingSequenceNumber(startingSequenceNumber.toString());
// sequenceNumberRange.setEndingSequenceNumber(null);
// BigInteger perShardHashKeyRange =
// KinesisLocalFileProxy.MAX_HASHKEY_VALUE.divide(new BigInteger(Integer.toString(numShards)));
// BigInteger hashKeyRangeStart = new BigInteger("0");
// for (int i = 0; i < numShards; i++) {
// Shard shard = new Shard();
// shard.setShardId(shardIdPrefix + i);
// shard.setSequenceNumberRange(sequenceNumberRange);
// BigInteger hashKeyRangeEnd = hashKeyRangeStart.add(perShardHashKeyRange);
// HashKeyRange hashKeyRange = new HashKeyRange();
// hashKeyRange.setStartingHashKey(hashKeyRangeStart.toString());
// hashKeyRange.setEndingHashKey(hashKeyRangeEnd.toString());
// shards.add(shard);
// }
//
return null;
}
/** Generates a random string of specified length.
* @param length String of length will be generated
* @return Random generated string
*/
private static String generateRandomString(int length) {
// StringBuffer str = new StringBuffer();
// final int startingCharAsciiValue = 97;
// final int numChars = 26;
// for (int i = 0; i < length; i++) {
// str.append((char) (randomGenerator.nextInt(numChars - 1) + startingCharAsciiValue));
// }
return "";
}
/** Creates a new temp file populated with fake Kinesis data records.
* @param args Expects 5 args: numShards, shardPrefix, numRecordsPerShard, startingSequenceNumber, fileNamePrefix
*/
// CHECKSTYLE:OFF MagicNumber
// CHECKSTYLE:IGNORE UncommentedMain FOR NEXT 2 LINES
public static void main(String[] args) {
// int numShards = 1;
// String shardIdPrefix = "shardId";
// int numRecordsPerShard = 17;
// BigInteger startingSequenceNumber = new BigInteger("99");
// String fileNamePrefix = "kinesisFakeRecords";
//
// try {
// if ((args.length != 0) && (args.length != 5)) {
// Temporary util code, so not providing detailed usage feedback.
// System.out.println("Unexpected number of arguments.");
// System.exit(0);
// }
//
// if (args.length == 5) {
// numShards = Integer.parseInt(args[0]);
// shardIdPrefix = args[1];
// numRecordsPerShard = Integer.parseInt(args[2]);
// startingSequenceNumber = new BigInteger(args[3]);
// fileNamePrefix = args[4];
// }
//
// File file = KinesisLocalFileDataCreator.generateTempDataFile(
// numShards,
// shardIdPrefix,
// numRecordsPerShard,
// startingSequenceNumber,
// fileNamePrefix);
// System.out.println("Created fake kinesis records in file: " + file.getAbsolutePath());
// } catch (Exception e) {
// CHECKSTYLE:IGNORE IllegalCatch FOR NEXT -1 LINES
// System.out.println("Caught Exception: " + e);
// }
}
// CHECKSTYLE:ON MagicNumber
}