import { Server } from "socket.io" import http from "http" import Koa from "koa" import { userAgent } from "koa-useragent" import { auth, Header, redis } from "@budibase/backend-core" import { createAdapter } from "@socket.io/redis-adapter" import { Socket } from "socket.io" import { getSocketPubSubClients } from "../utilities/redis" import { SocketEvent, SocketSessionTTL } from "@budibase/shared-core" import { SocketSession } from "@budibase/types" import { v4 as uuid } from "uuid" import { createContext, runMiddlewares } from "./middleware" const anonUser = () => ({ _id: uuid(), email: "user@mail.com", firstName: "Anonymous", }) export class BaseSocket { app: Koa io: Server path: string redisClient?: redis.Client constructor( app: Koa, server: http.Server, path: string = "/", additionalMiddlewares?: any[] ) { this.app = app this.path = path this.io = new Server(server, { path, }) // Attach default middlewares const authenticate = auth.buildAuthMiddleware([], { publicAllowed: true, }) const middlewares = [ userAgent, authenticate, ...(additionalMiddlewares || []), ] // Apply middlewares this.io.use(async (socket, next) => { const ctx = createContext(this.app, socket) try { await runMiddlewares(ctx, middlewares, () => { // Middlewares are finished // Extract some data from our enriched koa context to persist // as metadata for the socket const user = ctx.user?._id ? ctx.user : anonUser() const { _id, email, firstName, lastName } = user socket.data = { _id, email, firstName, lastName, sessionId: socket.id, connectedAt: Date.now(), } next() }) } catch (error: any) { next(error) } }) // Initialise redis before handling connections this.initialise().then(() => { this.io.on("connection", async socket => { // Add built in handler for heartbeats socket.on(SocketEvent.Heartbeat, async () => { await this.extendSessionTTL(socket.data.sessionId) }) // Add early disconnection handler to clean up and leave room socket.on("disconnect", async () => { // Run any custom disconnection logic before we leave the room, // so that we have access to their room etc before disconnection await this.onDisconnect(socket) // Leave the current room when the user disconnects if we're in one await this.leaveRoom(socket) }) // Add handlers for this socket await this.onConnect(socket) }) }) } async initialise() { // Instantiate redis adapter. // We use a fully qualified key name here as this bypasses the normal // redis client#s key prefixing. const { pub, sub } = getSocketPubSubClients() const opts = { key: `${redis.utils.Databases.SOCKET_IO}-${this.path}-pubsub`, } this.io.adapter(createAdapter(pub, sub, opts)) // Fetch redis client this.redisClient = await redis.clients.getSocketClient() } // Gets the redis key for a certain session ID getSessionKey(sessionId: string) { return `${this.path}-session:${sessionId}` } // Gets the redis key for certain room name getRoomKey(room: string) { return `${this.path}-room:${room}` } async extendSessionTTL(sessionId: string) { const key = this.getSessionKey(sessionId) await this.redisClient?.setExpiry(key, SocketSessionTTL) } // Gets an array of all redis keys of users inside a certain room async getRoomSessionIds(room: string | string[]): Promise { if (Array.isArray(room)) { const roomKeys = room.map(this.getRoomKey.bind(this)) const roomSessionIdMap = await this.redisClient?.bulkGet(roomKeys) let sessionIds: any[] = [] Object.values(roomSessionIdMap || {}).forEach(roomSessionIds => { sessionIds = sessionIds.concat(roomSessionIds) }) return sessionIds } else { return (await this.redisClient?.get(this.getRoomKey(room))) || [] } } // Sets the list of redis keys for users inside a certain room. // There is no TTL on the actual room key map itself. async setRoomSessionIds(room: string, ids: string[]) { await this.redisClient?.store(this.getRoomKey(room), ids) } // Gets a list of all users inside a certain room async getRoomSessions(room?: string | string[]): Promise { if (room) { const sessionIds = await this.getRoomSessionIds(room) const keys = sessionIds.map(this.getSessionKey.bind(this)) const sessions = await this.redisClient?.bulkGet(keys) return Object.values(sessions || {}) } else { return [] } } // Detects keys which have been pruned from redis due to TTL expiry in a certain // room and broadcasts disconnection messages to ensure clients are aware async pruneRoom(room: string) { const sessionIds = await this.getRoomSessionIds(room) const sessionsExist = await Promise.all( sessionIds.map(id => this.redisClient?.exists(this.getSessionKey(id))) ) const prunedSessionIds = sessionIds.filter((id, idx) => { if (!sessionsExist[idx]) { this.io.to(room).emit(SocketEvent.UserDisconnect, { sessionId: sessionIds[idx], }) return false } return true }) // Store new pruned keys await this.setRoomSessionIds(room, prunedSessionIds) } // Adds a user to a certain room async joinRoom(socket: Socket, room: string) { if (!room) { return } // Prune room before joining await this.pruneRoom(room) // Check if we're already in a room, as we'll need to leave if we are before we // can join a different room const oldRoom = socket.data.room if (oldRoom && oldRoom !== room) { await this.leaveRoom(socket) } // Join new room if (!oldRoom || oldRoom !== room) { socket.join(room) socket.data.room = room } // Store in redis // @ts-ignore let user: SocketSession = socket.data const { sessionId } = user const key = this.getSessionKey(sessionId) await this.redisClient?.store(key, user, SocketSessionTTL) const sessionIds = await this.getRoomSessionIds(room) if (!sessionIds.includes(sessionId)) { await this.setRoomSessionIds(room, [...sessionIds, sessionId]) } // Notify other users socket.to(room).emit(SocketEvent.UserUpdate, { user, }) } // Disconnects a socket from its current room async leaveRoom(socket: Socket) { // @ts-ignore let user: SocketSession = socket.data const { room, sessionId } = user if (!room) { return } // Leave room socket.leave(room) socket.data.room = undefined // Delete from redis const key = this.getSessionKey(sessionId) await this.redisClient?.delete(key) const sessionIds = await this.getRoomSessionIds(room) await this.setRoomSessionIds( room, sessionIds.filter(id => id !== sessionId) ) // Notify other users socket.to(room).emit(SocketEvent.UserDisconnect, { sessionId }) } // Updates a connected user's metadata, assuming a room change is not required. async updateUser(socket: Socket, patch: Object) { socket.data = { ...socket.data, ...patch, } // If we're in a room, notify others of this change and update redis if (socket.data.room) { await this.joinRoom(socket, socket.data.room) } } async onConnect(socket: Socket) { // Override } async onDisconnect(socket: Socket) { // Override } // Emit an event to all sockets emit(event: string, payload: any) { this.io.sockets.emit(event, payload) } // Emit an event to everyone in a room, including metadata of whom // the originator of the request was emitToRoom(ctx: any, room: string, event: string, payload: any) { this.io.in(room).emit(event, { ...payload, apiSessionId: ctx.headers?.[Header.SESSION_ID], }) } }