diff --git a/kinesis.go b/kinesis.go new file mode 100644 index 0000000..77f0644 --- /dev/null +++ b/kinesis.go @@ -0,0 +1,55 @@ +package connector + +import ( + "fmt" + "time" + + "github.com/sendgridlabs/go-kinesis" +) + +// CreateStream creates a new Kinesis stream (uses existing stream if exists) and +// waits for it to become available. +func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) { + if !StreamExists(k, streamName) { + err := k.CreateStream(streamName, shardCount) + + if err != nil { + fmt.Printf("CreateStream ERROR: %v\n", err) + return + } + } + + resp := &kinesis.DescribeStreamResp{} + timeout := make(chan bool, 30) + + for { + args := kinesis.NewArgs() + args.Add("StreamName", streamName) + resp, _ = k.DescribeStream(args) + streamStatus := resp.StreamDescription.StreamStatus + fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus) + + if streamStatus != "ACTIVE" { + time.Sleep(4 * time.Second) + timeout <- true + } else { + break + } + } +} + +// StreamExists checks if a Kinesis stream exists. +func StreamExists(k *kinesis.Kinesis, streamName string) bool { + args := kinesis.NewArgs() + resp, err := k.ListStreams(args) + if err != nil { + fmt.Printf("ListStream ERROR: %v\n", err) + return false + } + for _, s := range resp.StreamNames { + if s == streamName { + return true + } + } + return false +} diff --git a/utils.go b/utils.go deleted file mode 100644 index 870d7a7..0000000 --- a/utils.go +++ /dev/null @@ -1,137 +0,0 @@ -package connector - -import ( - "bufio" - "fmt" - "log" - "os" - "reflect" - "regexp" - "strconv" - "strings" - "time" - "unicode" - - "github.com/sendgridlabs/go-kinesis" -) - -func readLines(path string) ([]string, error) { - file, err := os.Open(path) - - if err != nil { - return nil, err - } - - defer file.Close() - var lines []string - - scanner := bufio.NewScanner(file) - - for scanner.Scan() { - lines = append(lines, scanner.Text()) - } - - return lines, scanner.Err() -} - -var ( - assignRegex = regexp.MustCompile(`^([^=]+)=(.*)$`) -) - -func upcaseInitial(str string) string { - for i, v := range str { - return string(unicode.ToUpper(v)) + str[i+1:] - } - - return "" -} - -// LoadConfig opens the file path and loads config values into the sturct. -func LoadConfig(config interface{}, filename string) error { - lines, err := readLines(filename) - - if err != nil { - log.Fatalf("Config Load ERROR: %s\n", err) - } - - mutable := reflect.ValueOf(config).Elem() - - for _, line := range lines { - line = strings.TrimSpace(line) - - if len(line) == 0 || line[0] == ';' || line[0] == '#' { - continue - } - - if groups := assignRegex.FindStringSubmatch(line); groups != nil { - key, val := groups[1], groups[2] - key, val = strings.TrimSpace(key), strings.TrimSpace(val) - key = upcaseInitial(key) - field := mutable.FieldByName(key) - - if !field.IsValid() { - log.Fatalf("Config ERROR: Field %s not found\n", key) - } - - switch field.Type().Name() { - case "int": - val, _ := strconv.ParseInt(val, 0, 64) - mutable.FieldByName(key).SetInt(val) - case "bool": - val, _ := strconv.ParseBool(val) - mutable.FieldByName(key).SetBool(val) - default: - mutable.FieldByName(key).SetString(val) - } - } - } - - return nil -} - -// CreateStream creates a new Kinesis stream (uses existing stream if exists) and -// waits for it to become available. -func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) { - if !StreamExists(k, streamName) { - err := k.CreateStream(streamName, shardCount) - - if err != nil { - fmt.Printf("CreateStream ERROR: %v\n", err) - return - } - } - - resp := &kinesis.DescribeStreamResp{} - timeout := make(chan bool, 30) - - for { - args := kinesis.NewArgs() - args.Add("StreamName", streamName) - resp, _ = k.DescribeStream(args) - streamStatus := resp.StreamDescription.StreamStatus - fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus) - - if streamStatus != "ACTIVE" { - time.Sleep(4 * time.Second) - timeout <- true - } else { - break - } - } -} - -// StreamExists checks if a Kinesis stream exists. -func StreamExists(k *kinesis.Kinesis, streamName string) bool { - args := kinesis.NewArgs() - resp, err := k.ListStreams(args) - if err != nil { - fmt.Printf("ListStream ERROR: %v\n", err) - return false - } - for _, s := range resp.StreamNames { - if s == streamName { - return true - } - } - return false -}