first go of refactoring the deaggregate method

This commit is contained in:
laurenmoos 2016-09-23 15:05:15 -07:00
parent 51663f96c7
commit 2196a053fb

View file

@ -14,6 +14,7 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.types;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
@ -206,7 +207,6 @@ public class UserRecord extends Record {
for (Record r : records) {
boolean isAggregated = true;
long subSeqNum = 0;
ByteBuffer bb = r.getData();
if (bb.remaining() >= magic.length) {
@ -215,7 +215,8 @@ public class UserRecord extends Record {
isAggregated = false;
}
if (!Arrays.equals(AGGREGATED_RECORD_MAGIC, magic) || bb.remaining() <= DIGEST_SIZE) {
if (!Arrays.equals(AGGREGATED_RECORD_MAGIC, magic)
|| bb.remaining() <= DIGEST_SIZE) {
isAggregated = false;
}
@ -232,68 +233,12 @@ public class UserRecord extends Record {
isAggregated = false;
} else {
try {
Messages.AggregatedRecord ar = Messages.AggregatedRecord.parseFrom(messageData);
List<String> pks = ar.getPartitionKeyTableList();
List<String> ehks = ar.getExplicitHashKeyTableList();
long aat = r.getApproximateArrivalTimestamp() == null
? -1 : r.getApproximateArrivalTimestamp().getTime();
try {
int recordsInCurrRecord = 0;
for (Messages.Record mr : ar.getRecordsList()) {
String explicitHashKey = null;
String partitionKey = pks.get((int) mr.getPartitionKeyIndex());
if (mr.hasExplicitHashKeyIndex()) {
explicitHashKey = ehks.get((int) mr.getExplicitHashKeyIndex());
}
BigInteger effectiveHashKey = explicitHashKey != null
? new BigInteger(explicitHashKey)
: new BigInteger(1, md5(partitionKey.getBytes("UTF-8")));
if (effectiveHashKey.compareTo(startingHashKey) < 0
|| effectiveHashKey.compareTo(endingHashKey) > 0) {
for (int toRemove = 0; toRemove < recordsInCurrRecord; ++toRemove) {
result.remove(result.size() - 1);
}
break;
}
++recordsInCurrRecord;
Record record = new Record()
.withData(ByteBuffer.wrap(mr.getData().toByteArray()))
.withPartitionKey(partitionKey)
.withSequenceNumber(r.getSequenceNumber())
.withApproximateArrivalTimestamp(aat < 0 ? null : new Date(aat));
result.add(new UserRecord(true, record, subSeqNum++, explicitHashKey));
}
} catch (Exception e) {
StringBuilder sb = new StringBuilder();
sb.append("Unexpected exception during deaggregation, record was:\n");
sb.append("PKS:\n");
for (String s : pks) {
sb.append(s).append("\n");
}
sb.append("EHKS: \n");
for (String s : ehks) {
sb.append(s).append("\n");
}
for (Messages.Record mr : ar.getRecordsList()) {
sb.append("Record: [hasEhk=").append(mr.hasExplicitHashKeyIndex()).append(", ")
.append("ehkIdx=").append(mr.getExplicitHashKeyIndex()).append(", ")
.append("pkIdx=").append(mr.getPartitionKeyIndex()).append(", ")
.append("dataLen=").append(mr.getData().toByteArray().length).append("]\n");
}
sb.append("Sequence number: ").append(r.getSequenceNumber()).append("\n")
.append("Raw data: ")
.append(javax.xml.bind.DatatypeConverter.printBase64Binary(messageData)).append("\n");
LOG.error(sb.toString(), e);
}
deaggregateRecords(messageData, r, startingHashKey, endingHashKey);
} catch (InvalidProtocolBufferException e) {
isAggregated = false;
}
}
}
if (!isAggregated) {
bb.rewind();
result.add(new UserRecord(r));
@ -301,5 +246,95 @@ public class UserRecord extends Record {
}
return result;
}
private static List<UserRecord> deaggregateRecords(
byte[] messageData,
Record record,
BigInteger startingHashKey,
BigInteger endingHashKey
) throws InvalidProtocolBufferException{
Messages.AggregatedRecord aggregatedRecord = Messages.AggregatedRecord.parseFrom(messageData);
List<UserRecord> results = new ArrayList<>();
Date arrivalTimeStamp = record.getApproximateArrivalTimestamp();
String sequenceNumber = record.getSequenceNumber();
List<String> partitionKeyTable = aggregatedRecord.getPartitionKeyTableList();
List<String> explicitHashKeyTable = aggregatedRecord.getExplicitHashKeyTableList();
try {
long subSeqNum = 0;
for (Messages.Record subRecord : aggregatedRecord.getRecordsList()) {
String explicitHashKey = subRecord.hasExplicitHashKeyIndex() ?
explicitHashKeyTable.get((int) subRecord.getExplicitHashKeyIndex()) : null;
String partitionKey = partitionKeyTable.get((int) subRecord.getPartitionKeyIndex());
BigInteger effectiveHashKey = explicitHashKey != null
? new BigInteger(explicitHashKey)
: new BigInteger(1, md5(partitionKey.getBytes("UTF-8")));
//if hash key is in invalid range, remove all records prior and exit loop
if (effectiveHashKey.compareTo(startingHashKey) < 0
|| effectiveHashKey.compareTo(endingHashKey) > 0) {
for (int toRemove = 0; toRemove < subSeqNum; ++toRemove) {
results.remove(results.size() - 1);
}
break;
}
Record newRecord = new Record()
.withData(ByteBuffer.wrap(subRecord.getData().toByteArray()))
.withPartitionKey(partitionKey)
.withSequenceNumber(sequenceNumber)
.withApproximateArrivalTimestamp(arrivalTimeStamp);
results.add(new UserRecord(true, newRecord, subSeqNum++, explicitHashKey));
}
} catch (UnsupportedEncodingException e) {
logUnsupportedEncodingException(
e,
partitionKeyTable,
explicitHashKeyTable,
aggregatedRecord,
record,
messageData
);
return null;
}
return results;
}
private static void logUnsupportedEncodingException(
UnsupportedEncodingException e,
List<String> partitionKeyTable,
List<String> explicitHashKeyTable,
Messages.AggregatedRecord aggregatedRecord,
Record record,
byte[] messageData
) {
StringBuilder sb = new StringBuilder();
sb.append("Unexpected exception during deaggregation, record was:\n");
sb.append("PKS:\n");
for (String s : partitionKeyTable) {
sb.append(s).append("\n");
}
sb.append("EHKS: \n");
for (String s : explicitHashKeyTable) {
sb.append(s).append("\n");
}
for (Messages.Record mr : aggregatedRecord.getRecordsList()) {
sb.append("Record: [hasEhk=").append(mr.hasExplicitHashKeyIndex()).append(", ")
.append("ehkIdx=").append(mr.getExplicitHashKeyIndex()).append(", ")
.append("pkIdx=").append(mr.getPartitionKeyIndex()).append(", ")
.append("dataLen=").append(mr.getData().toByteArray().length).append("]\n");
}
sb.append("Sequence number: ").append(record.getSequenceNumber()).append("\n")
.append("Raw data: ")
.append(javax.xml.bind.DatatypeConverter.printBase64Binary(messageData)).append("\n");
LOG.error(sb.toString(), e);
}
// CHECKSTYLE:ON NPathComplexity
}