From 84d2f1bcd867d156de3e9a4e7dab50ae2324feec Mon Sep 17 00:00:00 2001 From: Meher Mankikar Date: Wed, 21 Jun 2023 13:13:47 -0700 Subject: [PATCH] Cleaner implementation of list all resources --- .../kinesis/utils/LeaseTableManager.java | 31 ++++++------------- .../kinesis/utils/StreamExistenceManager.java | 31 ++++++------------- .../amazon/kinesis/utils/TestConsumer.java | 1 - 3 files changed, 19 insertions(+), 44 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java index d7dd31a1..69bf619b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/LeaseTableManager.java @@ -55,27 +55,14 @@ public class LeaseTableManager extends AWSResourceManager { } public List _getAllResourceNames() throws Exception { - List tableNames = new ArrayList<>(); - ListTablesRequest request = ListTablesRequest.builder().build(); - ListTablesResponse response = null; - String startTableName = null; - - // Continue while paginated call is still returning table names - while(response == null || response.lastEvaluatedTableName() != null) { - if (startTableName != null) { - request = ListTablesRequest.builder().exclusiveStartTableName(startTableName).build(); - } - try { - response = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(request), Duration.ofSeconds(60)); - } catch (ExecutionException | InterruptedException e) { - throw new Exception("Error listing all lease tables"); - } - // Add all table names to list - tableNames.addAll(response.tableNames()); - // Set startTableName for next call to be the last table name evaluated in current call - startTableName = response.lastEvaluatedTableName(); - } - return tableNames; + ListTablesRequest listTableRequest = ListTablesRequest.builder().build(); + List allTableNames = new ArrayList<>(); + ListTablesResponse result = null; + do { + result = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(listTableRequest), Duration.ofSeconds(60)); + allTableNames.addAll(result.tableNames()); + listTableRequest = ListTablesRequest.builder().exclusiveStartTableName(result.lastEvaluatedTableName()).build(); + } while (result.lastEvaluatedTableName() != null); + return allTableNames; } - } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java index 321a36b5..bf5b634c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java @@ -3,6 +3,7 @@ package software.amazon.kinesis.utils; import lombok.Value; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; +import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; @@ -63,27 +64,15 @@ public class StreamExistenceManager extends AWSResourceManager { } public List _getAllResourceNames() throws Exception { - List streamNames = new ArrayList<>(); - ListStreamsRequest request = ListStreamsRequest.builder().build(); - ListStreamsResponse response = null; - String startStreamName = null; - - // Continue while paginated call is still returning stream names - while(response == null || response.hasMoreStreams()) { - if (startStreamName != null) { - request = ListStreamsRequest.builder().exclusiveStartStreamName(startStreamName).build(); - } - try { - response = FutureUtils.resolveOrCancelFuture(client.listStreams(request), Duration.ofSeconds(60)); - } catch (Exception e) { - throw new Exception("Error listing all lease tables"); - } - // Add all stream names to list - streamNames.addAll(response.streamNames()); - // Set startTableName for next call to be the last table name evaluated in current call - startStreamName = response.nextToken(); - } - return streamNames; + ListStreamsRequest listStreamRequest = ListStreamsRequest.builder().build(); + List allStreamNames = new ArrayList<>(); + ListStreamsResponse result = null; + do { + result = FutureUtils.resolveOrCancelFuture(client.listStreams(listStreamRequest), Duration.ofSeconds(60)); + allStreamNames.addAll(result.streamNames()); + listStreamRequest = ListStreamsRequest.builder().exclusiveStartStreamName(result.nextToken()).build(); + } while (result.hasMoreStreams()); + return allStreamNames; } public void checkStreamAndCreateIfNecessary(String streamName) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java index 394c2838..223ca99a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java @@ -99,7 +99,6 @@ public class TestConsumer { throw e; } finally { // Clean up resources created - Thread.sleep(TimeUnit.SECONDS.toMillis(30)); deleteResources(streamExistenceManager, leaseTableManager); } }