Adding licenses to files, addressing comments. Throwing IllegalStateException if threadpool is not started. Exposing the start method in the GetRecordsCache interface. Adding new test case for the new behavior.

This commit is contained in:
Sahil Palvia 2017-09-21 11:27:49 -07:00
parent ed1f9d4dca
commit 3404ddfcf4
5 changed files with 77 additions and 8 deletions

View file

@ -1,3 +1,18 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
@ -19,6 +34,11 @@ public class BlockingGetRecordsCache implements GetRecordsCache {
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
}
@Override
public void start() {
// Do nothing, this behavior is not supported by this cache.
}
@Override
public ProcessRecordsInput getNextResult() {
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);

View file

@ -1,3 +1,18 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
@ -6,6 +21,11 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
* This class is used as a cache for Prefetching data from Kinesis.
*/
public interface GetRecordsCache {
/**
* This method calls the start behavior on the cache, if available.
*/
void start();
/**
* This method returns the next set of records from the Cache if present, or blocks the request till it gets the
* next set of records back from Kinesis.
@ -13,6 +33,9 @@ public interface GetRecordsCache {
* @return The next set of records.
*/
ProcessRecordsInput getNextResult();
/**
* This method calls the shutdown behavior on the cache, if available.
*/
void shutdown();
}

View file

@ -1,3 +1,18 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.time.Instant;
@ -46,7 +61,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
this.executorService = executorService;
}
void start() {
@Override
public void start() {
if (!started) {
log.info("Starting prefetching thread.");
executorService.execute(new DefaultGetRecordsCacheDaemon());
@ -57,12 +73,11 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
@Override
public ProcessRecordsInput getNextResult() {
if (!started) {
start();
throw new IllegalStateException("Threadpool in the cache was not started, make sure to call start on the cache");
}
ProcessRecordsInput result = null;
try {
result = getRecordsResultQueue.take();
result.withCacheExitTime(Instant.now());
result = getRecordsResultQueue.take().withCacheExitTime(Instant.now());
prefetchCounters.removed(result);
} catch (InterruptedException e) {
log.error("Interrupted while getting records from the cache", e);
@ -83,7 +98,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
log.warn("Prefetch thread was interrupted.");
break;
}
if (prefetchCounters.byteSize < maxByteSize && prefetchCounters.size < maxRecordsCount) {
if (prefetchCounters.shouldGetNewRecords()) {
try {
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
@ -93,7 +108,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
getRecordsResultQueue.put(processRecordsInput);
prefetchCounters.added(processRecordsInput);
} catch (InterruptedException e) {
log.error("Interrupted while adding records to the cache", e);
log.info("Thread was interrupted, indicating shutdown was called on the cache", e);
}
}
}
@ -121,6 +136,10 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private long getByteSize(final ProcessRecordsInput result) {
return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum();
}
public boolean shouldGetNewRecords() {
return size < maxRecordsCount && byteSize < maxByteSize;
}
}
}

View file

@ -29,7 +29,6 @@ import lombok.Getter;
* ProcessRecordsInput processRecordsInput) processRecords} method.
*/
public class ProcessRecordsInput {
@Getter
private Instant cacheEntryTime;
@Getter

View file

@ -98,6 +98,7 @@ public class PrefetchGetRecordsCacheTest {
records.add(record);
records.add(record);
getRecordsCache.start();
ProcessRecordsInput result = getRecordsCache.getNextResult();
assertEquals(result.getRecords(), records);
@ -158,6 +159,7 @@ public class PrefetchGetRecordsCacheTest {
IntStream.range(0, recordsSize).forEach(i -> records.add(record));
getRecordsCache.start();
ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult();
verify(executorService).execute(any());
@ -174,6 +176,12 @@ public class PrefetchGetRecordsCacheTest {
assertTrue(spyQueue.size() <= MAX_SIZE);
}
@Test(expected = IllegalStateException.class)
public void testGetNextRecordsWithoutStarting() {
verify(executorService, times(0)).execute(any());
getRecordsCache.getNextResult();
}
@After
public void shutdown() {