diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java index 7ee718d0..ef772be0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -1,5 +1,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.model.GetRecordsResult; import lombok.extern.apachecommons.CommonsLog; @@ -17,10 +18,14 @@ public class BlockingGetRecordsCache implements GetRecordsCache { this.maxRecordsPerCall = maxRecordsPerCall; this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; } - + @Override - public GetRecordsResult getNextResult() { - return getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); + public ProcessRecordsInput getNextResult() { + GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); + ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() + .withRecords(getRecordsResult.getRecords()) + .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); + return processRecordsInput; } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java index 88df34fb..ead4723e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -1,8 +1,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import com.amazonaws.services.kinesis.model.GetRecordsResult; - -import java.util.Collections; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; /** * This class is used as a cache for Prefetching data from Kinesis. @@ -14,7 +12,7 @@ public interface GetRecordsCache { * * @return The next set of records. */ - GetRecordsResult getNextResult(); + ProcessRecordsInput getNextResult(); void shutdown(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index a60b5d3e..e0ceb6de 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -1,8 +1,10 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.time.Instant; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.model.GetRecordsResult; import lombok.NonNull; @@ -18,20 +20,20 @@ import lombok.extern.apachecommons.CommonsLog; */ @CommonsLog public class PrefetchGetRecordsCache implements GetRecordsCache { - private LinkedBlockingQueue getRecordsResultQueue; + LinkedBlockingQueue getRecordsResultQueue; private int maxSize; private int maxByteSize; private int maxRecordsCount; private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final ExecutorService executorService; - + private PrefetchCounters prefetchCounters; - + private boolean started = false; public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, - final int maxRecordsPerCall, @NonNull final DataFetchingStrategy dataFetchingStrategy, + final int maxRecordsPerCall, @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull final ExecutorService executorService) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; @@ -40,11 +42,11 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { this.maxByteSize = maxByteSize; this.maxRecordsCount = maxRecordsCount; this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); - prefetchCounters = new PrefetchCounters(); + this.prefetchCounters = new PrefetchCounters(); this.executorService = executorService; } - - private void start() { + + void start() { if (!started) { log.info("Starting prefetching thread."); executorService.execute(new DefaultGetRecordsCacheDaemon()); @@ -53,13 +55,14 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } @Override - public GetRecordsResult getNextResult() { + public ProcessRecordsInput getNextResult() { if (!started) { start(); } - GetRecordsResult result = null; + ProcessRecordsInput result = null; try { result = getRecordsResultQueue.take(); + result.withCacheExitTime(Instant.now()); prefetchCounters.removed(result); } catch (InterruptedException e) { log.error("Interrupted while getting records from the cache", e); @@ -69,18 +72,26 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public void shutdown() { - executorService.shutdown(); + executorService.shutdownNow(); } - + private class DefaultGetRecordsCacheDaemon implements Runnable { @Override public void run() { while (true) { + if (Thread.interrupted()) { + log.warn("Prefetch thread was interrupted."); + break; + } if (prefetchCounters.byteSize < maxByteSize && prefetchCounters.size < maxRecordsCount) { try { GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); - getRecordsResultQueue.put(getRecordsResult); - prefetchCounters.added(getRecordsResult); + ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() + .withRecords(getRecordsResult.getRecords()) + .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()) + .withCacheEntryTime(Instant.now()); + getRecordsResultQueue.put(processRecordsInput); + prefetchCounters.added(processRecordsInput); } catch (InterruptedException e) { log.error("Interrupted while adding records to the cache", e); } @@ -88,28 +99,28 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } } } - + private class PrefetchCounters { private volatile long size = 0; private volatile long byteSize = 0; - - public void added(final GetRecordsResult result) { + + public void added(final ProcessRecordsInput result) { size += getSize(result); byteSize += getByteSize(result); } - - public void removed(final GetRecordsResult result) { + + public void removed(final ProcessRecordsInput result) { size -= getSize(result); byteSize -= getByteSize(result); } - - private long getSize(final GetRecordsResult result) { + + private long getSize(final ProcessRecordsInput result) { return result.getRecords().size(); } - - private long getByteSize(final GetRecordsResult result) { + + private long getByteSize(final ProcessRecordsInput result) { return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum(); } } - + } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java index f617e5e8..9c7c3f5a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java @@ -14,10 +14,15 @@ */ package com.amazonaws.services.kinesis.clientlibrary.types; +import java.time.Duration; +import java.time.Instant; import java.util.List; + import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.model.Record; +import lombok.Getter; + /** * Container for the parameters to the IRecordProcessor's * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords( @@ -25,6 +30,10 @@ import com.amazonaws.services.kinesis.model.Record; */ public class ProcessRecordsInput { + @Getter + private Instant cacheEntryTime; + @Getter + private Instant cacheExitTime; private List records; private IRecordProcessorCheckpointer checkpointer; private Long millisBehindLatest; @@ -96,4 +105,21 @@ public class ProcessRecordsInput { this.millisBehindLatest = millisBehindLatest; return this; } + + public ProcessRecordsInput withCacheEntryTime(Instant cacheEntryTime) { + this.cacheEntryTime = cacheEntryTime; + return this; + } + + public ProcessRecordsInput withCacheExitTime(Instant cacheExitTime) { + this.cacheExitTime = cacheExitTime; + return this; + } + + public Duration getTimeSpentInCache() { + if (cacheEntryTime == null || cacheExitTime == null) { + return Duration.ZERO; + } + return Duration.between(cacheEntryTime, cacheExitTime); + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java new file mode 100644 index 00000000..67afe09e --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; + +/** + * Test class for the BlockingGetRecordsCache class. + */ +@RunWith(MockitoJUnitRunner.class) +public class BlockingGetRecordsCacheTest { + private static final int MAX_RECORDS_PER_COUNT = 10_000; + + @Mock + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + @Mock + private GetRecordsResult getRecordsResult; + @Mock + private List records; + + private BlockingGetRecordsCache blockingGetRecordsCache; + + @Before + public void setup() { + when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult); + when(getRecordsResult.getRecords()).thenReturn(records); + + blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy); + } + + @Test + public void testGetNextRecords() { + ProcessRecordsInput result = blockingGetRecordsCache.getNextResult(); + + assertEquals(result.getRecords(), records); + assertNull(result.getCacheEntryTime()); + assertNull(result.getCacheExitTime()); + assertEquals(result.getTimeSpentInCache(), Duration.ZERO); + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java index 2597d76b..4b013424 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java @@ -27,10 +27,6 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; -import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; @@ -39,6 +35,10 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.ShardIteratorType; /** * Unit tests for KinesisDataFetcher. @@ -189,6 +189,23 @@ public class KinesisDataFetcherTest { // Test shard has reached the end Assert.assertTrue("Shard should reach the end", dataFetcher.isShardEndReached()); } + + @Test + public void testNonNullGetRecords() { + String nextIterator = "TestIterator"; + int maxRecords = 100; + + KinesisProxy mockProxy = mock(KinesisProxy.class); + doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); + + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); + dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); + + GetRecordsResult getRecordsResult = dataFetcher.getRecords(maxRecords); + + Assert.assertNotNull(getRecordsResult); + Assert.assertTrue(getRecordsResult.getRecords().isEmpty()); + } private void testInitializeAndFetch(String iteratorType, String seqNo, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java new file mode 100644 index 00000000..28458fb4 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java @@ -0,0 +1,195 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.IntStream; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; + +/** + * Test class for the PrefetchGetRecordsCache class. + */ +@RunWith(MockitoJUnitRunner.class) +public class PrefetchGetRecordsCacheTest { + private static final int SIZE_512_KB = 512 * 1024; + private static final int SIZE_1_MB = 2 * SIZE_512_KB; + private static final int MAX_RECORDS_PER_CALL = 10000; + private static final int MAX_SIZE = 5; + private static final int MAX_RECORDS_COUNT = 15000; + + @Mock + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + @Mock + private GetRecordsResult getRecordsResult; + @Mock + private Record record; + + private List records; + private ExecutorService executorService; + private LinkedBlockingQueue spyQueue; + private PrefetchGetRecordsCache getRecordsCache; + + @Before + public void setup() { + executorService = spy(Executors.newFixedThreadPool(1)); + getRecordsCache = new PrefetchGetRecordsCache( + MAX_SIZE, + 3 * SIZE_1_MB, + MAX_RECORDS_COUNT, + MAX_RECORDS_PER_CALL, + getRecordsRetrievalStrategy, + executorService); + spyQueue = spy(getRecordsCache.getRecordsResultQueue); + records = spy(new ArrayList<>()); + + when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResult); + when(getRecordsResult.getRecords()).thenReturn(records); + } + + @Test + public void testGetRecords() { + when(records.size()).thenReturn(1000); + when(record.getData()).thenReturn(createByteBufferWithSize(SIZE_512_KB)); + + records.add(record); + records.add(record); + records.add(record); + records.add(record); + records.add(record); + + ProcessRecordsInput result = getRecordsCache.getNextResult(); + + assertEquals(result.getRecords(), records); + + verify(executorService).execute(any()); + verify(getRecordsRetrievalStrategy, atLeast(1)).getRecords(eq(MAX_RECORDS_PER_CALL)); + } + + @Test + public void testFullCacheByteSize() { + when(records.size()).thenReturn(500); + when(record.getData()).thenReturn(createByteBufferWithSize(SIZE_1_MB)); + + records.add(record); + + getRecordsCache.start(); + + // Sleep for a few seconds for the cache to fill up. + sleep(2000); + + verify(getRecordsRetrievalStrategy, times(3)).getRecords(eq(MAX_RECORDS_PER_CALL)); + assertEquals(spyQueue.size(), 3); + } + + @Test + public void testFullCacheRecordsCount() { + int recordsSize = 4500; + when(records.size()).thenReturn(recordsSize); + + getRecordsCache.start(); + + sleep(2000); + + int callRate = (int) Math.ceil((double) MAX_RECORDS_COUNT/recordsSize); + verify(getRecordsRetrievalStrategy, times(callRate)).getRecords(MAX_RECORDS_PER_CALL); + assertEquals(spyQueue.size(), callRate); + assertTrue(callRate < MAX_SIZE); + } + + @Test + public void testFullCacheSize() { + int recordsSize = 200; + when(records.size()).thenReturn(recordsSize); + + getRecordsCache.start(); + + // Sleep for a few seconds for the cache to fill up. + sleep(2000); + + verify(getRecordsRetrievalStrategy, times(MAX_SIZE + 1)).getRecords(eq(MAX_RECORDS_PER_CALL)); + assertEquals(spyQueue.size(), MAX_SIZE); + } + + @Test + public void testMultipleCacheCalls() { + int recordsSize = 20; + when(record.getData()).thenReturn(createByteBufferWithSize(1024)); + + IntStream.range(0, recordsSize).forEach(i -> records.add(record)); + + ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult(); + + verify(executorService).execute(any()); + assertEquals(processRecordsInput.getRecords(), records); + assertNotNull(processRecordsInput.getCacheEntryTime()); + assertNotNull(processRecordsInput.getCacheExitTime()); + + sleep(2000); + + ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult(); + assertNotEquals(processRecordsInput, processRecordsInput2); + assertEquals(processRecordsInput2.getRecords(), records); + assertNotEquals(processRecordsInput2.getTimeSpentInCache(), Duration.ZERO); + + assertTrue(spyQueue.size() <= MAX_SIZE); + } + + @After + public void shutdown() { + getRecordsCache.shutdown(); + verify(executorService).shutdownNow(); + } + + private void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) {} + } + + private ByteBuffer createByteBufferWithSize(int size) { + ByteBuffer byteBuffer = ByteBuffer.allocate(size); + byteBuffer.put(new byte[size]); + return byteBuffer; + } +}