2023-03-06 07:57:05 +13:00
|
|
|
import { Server } from "socket.io"
|
|
|
|
import http from "http"
|
|
|
|
import Koa from "koa"
|
|
|
|
import Cookies from "cookies"
|
|
|
|
import { userAgent } from "koa-useragent"
|
2023-06-01 02:13:22 +12:00
|
|
|
import { auth, redis } from "@budibase/backend-core"
|
2023-03-07 04:09:42 +13:00
|
|
|
import currentApp from "../middleware/currentapp"
|
2023-05-25 19:48:56 +12:00
|
|
|
import { createAdapter } from "@socket.io/redis-adapter"
|
2023-05-31 21:21:50 +12:00
|
|
|
import { Socket } from "socket.io"
|
2023-06-01 02:13:22 +12:00
|
|
|
import { getSocketPubSubClients } from "../utilities/redis"
|
|
|
|
import { SocketEvents, SocketSessionTTL } from "@budibase/shared-core"
|
|
|
|
import { SocketSession } from "@budibase/types"
|
2023-03-06 07:57:05 +13:00
|
|
|
|
2023-05-31 21:21:50 +12:00
|
|
|
export class BaseSocket {
|
2023-03-06 07:57:05 +13:00
|
|
|
io: Server
|
2023-05-31 21:21:50 +12:00
|
|
|
path: string
|
2023-06-01 02:13:22 +12:00
|
|
|
redisClient?: redis.Client
|
2023-03-06 07:57:05 +13:00
|
|
|
|
|
|
|
constructor(
|
|
|
|
app: Koa,
|
|
|
|
server: http.Server,
|
2023-05-25 19:48:56 +12:00
|
|
|
path: string = "/",
|
2023-03-06 07:57:05 +13:00
|
|
|
additionalMiddlewares?: any[]
|
|
|
|
) {
|
2023-05-31 21:21:50 +12:00
|
|
|
this.path = path
|
2023-03-06 07:57:05 +13:00
|
|
|
this.io = new Server(server, {
|
|
|
|
path,
|
|
|
|
})
|
|
|
|
|
|
|
|
// Attach default middlewares
|
|
|
|
const authenticate = auth.buildAuthMiddleware([], {
|
|
|
|
publicAllowed: true,
|
|
|
|
})
|
|
|
|
const middlewares = [
|
|
|
|
userAgent,
|
|
|
|
authenticate,
|
2023-04-20 19:02:49 +12:00
|
|
|
currentApp,
|
2023-03-06 07:57:05 +13:00
|
|
|
...(additionalMiddlewares || []),
|
|
|
|
]
|
|
|
|
|
|
|
|
// Apply middlewares
|
|
|
|
this.io.use(async (socket, next) => {
|
|
|
|
// Build fake koa context
|
|
|
|
const res = new http.ServerResponse(socket.request)
|
|
|
|
const ctx: any = {
|
|
|
|
...app.createContext(socket.request, res),
|
|
|
|
|
|
|
|
// Additional overrides needed to make our middlewares work with this
|
|
|
|
// fake koa context
|
|
|
|
cookies: new Cookies(socket.request, res),
|
|
|
|
get: (field: string) => socket.request.headers[field],
|
|
|
|
throw: (code: number, message: string) => {
|
|
|
|
throw new Error(message)
|
|
|
|
},
|
|
|
|
|
|
|
|
// Needed for koa-useragent middleware
|
2023-04-20 19:02:49 +12:00
|
|
|
headers: socket.request.headers,
|
2023-03-06 07:57:05 +13:00
|
|
|
header: socket.request.headers,
|
2023-04-21 02:23:57 +12:00
|
|
|
|
|
|
|
// We don't really care about the path since it will never contain
|
|
|
|
// an app ID
|
|
|
|
path: "/socket",
|
2023-03-06 07:57:05 +13:00
|
|
|
}
|
|
|
|
|
|
|
|
// Run all koa middlewares
|
|
|
|
try {
|
|
|
|
for (let [idx, middleware] of middlewares.entries()) {
|
|
|
|
await middleware(ctx, () => {
|
|
|
|
if (idx === middlewares.length - 1) {
|
2023-05-17 01:18:31 +12:00
|
|
|
// Middlewares are finished
|
2023-03-06 07:57:05 +13:00
|
|
|
// Extract some data from our enriched koa context to persist
|
|
|
|
// as metadata for the socket
|
2023-05-17 01:18:31 +12:00
|
|
|
const { _id, email, firstName, lastName } = ctx.user
|
2023-05-31 21:21:50 +12:00
|
|
|
socket.data = {
|
2023-05-26 20:24:53 +12:00
|
|
|
_id,
|
2023-05-17 01:18:31 +12:00
|
|
|
email,
|
2023-05-26 20:24:53 +12:00
|
|
|
firstName,
|
|
|
|
lastName,
|
2023-05-31 21:21:50 +12:00
|
|
|
sessionId: socket.id,
|
2023-03-06 20:43:45 +13:00
|
|
|
}
|
2023-03-06 07:57:05 +13:00
|
|
|
next()
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
} catch (error: any) {
|
|
|
|
next(error)
|
|
|
|
}
|
|
|
|
})
|
2023-05-25 19:48:56 +12:00
|
|
|
|
2023-06-01 02:13:22 +12:00
|
|
|
// Initialise redis before handling connections
|
|
|
|
this.initialise().then(() => {
|
|
|
|
this.io.on("connection", async socket => {
|
|
|
|
// Add built in handler to allow fetching all other users in this room
|
|
|
|
socket.on(SocketEvents.GetUsers, async (payload, callback) => {
|
|
|
|
const sessions = await this.getRoomSessions(socket.data.room)
|
|
|
|
callback({ users: sessions })
|
|
|
|
})
|
2023-05-31 21:21:50 +12:00
|
|
|
|
2023-06-01 02:13:22 +12:00
|
|
|
// Add built in handler for heartbeats
|
|
|
|
socket.on(SocketEvents.Heartbeat, async () => {
|
|
|
|
console.log(socket.data.email, "heartbeat received")
|
|
|
|
await this.extendSessionTTL(socket.data.sessionId)
|
|
|
|
})
|
2023-05-31 21:21:50 +12:00
|
|
|
|
2023-06-01 02:13:22 +12:00
|
|
|
// Add early disconnection handler to clean up and leave room
|
|
|
|
socket.on("disconnect", async () => {
|
|
|
|
// Run any custom disconnection logic before we leave the room,
|
|
|
|
// so that we have access to their room etc before disconnection
|
|
|
|
await this.onDisconnect(socket)
|
2023-05-31 21:21:50 +12:00
|
|
|
|
2023-06-01 02:13:22 +12:00
|
|
|
// Leave the current room when the user disconnects if we're in one
|
2023-05-31 21:21:50 +12:00
|
|
|
await this.leaveRoom(socket)
|
2023-06-01 02:13:22 +12:00
|
|
|
})
|
2023-05-31 21:21:50 +12:00
|
|
|
|
2023-06-01 02:13:22 +12:00
|
|
|
// Add handlers for this socket
|
|
|
|
await this.onConnect(socket)
|
2023-05-31 21:21:50 +12:00
|
|
|
})
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2023-06-01 02:13:22 +12:00
|
|
|
async initialise() {
|
|
|
|
// Instantiate redis adapter.
|
|
|
|
// We use a fully qualified key name here as this bypasses the normal
|
|
|
|
// redis client#s key prefixing.
|
|
|
|
const { pub, sub } = getSocketPubSubClients()
|
|
|
|
const opts = {
|
|
|
|
key: `${redis.utils.Databases.SOCKET_IO}-${this.path}-pubsub`,
|
|
|
|
}
|
|
|
|
this.io.adapter(createAdapter(pub, sub, opts))
|
|
|
|
|
|
|
|
// Fetch redis client
|
|
|
|
this.redisClient = await redis.clients.getSocketClient()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Gets the redis key for a certain session ID
|
|
|
|
getSessionKey(sessionId: string) {
|
|
|
|
return `${this.path}-session:${sessionId}`
|
|
|
|
}
|
|
|
|
|
|
|
|
// Gets the redis key for certain room name
|
|
|
|
getRoomKey(room: string) {
|
|
|
|
return `${this.path}-room:${room}`
|
|
|
|
}
|
|
|
|
|
|
|
|
async extendSessionTTL(sessionId: string) {
|
|
|
|
const key = this.getSessionKey(sessionId)
|
|
|
|
await this.redisClient?.setExpiry(key, SocketSessionTTL)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Gets an array of all redis keys of users inside a certain room
|
2023-06-01 02:37:39 +12:00
|
|
|
async getRoomSessionIds(room: string): Promise<string[]> {
|
2023-06-01 02:13:22 +12:00
|
|
|
const keys = await this.redisClient?.get(this.getRoomKey(room))
|
|
|
|
return keys || []
|
|
|
|
}
|
|
|
|
|
|
|
|
// Sets the list of redis keys for users inside a certain room.
|
|
|
|
// There is no TTL on the actual room key map itself.
|
2023-06-01 02:37:39 +12:00
|
|
|
async setRoomSessionIds(room: string, ids: string[]) {
|
|
|
|
await this.redisClient?.store(this.getRoomKey(room), ids)
|
2023-06-01 02:13:22 +12:00
|
|
|
}
|
|
|
|
|
2023-05-31 21:21:50 +12:00
|
|
|
// Gets a list of all users inside a certain room
|
2023-06-01 02:13:22 +12:00
|
|
|
async getRoomSessions(room?: string): Promise<SocketSession[]> {
|
2023-05-31 21:21:50 +12:00
|
|
|
if (room) {
|
2023-06-01 02:37:39 +12:00
|
|
|
const sessionIds = await this.getRoomSessionIds(room)
|
|
|
|
const keys = sessionIds.map(this.getSessionKey.bind(this))
|
2023-06-01 02:13:22 +12:00
|
|
|
const sessions = await this.redisClient?.bulkGet(keys)
|
|
|
|
return Object.values(sessions || {})
|
2023-05-31 21:21:50 +12:00
|
|
|
} else {
|
|
|
|
return []
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-06-01 02:13:22 +12:00
|
|
|
// Detects keys which have been pruned from redis due to TTL expiry in a certain
|
|
|
|
// room and broadcasts disconnection messages to ensure clients are aware
|
|
|
|
async pruneRoom(room: string) {
|
2023-06-01 02:37:39 +12:00
|
|
|
const sessionIds = await this.getRoomSessionIds(room)
|
|
|
|
const sessionsExist = await Promise.all(
|
|
|
|
sessionIds.map(id => this.redisClient?.exists(this.getSessionKey(id)))
|
2023-06-01 02:13:22 +12:00
|
|
|
)
|
2023-06-01 02:37:39 +12:00
|
|
|
const prunedSessionIds = sessionIds.filter((id, idx) => {
|
|
|
|
if (!sessionsExist[idx]) {
|
|
|
|
this.io.to(room).emit(SocketEvents.UserDisconnect, sessionIds[idx])
|
2023-06-01 02:13:22 +12:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
})
|
|
|
|
|
|
|
|
// Store new pruned keys
|
2023-06-01 02:37:39 +12:00
|
|
|
await this.setRoomSessionIds(room, prunedSessionIds)
|
2023-06-01 02:13:22 +12:00
|
|
|
}
|
|
|
|
|
2023-05-31 21:21:50 +12:00
|
|
|
// Adds a user to a certain room
|
|
|
|
async joinRoom(socket: Socket, room: string) {
|
2023-06-01 02:13:22 +12:00
|
|
|
if (!room) {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
// Prune room before joining
|
|
|
|
await this.pruneRoom(room)
|
|
|
|
|
2023-05-31 21:21:50 +12:00
|
|
|
// Check if we're already in a room, as we'll need to leave if we are before we
|
|
|
|
// can join a different room
|
|
|
|
const oldRoom = socket.data.room
|
|
|
|
if (oldRoom && oldRoom !== room) {
|
|
|
|
await this.leaveRoom(socket)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Join new room
|
|
|
|
if (!oldRoom || oldRoom !== room) {
|
|
|
|
socket.join(room)
|
|
|
|
socket.data.room = room
|
|
|
|
}
|
|
|
|
|
2023-06-01 02:13:22 +12:00
|
|
|
// Store in redis
|
2023-05-31 21:21:50 +12:00
|
|
|
// @ts-ignore
|
2023-06-01 02:13:22 +12:00
|
|
|
let user: SocketSession = socket.data
|
2023-06-01 02:37:39 +12:00
|
|
|
const { sessionId } = user
|
|
|
|
const key = this.getSessionKey(sessionId)
|
2023-06-01 02:13:22 +12:00
|
|
|
await this.redisClient?.store(key, user, SocketSessionTTL)
|
2023-06-01 02:37:39 +12:00
|
|
|
const sessionIds = await this.getRoomSessionIds(room)
|
|
|
|
if (!sessionIds.includes(sessionId)) {
|
|
|
|
await this.setRoomSessionIds(room, [...sessionIds, sessionId])
|
2023-05-31 21:21:50 +12:00
|
|
|
}
|
2023-06-01 02:13:22 +12:00
|
|
|
|
|
|
|
// Notify other users
|
2023-05-31 21:21:50 +12:00
|
|
|
socket.to(room).emit(SocketEvents.UserUpdate, user)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Disconnects a socket from its current room
|
|
|
|
async leaveRoom(socket: Socket) {
|
|
|
|
// @ts-ignore
|
2023-06-01 02:13:22 +12:00
|
|
|
let user: SocketSession = socket.data
|
2023-05-31 21:21:50 +12:00
|
|
|
const { room, sessionId } = user
|
|
|
|
if (!room) {
|
|
|
|
return
|
|
|
|
}
|
2023-06-01 02:13:22 +12:00
|
|
|
|
|
|
|
// Leave room
|
2023-05-31 21:21:50 +12:00
|
|
|
socket.leave(room)
|
|
|
|
socket.data.room = undefined
|
|
|
|
|
2023-06-01 02:13:22 +12:00
|
|
|
// Delete from redis
|
|
|
|
const key = this.getSessionKey(sessionId)
|
|
|
|
await this.redisClient?.delete(key)
|
2023-06-01 02:37:39 +12:00
|
|
|
const sessionIds = await this.getRoomSessionIds(room)
|
|
|
|
await this.setRoomSessionIds(
|
2023-06-01 02:13:22 +12:00
|
|
|
room,
|
2023-06-01 02:37:39 +12:00
|
|
|
sessionIds.filter(id => id !== sessionId)
|
2023-06-01 02:13:22 +12:00
|
|
|
)
|
2023-05-31 21:21:50 +12:00
|
|
|
|
2023-06-01 02:13:22 +12:00
|
|
|
// Notify other users
|
2023-06-01 02:37:39 +12:00
|
|
|
socket.to(room).emit(SocketEvents.UserDisconnect, sessionId)
|
2023-05-31 21:21:50 +12:00
|
|
|
}
|
|
|
|
|
|
|
|
// Updates a connected user's metadata, assuming a room change is not required.
|
|
|
|
async updateUser(socket: Socket, patch: Object) {
|
|
|
|
socket.data = {
|
|
|
|
...socket.data,
|
|
|
|
...patch,
|
|
|
|
}
|
|
|
|
|
|
|
|
// If we're in a room, notify others of this change and update redis
|
|
|
|
if (socket.data.room) {
|
|
|
|
await this.joinRoom(socket, socket.data.room)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async onConnect(socket: Socket) {
|
|
|
|
// Override
|
|
|
|
}
|
|
|
|
|
|
|
|
async onDisconnect(socket: Socket) {
|
|
|
|
// Override
|
2023-03-06 07:57:05 +13:00
|
|
|
}
|
|
|
|
|
|
|
|
// Emit an event to all sockets
|
|
|
|
emit(event: string, payload: any) {
|
|
|
|
this.io.sockets.emit(event, payload)
|
|
|
|
}
|
|
|
|
}
|