WebSockets¶
您可以在 FastAPI 中使用 WebSockets。
安装 WebSockets
¶
首先,您需要安装 WebSockets
:
$ pip install websockets
---> 100%
WebSockets 客户端¶
在生产环境中¶
在您的生产系统中,您可能使用现代框架(如React、Vue.js或Angular)创建了一个前端。
要使用 WebSockets 与后端进行通信,您可能会使用前端的工具。
或者,您可能有一个原生移动应用程序,直接使用原生代码与 WebSocket 后端通信。
或者,您可能有其他与 WebSocket 终端通信的方式。
但是,在本示例中,我们将使用一个非常简单的HTML文档,其中包含一些JavaScript,全部放在一个长字符串中。
当然,这并不是最优的做法,您不应该在生产环境中使用它。
在生产环境中,您应该选择上述任一选项。
但这是一种专注于 WebSockets 的服务器端并提供一个工作示例的最简单方式:
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
创建 websocket
¶
在您的 FastAPI 应用程序中,创建一个 websocket
:
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
技术细节
您也可以使用 from starlette.websockets import WebSocket
。
FastAPI 直接提供了相同的 WebSocket
,只是为了方便开发人员。但它直接来自 Starlette。
等待消息并发送消息¶
在您的 WebSocket 路由中,您可以使用 await
等待消息并发送消息。
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
您可以接收和发送二进制、文本和 JSON 数据。
尝试一下¶
如果您的文件名为 main.py
,请使用以下命令运行应用程序:
$ uvicorn main:app --reload
<span style="color: green;">INFO</span>: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
在浏览器中打开 http://127.0.0.1:8000。
您将看到一个简单的页面,如下所示:
您可以在输入框中输入消息并发送:
您的 FastAPI 应用程序将回复:
您可以发送(和接收)多条消息:
所有这些消息都将使用同一个 WebSocket 连
接。
使用 Depends
和其他依赖项¶
在 WebSocket 端点中,您可以从 fastapi
导入并使用以下内容:
Depends
Security
Cookie
Header
Path
Query
它们的工作方式与其他 FastAPI 端点/ 路径操作 相同:
from typing import Annotated
from fastapi import (
Cookie,
Depends,
FastAPI,
Query,
WebSocket,
WebSocketException,
status,
)
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label>
<label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label>
<button onclick="connect(event)">Connect</button>
<hr>
<label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
var itemId = document.getElementById("itemId")
var token = document.getElementById("token")
ws = new WebSocket("ws://localhost:8000/items/" + itemId.value + "/ws?token=" + token.value);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
event.preventDefault()
}
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
async def get_cookie_or_token(
websocket: WebSocket,
session: Annotated[str | None, Cookie()] = None,
token: Annotated[str | None, Query()] = None,
):
if session is None and token is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/items/{item_id}/ws")
async def websocket_endpoint(
*,
websocket: WebSocket,
item_id: str,
q: int | None = None,
cookie_or_token: Annotated[str, Depends(get_cookie_or_token)],
):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(
f"Session cookie or query token value is: {cookie_or_token}"
)
if q is not None:
await websocket.send_text(f"Query parameter q is: {q}")
await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
from typing import Annotated, Union
from fastapi import (
Cookie,
Depends,
FastAPI,
Query,
WebSocket,
WebSocketException,
status,
)
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label>
<label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label>
<button onclick="connect(event)">Connect</button>
<hr>
<label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
var itemId = document.getElementById("itemId")
var token = document.getElementById("token")
ws = new WebSocket("ws://localhost:8000/items/" + itemId.value + "/ws?token=" + token.value);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
event.preventDefault()
}
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
async def get_cookie_or_token(
websocket: WebSocket,
session: Annotated[Union[str, None], Cookie()] = None,
token: Annotated[Union[str, None], Query()] = None,
):
if session is None and token is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/items/{item_id}/ws")
async def websocket_endpoint(
*,
websocket: WebSocket,
item_id: str,
q: Union[int, None] = None,
cookie_or_token: Annotated[str, Depends(get_cookie_or_token)],
):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(
f"Session cookie or query token value is: {cookie_or_token}"
)
if q is not None:
await websocket.send_text(f"Query parameter q is: {q}")
await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
from typing import Union
from fastapi import (
Cookie,
Depends,
FastAPI,
Query,
WebSocket,
WebSocketException,
status,
)
from fastapi.responses import HTMLResponse
from typing_extensions import Annotated
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label>
<label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label>
<button onclick="connect(event)">Connect</button>
<hr>
<label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
var itemId = document.getElementById("itemId")
var token = document.getElementById("token")
ws = new WebSocket("ws://localhost:8000/items/" + itemId.value + "/ws?token=" + token.value);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
event.preventDefault()
}
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
async def get_cookie_or_token(
websocket: WebSocket,
session: Annotated[Union[str, None], Cookie()] = None,
token: Annotated[Union[str, None], Query()] = None,
):
if session is None and token is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/items/{item_id}/ws")
async def websocket_endpoint(
*,
websocket: WebSocket,
item_id: str,
q: Union[int, None] = None,
cookie_or_token: Annotated[str, Depends(get_cookie_or_token)],
):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(
f"Session cookie or query token value is: {cookie_or_token}"
)
if q is not None:
await websocket.send_text(f"Query parameter q is: {q}")
await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
Tip
如果可能,请尽量使用 Annotated
版本。
from fastapi import (
Cookie,
Depends,
FastAPI,
Query,
WebSocket,
WebSocketException,
status,
)
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label>
<label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label>
<button onclick="connect(event)">Connect</button>
<hr>
<label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
var itemId = document.getElementById("itemId")
var token = document.getElementById("token")
ws = new WebSocket("ws://localhost:8000/items/" + itemId.value + "/ws?token=" + token.value);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
event.preventDefault()
}
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
async def get_cookie_or_token(
websocket: WebSocket,
session: str | None = Cookie(default=None),
token: str | None = Query(default=None),
):
if session is None and token is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/items/{item_id}/ws")
async def websocket_endpoint(
websocket: WebSocket,
item_id: str,
q: int | None = None,
cookie_or_token: str = Depends(get_cookie_or_token),
):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(
f"Session cookie or query token value is: {cookie_or_token}"
)
if q is not None:
await websocket.send_text(f"Query parameter q is: {q}")
await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
Tip
如果可能,请尽量使用 Annotated
版本。
from typing import Union
from fastapi import (
Cookie,
Depends,
FastAPI,
Query,
WebSocket,
WebSocketException,
status,
)
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label>
<label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label>
<button onclick="connect(event)">Connect</button>
<hr>
<label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
var itemId = document.getElementById("itemId")
var token = document.getElementById("token")
ws = new WebSocket("ws://localhost:8000/items/" + itemId.value + "/ws?token=" + token.value);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
event.preventDefault()
}
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
async def get_cookie_or_token(
websocket: WebSocket,
session: Union[str, None] = Cookie(default=None),
token: Union[str, None] = Query(default=None),
):
if session is None and token is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/items/{item_id}/ws")
async def websocket_endpoint(
websocket: WebSocket,
item_id: str,
q: Union[int, None] = None,
cookie_or_token: str = Depends(get_cookie_or_token),
):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(
f"Session cookie or query token value is: {cookie_or_token}"
)
if q is not None:
await websocket.send_text(f"Query parameter q is: {q}")
await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
尝试带有依赖项的 WebSockets¶
如果您的文件名为 main.py
,请使用以下命令运行应用程序:
$ uvicorn main:app --reload
<span style="color: green;">INFO</span>: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
在浏览器中打开 http://127.0.0.1:8000。
在页面中,您可以设置:
- "Item ID",用于路径。
- "Token",作为查询参数。
Tip
注意,查询参数 token
将由依赖项处理。
通过这样,您可以连接 WebSocket,然后发送和接收消息:
处理断开连接和多个客户端¶
当 WebSocket 连接关闭时,await websocket.receive_text()
将引发 WebSocketDisconnect
异常,您可以捕获并处理该异常,就像本示例中的示例一样。
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<h2>Your ID: <span id="ws-id"></span></h2>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var client_id = Date.now()
document.querySelector("#ws-id").textContent = client_id;
var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f"You wrote: {data}", websocket)
await manager.broadcast(f"Client #{client_id} says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")
from typing import List
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<h2>Your ID: <span id="ws-id"></span></h2>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var client_id = Date.now()
document.querySelector("#ws-id").textContent = client_id;
var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f"You wrote: {data}", websocket)
await manager.broadcast(f"Client #{client_id} says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")
尝试以下操作:
- 使用多个浏览器选项卡打开应用程序。
- 从这些选项卡中发送消息。
- 然后关闭其中一个选项卡。
这将引发 WebSocketDisconnect
异常,并且所有其他客户端都会收到类似以下的消息:
Client #1596980209979 left the chat
Tip
上面的应用程序是一个最小和简单的示例,用于演示如何处理和向多个 WebSocket 连接广播消息。
但请记住,由于所有内容都在内存中以单个列表的形式处理,因此它只能在进程运行时工作,并且只能使用单个进程。
如果您需要与 FastAPI 集成更简单但更强大的功能,支持 Redis、PostgreSQL 或其他功能,请查看 encode/broadcaster。
更多信息¶
要了解更多选项,请查看 Starlette 的文档: