python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python JWT认证指南

Python中实现JWT认证的完整指南(从生成到验证)

作者:盛夏绽放

JSON Web Tokens (JWT) 是现代 Web 开发中广泛使用的身份验证机制,本文将用生动的方式带你全面了解 JWT 在 Python 中的实现,包括生成、验证和各种相关方法,需要的朋友可以参考下

引言

JSON Web Tokens (JWT) 是现代 Web 开发中广泛使用的身份验证机制。本文将用生动的方式带你全面了解 JWT 在 Python 中的实现,包括生成、验证和各种相关方法,通过丰富的比喻、表格和流程图帮助你彻底掌握 JWT。

一、JWT 是什么?为什么需要它?

想象你去游乐园,入园时会得到一个手环。这个手环:

JWT 就是这样的数字手环:

游乐园手环JWT 令牌
门票类型用户角色
入园时间签发时间(iat)
闭园时间过期时间(exp)
防伪标记数字签名
手环材质加密算法

传统 session 与 JWT 对比

特性SessionJWT
存储位置服务器客户端
扩展性需要共享session天然无状态
跨域支持需要配置原生支持
移动端友好度需处理cookie直接使用header
安全性依赖cookie安全依赖token存储方式
典型场景传统Web应用API/移动应用/微服务

二、JWT 的结构解析

一个 JWT 看起来像这样:xxxxx.yyyyy.zzzzz

就像三明治分三层:

Header (面包上层)

{
  "alg": "HS256",  // 签名算法(HMAC SHA256)
  "typ": "JWT"     // 类型标识
}

Payload (馅料)

{
  "sub": "user123",  // 主题(用户ID)
  "name": "张三",
  "admin": true,
  "iat": 1516239022  // 签发时间
}

Signature (面包下层+防伪标记)

HMACSHA256(
  base64UrlEncode(header) + "." +
  base64UrlEncode(payload),
  密钥
)

生成流程

[Header] → base64编码 → xxxxx
[Payload] → base64编码 → yyyyy
[xxxxx.yyyyy + 密钥] → 签名算法 → zzzzz
最终令牌:xxxxx.yyyyy.zzzzz

三、Python 中实现 JWT

1. 安装 PyJWT 包

pip install pyjwt

2. 生成 JWT

import jwt
import datetime
from datetime import timezone

# 建议从环境变量读取
SECRET_KEY = "your_super_secret_key"

def generate_jwt(user_id: str, username: str, role: str) -> str:
    """生成JWT令牌"""
    payload = {
        "sub": user_id,  # 标准字段:主题
        "name": username,
        "role": role,
        "iat": datetime.datetime.now(tz=timezone.utc),  # 签发时间
        "exp": datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(hours=1)  # 过期时间
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

参数详解表

参数类型必填说明示例
payloaddict负载数据{“sub”: “user123”}
keystr签名密钥“secret”
algorithmstr签名算法“HS256”
headersdict额外头部{“kid”: “key1”}

标准声明字段(建议)

字段全称说明示例
subSubject主题(用户ID)“user123”
expExpiration过期时间1735689600
iatIssued At签发时间1735686000
audAudience接收方“mobile-app”
issIssuer签发者“auth-server”

3. 验证 JWT

def verify_jwt(token: str) -> dict:
    """验证JWT令牌"""
    try:
        payload = jwt.decode(
            token, 
            SECRET_KEY, 
            algorithms=["HS256"],
            audience="your-app",  # 验证接收方
            issuer="auth-server"  # 验证签发方
        )
        return payload
    except jwt.PyJWTError as e:
        print(f"Token验证失败: {type(e).__name__}: {e}")
        return None

验证流程示意图

4. 错误处理大全

from fastapi import HTTPException, status

def validate_token(token: str):
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token已过期",
            headers={"WWW-Authenticate": "Bearer"}
        )
    except jwt.InvalidTokenError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"无效Token: {str(e)}",
            headers={"WWW-Authenticate": "Bearer"}
        )

异常类型表

异常类触发条件HTTP状态码处理建议
ExpiredSignatureErrorToken过期401提示重新登录
InvalidSignatureError签名无效401拒绝访问
InvalidTokenError通用错误401记录日志
MissingRequiredClaimError缺少必要声明400返回错误详情
InvalidIssuerError签发者不匹配403审计日志

四、高级应用场景

1. 双令牌系统(Access + Refresh)

以下是基于你的 JWT 认证流程整理的表格表示:

步骤流程节点条件/判断动作/响应备注
1用户登录提交账号密码系统验证凭证
2验证结果验证成功生成: - access_token(15分钟) - refresh_token(7天)
验证失败返回错误信息HTTP 401
3返回响应返回双token给客户端
4API访问携带access_token验证token有效性
5验证结果access_token有效正常返回请求数据HTTP 200
access_token无效检查是否存在refresh_token
6刷新检查存在有效refresh_token发放新access_tokenHTTP 200 + 新token
无有效refresh_token要求重新登录HTTP 401

详细说明表格:

阶段条件分支系统行为客户端响应HTTP状态码
登录阶段
1.1凭证正确生成双token接收: - access_token - refresh_token200
1.2凭证错误终止流程收到错误提示401
API访问阶段
2.1access_token有效处理请求获取正常数据200
2.2access_token过期检查refresh_token
Token刷新阶段
3.1refresh_token有效发放新access_token获取新token200
3.2refresh_token无效终止流程要求重新登录401

异常处理补充表:

异常情况系统处理客户端表现
access_token格式错误直接拒绝请求收到"无效token"错误
refresh_token过期清除客户端存储跳转登录页面
连续使用过期refresh_token标记为安全事件强制登出所有设备

实现代码:

def generate_token_pair(user_id: str):
    """生成令牌对"""
    access_payload = {
        "sub": user_id,
        "type": "access",
        "exp": datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(minutes=15)
    }
    refresh_payload = {
        "sub": user_id,
        "type": "refresh",
        "exp": datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(days=7)
    }
    
    access_token = jwt.encode(access_payload, SECRET_KEY, algorithm="HS256")
    refresh_token = jwt.encode(refresh_payload, SECRET_KEY, algorithm="HS256")
    
    return {
        "access_token": access_token,
        "refresh_token": refresh_token
    }

def refresh_access_token(refresh_token: str):
    """使用refresh_token获取新access_token"""
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=["HS256"])
        if payload.get("type") != "refresh":
            raise HTTPException(status_code=400, detail="无效的refresh token类型")
        
        new_payload = {
            "sub": payload["sub"],
            "type": "access",
            "exp": datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(minutes=15)
        }
        return jwt.encode(new_payload, SECRET_KEY, algorithm="HS256")
    
    except jwt.PyJWTError as e:
        raise HTTPException(status_code=401, detail=f"refresh token无效: {str(e)}")

2. 与 FastAPI/Django 集成

FastAPI 示例

from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

app = FastAPI()

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="无效或过期的token")

@app.get("/protected")
async def protected_route(user: dict = Depends(get_current_user)):
    return {"message": f"你好, {user['sub']}!", "user_data": user}

Django 示例

from django.http import JsonResponse
from functools import wraps

def jwt_required(view_func):
    @wraps(view_func)
    def wrapper(request, *args, **kwargs):
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            return JsonResponse({"error": "未提供token"}, status=401)
        
        token = auth_header.split(' ')[1]
        try:
            request.user = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
            return view_func(request, *args, **kwargs)
        except jwt.ExpiredSignatureError:
            return JsonResponse({"error": "token已过期"}, status=401)
        except jwt.InvalidTokenError:
            return JsonResponse({"error": "无效token"}, status=401)
    return wrapper

@jwt_required
def protected_view(request):
    return JsonResponse({"data": "受保护的内容", "user": request.user})

五、安全最佳实践

密钥管理

import os
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "fallback-secret")
keys = {
    "2023": "old-secret",
    "2024": "current-secret"
}

增强安全措施

def generate_secure_token(user, ip):
    """生成带IP绑定的token"""
    payload = {
        "sub": user.id,
        "ip": ip,  # 绑定客户端IP
        "jti": str(uuid.uuid4())  # 唯一标识防止重放
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

def verify_secure_token(token, ip):
    payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    if payload.get("ip") != ip:
        raise ValueError("IP地址不匹配")
    return payload

黑名单实现

from redis import Redis

redis = Redis(host='localhost', port=6379)

def revoke_token(jti: str, expire_in: int):
    """将token加入黑名单"""
    redis.setex(f"blacklist:{jti}", expire_in, "revoked")

def is_revoked(jti: str) -> bool:
    """检查是否在黑名单"""
    return bool(redis.exists(f"blacklist:{jti}"))

六、性能优化技巧

选择更快的算法

算法性能比较表(执行时间,越小越好)

算法类型性能指标(单位:ms)备注
HS2561.2对称加密算法
RS2563.8RSA 非对称加密
ES2564.1ECDSA 非对称加密

详细对比表格:

特性对比HS256RS256ES256
算法类型对称加密非对称加密非对称加密
密钥管理共享密钥公钥/私钥公钥/私钥
签名速度⚡️ 1.2ms🐢 3.8ms🐢 4.1ms
验证速度⚡️ 最快🐢 慢🐢 最慢
安全性中等最高
适用场景内部服务公开API金融级应用

减少payload大小

# 不推荐 - payload过大
payload = {**user.__dict__, "iat": ..., "exp": ...}

# 推荐 - 只存储必要信息
payload = {
    "sub": user.id,
    "role": user.role,
    "iat": ...,
    "exp": ...
}

异步签名验证

import asyncio
from jwt import PyJWT

async def async_verify(token: str):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        None, 
        lambda: jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    )

七、完整示例:FastAPI 实现

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import jwt
import datetime
from datetime import timezone
from typing import Optional

app = FastAPI()
security = HTTPBearer()

# 配置
SECRET_KEY = "your-secret-key-here"  # 生产环境应从环境变量获取
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE = datetime.timedelta(minutes=15)
REFRESH_TOKEN_EXPIRE = datetime.timedelta(days=7)

# 模拟数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "password": "secret123",
        "role": "user"
    },
    "admin": {
        "username": "admin",
        "password": "admin123",
        "role": "admin"
    }
}

class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str

class User(BaseModel):
    username: str
    role: Optional[str] = None

def create_tokens(username: str):
    """创建access和refresh令牌对"""
    access_payload = {
        "sub": username,
        "role": fake_users_db[username]["role"],
        "type": "access",
        "exp": datetime.datetime.now(tz=timezone.utc) + ACCESS_TOKEN_EXPIRE
    }
    refresh_payload = {
        "sub": username,
        "type": "refresh",
        "exp": datetime.datetime.now(tz=timezone.utc) + REFRESH_TOKEN_EXPIRE
    }
    
    access_token = jwt.encode(access_payload, SECRET_KEY, algorithm=ALGORITHM)
    refresh_token = jwt.encode(refresh_payload, SECRET_KEY, algorithm=ALGORITHM)
    
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """依赖项:验证JWT并返回当前用户"""
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        if payload.get("type") != "access":
            raise HTTPException(status_code=400, detail="需要access token")
        username = payload.get("sub")
        if username not in fake_users_db:
            raise HTTPException(status_code=404, detail="用户不存在")
        return User(username=username, role=payload.get("role"))
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token已过期",
            headers={"WWW-Authenticate": "Bearer"}
        )
    except jwt.InvalidTokenError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效Token",
            headers={"WWW-Authenticate": "Bearer"}
        )

@app.post("/login")
async def login(username: str, password: str):
    """登录接口"""
    if username not in fake_users_db or fake_users_db[username]["password"] != password:
        raise HTTPException(status_code=400, detail="用户名或密码错误")
    return create_tokens(username)

@app.post("/refresh")
async def refresh_token(refresh_token: str):
    """刷新access token"""
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
        if payload.get("type") != "refresh":
            raise HTTPException(status_code=400, detail="需要refresh token")
        return create_tokens(payload.get("sub"))
    except jwt.PyJWTError as e:
        raise HTTPException(status_code=401, detail=str(e))

@app.get("/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    """获取当前用户信息"""
    return current_user

@app.get("/admin")
async def admin_route(current_user: User = Depends(get_current_user)):
    """管理员专属路由"""
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="无权访问")
    return {"message": "欢迎管理员"}

结语

通过本文,你已经掌握了:

记住这些黄金法则:

现在,你已经准备好为你的 Python 应用实现强大的 JWT 认证系统了!

以上就是Python中JWT认证的完整指南(从生成到验证)的详细内容,更多关于Python JWT认证指南的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文