diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java index 7ee718d0..5220c6ae 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -1,5 +1,21 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.model.GetRecordsResult; import lombok.extern.apachecommons.CommonsLog; @@ -17,14 +33,27 @@ public class BlockingGetRecordsCache implements GetRecordsCache { this.maxRecordsPerCall = maxRecordsPerCall; this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; } - + @Override - public GetRecordsResult getNextResult() { - return getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); + public void start() { + // + // Nothing to do here + // + } + + @Override + public ProcessRecordsInput getNextResult() { + GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); + ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() + .withRecords(getRecordsResult.getRecords()) + .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); + return processRecordsInput; } @Override public void shutdown() { + // // Nothing to do here. + // } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java index 88df34fb..d08ec285 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -1,20 +1,41 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import com.amazonaws.services.kinesis.model.GetRecordsResult; - -import java.util.Collections; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; /** * This class is used as a cache for Prefetching data from Kinesis. */ public interface GetRecordsCache { + /** + * This method calls the start behavior on the cache, if available. + */ + void start(); + /** * This method returns the next set of records from the Cache if present, or blocks the request till it gets the * next set of records back from Kinesis. * * @return The next set of records. */ - GetRecordsResult getNextResult(); - + ProcessRecordsInput getNextResult(); + + /** + * This method calls the shutdown behavior on the cache, if available. + */ void shutdown(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index a60b5d3e..979c5b1a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -1,8 +1,25 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.time.Instant; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.model.GetRecordsResult; import lombok.NonNull; @@ -18,20 +35,20 @@ import lombok.extern.apachecommons.CommonsLog; */ @CommonsLog public class PrefetchGetRecordsCache implements GetRecordsCache { - private LinkedBlockingQueue getRecordsResultQueue; + LinkedBlockingQueue getRecordsResultQueue; private int maxSize; private int maxByteSize; private int maxRecordsCount; private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final ExecutorService executorService; - + private PrefetchCounters prefetchCounters; - + private boolean started = false; public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, - final int maxRecordsPerCall, @NonNull final DataFetchingStrategy dataFetchingStrategy, + final int maxRecordsPerCall, @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull final ExecutorService executorService) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; @@ -40,11 +57,12 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { this.maxByteSize = maxByteSize; this.maxRecordsCount = maxRecordsCount; this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); - prefetchCounters = new PrefetchCounters(); + this.prefetchCounters = new PrefetchCounters(); this.executorService = executorService; } - - private void start() { + + @Override + public void start() { if (!started) { log.info("Starting prefetching thread."); executorService.execute(new DefaultGetRecordsCacheDaemon()); @@ -53,13 +71,13 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } @Override - public GetRecordsResult getNextResult() { + public ProcessRecordsInput getNextResult() { if (!started) { - start(); + throw new IllegalStateException("Threadpool in the cache was not started, make sure to call start on the cache"); } - GetRecordsResult result = null; + ProcessRecordsInput result = null; try { - result = getRecordsResultQueue.take(); + result = getRecordsResultQueue.take().withCacheExitTime(Instant.now()); prefetchCounters.removed(result); } catch (InterruptedException e) { log.error("Interrupted while getting records from the cache", e); @@ -69,47 +87,59 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public void shutdown() { - executorService.shutdown(); + executorService.shutdownNow(); } - + private class DefaultGetRecordsCacheDaemon implements Runnable { @Override public void run() { while (true) { - if (prefetchCounters.byteSize < maxByteSize && prefetchCounters.size < maxRecordsCount) { + if (Thread.interrupted()) { + log.warn("Prefetch thread was interrupted."); + break; + } + if (prefetchCounters.shouldGetNewRecords()) { try { GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); - getRecordsResultQueue.put(getRecordsResult); - prefetchCounters.added(getRecordsResult); + ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() + .withRecords(getRecordsResult.getRecords()) + .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()) + .withCacheEntryTime(Instant.now()); + getRecordsResultQueue.put(processRecordsInput); + prefetchCounters.added(processRecordsInput); } catch (InterruptedException e) { - log.error("Interrupted while adding records to the cache", e); + log.info("Thread was interrupted, indicating shutdown was called on the cache", e); } } } } } - + private class PrefetchCounters { - private volatile long size = 0; - private volatile long byteSize = 0; - - public void added(final GetRecordsResult result) { + private long size = 0; + private long byteSize = 0; + + public synchronized void added(final ProcessRecordsInput result) { size += getSize(result); byteSize += getByteSize(result); } - - public void removed(final GetRecordsResult result) { + + public synchronized void removed(final ProcessRecordsInput result) { size -= getSize(result); byteSize -= getByteSize(result); } - - private long getSize(final GetRecordsResult result) { + + private long getSize(final ProcessRecordsInput result) { return result.getRecords().size(); } - - private long getByteSize(final GetRecordsResult result) { + + private long getByteSize(final ProcessRecordsInput result) { return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum(); } + + public synchronized boolean shouldGetNewRecords() { + return size < maxRecordsCount && byteSize < maxByteSize; + } } - + } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java index bd960c08..362af357 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java @@ -14,17 +14,25 @@ */ package com.amazonaws.services.kinesis.clientlibrary.types; +import java.time.Duration; +import java.time.Instant; import java.util.List; + import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.model.Record; +import lombok.Getter; + /** * Container for the parameters to the IRecordProcessor's * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords( * ProcessRecordsInput processRecordsInput) processRecords} method. */ public class ProcessRecordsInput { - + @Getter + private Instant cacheEntryTime; + @Getter + private Instant cacheExitTime; private List records; private IRecordProcessorCheckpointer checkpointer; private Long millisBehindLatest; @@ -96,4 +104,21 @@ public class ProcessRecordsInput { this.millisBehindLatest = millisBehindLatest; return this; } + + public ProcessRecordsInput withCacheEntryTime(Instant cacheEntryTime) { + this.cacheEntryTime = cacheEntryTime; + return this; + } + + public ProcessRecordsInput withCacheExitTime(Instant cacheExitTime) { + this.cacheExitTime = cacheExitTime; + return this; + } + + public Duration getTimeSpentInCache() { + if (cacheEntryTime == null || cacheExitTime == null) { + return Duration.ZERO; + } + return Duration.between(cacheEntryTime, cacheExitTime); + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java new file mode 100644 index 00000000..0636baea --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java @@ -0,0 +1,83 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; + +/** + * Test class for the BlockingGetRecordsCache class. + */ +@RunWith(MockitoJUnitRunner.class) +public class BlockingGetRecordsCacheTest { + private static final int MAX_RECORDS_PER_COUNT = 10_000; + + @Mock + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + @Mock + private GetRecordsResult getRecordsResult; + + private List records; + private BlockingGetRecordsCache blockingGetRecordsCache; + + @Before + public void setup() { + records = new ArrayList<>(); + blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy); + + when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult); + when(getRecordsResult.getRecords()).thenReturn(records); + } + + @Test + public void testGetNextRecordsWithNoRecords() { + ProcessRecordsInput result = blockingGetRecordsCache.getNextResult(); + + assertEquals(result.getRecords(), records); + assertNull(result.getCacheEntryTime()); + assertNull(result.getCacheExitTime()); + assertEquals(result.getTimeSpentInCache(), Duration.ZERO); + } + + @Test + public void testGetNextRecordsWithRecords() { + Record record = new Record(); + records.add(record); + records.add(record); + records.add(record); + records.add(record); + + ProcessRecordsInput result = blockingGetRecordsCache.getNextResult(); + + assertEquals(result.getRecords(), records); + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java index 2b89c3c8..7e8937a6 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java @@ -27,10 +27,6 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; -import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; @@ -39,6 +35,10 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.ShardIteratorType; /** * Unit tests for KinesisDataFetcher. @@ -189,6 +189,23 @@ public class KinesisDataFetcherTest { // Test shard has reached the end Assert.assertTrue("Shard should reach the end", dataFetcher.isShardEndReached()); } + + @Test + public void testNonNullGetRecords() { + String nextIterator = "TestIterator"; + int maxRecords = 100; + + KinesisProxy mockProxy = mock(KinesisProxy.class); + doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); + + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); + dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); + + GetRecordsResult getRecordsResult = dataFetcher.getRecords(maxRecords); + + Assert.assertNotNull(getRecordsResult); + Assert.assertTrue(getRecordsResult.getRecords().isEmpty()); + } private void testInitializeAndFetch(String iteratorType, String seqNo, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java new file mode 100644 index 00000000..eeb8ff1d --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java @@ -0,0 +1,203 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.IntStream; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; + +/** + * Test class for the PrefetchGetRecordsCache class. + */ +@RunWith(MockitoJUnitRunner.class) +public class PrefetchGetRecordsCacheTest { + private static final int SIZE_512_KB = 512 * 1024; + private static final int SIZE_1_MB = 2 * SIZE_512_KB; + private static final int MAX_RECORDS_PER_CALL = 10000; + private static final int MAX_SIZE = 5; + private static final int MAX_RECORDS_COUNT = 15000; + + @Mock + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + @Mock + private GetRecordsResult getRecordsResult; + @Mock + private Record record; + + private List records; + private ExecutorService executorService; + private LinkedBlockingQueue spyQueue; + private PrefetchGetRecordsCache getRecordsCache; + + @Before + public void setup() { + executorService = spy(Executors.newFixedThreadPool(1)); + getRecordsCache = new PrefetchGetRecordsCache( + MAX_SIZE, + 3 * SIZE_1_MB, + MAX_RECORDS_COUNT, + MAX_RECORDS_PER_CALL, + getRecordsRetrievalStrategy, + executorService); + spyQueue = spy(getRecordsCache.getRecordsResultQueue); + records = spy(new ArrayList<>()); + + when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResult); + when(getRecordsResult.getRecords()).thenReturn(records); + } + + @Test + public void testGetRecords() { + when(records.size()).thenReturn(1000); + when(record.getData()).thenReturn(createByteBufferWithSize(SIZE_512_KB)); + + records.add(record); + records.add(record); + records.add(record); + records.add(record); + records.add(record); + + getRecordsCache.start(); + ProcessRecordsInput result = getRecordsCache.getNextResult(); + + assertEquals(result.getRecords(), records); + + verify(executorService).execute(any()); + verify(getRecordsRetrievalStrategy, atLeast(1)).getRecords(eq(MAX_RECORDS_PER_CALL)); + } + + @Test + public void testFullCacheByteSize() { + when(records.size()).thenReturn(500); + when(record.getData()).thenReturn(createByteBufferWithSize(SIZE_1_MB)); + + records.add(record); + + getRecordsCache.start(); + + // Sleep for a few seconds for the cache to fill up. + sleep(2000); + + verify(getRecordsRetrievalStrategy, times(3)).getRecords(eq(MAX_RECORDS_PER_CALL)); + assertEquals(spyQueue.size(), 3); + } + + @Test + public void testFullCacheRecordsCount() { + int recordsSize = 4500; + when(records.size()).thenReturn(recordsSize); + + getRecordsCache.start(); + + sleep(2000); + + int callRate = (int) Math.ceil((double) MAX_RECORDS_COUNT/recordsSize); + verify(getRecordsRetrievalStrategy, times(callRate)).getRecords(MAX_RECORDS_PER_CALL); + assertEquals(spyQueue.size(), callRate); + assertTrue(callRate < MAX_SIZE); + } + + @Test + public void testFullCacheSize() { + int recordsSize = 200; + when(records.size()).thenReturn(recordsSize); + + getRecordsCache.start(); + + // Sleep for a few seconds for the cache to fill up. + sleep(2000); + + verify(getRecordsRetrievalStrategy, times(MAX_SIZE + 1)).getRecords(eq(MAX_RECORDS_PER_CALL)); + assertEquals(spyQueue.size(), MAX_SIZE); + } + + @Test + public void testMultipleCacheCalls() { + int recordsSize = 20; + when(record.getData()).thenReturn(createByteBufferWithSize(1024)); + + IntStream.range(0, recordsSize).forEach(i -> records.add(record)); + + getRecordsCache.start(); + ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult(); + + verify(executorService).execute(any()); + assertEquals(processRecordsInput.getRecords(), records); + assertNotNull(processRecordsInput.getCacheEntryTime()); + assertNotNull(processRecordsInput.getCacheExitTime()); + + sleep(2000); + + ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult(); + assertNotEquals(processRecordsInput, processRecordsInput2); + assertEquals(processRecordsInput2.getRecords(), records); + assertNotEquals(processRecordsInput2.getTimeSpentInCache(), Duration.ZERO); + + assertTrue(spyQueue.size() <= MAX_SIZE); + } + + @Test(expected = IllegalStateException.class) + public void testGetNextRecordsWithoutStarting() { + verify(executorService, times(0)).execute(any()); + getRecordsCache.getNextResult(); + } + + @After + public void shutdown() { + getRecordsCache.shutdown(); + verify(executorService).shutdownNow(); + } + + private void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) {} + } + + private ByteBuffer createByteBufferWithSize(int size) { + ByteBuffer byteBuffer = ByteBuffer.allocate(size); + byteBuffer.put(new byte[size]); + return byteBuffer; + } +}