二手产品管理网站(2)
本文最后更新于185 天前,其中的信息可能已经过时,如有错误请发送邮件到big_fw@foxmail.com

用户信息模块

项目已经完成初始化。

因为其他模块都需要有用户信息关联,因此需要先完成用户信息模块功能。

实现流程的开发思路

已完成:登录/注册 安全方案:JWT+REDIS

已完成用户管理->用户的增删改查

已完成:更新用户信息的前端页面

已完成:判断查询参数是否为空

使用JWT+REDIS 作为单点登录的实现方案

可参考:【Python_Flask系列之_JWT+Redis的token管理(包含失效)】

实现方案:

  • 注册:将密码通过MD5加密后存储进数据库
    • 登陆时用密码加密后的值进行验证
  • 单点登录:将脱敏后的用户信息使用JWT生成token
    • 将token放进REDIS,将token返回前端
    • 前端每次访问时在请求头判断有无token,无token则跳转到登录页面
    • 后端获取到token,将token解析为哦用户信息,确认用户登录状态

优点:token能够让用户登录信息更安全 / 管理员可以通过redis控制用户登录状态

代码实现:

redisUtil.py

# 封装REDIS工具:REDIS存储当前的登录用户信息
import redis


class MyRedis():
    def __init__(self, host="127.0.0.1", port=6379, password='admin123'):
        pools = redis.ConnectionPool(host=host, port=port, password=password, decode_responses=True)
        self.__redis = redis.StrictRedis(connection_pool=pools)

    # 在 Redis 中设置值,默认,不存在则创建,存在则修改。
    def set(self, key, value):
        return self.__redis.set(key, value)

    # 取出键对应的值
    def get(self, key):
        if self.__redis.exists(key):
            return self.__redis.get(key)
        else:
            return None

    # 获取有序集合中所有元素
    def zrange(self, key):
        if self.__redis.exists(key):
            return self.__redis.zrange(key, 0, -1, desc=False, withscores=True)
        else:
            return None

    # 获取值的索引号
    def zrank(self, key):
        if self.__redis.exists(key):
            return self.__redis.zrank(key)
        else:
            return None

    # 按照分数范围获取name对应的有序集合的元素
    def zrangebyscore(self, key):
        if self.__redis.exists(key):
            return self.__redis.zrangebyscore(key, 0, -1, withscores=True)
        else:
            return None

    # 按照分数范围获取有序集合的元素并排序(默认从大到小排序)
    def zrevrange(self, key):
        if self.__redis.exists(key):
            return self.__redis.zrevrange(key, 0, -1)
        else:
            return None

    # 删除键值对
    def delete(self, key):
        if self.__redis.exists(key):
            return self.__redis.delete(key)
        else:
            return None

    # 获取key对应hash的所有键值
    def hgetall(self, key):
        if self.__redis.exists(key):
            return self.__redis.hgetall(key)
        else:
            return None

    # 统计返回的总个数
    def rcount(self, re, key):
        if self.__redis.exists(key):
            sum = 0
            for i in re:
                print(i)
                sum += 1
            print("总共有%s个" % (sum))

app.py

# app.py
from flask import Flask, request, jsonify
from flask_cors import CORS
from api.AccountApi import account_api
from api.ProductApi import product_api
from dao.userDAO import UserDAO
from utils.authUtil import jwt_and_redis_required
from utils.redisUtil import MyRedis
from utils.responseUtil import responseOK, responseError, ErrorCode, BAD_REQUEST
from flask_jwt_extended import (
    JWTManager, jwt_required, create_access_token,
    create_refresh_token,
    get_jwt_identity, get_jwt
)

app = Flask(__name__)
# 跨域
CORS(app, origins='http://localhost:5173')
# redis
redist = MyRedis()
# JWT
app.config['JWT_SECRET_KEY'] = 'preOwnedMarketplace'
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = 6660
jwt = JWTManager(app)
# 接口
app.register_blueprint(account_api, url_prefix='/api/ac')
app.register_blueprint(product_api, url_prefix='/api/pro')


# 注册功能实现
@app.route('/api/regis', methods=['POST'])
def toregis():
    print("注册用户!")
    user_data = request.json  # 获取 JSON 数据
    try:
        # @TODO 可以提供email/phone 提供注册
        # @TODO 校验用户名是否存在
        UserDAO.create_user(user_data)
        user = UserDAO.read_user_by_username(user_data['username'])
    except Exception as e:
        print(f'注册失败!用户: {user_data}, 错误: {str(e)}')
        return responseError(BAD_REQUEST)
    return responseOK('用户注册成功', user)  # 返回 JSON 响应


# 登录功能实现
@app.route('/api/login', methods=['POST'])
def tologin():
    print("登录用户!")
    user_data = request.json  # 获取 JSON 数据
    userid = UserDAO.login_user(user_data)
    print('登录用户:' + user_data['username'])
    print('登录id:' + str(userid))
    if userid == 0:
        return responseError(ErrorCode('400', '用户信息不存在'))
    user = UserDAO.read_user_by_id(userid)
    access_token = create_access_token(identity=user)
    redist.set(user.get('username'),access_token)
    return responseOK(access_token)  # 返回 JSON 响应


@app.route('/api/cur', methods=['GET'])
@jwt_and_redis_required()
def getCuruser():
    current_user = get_jwt_identity()
    print(f'username:' + current_user['username'])
    return responseOK(current_user)

if __name__ == "__main__":
    app.run(debug=True)
    # debug==True是为了方便修改代码之后,能够不重启项目就能够更新,否则,每次更改代码都需要重新启动项目
    # 其他参数的设置可以查阅文档,这里越简单越好

UserDAO.py

import hashlib

from pageModel.User import UserDisplay
from utils.dbUtil import db


class UserDAO:
    @staticmethod
    def create_user(user_data):
        query = "INSERT INTO Users (username, password, email, phone) VALUES (%s, %s, %s, %s)"
        hashed_password = hashlib.md5(user_data['password'].encode()).hexdigest()
        params = (
            user_data['username'],
            hashed_password,
            user_data.get('email', None),
            user_data.get('phone', None),
        )
        db.execute_update(query, params)

    @staticmethod
    def read_user_by_id(user_id):
        query = "SELECT user_id, username, email, phone, role, status FROM Users WHERE user_id = %s"
        params = (user_id,)
        result = db.execute_query(query, params)
        if result:
            user_data = result[0]
            return UserDisplay(
                user_data['user_id'],
                user_data['username'],
                user_data.get('email'),
                user_data.get('phone'),
                user_data.get('role'),
                user_data.get('status')
            ).to_dict()
        return None

    @staticmethod
    def read_user_by_username(username):
        query = "SELECT user_id, username, email, phone, role, status FROM Users WHERE status!=0 and username = %s"
        params = (username,)
        result = db.execute_query(query, params)
        if result:
            user_data = result[0]
            return UserDisplay(
                user_data['user_id'],
                user_data['username'],
                user_data.get('email'),
                user_data.get('phone'),
                user_data.get('role'),
                user_data.get('status')
            ).to_dict()
        return None

    @staticmethod
    def update_user(user_id, update_data):
        set_clause = ", ".join(f"{key} = %s" for key in update_data.keys())
        query = f"UPDATE Users SET {set_clause}, updated_at = NOW() WHERE user_id = %s"
        params = tuple(update_data.values()) + (user_id,)
        db.execute_update(query, params)

    @staticmethod
    def delete_user(user_id):
        # 逻辑删除
        query = "UPDATE Users SET status=0 WHERE user_id = %s"
        params = (user_id,)
        db.execute_update(query, params)

    # 根据用户名和密码匹配
    @staticmethod
    def login_user(user_data):
        hashed_password = hashlib.md5(user_data['password'].encode()).hexdigest()
        query = "SELECT user_id FROM Users WHERE username = %s AND password = %s"
        params = (
            user_data['username'],
            hashed_password
        )
        result = db.execute_query(query, params)
        if result:
            return result[0]['user_id']
        return 0

    @staticmethod
    def select_all_user():
        query = "SELECT user_id, username, email, phone, role, status FROM Users WHERE status != 0"
        result = db.execute_query(query)
        if result:
            user_data = result
            user_res = []
            for user in user_data:
                user_res.append(UserDisplay(
                    user['user_id'],
                    user['username'],
                    user.get('email'),
                    user.get('phone'),
                    user.get('role'),
                    user.get('status')
                ).to_dict())
            return user_res
        return None

    # 修改后的 query_user 方法
    @staticmethod
    def query_user(userid=None, username=None, role=None):
        query = "SELECT * FROM Users WHERE 1=1"
        params = []
        if userid:
            query += " AND user_id = %s"
            params.append(userid)
        if username:
            query += " AND username = %s"
            params.append(username)
        if role:
            query += " AND role = %s"
            params.append(role)
        result = db.execute_query(query, params)
        if result:
            user_data = result
            user_res = []
            for user in user_data:
                user_res.append(UserDisplay(
                    user['user_id'],
                    user['username'],
                    user.get('email'),
                    user.get('phone'),
                    user.get('role'),
                    user.get('status')
                ).to_dict())
            return user_res

        return None

类似于responseOK…都是作为封装的工具类,返回给前端数据的一种规范化处理,可以参考源代码中关于util文件夹部分。

前端

  • 封装axios,配置请求/响应拦截器

request.ts

// 引入axios
import axios from 'axios'
import { ElMessage } from 'element-plus'

//1. 创建新的axios实例,
const instance = axios.create()

// 请求拦截器(请求拦截器作用就是用于发送接口请求前需要做的操作)
instance.interceptors.request.use((config) => {
    //发请求前做的一些处理,数据转化,配置请求头,设置token,设置loading等,根据需求去添加
    const token = window.sessionStorage.getItem('token')
    if (token) { // 判断是否存在token,如果存在的话,则每个http header都加上token
        config.headers.Authorization = "Bearer "+`${token}` //请求头加上token
    }
    return config
})

// 响应拦截器(响应拦截器作用就是接收到响应数据后需要做的一些操作,比如判断接口状态)
instance.interceptors.response.use((res) => {
    // 接收到响应数据并成功后的一些共有的处理,关闭loading等
    // res为回调函数,也就是接口请求成功后返回的数据
    // res.data.meta.status为当前接口的状态
    let status = res.data.code
    // switch函数判断
    // 响应拦截器执行完后需要将接口返回的数据也就是res回调函数,return给到之后的请求方法
    return res.data
})

// 将新创建的一个axios导出
export default instance

userLogin.vue  >   登录按钮绑定的登录功能

<script>
import { useRouter } from 'vue-router';
import { reactive, ref } from 'vue';
import instance from '@/request/request';
import { ElMessage } from 'element-plus'

export default {
  name: "UserLogin",
  setup() {
    const userLoginForm = reactive({
      username: "",
      password: ""
    });
    const router = useRouter();
    const error = ref('');

    async function userLogin() {
      try {
        console.log(111)
        const response = await instance.post('/api/login', userLoginForm);
        if (response.code === 200) {
          window.sessionStorage.setItem('token', response.data)
        } else {
          error.value = response.message;
          ElMessage({
            message: '登录失败.请检查登录信息',
            type: 'error',
          })
        }
      } catch (e) {
        error.value = '登录失败,请重试';
      }
      try {
        const response = await instance.get('/api/cur');
        if (response.code === 200) {
          await router.push('/admin');
          ElMessage({
            message: '登录成功.欢迎' + response.data.username,
            type: 'success',
          })
        } else {
          error.value = response.message;
          ElMessage({
            message: '登录失败.请检查登录信息',
            type: 'error',
          })
        }
      } catch (e) {
        error.value = '登录失败,请重试';
      }
    }

    return {
      userLoginForm,
      error,
      userLogin
    };
  }
}
</script>
至此,你已经可以在页面上添加一个简单的登录按钮,绑定登录的功能,进行登录的测试。
如有疑惑,在下方留下您的邮箱和相关问题,本站将尽快为您提供解决方案。

源码地址

请返回第一章底部:二手产品管理网站(1)-LANJ – 个人博客

文末附加内容
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇