diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java index a914c876..5220c6ae 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -36,7 +36,9 @@ public class BlockingGetRecordsCache implements GetRecordsCache { @Override public void start() { - // Do nothing, this behavior is not supported by this cache. + // + // Nothing to do here + // } @Override @@ -50,6 +52,8 @@ public class BlockingGetRecordsCache implements GetRecordsCache { @Override public void shutdown() { + // // Nothing to do here. + // } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 625cd611..f5d82dab 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -116,15 +116,15 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } private class PrefetchCounters { - private volatile long size = 0; - private volatile long byteSize = 0; + private long size = 0; + private long byteSize = 0; - public void added(final ProcessRecordsInput result) { + public synchronized void added(final ProcessRecordsInput result) { size += getSize(result); byteSize += getByteSize(result); } - public void removed(final ProcessRecordsInput result) { + public synchronized void removed(final ProcessRecordsInput result) { size -= getSize(result); byteSize -= getByteSize(result); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java index 67afe09e..dbe9d843 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java @@ -21,6 +21,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import org.junit.Before; @@ -44,9 +45,8 @@ public class BlockingGetRecordsCacheTest { private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @Mock private GetRecordsResult getRecordsResult; - @Mock - private List records; - + + private List records = new ArrayList<>(); private BlockingGetRecordsCache blockingGetRecordsCache; @Before @@ -58,7 +58,7 @@ public class BlockingGetRecordsCacheTest { } @Test - public void testGetNextRecords() { + public void testGetNextRecordsWithNoRecords() { ProcessRecordsInput result = blockingGetRecordsCache.getNextResult(); assertEquals(result.getRecords(), records); @@ -66,4 +66,17 @@ public class BlockingGetRecordsCacheTest { assertNull(result.getCacheExitTime()); assertEquals(result.getTimeSpentInCache(), Duration.ZERO); } + + @Test + public void testGetNextRecordsWithRecords() { + Record record = new Record(); + records.add(record); + records.add(record); + records.add(record); + records.add(record); + + ProcessRecordsInput result = blockingGetRecordsCache.getNextResult(); + + assertEquals(result.getRecords(), records); + } }