testing script for creating millisbehinlatest

This commit is contained in:
Qilin Jin 2021-12-05 13:29:38 -08:00
parent c85b1f42b7
commit e92f077817
4 changed files with 39 additions and 9 deletions

View file

@ -16,6 +16,7 @@ package software.amazon.kinesis.lifecycle;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.concurrent.TimeUnit;
import lombok.NonNull; import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

View file

@ -18,10 +18,7 @@ import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function; import java.util.function.Function;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
@ -331,7 +328,11 @@ public class ShardConsumer {
taskIsRunning = true; taskIsRunning = true;
TaskResult result; TaskResult result;
try { try {
TimeUnit.MILLISECONDS.sleep(20);
result = task.call(); result = task.call();
} catch (InterruptedException e) {
e.printStackTrace();
result = null;
} finally { } finally {
taskIsRunning = false; taskIsRunning = false;
} }

View file

@ -19,6 +19,7 @@ import com.google.common.collect.Iterables;
import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.Data; import lombok.Data;
@ -291,8 +292,10 @@ public class KinesisDataFetcher implements DataFetcher {
@Override @Override
public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException { public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException {
TimeUnit.SECONDS.sleep(20);
final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request),
maxFutureWait); maxFutureWait);
System.out.println("millisbehindlatest is " + response.millisBehindLatest());
if (!isValidResult(response.nextShardIterator(), response.childShards())) { if (!isValidResult(response.nextShardIterator(), response.childShards())) {
throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId
+ ". nextShardIterator: " + response.nextShardIterator() + ". nextShardIterator: " + response.nextShardIterator()
@ -304,7 +307,7 @@ public class KinesisDataFetcher implements DataFetcher {
@Override @Override
public GetRecordsRequest getGetRecordsRequest(String nextIterator) { public GetRecordsRequest getGetRecordsRequest(String nextIterator) {
return KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator) return KinesisRequestsBuilder.getRecordsRequestBuilder().shardIterator(nextIterator)
.limit(maxRecords).build(); .limit(10).build();
} }
@Override @Override

View file

@ -82,7 +82,7 @@ public class ApplicationTest {
* Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
*/ */
ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 1, 1, TimeUnit.SECONDS);
/** /**
* Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
@ -173,17 +173,42 @@ public class ApplicationTest {
private void publishRecord() { private void publishRecord() {
PutRecordRequest request1 = PutRecordRequest.builder() PutRecordRequest request1 = PutRecordRequest.builder()
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
.streamName(streamName1) .streamName(streamName2)
.data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
.build(); .build();
PutRecordRequest request2 = PutRecordRequest.builder() PutRecordRequest request2 = PutRecordRequest.builder()
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
.streamName(streamName2) .streamName(streamName1)
.data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1)))
.build();
PutRecordRequest request3 = PutRecordRequest.builder()
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
.streamName(streamName1)
.data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1)))
.build();
PutRecordRequest request4 = PutRecordRequest.builder()
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
.streamName(streamName1)
.data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1)))
.build();
PutRecordRequest request5 = PutRecordRequest.builder()
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
.streamName(streamName1)
.data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1)))
.build();
PutRecordRequest request6 = PutRecordRequest.builder()
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
.streamName(streamName1)
.data(SdkBytes.fromByteArray(RandomUtils.nextBytes(1)))
.build(); .build();
try { try {
kinesisClient.putRecord(request1).get(); kinesisClient.putRecord(request1).get();
kinesisClient.putRecord(request2).get(); kinesisClient.putRecord(request2).get();
kinesisClient.putRecord(request3).get();
kinesisClient.putRecord(request4).get();
kinesisClient.putRecord(request5).get();
kinesisClient.putRecord(request6).get();
System.out.println("600 records published yeah");
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.info("Interrupted, assuming shutdown."); log.info("Interrupted, assuming shutdown.");
} catch (ExecutionException e) { } catch (ExecutionException e) {