Add Redshift Manifest functionality
Use a manifest file for batch importing files uploaded to S3.
This commit is contained in:
parent
8e8ee5af73
commit
106f0d66bb
10 changed files with 258 additions and 28 deletions
|
|
@ -87,7 +87,7 @@ func NewPipeline(cfg Config) *connector.Pipeline {
|
||||||
StreamName: cfg.KinesisStream,
|
StreamName: cfg.KinesisStream,
|
||||||
}
|
}
|
||||||
|
|
||||||
e := connector.RedshiftEmitter{
|
e := connector.RedshiftBasicEmtitter{
|
||||||
TableName: cfg.TableName,
|
TableName: cfg.TableName,
|
||||||
S3Bucket: cfg.S3Bucket,
|
S3Bucket: cfg.S3Bucket,
|
||||||
Format: cfg.Format,
|
Format: cfg.Format,
|
||||||
|
|
|
||||||
10
manifest.go
Normal file
10
manifest.go
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
package connector
|
||||||
|
|
||||||
|
type Entry struct {
|
||||||
|
Url string `json:"url"`
|
||||||
|
Mandatory bool `json:"mandatory"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Manifest struct {
|
||||||
|
Entries []Entry `json:"entries"`
|
||||||
|
}
|
||||||
17
pipeline.go
17
pipeline.go
|
|
@ -1,7 +1,7 @@
|
||||||
package connector
|
package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/sendgridlabs/go-kinesis"
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
|
|
@ -38,8 +38,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
shardInfo, err := ksis.GetShardIterator(args)
|
shardInfo, err := ksis.GetShardIterator(args)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Error fetching shard itterator: %v", err)
|
log.Fatalf("GetShardIterator ERROR: %v\n", err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
shardIterator := shardInfo.ShardIterator
|
shardIterator := shardInfo.ShardIterator
|
||||||
|
|
@ -50,9 +49,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
recordSet, err := ksis.GetRecords(args)
|
recordSet, err := ksis.GetRecords(args)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("GetRecords ERROR: %v\n", err)
|
log.Fatalf("GetRecords ERROR: %v\n", err)
|
||||||
time.Sleep(10 * time.Second)
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(recordSet.Records) > 0 {
|
if len(recordSet.Records) > 0 {
|
||||||
|
|
@ -60,7 +57,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
data := v.GetData()
|
data := v.GetData()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("GetData ERROR: %v\n", err)
|
log.Printf("GetData ERROR: %v\n", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -71,15 +68,13 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
|
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
|
||||||
fmt.Printf("NextShardIterator ERROR: %v\n", err)
|
log.Printf("NextShardIterator ERROR: %v\n", err)
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
fmt.Printf("Sleeping: %v\n", shardID)
|
time.Sleep(5 * time.Second)
|
||||||
time.Sleep(10 * time.Second)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Buffer.ShouldFlush() {
|
if p.Buffer.ShouldFlush() {
|
||||||
fmt.Printf("Emitting to Shard: %v\n", shardID)
|
|
||||||
p.Emitter.Emit(p.Buffer, p.Transformer)
|
p.Emitter.Emit(p.Buffer, p.Transformer)
|
||||||
p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
|
p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
|
||||||
p.Buffer.Flush()
|
p.Buffer.Flush()
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ import (
|
||||||
// RedshiftEmitter is an implementation of Emitter that buffered batches of records into Redshift one by one.
|
// RedshiftEmitter is an implementation of Emitter that buffered batches of records into Redshift one by one.
|
||||||
// It first emits records into S3 and then perfors the Redshift JSON COPY command. S3 storage of buffered
|
// It first emits records into S3 and then perfors the Redshift JSON COPY command. S3 storage of buffered
|
||||||
// data achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct.
|
// data achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct.
|
||||||
type RedshiftEmitter struct {
|
type RedshiftBasicEmtitter struct {
|
||||||
Delimiter string
|
Delimiter string
|
||||||
Format string
|
Format string
|
||||||
Jsonpaths string
|
Jsonpaths string
|
||||||
|
|
@ -24,7 +24,7 @@ type RedshiftEmitter struct {
|
||||||
|
|
||||||
// Emit is invoked when the buffer is full. This method leverages the S3Emitter and
|
// Emit is invoked when the buffer is full. This method leverages the S3Emitter and
|
||||||
// then issues a copy command to Redshift data store.
|
// then issues a copy command to Redshift data store.
|
||||||
func (e RedshiftEmitter) Emit(b Buffer, t Transformer) {
|
func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) {
|
||||||
s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
|
s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
|
||||||
s3Emitter.Emit(b, t)
|
s3Emitter.Emit(b, t)
|
||||||
s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
|
s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
|
||||||
|
|
@ -45,13 +45,12 @@ func (e RedshiftEmitter) Emit(b Buffer, t Transformer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates the SQL copy statement issued to Redshift cluster.
|
// Creates the SQL copy statement issued to Redshift cluster.
|
||||||
func (e RedshiftEmitter) copyStatement(s3File string) string {
|
func (e RedshiftBasicEmtitter) copyStatement(s3File string) string {
|
||||||
var b bytes.Buffer
|
b := new(bytes.Buffer)
|
||||||
b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
|
b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
|
||||||
b.WriteString(fmt.Sprintf("FROM 's3://%v%v' ", e.S3Bucket, s3File))
|
b.WriteString(fmt.Sprintf("FROM 's3://%v%v' ", e.S3Bucket, s3File))
|
||||||
b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY")))
|
b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY")))
|
||||||
b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY")))
|
b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY")))
|
||||||
|
|
||||||
switch e.Format {
|
switch e.Format {
|
||||||
case "json":
|
case "json":
|
||||||
b.WriteString(fmt.Sprintf("json 'auto'"))
|
b.WriteString(fmt.Sprintf("json 'auto'"))
|
||||||
|
|
@ -60,7 +59,6 @@ func (e RedshiftEmitter) copyStatement(s3File string) string {
|
||||||
default:
|
default:
|
||||||
b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
|
b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
|
||||||
}
|
}
|
||||||
|
|
||||||
b.WriteString(";")
|
b.WriteString(";")
|
||||||
return b.String()
|
return b.String()
|
||||||
}
|
}
|
||||||
|
|
@ -5,7 +5,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCopyStatement(t *testing.T) {
|
func TestCopyStatement(t *testing.T) {
|
||||||
e := RedshiftEmitter{
|
e := RedshiftBasicEmtitter{
|
||||||
Delimiter: ",",
|
Delimiter: ",",
|
||||||
S3Bucket: "test_bucket",
|
S3Bucket: "test_bucket",
|
||||||
TableName: "test_table",
|
TableName: "test_table",
|
||||||
149
redshift_manifest_emitter.go
Normal file
149
redshift_manifest_emitter.go
Normal file
|
|
@ -0,0 +1,149 @@
|
||||||
|
package connector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/crowdmob/goamz/aws"
|
||||||
|
"github.com/crowdmob/goamz/s3"
|
||||||
|
_ "github.com/lib/pq"
|
||||||
|
)
|
||||||
|
|
||||||
|
// An implementation of Emitter that reads S3 file paths from a stream, creates a
|
||||||
|
// manifest file and batch copies them into Redshift.
|
||||||
|
type RedshiftManifestEmitter struct {
|
||||||
|
AccessKey string
|
||||||
|
CopyMandatory bool
|
||||||
|
DataTable string
|
||||||
|
Delimiter string
|
||||||
|
FileTable string
|
||||||
|
Format string
|
||||||
|
Jsonpaths string
|
||||||
|
S3Bucket string
|
||||||
|
SecretKey string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invoked when the buffer is full.
|
||||||
|
// Emits a Manifest file to S3 and then performs the Redshift copy command.
|
||||||
|
func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
|
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Aggregate file paths as strings
|
||||||
|
files := []string{}
|
||||||
|
for _, r := range b.Records() {
|
||||||
|
f := t.FromRecord(r)
|
||||||
|
files = append(files, string(f))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manifest file name
|
||||||
|
date := time.Now().UTC().Format("2006/01/02")
|
||||||
|
manifestFileName := e.getManifestName(date, files)
|
||||||
|
|
||||||
|
// Issue manifest COPY to Redshift
|
||||||
|
e.writeManifestToS3(files, manifestFileName)
|
||||||
|
c := e.copyStmt(manifestFileName)
|
||||||
|
_, err = db.Exec(c)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert file paths into File Names table
|
||||||
|
i := e.fileInsertStmt(files)
|
||||||
|
_, err = db.Exec(i)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("[%v] copied to Redshift", manifestFileName)
|
||||||
|
db.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates the INSERT statement for the file names database table.
|
||||||
|
func (e RedshiftManifestEmitter) fileInsertStmt(fileNames []string) string {
|
||||||
|
i := new(bytes.Buffer)
|
||||||
|
i.WriteString("('")
|
||||||
|
i.WriteString(strings.Join(fileNames, "'),('"))
|
||||||
|
i.WriteString("')")
|
||||||
|
|
||||||
|
b := new(bytes.Buffer)
|
||||||
|
b.WriteString("INSERT INTO ")
|
||||||
|
b.WriteString(e.FileTable)
|
||||||
|
b.WriteString(" VALUES ")
|
||||||
|
b.WriteString(i.String())
|
||||||
|
b.WriteString(";")
|
||||||
|
|
||||||
|
return b.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates the COPY statment for Redshift insertion.
|
||||||
|
func (e RedshiftManifestEmitter) copyStmt(filePath string) string {
|
||||||
|
b := new(bytes.Buffer)
|
||||||
|
c := fmt.Sprintf(
|
||||||
|
"CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s' ",
|
||||||
|
os.Getenv("AWS_ACCESS_KEY"),
|
||||||
|
os.Getenv("AWS_SECRET_KEY"),
|
||||||
|
)
|
||||||
|
b.WriteString("COPY " + e.DataTable + " ")
|
||||||
|
b.WriteString("FROM 's3://" + e.S3Bucket + "/" + filePath + "' ")
|
||||||
|
b.WriteString(c)
|
||||||
|
switch e.Format {
|
||||||
|
case "json":
|
||||||
|
b.WriteString(fmt.Sprintf("json 'auto' "))
|
||||||
|
case "jsonpaths":
|
||||||
|
b.WriteString(fmt.Sprintf("json '%s' ", e.Jsonpaths))
|
||||||
|
default:
|
||||||
|
b.WriteString(fmt.Sprintf("DELIMITER '%s' ", e.Delimiter))
|
||||||
|
}
|
||||||
|
b.WriteString("MANIFEST")
|
||||||
|
b.WriteString(";")
|
||||||
|
return b.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put the Manifest file contents to Redshift
|
||||||
|
func (e RedshiftManifestEmitter) writeManifestToS3(files []string, manifestFileName string) {
|
||||||
|
auth, _ := aws.EnvAuth()
|
||||||
|
s3Con := s3.New(auth, aws.USEast)
|
||||||
|
bucket := s3Con.Bucket(e.S3Bucket)
|
||||||
|
content := e.generateManifestFile(files)
|
||||||
|
err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{})
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Error occured while uploding to S3: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manifest file name based on First and Last sequence numbers
|
||||||
|
func (e RedshiftManifestEmitter) getManifestName(date string, files []string) string {
|
||||||
|
firstSeq := e.getSeq(files[0])
|
||||||
|
lastSeq := e.getSeq(files[len(files)-1])
|
||||||
|
return fmt.Sprintf("%v/_manifest/%v_%v", date, firstSeq, lastSeq)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trims the date and suffix information from string
|
||||||
|
func (e RedshiftManifestEmitter) getSeq(file string) string {
|
||||||
|
matches := strings.Split(file, "/")
|
||||||
|
return matches[len(matches)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manifest file contents in JSON structure
|
||||||
|
func (e RedshiftManifestEmitter) generateManifestFile(files []string) []byte {
|
||||||
|
m := &Manifest{}
|
||||||
|
for _, r := range files {
|
||||||
|
var url = fmt.Sprintf("s3://%s/%s", e.S3Bucket, r)
|
||||||
|
var entry = Entry{Url: url, Mandatory: e.CopyMandatory}
|
||||||
|
m.Entries = append(m.Entries, entry)
|
||||||
|
}
|
||||||
|
b, _ := json.Marshal(m)
|
||||||
|
return b
|
||||||
|
}
|
||||||
39
redshift_manifest_emitter_test.go
Normal file
39
redshift_manifest_emitter_test.go
Normal file
|
|
@ -0,0 +1,39 @@
|
||||||
|
package connector
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestInsertStmt(t *testing.T) {
|
||||||
|
e := RedshiftManifestEmitter{FileTable: "funz"}
|
||||||
|
s := []string{"file1", "file2"}
|
||||||
|
|
||||||
|
expected := "INSERT INTO funz VALUES ('file1'),('file2');"
|
||||||
|
result := e.fileInsertStmt(s)
|
||||||
|
|
||||||
|
if result != expected {
|
||||||
|
t.Errorf("fileInsertStmt() = %v want %v", result, expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestManifestName(t *testing.T) {
|
||||||
|
e := RedshiftManifestEmitter{}
|
||||||
|
s := []string{"2014/01/01/a-b", "2014/01/01/c-d"}
|
||||||
|
|
||||||
|
expected := "2000/01/01/_manifest/a-b_c-d"
|
||||||
|
result := e.getManifestName("2000/01/01", s)
|
||||||
|
|
||||||
|
if result != expected {
|
||||||
|
t.Errorf("getManifestName() = %v want %v", result, expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGenerateManifestFile(t *testing.T) {
|
||||||
|
e := RedshiftManifestEmitter{S3Bucket: "bucket_name", CopyMandatory: true}
|
||||||
|
s := []string{"file1"}
|
||||||
|
|
||||||
|
expected := "{\"entries\":[{\"url\":\"s3://bucket_name/file1\",\"mandatory\":true}]}"
|
||||||
|
result := string(e.generateManifestFile(s))
|
||||||
|
|
||||||
|
if result != expected {
|
||||||
|
t.Errorf("generateManifestFile() = %v want %v", result, expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -3,6 +3,7 @@ package connector
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/crowdmob/goamz/aws"
|
"github.com/crowdmob/goamz/aws"
|
||||||
|
|
@ -22,8 +23,8 @@ type S3Emitter struct {
|
||||||
// S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current
|
// S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current
|
||||||
// UTC date (YYYY-MM-DD) is base of the path to logically group days of batches.
|
// UTC date (YYYY-MM-DD) is base of the path to logically group days of batches.
|
||||||
func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
|
func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
|
||||||
date := time.Now().UTC().Format("2006-01-02")
|
date := time.Now().UTC().Format("2006/01/02")
|
||||||
return fmt.Sprintf("/%v/%v-%v.txt", date, firstSeq, lastSeq)
|
return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Emit is invoked when the buffer is full. This method emits the set of filtered records.
|
// Emit is invoked when the buffer is full. This method emits the set of filtered records.
|
||||||
|
|
@ -43,8 +44,8 @@ func (e S3Emitter) Emit(b Buffer, t Transformer) {
|
||||||
err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
|
err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Error occured while uploding to S3: %v\n", err)
|
log.Printf("S3Put ERROR: %v\n", err)
|
||||||
} else {
|
} else {
|
||||||
fmt.Printf("Emitted %v records to S3 in s3://%v%v\n", b.NumRecordsInBuffer(), e.S3Bucket, s3File)
|
log.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,12 +7,13 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestS3FileName(t *testing.T) {
|
func TestS3FileName(t *testing.T) {
|
||||||
d := time.Now().UTC().Format("2006-01-02")
|
d := time.Now().UTC().Format("2006/01/02")
|
||||||
n := fmt.Sprintf("/%v/a-b.txt", d)
|
|
||||||
e := S3Emitter{}
|
e := S3Emitter{}
|
||||||
f := e.S3FileName("a", "b")
|
|
||||||
|
|
||||||
if f != n {
|
expected := fmt.Sprintf("%v/a-b", d)
|
||||||
t.Errorf("S3FileName() = want %v", f, n)
|
result := e.S3FileName("a", "b")
|
||||||
|
|
||||||
|
if result != expected {
|
||||||
|
t.Errorf("S3FileName() = %v want %v", result, expected)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
37
s3_manifest_emitter.go
Normal file
37
s3_manifest_emitter.go
Normal file
|
|
@ -0,0 +1,37 @@
|
||||||
|
package connector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/sendgridlabs/go-kinesis"
|
||||||
|
)
|
||||||
|
|
||||||
|
// An implementation of Emitter that puts event data on S3 file, and then puts the
|
||||||
|
// S3 file path onto the output stream for processing by manifest application.
|
||||||
|
type S3ManifestEmitter struct {
|
||||||
|
OutputStream string
|
||||||
|
S3Bucket string
|
||||||
|
Ksis *kinesis.Kinesis
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
|
|
||||||
|
// Emit buffer contents to S3 Bucket
|
||||||
|
s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
|
||||||
|
s3Emitter.Emit(b, t)
|
||||||
|
s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
|
||||||
|
|
||||||
|
// Emit the file path to Kinesis Output stream
|
||||||
|
args := kinesis.NewArgs()
|
||||||
|
args.Add("StreamName", e.OutputStream)
|
||||||
|
args.Add("PartitionKey", s3File)
|
||||||
|
args.AddData([]byte(s3File))
|
||||||
|
|
||||||
|
_, err := e.Ksis.PutRecord(args)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("PutRecord ERROR: %v", err)
|
||||||
|
} else {
|
||||||
|
log.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue