diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index 4ad2c508..073c1d19 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; +import com.amazonaws.services.kinesis.clientlibrary.utils.RequestUtil; import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.util.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -60,7 +61,6 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.model.StreamStatus; import lombok.AccessLevel; -import lombok.Data; import lombok.Getter; import lombok.Setter; @@ -442,7 +442,7 @@ public class KinesisProxy implements IKinesisProxyExtended { @Override public synchronized List getShardListWithFilter(ShardFilter shardFilter) { final List shards = new ArrayList<>(); - + final List requestIds = new ArrayList<>(); if (isKinesisClient) { ListShardsResult result; String nextToken = null; @@ -458,6 +458,7 @@ public class KinesisProxy implements IKinesisProxyExtended { return null; } else { shards.addAll(result.getShards()); + requestIds.add(RequestUtil.requestId(result)); nextToken = result.getNextToken(); } } while (StringUtils.isNotEmpty(result.getNextToken())); @@ -478,6 +479,7 @@ public class KinesisProxy implements IKinesisProxyExtended { } else { final List pageOfShards = response.getStreamDescription().getShards(); shards.addAll(pageOfShards); + requestIds.add(RequestUtil.requestId(response)); final Shard lastShard = pageOfShards.get(pageOfShards.size() - 1); if (lastShardId == null || lastShardId.compareTo(lastShard.getShardId()) < 0) { @@ -487,6 +489,9 @@ public class KinesisProxy implements IKinesisProxyExtended { } while (response.getStreamDescription().isHasMoreShards()); } final List dedupedShards = new ArrayList<>(new LinkedHashSet<>(shards)); + if (dedupedShards.size() < shards.size()) { + LOG.warn("Found duplicate child shards in ListShards response. Request ids - " + requestIds); + } this.cachedShardMap = dedupedShards.stream().collect(Collectors.toMap(Shard::getShardId, Function.identity())); this.lastCacheUpdateTime = Instant.now(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/utils/RequestUtil.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/utils/RequestUtil.java new file mode 100644 index 00000000..cac65d45 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/utils/RequestUtil.java @@ -0,0 +1,24 @@ +package com.amazonaws.services.kinesis.clientlibrary.utils; + +import com.amazonaws.AmazonWebServiceResult; + +/** + * Helper class to parse metadata from AWS requests. + */ +public class RequestUtil { + private static final String DEFAULT_REQUEST_ID = "NONE"; + + /** + * Get the requestId associated with a request. + * + * @param result + * @return the requestId for a request, or "NONE" if one is not available. + */ + public static String requestId(AmazonWebServiceResult result) { + if (result == null || result.getSdkResponseMetadata() == null || result.getSdkResponseMetadata().getRequestId() == null) { + return DEFAULT_REQUEST_ID; + } + + return result.getSdkResponseMetadata().getRequestId(); + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java index 553d451a..76671176 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxyTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doReturn; @@ -92,6 +93,7 @@ public class KinesisProxyTest { private static final String SHARD_4 = "shard-4"; private static final String NOT_CACHED_SHARD = "ShardId-0005"; private static final String NEVER_PRESENT_SHARD = "ShardId-0010"; + private static final String REQUEST_ID = "requestId"; @Mock private AmazonKinesis mockClient; @@ -435,26 +437,41 @@ public class KinesisProxyTest { verify(mockClient).listShards(any()); } + /** + * Tests that if we fail halfway through a listShards call, we fail gracefully and subsequent calls are not + * affected by the failure of the first request. + */ @Test public void testNoDuplicateShardsInPartialFailure() { proxy.setCachedShardMap(null); + ListShardsResult firstPage = new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN); ListShardsResult lastPage = new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null); - when(mockClient.listShards(any())).thenReturn(firstPage).thenThrow(new RuntimeException("Failed!")) // First call - .thenReturn(firstPage).thenReturn(lastPage); // second call + + when(mockClient.listShards(any())) + .thenReturn(firstPage).thenThrow(new RuntimeException("Failed!")) + .thenReturn(firstPage).thenReturn(lastPage); try { proxy.getShardList(); + fail("First ListShards call should have failed!"); } catch (Exception e) { // Do nothing } assertEquals(shards, proxy.getShardList()); } + /** + * Tests that if we receive any duplicate shard responses from the service during a shard sync, we dedup the response + * and continue gracefully. + */ @Test public void testDuplicateShardResponseDedupedGracefully() { proxy.setCachedShardMap(null); - ListShardsResult pageOfShards = new ListShardsResult().withShards(shards).withNextToken(null); + List duplicateShards = new ArrayList<>(shards); + duplicateShards.addAll(shards); + ListShardsResult pageOfShards = new ListShardsResult().withShards(duplicateShards).withNextToken(null); + when(mockClient.listShards(any())).thenReturn(pageOfShards); proxy.getShardList();