Changed KinesisDataFetecher.getRecords not to return null back. Added test for KinesisDataFetcher.getRecords. Adding test classes for GetRecordsCache implemetations. Removing the DataFetchingStrategy from the PrefetchGetRecordsCache.

This commit is contained in:
Sahil Palvia 2017-09-20 17:28:05 -07:00
parent a8edb70552
commit a3cbfaff31
7 changed files with 355 additions and 34 deletions

View file

@ -1,5 +1,6 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import lombok.extern.apachecommons.CommonsLog;
@ -19,8 +20,12 @@ public class BlockingGetRecordsCache implements GetRecordsCache {
}
@Override
public GetRecordsResult getNextResult() {
return getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
public ProcessRecordsInput getNextResult() {
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(getRecordsResult.getRecords())
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
return processRecordsInput;
}
@Override

View file

@ -1,8 +1,6 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import java.util.Collections;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
/**
* This class is used as a cache for Prefetching data from Kinesis.
@ -14,7 +12,7 @@ public interface GetRecordsCache {
*
* @return The next set of records.
*/
GetRecordsResult getNextResult();
ProcessRecordsInput getNextResult();
void shutdown();
}

View file

@ -1,8 +1,10 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import lombok.NonNull;
@ -18,7 +20,7 @@ import lombok.extern.apachecommons.CommonsLog;
*/
@CommonsLog
public class PrefetchGetRecordsCache implements GetRecordsCache {
private LinkedBlockingQueue<GetRecordsResult> getRecordsResultQueue;
LinkedBlockingQueue<ProcessRecordsInput> getRecordsResultQueue;
private int maxSize;
private int maxByteSize;
private int maxRecordsCount;
@ -31,7 +33,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private boolean started = false;
public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount,
final int maxRecordsPerCall, @NonNull final DataFetchingStrategy dataFetchingStrategy,
final int maxRecordsPerCall,
@NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
@NonNull final ExecutorService executorService) {
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
@ -40,11 +42,11 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
this.maxByteSize = maxByteSize;
this.maxRecordsCount = maxRecordsCount;
this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize);
prefetchCounters = new PrefetchCounters();
this.prefetchCounters = new PrefetchCounters();
this.executorService = executorService;
}
private void start() {
void start() {
if (!started) {
log.info("Starting prefetching thread.");
executorService.execute(new DefaultGetRecordsCacheDaemon());
@ -53,13 +55,14 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
}
@Override
public GetRecordsResult getNextResult() {
public ProcessRecordsInput getNextResult() {
if (!started) {
start();
}
GetRecordsResult result = null;
ProcessRecordsInput result = null;
try {
result = getRecordsResultQueue.take();
result.withCacheExitTime(Instant.now());
prefetchCounters.removed(result);
} catch (InterruptedException e) {
log.error("Interrupted while getting records from the cache", e);
@ -69,18 +72,26 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
@Override
public void shutdown() {
executorService.shutdown();
executorService.shutdownNow();
}
private class DefaultGetRecordsCacheDaemon implements Runnable {
@Override
public void run() {
while (true) {
if (Thread.interrupted()) {
log.warn("Prefetch thread was interrupted.");
break;
}
if (prefetchCounters.byteSize < maxByteSize && prefetchCounters.size < maxRecordsCount) {
try {
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
getRecordsResultQueue.put(getRecordsResult);
prefetchCounters.added(getRecordsResult);
ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(getRecordsResult.getRecords())
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest())
.withCacheEntryTime(Instant.now());
getRecordsResultQueue.put(processRecordsInput);
prefetchCounters.added(processRecordsInput);
} catch (InterruptedException e) {
log.error("Interrupted while adding records to the cache", e);
}
@ -93,21 +104,21 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private volatile long size = 0;
private volatile long byteSize = 0;
public void added(final GetRecordsResult result) {
public void added(final ProcessRecordsInput result) {
size += getSize(result);
byteSize += getByteSize(result);
}
public void removed(final GetRecordsResult result) {
public void removed(final ProcessRecordsInput result) {
size -= getSize(result);
byteSize -= getByteSize(result);
}
private long getSize(final GetRecordsResult result) {
private long getSize(final ProcessRecordsInput result) {
return result.getRecords().size();
}
private long getByteSize(final GetRecordsResult result) {
private long getByteSize(final ProcessRecordsInput result) {
return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum();
}
}

View file

@ -14,10 +14,15 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.types;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.model.Record;
import lombok.Getter;
/**
* Container for the parameters to the IRecordProcessor's
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords(
@ -25,6 +30,10 @@ import com.amazonaws.services.kinesis.model.Record;
*/
public class ProcessRecordsInput {
@Getter
private Instant cacheEntryTime;
@Getter
private Instant cacheExitTime;
private List<Record> records;
private IRecordProcessorCheckpointer checkpointer;
private Long millisBehindLatest;
@ -96,4 +105,21 @@ public class ProcessRecordsInput {
this.millisBehindLatest = millisBehindLatest;
return this;
}
public ProcessRecordsInput withCacheEntryTime(Instant cacheEntryTime) {
this.cacheEntryTime = cacheEntryTime;
return this;
}
public ProcessRecordsInput withCacheExitTime(Instant cacheExitTime) {
this.cacheExitTime = cacheExitTime;
return this;
}
public Duration getTimeSpentInCache() {
if (cacheEntryTime == null || cacheExitTime == null) {
return Duration.ZERO;
}
return Duration.between(cacheEntryTime, cacheExitTime);
}
}

View file

@ -0,0 +1,69 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
import java.time.Duration;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
/**
* Test class for the BlockingGetRecordsCache class.
*/
@RunWith(MockitoJUnitRunner.class)
public class BlockingGetRecordsCacheTest {
private static final int MAX_RECORDS_PER_COUNT = 10_000;
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@Mock
private GetRecordsResult getRecordsResult;
@Mock
private List<Record> records;
private BlockingGetRecordsCache blockingGetRecordsCache;
@Before
public void setup() {
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult);
when(getRecordsResult.getRecords()).thenReturn(records);
blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy);
}
@Test
public void testGetNextRecords() {
ProcessRecordsInput result = blockingGetRecordsCache.getNextResult();
assertEquals(result.getRecords(), records);
assertNull(result.getCacheEntryTime());
assertNull(result.getCacheExitTime());
assertEquals(result.getTimeSpentInCache(), Duration.ZERO);
}
}

View file

@ -27,10 +27,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
@ -39,6 +35,10 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
/**
* Unit tests for KinesisDataFetcher.
@ -190,6 +190,23 @@ public class KinesisDataFetcherTest {
Assert.assertTrue("Shard should reach the end", dataFetcher.isShardEndReached());
}
@Test
public void testNonNullGetRecords() {
String nextIterator = "TestIterator";
int maxRecords = 100;
KinesisProxy mockProxy = mock(KinesisProxy.class);
doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO);
dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
GetRecordsResult getRecordsResult = dataFetcher.getRecords(maxRecords);
Assert.assertNotNull(getRecordsResult);
Assert.assertTrue(getRecordsResult.getRecords().isEmpty());
}
private void testInitializeAndFetch(String iteratorType,
String seqNo,
InitialPositionInStreamExtended initialPositionInStream) throws Exception {

View file

@ -0,0 +1,195 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
/**
* Test class for the PrefetchGetRecordsCache class.
*/
@RunWith(MockitoJUnitRunner.class)
public class PrefetchGetRecordsCacheTest {
private static final int SIZE_512_KB = 512 * 1024;
private static final int SIZE_1_MB = 2 * SIZE_512_KB;
private static final int MAX_RECORDS_PER_CALL = 10000;
private static final int MAX_SIZE = 5;
private static final int MAX_RECORDS_COUNT = 15000;
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@Mock
private GetRecordsResult getRecordsResult;
@Mock
private Record record;
private List<Record> records;
private ExecutorService executorService;
private LinkedBlockingQueue<ProcessRecordsInput> spyQueue;
private PrefetchGetRecordsCache getRecordsCache;
@Before
public void setup() {
executorService = spy(Executors.newFixedThreadPool(1));
getRecordsCache = new PrefetchGetRecordsCache(
MAX_SIZE,
3 * SIZE_1_MB,
MAX_RECORDS_COUNT,
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy,
executorService);
spyQueue = spy(getRecordsCache.getRecordsResultQueue);
records = spy(new ArrayList<>());
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResult);
when(getRecordsResult.getRecords()).thenReturn(records);
}
@Test
public void testGetRecords() {
when(records.size()).thenReturn(1000);
when(record.getData()).thenReturn(createByteBufferWithSize(SIZE_512_KB));
records.add(record);
records.add(record);
records.add(record);
records.add(record);
records.add(record);
ProcessRecordsInput result = getRecordsCache.getNextResult();
assertEquals(result.getRecords(), records);
verify(executorService).execute(any());
verify(getRecordsRetrievalStrategy, atLeast(1)).getRecords(eq(MAX_RECORDS_PER_CALL));
}
@Test
public void testFullCacheByteSize() {
when(records.size()).thenReturn(500);
when(record.getData()).thenReturn(createByteBufferWithSize(SIZE_1_MB));
records.add(record);
getRecordsCache.start();
// Sleep for a few seconds for the cache to fill up.
sleep(2000);
verify(getRecordsRetrievalStrategy, times(3)).getRecords(eq(MAX_RECORDS_PER_CALL));
assertEquals(spyQueue.size(), 3);
}
@Test
public void testFullCacheRecordsCount() {
int recordsSize = 4500;
when(records.size()).thenReturn(recordsSize);
getRecordsCache.start();
sleep(2000);
int callRate = (int) Math.ceil((double) MAX_RECORDS_COUNT/recordsSize);
verify(getRecordsRetrievalStrategy, times(callRate)).getRecords(MAX_RECORDS_PER_CALL);
assertEquals(spyQueue.size(), callRate);
assertTrue(callRate < MAX_SIZE);
}
@Test
public void testFullCacheSize() {
int recordsSize = 200;
when(records.size()).thenReturn(recordsSize);
getRecordsCache.start();
// Sleep for a few seconds for the cache to fill up.
sleep(2000);
verify(getRecordsRetrievalStrategy, times(MAX_SIZE + 1)).getRecords(eq(MAX_RECORDS_PER_CALL));
assertEquals(spyQueue.size(), MAX_SIZE);
}
@Test
public void testMultipleCacheCalls() {
int recordsSize = 20;
when(record.getData()).thenReturn(createByteBufferWithSize(1024));
IntStream.range(0, recordsSize).forEach(i -> records.add(record));
ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult();
verify(executorService).execute(any());
assertEquals(processRecordsInput.getRecords(), records);
assertNotNull(processRecordsInput.getCacheEntryTime());
assertNotNull(processRecordsInput.getCacheExitTime());
sleep(2000);
ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult();
assertNotEquals(processRecordsInput, processRecordsInput2);
assertEquals(processRecordsInput2.getRecords(), records);
assertNotEquals(processRecordsInput2.getTimeSpentInCache(), Duration.ZERO);
assertTrue(spyQueue.size() <= MAX_SIZE);
}
@After
public void shutdown() {
getRecordsCache.shutdown();
verify(executorService).shutdownNow();
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {}
}
private ByteBuffer createByteBufferWithSize(int size) {
ByteBuffer byteBuffer = ByteBuffer.allocate(size);
byteBuffer.put(new byte[size]);
return byteBuffer;
}
}