websocket-implementation
About
This skill provides WebSocket implementation for real-time bidirectional communication, including connection management and message routing. Use it when building chat systems, live notifications, or collaborative applications that require instant data updates. It supports scaling across multiple servers and includes error handling for production environments.
Documentation
WebSocket Implementation
Overview
Build scalable WebSocket systems for real-time communication with proper connection management, message routing, error handling, and horizontal scaling support.
When to Use
- Building real-time chat and messaging
- Implementing live notifications
- Creating collaborative editing tools
- Broadcasting live data updates
- Building real-time dashboards
- Streaming events to clients
- Live multiplayer games
Instructions
1. Node.js WebSocket Server (Socket.IO)
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const redis = require('redis');
const app = express();
const server = http.createServer(app);

// Socket.IO server.
// Reconnection settings (`reconnection`, `reconnectionDelay`, ...) are
// client-side options and are ignored by the server, so they are not passed
// here; configure them when creating the client socket instead.
const io = socketIo(server, {
  // NOTE(review): a wildcard origin is convenient for local development;
  // restrict it to the known front-end origins before deploying.
  cors: { origin: '*' },
  transports: ['websocket', 'polling']
});

// Redis adapter for horizontal scaling.
// Assumes node-redis v4+, where clients must be connected explicitly before
// use. The adapter needs two connections: one to publish, one to subscribe.
const { createAdapter } = require('@socket.io/redis-adapter');
const redisClient = redis.createClient();
const redisSubClient = redisClient.duplicate();

Promise.all([redisClient.connect(), redisSubClient.connect()])
  .then(() => {
    io.adapter(createAdapter(redisClient, redisSubClient));
  })
  .catch((err) => {
    // Without the adapter the server still works, but only for sockets
    // connected to this process.
    console.error('Failed to connect the Redis adapter:', err);
  });
// Connection management
const { randomUUID } = require('crypto');

// socket.id -> { userId, username, socketId, connectedAt } for every socket
// that has completed the `auth` event on this process.
const connectedUsers = new Map();

/**
 * Returns the authenticated user record for a socket.
 * When the socket has not authenticated, emits an `error` event to it and
 * returns undefined so the caller can bail out instead of dereferencing it.
 *
 * @param {object} socket - Socket.IO socket.
 * @returns {object|undefined} The stored user record, if any.
 */
const requireUser = (socket) => {
  const user = connectedUsers.get(socket.id);
  if (!user) {
    socket.emit('error', { message: 'Not authenticated' });
  }
  return user;
};

io.on('connection', (socket) => {
  console.log(`User connected: ${socket.id}`);

  // Store user connection.
  // NOTE(review): the identity is taken from the client payload as-is; verify
  // a token here before trusting `userData.id` in production.
  socket.on('auth', (userData, ack) => {
    // The client passes an acknowledgement callback and waits for
    // `{ success }`; tolerate clients that do not send one.
    const respond = typeof ack === 'function' ? ack : () => {};

    if (userData == null || userData.id == null) {
      respond({ success: false, message: 'Invalid auth payload' });
      return;
    }

    connectedUsers.set(socket.id, {
      userId: userData.id,
      username: userData.username,
      socketId: socket.id,
      connectedAt: new Date()
    });

    // Join user-specific room
    socket.join(`user:${userData.id}`);
    socket.join('authenticated_users');

    // Notify others user is online
    io.to('authenticated_users').emit('user:online', {
      userId: userData.id,
      username: userData.username,
      timestamp: new Date()
    });

    console.log(`User authenticated: ${userData.username}`);
    respond({ success: true });
  });

  // Chat messaging
  socket.on('chat:message', (message) => {
    const user = requireUser(socket);
    if (!user) {
      return;
    }
    if (message == null || typeof message.text !== 'string' || message.roomId == null) {
      socket.emit('error', { message: 'Invalid message' });
      return;
    }

    const chatMessage = {
      // A UUID avoids collisions between messages created in the same
      // millisecond or on different server instances.
      id: `msg_${randomUUID()}`,
      senderId: user.userId,
      senderName: user.username,
      text: message.text,
      roomId: message.roomId,
      timestamp: new Date(),
      status: 'delivered'
    };

    // Save to database. Persistence is fire-and-forget so broadcasting is not
    // delayed, but failures are logged instead of becoming unhandled rejections.
    Promise.resolve()
      .then(() => Message.create(chatMessage))
      .catch((err) => {
        console.error(`Failed to persist message ${chatMessage.id}:`, err);
      });

    // Broadcast to room
    io.to(`room:${message.roomId}`).emit('chat:message', chatMessage);

    // Update message status (simulated acknowledgement sent back to the sender)
    setTimeout(() => {
      socket.emit('chat:message:ack', { messageId: chatMessage.id, status: 'read' });
    }, 100);
  });

  // Room management
  socket.on('room:join', (roomId) => {
    const user = requireUser(socket);
    if (!user) {
      return;
    }
    socket.join(`room:${roomId}`);
    io.to(`room:${roomId}`).emit('room:user:joined', {
      userId: user.userId,
      username: user.username,
      timestamp: new Date()
    });
  });

  socket.on('room:leave', (roomId) => {
    const user = requireUser(socket);
    if (!user) {
      return;
    }
    socket.leave(`room:${roomId}`);
    io.to(`room:${roomId}`).emit('room:user:left', {
      userId: user.userId,
      timestamp: new Date()
    });
  });

  // Typing indicator
  socket.on('typing:start', (roomId) => {
    const user = requireUser(socket);
    if (!user) {
      return;
    }
    io.to(`room:${roomId}`).emit('typing:indicator', {
      userId: user.userId,
      username: user.username,
      isTyping: true
    });
  });

  socket.on('typing:stop', (roomId) => {
    const user = requireUser(socket);
    if (!user) {
      return;
    }
    io.to(`room:${roomId}`).emit('typing:indicator', {
      userId: user.userId,
      isTyping: false
    });
  });

  // Handle disconnection: forget the user and announce that they went offline.
  socket.on('disconnect', () => {
    const user = connectedUsers.get(socket.id);
    if (user) {
      connectedUsers.delete(socket.id);
      io.to('authenticated_users').emit('user:offline', {
        userId: user.userId,
        timestamp: new Date()
      });
      console.log(`User disconnected: ${user.username}`);
    }
  });

  // Error handling
  socket.on('error', (error) => {
    console.error(`Socket error: ${error}`);
    socket.emit('error', { message: 'An error occurred' });
  });
});
// Server methods

/** Emits `event` carrying `data` to every socket in the given Socket.IO room. */
const emitToChannel = (channel, event, data) => {
  io.to(channel).emit(event, data);
};

/** Sends an arbitrary event to the per-user room `user:<userId>`. */
const sendDirectMessage = (userId, event, data) => {
  emitToChannel(`user:${userId}`, event, data);
};

/** Sends an arbitrary event to the chat room `room:<roomId>`. */
const notifyRoom = (roomId, event, data) => {
  emitToChannel(`room:${roomId}`, event, data);
};

/** Pushes a `user:update` event to the per-user room `user:<userId>`. */
const broadcastUserUpdate = (userId, data) => {
  sendDirectMessage(userId, 'user:update', data);
};
// Start accepting HTTP and WebSocket traffic.
const PORT = 3000;
const announceListening = () => {
  console.log(`WebSocket server listening on port ${PORT}`);
};
server.listen(PORT, announceListening);
2. Browser WebSocket Client (Socket.IO)
/**
 * Thin wrapper around a Socket.IO client socket.
 *
 * Connects on construction, queues outgoing events while the socket is not
 * connected, and flushes the queue on every `connect`. Expects a global `io`
 * factory (the Socket.IO client) to be in scope.
 */
class WebSocketClient {
  /**
   * @param {string} url - Server URL passed to `io()`.
   * @param {{ maxReconnectAttempts?: number, reconnectDelay?: number }} [options]
   */
  constructor(url, options = {}) {
    const { maxReconnectAttempts, reconnectDelay } = options;
    this.url = url;
    this.socket = null;
    this.reconnectAttempts = 0;
    // Fall back only on null/undefined so an explicit 0 is respected.
    this.maxReconnectAttempts = maxReconnectAttempts != null ? maxReconnectAttempts : 5;
    this.reconnectDelay = reconnectDelay != null ? reconnectDelay : 1000;
    // event name -> callbacks registered through on()
    this.listeners = new Map();
    // Events emitted while disconnected, flushed in order on connect.
    this.messageQueue = [];
    this.isAuthenticated = false;
    this.connect();
  }

  /** Creates the socket and wires the connection lifecycle handlers. */
  connect() {
    this.socket = io(this.url, {
      reconnection: true,
      reconnectionDelay: this.reconnectDelay,
      reconnectionAttempts: this.maxReconnectAttempts
    });

    this.socket.on('connect', () => {
      console.log('Connected to server');
      this.reconnectAttempts = 0;
      this.processMessageQueue();
    });

    this.socket.on('disconnect', () => {
      console.log('Disconnected from server');
      // The server forgets a socket's identity when it disconnects, so the
      // caller has to authenticate again after reconnecting.
      this.isAuthenticated = false;
    });

    this.socket.on('error', (error) => {
      // Callbacks registered through on('error') are attached to the socket
      // and receive this event directly. Do not send it back to the server:
      // the server answers `error` with `error`, which would loop forever.
      console.error('Socket error:', error);
    });

    this.socket.on('connect_error', (error) => {
      console.error('Connection error:', error);
    });
  }

  /**
   * Sends the `auth` event and, once the server acknowledges success, marks
   * the client as authenticated and notifies local `authenticated` listeners.
   *
   * @param {object} userData - Identity payload forwarded to the server.
   */
  authenticate(userData) {
    this.socket.emit('auth', userData, (response) => {
      if (response && response.success) {
        this.isAuthenticated = true;
        this.notifyListeners('authenticated');
      }
    });
  }

  /**
   * Subscribes to a server event. The callback is also recorded locally so
   * client-generated events (such as `authenticated`) can reach it.
   *
   * @param {string} event
   * @param {Function} callback
   */
  on(event, callback) {
    this.socket.on(event, callback);
    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event).push(callback);
  }

  /**
   * Invokes the callbacks registered through on() for a client-side event
   * without sending anything to the server.
   *
   * @param {string} event
   * @param {...*} args - Arguments passed to each callback.
   */
  notifyListeners(event, ...args) {
    const callbacks = this.listeners.get(event) || [];
    // Iterate over a copy so a callback may register further listeners.
    for (const callback of [...callbacks]) {
      callback(...args);
    }
  }

  /**
   * Sends an event to the server, or queues it while disconnected.
   *
   * @param {string} event
   * @param {*} data
   * @param {Function} [callback] - Optional acknowledgement callback.
   */
  emit(event, data, callback) {
    if (!this.socket.connected) {
      this.messageQueue.push({ event, data, callback });
      return;
    }
    this.socket.emit(event, data, callback);
  }

  /** Flushes queued events in the order they were emitted. */
  processMessageQueue() {
    while (this.messageQueue.length > 0) {
      const { event, data, callback } = this.messageQueue.shift();
      this.socket.emit(event, data, callback);
    }
  }

  joinRoom(roomId) {
    this.emit('room:join', roomId);
  }

  leaveRoom(roomId) {
    this.emit('room:leave', roomId);
  }

  sendMessage(roomId, text) {
    this.emit('chat:message', { roomId, text });
  }

  /** Emits `typing:start` or `typing:stop` for the room. */
  setTypingIndicator(roomId, isTyping) {
    this.emit(isTyping ? 'typing:start' : 'typing:stop', roomId);
  }

  disconnect() {
    this.socket.disconnect();
  }
}
// Usage: connect, register UI handlers, then authenticate and start chatting.
const client = new WebSocketClient('http://localhost:3000');

// Incoming chat messages are logged and rendered.
const handleChatMessage = (message) => {
  console.log('Received message:', message);
  displayMessage(message);
};

// Presence updates mark the announced user as online.
const handleUserOnline = (user) => {
  updateUserStatus(user.userId, 'online');
};

client.on('chat:message', handleChatMessage);
client.on('typing:indicator', (data) => {
  updateTypingIndicator(data);
});
client.on('user:online', handleUserOnline);

const demoUser = { id: 'user123', username: 'john' };
const demoRoom = 'room1';

client.authenticate(demoUser);
client.joinRoom(demoRoom);
client.sendMessage(demoRoom, 'Hello everyone!');
3. Python WebSocket Server (aiohttp)
from aiohttp import web
import aiohttp
import json
from datetime import datetime
from typing import Set
class WebSocketServer:
    """Minimal aiohttp WebSocket chat server.

    Keeps in-process state only: ``users`` maps a user id to that user's
    WebSocket connection and ``rooms`` maps a room id to the set of user ids
    in it. The model assumes one live connection per user id.
    """

    def __init__(self):
        self.app = web.Application()
        # room_id -> set of user ids currently in the room
        self.rooms = {}
        # user_id -> WebSocketResponse for that user's connection
        self.users = {}
        self.setup_routes()

    def setup_routes(self):
        """Register the WebSocket endpoint and the HTTP broadcast endpoint."""
        self.app.router.add_get('/ws', self.websocket_handler)
        self.app.router.add_post('/api/message', self.send_message_api)

    def _leave_room(self, room_id, user_id):
        """Remove ``user_id`` from ``room_id`` and drop the room once empty."""
        members = self.rooms.get(room_id)
        if members is None:
            return
        members.discard(user_id)
        if not members:
            del self.rooms[room_id]

    async def websocket_handler(self, request):
        """Serve one WebSocket connection until the peer disconnects.

        Incoming text frames must be JSON objects with a ``type`` of ``auth``,
        ``join_room``, ``message`` or ``leave_room``. Any failure while
        handling a frame is reported to the client as an ``error`` message and
        the connection stays open.
        """
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        user_id = None
        room_id = None  # most recently joined room; target of 'message' events
        joined_rooms = set()

        try:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.ERROR:
                    print(f'WebSocket connection closed with exception: {ws.exception()}')
                    break
                if msg.type != aiohttp.WSMsgType.TEXT:
                    # Binary and control frames carry no JSON payload.
                    continue

                try:
                    data = json.loads(msg.data)
                    if not isinstance(data, dict):
                        raise ValueError('Message must be a JSON object')
                    event_type = data.get('type')

                    if event_type == 'auth':
                        # NOTE(review): the user id is trusted as sent; verify
                        # the accompanying token before relying on it.
                        claimed_id = data.get('userId')
                        if claimed_id is None:
                            raise ValueError('userId is required')
                        user_id = claimed_id
                        self.users[user_id] = ws
                        await ws.send_json({
                            'type': 'authenticated',
                            'timestamp': datetime.now().isoformat()
                        })

                    elif event_type in ('join_room', 'message', 'leave_room') and user_id is None:
                        raise PermissionError('Not authenticated')

                    elif event_type == 'join_room':
                        requested_room = data.get('roomId')
                        if requested_room is None:
                            raise ValueError('roomId is required')
                        room_id = requested_room
                        self.rooms.setdefault(room_id, set()).add(user_id)
                        joined_rooms.add(room_id)

                        # Notify others
                        await self.broadcast_to_room(room_id, {
                            'type': 'user_joined',
                            'userId': user_id,
                            'timestamp': datetime.now().isoformat()
                        }, exclude=user_id)

                    elif event_type == 'message':
                        if room_id is None:
                            raise ValueError('Join a room before sending messages')
                        message = {
                            'id': f'msg_{datetime.now().timestamp()}',
                            'userId': user_id,
                            'text': data.get('text'),
                            'roomId': room_id,
                            'timestamp': datetime.now().isoformat()
                        }

                        # Save to database
                        await self.save_message(message)

                        # Broadcast to room
                        await self.broadcast_to_room(room_id, message)

                    elif event_type == 'leave_room':
                        # Leave the named room, or the current one when the
                        # payload does not name a room.
                        target_room = data.get('roomId', room_id)
                        self._leave_room(target_room, user_id)
                        joined_rooms.discard(target_room)
                        if target_room == room_id:
                            room_id = None

                except Exception as error:
                    await ws.send_json({
                        'type': 'error',
                        'message': str(error)
                    })
        finally:
            # Cleanup on disconnect. Skip it when a newer connection has taken
            # over this user id, so that connection's state is left intact.
            if user_id is not None and self.users.get(user_id) is ws:
                del self.users[user_id]
                for joined_room in joined_rooms:
                    self._leave_room(joined_room, user_id)

        return ws

    async def broadcast_to_room(self, room_id, message, exclude=None):
        """Send ``message`` as JSON to every connected member of ``room_id``.

        ``exclude`` names a user id to skip. Send failures are logged and do
        not stop delivery to the remaining members.
        """
        if room_id not in self.rooms:
            return

        # Iterate over a snapshot: membership can change while a send is
        # awaited, and mutating a set during iteration raises RuntimeError.
        for user_id in list(self.rooms[room_id]):
            if user_id == exclude:
                continue
            connection = self.users.get(user_id)
            if connection is None:
                continue
            try:
                await connection.send_json(message)
            except Exception as error:
                print(f'Error sending message: {error}')

    async def save_message(self, message):
        """Persistence hook; intentionally a no-op placeholder."""
        # Save to database
        pass

    async def send_message_api(self, request):
        """HTTP endpoint that broadcasts a message to the room in the JSON body."""
        try:
            data = await request.json()
        except ValueError:
            return web.json_response({'error': 'Invalid JSON body'}, status=400)
        if not isinstance(data, dict):
            return web.json_response({'error': 'JSON object expected'}, status=400)

        room_id = data.get('roomId')

        await self.broadcast_to_room(room_id, {
            'type': 'message',
            'text': data.get('text'),
            'timestamp': datetime.now().isoformat()
        })

        return web.json_response({'sent': True})
def create_app():
    """Build a WebSocketServer and return its configured aiohttp application."""
    return WebSocketServer().app
if __name__ == '__main__':
    # Script entry point: serve the application on port 3000.
    web.run_app(create_app(), port=3000)
4. Message Types and Protocols
// Authentication
{
"type": "auth",
"userId": "user123",
"token": "jwt_token_here"
}
// Chat Message
{
"type": "message",
"roomId": "room123",
"text": "Hello everyone!",
"timestamp": "2025-01-15T10:30:00Z"
}
// Typing Indicator
{
"type": "typing",
"roomId": "room123",
"isTyping": true
}
// Presence
{
"type": "presence",
"status": "online|away|offline"
}
// Notification
{
"type": "notification",
"title": "New message",
"body": "You have a new message",
"data": {}
}
5. Scaling with Redis
const redis = require('redis');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');
// Assumes node-redis v4+: connection settings are given as a URL and every
// client must be connected explicitly before use.
const pubClient = createClient({ url: 'redis://redis:6379' });
const subClient = pubClient.duplicate();
// A connection in subscriber mode cannot issue other commands, so application
// level subscriptions get their own dedicated connection.
const notificationClient = pubClient.duplicate();

(async () => {
  await Promise.all([
    pubClient.connect(),
    subClient.connect(),
    notificationClient.connect()
  ]);

  io.adapter(createAdapter(pubClient, subClient));

  // Publish to multiple servers: with the adapter attached, this reaches
  // clients connected to every Socket.IO instance.
  io.emit('user:action', { userId: 123, action: 'login' });

  // Subscribe to events from other servers
  await notificationClient.subscribe('notifications', (message) => {
    let notification;
    try {
      notification = JSON.parse(message);
    } catch (err) {
      // A malformed payload must not take down the subscriber.
      console.error('Ignoring malformed notification:', err);
      return;
    }
    io.to(`user:${notification.userId}`).emit('notification', notification);
  });
})().catch((err) => {
  console.error('Redis scaling setup failed:', err);
});
Best Practices
✅ DO
- Implement proper authentication
- Handle reconnection gracefully
- Manage rooms/channels effectively
- Persist messages appropriately
- Monitor active connections
- Implement presence features
- Use Redis for scaling
- Add message acknowledgment
- Implement rate limiting
- Handle errors properly
❌ DON'T
- Send unencrypted sensitive data
- Keep unlimited message history in memory
- Allow arbitrary room/channel creation
- Forget to clean up disconnected connections
- Send large messages frequently
- Ignore network failures
- Store passwords in messages
- Skip authentication/authorization
- Create unbounded growth of connections
- Ignore scalability from day one
Monitoring
// Log low-level Engine.IO connection failures (e.g. rejected handshakes)
io.engine.on('connection_error', (err) => {
  console.log(err.req); // the request object
  console.log(err.code); // the error code, e.g. 1
  console.log(err.message); // the error message
  console.log(err.context); // some additional error context
});

// Track active connections: expose connection and room counts over HTTP
app.get('/metrics/websocket', (req, res) => {
  res.json({
    activeConnections: io.engine.clientsCount,
    connectedSockets: io.sockets.sockets.size,
    // `adapter.rooms` is a Map (Socket.IO v3+), so Object.keys() on it always
    // returns []; read the room names from the Map itself. The list includes
    // the private room each socket joins under its own id.
    rooms: Array.from(io.sockets.adapter.rooms.keys())
  });
});
Quick Install
/plugin add https://github.com/aj-geddes/useful-ai-prompts/tree/main/websocket-implementation
Copy and paste this command in Claude Code to install this skill.
GitHub Repository
Related Skills
langchain
Meta — LangChain is a framework for building LLM applications using agents, chains, and RAG pipelines. It supports multiple LLM providers, offers 500+ integrations, and includes features like tool calling and memory management. Use it for rapid prototyping and deploying production systems like chatbots, autonomous agents, and question-answering services.
Algorithmic Art Generation
Meta — This skill helps developers create algorithmic art using p5.js, focusing on generative art, computational aesthetics, and interactive visualizations. It automatically activates for topics like "generative art" or "p5.js visualization" and guides you through creating unique algorithms with features like seeded randomness, flow fields, and particle systems. Use it when you need to build reproducible, code-driven artistic patterns.
webapp-testing
Testing — This Claude Skill provides a Playwright-based toolkit for testing local web applications through Python scripts. It enables frontend verification, UI debugging, screenshot capture, and log viewing while managing server lifecycles. Use it for browser automation tasks, but run scripts directly rather than reading their source code to avoid context pollution.
requesting-code-review
Design — This skill dispatches a code-reviewer subagent to analyze code changes against requirements before proceeding. It should be used after completing tasks, implementing major features, or before merging to main. The review helps catch issues early by comparing the current implementation with the original plan.
