From ae983473b34cd9c92585655177950f80663e5f4e Mon Sep 17 00:00:00 2001 From: maddalax Date: Mon, 30 Sep 2024 17:31:09 -0500 Subject: [PATCH] db progress --- examples/chat/chat/broadcast.go | 96 ++++++++++--- examples/chat/chat/component.go | 12 ++ examples/chat/internal/db/db.go | 31 +++++ examples/chat/internal/db/models.go | 33 +++++ examples/chat/internal/db/provider.go | 25 ++++ examples/chat/internal/db/queries.sql | 44 ++++++ examples/chat/internal/db/queries.sql.go | 170 +++++++++++++++++++++++ examples/chat/internal/db/schema.sql | 31 +++++ examples/chat/main.go | 5 +- examples/chat/sqlc.yaml | 9 ++ examples/chat/ws/manager.go | 58 ++++++-- framework/h/renderer.go | 5 +- 12 files changed, 484 insertions(+), 35 deletions(-) create mode 100644 examples/chat/chat/component.go create mode 100644 examples/chat/internal/db/db.go create mode 100644 examples/chat/internal/db/models.go create mode 100644 examples/chat/internal/db/provider.go create mode 100644 examples/chat/internal/db/queries.sql create mode 100644 examples/chat/internal/db/queries.sql.go create mode 100644 examples/chat/internal/db/schema.sql create mode 100644 examples/chat/sqlc.yaml diff --git a/examples/chat/chat/broadcast.go b/examples/chat/chat/broadcast.go index 7ddb247..7e61aff 100644 --- a/examples/chat/chat/broadcast.go +++ b/examples/chat/chat/broadcast.go @@ -1,40 +1,92 @@ package chat import ( + "chat/internal/db" "chat/ws" + "context" "fmt" + "github.com/google/uuid" "github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/service" ) -func StartListener(loader *service.Locator) { - manager := service.Get[ws.SocketManager](loader) +type Manager struct { + socketManager *ws.SocketManager + queries *db.Queries +} - c := make(chan ws.MessageEvent) - manager.Listen(c) +func NewManager(loader *service.Locator) *Manager { + return &Manager{ + socketManager: service.Get[ws.SocketManager](loader), + queries: service.Get[db.Queries](loader), + } +} + +func (m *Manager) StartListener() { + c := make(chan ws.SocketEvent) + m.socketManager.Listen(c) for { select { case event := <-c: - fmt.Printf("Received message from %s: %v\n", event.Id, event.Message) - message := event.Message["message"].(string) - if message == "" { - continue + switch event.Type { + case ws.ConnectedEvent: + fmt.Printf("User %s connected\n", event.Id) + m.backFill(event.Id) + case ws.DisconnectedEvent: + fmt.Printf("User %s disconnected\n", event.Id) + case ws.MessageEvent: + m.onMessage(event.Id, event.Payload) } - - messageEle := h.Div( - h.Attribute("hx-swap-oob", "beforeend"), - h.Class("flex flex-col gap-2 w-full"), - h.Id("messages"), - h.Pf(message), - ) - - manager.BroadcastText( - h.Render( - h.Fragment( - messageEle, - )), - ) } } } + +func (m *Manager) backFill(socketId string) { + messages, _ := m.queries.GetLastMessages(context.Background(), db.GetLastMessagesParams{ + ChatRoomID: "4ccc3f90a27c9375c98477571034b2e1", + Limit: 50, + }) + for _, message := range messages { + m.socketManager.SendText(socketId, + h.Render(MessageRow(message.Message)), + ) + } +} + +func (m *Manager) onMessage(socketId string, payload map[string]any) { + fmt.Printf("Received message from %s: %v\n", socketId, payload) + message := payload["message"].(string) + + if message == "" { + return + } + + ctx := context.Background() + + user, err := m.queries.CreateUser(ctx, uuid.NewString()) + + if err != nil { + fmt.Printf("Error creating user: %v\n", err) + return + } + //chat, _ := m.queries.CreateChatRoom(ctx, "General") + + err = m.queries.InsertMessage( + context.Background(), + db.InsertMessageParams{ + ChatRoomID: "4ccc3f90a27c9375c98477571034b2e1", + UserID: user.ID, + Message: message, + }, + ) + + if err != nil { + fmt.Printf("Error inserting message: %v\n", err) + return + } + + m.socketManager.BroadcastText( + h.Render(MessageRow(message)), + ) +} diff --git a/examples/chat/chat/component.go b/examples/chat/chat/component.go new file mode 100644 index 0000000..2a8c066 --- /dev/null +++ b/examples/chat/chat/component.go @@ -0,0 +1,12 @@ +package chat + +import "github.com/maddalax/htmgo/framework/h" + +func MessageRow(text string) *h.Element { + return h.Div( + h.Attribute("hx-swap-oob", "beforeend"), + h.Class("flex flex-col gap-2 w-full"), + h.Id("messages"), + h.Pf(text), + ) +} diff --git a/examples/chat/internal/db/db.go b/examples/chat/internal/db/db.go new file mode 100644 index 0000000..41b7a34 --- /dev/null +++ b/examples/chat/internal/db/db.go @@ -0,0 +1,31 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package db + +import ( + "context" + "database/sql" +) + +type DBTX interface { + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx *sql.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/examples/chat/internal/db/models.go b/examples/chat/internal/db/models.go new file mode 100644 index 0000000..71aadfb --- /dev/null +++ b/examples/chat/internal/db/models.go @@ -0,0 +1,33 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package db + +import ( + "database/sql" +) + +type ChatRoom struct { + ID string + Name string + LastMessageSentAt sql.NullString + CreatedAt string + UpdatedAt string +} + +type Message struct { + ID int64 + ChatRoomID string + UserID int64 + Message string + CreatedAt string + UpdatedAt string +} + +type User struct { + ID int64 + Name string + CreatedAt string + UpdatedAt string +} diff --git a/examples/chat/internal/db/provider.go b/examples/chat/internal/db/provider.go new file mode 100644 index 0000000..9078f36 --- /dev/null +++ b/examples/chat/internal/db/provider.go @@ -0,0 +1,25 @@ +package db + +import ( + "context" + "database/sql" + _ "embed" + _ "github.com/mattn/go-sqlite3" +) + +//go:embed schema.sql +var ddl string + +func Provide() *Queries { + db, err := sql.Open("sqlite3", "file:chat.db?cache=shared&_fk=1") + + if err != nil { + panic(err) + } + + if _, err := db.ExecContext(context.Background(), ddl); err != nil { + panic(err) + } + + return New(db) +} diff --git a/examples/chat/internal/db/queries.sql b/examples/chat/internal/db/queries.sql new file mode 100644 index 0000000..597223e --- /dev/null +++ b/examples/chat/internal/db/queries.sql @@ -0,0 +1,44 @@ +-- name: CreateChatRoom :one +INSERT INTO chat_rooms (name, created_at, updated_at) +VALUES (?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) +RETURNING id, name, created_at, updated_at, last_message_sent_at; + +-- name: InsertMessage :exec +INSERT INTO messages (chat_room_id, user_id, message, created_at, updated_at) +VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) +RETURNING id, chat_room_id, user_id, message, created_at, updated_at; + +-- name: UpdateChatRoomLastMessageSentAt :exec +UPDATE chat_rooms +SET last_message_sent_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP +WHERE id = ?; + +-- name: GetChatRoom :one +SELECT + id, + name, + last_message_sent_at, + created_at, + updated_at +FROM chat_rooms +WHERE chat_rooms.id = ?; + +-- name: CreateUser :one +INSERT INTO users (name, created_at, updated_at) +VALUES (?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) +RETURNING id, name, created_at, updated_at; + +-- name: GetLastMessages :many +SELECT + messages.id, + messages.chat_room_id, + messages.user_id, + users.name AS user_name, + messages.message, + messages.created_at, + messages.updated_at +FROM messages + JOIN users ON messages.user_id = users.id +WHERE messages.chat_room_id = ? +ORDER BY messages.created_at +LIMIT ?; diff --git a/examples/chat/internal/db/queries.sql.go b/examples/chat/internal/db/queries.sql.go new file mode 100644 index 0000000..8d7cc09 --- /dev/null +++ b/examples/chat/internal/db/queries.sql.go @@ -0,0 +1,170 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 +// source: queries.sql + +package db + +import ( + "context" + "database/sql" +) + +const createChatRoom = `-- name: CreateChatRoom :one +INSERT INTO chat_rooms (name, created_at, updated_at) +VALUES (?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) +RETURNING id, name, created_at, updated_at, last_message_sent_at +` + +type CreateChatRoomRow struct { + ID string + Name string + CreatedAt string + UpdatedAt string + LastMessageSentAt sql.NullString +} + +func (q *Queries) CreateChatRoom(ctx context.Context, name string) (CreateChatRoomRow, error) { + row := q.db.QueryRowContext(ctx, createChatRoom, name) + var i CreateChatRoomRow + err := row.Scan( + &i.ID, + &i.Name, + &i.CreatedAt, + &i.UpdatedAt, + &i.LastMessageSentAt, + ) + return i, err +} + +const createUser = `-- name: CreateUser :one +INSERT INTO users (name, created_at, updated_at) +VALUES (?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) +RETURNING id, name, created_at, updated_at +` + +func (q *Queries) CreateUser(ctx context.Context, name string) (User, error) { + row := q.db.QueryRowContext(ctx, createUser, name) + var i User + err := row.Scan( + &i.ID, + &i.Name, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const getChatRoom = `-- name: GetChatRoom :one +SELECT + id, + name, + last_message_sent_at, + created_at, + updated_at +FROM chat_rooms +WHERE chat_rooms.id = ? +` + +func (q *Queries) GetChatRoom(ctx context.Context, id string) (ChatRoom, error) { + row := q.db.QueryRowContext(ctx, getChatRoom, id) + var i ChatRoom + err := row.Scan( + &i.ID, + &i.Name, + &i.LastMessageSentAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const getLastMessages = `-- name: GetLastMessages :many +SELECT + messages.id, + messages.chat_room_id, + messages.user_id, + users.name AS user_name, + messages.message, + messages.created_at, + messages.updated_at +FROM messages + JOIN users ON messages.user_id = users.id +WHERE messages.chat_room_id = ? +ORDER BY messages.created_at +LIMIT ? +` + +type GetLastMessagesParams struct { + ChatRoomID string + Limit int64 +} + +type GetLastMessagesRow struct { + ID int64 + ChatRoomID string + UserID int64 + UserName string + Message string + CreatedAt string + UpdatedAt string +} + +func (q *Queries) GetLastMessages(ctx context.Context, arg GetLastMessagesParams) ([]GetLastMessagesRow, error) { + rows, err := q.db.QueryContext(ctx, getLastMessages, arg.ChatRoomID, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetLastMessagesRow + for rows.Next() { + var i GetLastMessagesRow + if err := rows.Scan( + &i.ID, + &i.ChatRoomID, + &i.UserID, + &i.UserName, + &i.Message, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const insertMessage = `-- name: InsertMessage :exec +INSERT INTO messages (chat_room_id, user_id, message, created_at, updated_at) +VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) +RETURNING id, chat_room_id, user_id, message, created_at, updated_at +` + +type InsertMessageParams struct { + ChatRoomID string + UserID int64 + Message string +} + +func (q *Queries) InsertMessage(ctx context.Context, arg InsertMessageParams) error { + _, err := q.db.ExecContext(ctx, insertMessage, arg.ChatRoomID, arg.UserID, arg.Message) + return err +} + +const updateChatRoomLastMessageSentAt = `-- name: UpdateChatRoomLastMessageSentAt :exec +UPDATE chat_rooms +SET last_message_sent_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP +WHERE id = ? +` + +func (q *Queries) UpdateChatRoomLastMessageSentAt(ctx context.Context, id string) error { + _, err := q.db.ExecContext(ctx, updateChatRoomLastMessageSentAt, id) + return err +} diff --git a/examples/chat/internal/db/schema.sql b/examples/chat/internal/db/schema.sql new file mode 100644 index 0000000..01c0f80 --- /dev/null +++ b/examples/chat/internal/db/schema.sql @@ -0,0 +1,31 @@ +CREATE TABLE IF NOT EXISTS users +( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL UNIQUE, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP +) STRICT; + +CREATE TABLE IF NOT EXISTS chat_rooms +( + id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))), -- Generates a UUID + name TEXT NOT NULL, + last_message_sent_at TEXT, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP +) STRICT; + +CREATE TABLE IF NOT EXISTS messages +( + id INTEGER PRIMARY KEY AUTOINCREMENT, + chat_room_id TEXT NOT NULL, + user_id INTEGER NOT NULL, + message TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (chat_room_id) REFERENCES chat_rooms (id) ON DELETE CASCADE, + FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE +) STRICT; + +CREATE INDEX IF NOT EXISTS idx_messages_chat_room_id ON messages (chat_room_id); +CREATE INDEX IF NOT EXISTS idx_messages_user_id ON messages (user_id); diff --git a/examples/chat/main.go b/examples/chat/main.go index 2b5aed3..dc16ca7 100644 --- a/examples/chat/main.go +++ b/examples/chat/main.go @@ -3,6 +3,7 @@ package main import ( "chat/__htmgo" "chat/chat" + "chat/internal/db" "chat/ws" "embed" "github.com/maddalax/htmgo/framework/h" @@ -17,11 +18,13 @@ var StaticAssets embed.FS func main() { locator := service.NewLocator() + service.Set[db.Queries](locator, service.Singleton, db.Provide) service.Set[ws.SocketManager](locator, service.Singleton, func() *ws.SocketManager { return ws.NewSocketManager() }) - go chat.StartListener(locator) + chatManager := chat.NewManager(locator) + go chatManager.StartListener() h.Start(h.AppOpts{ ServiceLocator: locator, diff --git a/examples/chat/sqlc.yaml b/examples/chat/sqlc.yaml new file mode 100644 index 0000000..30c0518 --- /dev/null +++ b/examples/chat/sqlc.yaml @@ -0,0 +1,9 @@ +version: "2" +sql: + - schema: "internal/db/schema.sql" + queries: "internal/db/queries.sql" + engine: "sqlite" + gen: + go: + package: "db" + out: "internal/db" diff --git a/examples/chat/ws/manager.go b/examples/chat/ws/manager.go index ae52bd8..1cd60b8 100644 --- a/examples/chat/ws/manager.go +++ b/examples/chat/ws/manager.go @@ -7,14 +7,23 @@ import ( "github.com/puzpuzpuz/xsync/v3" ) -type MessageEvent struct { +type EventType string + +const ( + ConnectedEvent EventType = "connected" + DisconnectedEvent EventType = "disconnected" + MessageEvent EventType = "message" +) + +type SocketEvent struct { Id string - Message map[string]any + Type EventType + Payload map[string]any } type SocketManager struct { sockets *xsync.MapOf[string, *websocket.Conn] - listeners []chan MessageEvent + listeners []chan SocketEvent } func NewSocketManager() *SocketManager { @@ -23,29 +32,49 @@ func NewSocketManager() *SocketManager { } } -func (manager *SocketManager) Listen(listener chan MessageEvent) { +func (manager *SocketManager) Listen(listener chan SocketEvent) { if manager.listeners == nil { - manager.listeners = make([]chan MessageEvent, 0) + manager.listeners = make([]chan SocketEvent, 0) } manager.listeners = append(manager.listeners, listener) } -func (manager *SocketManager) OnMessage(id string, message map[string]any) { +func (manager *SocketManager) dispatch(event SocketEvent) { for _, listener := range manager.listeners { - listener <- MessageEvent{ - Id: id, - Message: message, - } + listener <- event } } +func (manager *SocketManager) OnMessage(id string, message map[string]any) { + manager.dispatch(SocketEvent{ + Id: id, + Type: MessageEvent, + Payload: message, + }) +} + func (manager *SocketManager) Add(id string, conn *websocket.Conn) { manager.sockets.Store(id, conn) + manager.dispatch(SocketEvent{ + Id: id, + Type: ConnectedEvent, + Payload: map[string]any{}, + }) +} + +func (manager *SocketManager) OnClose(id string) { + manager.dispatch(SocketEvent{ + Id: id, + Type: DisconnectedEvent, + Payload: map[string]any{}, + }) + manager.sockets.Delete(id) } func (manager *SocketManager) CloseWithError(id string, message string) { conn := manager.Get(id) if conn != nil { + defer manager.OnClose(id) conn.Close(websocket.StatusInternalError, message) } } @@ -53,9 +82,9 @@ func (manager *SocketManager) CloseWithError(id string, message string) { func (manager *SocketManager) Disconnect(id string) { conn := manager.Get(id) if conn != nil { + defer manager.OnClose(id) _ = conn.CloseNow() } - manager.sockets.Delete(id) } func (manager *SocketManager) Get(id string) *websocket.Conn { @@ -78,3 +107,10 @@ func (manager *SocketManager) BroadcastText(message string) { fmt.Printf("Broadcasting message: \n%s\n", message) manager.Broadcast([]byte(message), websocket.MessageText) } + +func (manager *SocketManager) SendText(id string, message string) { + conn := manager.Get(id) + if conn != nil { + _ = conn.Write(context.Background(), websocket.MessageText, []byte(message)) + } +} diff --git a/framework/h/renderer.go b/framework/h/renderer.go index e31240f..f0fd913 100644 --- a/framework/h/renderer.go +++ b/framework/h/renderer.go @@ -51,7 +51,9 @@ func (ctx *RenderContext) AddScript(funcName string, body string) { } func (node *Element) Render(context *RenderContext) { - // some elements may not have a tag, such as a Fragment + if node == nil { + return + } if node.tag == CachedNodeTag { meta := node.meta.(*CachedNode) @@ -65,6 +67,7 @@ func (node *Element) Render(context *RenderContext) { return } + // some elements may not have a tag, such as a Fragment if node.tag != "" { context.builder.WriteString("<") context.builder.WriteString(node.tag)