db progress

This commit is contained in:
maddalax 2024-09-30 17:31:09 -05:00
parent 8cdc625133
commit ae983473b3
12 changed files with 484 additions and 35 deletions

View file

@ -1,40 +1,92 @@
package chat
import (
"chat/internal/db"
"chat/ws"
"context"
"fmt"
"github.com/google/uuid"
"github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/service"
)
func StartListener(loader *service.Locator) {
manager := service.Get[ws.SocketManager](loader)
type Manager struct {
socketManager *ws.SocketManager
queries *db.Queries
}
c := make(chan ws.MessageEvent)
manager.Listen(c)
func NewManager(loader *service.Locator) *Manager {
return &Manager{
socketManager: service.Get[ws.SocketManager](loader),
queries: service.Get[db.Queries](loader),
}
}
func (m *Manager) StartListener() {
c := make(chan ws.SocketEvent)
m.socketManager.Listen(c)
for {
select {
case event := <-c:
fmt.Printf("Received message from %s: %v\n", event.Id, event.Message)
message := event.Message["message"].(string)
if message == "" {
continue
switch event.Type {
case ws.ConnectedEvent:
fmt.Printf("User %s connected\n", event.Id)
m.backFill(event.Id)
case ws.DisconnectedEvent:
fmt.Printf("User %s disconnected\n", event.Id)
case ws.MessageEvent:
m.onMessage(event.Id, event.Payload)
}
messageEle := h.Div(
h.Attribute("hx-swap-oob", "beforeend"),
h.Class("flex flex-col gap-2 w-full"),
h.Id("messages"),
h.Pf(message),
)
manager.BroadcastText(
h.Render(
h.Fragment(
messageEle,
)),
)
}
}
}
func (m *Manager) backFill(socketId string) {
messages, _ := m.queries.GetLastMessages(context.Background(), db.GetLastMessagesParams{
ChatRoomID: "4ccc3f90a27c9375c98477571034b2e1",
Limit: 50,
})
for _, message := range messages {
m.socketManager.SendText(socketId,
h.Render(MessageRow(message.Message)),
)
}
}
func (m *Manager) onMessage(socketId string, payload map[string]any) {
fmt.Printf("Received message from %s: %v\n", socketId, payload)
message := payload["message"].(string)
if message == "" {
return
}
ctx := context.Background()
user, err := m.queries.CreateUser(ctx, uuid.NewString())
if err != nil {
fmt.Printf("Error creating user: %v\n", err)
return
}
//chat, _ := m.queries.CreateChatRoom(ctx, "General")
err = m.queries.InsertMessage(
context.Background(),
db.InsertMessageParams{
ChatRoomID: "4ccc3f90a27c9375c98477571034b2e1",
UserID: user.ID,
Message: message,
},
)
if err != nil {
fmt.Printf("Error inserting message: %v\n", err)
return
}
m.socketManager.BroadcastText(
h.Render(MessageRow(message)),
)
}

View file

@ -0,0 +1,12 @@
package chat
import "github.com/maddalax/htmgo/framework/h"
func MessageRow(text string) *h.Element {
return h.Div(
h.Attribute("hx-swap-oob", "beforeend"),
h.Class("flex flex-col gap-2 w-full"),
h.Id("messages"),
h.Pf(text),
)
}

View file

@ -0,0 +1,31 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.27.0
package db
import (
"context"
"database/sql"
)
type DBTX interface {
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
PrepareContext(context.Context, string) (*sql.Stmt, error)
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}
func New(db DBTX) *Queries {
return &Queries{db: db}
}
type Queries struct {
db DBTX
}
func (q *Queries) WithTx(tx *sql.Tx) *Queries {
return &Queries{
db: tx,
}
}

View file

@ -0,0 +1,33 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.27.0
package db
import (
"database/sql"
)
type ChatRoom struct {
ID string
Name string
LastMessageSentAt sql.NullString
CreatedAt string
UpdatedAt string
}
type Message struct {
ID int64
ChatRoomID string
UserID int64
Message string
CreatedAt string
UpdatedAt string
}
type User struct {
ID int64
Name string
CreatedAt string
UpdatedAt string
}

View file

@ -0,0 +1,25 @@
package db
import (
"context"
"database/sql"
_ "embed"
_ "github.com/mattn/go-sqlite3"
)
//go:embed schema.sql
var ddl string
func Provide() *Queries {
db, err := sql.Open("sqlite3", "file:chat.db?cache=shared&_fk=1")
if err != nil {
panic(err)
}
if _, err := db.ExecContext(context.Background(), ddl); err != nil {
panic(err)
}
return New(db)
}

View file

@ -0,0 +1,44 @@
-- name: CreateChatRoom :one
INSERT INTO chat_rooms (name, created_at, updated_at)
VALUES (?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
RETURNING id, name, created_at, updated_at, last_message_sent_at;
-- name: InsertMessage :exec
INSERT INTO messages (chat_room_id, user_id, message, created_at, updated_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
RETURNING id, chat_room_id, user_id, message, created_at, updated_at;
-- name: UpdateChatRoomLastMessageSentAt :exec
UPDATE chat_rooms
SET last_message_sent_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
WHERE id = ?;
-- name: GetChatRoom :one
SELECT
id,
name,
last_message_sent_at,
created_at,
updated_at
FROM chat_rooms
WHERE chat_rooms.id = ?;
-- name: CreateUser :one
INSERT INTO users (name, created_at, updated_at)
VALUES (?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
RETURNING id, name, created_at, updated_at;
-- name: GetLastMessages :many
SELECT
messages.id,
messages.chat_room_id,
messages.user_id,
users.name AS user_name,
messages.message,
messages.created_at,
messages.updated_at
FROM messages
JOIN users ON messages.user_id = users.id
WHERE messages.chat_room_id = ?
ORDER BY messages.created_at
LIMIT ?;

View file

@ -0,0 +1,170 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.27.0
// source: queries.sql
package db
import (
"context"
"database/sql"
)
const createChatRoom = `-- name: CreateChatRoom :one
INSERT INTO chat_rooms (name, created_at, updated_at)
VALUES (?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
RETURNING id, name, created_at, updated_at, last_message_sent_at
`
type CreateChatRoomRow struct {
ID string
Name string
CreatedAt string
UpdatedAt string
LastMessageSentAt sql.NullString
}
func (q *Queries) CreateChatRoom(ctx context.Context, name string) (CreateChatRoomRow, error) {
row := q.db.QueryRowContext(ctx, createChatRoom, name)
var i CreateChatRoomRow
err := row.Scan(
&i.ID,
&i.Name,
&i.CreatedAt,
&i.UpdatedAt,
&i.LastMessageSentAt,
)
return i, err
}
const createUser = `-- name: CreateUser :one
INSERT INTO users (name, created_at, updated_at)
VALUES (?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
RETURNING id, name, created_at, updated_at
`
func (q *Queries) CreateUser(ctx context.Context, name string) (User, error) {
row := q.db.QueryRowContext(ctx, createUser, name)
var i User
err := row.Scan(
&i.ID,
&i.Name,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const getChatRoom = `-- name: GetChatRoom :one
SELECT
id,
name,
last_message_sent_at,
created_at,
updated_at
FROM chat_rooms
WHERE chat_rooms.id = ?
`
func (q *Queries) GetChatRoom(ctx context.Context, id string) (ChatRoom, error) {
row := q.db.QueryRowContext(ctx, getChatRoom, id)
var i ChatRoom
err := row.Scan(
&i.ID,
&i.Name,
&i.LastMessageSentAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const getLastMessages = `-- name: GetLastMessages :many
SELECT
messages.id,
messages.chat_room_id,
messages.user_id,
users.name AS user_name,
messages.message,
messages.created_at,
messages.updated_at
FROM messages
JOIN users ON messages.user_id = users.id
WHERE messages.chat_room_id = ?
ORDER BY messages.created_at
LIMIT ?
`
type GetLastMessagesParams struct {
ChatRoomID string
Limit int64
}
type GetLastMessagesRow struct {
ID int64
ChatRoomID string
UserID int64
UserName string
Message string
CreatedAt string
UpdatedAt string
}
func (q *Queries) GetLastMessages(ctx context.Context, arg GetLastMessagesParams) ([]GetLastMessagesRow, error) {
rows, err := q.db.QueryContext(ctx, getLastMessages, arg.ChatRoomID, arg.Limit)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetLastMessagesRow
for rows.Next() {
var i GetLastMessagesRow
if err := rows.Scan(
&i.ID,
&i.ChatRoomID,
&i.UserID,
&i.UserName,
&i.Message,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const insertMessage = `-- name: InsertMessage :exec
INSERT INTO messages (chat_room_id, user_id, message, created_at, updated_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
RETURNING id, chat_room_id, user_id, message, created_at, updated_at
`
type InsertMessageParams struct {
ChatRoomID string
UserID int64
Message string
}
func (q *Queries) InsertMessage(ctx context.Context, arg InsertMessageParams) error {
_, err := q.db.ExecContext(ctx, insertMessage, arg.ChatRoomID, arg.UserID, arg.Message)
return err
}
const updateChatRoomLastMessageSentAt = `-- name: UpdateChatRoomLastMessageSentAt :exec
UPDATE chat_rooms
SET last_message_sent_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
`
func (q *Queries) UpdateChatRoomLastMessageSentAt(ctx context.Context, id string) error {
_, err := q.db.ExecContext(ctx, updateChatRoomLastMessageSentAt, id)
return err
}

View file

@ -0,0 +1,31 @@
CREATE TABLE IF NOT EXISTS users
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
) STRICT;
CREATE TABLE IF NOT EXISTS chat_rooms
(
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))), -- Generates a UUID
name TEXT NOT NULL,
last_message_sent_at TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
) STRICT;
CREATE TABLE IF NOT EXISTS messages
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_room_id TEXT NOT NULL,
user_id INTEGER NOT NULL,
message TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (chat_room_id) REFERENCES chat_rooms (id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
) STRICT;
CREATE INDEX IF NOT EXISTS idx_messages_chat_room_id ON messages (chat_room_id);
CREATE INDEX IF NOT EXISTS idx_messages_user_id ON messages (user_id);

View file

@ -3,6 +3,7 @@ package main
import (
"chat/__htmgo"
"chat/chat"
"chat/internal/db"
"chat/ws"
"embed"
"github.com/maddalax/htmgo/framework/h"
@ -17,11 +18,13 @@ var StaticAssets embed.FS
func main() {
locator := service.NewLocator()
service.Set[db.Queries](locator, service.Singleton, db.Provide)
service.Set[ws.SocketManager](locator, service.Singleton, func() *ws.SocketManager {
return ws.NewSocketManager()
})
go chat.StartListener(locator)
chatManager := chat.NewManager(locator)
go chatManager.StartListener()
h.Start(h.AppOpts{
ServiceLocator: locator,

9
examples/chat/sqlc.yaml Normal file
View file

@ -0,0 +1,9 @@
version: "2"
sql:
- schema: "internal/db/schema.sql"
queries: "internal/db/queries.sql"
engine: "sqlite"
gen:
go:
package: "db"
out: "internal/db"

View file

@ -7,14 +7,23 @@ import (
"github.com/puzpuzpuz/xsync/v3"
)
type MessageEvent struct {
type EventType string
const (
ConnectedEvent EventType = "connected"
DisconnectedEvent EventType = "disconnected"
MessageEvent EventType = "message"
)
type SocketEvent struct {
Id string
Message map[string]any
Type EventType
Payload map[string]any
}
type SocketManager struct {
sockets *xsync.MapOf[string, *websocket.Conn]
listeners []chan MessageEvent
listeners []chan SocketEvent
}
func NewSocketManager() *SocketManager {
@ -23,29 +32,49 @@ func NewSocketManager() *SocketManager {
}
}
func (manager *SocketManager) Listen(listener chan MessageEvent) {
func (manager *SocketManager) Listen(listener chan SocketEvent) {
if manager.listeners == nil {
manager.listeners = make([]chan MessageEvent, 0)
manager.listeners = make([]chan SocketEvent, 0)
}
manager.listeners = append(manager.listeners, listener)
}
func (manager *SocketManager) OnMessage(id string, message map[string]any) {
func (manager *SocketManager) dispatch(event SocketEvent) {
for _, listener := range manager.listeners {
listener <- MessageEvent{
listener <- event
}
}
func (manager *SocketManager) OnMessage(id string, message map[string]any) {
manager.dispatch(SocketEvent{
Id: id,
Message: message,
}
}
Type: MessageEvent,
Payload: message,
})
}
func (manager *SocketManager) Add(id string, conn *websocket.Conn) {
manager.sockets.Store(id, conn)
manager.dispatch(SocketEvent{
Id: id,
Type: ConnectedEvent,
Payload: map[string]any{},
})
}
func (manager *SocketManager) OnClose(id string) {
manager.dispatch(SocketEvent{
Id: id,
Type: DisconnectedEvent,
Payload: map[string]any{},
})
manager.sockets.Delete(id)
}
func (manager *SocketManager) CloseWithError(id string, message string) {
conn := manager.Get(id)
if conn != nil {
defer manager.OnClose(id)
conn.Close(websocket.StatusInternalError, message)
}
}
@ -53,9 +82,9 @@ func (manager *SocketManager) CloseWithError(id string, message string) {
func (manager *SocketManager) Disconnect(id string) {
conn := manager.Get(id)
if conn != nil {
defer manager.OnClose(id)
_ = conn.CloseNow()
}
manager.sockets.Delete(id)
}
func (manager *SocketManager) Get(id string) *websocket.Conn {
@ -78,3 +107,10 @@ func (manager *SocketManager) BroadcastText(message string) {
fmt.Printf("Broadcasting message: \n%s\n", message)
manager.Broadcast([]byte(message), websocket.MessageText)
}
func (manager *SocketManager) SendText(id string, message string) {
conn := manager.Get(id)
if conn != nil {
_ = conn.Write(context.Background(), websocket.MessageText, []byte(message))
}
}

View file

@ -51,7 +51,9 @@ func (ctx *RenderContext) AddScript(funcName string, body string) {
}
func (node *Element) Render(context *RenderContext) {
// some elements may not have a tag, such as a Fragment
if node == nil {
return
}
if node.tag == CachedNodeTag {
meta := node.meta.(*CachedNode)
@ -65,6 +67,7 @@ func (node *Element) Render(context *RenderContext) {
return
}
// some elements may not have a tag, such as a Fragment
if node.tag != "" {
context.builder.WriteString("<")
context.builder.WriteString(node.tag)