用Flask实现token登录校验的解决方案
作者:zhouqi1208
常用的登录方式
网站、小程序、APP 是否已经登录所代表的状态,代表一个概念是登录态。
我们常用的登录态验证方式主要是如下3种 cookie,session,token
cookie和session 采用的是缓存机制,是需要在浏览器端或服务器端,储存用户登录状态。
两者的特点,cookie 采用的浏览器缓存 相对于 session 采用的服务器缓存机制安全较差,但后者很怕用户过多给服务器代理过大的压力。
token 提供了另外一种不需要缓存账户和密码的登录状态验证方式。首次登录后,服务器会给客户端发送一个Token,这个Token相当于门禁卡,每次访问服务器的时候携带这个Token,服务器端就可以验证是谁登录了。
但为了防止Token被盗用,我们会把Token的有效期设置的很短。Token过期后,向服务器再申请新的Token.这样就可以更加安全的使用Token方式进行登录了。
用Flask实现TOKEN登录的解决方案
环境准备
python flask模板
flask-login 登录验证模板
Redis 缓存模板
登录验证机制
通过解密TOKEN获得有效的登录用户相关信息,并把登录信息保存在服务器端的Redis内。
这样,确保即使别人盗取了你的Token.如果和服务器Redis中保存的Token不一致。同样系统会判断登录不成功。
第一步 用户登录
auth.py
通过 https://自己的服务器地址/login 登录成功,并返回 access_token和refresh_token到前端。
from flask_restful import Api,Resource from db.models import * admin_login_parser = reqparse.RequestParser() admin_login_parser.add_argument("username", type=str, location='values', help='输入管理员用户名') admin_login_parser.add_argument("password", type=str, location='values', help='输入管理员密码') class Getlogin(Resource): """系统登录""" def post(self): args = admin_login_parser.parse_args() admin = User.query.filter(User.username == args.username).first() front_access_token, front_refresh_token = admin.get_id(life_time=7200, last_login_time=last_login_time, need_refresh_token=True) rd.set_redis_data('front_access_token'+'_'+str(admin.id), front_access_token) rd.set_redis_data('front_refresh_token'+'_'+str(admin.id),front_refresh_token) return jsonify(get_result(0, '登录成功', {'access_token': front_access_token, 'refresh_token': front_refresh_token, 'userInfo': userInfo}))
第二步 前端保存 token 每一个新的请求中(这里用的是Uniapp前端),把access_token和refresh_token 保存在前端。
/** * 调用微信登录 */ function login() { Utils.request(api.AuthLogin,{ username:'你的登录名', password:'你的密码' },'POST').then(res=>{ uni.setStorageSync('isLogin',true) uni.setStorageSync('userInfo', res.data.userInfo); uni.setStorageSync('Access-Token', res.data.access_token); uni.setStorageSync('refresh_token',res.data.refresh_token); setTimeout(()=>{ resolve({ userInfo:res.data.userInfo, access_token:res.data.access_token, }) },3000) }) }
第三步 给前端的请求头添加access_token
uni.addInterceptor('request',{ // 在发送请求之前做一些处理 invoke(requestConfig){ // 添加请求头、身份验证等 requestConfig.header['Access-Token'] = uni.getStorageSync('Access-Token') // requestConfig.header['Refresh-Token'] = uni.getStorageSync('refresh_token') // 添加token到请求头 return requestConfig }, success(response){ switch (response.data.errno) { case 401: uni.setStorageSync("Access-Token",'') uni.setStorageSync("refresh_token",'') uni.setStorageSync('isLogin',false) uni.navigateTo({ url:'/pages/login/login/index' }) break case 403: Utils.reloadMessage() break } }, fail(error){ console.error('请求失败',error) } })
第四步 后端登录效验
这里采用了flask_login 登录模块,flask_login 这里我们从新定义 登录装饰器 login_required_token,用于效验利用token进行的登录。
def login_required_token(func): @wraps(func) def decorated_view(*args, **kwargs): # 用户登录状态 if not current_user.is_authenticated: if g.refresh: g.refresh = False return jsonify(base.get_result(403, 'token过期重建', {'token': 'refresh'})) return current_app.login_manager.unauthorized() if callable(getattr(current_app, "ensure_sync", None)): return current_app.ensure_sync(func)(*args, **kwargs) return func(*args, **kwargs) return decorated_view
第五步 token过期,前端向后端发送 front_refresh_token
token 过期后,g.refresh 会设置为 True
g.refresh 为True是想后前端反馈状态码403
前端接受到403 的反馈后,执行Utils.reloadMessage函数获取新的token
public static reloadMessage = ()=>{ let header = { 'Content-Type': 'application/json; charset=UTF-8', "Access-Control-Allow-Origin": "*", "Refresh-Token":uni.getStorageSync('refresh_token') } Utils.request(api.getNewToken,{},"POST",header).then(res=>{ uni.setStorageSync('Access-Token', res.data.access_token) }) }
第六步 前端向后端请求,获取新的Access-Token
https://自己的服务器地址/getNewToken
def check_refresh_token(func): """refresh_token""" @wraps(func) def inner(*args, **kwargs): refresh_data = get_token_key(get_key(request.headers.get('Refresh-Token'))) if refresh_data is None: rd.del_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id'))) return jsonify(base.get_result(401,'系统退出2')) try: refresh_token = rd.get_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id'))) refresh_token = get_token_key(refresh_token) except Exception as e: return jsonify(base.get_result(401,'系统退出3')) else: # 生成新的token,保存在g中 user = User.query.get(refresh_token.get('user_id')) last_login_time=int(time.time()) front_access_token, front_refresh_token = user.get_id(life_time=10, last_login_time=last_login_time, need_refresh_token=True) rd.set_redis_data('front_access_token'+'_'+str(user.id),front_access_token) g.new_token = front_access_token return func(*args, **kwargs) return inner class GetNewToken(Resource): """申请新的Token""" @check_refresh_token def post(self): return jsonify(get_result(0,'申请成功',{'access_token':g.new_token})) api.add_resource(GetNewToken, '/getNewToken')
decorators.py 完整代码
from functools import wraps from flask import session,redirect,url_for,request,jsonify,Markup,flash,abort,current_app,g from db.models import User,Guest from flask_login import LoginManager,login_user,logout_user,login_required,current_user,UserMixin from apps.model import Token import lib.base as base import json import apps.redis as rd import time #访问控制 # 实例化LoginManager类 login_manager = LoginManager() # 提示信息 login_manager.login_message = "请先登录" # 提示样式 login_manager.login_message_category = 'danger' login_manager.anonymous_user = Guest class bcolors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' def load_token(tok): """通过loads()方法来解析浏览器发送过来的token,从而进行初步的验证""" try: api_key = Token.validate_token(tok) user = User.query.get(api_key.get('user_id')) except Exception as e: return None if user: user.token = tok else: print('用户不存在,令牌不正确!') return None return user def get_token_key(key): """解密token到数据""" try: key = key.replace('Basic ', '', 1) key = key.encode('utf-8') key = Token.validate_token(key) except Exception as e: # except TypeError: return None return key def get_key(value): if value == '': value = None return value @login_manager.unauthorized_handler def unauthorized(): """系统超时退出""" return jsonify(base.get_result(401,'系统退出2')) @login_manager.request_loader def load_user_from_request(request): """效验请求中的token""" if request.method == "POST": g.refresh = False """# 请求正文中携带token时,执行如下代码 捕捉请求参数中的 'api_key' 变量,这个api_key和数据库中存储的api_key比较,确认登录身份。 """ api_access_key = request.args.get('api_key') if api_access_key: user = User.query.filter_by(api_key=api_access_key).first() if user: return user """请求头中携带token,执行如下代码 捕捉请求参数中的 'X-Token' 变量,这个Access-Token通过服务端解码后,获得用户ID和Access-Token有效期,确认登录身份。 """ # 取请求头中的access-token api_access_key = get_key(request.headers.get('Access-Token')) # 对比前端的token和服务器中保存的token是否一致,判断用户是否已经登录了 if api_access_key: login_data = get_token_key(api_access_key) # 判断token是否过期,没有过期返回用户对象。 if login_data and api_access_key == get_key( rd.get_redis_data('front_access_token' + '_' + str(login_data.get('user_id')))): g.user = User.query.get(login_data.get('user_id')) return g.user # 判断如果token已经过期了,但是front_refresh_token没有过期,执行如下代码? else: g.refresh = True return None return None def confirm_required(func): def decorated_function(*args, **kwargs): if current_user.state.name == 'inactive': logout_user() message = Markup( '账户未激活') flash(message, 'warning') return redirect(url_for('wx.login')) return func(*args, **kwargs) return decorated_function def permission_required(permission_name): def decorator(func): @wraps(func) def decorated_function(*args, **kwargs): if not current_user.can(permission_name): abort(403) return func(*args, **kwargs) return decorated_function return decorator def admin_required(func): return permission_required('ADMINISTER')(func) def login_required_token(func): @wraps(func) def decorated_view(*args, **kwargs): # 用户登录状态 if not current_user.is_authenticated: if g.refresh: g.refresh = False return jsonify(base.get_result(403, 'token过期重建', {'token': 'refresh'})) return current_app.login_manager.unauthorized() if callable(getattr(current_app, "ensure_sync", None)): return current_app.ensure_sync(func)(*args, **kwargs) return func(*args, **kwargs) return decorated_view def check_refresh_token(func): """refresh_token""" @wraps(func) def inner(*args, **kwargs): refresh_data = get_token_key(get_key(request.headers.get('Refresh-Token'))) if refresh_data is None: rd.del_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id'))) return jsonify(base.get_result(401,'系统退出2')) try: refresh_token = rd.get_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id'))) refresh_token = get_token_key(refresh_token) except Exception as e: return jsonify(base.get_result(401,'系统退出3')) else: # 生成新的token,保存在g中 user = User.query.get(refresh_token.get('user_id')) last_login_time=int(time.time()) front_access_token, front_refresh_token = user.get_id(life_time=10, last_login_time=last_login_time, need_refresh_token=True) rd.set_redis_data('front_access_token'+'_'+str(user.id),front_access_token) g.new_token = front_access_token return func(*args, **kwargs) return inner
以上就是用Flask实现token登录校验的解决方案的详细内容,更多关于Flask实现token登录校验的资料请关注脚本之家其它相关文章!