Cleaner implementation of list all resources

This commit is contained in:
Meher Mankikar 2023-06-21 13:13:47 -07:00
parent 7816727b3a
commit 84d2f1bcd8
3 changed files with 19 additions and 44 deletions

View file

@ -55,27 +55,14 @@ public class LeaseTableManager extends AWSResourceManager {
} }
public List<String> _getAllResourceNames() throws Exception { public List<String> _getAllResourceNames() throws Exception {
List<String> tableNames = new ArrayList<>(); ListTablesRequest listTableRequest = ListTablesRequest.builder().build();
ListTablesRequest request = ListTablesRequest.builder().build(); List<String> allTableNames = new ArrayList<>();
ListTablesResponse response = null; ListTablesResponse result = null;
String startTableName = null; do {
result = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(listTableRequest), Duration.ofSeconds(60));
// Continue while paginated call is still returning table names allTableNames.addAll(result.tableNames());
while(response == null || response.lastEvaluatedTableName() != null) { listTableRequest = ListTablesRequest.builder().exclusiveStartTableName(result.lastEvaluatedTableName()).build();
if (startTableName != null) { } while (result.lastEvaluatedTableName() != null);
request = ListTablesRequest.builder().exclusiveStartTableName(startTableName).build(); return allTableNames;
} }
try {
response = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(request), Duration.ofSeconds(60));
} catch (ExecutionException | InterruptedException e) {
throw new Exception("Error listing all lease tables");
}
// Add all table names to list
tableNames.addAll(response.tableNames());
// Set startTableName for next call to be the last table name evaluated in current call
startTableName = response.lastEvaluatedTableName();
}
return tableNames;
}
} }

View file

@ -3,6 +3,7 @@ package software.amazon.kinesis.utils;
import lombok.Value; import lombok.Value;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
@ -63,27 +64,15 @@ public class StreamExistenceManager extends AWSResourceManager {
} }
public List<String> _getAllResourceNames() throws Exception { public List<String> _getAllResourceNames() throws Exception {
List<String> streamNames = new ArrayList<>(); ListStreamsRequest listStreamRequest = ListStreamsRequest.builder().build();
ListStreamsRequest request = ListStreamsRequest.builder().build(); List<String> allStreamNames = new ArrayList<>();
ListStreamsResponse response = null; ListStreamsResponse result = null;
String startStreamName = null; do {
result = FutureUtils.resolveOrCancelFuture(client.listStreams(listStreamRequest), Duration.ofSeconds(60));
// Continue while paginated call is still returning stream names allStreamNames.addAll(result.streamNames());
while(response == null || response.hasMoreStreams()) { listStreamRequest = ListStreamsRequest.builder().exclusiveStartStreamName(result.nextToken()).build();
if (startStreamName != null) { } while (result.hasMoreStreams());
request = ListStreamsRequest.builder().exclusiveStartStreamName(startStreamName).build(); return allStreamNames;
}
try {
response = FutureUtils.resolveOrCancelFuture(client.listStreams(request), Duration.ofSeconds(60));
} catch (Exception e) {
throw new Exception("Error listing all lease tables");
}
// Add all stream names to list
streamNames.addAll(response.streamNames());
// Set startTableName for next call to be the last table name evaluated in current call
startStreamName = response.nextToken();
}
return streamNames;
} }
public void checkStreamAndCreateIfNecessary(String streamName) { public void checkStreamAndCreateIfNecessary(String streamName) {

View file

@ -99,7 +99,6 @@ public class TestConsumer {
throw e; throw e;
} finally { } finally {
// Clean up resources created // Clean up resources created
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
deleteResources(streamExistenceManager, leaseTableManager); deleteResources(streamExistenceManager, leaseTableManager);
} }
} }