From 131b1e4b0f3e373975cdaa97fa3b1a8ef76aa47e Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Thu, 20 Sep 2018 03:30:35 +0530 Subject: [PATCH] Cleaning up tests (#408) * Deleting all unused tests for KinesisProxy --- .../proxies/KinesisLocalFileProxy.java | 433 ------------------ .../proxies/KinesisLocalFileProxyFactory.java | 58 --- .../proxies/KinesisProxyTest.java | 408 ----------------- .../util/KinesisLocalFileDataCreator.java | 219 --------- 4 files changed, 1118 deletions(-) delete mode 100644 amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java delete mode 100644 amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxyFactory.java delete mode 100644 amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java delete mode 100644 amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java deleted file mode 100644 index e8bb6b24..00000000 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java +++ /dev/null @@ -1,433 +0,0 @@ -/* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package com.amazonaws.services.kinesis.clientlibrary.proxies; - -import java.io.IOException; - -import lombok.extern.slf4j.Slf4j; - -/** - * This is a (temporary) test utility class, to mimic Kinesis without having to integrate with Alpha. - * In future, we should consider moving this to the Kinesis client/sampleApp package (if useful to - * other Kinesis clients). - */ -@Slf4j -public class KinesisLocalFileProxy { -/* - *//** - * Fields in the local file and their position in a line. - *//* - public enum LocalFileFields { - *//** Shard identifier. *//* - SHARD_ID(0), - *//** Sequence number (assumed unique across shards. *//* - SEQUENCE_NUMBER(1), - *//** Partition key associated with data record. *//* - PARTITION_KEY(2), - *//** Data. *//* - DATA(3), - *//** Approximate arrival timestamp. *//* - APPROXIMATE_ARRIVAL_TIMESTAMP(4); - - private final int position; - - LocalFileFields(int position) { - this.position = position; - } - - *//** - * @return Position of the field in the line. - *//* - public int getPosition() { - return position; - } - }; - - private static final String ITERATOR_DELIMITER = ":"; - - private static final int NUM_FIELDS_IN_FILE = LocalFileFields.values().length; - - private final Map> shardedDataRecords = new HashMap>(); - - private List shardList; - - // Ids of shards that are closed - used to return a null iterator in getRecords after the last record - private Set closedShards = new HashSet(); - - private static final int EXPONENT = 128; - - *//** - * Max value of the hashed partition key (2^128-1). Useful for constructing shards for a stream. - *//* - public static final BigInteger MAX_HASHKEY_VALUE = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE); - - *//** - * Max value of a sequence number (2^128 -1). Useful for defining sequence number range for a shard. - *//* - public static final BigInteger MAX_SEQUENCE_NUMBER = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE); - - *//** - * @param fileName File with data records (one per line). - * File format (shardId, sequenceNumber, partitionKey, dataRecord). - * @throws IOException IOException - *//* - public KinesisLocalFileProxy(String fileName) throws IOException { - super(); - populateDataRecordsFromFile(fileName); - } - - private void populateDataRecordsFromFile(String file) throws IOException { - try (BufferedReader in = new BufferedReader( - new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) { - Charset charset = Charset.forName("UTF-8"); - CharsetEncoder encoder = charset.newEncoder(); - String str; - str = in.readLine(); - if (str != null) { - ObjectMapper objectMapper = new ObjectMapper(); - SerializedShardList shards = objectMapper.readValue(str, SerializedShardList.class); - shardList = shards.getShardList(); - } - if (shardList == null) { - shardList = new ArrayList(); - } - - // Populate shardIds of shards that have an ending sequence number (and which != maxSeqNum). - // GetRecords will return a null iterator for these after all data has been returned. - for (Shard shard : shardList) { - SequenceNumberRange range = shard.getSequenceNumberRange(); - if ((range != null) && (range.getEndingSequenceNumber() != null)) { - BigInteger endingSequenceNumber = new BigInteger(range.getEndingSequenceNumber()); - if (endingSequenceNumber.compareTo(MAX_SEQUENCE_NUMBER) != 0) { - closedShards.add(shard.shardId()); - } - } - shardedDataRecords.put(shard.shardId(), new ArrayList()); - } - - while ((str = in.readLine()) != null) { - String[] strArr = str.split(","); - if (strArr.length != NUM_FIELDS_IN_FILE) { - throw new InvalidArgumentException("Unexpected input in file." - + "Expected format (shardId, sequenceNumber, partitionKey, dataRecord, timestamp)"); - } - String shardId = strArr[LocalFileFields.SHARD_ID.getPosition()]; - Record record = new Record(); - record.setSequenceNumber(strArr[LocalFileFields.SEQUENCE_NUMBER.getPosition()]); - record.setPartitionKey(strArr[LocalFileFields.PARTITION_KEY.getPosition()]); - ByteBuffer byteBuffer = encoder.encode(CharBuffer.wrap(strArr[LocalFileFields.DATA.getPosition()])); - record.setData(byteBuffer); - Date timestamp = - new Date(Long.parseLong(strArr[LocalFileFields.APPROXIMATE_ARRIVAL_TIMESTAMP.getPosition()])); - record.setApproximateArrivalTimestamp(timestamp); - List shardRecords = shardedDataRecords.get(shardId); - if (shardRecords == null) { - shardRecords = new ArrayList(); - } - shardRecords.add(record); - shardedDataRecords.put(shardId, shardRecords); - } - } - } - - *//* - * (non-Javadoc) - * - * @see com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy#getStreamInfo() - *//* - @Override - public DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException { - assert false : "getStreamInfo is not implemented."; - return null; - } - - @Override - public Set getAllShardIds() throws ResourceNotFoundException { - Set shardIds = new HashSet(); - if (shardedDataRecords != null) { - shardIds.addAll(shardedDataRecords.keySet()); - } - - return shardIds; - } - - *//** - * Note, this method has package level access solely for testing purposes. - *//* - static String serializeIterator(String shardId, String sequenceNumber) { - return String.format("%s%s%s", shardId, ITERATOR_DELIMITER, sequenceNumber); - } - - *//** - * Container class for the return tuple of deserializeIterator. - *//* - // CHECKSTYLE:IGNORE VisibilityModifier FOR NEXT 10 LINES - static class IteratorInfo { - public String shardId; - - public String sequenceNumber; - - public IteratorInfo(String shardId, String sequenceNumber) { - this.shardId = shardId; - this.sequenceNumber = sequenceNumber; - } - } - - *//** - * Deserialize our iterator - used by test cases to inspect returned iterators. - * - * @param iterator - * @return iteratorInfo - *//* - static IteratorInfo deserializeIterator(String iterator) { - String[] splits = iterator.split(ITERATOR_DELIMITER); - return new IteratorInfo(splits[0], splits[1]); - } - - *//** - * {@inheritDoc} - *//* - @Override - public String getIterator(String shardId, String iteratorEnum, String sequenceNumber) - throws ResourceNotFoundException, InvalidArgumentException { - *//* - * If we don't have records in this shard, any iterator will return the empty list. Using a - * sequence number of 1 on an empty shard will give this behavior. - *//* - List shardRecords = shardedDataRecords.get(shardId); - if (shardRecords == null) { - throw new ResourceNotFoundException(shardId + " does not exist"); - } - if (shardRecords.isEmpty()) { - return serializeIterator(shardId, "1"); - } - - if (ShardIteratorType.LATEST.toString().equals(iteratorEnum)) { - *//* - * If we do have records, LATEST should return an iterator that can be used to read the - * last record. Our iterators are inclusive for convenience. - *//* - Record last = shardRecords.get(shardRecords.size() - 1); - return serializeIterator(shardId, last.sequenceNumber()); - } else if (ShardIteratorType.TRIM_HORIZON.toString().equals(iteratorEnum)) { - return serializeIterator(shardId, shardRecords.get(0).sequenceNumber()); - } else if (ShardIteratorType.AT_SEQUENCE_NUMBER.toString().equals(iteratorEnum)) { - return serializeIterator(shardId, sequenceNumber); - } else if (ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString().equals(iteratorEnum)) { - BigInteger num = new BigInteger(sequenceNumber); - num = num.add(BigInteger.ONE); - return serializeIterator(shardId, num.toString()); - } else { - throw new IllegalArgumentException("IteratorEnum value was invalid: " + iteratorEnum); - } - } - - *//** - * {@inheritDoc} - *//* - @Override - public String getIterator(String shardId, String iteratorEnum) - throws ResourceNotFoundException, InvalidArgumentException { - *//* - * If we don't have records in this shard, any iterator will return the empty list. Using a - * sequence number of 1 on an empty shard will give this behavior. - *//* - List shardRecords = shardedDataRecords.get(shardId); - if (shardRecords == null) { - throw new ResourceNotFoundException(shardId + " does not exist"); - } - if (shardRecords.isEmpty()) { - return serializeIterator(shardId, "1"); - } - - final String serializedIterator; - if (ShardIteratorType.LATEST.toString().equals(iteratorEnum)) { - *//* - * If we do have records, LATEST should return an iterator that can be used to read the - * last record. Our iterators are inclusive for convenience. - *//* - Record last = shardRecords.get(shardRecords.size() - 1); - serializedIterator = serializeIterator(shardId, last.sequenceNumber()); - } else if (ShardIteratorType.TRIM_HORIZON.toString().equals(iteratorEnum)) { - serializedIterator = serializeIterator(shardId, shardRecords.get(0).sequenceNumber()); - } else { - throw new IllegalArgumentException("IteratorEnum value was invalid: " + iteratorEnum); - } - return serializedIterator; - } - - *//** - * {@inheritDoc} - *//* - @Override - public String getIterator(String shardId, Date timestamp) - throws ResourceNotFoundException, InvalidArgumentException { - *//* - * If we don't have records in this shard, any iterator will return the empty list. Using a - * sequence number of 1 on an empty shard will give this behavior. - *//* - List shardRecords = shardedDataRecords.get(shardId); - if (shardRecords == null) { - throw new ResourceNotFoundException(shardId + " does not exist"); - } - if (shardRecords.isEmpty()) { - return serializeIterator(shardId, "1"); - } - - final String serializedIterator; - if (timestamp != null) { - String seqNumAtTimestamp = findSequenceNumberAtTimestamp(shardRecords, timestamp); - serializedIterator = serializeIterator(shardId, seqNumAtTimestamp); - } else { - throw new IllegalArgumentException("Timestamp must be specified for AT_TIMESTAMP iterator"); - } - return serializedIterator; - } - - private String findSequenceNumberAtTimestamp(final List shardRecords, final Date timestamp) { - for (Record rec : shardRecords) { - if (rec.getApproximateArrivalTimestamp().getTime() >= timestamp.getTime()) { - return rec.sequenceNumber(); - } - } - return null; - } - - *//* - * (non-Javadoc) - * - * @see com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy#get(java.nio.ByteBuffer, int) - *//* - @Override - public GetRecordsResult get(String serializedKinesisIterator, int maxRecords) - throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException { - IteratorInfo iterator = deserializeIterator(serializedKinesisIterator); - - BigInteger startingPosition = new BigInteger(iterator.sequenceNumber); - BigInteger lastRecordsSeqNo = BigInteger.ONE; - List recordsToReturn = new ArrayList(); - List shardRecords = shardedDataRecords.get(iterator.shardId); - if (shardRecords == null) { - throw new ResourceNotFoundException(iterator.shardId + " does not exist"); - } - - boolean isHasMoreShards = false; - - for (int i = 0; i < shardRecords.size(); i++) { - Record record = shardRecords.get(i); - BigInteger recordSequenceNumber = new BigInteger(record.sequenceNumber()); - // update lastRecordsSeqNo so if we return no records, it will be the seqNo of the last record. - lastRecordsSeqNo = recordSequenceNumber; - if (recordSequenceNumber.compareTo(startingPosition) >= 0) { - // Set endIndex (of sublist) to cap at either maxRecords or end of list. - int endIndex = Math.min(i + maxRecords, shardRecords.size()); - recordsToReturn.addAll(shardRecords.subList(i, endIndex)); - - lastRecordsSeqNo = new BigInteger(shardRecords.get(endIndex - 1).sequenceNumber()); - if (endIndex < shardRecords.size()) { - isHasMoreShards = true; - } - - break; - } - } - - GetRecordsResult response = new GetRecordsResult(); - response.setRecords(recordsToReturn); - - // Set iterator only if the shard is not closed. - if (isHasMoreShards || (!closedShards.contains(iterator.shardId))) { - *//* - * Use the sequence number of the last record returned + 1 to compute the next iterator. - *//* - response.setNextShardIterator(serializeIterator(iterator.shardId, lastRecordsSeqNo.add(BigInteger.ONE) - .toString())); - log.debug("Returning a non null iterator for shard {}", iterator.shardId); - } else { - log.info("Returning null iterator for shard {}", iterator.shardId); - } - - return response; - } - - *//** - * {@inheritDoc} - *//* - @Override - public PutRecordResult put(String exclusiveMinimumSequenceNumber, - String explicitHashKey, - String partitionKey, - ByteBuffer data) throws ResourceNotFoundException, InvalidArgumentException { - PutRecordResult output = new PutRecordResult(); - - BigInteger startingPosition = BigInteger.ONE; - - if (exclusiveMinimumSequenceNumber != null) { - startingPosition = new BigInteger(exclusiveMinimumSequenceNumber).add(BigInteger.ONE); - } - - output.setSequenceNumber(startingPosition.toString()); - return output; - } - - *//** - * {@inheritDoc} - *//* - @Override - public List getShardList() throws ResourceNotFoundException { - List shards = new LinkedList(); - shards.addAll(shardList); - return shards; - } - - *//** - * Used for serializing/deserializing the shard list to the file. - *//* - public static class SerializedShardList { - - private List shardList = new LinkedList(); - - *//** - * Public to enable Jackson object mapper serialization. - *//* - public SerializedShardList() { - } - - *//** - * @param shardList List of shards for the stream. - *//* - public SerializedShardList(List shardList) { - this.shardList.addAll(shardList); - } - - *//** - * public to enable Jackson object mapper serialization. - * - * @return shardList - *//* - public List getShardList() { - return shardList; - } - - *//** - * public to enable Jackson object mapper deserialization. - * - * @param shardList List of shards - *//* - public void setShardList(List shardList) { - this.shardList = shardList; - } - }*/ -} diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxyFactory.java b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxyFactory.java deleted file mode 100644 index 85e67b00..00000000 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxyFactory.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package com.amazonaws.services.kinesis.clientlibrary.proxies; - -import java.io.IOException; - -/** Factory for KinesisProxy objects that use a local file for data. Useful for testing purposes. - * - */ -public class KinesisLocalFileProxyFactory { - - /*private static final int DEFAULT_NUM_SHARDS = 3; - private static final String DEFAULT_SHARD_ID_PREFIX = "ShardId-"; - private static final int DEFAULT_NUM_RECORDS_PER_SHARD = 10; - private static final BigInteger DEFAULT_STARTING_SEQUENCE_NUMBER = BigInteger.ZERO; - - private static final String DEFAULT_TEST_PROXY_FILE = "defaultKinesisProxyLocalFile"; - - private IKinesisProxy testKinesisProxy; - - *//** - * @param fileName File to be used for stream data. - * If the file exists then it is expected to contain information for creating a test proxy object. - * If the file does not exist then a temporary file containing default values for a test proxy object - * will be created and used. - - * @throws IOException This will be thrown if we can't read/create the data file. - *//* - public KinesisLocalFileProxyFactory(String fileName) throws IOException { - File f = new File(fileName); - if (!f.exists()) { - f = KinesisLocalFileDataCreator.generateTempDataFile( - DEFAULT_NUM_SHARDS, DEFAULT_SHARD_ID_PREFIX, DEFAULT_NUM_RECORDS_PER_SHARD, - DEFAULT_STARTING_SEQUENCE_NUMBER, DEFAULT_TEST_PROXY_FILE); - } - testKinesisProxy = new KinesisLocalFileProxy(f.getAbsolutePath()); - } - - *//* (non-Javadoc) - * @see com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxyFactory#getProxy(java.lang.String) - *//* - @Override - public IKinesisProxy getProxy(String streamARN) { - return testKinesisProxy; - }*/ -} diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java deleted file mode 100644 index 2b50b9d0..00000000 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java +++ /dev/null @@ -1,408 +0,0 @@ -/* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package com.amazonaws.services.kinesis.clientlibrary.proxies; - -public class KinesisProxyTest { - /*private static final String TEST_STRING = "TestString"; - private static final long DESCRIBE_STREAM_BACKOFF_TIME = 10L; - private static final long LIST_SHARDS_BACKOFF_TIME = 10L; - private static final int DESCRIBE_STREAM_RETRY_TIMES = 3; - private static final int LIST_SHARDS_RETRY_TIMES = 3; - private static final String NEXT_TOKEN = "NextToken"; - - @Mock - private AmazonKinesis mockClient; - @Mock - private AmazonDynamoDBStreamsAdapterClient mockDDBStreamClient; - @Mock - private AmazonDynamoDBStreamsAdapterClientChild mockDDBChildClient; - @Mock - private AWSCredentialsProvider mockCredentialsProvider; - @Mock - private GetShardIteratorResult shardIteratorResult; - @Mock - private DescribeStreamResult describeStreamResult; - @Mock - private StreamDescription streamDescription; - @Mock - private Shard shard; - @Mock - private KinesisClientLibConfiguration config; - - private KinesisProxy proxy; - private KinesisProxy ddbProxy; - private KinesisProxy ddbChildProxy; - - // Test shards for verifying. - private Set shardIdSet; - private List shards; - - @Before - public void setUpTest() { - // Set up kinesis ddbProxy - when(config.getStreamName()).thenReturn(TEST_STRING); - when(config.getListShardsBackoffTimeInMillis()).thenReturn(LIST_SHARDS_BACKOFF_TIME); - when(config.getMaxListShardsRetryAttempts()).thenReturn(LIST_SHARDS_RETRY_TIMES); - when(config.getKinesisCredentialsProvider()).thenReturn(mockCredentialsProvider); - - proxy = new KinesisProxy(config, mockClient); - ddbProxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockDDBStreamClient, - DESCRIBE_STREAM_BACKOFF_TIME, DESCRIBE_STREAM_RETRY_TIMES, LIST_SHARDS_BACKOFF_TIME, - LIST_SHARDS_RETRY_TIMES); - ddbChildProxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockDDBChildClient, - DESCRIBE_STREAM_BACKOFF_TIME, DESCRIBE_STREAM_RETRY_TIMES, LIST_SHARDS_BACKOFF_TIME, - LIST_SHARDS_RETRY_TIMES); - - // Set up test shards - List shardIds = Arrays.asList("shard-1", "shard-2", "shard-3", "shard-4"); - shardIdSet = new HashSet<>(shardIds); - shards = shardIds.stream().map(shardId -> new Shard().shardId(shardId)).collect(Collectors.toList()); - } - - @Test - public void testGetShardListWithMoreDataAvailable() { - // Set up mock : - // First call describeStream returning response with first two shards in the list; - // Second call describeStream returning response with rest shards. - DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true); - DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false); - doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(new IsRequestWithStartShardId(null))); - doReturn(responseFinal).when(mockDDBStreamClient) - .describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).shardId()))); - - Set resultShardIdSets = ddbProxy.getAllShardIds(); - assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets)); - } - - @Test - public void testGetShardListWithLimitExceededException() { - // Set up mock : - // First call describeStream throwing LimitExceededException; - // Second call describeStream returning shards list. - DescribeStreamResult response = createGetStreamInfoResponse(shards, false); - doThrow(new LimitExceededException("Test Exception")).doReturn(response).when(mockDDBStreamClient) - .describeStream(argThat(new OldIsRequestWithStartShardId(null))); - - Set resultShardIdSet = ddbProxy.getAllShardIds(); - assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSet)); - } - - @Test - public void testValidShardIteratorType() { - when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult); - String expectedShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(); - ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); - - verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) - .and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType))))); - } - - @Test - public void testInvalidShardIteratorIsntChanged() { - when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult); - String expectedShardIteratorType = ShardIteratorType.AT_TIMESTAMP.toString(); - ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); - - verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) - .and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType))))); - } - - @Test(expected = AmazonServiceException.class) - public void testNullShardIteratorType() throws Exception { - when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(new AmazonServiceException("expected null")); - String expectedShardIteratorType = null; - ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); - - verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) - .and(hasProperty("shardIteratorType", nullValue(String.class))))); - } - - @Test(expected = AmazonServiceException.class) - public void testGetStreamInfoFails() { - when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new AmazonServiceException("Test")); - try { - ddbProxy.getShardList(); - } finally { - verify(mockDDBStreamClient).describeStream(any(DescribeStreamRequest.class)); - } - } - - @Test - public void testGetStreamInfoThrottledOnce() throws Exception { - when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test")) - .thenReturn(describeStreamResult); - when(describeStreamResult.getStreamDescription()).thenReturn(streamDescription); - when(streamDescription.getHasMoreShards()).thenReturn(false); - when(streamDescription.getStreamStatus()).thenReturn(StreamStatus.ACTIVE.name()); - List expectedShards = Collections.singletonList(shard); - when(streamDescription.getShards()).thenReturn(expectedShards); - - List actualShards = ddbProxy.getShardList(); - - assertThat(actualShards, equalTo(expectedShards)); - - verify(mockDDBStreamClient, times(2)).describeStream(any(DescribeStreamRequest.class)); - verify(describeStreamResult, times(3)).getStreamDescription(); - verify(streamDescription).getStreamStatus(); - verify(streamDescription).isHasMoreShards(); - } - - @Test(expected = LimitExceededException.class) - public void testGetStreamInfoThrottledAll() throws Exception { - when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test")); - - ddbProxy.getShardList(); - } - - @Test - public void testGetStreamInfoStoresOffset() throws Exception { - when(describeStreamResult.getStreamDescription()).thenReturn(streamDescription); - when(streamDescription.getStreamStatus()).thenReturn(StreamStatus.ACTIVE.name()); - Shard shard1 = mock(Shard.class); - Shard shard2 = mock(Shard.class); - Shard shard3 = mock(Shard.class); - List shardList1 = Collections.singletonList(shard1); - List shardList2 = Collections.singletonList(shard2); - List shardList3 = Collections.singletonList(shard3); - - String shardId1 = "ShardId-0001"; - String shardId2 = "ShardId-0002"; - String shardId3 = "ShardId-0003"; - - when(shard1.shardId()).thenReturn(shardId1); - when(shard2.shardId()).thenReturn(shardId2); - when(shard3.shardId()).thenReturn(shardId3); - - when(streamDescription.getShards()).thenReturn(shardList1).thenReturn(shardList2).thenReturn(shardList3); - when(streamDescription.isHasMoreShards()).thenReturn(true, true, false); - when(mockDDBStreamClient.describeStream(argThat(describeWithoutShardId()))).thenReturn(describeStreamResult); - - when(mockDDBStreamClient.describeStream(argThat(describeWithShardId(shardId1)))) - .thenThrow(new LimitExceededException("1"), new LimitExceededException("2"), - new LimitExceededException("3")) - .thenReturn(describeStreamResult); - - when(mockDDBStreamClient.describeStream(argThat(describeWithShardId(shardId2)))).thenReturn(describeStreamResult); - - boolean limitExceeded = false; - try { - ddbProxy.getShardList(); - } catch (LimitExceededException le) { - limitExceeded = true; - } - assertThat(limitExceeded, equalTo(true)); - List actualShards = ddbProxy.getShardList(); - List expectedShards = Arrays.asList(shard1, shard2, shard3); - - assertThat(actualShards, equalTo(expectedShards)); - - verify(mockDDBStreamClient).describeStream(argThat(describeWithoutShardId())); - verify(mockDDBStreamClient, times(4)).describeStream(argThat(describeWithShardId(shardId1))); - verify(mockDDBStreamClient).describeStream(argThat(describeWithShardId(shardId2))); - - } - - @Test - public void testListShardsWithMoreDataAvailable() { - ListShardsResult responseWithMoreData = new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN); - ListShardsResult responseFinal = new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null); - when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenReturn(responseWithMoreData); - when(mockClient.listShards(argThat(listShardsNextToken(NEXT_TOKEN)))).thenReturn(responseFinal); - - Set resultShardIdSets = proxy.getAllShardIds(); - assertEquals(shardIdSet, resultShardIdSets); - } - - @Test - public void testListShardsWithLimiteExceededException() { - ListShardsResult result = new ListShardsResult().withShards(shards); - when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class).thenReturn(result); - - Set resultShardIdSet = proxy.getAllShardIds(); - assertEquals(shardIdSet, resultShardIdSet); - } - - @Test(expected = AmazonServiceException.class) - public void testListShardsFails() { - when(mockClient.listShards(any(ListShardsRequest.class))).thenThrow(AmazonServiceException.class); - try { - proxy.getShardList(); - } finally { - verify(mockClient).listShards(any(ListShardsRequest.class)); - } - } - - @Test - public void testListShardsThrottledOnce() { - List expected = Collections.singletonList(shard); - ListShardsResult result = new ListShardsResult().withShards(expected); - when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class) - .thenReturn(result); - - List actualShards = proxy.getShardList(); - - assertEquals(expected, actualShards); - verify(mockClient, times(2)).listShards(argThat(initialListShardsRequestMatcher())); - } - - @Test(expected = LimitExceededException.class) - public void testListShardsThrottledAll() { - when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class); - proxy.getShardList(); - } - - @Test - public void testStreamNotInCorrectStatus() { - when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(ResourceInUseException.class); - assertNull(proxy.getShardList()); - } - - @Test - public void testGetShardListWithDDBChildClient() { - DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true); - DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false); - doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(new IsRequestWithStartShardId(null))); - doReturn(responseFinal).when(mockDDBChildClient) - .describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).shardId()))); - - Set resultShardIdSets = ddbChildProxy.getAllShardIds(); - assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets)); - } - - private DescribeStreamResult createGetStreamInfoResponse(List shards1, boolean isHasMoreShards) { - // Create stream description - StreamDescription description = new StreamDescription(); - description.setHasMoreShards(isHasMoreShards); - description.setShards(shards1); - description.setStreamStatus(StreamStatus.ACTIVE); - - // Create Describe Stream Result - DescribeStreamResult response = new DescribeStreamResult(); - response.setStreamDescription(description); - return response; - } - - private IsRequestWithStartShardId describeWithoutShardId() { - return describeWithShardId(null); - } - - private IsRequestWithStartShardId describeWithShardId(String shardId) { - return new IsRequestWithStartShardId(shardId); - } - - private static class IsRequestWithStartShardId extends TypeSafeDiagnosingMatcher { - - private final String shardId; - - public IsRequestWithStartShardId(String shardId) { - this.shardId = shardId; - } - - @Override - protected boolean matchesSafely(DescribeStreamRequest item, Description mismatchDescription) { - if (shardId == null) { - if (item.getExclusiveStartShardId() != null) { - mismatchDescription.appendText("Expected starting shard id of null, but was ") - .appendValue(item.getExclusiveStartShardId()); - return false; - } - } else if (!shardId.equals(item.getExclusiveStartShardId())) { - mismatchDescription.appendValue(shardId).appendText(" doesn't match expected ") - .appendValue(item.getExclusiveStartShardId()); - return false; - } - - return true; - } - - @Override - public void describeTo(Description description) { - description.appendText("A DescribeStreamRequest with a starting shard if of ").appendValue(shardId); - } - } - // Matcher for testing describe stream request with specific start shard ID. - - private static class OldIsRequestWithStartShardId extends ArgumentMatcher { - private final String shardId; - - public OldIsRequestWithStartShardId(String shardId) { - this.shardId = shardId; - } - - @Override - public boolean matches(Object request) { - String startShardId = ((DescribeStreamRequest) request).getExclusiveStartShardId(); - // If startShardId equals to null, shardId should also be null. - if (startShardId == null) { - return shardId == null; - } - return startShardId.equals(shardId); - } - } - - private static ListShardsRequestMatcher initialListShardsRequestMatcher() { - return new ListShardsRequestMatcher(null, null); - } - - private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) { - return new ListShardsRequestMatcher(null, nextToken); - } - - @AllArgsConstructor - private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher { - private final String shardId; - private final String nextToken; - - @Override - protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) { - if (shardId == null) { - if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) { - description.appendText("Expected ExclusiveStartShardId to be null, but was ") - .appendValue(listShardsRequest.getExclusiveStartShardId()); - return false; - } - } else { - if (!shardId.equals(listShardsRequest.getExclusiveStartShardId())) { - description.appendText("Expected shardId: ").appendValue(shardId) - .appendText(" doesn't match actual shardId: ") - .appendValue(listShardsRequest.getExclusiveStartShardId()); - return false; - } - } - - if (StringUtils.isNotEmpty(listShardsRequest.getNextToken())) { - if (StringUtils.isNotEmpty(listShardsRequest.getStreamName()) || StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) { - return false; - } - - if (!listShardsRequest.getNextToken().equals(nextToken)) { - description.appendText("Found nextToken: ").appendValue(listShardsRequest.getNextToken()) - .appendText(" when it was supposed to be null."); - return false; - } - } else { - return nextToken == null; - } - return true; - } - - @Override - public void describeTo(final Description description) { - description.appendText("A ListShardsRequest with a shardId: ").appendValue(shardId) - .appendText(" and empty nextToken"); - } - }*/ - -} diff --git a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java b/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java deleted file mode 100644 index 6136bbf3..00000000 --- a/amazon-kinesis-client/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package com.amazonaws.services.kinesis.clientlibrary.proxies.util; - -import software.amazon.awssdk.services.kinesis.model.Shard; - -import java.io.File; -import java.io.IOException; -import java.math.BigInteger; -import java.util.List; - - -/** - * Temporary util class for generating data in a local file (used by KinesisLocalFileProxy). - */ -public class KinesisLocalFileDataCreator { - - /** - * Partition key prefix - also referenced in KinesisLocalFileProxyTest. - */ -// public static final String PARTITION_KEY_PREFIX = "PK_"; - -// private static final String FILE_NAME_SUFFIX = ".dat"; - -// private static final long RAND_SEED_VALUE = 1092387456L; - // Used to cap the size of the random "hole" in sequence numbers. -// private static final int NUM_BITS = 3; -// private static Random randomGenerator = new Random(RAND_SEED_VALUE); - -// private static final int PARTITION_KEY_LENGTH = 10; -// private static final int DATA_LENGTH = 40; - - /** - * Starting timestamp - also referenced in KinesisLocalFileProxyTest. - */ -// public static final long STARTING_TIMESTAMP = 1462345678910L; - - /** - * This is used to allow few records to have the same timestamps (to mimic real life scenarios). - * Records 5n-1 and 5n will have the same timestamp (n > 0). - */ -// private static final int DIVISOR = 5; - - private KinesisLocalFileDataCreator() { - } - - /** Creates a temp file (in default temp file location) with fake Kinesis data records. - * This method does not support resharding use cases. - * @param numShards Number of shards - * @param shardIdPrefix Prefix for shardIds (1, 2, ..., N will be added at the end to create shardIds) - * @param numRecordsPerShard Number of records to generate per shard - * @param startingSequenceNumber Sequence numbers in the generated data will be >= this number - * @param fileNamePrefix Prefix of the filename - * @return File created with the fake Kinesis records. - * @throws IOException Thrown if there are issues creating the file. - */ - public static File generateTempDataFile( - int numShards, - String shardIdPrefix, - int numRecordsPerShard, - BigInteger startingSequenceNumber, - String fileNamePrefix) - throws IOException { - List shardList = createShardList(numShards, shardIdPrefix, startingSequenceNumber); - return generateTempDataFile(shardList, numRecordsPerShard, fileNamePrefix); - } - - /** - * Creates a temp file (in default temp file location) with fake Kinesis data records. - * Records will be put in all shards. - * @param fileNamePrefix Prefix for the name of the temp file - * @param shardList List of shards (we use the shardId and sequenceNumberRange fields) - * @param numRecordsPerShard Num records per shard (the shard sequenceNumberRange should be large enough - * for us to allow these many records with some "holes") - * @return File with stream data filled in - * @throws IOException Thrown if there are issues creating/updating the file - */ - public static File generateTempDataFile(List shardList, int numRecordsPerShard, String fileNamePrefix) - throws IOException { -// File file = File.createTempFile(fileNamePrefix, FILE_NAME_SUFFIX); -// try (BufferedWriter fileWriter = new BufferedWriter( -// new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8))) { -// ObjectMapper objectMapper = new ObjectMapper(); -// String serializedShardList = -// objectMapper.writeValueAsString(new KinesisLocalFileProxy.SerializedShardList(shardList)); -// fileWriter.write(serializedShardList); -// fileWriter.newLine(); -// BigInteger sequenceNumberIncrement = new BigInteger("0"); -// long timestamp = STARTING_TIMESTAMP; -// for (int i = 0; i < numRecordsPerShard; i++) { -// for (Shard shard : shardList) { -// BigInteger sequenceNumber = -// new BigInteger(shard.getSequenceNumberRange().getStartingSequenceNumber()).add( -// sequenceNumberIncrement); -// String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber(); -// BigInteger maxSequenceNumber = KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER; -// if (endingSequenceNumber != null) { -// maxSequenceNumber = new BigInteger(endingSequenceNumber); -// } -// if (maxSequenceNumber.compareTo(sequenceNumber) != 1) { -// throw new IllegalArgumentException("Not enough space in shard"); -// } -// String partitionKey = -// PARTITION_KEY_PREFIX + shard.getShardId() + generateRandomString(PARTITION_KEY_LENGTH); -// String data = generateRandomString(DATA_LENGTH); - - // Allow few records to have the same timestamps (to mimic real life scenarios). -// timestamp = (i % DIVISOR == 0) ? timestamp : timestamp + 1; -// String line = shard.getShardId() + "," + sequenceNumber + "," + partitionKey + "," + data + "," -// + timestamp; -// -// fileWriter.write(line); -// fileWriter.newLine(); -// sequenceNumberIncrement = sequenceNumberIncrement.add(BigInteger.ONE); -// sequenceNumberIncrement = sequenceNumberIncrement.add(new BigInteger(NUM_BITS, randomGenerator)); -// } -// } -// } - return null; - } - - /** Helper method to create a list of shards (which can then be used to generate data files). - * @param numShards Number of shards - * @param shardIdPrefix Prefix for the shardIds - * @param startingSequenceNumber Starting sequence number for all the shards - * @return List of shards (with no reshard events). - */ - public static List createShardList(int numShards, String shardIdPrefix, BigInteger startingSequenceNumber) { -// List shards = new ArrayList(numShards); -// -// SequenceNumberRange sequenceNumberRange = new SequenceNumberRange(); -// sequenceNumberRange.setStartingSequenceNumber(startingSequenceNumber.toString()); -// sequenceNumberRange.setEndingSequenceNumber(null); -// BigInteger perShardHashKeyRange = -// KinesisLocalFileProxy.MAX_HASHKEY_VALUE.divide(new BigInteger(Integer.toString(numShards))); -// BigInteger hashKeyRangeStart = new BigInteger("0"); -// for (int i = 0; i < numShards; i++) { -// Shard shard = new Shard(); -// shard.setShardId(shardIdPrefix + i); -// shard.setSequenceNumberRange(sequenceNumberRange); -// BigInteger hashKeyRangeEnd = hashKeyRangeStart.add(perShardHashKeyRange); -// HashKeyRange hashKeyRange = new HashKeyRange(); -// hashKeyRange.setStartingHashKey(hashKeyRangeStart.toString()); -// hashKeyRange.setEndingHashKey(hashKeyRangeEnd.toString()); -// shards.add(shard); -// } -// - return null; - } - - /** Generates a random string of specified length. - * @param length String of length will be generated - * @return Random generated string - */ - private static String generateRandomString(int length) { -// StringBuffer str = new StringBuffer(); -// final int startingCharAsciiValue = 97; -// final int numChars = 26; -// for (int i = 0; i < length; i++) { -// str.append((char) (randomGenerator.nextInt(numChars - 1) + startingCharAsciiValue)); -// } - return ""; - } - - /** Creates a new temp file populated with fake Kinesis data records. - * @param args Expects 5 args: numShards, shardPrefix, numRecordsPerShard, startingSequenceNumber, fileNamePrefix - */ - // CHECKSTYLE:OFF MagicNumber - // CHECKSTYLE:IGNORE UncommentedMain FOR NEXT 2 LINES - public static void main(String[] args) { -// int numShards = 1; -// String shardIdPrefix = "shardId"; -// int numRecordsPerShard = 17; -// BigInteger startingSequenceNumber = new BigInteger("99"); -// String fileNamePrefix = "kinesisFakeRecords"; -// -// try { -// if ((args.length != 0) && (args.length != 5)) { -// Temporary util code, so not providing detailed usage feedback. -// System.out.println("Unexpected number of arguments."); -// System.exit(0); -// } -// -// if (args.length == 5) { -// numShards = Integer.parseInt(args[0]); -// shardIdPrefix = args[1]; -// numRecordsPerShard = Integer.parseInt(args[2]); -// startingSequenceNumber = new BigInteger(args[3]); -// fileNamePrefix = args[4]; -// } -// -// File file = KinesisLocalFileDataCreator.generateTempDataFile( -// numShards, -// shardIdPrefix, -// numRecordsPerShard, -// startingSequenceNumber, -// fileNamePrefix); -// System.out.println("Created fake kinesis records in file: " + file.getAbsolutePath()); -// } catch (Exception e) { -// CHECKSTYLE:IGNORE IllegalCatch FOR NEXT -1 LINES -// System.out.println("Caught Exception: " + e); -// } - - } - // CHECKSTYLE:ON MagicNumber - -}