diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index b1f6121c..ee4f42c3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.lifecycle; import java.util.List; import java.util.ListIterator; +import java.util.concurrent.TimeUnit; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index b6e7c068..bf28f818 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -18,10 +18,7 @@ import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.*; import java.util.function.Function; import org.reactivestreams.Subscription; @@ -331,7 +328,11 @@ public class ShardConsumer { taskIsRunning = true; TaskResult result; try { + TimeUnit.MILLISECONDS.sleep(20); result = task.call(); + } catch (InterruptedException e) { + e.printStackTrace(); + result = null; } finally { taskIsRunning = false; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 223ab367..a9efb480 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -19,6 +19,7 @@ import com.google.common.collect.Iterables; import java.time.Duration; import java.util.Collections; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import lombok.AccessLevel; import lombok.Data; @@ -291,8 +292,10 @@ public class KinesisDataFetcher implements DataFetcher { @Override public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException { + TimeUnit.SECONDS.sleep(20); final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), maxFutureWait); + System.out.println("millisbehindlatest is " + response.millisBehindLatest()); if (!isValidResult(response.nextShardIterator(), response.childShards())) { throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId + ". nextShardIterator: " + response.nextShardIterator() @@ -304,7 +307,7 @@ public class KinesisDataFetcher implements DataFetcher { @Override public GetRecordsRequest getGetRecordsRequest(String nextIterator) { return KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator) - .limit(maxRecords).build(); + .limit(10).build(); } @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java index e3616ff0..f2ddabf9 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/ApplicationTest.java @@ -82,7 +82,7 @@ public class ApplicationTest { * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL */ ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor(); - ScheduledFuture producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); + ScheduledFuture producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 1, 1, TimeUnit.SECONDS); /** * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a @@ -173,17 +173,42 @@ public class ApplicationTest { private void publishRecord() { PutRecordRequest request1 = PutRecordRequest.builder() .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) - .streamName(streamName1) + .streamName(streamName2) .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) .build(); PutRecordRequest request2 = PutRecordRequest.builder() .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) - .streamName(streamName2) - .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) + .streamName(streamName1) + .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1))) + .build(); + PutRecordRequest request3 = PutRecordRequest.builder() + .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) + .streamName(streamName1) + .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1))) + .build(); + PutRecordRequest request4 = PutRecordRequest.builder() + .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) + .streamName(streamName1) + .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1))) + .build(); + PutRecordRequest request5 = PutRecordRequest.builder() + .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) + .streamName(streamName1) + .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1))) + .build(); + PutRecordRequest request6 = PutRecordRequest.builder() + .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) + .streamName(streamName1) + .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1))) .build(); try { kinesisClient.putRecord(request1).get(); kinesisClient.putRecord(request2).get(); + kinesisClient.putRecord(request3).get(); + kinesisClient.putRecord(request4).get(); + kinesisClient.putRecord(request5).get(); + kinesisClient.putRecord(request6).get(); + System.out.println("600 records published yeah"); } catch (InterruptedException e) { log.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) {