Fix data race during checkpointing (#82)
Make sure shard is locked during checkpointing. Signed-off-by: Tao Jiang <taoj@vmware.com>
This commit is contained in:
parent
499e9cf1be
commit
f1982602ff
1 changed files with 1 additions and 1 deletions
|
|
@ -65,6 +65,7 @@ func (pc *PreparedCheckpointer) Checkpoint() error {
|
||||||
|
|
||||||
func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error {
|
func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error {
|
||||||
rc.shard.Mux.Lock()
|
rc.shard.Mux.Lock()
|
||||||
|
defer rc.shard.Mux.Unlock()
|
||||||
|
|
||||||
// checkpoint the last sequence of a closed shard
|
// checkpoint the last sequence of a closed shard
|
||||||
if sequenceNumber == nil {
|
if sequenceNumber == nil {
|
||||||
|
|
@ -73,7 +74,6 @@ func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error
|
||||||
rc.shard.Checkpoint = aws.StringValue(sequenceNumber)
|
rc.shard.Checkpoint = aws.StringValue(sequenceNumber)
|
||||||
}
|
}
|
||||||
|
|
||||||
rc.shard.Mux.Unlock()
|
|
||||||
return rc.checkpoint.CheckpointSequence(rc.shard)
|
return rc.checkpoint.CheckpointSequence(rc.shard)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue