Real-time communication has changed how we interact with web applications. From live chat systems to collaborative editing tools, users expect instant updates without refreshing their browsers. Traditional HTTP can’t deliver this experience efficiently.
WebSockets solve this problem. They create a persistent, bidirectional connection between client and server. Unlike HTTP, which requires a new request for every interaction, WebSockets keep the channel open. This means instant data transfer in both directions.
FastAPI makes WebSocket implementation straightforward. The framework handles connection management, async operations, and error handling with minimal code. You can build production-ready real-time features in hours, not days.
Prerequisites
You’ll need Python 3.8 or higher installed on your system. Basic knowledge of async/await syntax helps, but isn’t required. Familiarity with HTTP concepts makes understanding WebSockets easier.
Install the required packages:
pip install fastapi uvicorn websockets python-multipart
FastAPI provides the web framework. Uvicorn is the ASGI server. The websockets library handles the protocol implementation. Python-multipart enables form data processing if you need it.
Step 1: Setting Up Your FastAPI Project
Create a new directory for your project. Inside, make a file called main.py:
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
@app.get("/")
async def get():
return HTMLResponse("""
<html>
<head>
<title>WebSocket Test</title>
</head>
<body>
<h1>WebSocket Testing Page</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'></ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
""")
This creates a basic HTML interface. The JavaScript code connects to the WebSocket endpoint and handles sending messages. When messages arrive, they appear in the list.
Run your application:
uvicorn main:app --reload
The server starts on http://localhost:8000. Visit this URL in your browser. You’ll see the testing page, but the WebSocket won’t connect yet because we haven’t created the endpoint.
Step 2: Building Your First WebSocket Endpoint
Add this WebSocket route to your main.py:
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message received: {data}")
except Exception as e:
print(f"Connection closed: {e}")
The accept() method establishes the connection. The while loop keeps listening for messages. When data arrives, the server echoes it back with a prefix.
Restart your server and open the test page. Type a message and click send. You should see your message echoed back instantly. This demonstrates the basic WebSocket cycle: connect, send, receive, disconnect.
The try-except block handles connection closures. When a client disconnects, the exception fires and the loop exits. Without this, your server would crash on disconnection.
Step 3: Handling Multiple Connections
A single echo endpoint isn’t useful. Real applications need to broadcast messages to multiple clients. Create a connection manager class:
from typing import List
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
This manager tracks all connected clients in a list. The broadcast() method sends messages to everyone. When clients connect or disconnect, the list updates automatically.
Update your WebSocket endpoint:
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"Client says: {data}")
except Exception:
manager.disconnect(websocket)
Now open multiple browser tabs. Send a message from one tab and watch it appear in all tabs simultaneously. This is the foundation of any real-time collaborative application.
The broadcast approach works fine for small applications. For larger systems with thousands of connections, you’ll need a message queue like Redis or RabbitMQ. These tools distribute messages across multiple server instances.
Step 4: Building a Real-Time Chat Feature
Let’s build something practical: a chat room where users can join, send messages, and see who’s online. First, expand the ConnectionManager:
from typing import Dict
class ChatManager:
def __init__(self):
self.connections: Dict[str, WebSocket] = {}
async def connect(self, websocket: WebSocket, client_id: str):
await websocket.accept()
self.connections[client_id] = websocket
await self.broadcast(f"{client_id} joined the chat")
def disconnect(self, client_id: str):
if client_id in self.connections:
del self.connections[client_id]
async def send_personal(self, message: str, client_id: str):
if client_id in self.connections:
websocket = self.connections[client_id]
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.connections.values():
await connection.send_text(message)
chat_manager = ChatManager()
This version uses a dictionary instead of a list. Each connection has a unique identifier. This enables private messages and user tracking.
Create a new endpoint with client identification:
@app.websocket("/ws/{client_id}")
async def chat_endpoint(websocket: WebSocket, client_id: str):
await chat_manager.connect(websocket, client_id)
try:
while True:
data = await websocket.receive_text()
await chat_manager.broadcast(f"{client_id}: {data}")
except Exception:
chat_manager.disconnect(client_id)
await chat_manager.broadcast(f"{client_id} left the chat")
The URL now includes a client ID parameter. When someone connects, everyone gets notified. When they disconnect, everyone gets notified again. This creates presence awareness.
Update the HTML to capture a username:
var clientId = prompt("Enter your name:");
var ws = new WebSocket(`ws://localhost:8000/ws/${clientId}`);
Test this by opening multiple browser windows with different names. You’ll see join notifications, messages with sender names, and leave notifications. This is a functional chat system in under 50 lines of Python.
Step 5: Error Handling and Reconnection
WebSocket connections drop frequently. Network hiccups, server restarts, or client-side issues can break the connection. Your application needs to handle these scenarios gracefully.
Add connection state tracking:
from datetime import datetime
class RobustChatManager:
def __init__(self):
self.connections: Dict[str, WebSocket] = {}
self.connection_times: Dict[str, datetime] = {}
async def connect(self, websocket: WebSocket, client_id: str):
await websocket.accept()
self.connections[client_id] = websocket
self.connection_times[client_id] = datetime.now()
online_users = len(self.connections)
await self.broadcast(f"{client_id} joined. {online_users} users online.")
async def send_with_retry(self, message: str, websocket: WebSocket, retries: int = 3):
for attempt in range(retries):
try:
await websocket.send_text(message)
return True
except Exception as e:
if attempt == retries - 1:
return False
await asyncio.sleep(0.1 * (attempt + 1))
return False
async def broadcast(self, message: str):
disconnected = []
for client_id, connection in self.connections.items():
success = await self.send_with_retry(message, connection)
if not success:
disconnected.append(client_id)
for client_id in disconnected:
self.disconnect(client_id)
The send_with_retry() method attempts to send messages multiple times. If all attempts fail, it marks the connection as dead. The broadcast function removes failed connections automatically.
Add client-side reconnection logic:
function connectWebSocket() {
var ws = new WebSocket(`ws://localhost:8000/ws/${clientId}`);
ws.onclose = function(e) {
console.log('Connection closed. Reconnecting in 3 seconds...');
setTimeout(function() {
connectWebSocket();
}, 3000);
};
ws.onerror = function(err) {
console.error('WebSocket error:', err);
ws.close();
};
return ws;
}
var ws = connectWebSocket();
When the connection drops, the client waits three seconds and reconnects. This creates a resilient system that recovers from temporary network issues.
For production applications, add exponential backoff. Start with a short delay and increase it with each failed attempt. This prevents overwhelming the server with reconnection attempts.
Common Pitfalls
Memory leaks from unclosed connections: Always remove connections from your manager when they close. A forgotten connection stays in memory forever. Use try-finally blocks to ensure cleanup happens:
@app.websocket("/ws/{client_id}")
async def chat_endpoint(websocket: WebSocket, client_id: str):
await chat_manager.connect(websocket, client_id)
try:
while True:
data = await websocket.receive_text()
await chat_manager.broadcast(f"{client_id}: {data}")
finally:
chat_manager.disconnect(client_id)
Blocking operations in the event loop: Never use synchronous I/O inside WebSocket handlers. Database queries, file operations, and API calls must be async. One blocking operation freezes all connections:
# Bad - blocks all connections
def slow_operation():
time.sleep(5)
return "done"
# Good - other connections stay responsive
async def async_operation():
await asyncio.sleep(5)
return "done"
Missing message validation: Clients can send anything. Validate and sanitize all incoming data. Check message length, format, and content before broadcasting:
async def validate_message(data: str) -> bool:
if len(data) > 1000:
return False
if not data.strip():
return False
return True
# In your endpoint
data = await websocket.receive_text()
if await validate_message(data):
await chat_manager.broadcast(f"{client_id}: {data}")
else:
await websocket.send_text("Invalid message format")
No rate limiting: Users can spam your server with messages. Implement rate limiting to protect against abuse:
from collections import defaultdict
import time
class RateLimiter:
def __init__(self, max_messages: int = 10, window: int = 60):
self.max_messages = max_messages
self.window = window
self.message_counts: Dict[str, List[float]] = defaultdict(list)
def check_rate_limit(self, client_id: str) -> bool:
now = time.time()
messages = self.message_counts[client_id]
# Remove old messages outside the window
messages[:] = [t for t in messages if now - t < self.window]
if len(messages) >= self.max_messages:
return False
messages.append(now)
return True
rate_limiter = RateLimiter()
# In your endpoint
if not rate_limiter.check_rate_limit(client_id):
await websocket.send_text("Rate limit exceeded. Slow down.")
continue
This allows 10 messages per minute per user. Adjust these values based on your application’s needs.
Summary
FastAPI makes WebSocket implementation accessible. You learned to create basic endpoints, manage multiple connections, and build a chat system. Error handling and reconnection strategies make your application production-ready.
WebSockets enable real-time features that HTTP can’t match. Chat applications, live dashboards, collaborative tools, and multiplayer games all rely on this technology. The bidirectional connection eliminates polling and reduces latency.
Start with simple endpoints. Add complexity gradually as your requirements grow. Test with multiple clients to catch concurrency issues early. Monitor connection counts and message throughput in production.
For high-traffic applications, consider Redis for pub/sub messaging. This lets you scale horizontally across multiple servers. Use connection pooling and load balancing to handle thousands of concurrent users.
The code examples in this tutorial give you a solid foundation. Modify them to fit your specific use case. Add authentication, database persistence, or custom message formats as needed. WebSockets are flexible enough to support any real-time communication pattern your application requires.
Discussion
Leave a comment
No comments yet
Be the first to start the conversation.