
websocket-implementation

aj-geddes

About

This skill provides WebSocket implementation for real-time bidirectional communication, including connection management and message routing. Use it when building chat systems, live notifications, or collaborative applications that require instant data updates. It supports scaling across multiple servers and includes error handling for production environments.

Documentation

WebSocket Implementation

Overview

Build scalable WebSocket systems for real-time communication with proper connection management, message routing, error handling, and horizontal scaling support.

When to Use

  • Building real-time chat and messaging
  • Implementing live notifications
  • Creating collaborative editing tools
  • Broadcasting live data updates
  • Building real-time dashboards
  • Streaming events to clients
  • Running live multiplayer games

Instructions

1. Node.js WebSocket Server (Socket.IO)

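const express = require('express');
const http = require('http');
const { Server } = require('socket.io');
const { createClient } = require('redis');
const { createAdapter } = require('@socket.io/redis-adapter');

const app = express();
const server = http.createServer(app);
// Reconnection options (reconnectionDelay, reconnectionAttempts, ...) are client-side
// settings, so they belong in the browser client rather than here.
const io = new Server(server, {
  cors: { origin: '*' }, // restrict to known origins in production
  transports: ['websocket', 'polling']
});

// Redis adapter for horizontal scaling (node-redis v4+ clients must connect before use)
const pubClient = createClient();
const subClient = pubClient.duplicate();

Promise.all([pubClient.connect(), subClient.connect()])
  .then(() => io.adapter(createAdapter(pubClient, subClient)))
  .catch((err) => console.error('Redis adapter setup failed:', err));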

// Connection management
const connectedUsers = new Map();

io.on('connection', (socket) => {
  console.log(`User connected: ${socket.id}`);

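  // Store user connection; `ack` is the optional acknowledgement callback from the client
  socket.on('auth', (userData, ack) => {
    if (!userData || !userData.id) {
      if (typeof ack === 'function') ack({ success: false, message: 'Missing user id' });
      return;
    }

    connectedUsers.set(socket.id, {
      userId: userData.id,
      username: userData.username,
      socketId: socket.id,
      connectedAt: new Date()
    });

    // Join user-specific room
    socket.join(`user:${userData.id}`);
    socket.join('authenticated_users');

    // Notify others user is online
    io.to('authenticated_users').emit('user:online', {
      userId: userData.id,
      username: userData.username,
      timestamp: new Date()
    });

    console.log(`User authenticated: ${userData.username}`);

    // Let the client know authentication succeeded
    if (typeof ack === 'function') ack({ success: true });
  });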

  // Chat messaging
  socket.on('chat:message', (message) => {
    const user = connectedUsers.get(socket.id);

    if (!user) {
      socket.emit('error', { message: 'Not authenticated' });
      return;
    }

    const chatMessage = {
      id: `msg_${Date.now()}`,
      senderId: user.userId,
      senderName: user.username,
      text: message.text,
      roomId: message.roomId,
      timestamp: new Date(),
      status: 'delivered'
    };

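    // Save to database (`Message` is a placeholder for your own persistence model)
    Promise.resolve(Message.create(chatMessage))
      .catch((err) => console.error('Failed to persist message:', err));

    // Broadcast to room
    io.to(`room:${message.roomId}`).emit('chat:message', chatMessage);

    // Acknowledge receipt to the sender. A 'read' status should only be sent
    // once a recipient actually reports having read the message.
    socket.emit('chat:message:ack', { messageId: chatMessage.id, status: 'delivered' });
  });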
  });

  // Returns the authenticated user for this socket, or reports an error and returns undefined
  const requireUser = () => {
    const user = connectedUsers.get(socket.id);
    if (!user) socket.emit('error', { message: 'Not authenticated' });
    return user;
  };

  // Room management
  socket.on('room:join', (roomId) => {
    const user = requireUser();
    if (!user) return;

    socket.join(`room:${roomId}`);
    io.to(`room:${roomId}`).emit('room:user:joined', {
      userId: user.userId,
      username: user.username,
      timestamp: new Date()
    });
  });

  socket.on('room:leave', (roomId) => {
    const user = requireUser();
    if (!user) return;

    socket.leave(`room:${roomId}`);
    io.to(`room:${roomId}`).emit('room:user:left', {
      userId: user.userId,
      timestamp: new Date()
    });
  });

  // Typing indicator
  socket.on('typing:start', (roomId) => {
    const user = requireUser();
    if (!user) return;

    io.to(`room:${roomId}`).emit('typing:indicator', {
      userId: user.userId,
      username: user.username,
      isTyping: true
    });
  });

  socket.on('typing:stop', (roomId) => {
    const user = requireUser();
    if (!user) return;

    io.to(`room:${roomId}`).emit('typing:indicator', {
      userId: user.userId,
      isTyping: false
    });
  });

  // Handle disconnection
  socket.on('disconnect', () => {
    const user = connectedUsers.get(socket.id);

    if (user) {
      connectedUsers.delete(socket.id);
      io.to('authenticated_users').emit('user:offline', {
        userId: user.userId,
        timestamp: new Date()
      });

      console.log(`User disconnected: ${user.username}`);
    }
  });

  // Error handling
  socket.on('error', (error) => {
    console.error(`Socket error: ${error}`);
    socket.emit('error', { message: 'An error occurred' });
  });
});

// Server methods
const broadcastUserUpdate = (userId, data) => {
  io.to(`user:${userId}`).emit('user:update', data);
};

const notifyRoom = (roomId, event, data) => {
  io.to(`room:${roomId}`).emit(event, data);
};

const sendDirectMessage = (userId, event, data) => {
  io.to(`user:${userId}`).emit(event, data);
};

server.listen(3000, () => {
  console.log('WebSocket server listening on port 3000');
});
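
The `auth` event above trusts whatever user data the client sends. One possible way to tighten this is to verify a token during the handshake with a Socket.IO middleware registered through `io.use`. The sketch below is only an illustration: `createAuthMiddleware` is a made-up helper name, and `verifyToken` is an assumed function you would supply (for example a JWT check) that returns a user object or throws.

```javascript
// Hypothetical helper: builds a Socket.IO middleware that authenticates the handshake.
// `verifyToken` is an assumption: any function that returns a user object or throws.
function createAuthMiddleware(verifyToken) {
  return (socket, next) => {
    // Clients pass credentials with io(url, { auth: { token } })
    const auth = (socket.handshake && socket.handshake.auth) || {};
    if (!auth.token) {
      return next(new Error('Authentication token missing'));
    }
    try {
      const user = verifyToken(auth.token);
      // Attach the verified identity so handlers need not trust client-sent user data
      socket.data = socket.data || {};
      socket.data.user = user;
      next();
    } catch (err) {
      // Passing an Error to next() rejects the connection (the client gets connect_error)
      next(new Error('Invalid authentication token'));
    }
  };
}

// Possible usage with the server above:
// io.use(createAuthMiddleware(myVerifyFunction));
```

With a middleware like this in place, handlers could read `socket.data.user` instead of the `connectedUsers` entry populated from client input.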

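2. Browser Client (Socket.IO)

// Requires the Socket.IO client library; with a bundler it can be imported as below,
// or loaded from a <script> tag that exposes a global `io`
import { io } from 'socket.io-client';

class WebSocketClient {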
  constructor(url, options = {}) {
    this.url = url;
    this.socket = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
    this.reconnectDelay = options.reconnectDelay || 1000;
    this.listeners = new Map();
    this.messageQueue = [];
    this.isAuthenticated = false;

    this.connect();
  }

  connect() {
    this.socket = io(this.url, {
      reconnection: true,
      reconnectionDelay: this.reconnectDelay,
      reconnectionAttempts: this.maxReconnectAttempts
    });

    this.socket.on('connect', () => {
      console.log('Connected to server');
      this.reconnectAttempts = 0;
      this.processMessageQueue();
    });

    this.socket.on('disconnect', () => {
      console.log('Disconnected from server');
    });

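    // Custom 'error' events sent by the server. Listeners registered through on()
    // are attached to the same socket, so nothing needs to be re-emitted here
    // (calling this.emit() would send the error back to the server).
    this.socket.on('error', (error) => {
      console.error('Socket error:', error);
    });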

    this.socket.on('connect_error', (error) => {
      console.error('Connection error:', error);
    });
  }

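  authenticate(userData) {
    // The acknowledgement callback only fires if the server's 'auth' handler invokes it
    this.socket.emit('auth', userData, (response) => {
      if (response && response.success) {
        this.isAuthenticated = true;
        // Notify local listeners; this.emit() would send the event to the server instead
        (this.listeners.get('authenticated') || []).forEach((callback) => callback(response));
      }
    });
  }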

  on(event, callback) {
    this.socket.on(event, callback);

    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event).push(callback);
  }

  emit(event, data, callback) {
    if (!this.socket.connected) {
      this.messageQueue.push({ event, data, callback });
      return;
    }

    this.socket.emit(event, data, callback);
  }

  processMessageQueue() {
    while (this.messageQueue.length > 0) {
      const { event, data, callback } = this.messageQueue.shift();
      this.socket.emit(event, data, callback);
    }
  }

  joinRoom(roomId) {
    this.emit('room:join', roomId);
  }

  leaveRoom(roomId) {
    this.emit('room:leave', roomId);
  }

  sendMessage(roomId, text) {
    this.emit('chat:message', { roomId, text });
  }

  setTypingIndicator(roomId, isTyping) {
    if (isTyping) {
      this.emit('typing:start', roomId);
    } else {
      this.emit('typing:stop', roomId);
    }
  }

  disconnect() {
    this.socket.disconnect();
  }
}

// Usage
const client = new WebSocketClient('http://localhost:3000');

client.on('chat:message', (message) => {
  console.log('Received message:', message);
  displayMessage(message);
});

client.on('typing:indicator', (data) => {
  updateTypingIndicator(data);
});

client.on('user:online', (user) => {
  updateUserStatus(user.userId, 'online');
});

client.authenticate({ id: 'user123', username: 'john' });
client.joinRoom('room1');
client.sendMessage('room1', 'Hello everyone!');
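
The `reconnectionDelay` and `reconnectionAttempts` options above control how the client retries after a dropped connection. To illustrate how a capped retry schedule of this kind behaves, here is a small sketch of exponential backoff. The function name, the doubling factor, and the default values are assumptions chosen for illustration; this is not Socket.IO's internal algorithm, which also randomizes each delay.

```javascript
// Illustrative only: exponential backoff with an upper bound.
// attempt is 0-based, so attempt 0 waits baseDelay milliseconds.
function backoffDelay(attempt, baseDelay = 1000, maxDelay = 5000) {
  const delay = baseDelay * Math.pow(2, attempt);
  return Math.min(delay, maxDelay);
}

// Delays for the first five reconnection attempts
const schedule = [0, 1, 2, 3, 4].map((attempt) => backoffDelay(attempt));
console.log(schedule); // [ 1000, 2000, 4000, 5000, 5000 ]
```

Capping the delay keeps a long outage from pushing retries arbitrarily far apart, while the growth in early attempts avoids hammering a server that is restarting.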

3. Python WebSocket Server (aiohttp)

from aiohttp import web
import aiohttp
import json
from datetime import datetime

class WebSocketServer:
    def __init__(self):
        self.app = web.Application()
        self.rooms = {}
        self.users = {}
        self.setup_routes()

    def setup_routes(self):
        self.app.router.add_get('/ws', self.websocket_handler)
        self.app.router.add_post('/api/message', self.send_message_api)

    async def websocket_handler(self, request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        user_id = None
        room_id = None

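        # WebSocketResponse is itself an async iterator over incoming messages
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                    event_type = data.get('type')
                except (json.JSONDecodeError, AttributeError):
                    # Malformed JSON or a payload that is not a JSON object
                    await ws.send_json({'type': 'error', 'message': 'Invalid message'})
                    continue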

                try:
                    if event_type == 'auth':
                        user_id = data.get('userId')
                        self.users[user_id] = ws
                        await ws.send_json({
                            'type': 'authenticated',
                            'timestamp': datetime.now().isoformat()
                        })

                    elif event_type == 'join_room':
                        room_id = data.get('roomId')
                        if room_id not in self.rooms:
                            self.rooms[room_id] = set()
                        self.rooms[room_id].add(user_id)

                        # Notify others
                        await self.broadcast_to_room(room_id, {
                            'type': 'user_joined',
                            'userId': user_id,
                            'timestamp': datetime.now().isoformat()
                        }, exclude=user_id)

                    elif event_type == 'message':
                        message = {
                            # 'type' lets clients route the broadcast like other events
                            'type': 'message',
                            'id': f'msg_{datetime.now().timestamp()}',
                            'userId': user_id,
                            'text': data.get('text'),
                            'roomId': room_id,
                            'timestamp': datetime.now().isoformat()
                        }

                        # Save to database
                        await self.save_message(message)

                        # Broadcast to room
                        await self.broadcast_to_room(room_id, message)

                    elif event_type == 'leave_room':
                        if room_id in self.rooms:
                            self.rooms[room_id].discard(user_id)

                except Exception as error:
                    await ws.send_json({
                        'type': 'error',
                        'message': str(error)
                    })

        # Cleanup on disconnect
        if user_id is not None:
            # Only remove the mapping if it still points at this connection
            if self.users.get(user_id) is ws:
                del self.users[user_id]
            if room_id in self.rooms:
                self.rooms[room_id].discard(user_id)

        return ws

    async def broadcast_to_room(self, room_id, message, exclude=None):
        if room_id not in self.rooms:
            return

        for user_id in self.rooms[room_id]:
            if user_id != exclude and user_id in self.users:
                try:
                    await self.users[user_id].send_json(message)
                except Exception as error:
                    print(f'Error sending message: {error}')

    async def save_message(self, message):
        # Save to database
        pass

    async def send_message_api(self, request):
        data = await request.json()
        room_id = data.get('roomId')

        await self.broadcast_to_room(room_id, {
            'type': 'message',
            'text': data.get('text'),
            'timestamp': datetime.now().isoformat()
        })

        return web.json_response({'sent': True})

def create_app():
    server = WebSocketServer()
    return server.app

if __name__ == '__main__':
    app = create_app()
    web.run_app(app, port=3000)
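
The aiohttp server speaks plain WebSocket with JSON payloads, so the Socket.IO client from step 2 cannot connect to it; a browser would use the native `WebSocket` API instead. Below is a minimal sketch of such a client, assuming the `/ws` route and the `type` values handled by the server above. `createRawClient` and the injectable `WebSocketImpl` parameter are illustrative names introduced here, not part of any library.

```javascript
// Illustrative client for the plain-WebSocket server above.
// WebSocketImpl is injectable so the sketch can run outside a browser.
function createRawClient(url, WebSocketImpl = globalThis.WebSocket) {
  const ws = new WebSocketImpl(url);
  const handlers = new Map(); // message type -> callback

  ws.onmessage = (event) => {
    let data;
    try {
      data = JSON.parse(event.data);
    } catch (err) {
      return; // ignore frames that are not valid JSON
    }
    const handler = data && handlers.get(data.type);
    if (handler) handler(data);
  };

  const send = (payload) => ws.send(JSON.stringify(payload));

  return {
    onOpen: (callback) => { ws.onopen = callback; },
    on: (type, callback) => handlers.set(type, callback),
    auth: (userId) => send({ type: 'auth', userId }),
    joinRoom: (roomId) => send({ type: 'join_room', roomId }),
    sendMessage: (text) => send({ type: 'message', text }),
    close: () => ws.close()
  };
}

// Possible usage in a browser (send only after the connection is open):
// const client = createRawClient('ws://localhost:3000/ws');
// client.on('user_joined', (data) => console.log('joined:', data.userId));
// client.onOpen(() => { client.auth('user123'); client.joinRoom('room1'); });
```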

4. Message Types and Protocols

// Authentication
{
  "type": "auth",
  "userId": "user123",
  "token": "jwt_token_here"
}

// Chat Message
{
  "type": "message",
  "roomId": "room123",
  "text": "Hello everyone!",
  "timestamp": "2025-01-15T10:30:00Z"
}

// Typing Indicator
{
  "type": "typing",
  "roomId": "room123",
  "isTyping": true
}

// Presence
{
  "type": "presence",
  "status": "online|away|offline"
}

// Notification
{
  "type": "notification",
  "title": "New message",
  "body": "You have a new message",
  "data": {}
}
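
Incoming payloads in these shapes can be checked before they are acted on. The following is a rough sketch of such a check: the set of required fields per type is an assumption inferred from the examples above (`timestamp` and `data` are treated as optional), and `validateMessage` is an illustrative name.

```javascript
// Required fields per message type, inferred from the example payloads (an assumption)
const REQUIRED_FIELDS = new Map([
  ['auth', ['userId', 'token']],
  ['message', ['roomId', 'text']],
  ['typing', ['roomId', 'isTyping']],
  ['presence', ['status']],
  ['notification', ['title', 'body']]
]);

// Parses a raw JSON string and reports whether it matches a known message type
function validateMessage(raw) {
  let msg;
  try {
    msg = JSON.parse(raw);
  } catch (err) {
    return { ok: false, error: 'Invalid JSON' };
  }
  if (msg === null || typeof msg !== 'object') {
    return { ok: false, error: 'Message must be a JSON object' };
  }
  const required = REQUIRED_FIELDS.get(msg.type);
  if (!required) {
    return { ok: false, error: `Unknown type: ${msg.type}` };
  }
  const missing = required.filter((field) => !(field in msg));
  if (missing.length > 0) {
    return { ok: false, error: `Missing fields: ${missing.join(', ')}` };
  }
  return { ok: true, message: msg };
}
```

A server could call this at the top of its message handler and reply with an `error` payload when `ok` is false.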

5. Scaling with Redis

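const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');

// node-redis v4+ takes a connection URL (or socket: { host, port });
// `io` is the Socket.IO server created in step 1
const pubClient = createClient({ url: 'redis://redis:6379' });
const subClient = pubClient.duplicate();
// Separate connection for application-level pub/sub: a client in subscriber
// mode cannot issue regular commands
const notificationClient = pubClient.duplicate();

async function setupRedis() {
  // Clients must be connected before they are handed to the adapter
  await Promise.all([
    pubClient.connect(),
    subClient.connect(),
    notificationClient.connect()
  ]);
  io.adapter(createAdapter(pubClient, subClient));

  // With the adapter attached, this reaches clients connected to every server instance
  io.emit('user:action', { userId: 123, action: 'login' });

  // Forward notifications published by other services to the target user's room
  await notificationClient.subscribe('notifications', (message) => {
    const notification = JSON.parse(message);
    io.to(`user:${notification.userId}`).emit('notification', notification);
  });
}

setupRedis().catch((err) => console.error('Redis setup failed:', err));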

Best Practices

✅ DO

  • Implement proper authentication
  • Handle reconnection gracefully
  • Manage rooms/channels effectively
  • Persist messages appropriately
  • Monitor active connections
  • Implement presence features
  • Use Redis for scaling
  • Add message acknowledgment
  • Implement rate limiting
  • Handle errors properly
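
As one way to approach the rate-limiting item, the sketch below keeps a token bucket per key (for example per socket id). It is an illustration under assumptions rather than a prescribed design: `createRateLimiter`, its parameters, and the injectable `now` clock are names introduced here.

```javascript
// Illustrative token-bucket limiter: each key may burst up to `capacity` events
// and regains `refillPerSecond` tokens per second.
function createRateLimiter(capacity, refillPerSecond, now = () => Date.now()) {
  const buckets = new Map(); // key -> { tokens, last }

  return {
    // Returns true if the event is allowed and consumes one token
    allow(key) {
      const t = now();
      const bucket = buckets.get(key) || { tokens: capacity, last: t };
      const elapsedSeconds = (t - bucket.last) / 1000;
      bucket.tokens = Math.min(capacity, bucket.tokens + elapsedSeconds * refillPerSecond);
      bucket.last = t;
      const allowed = bucket.tokens >= 1;
      if (allowed) bucket.tokens -= 1;
      buckets.set(key, bucket);
      return allowed;
    },
    // Call when a client disconnects so buckets do not accumulate
    remove(key) {
      buckets.delete(key);
    }
  };
}

// Possible usage inside the 'chat:message' handler:
// if (!limiter.allow(socket.id)) return socket.emit('error', { message: 'Rate limit exceeded' });
```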

❌ DON'T

  • Send unencrypted sensitive data
  • Keep unlimited message history in memory
  • Allow arbitrary room/channel creation
  • Forget to clean up disconnected connections
  • Send large messages frequently
  • Ignore network failures
  • Store passwords in messages
  • Skip authentication/authorization
  • Allow the number of connections to grow without bound
  • Treat scalability as an afterthought
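
To make the point about unlimited in-memory history concrete, here is a small sketch of a per-room buffer that keeps only the most recent messages. The class name `BoundedHistory` and the default limit of 100 are assumptions for illustration; older messages would be served from the database instead.

```javascript
// Illustrative per-room history that never holds more than `limit` messages
class BoundedHistory {
  constructor(limit = 100) {
    this.limit = limit;
    this.rooms = new Map(); // roomId -> array of recent messages
  }

  add(roomId, message) {
    const history = this.rooms.get(roomId) || [];
    history.push(message);
    // Drop the oldest entry once the cap is exceeded
    if (history.length > this.limit) history.shift();
    this.rooms.set(roomId, history);
  }

  // Returns a copy so callers cannot mutate the stored history
  recent(roomId) {
    return [...(this.rooms.get(roomId) || [])];
  }

  // Call when a room becomes empty so its history is released
  clear(roomId) {
    this.rooms.delete(roomId);
  }
}
```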

Monitoring

// Log failed connection attempts
io.engine.on('connection_error', (err) => {
  console.log(err.req); // the request object
  console.log(err.code); // the error code, e.g. 1
  console.log(err.message); // the error message
  console.log(err.context); // some additional error context
});

// Track active connections
app.get('/metrics/websocket', (req, res) => {
  res.json({
    activeConnections: io.engine.clientsCount,
    connectedSockets: io.sockets.sockets.size,
    // adapter.rooms is a Map, so Object.keys() would return an empty array
    rooms: [...io.sockets.adapter.rooms.keys()]
  });
});
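
The room list from the adapter also contains the private room that each socket joins under its own id, which can clutter a metrics endpoint. A possible way to report only application rooms with their member counts is sketched below; `summarizeRooms` is an illustrative name, and the function assumes the adapter's `rooms` and `sids` Maps are passed in.

```javascript
// Illustrative helper: member counts per room, excluding per-socket private rooms.
// rooms: Map<roomName, Set<socketId>>, sids: Map<socketId, Set<roomName>>
function summarizeRooms(rooms, sids) {
  const summary = {};
  for (const [name, members] of rooms) {
    if (sids.has(name)) continue; // a room named after a socket id is that socket's private room
    summary[name] = members.size;
  }
  return summary;
}

// Possible usage with the server above:
// const adapter = io.of('/').adapter;
// res.json({ rooms: summarizeRooms(adapter.rooms, adapter.sids) });
```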

Quick Install

/plugin add https://github.com/aj-geddes/useful-ai-prompts/tree/main/websocket-implementation

Copy and paste this command in Claude Code to install this skill

GitHub Repository

aj-geddes/useful-ai-prompts
Path: skills/websocket-implementation
