Adding serializer for application state to lease info

This commit is contained in:
Joshua Kim 2020-04-07 06:15:18 -04:00
parent ae005ce0f8
commit b335246a30
2 changed files with 33 additions and 0 deletions

View file

@ -14,6 +14,7 @@
*/
package software.amazon.kinesis.leases;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@ -36,6 +37,14 @@ public class DynamoUtils {
return AttributeValue.builder().ss(collectionValue).build();
}
public static AttributeValue createAttributeValue(byte[] byteBufferValue) {
if (byteBufferValue == null) {
throw new IllegalArgumentException("Byte buffer attributeValues cannot be null or empty.");
}
return AttributeValue.builder().b(SdkBytes.fromByteArray(byteBufferValue)).build();
}
public static AttributeValue createAttributeValue(String stringValue) {
if (stringValue == null || stringValue.isEmpty()) {
throw new IllegalArgumentException("String attributeValues cannot be null or empty.");
@ -52,6 +61,15 @@ public class DynamoUtils {
return AttributeValue.builder().n(longValue.toString()).build();
}
public static byte[] safeGetByteArray(Map<String, AttributeValue> dynamoRecord, String key) {
AttributeValue av = dynamoRecord.get(key);
if (av == null) {
return null;
} else {
return av.b().asByteArray();
}
}
public static Long safeGetLong(Map<String, AttributeValue> dynamoRecord, String key) {
AttributeValue av = dynamoRecord.get(key);
if (av == null) {

View file

@ -50,6 +50,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
private static final String CHECKPOINT_SUBSEQUENCE_NUMBER_KEY = "checkpointSubSequenceNumber";
private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint";
private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber";
private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState";
private static final String PARENT_SHARD_ID_KEY = "parentShardId";
@Override
@ -75,6 +76,10 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpoint().subSequenceNumber()));
}
if (lease.pendingCheckpointState() != null) {
result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber()));
}
return result;
}
@ -105,6 +110,9 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
DynamoUtils.safeGetLong(dynamoRecord, PENDING_CHECKPOINT_SUBSEQUENCE_KEY))
);
}
leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY));
return leaseToUpdate;
}
@ -220,6 +228,13 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
}
if (lease.pendingCheckpointState() != null) {
result.put(PENDING_CHECKPOINT_STATE_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.pendingCheckpointState())));
} else {
result.put(PENDING_CHECKPOINT_STATE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
}
return result;
}