Python结合jwt实现登录权限校验认证
作者:沂蒙山旁的水
本文主要介绍了Python结合jwt实现登录权限校验认证,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
目的:实现用于登录并返回token令牌,用于后续的权限认证校验
一,在项目中导入软件包
- 在项目根目录,创建requirements.txt文件
- 设置jose软件包版本
- 执行以下命令
pip config set global.index-url 软件包地址或公司自己的软件地址 python -m venv venv cd venv (在这个目录下找到activate.bat,切换到对应目录下,执行命令)、 pip install -r requirements.txt路径 --trusted-host 软件包所在域名
二,设置项目配置文件
- 在项目根目录创建config
- 进入该目录下创建.env文件
- 设置项目参数
# 设置token过期时间 token_expire_minute<eq>1440 ...
创建config.py
def load_config() -> dict:
import os
config = dict()
current_file_path = os.path.abspath(__file__)
current_dir = os.path.dirname(current_file_path)
# 加载配置文件内容
with open(os.path.join(current_dir, ".env"), "r", encoding="utf-8") as f:
lines = f.readlines()
for line in lines:
configs = line.strip().replace("\n", "").split("<eq>")
config[configs[0]] = configs[1]
return config
三, 用户认证代码
- 创建token的方法(创建user_service.py)
# 首先定义key
SECRET_KEY = "09iuom058ewer909weqrvssafdsa898sda9f8sdfsad89df8v8cav8as7v9sd0fva89af78sa"
ALGORITHM = "BH250"
def create_toke(username: str, password: str):
with get_session_context() as db_session:
user = db_session.query(Users).filter_by(username=username).first()
if user is not None:
if hashlib.md5(password.encode()).hexdigest() != user.password:
raise HTTPException(status_code=500, detail="账号密码错误")
else:
raise HTTPException(status_code=500, detail="账号密码错误")
from config.config import load_config
sys_config = load_config()
current_time = datetime.now()
time_interval = timedelta(days=0, hours=0, minutes=int(sys_config.get("token_expire_minute")))
new_time = current_time + time_interval
user_info = {"user_id":user.id, "user_name":user.username, "expire_time":new_time.strftime("%Y-%m-%d %H:%M:%S"), "user_role":user.role}
token_id = uuid.uuid4()
from db.cache import save_data_expire
save_data_expire("login_token:"+str(token_id), int(sys_config.get("token_expire_minute"))*60, json.dumps(user_info, ensure_ascii=False))
token_info = {"token_id": str(token_id)}
return create_access_token(token_info)
def create_access_token(data: dict):
from config.config import load_config
sys_config = load_config()
to_encode = data.cpoy()
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
result = {"access_token": encoded_jwt, "token_type": "bearer", "expire_time": int(sys_config.get("token_expire_minute"))*60}
return result
# 通过token获取当前登录人员信息
def get_current_user(token: str) -> Users:
credentials_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="认证失败", headers={"WWW-Authenticate": "Bearer"})
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
token_id = payload.get("token_id": "")
form db.cache import get_data
user_info = get_data("login_token:"+token_id)
if user_info is None or user_info == "":
raise credentials_exception
payload = json.loads(user_info)
current_user = Users()
current_user.id = payload.get("user_id")
current_user.username = payload.get("user_name")
current_user.role = payload.get("user_role")
return current_user
except JWTError:
raise credentials_exception
controller使用(创建login.py)
login_router = APIRouter()
@login_router.post("/login")
def login_to_get_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
username = form_data.username
password = form_data.password
return user_service.create_token(username, password)
最后普通请求的接口可以使用下面的方法
def verification(Authorization: Annotated[str | None, Header()] = None, token: Annotated[str | None, Header()] = None, x_user_info: Annotated[str | None, Header(alias="x_user_info")] = None):
if Authorization is not None:
if Authorization is None or len(Authorization) == 0:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="认证失败", headers={"WWW-Authenticate": "Bearer"})
return verification_token(Authorization.replace("bearer", "").replace("Bearer", ""))
elif token is not None:
if token is NOne or len(token) == 0"
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="认证失败", headers={"WWW-Authenticate": "Bearer"})
return verification_token(token.replace("bearer", "").replace("Bearer", ""))
else:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="认证失败", headers={"WWW-Authenticate": "Bearer"})
def verification_token(token: str):
credentials_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="认证失败", headers={"WWW-Authenticate": "Bearer"})
try:
header = jwt.get_unverified_header(token)
algorithm = str(header.get("alg"))
claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
uuid = claims.get("login_user_key")
from db.cache import get_data
user_json = get_data(f"login_tokens:{uuid}")
if user_json is None or user_json == "":
raise credentials_exception
# 定义正则表达式来匹配“permissions”: Set[]形式的键值对
pattern = r'"permissions":\s*Set\[[^\]]\s*,?'
modified_json_str = re.sub(pattern, '', user_json)
user = json.loads(modified_json_str)
user_name = user.get("user_name")
user_id = user.get("user_id")
token_user = dict()
token_user["user_name"] = user_name
token_user["user_id"] = user_id
return token_user
except JWTError as e:
raise credentials_exception
使用方法如下:
@test_router.poat("/test")
def test(user_info: Users = Depends(verification)):
return user_info到此这篇关于Python结合jwt实现登录权限校验认证的文章就介绍到这了,更多相关Python jwt登录权限认证内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
