From 3404ddfcf4cbc54e48114369934214c4d38d3dc4 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Thu, 21 Sep 2017 11:27:49 -0700 Subject: [PATCH] Adding licenses to files, addressing comments. Throwing IllegalStateException if threadpool is not started. Exposing the start method in the GetRecordsCache interface. Adding new test case for the new behavior. --- .../lib/worker/BlockingGetRecordsCache.java | 20 ++++++++++++ .../lib/worker/GetRecordsCache.java | 25 ++++++++++++++- .../lib/worker/PrefetchGetRecordsCache.java | 31 +++++++++++++++---- .../types/ProcessRecordsInput.java | 1 - .../worker/PrefetchGetRecordsCacheTest.java | 8 +++++ 5 files changed, 77 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java index ef772be0..a914c876 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -1,3 +1,18 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; @@ -19,6 +34,11 @@ public class BlockingGetRecordsCache implements GetRecordsCache { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; } + @Override + public void start() { + // Do nothing, this behavior is not supported by this cache. + } + @Override public ProcessRecordsInput getNextResult() { GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java index ead4723e..d08ec285 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -1,3 +1,18 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; @@ -6,6 +21,11 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; * This class is used as a cache for Prefetching data from Kinesis. */ public interface GetRecordsCache { + /** + * This method calls the start behavior on the cache, if available. + */ + void start(); + /** * This method returns the next set of records from the Cache if present, or blocks the request till it gets the * next set of records back from Kinesis. @@ -13,6 +33,9 @@ public interface GetRecordsCache { * @return The next set of records. */ ProcessRecordsInput getNextResult(); - + + /** + * This method calls the shutdown behavior on the cache, if available. + */ void shutdown(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index e0ceb6de..625cd611 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -1,3 +1,18 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.time.Instant; @@ -46,7 +61,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { this.executorService = executorService; } - void start() { + @Override + public void start() { if (!started) { log.info("Starting prefetching thread."); executorService.execute(new DefaultGetRecordsCacheDaemon()); @@ -57,12 +73,11 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public ProcessRecordsInput getNextResult() { if (!started) { - start(); + throw new IllegalStateException("Threadpool in the cache was not started, make sure to call start on the cache"); } ProcessRecordsInput result = null; try { - result = getRecordsResultQueue.take(); - result.withCacheExitTime(Instant.now()); + result = getRecordsResultQueue.take().withCacheExitTime(Instant.now()); prefetchCounters.removed(result); } catch (InterruptedException e) { log.error("Interrupted while getting records from the cache", e); @@ -83,7 +98,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { log.warn("Prefetch thread was interrupted."); break; } - if (prefetchCounters.byteSize < maxByteSize && prefetchCounters.size < maxRecordsCount) { + if (prefetchCounters.shouldGetNewRecords()) { try { GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() @@ -93,7 +108,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { getRecordsResultQueue.put(processRecordsInput); prefetchCounters.added(processRecordsInput); } catch (InterruptedException e) { - log.error("Interrupted while adding records to the cache", e); + log.info("Thread was interrupted, indicating shutdown was called on the cache", e); } } } @@ -121,6 +136,10 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private long getByteSize(final ProcessRecordsInput result) { return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum(); } + + public boolean shouldGetNewRecords() { + return size < maxRecordsCount && byteSize < maxByteSize; + } } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java index 4e89e3f4..362af357 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java @@ -29,7 +29,6 @@ import lombok.Getter; * ProcessRecordsInput processRecordsInput) processRecords} method. */ public class ProcessRecordsInput { - @Getter private Instant cacheEntryTime; @Getter diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java index 28458fb4..eeb8ff1d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java @@ -98,6 +98,7 @@ public class PrefetchGetRecordsCacheTest { records.add(record); records.add(record); + getRecordsCache.start(); ProcessRecordsInput result = getRecordsCache.getNextResult(); assertEquals(result.getRecords(), records); @@ -158,6 +159,7 @@ public class PrefetchGetRecordsCacheTest { IntStream.range(0, recordsSize).forEach(i -> records.add(record)); + getRecordsCache.start(); ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult(); verify(executorService).execute(any()); @@ -174,6 +176,12 @@ public class PrefetchGetRecordsCacheTest { assertTrue(spyQueue.size() <= MAX_SIZE); } + + @Test(expected = IllegalStateException.class) + public void testGetNextRecordsWithoutStarting() { + verify(executorService, times(0)).execute(any()); + getRecordsCache.getNextResult(); + } @After public void shutdown() {