diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index c24ec618..85632500 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -53,7 +53,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber"; private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState"; private static final String PARENT_SHARD_ID_KEY = "parentShardId"; - private static final String CHILD_SHARD_ID_KEY = "childShardIds"; + private static final String CHILD_SHARD_IDS_KEY = "childShardIds"; @Override public Map toDynamoRecord(final Lease lease) { @@ -73,7 +73,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { result.put(PARENT_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.parentShardIds())); } if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) { - result.put(CHILD_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.childShardIds())); + result.put(CHILD_SHARD_IDS_KEY, DynamoUtils.createAttributeValue(lease.childShardIds())); } if (lease.pendingCheckpoint() != null && !lease.pendingCheckpoint().sequenceNumber().isEmpty()) { @@ -107,7 +107,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { DynamoUtils.safeGetLong(dynamoRecord, CHECKPOINT_SUBSEQUENCE_NUMBER_KEY)) ); leaseToUpdate.parentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY)); - leaseToUpdate.childShardIds(DynamoUtils.safeGetSS(dynamoRecord, CHILD_SHARD_ID_KEY)); + leaseToUpdate.childShardIds(DynamoUtils.safeGetSS(dynamoRecord, CHILD_SHARD_IDS_KEY)); if (!Strings.isNullOrEmpty(DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY))) { leaseToUpdate.pendingCheckpoint( @@ -243,7 +243,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) { - result.put(CHILD_SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds()))); + result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds()))); } return result; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 9fb822a5..bf07f6e2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Set; import java.util.UUID; import java.util.function.Function; +import java.util.stream.Collectors; /** * Task for invoking the ShardRecordProcessor shutdown() callback. @@ -195,14 +196,11 @@ public class ShutdownTask implements ConsumerTask { private void updateLeasesForChildShards() throws DependencyException, InvalidStateException, ProvisionedThroughputException { final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfoIdProvider.apply(shardInfo)); - Set childShardIds = new HashSet<>(); - for (ChildShard childShard : childShards) { - childShardIds.add(childShard.shardId()); - } + Set childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet()); - final Lease upatedLease = currentLease.copy(); - upatedLease.childShardIds(childShardIds); - leaseCoordinator.updateLease(upatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, shardInfoIdProvider.apply(shardInfo)); + final Lease updatedLease = currentLease.copy(); + updatedLease.childShardIds(childShardIds); + leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, shardInfoIdProvider.apply(shardInfo)); } /* diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 9f1737fe..c94a3266 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -123,7 +124,8 @@ public class ShutdownTaskTest { @Test public final void testCallWhenApplicationDoesNotCheckpoint() { when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); - when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(createLease()); + Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId")); + when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); final TaskResult result = task.call(); @@ -164,7 +166,8 @@ public class ShutdownTaskTest { hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards()); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); - when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(createLease()); + Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId")); + when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); final TaskResult result = task.call(); @@ -253,11 +256,11 @@ public class ShutdownTaskTest { return childShards; } - private Lease createLease() { + private Lease createLease(String leaseKey, String leaseOwner, Collection parentShardIds) { Lease lease = new Lease(); - lease.leaseKey("shardId-0"); - lease.leaseOwner("leaseOwner"); - lease.parentShardIds(Collections.singleton("parentShardIds")); + lease.leaseKey(leaseKey); + lease.leaseOwner(leaseOwner); + lease.parentShardIds(parentShardIds); return lease; }