Bugfix: moved instantation of Future inside Supplier.
This commit is contained in:
parent
4c4708ac09
commit
8beb105078
2 changed files with 9 additions and 13 deletions
|
|
@ -1,8 +1,6 @@
|
||||||
package software.amazon.kinesis.retrieval;
|
package software.amazon.kinesis.retrieval;
|
||||||
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
|
|
@ -48,9 +46,8 @@ public final class KinesisClientFacade {
|
||||||
public static DescribeStreamSummaryResponse describeStreamSummary(final String streamArn) {
|
public static DescribeStreamSummaryResponse describeStreamSummary(final String streamArn) {
|
||||||
final DescribeStreamSummaryRequest request = KinesisRequestsBuilder
|
final DescribeStreamSummaryRequest request = KinesisRequestsBuilder
|
||||||
.describeStreamSummaryRequestBuilder().streamARN(streamArn).build();
|
.describeStreamSummaryRequestBuilder().streamARN(streamArn).build();
|
||||||
final CompletableFuture<DescribeStreamSummaryResponse> future = kinesisClient.describeStreamSummary(request);
|
|
||||||
final ServiceCallerSupplier<DescribeStreamSummaryResponse> dss =
|
final ServiceCallerSupplier<DescribeStreamSummaryResponse> dss =
|
||||||
() -> future.get(10, TimeUnit.SECONDS);
|
() -> kinesisClient.describeStreamSummary(request).get();
|
||||||
return retryWhenThrottled(dss, 3, streamArn, "DescribeStreamSummary");
|
return retryWhenThrottled(dss, 3, streamArn, "DescribeStreamSummary");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,9 @@
|
||||||
package software.amazon.kinesis.retrieval;
|
package software.amazon.kinesis.retrieval;
|
||||||
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.mockito.Matchers.anyInt;
|
|
||||||
import static org.mockito.Mockito.any;
|
import static org.mockito.Mockito.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
|
|
@ -21,6 +19,7 @@ import org.mockito.runners.MockitoJUnitRunner;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
|
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
|
||||||
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
|
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class KinesisClientFacadeTest {
|
public class KinesisClientFacadeTest {
|
||||||
|
|
@ -49,19 +48,19 @@ public class KinesisClientFacadeTest {
|
||||||
public void testDescribeStreamSummaryRetries() throws Exception {
|
public void testDescribeStreamSummaryRetries() throws Exception {
|
||||||
final DescribeStreamSummaryResponse expectedResponse = DescribeStreamSummaryResponse.builder().build();
|
final DescribeStreamSummaryResponse expectedResponse = DescribeStreamSummaryResponse.builder().build();
|
||||||
final CompletableFuture<DescribeStreamSummaryResponse> mockFuture = mock(CompletableFuture.class);
|
final CompletableFuture<DescribeStreamSummaryResponse> mockFuture = mock(CompletableFuture.class);
|
||||||
final TimeoutException timeoutException = new TimeoutException();
|
final ExecutionException executionException = new ExecutionException(LimitExceededException.builder().build());
|
||||||
|
|
||||||
when(mockKinesisClient.describeStreamSummary(any(DescribeStreamSummaryRequest.class)))
|
when(mockKinesisClient.describeStreamSummary(any(DescribeStreamSummaryRequest.class)))
|
||||||
.thenReturn(mockFuture);
|
.thenReturn(mockFuture);
|
||||||
when(mockFuture.get(anyInt(), any(TimeUnit.class)))
|
when(mockFuture.get())
|
||||||
.thenThrow(timeoutException)
|
.thenThrow(executionException)
|
||||||
.thenThrow(timeoutException)
|
.thenThrow(executionException)
|
||||||
.thenReturn(expectedResponse);
|
.thenReturn(expectedResponse);
|
||||||
|
|
||||||
final DescribeStreamSummaryResponse actualResponse = describeStreamSummary("retry me plz");
|
final DescribeStreamSummaryResponse actualResponse = describeStreamSummary("retry me plz");
|
||||||
assertEquals(expectedResponse, actualResponse);
|
assertEquals(expectedResponse, actualResponse);
|
||||||
|
|
||||||
verify(mockKinesisClient).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
|
verify(mockKinesisClient, times(3)).describeStreamSummary(any(DescribeStreamSummaryRequest.class));
|
||||||
verify(mockFuture, times(3)).get(anyInt(), any(TimeUnit.class));
|
verify(mockFuture, times(3)).get();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Loading…
Reference in a new issue