python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Flask实现token登录校验

用Flask实现token登录校验的解决方案

作者:zhouqi1208

网站、小程序、APP 是否已经登录所代表的状态,代表一个概念是登录态, 我们常用的登录态验证方式有cookie,session,token,token提供了另外一种不需要缓存账户和密码的登录状态验证方式,本文给大家介绍了用Flask实现token登录校验的解决方案,需要的朋友可以参考下

常用的登录方式

网站、小程序、APP 是否已经登录所代表的状态,代表一个概念是登录态。

我们常用的登录态验证方式主要是如下3种 cookie,session,token

cookie和session 采用的是缓存机制,是需要在浏览器端或服务器端,储存用户登录状态。

两者的特点,cookie 采用的浏览器缓存 相对于 session 采用的服务器缓存机制安全较差,但后者很怕用户过多给服务器代理过大的压力。

token 提供了另外一种不需要缓存账户和密码的登录状态验证方式。首次登录后,服务器会给客户端发送一个Token,这个Token相当于门禁卡,每次访问服务器的时候携带这个Token,服务器端就可以验证是谁登录了。

但为了防止Token被盗用,我们会把Token的有效期设置的很短。Token过期后,向服务器再申请新的Token.这样就可以更加安全的使用Token方式进行登录了。

用Flask实现TOKEN登录的解决方案

环境准备

python flask模板

flask-login 登录验证模板

Redis 缓存模板

登录验证机制

通过解密TOKEN获得有效的登录用户相关信息,并把登录信息保存在服务器端的Redis内。

这样,确保即使别人盗取了你的Token.如果和服务器Redis中保存的Token不一致。同样系统会判断登录不成功。

第一步 用户登录

  auth.py

 通过 https://自己的服务器地址/login 登录成功,并返回 access_token和refresh_token到前端。

from flask_restful import Api,Resource
from db.models import *
 
 
admin_login_parser = reqparse.RequestParser()
admin_login_parser.add_argument("username", type=str, location='values', help='输入管理员用户名')
admin_login_parser.add_argument("password", type=str, location='values', help='输入管理员密码')
 
 
class Getlogin(Resource):
"""系统登录"""
 
    def post(self):
        args = admin_login_parser.parse_args()
 
        admin = User.query.filter(User.username == args.username).first()
 
        front_access_token, front_refresh_token = admin.get_id(life_time=7200, last_login_time=last_login_time, need_refresh_token=True)
 
         rd.set_redis_data('front_access_token'+'_'+str(admin.id), front_access_token)
            rd.set_redis_data('front_refresh_token'+'_'+str(admin.id),front_refresh_token)
 
        return jsonify(get_result(0, '登录成功', {'access_token': front_access_token, 'refresh_token': front_refresh_token, 'userInfo': userInfo}))

第二步 前端保存 token 每一个新的请求中(这里用的是Uniapp前端),把access_token和refresh_token 保存在前端。

/**
 * 调用微信登录
 */
function login() {
   Utils.request(api.AuthLogin,{
                username:'你的登录名',
                password:'你的密码'
            },'POST').then(res=>{
               uni.setStorageSync('isLogin',true)
               uni.setStorageSync('userInfo', res.data.userInfo);
               uni.setStorageSync('Access-Token', res.data.access_token);
               uni.setStorageSync('refresh_token',res.data.refresh_token);
 
                setTimeout(()=>{
                    resolve({
                        userInfo:res.data.userInfo,
                        access_token:res.data.access_token,
                    })
                },3000)
            })
}

第三步 给前端的请求头添加access_token

uni.addInterceptor('request',{
    // 在发送请求之前做一些处理
    invoke(requestConfig){
        // 添加请求头、身份验证等
        requestConfig.header['Access-Token'] = uni.getStorageSync('Access-Token')
        // requestConfig.header['Refresh-Token'] = uni.getStorageSync('refresh_token')
 
        // 添加token到请求头
 
 
        return requestConfig
    },
    success(response){
        switch (response.data.errno) {
            case 401:
                uni.setStorageSync("Access-Token",'')
                uni.setStorageSync("refresh_token",'')
                uni.setStorageSync('isLogin',false)
                uni.navigateTo({
                    url:'/pages/login/login/index'
                })
                break
            case 403:
                Utils.reloadMessage()
                break
        }
    },
    fail(error){
        console.error('请求失败',error)
    }
})

第四步 后端登录效验 

 这里采用了flask_login 登录模块,flask_login 这里我们从新定义 登录装饰器 login_required_token,用于效验利用token进行的登录。

def login_required_token(func):
    @wraps(func)
    def decorated_view(*args, **kwargs):
        # 用户登录状态
        if not current_user.is_authenticated:
            if g.refresh:
                g.refresh = False
                return jsonify(base.get_result(403, 'token过期重建', {'token': 'refresh'}))
 
            return current_app.login_manager.unauthorized()
        if callable(getattr(current_app, "ensure_sync", None)):
 
            return current_app.ensure_sync(func)(*args, **kwargs)
 
        return func(*args, **kwargs)
 
    return decorated_view

第五步 token过期,前端向后端发送 front_refresh_token

token 过期后,g.refresh 会设置为 True

g.refresh 为True是想后前端反馈状态码403

前端接受到403 的反馈后,执行Utils.reloadMessage函数获取新的token
public static reloadMessage = ()=>{
      let header = {
          'Content-Type': 'application/json; charset=UTF-8',
          "Access-Control-Allow-Origin": "*",
          "Refresh-Token":uni.getStorageSync('refresh_token')
      }
      Utils.request(api.getNewToken,{},"POST",header).then(res=>{
          uni.setStorageSync('Access-Token', res.data.access_token)
      })
  }

第六步 前端向后端请求,获取新的Access-Token

https://自己的服务器地址/getNewToken

def check_refresh_token(func):
    """refresh_token"""
    @wraps(func)
    def inner(*args, **kwargs):
 
        refresh_data = get_token_key(get_key(request.headers.get('Refresh-Token')))
 
 
 
        if refresh_data is None:
            rd.del_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
            return jsonify(base.get_result(401,'系统退出2'))
 
        try:
            refresh_token = rd.get_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
            refresh_token = get_token_key(refresh_token)
 
        except Exception as e:
            return jsonify(base.get_result(401,'系统退出3'))
 
        else:
            # 生成新的token,保存在g中
            user = User.query.get(refresh_token.get('user_id'))
            last_login_time=int(time.time())
            front_access_token, front_refresh_token = user.get_id(life_time=10, last_login_time=last_login_time,
                                                                   need_refresh_token=True)
 
            rd.set_redis_data('front_access_token'+'_'+str(user.id),front_access_token)
            g.new_token = front_access_token
 
            return func(*args, **kwargs)
 
    return inner
 
 
class GetNewToken(Resource):
    """申请新的Token"""
 
    @check_refresh_token
    def post(self):
        return jsonify(get_result(0,'申请成功',{'access_token':g.new_token}))
 
 
api.add_resource(GetNewToken, '/getNewToken')

decorators.py 完整代码

from functools import wraps
from flask import session,redirect,url_for,request,jsonify,Markup,flash,abort,current_app,g
from db.models import User,Guest
from flask_login import LoginManager,login_user,logout_user,login_required,current_user,UserMixin
 
 
from apps.model import Token
import lib.base as base
import json
import apps.redis as rd
import time
 
#访问控制
# 实例化LoginManager类
login_manager = LoginManager()
 
# 提示信息
login_manager.login_message = "请先登录"
# 提示样式
login_manager.login_message_category = 'danger'
 
login_manager.anonymous_user = Guest
 
class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
 
def load_token(tok):
    """通过loads()方法来解析浏览器发送过来的token,从而进行初步的验证"""
    try:
        api_key = Token.validate_token(tok)
        user = User.query.get(api_key.get('user_id'))
    except Exception as e:
        return None
 
    if user:
        user.token = tok
 
    else:
        print('用户不存在,令牌不正确!')
        return None
    return user
 
 
def get_token_key(key):
    """解密token到数据"""
    try:
        key = key.replace('Basic ', '', 1)
        key = key.encode('utf-8')
        key = Token.validate_token(key)
 
    except Exception as e:
    # except TypeError:
        return None
    return key
 
 
def get_key(value):
    if value == '':
        value = None
    return value
 
 
@login_manager.unauthorized_handler
def unauthorized():
    """系统超时退出"""
    return jsonify(base.get_result(401,'系统退出2'))
 
 
@login_manager.request_loader
def load_user_from_request(request):
    """效验请求中的token"""
    if request.method == "POST":
        g.refresh = False
        """# 请求正文中携带token时,执行如下代码
          捕捉请求参数中的 'api_key' 变量,这个api_key和数据库中存储的api_key比较,确认登录身份。 
        """
        api_access_key = request.args.get('api_key')
        if api_access_key:
            user = User.query.filter_by(api_key=api_access_key).first()
            if user:
                return user
 
        """请求头中携带token,执行如下代码
           捕捉请求参数中的 'X-Token' 变量,这个Access-Token通过服务端解码后,获得用户ID和Access-Token有效期,确认登录身份。
        """
        # 取请求头中的access-token
        api_access_key = get_key(request.headers.get('Access-Token'))
 
 
        # 对比前端的token和服务器中保存的token是否一致,判断用户是否已经登录了
        if api_access_key:
 
            login_data = get_token_key(api_access_key)
 
            # 判断token是否过期,没有过期返回用户对象。
            if login_data and api_access_key == get_key(
                    rd.get_redis_data('front_access_token' + '_' + str(login_data.get('user_id')))):
                g.user = User.query.get(login_data.get('user_id'))
 
                return g.user
 
            # 判断如果token已经过期了,但是front_refresh_token没有过期,执行如下代码?
            else:
                g.refresh = True
            return None
 
        return None
 
 
def confirm_required(func):
    def decorated_function(*args, **kwargs):
        if current_user.state.name == 'inactive':
            logout_user()
            message = Markup(
                '账户未激活')
            flash(message, 'warning')
            return redirect(url_for('wx.login'))
        return func(*args, **kwargs)
    return decorated_function
 
 
def permission_required(permission_name):
    def decorator(func):
        @wraps(func)
        def decorated_function(*args, **kwargs):
            if not current_user.can(permission_name):
                abort(403)
            return func(*args, **kwargs)
        return decorated_function
    return decorator
 
 
def admin_required(func):
    return permission_required('ADMINISTER')(func)
 
def login_required_token(func):
    @wraps(func)
    def decorated_view(*args, **kwargs):
        # 用户登录状态
        if not current_user.is_authenticated:
            if g.refresh:
                g.refresh = False
                return jsonify(base.get_result(403, 'token过期重建', {'token': 'refresh'}))
 
            return current_app.login_manager.unauthorized()
 
        if callable(getattr(current_app, "ensure_sync", None)):
 
            return current_app.ensure_sync(func)(*args, **kwargs)
 
 
        return func(*args, **kwargs)
 
    return decorated_view
 
 
def check_refresh_token(func):
    """refresh_token"""
    @wraps(func)
    def inner(*args, **kwargs):
 
        refresh_data = get_token_key(get_key(request.headers.get('Refresh-Token')))
 
 
 
        if refresh_data is None:
            rd.del_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
            return jsonify(base.get_result(401,'系统退出2'))
 
        try:
            refresh_token = rd.get_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
            refresh_token = get_token_key(refresh_token)
 
        except Exception as e:
            return jsonify(base.get_result(401,'系统退出3'))
 
        else:
            # 生成新的token,保存在g中
            user = User.query.get(refresh_token.get('user_id'))
            last_login_time=int(time.time())
            front_access_token, front_refresh_token = user.get_id(life_time=10, last_login_time=last_login_time,
                                                                   need_refresh_token=True)
 
            rd.set_redis_data('front_access_token'+'_'+str(user.id),front_access_token)
            g.new_token = front_access_token
 
            return func(*args, **kwargs)
 
    return inner

以上就是用Flask实现token登录校验的解决方案的详细内容,更多关于Flask实现token登录校验的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文