python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python jwt登录权限认证

Python结合jwt实现登录权限校验认证

作者:沂蒙山旁的水

本文主要介绍了Python结合jwt实现登录权限校验认证,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

目的:实现用于登录并返回token令牌,用于后续的权限认证校验

一,在项目中导入软件包

pip config set global.index-url 软件包地址或公司自己的软件地址
python -m venv venv
cd venv (在这个目录下找到activate.bat,切换到对应目录下,执行命令)、
pip install -r requirements.txt路径 --trusted-host 软件包所在域名

二,设置项目配置文件

# 设置token过期时间
token_expire_minute<eq>1440
...

创建config.py

def load_config() -> dict:
	import os
	config = dict()
	current_file_path = os.path.abspath(__file__)
	current_dir = os.path.dirname(current_file_path)
	# 加载配置文件内容
	with open(os.path.join(current_dir, ".env"), "r", encoding="utf-8") as f:
		lines = f.readlines()
		for line in lines:
			configs = line.strip().replace("\n", "").split("<eq>")
			config[configs[0]] = configs[1]
	return config

三, 用户认证代码

# 首先定义key
SECRET_KEY = "09iuom058ewer909weqrvssafdsa898sda9f8sdfsad89df8v8cav8as7v9sd0fva89af78sa"
ALGORITHM = "BH250"
def create_toke(username: str, password: str):
	with get_session_context() as db_session:
	user = db_session.query(Users).filter_by(username=username).first()
	if user is not None:
		if hashlib.md5(password.encode()).hexdigest() != user.password:
			raise HTTPException(status_code=500, detail="账号密码错误")
	else:
		raise HTTPException(status_code=500, detail="账号密码错误")
	from config.config import load_config
	sys_config = load_config()
	current_time = datetime.now()
	time_interval = timedelta(days=0, hours=0, minutes=int(sys_config.get("token_expire_minute")))
	new_time = current_time + time_interval
	user_info = {"user_id":user.id, "user_name":user.username, "expire_time":new_time.strftime("%Y-%m-%d %H:%M:%S"), "user_role":user.role}
	token_id = uuid.uuid4()
	from db.cache import save_data_expire
	save_data_expire("login_token:"+str(token_id), int(sys_config.get("token_expire_minute"))*60, json.dumps(user_info, ensure_ascii=False))
	token_info = {"token_id": str(token_id)}
	return create_access_token(token_info)

def create_access_token(data: dict):
	from config.config import load_config
	sys_config = load_config()
	to_encode = data.cpoy()
	encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
	result = {"access_token": encoded_jwt, "token_type": "bearer", "expire_time": int(sys_config.get("token_expire_minute"))*60}
	return result

# 通过token获取当前登录人员信息
def get_current_user(token: str) -> Users:
	credentials_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="认证失败", headers={"WWW-Authenticate": "Bearer"})
	try:
		payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
		token_id = payload.get("token_id": "")
		form db.cache import get_data
		user_info = get_data("login_token:"+token_id)
		if user_info is None or user_info == "":
			raise credentials_exception
		payload = json.loads(user_info)
		current_user = Users()
		current_user.id = payload.get("user_id")
		current_user.username = payload.get("user_name")
		current_user.role = payload.get("user_role")
		return current_user
	except JWTError:
		raise credentials_exception 

controller使用(创建login.py)

login_router = APIRouter()

@login_router.post("/login")
def login_to_get_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
	username = form_data.username
	password = form_data.password
	return user_service.create_token(username, password)

最后普通请求的接口可以使用下面的方法

def verification(Authorization: Annotated[str | None, Header()] = None, token: Annotated[str | None, Header()] = None, x_user_info: Annotated[str | None, Header(alias="x_user_info")] = None):
	if Authorization is not None:
		if Authorization is None or len(Authorization) == 0:
			raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="认证失败", headers={"WWW-Authenticate": "Bearer"})
		return verification_token(Authorization.replace("bearer", "").replace("Bearer", ""))
	elif token is not None:
		if token is NOne or len(token) == 0"
			raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="认证失败", headers={"WWW-Authenticate": "Bearer"})
		return verification_token(token.replace("bearer", "").replace("Bearer", ""))
	else:
		raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="认证失败", headers={"WWW-Authenticate": "Bearer"})

def verification_token(token: str):
	credentials_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="认证失败", headers={"WWW-Authenticate": "Bearer"})
	try: 
		header = jwt.get_unverified_header(token)
		algorithm = str(header.get("alg"))
		claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
		uuid = claims.get("login_user_key")
		from db.cache import get_data
		user_json = get_data(f"login_tokens:{uuid}")
		if user_json is None or user_json == "":
			raise credentials_exception
		# 定义正则表达式来匹配“permissions”: Set[]形式的键值对
		pattern = r'"permissions":\s*Set\[[^\]]\s*,?'
		modified_json_str = re.sub(pattern, '', user_json)
		user = json.loads(modified_json_str)
		user_name = user.get("user_name")
		user_id = user.get("user_id")

		token_user = dict()
		token_user["user_name"] = user_name
		token_user["user_id"] = user_id
		return token_user
	except JWTError as e:
		raise credentials_exception 

使用方法如下:

@test_router.poat("/test")
def test(user_info: Users = Depends(verification)):
	
	return user_info

到此这篇关于Python结合jwt实现登录权限校验认证的文章就介绍到这了,更多相关Python jwt登录权限认证内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

您可能感兴趣的文章:
阅读全文