improve comments
This commit is contained in:
parent
6372087bc3
commit
0c204685a9
17 changed files with 107 additions and 66 deletions
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package checkpoint
|
||||||
// The implementation is derived from https://github.com/patrobinson/gokini
|
// The implementation is derived from https://github.com/patrobinson/gokini
|
||||||
//
|
//
|
||||||
// Copyright 2018 Patrick robinson
|
// Copyright 2018 Patrick robinson
|
||||||
|
|
@ -42,11 +44,11 @@ const (
|
||||||
ParentShardIdKey = "ParentShardId"
|
ParentShardIdKey = "ParentShardId"
|
||||||
ClaimRequestKey = "ClaimRequest"
|
ClaimRequestKey = "ClaimRequest"
|
||||||
|
|
||||||
// We've completely processed all records in this shard.
|
// ShardEnd We've completely processed all records in this shard.
|
||||||
ShardEnd = "SHARD_END"
|
ShardEnd = "SHARD_END"
|
||||||
|
|
||||||
// ErrShardClaimed is returned when shard is claimed
|
// ErrShardClaimed is returned when shard is claimed
|
||||||
ErrShardClaimed = "Shard is already claimed by another node"
|
ErrShardClaimed = "shard is already claimed by another node"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ErrLeaseNotAcquired struct {
|
type ErrLeaseNotAcquired struct {
|
||||||
|
|
@ -77,8 +79,7 @@ type Checkpointer interface {
|
||||||
// RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment
|
// RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment
|
||||||
RemoveLeaseOwner(string) error
|
RemoveLeaseOwner(string) error
|
||||||
|
|
||||||
// New Lease Stealing Methods
|
// ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods)
|
||||||
// ListActiveWorkers returns active workers and their shards
|
|
||||||
ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error)
|
ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error)
|
||||||
|
|
||||||
// ClaimShard claims a shard for stealing
|
// ClaimShard claims a shard for stealing
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package checkpoint
|
||||||
// The implementation is derived from https://github.com/patrobinson/gokini
|
// The implementation is derived from https://github.com/patrobinson/gokini
|
||||||
//
|
//
|
||||||
// Copyright 2018 Patrick robinson
|
// Copyright 2018 Patrick robinson
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package config
|
||||||
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
||||||
/*
|
/*
|
||||||
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
|
|
@ -54,85 +56,85 @@ const (
|
||||||
// AT_TIMESTAMP start from the record at or after the specified server-side Timestamp.
|
// AT_TIMESTAMP start from the record at or after the specified server-side Timestamp.
|
||||||
AT_TIMESTAMP
|
AT_TIMESTAMP
|
||||||
|
|
||||||
// The location in the shard from which the KinesisClientLibrary will start fetching records from
|
// DefaultInitialPositionInStream The location in the shard from which the KinesisClientLibrary will start fetching records from
|
||||||
// when the application starts for the first time and there is no checkpoint for the shard.
|
// when the application starts for the first time and there is no checkpoint for the shard.
|
||||||
DefaultInitialPositionInStream = LATEST
|
DefaultInitialPositionInStream = LATEST
|
||||||
|
|
||||||
// Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
|
// DefaultFailoverTimeMillis Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
|
||||||
// will be regarded as having problems and it's shards will be assigned to other workers.
|
// will be regarded as having problems and it's shards will be assigned to other workers.
|
||||||
// For applications that have a large number of shards, this may be set to a higher number to reduce
|
// For applications that have a large number of shards, this may be set to a higher number to reduce
|
||||||
// the number of DynamoDB IOPS required for tracking leases.
|
// the number of DynamoDB IOPS required for tracking leases.
|
||||||
DefaultFailoverTimeMillis = 10000
|
DefaultFailoverTimeMillis = 10000
|
||||||
|
|
||||||
// Period before the end of lease during which a lease is refreshed by the owner.
|
// DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner.
|
||||||
DefaultLeaseRefreshPeriodMillis = 5000
|
DefaultLeaseRefreshPeriodMillis = 5000
|
||||||
|
|
||||||
// Max records to fetch from Kinesis in a single GetRecords call.
|
// DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call.
|
||||||
DefaultMaxRecords = 10000
|
DefaultMaxRecords = 10000
|
||||||
|
|
||||||
// The default value for how long the {@link ShardConsumer} should sleep if no records are returned
|
// DefaultIdleTimeBetweenReadsMillis The default value for how long the {@link ShardConsumer}
|
||||||
// from the call to
|
// should sleep if no records are returned from the call to
|
||||||
DefaultIdletimeBetweenReadsMillis = 1000
|
DefaultIdleTimeBetweenReadsMillis = 1000
|
||||||
|
|
||||||
// Don't call processRecords() on the record processor for empty record lists.
|
// DefaultDontCallProcessRecordsForEmptyRecordList Don't call processRecords() on the record processor for empty record lists.
|
||||||
DefaultDontCallProcessRecordsForEmptyRecordList = false
|
DefaultDontCallProcessRecordsForEmptyRecordList = false
|
||||||
|
|
||||||
// Interval in milliseconds between polling to check for parent shard completion.
|
// DefaultParentShardPollIntervalMillis Interval in milliseconds between polling to check for parent shard completion.
|
||||||
// Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
|
// Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
|
||||||
// completion of parent shards).
|
// completion of parent shards).
|
||||||
DefaultParentShardPollIntervalMillis = 10000
|
DefaultParentShardPollIntervalMillis = 10000
|
||||||
|
|
||||||
// Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
|
// DefaultShardSyncIntervalMillis Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
|
||||||
DefaultShardSyncIntervalMillis = 60000
|
DefaultShardSyncIntervalMillis = 60000
|
||||||
|
|
||||||
// Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
|
// DefaultCleanupLeasesUponShardsCompletion Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
|
||||||
// Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by
|
// Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by
|
||||||
// default we try to delete the ones we don't need any longer.
|
// default we try to delete the ones we don't need any longer.
|
||||||
DefaultCleanupLeasesUponShardsCompletion = true
|
DefaultCleanupLeasesUponShardsCompletion = true
|
||||||
|
|
||||||
// Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
|
// DefaultTaskBackoffTimeMillis Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
|
||||||
DefaultTaskBackoffTimeMillis = 500
|
DefaultTaskBackoffTimeMillis = 500
|
||||||
|
|
||||||
// KCL will validate client provided sequence numbers with a call to Amazon Kinesis before
|
// DefaultValidateSequenceNumberBeforeCheckpointing KCL will validate client provided sequence numbers with a call to Amazon Kinesis before
|
||||||
// checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)} by default.
|
// checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)} by default.
|
||||||
DefaultValidateSequenceNumberBeforeCheckpointing = true
|
DefaultValidateSequenceNumberBeforeCheckpointing = true
|
||||||
|
|
||||||
// The max number of leases (shards) this worker should process.
|
// DefaultMaxLeasesForWorker The max number of leases (shards) this worker should process.
|
||||||
// This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
|
// This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
|
||||||
// or during deployment.
|
// or during deployment.
|
||||||
// NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the
|
// NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the
|
||||||
// stream due to the max limit.
|
// stream due to the max limit.
|
||||||
DefaultMaxLeasesForWorker = math.MaxInt16
|
DefaultMaxLeasesForWorker = math.MaxInt16
|
||||||
|
|
||||||
// Max leases to steal from another worker at one time (for load balancing).
|
// DefaultMaxLeasesToStealAtOneTime Max leases to steal from another worker at one time (for load balancing).
|
||||||
// Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
|
// Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
|
||||||
// but can cause higher churn in the system.
|
// but can cause higher churn in the system.
|
||||||
DefaultMaxLeasesToStealAtOneTime = 1
|
DefaultMaxLeasesToStealAtOneTime = 1
|
||||||
|
|
||||||
// The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity.
|
// DefaultInitialLeaseTableReadCapacity The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity.
|
||||||
DefaultInitialLeaseTableReadCapacity = 10
|
DefaultInitialLeaseTableReadCapacity = 10
|
||||||
|
|
||||||
// The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity.
|
// DefaultInitialLeaseTableWriteCapacity The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity.
|
||||||
DefaultInitialLeaseTableWriteCapacity = 10
|
DefaultInitialLeaseTableWriteCapacity = 10
|
||||||
|
|
||||||
// The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This
|
// DefaultSkipShardSyncAtStartupIfLeasesExist The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This
|
||||||
// assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g.
|
// assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g.
|
||||||
// during incremental deployments of an application).
|
// during incremental deployments of an application).
|
||||||
DefaultSkipShardSyncAtStartupIfLeasesExist = false
|
DefaultSkipShardSyncAtStartupIfLeasesExist = false
|
||||||
|
|
||||||
// The amount of milliseconds to wait before graceful shutdown forcefully terminates.
|
// DefaultShutdownGraceMillis The amount of milliseconds to wait before graceful shutdown forcefully terminates.
|
||||||
DefaultShutdownGraceMillis = 5000
|
DefaultShutdownGraceMillis = 5000
|
||||||
|
|
||||||
// Lease stealing defaults to false for backwards compatibility.
|
// DefaultEnableLeaseStealing Lease stealing defaults to false for backwards compatibility.
|
||||||
DefaultEnableLeaseStealing = false
|
DefaultEnableLeaseStealing = false
|
||||||
|
|
||||||
// Interval between rebalance tasks defaults to 5 seconds.
|
// DefaultLeaseStealingIntervalMillis Interval between rebalance tasks defaults to 5 seconds.
|
||||||
DefaultLeaseStealingIntervalMillis = 5000
|
DefaultLeaseStealingIntervalMillis = 5000
|
||||||
|
|
||||||
// Number of milliseconds to wait before another worker can aquire a claimed shard
|
// DefaultLeaseStealingClaimTimeoutMillis Number of milliseconds to wait before another worker can aquire a claimed shard
|
||||||
DefaultLeaseStealingClaimTimeoutMillis = 120000
|
DefaultLeaseStealingClaimTimeoutMillis = 120000
|
||||||
|
|
||||||
// Number of milliseconds to wait before syncing with lease table (dynamodDB)
|
// DefaultLeaseSyncingIntervalMillis Number of milliseconds to wait before syncing with lease table (dynamodDB)
|
||||||
DefaultLeaseSyncingIntervalMillis = 60000
|
DefaultLeaseSyncingIntervalMillis = 60000
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -141,7 +143,7 @@ type (
|
||||||
// This is used during initial application bootstrap (when a checkpoint doesn't exist for a shard or its parents)
|
// This is used during initial application bootstrap (when a checkpoint doesn't exist for a shard or its parents)
|
||||||
InitialPositionInStream int
|
InitialPositionInStream int
|
||||||
|
|
||||||
// Class that houses the entities needed to specify the Position in the stream from where a new application should
|
// InitialPositionInStreamExtended Class that houses the entities needed to specify the Position in the stream from where a new application should
|
||||||
// start.
|
// start.
|
||||||
InitialPositionInStreamExtended struct {
|
InitialPositionInStreamExtended struct {
|
||||||
Position InitialPositionInStream
|
Position InitialPositionInStream
|
||||||
|
|
@ -156,7 +158,7 @@ type (
|
||||||
Timestamp *time.Time `type:"Timestamp" timestampFormat:"unix"`
|
Timestamp *time.Time `type:"Timestamp" timestampFormat:"unix"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Configuration for the Kinesis Client Library.
|
// KinesisClientLibConfiguration Configuration for the Kinesis Client Library.
|
||||||
// Note: There is no need to configure credential provider. Credential can be get from InstanceProfile.
|
// Note: There is no need to configure credential provider. Credential can be get from InstanceProfile.
|
||||||
KinesisClientLibConfiguration struct {
|
KinesisClientLibConfiguration struct {
|
||||||
// ApplicationName is name of application. Kinesis allows multiple applications to consume the same stream.
|
// ApplicationName is name of application. Kinesis allows multiple applications to consume the same stream.
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package config
|
||||||
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
||||||
/*
|
/*
|
||||||
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package config
|
||||||
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
||||||
/*
|
/*
|
||||||
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
|
|
@ -165,25 +167,24 @@ func (c *KinesisClientLibConfiguration) WithMaxLeasesForWorker(n int) *KinesisCl
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/* WithIdleTimeBetweenReadsInMillis
|
||||||
* Controls how long the KCL will sleep if no records are returned from Kinesis
|
Controls how long the KCL will sleep if no records are returned from Kinesis
|
||||||
*
|
|
||||||
* <p>
|
<p>
|
||||||
* This value is only used when no records are returned; if records are returned, the {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask} will
|
This value is only used when no records are returned; if records are returned, the {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask} will
|
||||||
* immediately retrieve the next set of records after the call to
|
immediately retrieve the next set of records after the call to
|
||||||
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords(ProcessRecordsInput)}
|
{@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords(ProcessRecordsInput)}
|
||||||
* has returned. Setting this value to high may result in the KCL being unable to catch up. If you are changing this
|
has returned. Setting this value to high may result in the KCL being unable to catch up. If you are changing this
|
||||||
* value it's recommended that you enable {@link #withCallProcessRecordsEvenForEmptyRecordList(boolean)}, and
|
value it's recommended that you enable {@link #withCallProcessRecordsEvenForEmptyRecordList(boolean)}, and
|
||||||
* monitor how far behind the records retrieved are by inspecting
|
monitor how far behind the records retrieved are by inspecting
|
||||||
* {@link com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput#getMillisBehindLatest()}, and the
|
{@link com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput#getMillisBehindLatest()}, and the
|
||||||
* <a href=
|
<a href=
|
||||||
* "http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html#kinesis-metrics-stream">CloudWatch
|
"http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html#kinesis-metrics-stream">CloudWatch
|
||||||
* Metric: GetRecords.MillisBehindLatest</a>
|
Metric: GetRecords.MillisBehindLatest</a>
|
||||||
* </p>
|
</p>
|
||||||
*
|
|
||||||
* @param IdleTimeBetweenReadsInMillis
|
@param IdleTimeBetweenReadsInMillis: how long to sleep between GetRecords calls when no records are returned.
|
||||||
* how long to sleep between GetRecords calls when no records are returned.
|
@return KinesisClientLibConfiguration
|
||||||
* @return KinesisClientLibConfiguration
|
|
||||||
*/
|
*/
|
||||||
func (c *KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis int) *KinesisClientLibConfiguration {
|
func (c *KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis int) *KinesisClientLibConfiguration {
|
||||||
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis)
|
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis)
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package interfaces
|
||||||
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
||||||
/*
|
/*
|
||||||
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
|
|
@ -57,7 +59,7 @@ const (
|
||||||
*/
|
*/
|
||||||
TERMINATE
|
TERMINATE
|
||||||
|
|
||||||
/**
|
/*
|
||||||
* Processing will be moved to a different record processor (fail over, load balancing use cases).
|
* Processing will be moved to a different record processor (fail over, load balancing use cases).
|
||||||
* Applications SHOULD NOT checkpoint their progress (as another record processor may have already started
|
* Applications SHOULD NOT checkpoint their progress (as another record processor may have already started
|
||||||
* processing data).
|
* processing data).
|
||||||
|
|
@ -67,12 +69,12 @@ const (
|
||||||
|
|
||||||
// Containers for the parameters to the IRecordProcessor
|
// Containers for the parameters to the IRecordProcessor
|
||||||
type (
|
type (
|
||||||
/**
|
/*
|
||||||
* Reason the RecordProcessor is being shutdown.
|
* Reason the RecordProcessor is being shutdown.
|
||||||
* Used to distinguish between a fail-over vs. a termination (shard is closed and all records have been delivered).
|
* Used to distinguish between a fail-over vs. a termination (shard is closed and all records have been delivered).
|
||||||
* In case of a fail over, applications should NOT checkpoint as part of shutdown,
|
* In case of a fail-over, applications should NOT checkpoint as part of shutdown,
|
||||||
* since another record processor may have already started processing records for that shard.
|
* since another record processor may have already started processing records for that shard.
|
||||||
* In case of termination (resharding use case), applications SHOULD checkpoint their progress to indicate
|
* In case of termination (resharding use case), applications SHOULD keep checkpointing their progress to indicate
|
||||||
* that they have successfully processed all the records (processing of child shards can then begin).
|
* that they have successfully processed all the records (processing of child shards can then begin).
|
||||||
*/
|
*/
|
||||||
ShutdownReason int
|
ShutdownReason int
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package interfaces
|
||||||
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
||||||
/*
|
/*
|
||||||
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
|
|
@ -37,7 +39,8 @@ type (
|
||||||
IPreparedCheckpointer interface {
|
IPreparedCheckpointer interface {
|
||||||
GetPendingCheckpoint() *ExtendedSequenceNumber
|
GetPendingCheckpoint() *ExtendedSequenceNumber
|
||||||
|
|
||||||
/**
|
// Checkpoint
|
||||||
|
/*
|
||||||
* This method will record a pending checkpoint.
|
* This method will record a pending checkpoint.
|
||||||
*
|
*
|
||||||
* @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
|
* @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
|
||||||
|
|
@ -56,13 +59,15 @@ type (
|
||||||
Checkpoint() error
|
Checkpoint() error
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
// IRecordProcessorCheckpointer
|
||||||
|
/*
|
||||||
* Used by RecordProcessors when they want to checkpoint their progress.
|
* Used by RecordProcessors when they want to checkpoint their progress.
|
||||||
* The Kinesis Client Library will pass an object implementing this interface to RecordProcessors, so they can
|
* The Kinesis Client Library will pass an object implementing this interface to RecordProcessors, so they can
|
||||||
* checkpoint their progress.
|
* checkpoint their progress.
|
||||||
*/
|
*/
|
||||||
IRecordProcessorCheckpointer interface {
|
IRecordProcessorCheckpointer interface {
|
||||||
/**
|
// Checkpoint
|
||||||
|
/*
|
||||||
* This method will checkpoint the progress at the provided sequenceNumber. This method is analogous to
|
* This method will checkpoint the progress at the provided sequenceNumber. This method is analogous to
|
||||||
* {@link #checkpoint()} but provides the ability to specify the sequence number at which to
|
* {@link #checkpoint()} but provides the ability to specify the sequence number at which to
|
||||||
* checkpoint.
|
* checkpoint.
|
||||||
|
|
@ -85,6 +90,7 @@ type (
|
||||||
*/
|
*/
|
||||||
Checkpoint(sequenceNumber *string) error
|
Checkpoint(sequenceNumber *string) error
|
||||||
|
|
||||||
|
// PrepareCheckpoint
|
||||||
/**
|
/**
|
||||||
* This method will record a pending checkpoint at the provided sequenceNumber.
|
* This method will record a pending checkpoint at the provided sequenceNumber.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package interfaces
|
||||||
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
||||||
/*
|
/*
|
||||||
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
|
|
@ -38,7 +40,8 @@ type (
|
||||||
// The main task of using KCL is to provide implementation on IRecordProcessor interface.
|
// The main task of using KCL is to provide implementation on IRecordProcessor interface.
|
||||||
// Note: This is exactly the same interface as Amazon KCL IRecordProcessor v2
|
// Note: This is exactly the same interface as Amazon KCL IRecordProcessor v2
|
||||||
IRecordProcessor interface {
|
IRecordProcessor interface {
|
||||||
/**
|
// Initialize
|
||||||
|
/*
|
||||||
* Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
|
* Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
|
||||||
* (via processRecords).
|
* (via processRecords).
|
||||||
*
|
*
|
||||||
|
|
@ -46,7 +49,8 @@ type (
|
||||||
*/
|
*/
|
||||||
Initialize(initializationInput *InitializationInput)
|
Initialize(initializationInput *InitializationInput)
|
||||||
|
|
||||||
/**
|
// ProcessRecords
|
||||||
|
/*
|
||||||
* Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
|
* Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
|
||||||
* application.
|
* application.
|
||||||
* Upon fail over, the new instance will get records with sequence number > checkpoint position
|
* Upon fail over, the new instance will get records with sequence number > checkpoint position
|
||||||
|
|
@ -57,7 +61,8 @@ type (
|
||||||
*/
|
*/
|
||||||
ProcessRecords(processRecordsInput *ProcessRecordsInput)
|
ProcessRecords(processRecordsInput *ProcessRecordsInput)
|
||||||
|
|
||||||
/**
|
// Shutdown
|
||||||
|
/*
|
||||||
* Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
|
* Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
|
||||||
* RecordProcessor instance.
|
* RecordProcessor instance.
|
||||||
*
|
*
|
||||||
|
|
@ -77,7 +82,8 @@ type (
|
||||||
// for processing shard. Client can choose either creating one processor per shard or sharing them.
|
// for processing shard. Client can choose either creating one processor per shard or sharing them.
|
||||||
IRecordProcessorFactory interface {
|
IRecordProcessorFactory interface {
|
||||||
|
|
||||||
/**
|
// CreateProcessor
|
||||||
|
/*
|
||||||
* Returns a record processor to be used for processing data records for a (assigned) shard.
|
* Returns a record processor to be used for processing data records for a (assigned) shard.
|
||||||
*
|
*
|
||||||
* @return Returns a processor object.
|
* @return Returns a processor object.
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package interfaces
|
||||||
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
|
||||||
/*
|
/*
|
||||||
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package cloudwatch
|
||||||
// The implementation is derived from https://github.com/patrobinson/gokini
|
// The implementation is derived from https://github.com/patrobinson/gokini
|
||||||
//
|
//
|
||||||
// Copyright 2018 Patrick robinson
|
// Copyright 2018 Patrick robinson
|
||||||
|
|
@ -125,7 +127,7 @@ func (cw *MonitoringService) Shutdown() {
|
||||||
cw.logger.Infof("Cloudwatch metrics system has been shutdown.")
|
cw.logger.Infof("Cloudwatch metrics system has been shutdown.")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start daemon to flush metrics periodically
|
// eventloop start daemon to flush metrics periodically
|
||||||
func (cw *MonitoringService) eventloop() {
|
func (cw *MonitoringService) eventloop() {
|
||||||
defer cw.waitGroup.Done()
|
defer cw.waitGroup.Done()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package metrics
|
||||||
// The implementation is derived from https://github.com/patrobinson/gokini
|
// The implementation is derived from https://github.com/patrobinson/gokini
|
||||||
//
|
//
|
||||||
// Copyright 2018 Patrick robinson
|
// Copyright 2018 Patrick robinson
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package prometheus
|
||||||
// The implementation is derived from https://github.com/patrobinson/gokini
|
// The implementation is derived from https://github.com/patrobinson/gokini
|
||||||
//
|
//
|
||||||
// Copyright 2018 Patrick robinson
|
// Copyright 2018 Patrick robinson
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package partition
|
||||||
// The implementation is derived from https://github.com/patrobinson/gokini
|
// The implementation is derived from https://github.com/patrobinson/gokini
|
||||||
//
|
//
|
||||||
// Copyright 2018 Patrick robinson
|
// Copyright 2018 Patrick robinson
|
||||||
|
|
@ -25,7 +27,7 @@
|
||||||
// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||||
//
|
//
|
||||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
package worker
|
package partition
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package utils
|
||||||
package utils
|
package utils
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package utils
|
||||||
package utils
|
package utils
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
@ -28,5 +30,6 @@ func MustNewUUID() string {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return id.String()
|
return id.String()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,11 +16,11 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package worker
|
||||||
package worker
|
package worker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
|
||||||
|
|
||||||
chk "github.com/vmware/vmware-go-kcl/clientlibrary/checkpoint"
|
chk "github.com/vmware/vmware-go-kcl/clientlibrary/checkpoint"
|
||||||
kcl "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
|
kcl "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
|
||||||
par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"
|
par "github.com/vmware/vmware-go-kcl/clientlibrary/partition"
|
||||||
|
|
@ -28,7 +28,9 @@ import (
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|
||||||
/* Objects of this class are prepared to checkpoint at a specific sequence number. They use an
|
// PreparedCheckpointer
|
||||||
|
/*
|
||||||
|
* Objects of this class are prepared to checkpoint at a specific sequence number. They use an
|
||||||
* IRecordProcessorCheckpointer to do the actual checkpointing, so their checkpoint is subject to the same 'didn't go
|
* IRecordProcessorCheckpointer to do the actual checkpointing, so their checkpoint is subject to the same 'didn't go
|
||||||
* backwards' validation as a normal checkpoint.
|
* backwards' validation as a normal checkpoint.
|
||||||
*/
|
*/
|
||||||
|
|
@ -37,7 +39,8 @@ type (
|
||||||
checkpointer kcl.IRecordProcessorCheckpointer
|
checkpointer kcl.IRecordProcessorCheckpointer
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
//RecordProcessorCheckpointer
|
||||||
|
/*
|
||||||
* This class is used to enable RecordProcessors to checkpoint their progress.
|
* This class is used to enable RecordProcessors to checkpoint their progress.
|
||||||
* The Amazon Kinesis Client Library will instantiate an object and provide a reference to the application
|
* The Amazon Kinesis Client Library will instantiate an object and provide a reference to the application
|
||||||
* RecordProcessor instance. Amazon Kinesis Client Library will create one instance per shard assignment.
|
* RecordProcessor instance. Amazon Kinesis Client Library will create one instance per shard assignment.
|
||||||
|
|
|
||||||
|
|
@ -16,12 +16,13 @@
|
||||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||||
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Package test
|
||||||
package test
|
package test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
kc "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
|
kc "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue