跳转至

数据库 DB 设计#

1. 数据库表结构设计#

数据库包含以下表项:用户信息表收集信息表题目信息表选项信息表答案信息表提交信息表内容信息表

1.1 用户信息表#

用户信息表项代码设计如下。

Bases: db.Model, UserMixin

用户信息表。

记录已注册用户的相关信息。

Attributes:

Name Type Description
id db.Interger

主键,自增

name db.String

用户昵称(不可为空)

username db.String

用户名(不可为空,不可重复)

password_hash db.String

密码散列值(不可为空)

userpath db.String

用户空间路径(不可为空,不可重复)

email db.String

用户邮箱(不可为空)

authorization_code db.String

邮箱授权码

Source code in Flask\models.py
class User(db.Model, UserMixin):
    """ 用户信息表。

    记录已注册用户的相关信息。

    Attributes:
        id (db.Interger): 主键,自增
        name (db.String): 用户昵称(不可为空)
        username (db.String): 用户名(不可为空,不可重复)
        password_hash (db.String): 密码散列值(不可为空)
        userpath (db.String): 用户空间路径(不可为空,不可重复)
        email (db.String): 用户邮箱(不可为空)
        authorization_code (db.String): 邮箱授权码
    """
    id = db.Column(db.Integer, primary_key=True)  # 主键
    name = db.Column(db.String(30), nullable=False)  # 用户昵称
    username = db.Column(db.String(30), nullable=False, unique=True)  # 用户名
    password_hash = db.Column(db.String(128), nullable=False)  # 密码散列值
    userpath = db.Column(db.String(50), nullable=False, unique=True)  # 用户空间路径
    email = db.Column(db.String(30), nullable=False)  # 用户邮箱
    authorization_code = db.Column(db.String(30))  # 邮箱授权码

    def set_password(self, password: str) -> None:
        """设置密码

        Args:
            password (str): 密码(明文)
        """
        self.password_hash = generate_password_hash(
            password)  # 根据用户输入的密码生成密码散列值

    def validate_password(self, password: str) -> bool:
        """验证密码

        Args:
            password (str): 密码(明文)

        Returns:
            布尔值,表示密码是否正确
        """
        return check_password_hash(self.password_hash, password)

    def set_userpath(self) -> None:
        """设置用户空间路径"""

        # 路径的前若干位为用户名和 user 标识,后面用随机字符串补齐,总长度 20 位。
        self.userpath = self.username + 'user' + ''.join(
            random.sample(string.ascii_letters + string.digits,
                          20 - len(self.username) - len('user'))
        )

    def set_email(self, email: str) -> None:
        """设置用户邮箱

        Args:
            email (str): 邮箱
        """
        self.email = email

    def email_authentication(self, user_email: str = email,
                             user_pwd: str = authorization_code,
                             host: str = 'smtp.sina.com'):
        """邮箱认证

        Args:
            user_email: 用户邮箱
            user_pwd: 邮箱授权码
            host: 发送邮件服务器地址
        """
        return yagmail.SMTP(user=user_email, password=user_pwd, host=host)

    def send_email(self, to_email, email_title: str, email_message: str) -> bool:
        """发送邮件,可以单发也可以群发,取决于传入参数 to_email 的类型

        Args:
            to_email (str | list): 目标邮箱地址,若为列表则代表群发
            email_title (str): 邮件标题
            email_message (str): 邮件正文,可以使用 HTML 格式的字符串

        Returns:
            布尔值,表示是否发送成功
        """
        server = "smtp." + self.email.split('@')[1]
        if self.authorization_code is None:
            print("没有邮箱授权码!")
            return False
        yag = yagmail.SMTP(user=self.email, password=self.authorization_code, host=server)
        if yag is None:
            print("yag is None!")
            return False
        if type(to_email) == "str":  # 单发
            if re.match("^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$", to_email) is None:
                print("目标邮箱地址错误!")
                return False
            try:
                yag.send(
                    to=to_email,
                    subject=email_title,
                    contents=email_message
                )
            except smtplib.SMTPAuthenticationError:
                print("授权码错误!")
            return True
        else:
            for email_addr in to_email:  # 群发
                if re.match("^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$", email_addr) is None:
                    print("目标邮箱地址错误!")
                    return False
                try:
                    yag.send(
                        to=email_addr,
                        subject=email_title,
                        contents=email_message
                    )
                except smtplib.SMTPAuthenticationError:
                    print("授权码错误!")

            return True

    def sub_func(self, email_list, email_title, email_message):
        self.send_email(
            to_email=email_list,
            email_title=email_title,
            email_message=email_message
        )
        print("已发送")

email_authentication(user_email=email, user_pwd=authorization_code, host='smtp.sina.com') #

邮箱认证

Parameters:

Name Type Description Default
user_email str

用户邮箱

email
user_pwd str

邮箱授权码

authorization_code
host str

发送邮件服务器地址

'smtp.sina.com'
Source code in Flask\models.py
def email_authentication(self, user_email: str = email,
                         user_pwd: str = authorization_code,
                         host: str = 'smtp.sina.com'):
    """邮箱认证

    Args:
        user_email: 用户邮箱
        user_pwd: 邮箱授权码
        host: 发送邮件服务器地址
    """
    return yagmail.SMTP(user=user_email, password=user_pwd, host=host)

send_email(to_email, email_title, email_message) #

发送邮件,可以单发也可以群发,取决于传入参数 to_email 的类型

Parameters:

Name Type Description Default
to_email str | list

目标邮箱地址,若为列表则代表群发

required
email_title str

邮件标题

required
email_message str

邮件正文,可以使用 HTML 格式的字符串

required

Returns:

Type Description
bool

布尔值,表示是否发送成功

Source code in Flask\models.py
def send_email(self, to_email, email_title: str, email_message: str) -> bool:
    """发送邮件,可以单发也可以群发,取决于传入参数 to_email 的类型

    Args:
        to_email (str | list): 目标邮箱地址,若为列表则代表群发
        email_title (str): 邮件标题
        email_message (str): 邮件正文,可以使用 HTML 格式的字符串

    Returns:
        布尔值,表示是否发送成功
    """
    server = "smtp." + self.email.split('@')[1]
    if self.authorization_code is None:
        print("没有邮箱授权码!")
        return False
    yag = yagmail.SMTP(user=self.email, password=self.authorization_code, host=server)
    if yag is None:
        print("yag is None!")
        return False
    if type(to_email) == "str":  # 单发
        if re.match("^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$", to_email) is None:
            print("目标邮箱地址错误!")
            return False
        try:
            yag.send(
                to=to_email,
                subject=email_title,
                contents=email_message
            )
        except smtplib.SMTPAuthenticationError:
            print("授权码错误!")
        return True
    else:
        for email_addr in to_email:  # 群发
            if re.match("^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$", email_addr) is None:
                print("目标邮箱地址错误!")
                return False
            try:
                yag.send(
                    to=email_addr,
                    subject=email_title,
                    contents=email_message
                )
            except smtplib.SMTPAuthenticationError:
                print("授权码错误!")

        return True

set_email(email) #

设置用户邮箱

Parameters:

Name Type Description Default
email str

邮箱

required
Source code in Flask\models.py
def set_email(self, email: str) -> None:
    """设置用户邮箱

    Args:
        email (str): 邮箱
    """
    self.email = email

set_password(password) #

设置密码

Parameters:

Name Type Description Default
password str

密码(明文)

required
Source code in Flask\models.py
def set_password(self, password: str) -> None:
    """设置密码

    Args:
        password (str): 密码(明文)
    """
    self.password_hash = generate_password_hash(
        password)  # 根据用户输入的密码生成密码散列值

set_userpath() #

设置用户空间路径

Source code in Flask\models.py
def set_userpath(self) -> None:
    """设置用户空间路径"""

    # 路径的前若干位为用户名和 user 标识,后面用随机字符串补齐,总长度 20 位。
    self.userpath = self.username + 'user' + ''.join(
        random.sample(string.ascii_letters + string.digits,
                      20 - len(self.username) - len('user'))
    )

validate_password(password) #

验证密码

Parameters:

Name Type Description Default
password str

密码(明文)

required

Returns:

Type Description
bool

布尔值,表示密码是否正确

Source code in Flask\models.py
def validate_password(self, password: str) -> bool:
    """验证密码

    Args:
        password (str): 密码(明文)

    Returns:
        布尔值,表示密码是否正确
    """
    return check_password_hash(self.password_hash, password)

1.2 收集信息表#

收集信息表项代码设计如下。

Bases: db.Model

收集表。

记录已创建收集的相关信息。

Attributes:

Name Type Description
id db.Integer

主键

creator db.String

创建人员名称(不可为空)

creator_id db.String

创建人员ID(外键:关联user.id;不可为空)

collection_title db.String

收集标题(不可为空)

description db.String

收集描述(不可为空)

start_date db.String

开始时间,自动设置为创建收集的时间(不可为空)

end_date db.String

结束时间(不可为空)

status db.Enum

收集的状态('0' 发布,'1' 暂存,'2' 已结束,'3' 已失效)(不可为空)

namelist_path db.String

应交名单路径

Source code in Flask\models.py
class Collection_info(db.Model):
    """ 收集表。

    记录已创建收集的相关信息。

    Attributes:
        id (db.Integer): 主键
        creator (db.String): 创建人员名称(不可为空)
        creator_id (db.String): 创建人员ID(外键:关联user.id;不可为空)
        collection_title (db.String): 收集标题(不可为空)
        description (db.String): 收集描述(不可为空)
        start_date (db.String): 开始时间,自动设置为创建收集的时间(不可为空)
        end_date (db.String): 结束时间(不可为空)
        status (db.Enum): 收集的状态('0' 发布,'1' 暂存,'2' 已结束,'3' 已失效)(不可为空)
        namelist_path (db.String): 应交名单路径
    """
    # * 收集状态常量定义
    RELEASE, SAVED, FINISHED, OVERDUE = '0', '1', '2', '3'  # ? 发布,暂存,已结束,已失效

    id = db.Column(db.Integer, primary_key=True)  # 主键
    creator = db.Column(db.String(30), nullable=False)  # 创建人员名称
    creator_id = db.Column(db.Integer, db.ForeignKey(
        'user.id', ondelete="CASCADE"), nullable=False)  # 创建人员ID
    collection_title = db.Column(db.String(50), nullable=False)  # 收集名称
    description = db.Column(db.Text, nullable=False)  # 收集描述
    start_date = db.Column(db.DateTime, nullable=False,
                           default=datetime.datetime.now())  # 开始时间
    end_date = db.Column(db.DateTime, nullable=False)  # 收集结束时间
    status = db.Column(db.Enum(RELEASE, SAVED, FINISHED,
                               OVERDUE), nullable=False)  # 当前状态
    collection_path = db.Column(db.String(50))  # 应交名单路径

1.3 题目信息表#

题目信息表项代码设计如下。

Bases: db.Model

题目表。

记录已创建收集的题目相关信息。

Attributes:

Name Type Description
id db.Integer

主键

collection_id db.String

收集id(外键:关联collection_info.id)(不可为空)

qno db.Integer

题目序号(不可为空)

question_type db.Enum

题目类型(不可为空): '0' 上传文件题; '1' 单选; '2' 多选; '3' 姓名题; '4' 学号题; '5' 问卷题(单选); '6' 问卷题(多选)

question_title db.String

问题标题(不可为空)

question_description db.String

问题描述

rename_rule db.String

文件重命名规则

file_path db.String

提交文件路径(不可重复)(文件上传题需设置,其余类型不必)

Source code in Flask\models.py
class Question_info(db.Model):
    """ 题目表。

    记录已创建收集的题目相关信息。

    Attributes:
        id (db.Integer): 主键
        collection_id (db.String): 收集id(外键:关联collection_info.id)(不可为空)
        qno (db.Integer): 题目序号(不可为空)
        question_type (db.Enum): 题目类型(不可为空): '0' 上传文件题; '1' 单选; '2' 多选; '3' 姓名题; '4' 学号题; '5' 问卷题(单选); '6' 问卷题(多选)
        question_title (db.String): 问题标题(不可为空)
        question_description (db.String): 问题描述
        rename_rule (db.String): 文件重命名规则
        file_path (db.String): 提交文件路径(不可重复)(文件上传题需设置,其余类型不必)
    """
    # * 问题类型常量
    FILE_UPLOAD = '0'  # ? 解答题
    SINGLE_CHOICE = '1'  # ?,单选
    MULTI_CHOICE = '2'  # ?多选
    NAME = '3'  # ?姓名
    SNO = '4'  # ?学号
    SINGLE_QUESTIONNAIRE = '5'  # ?问卷题目(单选)
    MULTI_QUESTIONNAIRE = '6'  # ?问卷题目(多选)

    id = db.Column(db.Integer, primary_key=True)  # 主键
    collection_id = db.Column(db.Integer,
                              db.ForeignKey('collection_info.id',
                                            ondelete="CASCADE"),
                              nullable=False)  # 关联收集id
    qno = db.Column(db.Integer, nullable=False)  # 题目序号
    question_type = db.Column(
        db.Enum(FILE_UPLOAD, SINGLE_CHOICE, MULTI_CHOICE, NAME,
                SNO, SINGLE_QUESTIONNAIRE, MULTI_QUESTIONNAIRE),
        nullable=False)  # 题目类型
    question_title = db.Column(db.String(50), nullable=False)  # 问题标题
    question_description = db.Column(db.Text)  # 问题描述
    rename_rule = db.Column(db.String(20))  # 文件重命名规则,其值为题目顺序
    file_path = db.Column(db.String(50), unique=True)  # 提交文件路径

1.4 选项信息表#

选项信息表项代码设计如下。

Bases: db.Model

问卷题选项表

记录问卷题的每一个选项内容。

Attributes:

Name Type Description
id db.Integer

主键

question_id db.Integer

题目id(外键:关联question_info.id)(不可为空)

collection_id db.Integer

收集id(不可为空)

qno db.Integer

题目序号(不可为空)

option_sn db.Integer

选项序号(不可为空)

option_content db.Text

选项内容(不可为空)

Source code in Flask\models.py
class Option_info(db.Model):
    """问卷题选项表

    记录问卷题的每一个选项内容。

    Attributes:
        id (db.Integer): 主键
        question_id (db.Integer): 题目id(外键:关联question_info.id)(不可为空)
        collection_id (db.Integer): 收集id(不可为空)
        qno (db.Integer): 题目序号(不可为空)
        option_sn (db.Integer): 选项序号(不可为空)
        option_content (db.Text): 选项内容(不可为空)
    """
    id = db.Column(db.Integer, primary_key=True)  # 主键
    question_id = db.Column(db.Integer, db.ForeignKey(
        'question_info.id', ondelete="CASCADE"), nullable=False)  # 关联题目id
    collection_id = db.Column(db.Integer, nullable=False)  # 收集id
    qno = db.Column(db.Integer, nullable=False)  # 题目序号
    option_sn = db.Column(db.Integer, nullable=False)  # 选项序号
    option_content = db.Column(db.Text, nullable=False)  # 选项内容

1.5 答案信息表#

答案信息表项代码设计如下。

Bases: db.Model

答案表。

记录单选题和多选题的答案。

Attributes:

Name Type Description
id db.Integer

主键

question_id db.Integer

题目id(外键:关联question_info.id)(不可为空)

collection_id db.Integer

收集id(不可为空)

qno db.Integer

题目序号(不可为空)

answer_option db.String

答案选项(单选题格式为x,多选题格式为x-x-x-……)(不可为空)

Source code in Flask\models.py
class Answer_info(db.Model):
    """ 答案表。

    记录单选题和多选题的答案。

    Attributes:
        id (db.Integer): 主键
        question_id (db.Integer): 题目id(外键:关联question_info.id)(不可为空)
        collection_id (db.Integer): 收集id(不可为空)
        qno (db.Integer): 题目序号(不可为空)
        answer_option (db.String): 答案选项(单选题格式为x,多选题格式为x-x-x-……)(不可为空)
    """
    id = db.Column(db.Integer, primary_key=True)  # 主键
    question_id = db.Column(db.Integer,
                            db.ForeignKey('question_info.id',
                                          ondelete="CASCADE"),
                            nullable=False)  # 关联题目id
    collection_id = db.Column(db.Integer, nullable=False)  # 收集id
    qno = db.Column(db.Integer, nullable=False)  # 题目序号
    answer_option = db.Column(db.String(30), nullable=False)  # 答案

1.6 提交信息表#

提交信息表项代码设计如下。

Bases: db.Model

收集提交记录

记录所有收集的提交记录。

Attributes:

Name Type Description
id db.Integer

主键

collection_id db.Integer

收集id(外键:关联collection_info.id)(不可为空)

collection_title db.String

收集标题(不可以为空)

submitter_name db.String

提交者姓名(不可以为空)

submit_time db.DateTime

提交时间(不可以为空),默认为datetime.datetime.now()

Source code in Flask\models.py
class Submission_info(db.Model):
    """收集提交记录

    记录所有收集的提交记录。

    Attributes:
        id (db.Integer): 主键
        collection_id (db.Integer): 收集id(外键:关联collection_info.id)(不可为空)
        collection_title (db.String): 收集标题(不可以为空)
        submitter_name (db.String): 提交者姓名(不可以为空)
        submit_time (db.DateTime): 提交时间(不可以为空),默认为datetime.datetime.now()
    """
    id = db.Column(db.Integer, primary_key=True)  # 主键
    collection_id = db.Column(db.Integer,
                              db.ForeignKey('collection_info.id',
                                            ondelete="CASCADE"),
                              nullable=False)  # 关联收集id
    collection_title = db.Column(db.String(50), nullable=False)  # 收集标题
    submitter_name = db.Column(db.String(30), nullable=False)  # 提交者名称
    submit_time = db.Column(db.DateTime, nullable=False,
                            default=datetime.datetime.now())  # 提交时间

1.7 内容信息表#

提交内容信息表项代码设计如下。

Bases: db.Model

提交内容信息表。

记录收集每一题的填写情况。

Attributes:

Name Type Description
id db.Integer

主键

submission_id db.Integer

提交记录id(外键:关联submission_info.id)(不可为空)

question_id db.Integer

题目id(外键:关联question_info.id)(不可为空)

collection_id db.Integer

收集id(不可为空)

qno db.Integer

题目序号(不可为空)

result db.String

某个人对这一题的填写结果(若为文件上传题,则此字段存放上传的文件名称)(不可为空)

Source code in Flask\models.py
class Submit_Content_info(db.Model):
    """ 提交内容信息表。

    记录收集每一题的填写情况。

    Attributes:
        id (db.Integer): 主键
        submission_id (db.Integer): 提交记录id(外键:关联submission_info.id)(不可为空)
        question_id (db.Integer): 题目id(外键:关联question_info.id)(不可为空)
        collection_id (db.Integer): 收集id(不可为空)
        qno (db.Integer): 题目序号(不可为空)
        result (db.String): 某个人对这一题的填写结果(若为文件上传题,则此字段存放上传的文件名称)(不可为空)
    """
    id = db.Column(db.Integer, primary_key=True)  # 主键
    submission_id = db.Column(db.Integer,
                              db.ForeignKey('submission_info.id',
                                            ondelete="CASCADE"),
                              nullable=False)  # 关联提交记录id
    question_id = db.Column(db.Integer,
                            db.ForeignKey('question_info.id',
                                          ondelete="CASCADE"),
                            nullable=False)  # 题目id
    collection_id = db.Column(db.Integer, nullable=False)  # 收集id
    qno = db.Column(db.Integer, nullable=False)  # 问题序号
    result = db.Column(db.String(50), nullable=False)  # 填写结果

2. 数据库 API 设计#

数据库提供了一系列操作接口方便 Flask 视图函数处理数据。

2.1 帐号信息相关#

2.1.1 修改密码#

修改密码通过 modify_password 实现。

修改密码

Parameters:

Name Type Description Default
user_id int

用户id

required
original_pswd str

原始密码

required
new_pswd str

新密码

required

Returns:

Type Description
int

若为 -1,则用户 id 不存在;若为 0,则原密码错误;若为 1,则修改成功。

Source code in Flask\db_manipulation.py
def modify_password(user_id: int, original_pswd: str, new_pswd: str) -> int:
    """修改密码

    Args:
        user_id: 用户id
        original_pswd: 原始密码
        new_pswd: 新密码

    Returns:
        若为 -1,则用户 id 不存在;若为 0,则原密码错误;若为 1,则修改成功。

    """
    user = User.query.filter_by(id=user_id).first()  # 在数据库中查询用户

    # 该用户id不存在
    if user is None:
        return -1

    # 验证原密码
    if not user.validate_password(original_pswd):
        return 0

    # 修改密码
    user.set_password(new_pswd)
    db.session.commit()
    return 1  # 修改成功

2.1.2 修改个人信息#

修改个人信息通过 modify_personal_info 实现。

修改个人信息(昵称、邮箱、邮箱授权码)

Parameters:

Name Type Description Default
user_id int

用户id

required
new_name str

新昵称

required
new_email str

新邮箱

required
authorization_code str

邮箱授权码

required

Returns:

Type Description
int

若为-1,则用户id不存在;若为1,则修改成功。

Source code in Flask\db_manipulation.py
def modify_personal_info(user_id: int, new_name: str, new_email: str, authorization_code: str) -> int:
    """修改个人信息(昵称、邮箱、邮箱授权码)

    Args:
        user_id: 用户id
        new_name: 新昵称
        new_email: 新邮箱
        authorization_code: 邮箱授权码

    Returns:
        若为-1,则用户id不存在;若为1,则修改成功。

    """
    user = User.query.get(user_id)

    # 该用户id不存在
    if user is None:
        return -1

    # 修改个人信息
    user.name = new_name
    user.email = new_email
    user.authorization_code = authorization_code
    user.email_authentication(user_email=user.email, user_pwd=user.authorization_code)
    db.session.commit()
    return 1  # 修改成功

2.2 收集信息相关#

2.2.1 添加收集问卷#

添加收集通过 add_FC 实现。

将新创建的收集存入数据库,并为每个收集分配一个收集者用户目录下的子目录,总长度为 X 位,最后一位代表收集 id。

Parameters:

Name Type Description Default
question_list list

题目信息列表

required
user_id int

用户id

required

Returns:

Name Type Description
collection_id int

收集id

Source code in Flask\db_manipulation.py
def add_FC(question_list: list, user_id: int) -> int:
    """将新创建的收集存入数据库,并为每个收集分配一个收集者用户目录下的子目录,总长度为 X 位,最后一位代表收集 id。

    Args:
        question_list: 题目信息列表
        user_id: 用户id

    Returns:
        collection_id (int): 收集id
    """
    # ! 文件类型可能有多个,设置一个计数器记录是第几个文件
    file_counter = 0  # * 文件计数器

    list_of_question_dict = deepcopy(question_list)  # ! 保存元组的列表,与字典类型的区别在于是否对 key 去重
    question_multidict = MultiDict(question_list)

    # 前端传来的deadLine为string类型,在此转化为datetime类型
    deadline = question_multidict['deadline']
    # ! 解决 00 秒的问题
    if len(deadline) < 19:
        deadline += ':00'
    deadline = deadline.replace("T", " ")
    question_multidict['deadline'] = datetime.strptime(deadline, '%Y-%m-%d %H:%M:%S')

    # * 生成应交名单路径
    # collection_counter = Collection_info.query.filter_by(creator_id=user_id).count()  # 获取当前用户创建的收集总数
    # namelist_path = current_user.userpath + '/' + str(collection_counter) + ''.join(
    #     random.sample(string.ascii_letters + string.digits, 8)
    # )  # * 总长度为 20 + 1 + 1 + 8 = 30 位

    # 创建一个文件收集对象,更新文件收集主表里
    collection = Collection_info(creator=question_multidict['collector'],
                                 creator_id=user_id,
                                 collection_title=question_multidict['collectionTitle'],
                                 description=question_multidict['description'],
                                 end_date=question_multidict['deadline'],
                                 # namelist_path=namelist_path,
                                 status=Collection_info.SAVED)
    db.session.add(collection)
    db.session.commit()  # 提交数据库会话,否则 id 为None
    collection_id = collection.id

    # ! 生成文件存储路径,最后一位固定为收集 id
    # ! 生成位置为:FileStorage / userpath / filepath
    # * 总长度为 20 + 5 + 10 = 35 位
    # file_path = current_user.userpath + '/file' + ''.join(
    #     random.sample(string.ascii_letters + string.digits, 4 - len(str(collection_id)))
    # ) + str(collection_id)
    file_path = os.path.join(
        APP_FILE,
        current_user.userpath,
        'file' + ''.join(
            random.sample(string.ascii_letters + string.digits, 10 - len(str(collection_id)))
        ) + str(collection_id)
    )
    os.makedirs(file_path)  # 创建该收集的文件存储目录

    # ! 生成应交名单路径,与文件存储路径相同
    # ! 应交名单以 .csv 格式存放在 filepath 下
    # ! 生成位置为:FileStorage / userpath / filepath / xxx.csv
    # * 更新 Collection_info 的 collection_path 属性
    collection = Collection_info.query.filter_by(id=collection_id)
    collection.update({'collection_path': file_path})
    db.session.commit()

    key_list = list(question_multidict.keys())
    # 问题的键列表
    question_key_list = [question_key for question_key in key_list if "question" in question_key]
    seq = 0

    # 更新问题主表和答案表
    for question_key in question_key_list:
        seq += 1
        # ? 若为姓名题
        if "name" in question_key:
            question = Question_info(collection_id=collection_id,
                                     qno=seq,
                                     question_type=Question_info.NAME,
                                     question_title=question_multidict[question_key],
                                     question_description=question_multidict[f'detail{seq}'])
            db.session.add(question)
            db.session.commit()

        # ? 若为学号题
        if "sno" in question_key:
            question = Question_info(collection_id=collection_id,
                                     qno=seq,
                                     question_type=Question_info.SNO,
                                     question_title=question_multidict[question_key],
                                     question_description=question_multidict[f'detail{seq}'])
            db.session.add(question)
            db.session.commit()

        # ? 若为单选题
        elif "radio" in question_key:
            question = Question_info(collection_id=collection_id,
                                     qno=seq,
                                     question_type=Question_info.SINGLE_CHOICE,
                                     question_title=question_multidict[question_key],
                                     question_description=question_multidict[f'detail{seq}'])
            db.session.add(question)
            db.session.commit()
            # 存选择题答案
            answer = Answer_info(collection_id=collection_id,
                                 question_id=question.id,
                                 qno=seq,
                                 answer_option=question_multidict[f'checked_radio{seq}'])
            db.session.add(answer)
            db.session.commit()

        # ? 若为多选题
        elif "multipleChoice" in question_key:
            question = Question_info(collection_id=collection_id,
                                     qno=seq,
                                     question_type=Question_info.MULTI_CHOICE,
                                     question_title=question_multidict[question_key],
                                     question_description=question_multidict[f'detail{seq}'])
            db.session.add(question)
            db.session.commit()
            # 存选择题答案
            ano_list = question_multidict.getlist(f'checked_mulans{seq}')
            ano = '-'.join(ano_list)
            answer = Answer_info(collection_id=collection_id,
                                 question_id=question.id,
                                 qno=seq,
                                 answer_option=ano)
            db.session.add(answer)
            db.session.commit()

        # ? 若为问卷题
        elif "question_qnaire" in question_key:
            if question_multidict[f'choose_type{seq}'] == 'single':
                qn_type = Question_info.SINGLE_QUESTIONNAIRE
            else:
                qn_type = Question_info.MULTI_QUESTIONNAIRE
            question = Question_info(collection_id=collection_id,
                                     qno=seq,
                                     question_type=qn_type,
                                     question_title=question_multidict[question_key],
                                     question_description=question_multidict[f'detail{seq}'])
            db.session.add(question)
            db.session.commit()
            # 存问卷题目各选项的内容
            option_content = question_multidict.getlist(f'qn_option{seq}')
            for i in range(len(option_content)):
                option = Option_info(collection_id=collection_id,
                                     question_id=question.id,
                                     qno=seq,
                                     option_sn=i,
                                     option_content=option_content[i])
                db.session.add(option)
            db.session.commit()

        # ? 若为文件上传题
        elif "file" in question_key:
            # 确定文件重命名规则
            file_counter += 1
            rename_rule = []
            rename_rule_list = []  # * 重命名所需的题目
            question_num = str(seq)
            for elem in list_of_question_dict:
                if elem[0] == "checked_topic" + question_num:
                    rename_rule_list.append(elem[1])

            # TODO 逻辑有待优化
            cnt = 0
            for elem in list_of_question_dict:
                if elem[1] not in rename_rule_list:
                    continue
                rename_rule.append(re.findall(r"\d+", elem[0])[0])  # ! 待添加分隔符
                cnt += 1
                if cnt >= len(rename_rule_list):  # * 防止获取到文件后面的重命名规则
                    break

            rename_rule = '-'.join(rename_rule)
            if rename_rule == '':
                rename_rule = None

            question = Question_info(
                collection_id=collection_id,
                qno=seq,
                question_type=Question_info.FILE_UPLOAD,
                question_title=question_multidict[question_key],
                question_description=question_multidict[f'detail{seq}'],
                rename_rule=rename_rule,  # * 命名规则用 - 分隔,数字代表题目序号
                # file_path=file_path + "/" + id_int_to_str(
                #     file_counter
                # )  # ! 创建一个以 file_counter 命名的子目录
                # ! 创建一个以 file_counter 命名的子目录
                # file_path=os.path.join(
                #     file_path,
                #     id_int_to_str(file_counter)
                # )
                file_path=os.path.join(
                    file_path,
                    str(file_counter)
                )
            )
            db.session.add(question)
            db.session.commit()
            path = os.path.join(APP_FILE, question.file_path)
            print("第", seq, "题的文件存储路径:", path)  # ! 调试
            try:
                os.makedirs(path)  # 创建该题的文件存储目录
            except OSError:
                print("文件存储路径错误!")

    return collection_id

2.2.2 删除收集问卷#

删除收集通过 delete_collection 实现。

删除id为collection_id的收集在数据库中的所有相关信息

Parameters:

Name Type Description Default
collection_id int

收集id

required
Source code in Flask\db_manipulation.py
def delete_collection(collection_id: int) -> None:
    """删除id为collection_id的收集在数据库中的所有相关信息

    Args:
        collection_id: 收集id

    """
    Submit_Content_info.query.filter_by(collection_id=collection_id).delete()
    Submission_info.query.filter_by(collection_id=collection_id).delete()
    Option_info.query.filter_by(collection_id=collection_id).delete()
    Answer_info.query.filter_by(collection_id=collection_id).delete()

    # 删除该收集的存储路径
    file_path = Collection_info.query.get(collection_id).collection_path
    file_path = Path(os.path.join(APP_FILE, file_path))
    shutil.rmtree(file_path)

    Question_info.query.filter_by(collection_id=collection_id).delete()
    Collection_info.query.filter_by(id=collection_id).delete()
    db.session.commit()

2.2.3 查看收集信息#

查看收集信息通过 get_question_dict 实现。

获取id为collection_id的收集的相关信息

Parameters:

Name Type Description Default
collection_id int

收集id

required

Returns:

Type Description
dict

一个字典,包含该收集的相关信息(收集标题、收集描述、创建者、截止时间、题目等等)。 格式如下: [('collectionTitle', 'ceshi'), ('collector', '凯'), ('deadline', '2022-11-18T22:31:49'), ('description', ''), ('question_name1', '姓名'), ('detail1', ''), ('question_sno2', '学号'), ('detail2', ''), ('question_file3', '文件'), ('detail3', ''), ('question_radio4', '单选题'), ('detail4', ''), ('checked_radio4', 'A'), ('question_multipleChoice5', '多选题'), ('detail5', ''), ('checked_mulans5', 'B'), ('checked_mulans5', 'C'), ('question_qnaire6', '问卷题目'), ('detail6', ''), ('qn_option6', 'asdf'), ('qn_option6', 'adff'), ('choose_type6', 'single')]

Source code in Flask\db_manipulation.py
def get_question_dict(collection_id: int) -> dict:
    """获取id为collection_id的收集的相关信息

    Args:
        collection_id: 收集id

    Returns:
        (dict):
            一个字典,包含该收集的相关信息(收集标题、收集描述、创建者、截止时间、题目等等)。
            格式如下:
            [('collectionTitle', 'ceshi'),
            ('collector', '凯'),
            ('deadline', '2022-11-18T22:31:49'),
            ('description', ''),
            ('question_name1', '姓名'),
            ('detail1', ''),
            ('question_sno2', '学号'),
            ('detail2', ''),
            ('question_file3', '文件'),
            ('detail3', ''),
            ('question_radio4', '单选题'),
            ('detail4', ''),
            ('checked_radio4', 'A'),
            ('question_multipleChoice5', '多选题'),
            ('detail5', ''),
            ('checked_mulans5', 'B'),
            ('checked_mulans5', 'C'),
            ('question_qnaire6', '问卷题目'),
            ('detail6', ''),
            ('qn_option6', 'asdf'),
            ('qn_option6', 'adff'),
            ('choose_type6', 'single')]
    """
    seq = 0
    question = {}
    collection = Collection_info.query.get(collection_id)
    if collection is None:
        return None
    seq += 1
    question[f'{seq}_collectionTitle'] = collection.collection_title
    seq += 1
    question[f'{seq}_collector'] = collection.creator
    seq += 1
    question[f'{seq}_deadline'] = collection.end_date.strftime("%Y-%m-%d %H:%M:%S")
    seq += 1
    question[f'{seq}_description'] = collection.description
    question_list = Question_info.query.filter_by(collection_id=collection_id).order_by("qno").all()
    for q in question_list:
        # 若是姓名题
        if q.question_type == Question_info.NAME:
            seq += 1
            question[f'{seq}_question_name{q.qno}'] = q.question_title
            seq += 1
            question[f'{seq}_detail{q.qno}'] = q.question_description

        # 若是姓名题
        if q.question_type == Question_info.SNO:
            seq += 1
            question[f'{seq}_question_sno{q.qno}'] = q.question_title
            seq += 1
            question[f'{seq}_detail{q.qno}'] = q.question_description

        # 若是文件上传题
        if q.question_type == Question_info.FILE_UPLOAD:
            seq += 1
            question[f'{seq}_question_file{q.qno}'] = q.question_title
            seq += 1
            question[f'{seq}_detail{q.qno}'] = q.question_description
            # 重命名规则
            if q.rename_rule is None:
                seq += 1
                question[f'{seq}_checked_topic{q.qno}'] = ''
            else:
                qno_list = list(map(int, q.rename_rule.split('-')))
                for qno in qno_list:
                    seq += 1
                    question[f'{seq}_checked_topic{q.qno}'] = Question_info.query. \
                        filter_by(collection_id=collection_id, qno=qno).first().question_title

        # 若是单选题
        if q.question_type == Question_info.SINGLE_CHOICE:
            seq += 1
            question[f'{seq}_question_radio{q.qno}'] = q.question_title
            seq += 1
            question[f'{seq}_detail{q.qno}'] = q.question_description
            # 单选题答案
            seq += 1
            question[f'{seq}_checked_radio{q.qno}'] = Answer_info.query. \
                filter_by(question_id=q.id).first().answer_option

        # 若是多选题
        if q.question_type == Question_info.MULTI_CHOICE:
            seq += 1
            question[f'{seq}_question_multipleChoice{q.qno}'] = q.question_title
            seq += 1
            question[f'{seq}_detail{q.qno}'] = q.question_description
            # 多选题答案
            answer_list = Answer_info.query.filter_by(question_id=q.id).first().answer_option.split('-')
            for answer in answer_list:
                seq += 1
                question[f'{seq}_checked_mulans{q.qno}'] = answer

        # 若是问卷题
        if q.question_type == Question_info.SINGLE_QUESTIONNAIRE or \
                q.question_type == Question_info.MULTI_QUESTIONNAIRE:
            seq += 1
            question[f'{seq}_question_qnaire{q.qno}'] = q.question_title
            seq += 1
            question[f'{seq}_detail{q.qno}'] = q.question_description
            option_list = Option_info.query.filter_by(question_id=q.id).order_by("option_sn").all()
            for option in option_list:
                seq += 1
                question[f'{seq}_qn_option{q.qno}'] = option.option_content
            if q.question_type == Question_info.SINGLE_QUESTIONNAIRE:
                seq += 1
                question[f'{seq}_choose_type{q.qno}'] = "single"
            else:
                seq += 1
                question[f'{seq}_choose_type{q.qno}'] = "multiple"

    return question

2.2.4 修改收集信息#

修改已创建收集的信息通过 modify_collection 实现。

修改已创建的收集

Parameters:

Name Type Description Default
collection_id int

收集id

required
question_list list

问题信息列表

required
Source code in Flask\db_manipulation.py
def modify_collection(collection_id: int, question_list: list) -> None:
    """修改已创建的收集

    Args:
        collection_id: 收集id
        question_list: 问题信息列表
    """
    question_multidict = MultiDict(question_list)

    # 前端传来的deadLine为string类型,在此转化为datetime类型
    deadline = question_multidict['deadline']
    if len(deadline) < 19:
        deadline += ':00'  # ! 解决 00 秒的问题
    deadline = deadline.replace("T", " ")
    question_multidict['deadline'] = datetime.strptime(deadline, '%Y-%m-%d %H:%M:%S')

    # 更新Collection_info表中的信息
    collection = Collection_info.query.filter_by(id=collection_id)
    collection.update({'start_date': datetime.now(),
                       'collection_title': question_multidict['collectionTitle'],
                       'creator': question_multidict['collector'],
                       'description': question_multidict['description'],
                       'end_date': question_multidict['deadline']})
    db.session.commit()

    # 问题的键列表
    key_list = list(question_multidict.keys())
    question_key_list = [question_key for question_key in key_list if "question" in question_key]
    seq = 0
    for question_key in question_key_list:
        seq += 1
        question = Question_info.query.filter_by(collection_id=collection_id, qno=seq)

        if 'name' in question_key:
            question.update({'question_title': question_multidict[f'question_name{seq}'],
                             'question_description': question_multidict[f'detail{seq}']})

        elif 'sno' in question_key:
            question.update({'question_title': question_multidict[f'question_sno{seq}'],
                             'question_description': question_multidict[f'detail{seq}']})

        elif 'file' in question_key:
            # 确定文件重命名规则
            rename_rule = []
            rename_rule_list = []  # * 重命名所需的题目
            question_num = str(seq)
            for elem in question_list:
                if elem[0] == "checked_topic" + question_num:
                    rename_rule_list.append(elem[1])
            cnt = 0
            for elem in question_list:
                if elem[1] not in rename_rule_list:
                    continue
                rename_rule.append(re.findall(r"\d+", elem[0])[0])  # ! 待添加分隔符
                cnt += 1
                if cnt >= len(rename_rule_list):  # * 防止获取到文件后面的重命名规则
                    break

            rename_rule = '-'.join(rename_rule)
            if rename_rule == '':
                rename_rule = None

            question.update({'question_title': question_multidict[f'question_file{seq}'],
                             'question_description': question_multidict[f'detail{seq}'],
                             'rename_rule': rename_rule})

        elif 'radio' in question_key:
            question.update({'question_title': question_multidict[f'question_radio{seq}'],
                             'question_description': question_multidict[f'detail{seq}']})
            # 更新答案
            answer = Answer_info.query.filter_by(collection_id=collection_id, qno=seq)
            answer.update({'answer_option': question_multidict[f'checked_radio{seq}']})

        elif 'multipleChoice' in question_key:
            question.update({'question_title': question_multidict[f'question_multipleChoice{seq}'],
                             'question_description': question_multidict[f'detail{seq}']})

            # 更新答案
            ano_list = question_multidict.getlist(f'checked_mulans{seq}')
            ano = '-'.join(ano_list)
            answer = Answer_info.query.filter_by(collection_id=collection_id, qno=seq)
            answer.update({'answer_option': ano})

        elif 'qnaire' in question_key:
            question.update({'question_title': question_multidict[f'question_qnaire{seq}'],
                             'question_description': question_multidict[f'detail{seq}']})

            if question_multidict[f'choose_type{seq}'] == 'single':
                question.update({'question_type': Question_info.SINGLE_QUESTIONNAIRE})
            else:
                question.update({'question_type': Question_info.MULTI_QUESTIONNAIRE})

            # 先删除原来的选项内容,在创建新的
            Option_info.query.filter_by(question_id=question.first().id).delete()

            # 更新选项
            option_content = question_multidict.getlist(f'qn_option{seq}')
            for index, value in enumerate(option_content):
                option = Option_info(collection_id=collection_id,
                                     question_id=question.first().id,
                                     qno=seq,
                                     option_sn=index,
                                     option_content=value)
                db.session.add(option)

        db.session.commit()

2.2.5 获取提交信息#

获取某个收集的提交信息通过 submission_record 实现。

获取id为collection_id的收集的提交记录(姓名,提交时间,文件数量,文件详情)

Parameters:

Name Type Description Default
collection_id int

收集id

required

Returns:

Type Description
list

一个元组列表,每个元组表示一条提交信息。

For example: [('计胜翔', datetime.datetime(2022, 11, 5, 20, 25, 32, 142115), 2, ['jsx1.pdf', 'jsx2.doc']), ('张隽翊', datetime.datetime(2022, 11, 5, 20, 25, 32, 142115), 1, ['zjy1.pdf'])]

Source code in Flask\db_manipulation.py
def submission_record(collection_id: int) -> list:
    """获取id为collection_id的收集的提交记录(姓名,提交时间,文件数量,文件详情)

    Args:
        collection_id: 收集id

    Returns:
        (list):
            一个元组列表,每个元组表示一条提交信息。

            For example:
            [('计胜翔', datetime.datetime(2022, 11, 5, 20, 25, 32, 142115), 2, ['jsx1.pdf', 'jsx2.doc']),
            ('张隽翊', datetime.datetime(2022, 11, 5, 20, 25, 32, 142115), 1, ['zjy1.pdf'])]
    """
    # 获取提交名单列表
    name_list = Submission_info.query. \
        filter_by(collection_id=collection_id). \
        order_by("id"). \
        with_entities(Submission_info.submitter_name). \
        all()
    name_list = list(map(itemgetter(0), name_list))

    # 获取提交时间列表
    time_list = Submission_info.query. \
        filter_by(collection_id=collection_id). \
        order_by("id"). \
        with_entities(Submission_info.submit_time). \
        all()
    time_list = list(map(itemgetter(0), time_list))

    # 获取提交信息id列表
    submission_id_list = Submission_info.query. \
        filter_by(collection_id=collection_id). \
        order_by('id'). \
        with_entities(Submission_info.id). \
        all()
    submission_id_list = list(map(itemgetter(0), submission_id_list))

    # 获取文件上传题的问题id列表
    question_id_list = Question_info.query. \
        filter_by(collection_id=collection_id, question_type=Question_info.FILE_UPLOAD). \
        with_entities(Question_info.id).all()
    question_id_list = list(map(itemgetter(0), question_id_list))

    file_num_list = []
    for id in submission_id_list:
        num = Submit_Content_info.query. \
            filter(Submit_Content_info.submission_id == id,
                   Submit_Content_info.question_id.in_(question_id_list)).count()
        file_num_list.append(num)

    # 构建文件详情列表
    file_list = []
    for id in submission_id_list:
        file = Submit_Content_info.query.filter(Submit_Content_info.submission_id == id,
                                                Submit_Content_info.question_id.in_(question_id_list)). \
            with_entities(Submit_Content_info.result). \
            all()
        file = list(map(itemgetter(0), file))
        file_list.append(file)

    record = list(zip(name_list, time_list, file_num_list, file_list))
    # 对元组列表根据submit_time进行降序排序
    record = list(reversed(sorted(record, key=lambda x: (x[1].timestamp(), x[0]))))
    return record

对应的另一个版本是 submission_record_v2,相比 submission_record 多返回了提交 id。

获取id为collection_id的收集的提交记录(姓名,提交时间,文件数量,文件详情)

Parameters:

Name Type Description Default
collection_id int

收集id

required

Returns:

Type Description
list

一个元组列表,每个元组表示一条提交信息,元组按Submission.id排序。 每个元组格式为(提交记录id: int, 姓名: string, 提交时间: datetime, 文件数量: int, 文件详情: list)

例如: [(1, '计胜翔', datetime.datetime(2022, 11, 5, 20, 25, 32, 142115), 2, ['jsx1.pdf', 'jsx2.doc']), (2, '张隽翊', datetime.datetime(2022, 11, 5, 20, 25, 32, 142115), 1, ['zjy1.pdf'])]

Source code in Flask\db_manipulation.py
def submission_record_v2(collection_id: int) -> list:
    """获取id为collection_id的收集的提交记录(姓名,提交时间,文件数量,文件详情)

    Args:
        collection_id: 收集id

    Returns:
        (list):
            一个元组列表,每个元组表示一条提交信息,元组按Submission.id排序。
            每个元组格式为(提交记录id: int, 姓名: string, 提交时间: datetime, 文件数量: int, 文件详情: list)

            例如:
            [(1, '计胜翔', datetime.datetime(2022, 11, 5, 20, 25, 32, 142115), 2, ['jsx1.pdf', 'jsx2.doc']),
            (2, '张隽翊', datetime.datetime(2022, 11, 5, 20, 25, 32, 142115), 1, ['zjy1.pdf'])]
    """
    # 获取提交记录id
    id_list = Submission_info.query. \
        filter_by(collection_id=collection_id). \
        order_by("id"). \
        with_entities(Submission_info.id). \
        all()
    id_list = list(map(itemgetter(0), id_list))

    # 获取提交名单列表
    name_list = Submission_info.query. \
        filter_by(collection_id=collection_id). \
        order_by("id"). \
        with_entities(Submission_info.submitter_name). \
        all()
    name_list = list(map(itemgetter(0), name_list))

    # 获取提交时间列表
    time_list = Submission_info.query. \
        filter_by(collection_id=collection_id). \
        order_by("id"). \
        with_entities(Submission_info.submit_time). \
        all()
    time_list = list(map(itemgetter(0), time_list))

    # 获取提交信息id列表
    submission_id_list = Submission_info.query. \
        filter_by(collection_id=collection_id). \
        order_by('id'). \
        with_entities(Submission_info.id). \
        all()
    submission_id_list = list(map(itemgetter(0), submission_id_list))

    # 获取文件上传题的问题id列表
    question_id_list = Question_info.query. \
        filter_by(collection_id=collection_id, question_type=Question_info.FILE_UPLOAD). \
        with_entities(Question_info.id).all()
    question_id_list = list(map(itemgetter(0), question_id_list))

    file_num_list = []
    for id in submission_id_list:
        num = Submit_Content_info.query. \
            filter(Submit_Content_info.submission_id == id,
                   Submit_Content_info.question_id.in_(question_id_list)).count()
        file_num_list.append(num)

    # 构建文件详情列表
    file_list = []
    for id in submission_id_list:
        file = Submit_Content_info.query.filter(Submit_Content_info.submission_id == id,
                                                Submit_Content_info.question_id.in_(question_id_list)). \
            with_entities(Submit_Content_info.result). \
            all()
        file = list(map(itemgetter(0), file))
        file_list.append(file)

    record = list(zip(id_list, name_list, time_list, file_num_list, file_list))
    # 对元组列表根据submit_time进行降序排序
    record = list(reversed(sorted(record, key=lambda x: (x[2].timestamp(), x[0]))))
    return record

2.2.6 添加提交信息#

向数据库中添加用户填写的内容通过 save_submission 实现。

保存收集提交内容

Parameters:

Name Type Description Default
submission_list list

提交信息列表

required
collection_id int

收集id

required
file werkzeug.datastructures.ImmutableMultiDict

网页提交表单中的文件数据

required

Returns:

Type Description
int

若提交时间超过收集截止时间,则返回-1;

int

若未超时,则返回提交记录id。

Source code in Flask\db_manipulation.py
def save_submission(submission_list: list, collection_id: int, file: werkzeug.datastructures.ImmutableMultiDict) -> int:
    """保存收集提交内容

    Args:
        submission_list: 提交信息列表
        collection_id: 收集id
        file: 网页提交表单中的文件数据

    Returns:
        若提交时间超过收集截止时间,则返回-1;
        若未超时,则返回提交记录id。
    """
    submission_multidict = MultiDict(submission_list)
    key_list = list(submission_multidict.keys())  # 提取问题的键值列表
    name_key_list = list(filter(lambda x: x.find("name") >= 0, key_list))
    if len(name_key_list) != 0:
        name_key = name_key_list[0]
        qno = re.findall(r"\d+", name_key)[0]
        # 创建一个提交记录,并加入数据库
        submission = Submission_info(collection_id=collection_id,
                                     submitter_name=submission_multidict['submit_name' + qno],
                                     submit_time=datetime.now())
    else:
        submission = Submission_info(collection_id=collection_id,
                                     submitter_name='',
                                     submit_time=datetime.now())

    # ! 判断提交时间是否超过截止时间
    deadline = Collection_info.query.get(collection_id).end_date
    diff = (submission.submit_time - deadline).total_seconds()
    if diff > 0:
        print('提交时间超时!')
        return -1

    # 若未超时
    submission.collection_title = Collection_info.query.get(collection_id).collection_title
    db.session.add(submission)
    db.session.commit()
    submission_id = submission.id  # 获得该提交记录的id

    key_list = [key for key in key_list if "question" in key]
    seq = 0
    for key in key_list:
        seq += 1
        submit_content = Submit_Content_info(submission_id=submission_id,
                                             collection_id=collection_id,
                                             qno=seq)
        question_id = Question_info.query.filter_by(collection_id=collection_id, qno=seq).first().id
        submit_content.question_id = question_id

        # 若为姓名题
        if "name" in key:
            submit_content.result = submission_multidict[f'submit_name{seq}']

        # 若为学号题
        elif "sno" in key:
            submit_content.result = submission_multidict[f'submit_sno{seq}']

        # 若为文件上传题
        elif "file" in key:
            filename = file.get(f'submit_file{seq}').filename
            submit_content.result = filename

        # 若为单选题
        elif "radio" in key:
            submit_content.result = submission_multidict[f'submit_checked_radio{seq}']

        # 若为多选题
        elif "multipleChoice" in key:
            result = submission_multidict.getlist(f"submit_checked_mulans{seq}")
            result = '-'.join(result)
            submit_content.result = result

        elif "qnaire" in key:
            result = submission_multidict.getlist(f"submit_checked_qnaire{seq}")
            result = '-'.join(result)
            submit_content.result = result

        db.session.add(submit_content)

    db.session.commit()
    return submission_id

2.2.7 提交文件存储#

将提交者上传的文件存储到正确的位置通过 file_upload 实现。

将提交的文件重命名后,存储到题目相应的路径中

Parameters:

Name Type Description Default
collection_id int

收集id

required
question_list list

问题信息列表

required
file werkzeug.datastructures.ImmutableMultiDict

网页提交表单中的文件数据

required

Returns:

Type Description
werkzeug.datastructures.ImmutableMultiDict

返回重命名后的表单中的文件数据,以便调用save_submission函数时使用

Source code in Flask\db_manipulation.py
def file_upload(collection_id: int,
                question_list: list,
                file: werkzeug.datastructures.ImmutableMultiDict) -> werkzeug.datastructures.ImmutableMultiDict:
    """将提交的文件重命名后,存储到题目相应的路径中

    Args:
        collection_id: 收集id
        question_list: 问题信息列表
        file: 网页提交表单中的文件数据

    Returns:
        返回重命名后的表单中的文件数据,以便调用save_submission函数时使用
    """
    question_multidict = MultiDict(question_list)
    key_list = list(question_multidict.keys())
    file_key_list = list(filter(lambda x: 'file' in x, key_list))
    submit_key_list = list(filter(lambda x: 'submit' in x, key_list))
    for file_key in file_key_list:
        qno_str = re.findall(r"\d+", file_key)[0]
        question = Question_info.query.filter_by(collection_id=collection_id, qno=int(qno_str)).first()
        f = file['submit_file' + qno_str]

        # 确定文件存储路径
        # path = './FileStorage/' + question.file_path
        path = os.path.join(APP_FILE, question.file_path)

        # 重命名文件
        rename_rule = question.rename_rule
        if rename_rule is not None:  # 若重命名规则不为空
            filename_list = f.filename.split('.')  # 将文件名分为名称和后缀两部分,便于后面修改名称
            new_filename = ''
            rename_qno_list = rename_rule.split('-')  # 重命名依赖的题目序号列表
            for index, qno in enumerate(rename_qno_list):
                key = list(filter(lambda x: qno in x, submit_key_list))[0]
                new_filename += question_multidict[key]
                if index != len(rename_qno_list) - 1:
                    new_filename += '_'
            filename_list[0] = new_filename
            f.filename = '.'.join(filename_list)  # 修改文件名

        # print("重命名后文件名为:", f.filename)
        # 保存文件到指定路径
        f.save(os.path.join(path, f.filename))

    return file

2.3 状态更新相关#

2.3.1 计算收集截止倒计时#

计算某个收集还有多长时间截止的倒计时通过 deadline_countdown 实现。

截止倒计时

Parameters:

Name Type Description Default
collection_id int

收集id

required

Returns:

Type Description
Datetime

截止倒计时。

Source code in Flask\db_manipulation.py
def deadline_countdown(collection_id: int):
    """ 截止倒计时

    Args:
        collection_id: 收集id

    Returns:
        (Datetime): 截止倒计时。
    """
    current_time = datetime.now()  # 获取当前时间
    deadline = Collection_info.query.get(collection_id).end_date  # 查询问卷截止时间
    return deadline - current_time  # 返回倒计时

2.3.4 修改收集状态为截止#

将收集的状态修改为“已截止”通过 stop_collection 实现。

停止收集

Parameters:

Name Type Description Default
collection_id int

收集id

required
action_list list

操作码列表

required
Source code in Flask\db_manipulation.py
def stop_collection(collection_id: int, action_list: list) -> None:
    """停止收集

    Args:
        collection_id: 收集id
        action_list: 操作码列表
    """
    collection = Collection_info.query.filter_by(id=collection_id)
    collection.update({'status': Collection_info.FINISHED})  # 状态标记为已截止
    new_ddl = action_list[2]
    new_ddl = datetime.strptime(new_ddl, '%Y-%m-%d %H:%M:%S')
    collection.update({'end_date': new_ddl})
    db.session.commit()

2.4 统计汇总相关#

2.4.1 统计提交数量#

统计提交数量通过 count_submission 实现。

统计一个收集的提交数量

Parameters:

Name Type Description Default
collection_id int

收集id

None

Returns:

Type Description
int

若collection_id不为 None,则返回该问卷的提交数量,是一个整数;否则返回None。

Source code in Flask\db_manipulation.py
def count_submission(collection_id: int = None) -> int:
    """统计一个收集的提交数量

    Args:
        collection_id: 收集id

    Returns:
        若collection_id不为 None,则返回该问卷的提交数量,是一个整数;否则返回None。
    """

    # 先看是否给了参数collection_id
    if collection_id is not None:
        return Submission_info.query.filter_by(collection_id=collection_id).count()

    return None

2.4.2 统计已收文件数#

统计已收文件数通过 count_filenum 实现。

统计一个收集的已收文件数

Parameters:

Name Type Description Default
collection_id int

收集id

None

Returns:

Type Description
int

若collection_id不为None,则返回该问卷的已收文件数,是一个整数;否则返回None。

Source code in Flask\db_manipulation.py
def count_filenum(collection_id: int = None) -> int:
    """统计一个收集的已收文件数

    Args:
        collection_id: 收集id

    Returns:
        若collection_id不为None,则返回该问卷的已收文件数,是一个整数;否则返回None。
    """

    # 若没给参数question_id,但给了参数collection_id
    if collection_id is not None:
        # 查询收集中所有文件上传题的id
        question_id_list = Question_info.query.filter_by(
            collection_id=collection_id, question_type=Question_info.FILE_UPLOAD
        ).with_entities(Question_info.id).all()
        question_id_list = list(map(itemgetter(0), question_id_list))
        file_num = 0
        # 遍历该收集中所有文件上传题,统计已收文件总数
        for q_id in question_id_list:
            # path = './FileStorage/' + Question_info.query.filter_by(id=q_id).first().file_path
            path = os.path.join(APP_FILE, Question_info.query.filter_by(id=q_id).first().file_path)
            files = os.listdir(path)
            file_num += len(files)
        return file_num

    return None

2.4.3 获取收集信息和提交记录#

获取收集信息和指定 id 的用户提交内容通过 get_submission_dict 实现。

获取id为collection_id的收集、提交记录id为submission_id的用户提交内容信息

Parameters:

Name Type Description Default
collection_id int

收集id

required
submission_id int

提交记录id

required

Returns:

Type Description
dict

一个字典,包含该提交记录中用户的提交内容。

格式如下: {'1_collectionTitle': '核酸检测', '2_collector': '张三', '3_deadline': '2022-11-15 15:23:09', '4_description': '', '5_question_name1': '姓名', '6_detail1': '', '7_submit_name1': '王广凯', '8_question_sno2': '学号', '9_detail2': '', '10_submit_sno2': 'U202012345', '11_question_file3': '文件', '12_detail3': '', '13_submit_file3': '系统设计.md', '14_question_radio4': '单选题', '15_detail4': '', '16_checked_radio4': 'A', '17_submit_radio4': 'B', '18_question_multipleChoice5': '多选题', '19_detail5': '', '20_checked_mulans5': 'C', '21_checked_mulans5': 'D', '22_submit_mulans5': 'A', '23_submit_mulans5': 'B', '24_question_qnaire6': '问卷题目', '25_detail6': '是否已做核酸', '26_qn_option6': '是', '27_qn_option6': '否', '28_submit_qnaire6': '2'}

Source code in Flask\db_manipulation.py
def get_submission_dict(collection_id: int, submission_id: int) -> dict:
    """获取id为collection_id的收集、提交记录id为submission_id的用户提交内容信息

    Args:
        collection_id: 收集id
        submission_id: 提交记录id

    Returns:
        (dict):
            一个字典,包含该提交记录中用户的提交内容。

            格式如下:
            {'1_collectionTitle': '核酸检测',
            '2_collector': '张三',
            '3_deadline': '2022-11-15 15:23:09',
            '4_description': '',
            '5_question_name1': '姓名',
            '6_detail1': '',
            '7_submit_name1': '王广凯',
            '8_question_sno2': '学号',
            '9_detail2': '',
            '10_submit_sno2': 'U202012345',
            '11_question_file3': '文件',
            '12_detail3': '',
            '13_submit_file3': '系统设计.md',
            '14_question_radio4': '单选题',
            '15_detail4': '',
            '16_checked_radio4': 'A',
            '17_submit_radio4': 'B',
            '18_question_multipleChoice5': '多选题',
            '19_detail5': '',
            '20_checked_mulans5': 'C',
            '21_checked_mulans5': 'D',
            '22_submit_mulans5': 'A',
            '23_submit_mulans5': 'B',
            '24_question_qnaire6': '问卷题目',
            '25_detail6': '是否已做核酸',
            '26_qn_option6': '是',
            '27_qn_option6': '否',
            '28_submit_qnaire6': '2'}
    """
    seq = 0
    submission = {}
    collection = Collection_info.query.get(collection_id)
    if collection is None:
        return None
    seq += 1
    submission[f'{seq}_collectionTitle'] = collection.collection_title
    seq += 1
    submission[f'{seq}_collector'] = collection.creator
    seq += 1
    submission[f'{seq}_deadline'] = collection.end_date.strftime("%Y-%m-%d %H:%M:%S")
    seq += 1
    submission[f'{seq}_description'] = collection.description
    question_list = Question_info.query.filter_by(collection_id=collection_id).order_by("qno").all()
    submission_content_list = Submit_Content_info.query.filter_by(submission_id=submission_id).order_by("qno").all()
    for q, s in list(zip(question_list, submission_content_list)):
        # 若是姓名题
        if q.question_type == Question_info.NAME:
            seq += 1
            submission[f'{seq}_question_name{q.qno}'] = q.question_title
            seq += 1
            submission[f'{seq}_detail{q.qno}'] = q.question_description
            seq += 1
            submission[f'{seq}_submit_name{q.qno}'] = s.result

        # 若是姓名题
        if q.question_type == Question_info.SNO:
            seq += 1
            submission[f'{seq}_question_sno{q.qno}'] = q.question_title
            seq += 1
            submission[f'{seq}_detail{q.qno}'] = q.question_description
            seq += 1
            submission[f'{seq}_submit_sno{q.qno}'] = s.result

        # 若是文件上传题
        if q.question_type == Question_info.FILE_UPLOAD:
            seq += 1
            submission[f'{seq}_question_file{q.qno}'] = q.question_title
            seq += 1
            submission[f'{seq}_detail{q.qno}'] = q.question_description
            seq += 1
            submission[f'{seq}_submit_file{q.qno}'] = s.result

        # 若是单选题
        if q.question_type == Question_info.SINGLE_CHOICE:
            seq += 1
            submission[f'{seq}_question_radio{q.qno}'] = q.question_title
            seq += 1
            submission[f'{seq}_detail{q.qno}'] = q.question_description
            # 单选题答案
            seq += 1
            submission[f'{seq}_checked_radio{q.qno}'] = Answer_info.query. \
                filter_by(question_id=q.id).first().answer_option
            # 用户提交答案
            seq += 1
            submission[f'{seq}_submit_radio{q.qno}'] = s.result

        # 若是多选题
        if q.question_type == Question_info.MULTI_CHOICE:
            seq += 1
            submission[f'{seq}_question_multipleChoice{q.qno}'] = q.question_title
            seq += 1
            submission[f'{seq}_detail{q.qno}'] = q.question_description
            # 多选题答案
            answer_list = Answer_info.query.filter_by(question_id=q.id).first().answer_option.split('-')
            for answer in answer_list:
                seq += 1
                submission[f'{seq}_checked_mulans{q.qno}'] = answer
            # 用户提交答案
            submit_option_list = s.result.split('-')
            for submit_option in submit_option_list:
                seq += 1
                submission[f'{seq}_submit_mulans{q.qno}'] = submit_option

        # 若是问卷题
        if q.question_type == Question_info.SINGLE_QUESTIONNAIRE or \
                q.question_type == Question_info.MULTI_QUESTIONNAIRE:
            seq += 1
            submission[f'{seq}_question_qnaire{q.qno}'] = q.question_title
            seq += 1
            submission[f'{seq}_detail{q.qno}'] = q.question_description
            # 选项内容
            option_list = Option_info.query.filter_by(question_id=q.id).order_by("option_sn").all()
            for option in option_list:
                seq += 1
                submission[f'{seq}_qn_option{q.qno}'] = option.option_content

            # 用户提交选项
            submit_option_list = s.result.split('-')
            for submit_option in submit_option_list:
                seq += 1
                submission[f'{seq}_submit_qnaire{q.qno}'] = submit_option

    return submission

2.4.4 统计答题情况#

统计选择题、问卷题答题情况通过 collection_data_statistics 实现。

对收集中的选择题、问卷题的答题情况进行数据统计

Parameters:

Name Type Description Default
collection_id int

收集id

required

Returns:

Name Type Description
choice_statistics dict

选择题答题情况数据统计。若收集中无选择题,则返回None;否则返回一个字典,格式如下: choice_data = {'question_1': { 'questionName': '单选题', 'correctAnswer': 'A', 'accuracy': 0.25, 'A': ['王梓熙'], 'B': ['张隽翊'], 'C': ['王广凯'], 'D': ['计胜翔'] }, 'question_2': { 'questionName': '多选题', 'correctAnswer': 'C-D', 'accuracy': 0.25, 'A': ['王梓熙', '计胜翔'], 'B': ['王梓熙', '张隽翊'], 'C': ['张隽翊', '王广凯'], 'D': ['王广凯', '计胜翔'] } }

qnaire_statistics dict

问卷题答题情况数据统计。若收集中无问卷题,则返回None;否则返回一个字典,格式如下: qnaire_data = {'question_1': { 'questionName': '你喜欢吃屎吗?', 'optionNumber': 2, 'option_1': { 'optionName': '喜欢', 'peopleNumber': 3, 'people': ['王梓熙', '张隽翊', '王广凯'] }, 'option_2': { 'optionName': '不喜欢', 'peopleNumber': 1, 'people': ['计胜翔'] } } }

Source code in Flask\db_manipulation.py
def collection_data_statistics(collection_id: int) -> dict:
    """对收集中的选择题、问卷题的答题情况进行数据统计

    Args:
        collection_id: 收集id

    Returns:
        choice_statistics: 选择题答题情况数据统计。若收集中无选择题,则返回None;否则返回一个字典,格式如下:
                            choice_data = {'question_1': {
                                                'questionName': '单选题',
                                                'correctAnswer': 'A',
                                                'accuracy': 0.25,
                                                'A': ['王梓熙'],
                                                'B': ['张隽翊'],
                                                'C': ['王广凯'],
                                                'D': ['计胜翔']
                                            },
                                           'question_2': {
                                               'questionName': '多选题',
                                               'correctAnswer': 'C-D',
                                               'accuracy': 0.25,
                                               'A': ['王梓熙', '计胜翔'],
                                               'B': ['王梓熙', '张隽翊'],
                                               'C': ['张隽翊', '王广凯'],
                                               'D': ['王广凯', '计胜翔']
                                           }
                            }
        qnaire_statistics: 问卷题答题情况数据统计。若收集中无问卷题,则返回None;否则返回一个字典,格式如下:
                            qnaire_data = {'question_1': {
                                                'questionName': '你喜欢吃屎吗?',
                                                'optionNumber': 2,
                                                'option_1': {
                                                    'optionName': '喜欢',
                                                    'peopleNumber': 3,
                                                    'people': ['王梓熙', '张隽翊', '王广凯']
                                                },
                                                'option_2': {
                                                    'optionName': '不喜欢',
                                                    'peopleNumber': 1,
                                                    'people': ['计胜翔']
                                                }
                                        }
                            }
    """
    choice_qtype = [Question_info.SINGLE_CHOICE, Question_info.MULTI_CHOICE]
    qnaire_qtype = [Question_info.SINGLE_QUESTIONNAIRE, Question_info.MULTI_QUESTIONNAIRE]

    # 查找选择题
    choice_qlist = Question_info.query.filter(Question_info.collection_id == collection_id,
                                              Question_info.question_type.in_(choice_qtype)). \
        with_entities(Question_info.id, Question_info.question_title).all()

    # 查看是否由提交
    submission = Submission_info.query.filter_by(collection_id=collection_id).all()

    # 判断是否存在选择题
    if len(choice_qlist) == 0 or len(submission) == 0:
        choice_statistics = None
    else:
        choice_statistics = {}
        seq = 0
        for id, title in choice_qlist:
            seq += 1
            detail = {}
            detail['questionName'] = title
            answer = Answer_info.query.filter_by(question_id=id).first().answer_option
            detail['correctAnswer'] = answer
            submit_content_list = Submit_Content_info.query.filter_by(question_id=id). \
                with_entities(Submit_Content_info.submission_id, Submit_Content_info.result). \
                all()
            id_list, result_list = zip(*submit_content_list)
            id_list, result_list = list(id_list), list(result_list)
            detail['accuracy'] = result_list.count(answer) / len(result_list)  # 计算此题正确率

            # 将submission_id根据选择的选项进行分类
            A_list = list(filter(lambda x: 'A' in x[1], submit_content_list))
            A_id_list = list(map(itemgetter(0), A_list))
            B_list = list(filter(lambda x: 'B' in x[1], submit_content_list))
            B_id_list = list(map(itemgetter(0), B_list))
            C_list = list(filter(lambda x: 'C' in x[1], submit_content_list))
            C_id_list = list(map(itemgetter(0), C_list))
            D_list = list(filter(lambda x: 'D' in x[1], submit_content_list))
            D_id_list = list(map(itemgetter(0), D_list))
            name_list = Submission_info.query.filter(Submission_info.id.in_(A_id_list)). \
                with_entities(Submission_info.submitter_name). \
                all()
            name_list = list(map(itemgetter(0), name_list))
            detail['A'] = name_list
            name_list = Submission_info.query.filter(Submission_info.id.in_(B_id_list)). \
                with_entities(Submission_info.submitter_name). \
                all()
            name_list = list(map(itemgetter(0), name_list))
            detail['B'] = name_list
            name_list = Submission_info.query.filter(Submission_info.id.in_(C_id_list)). \
                with_entities(Submission_info.submitter_name). \
                all()
            name_list = list(map(itemgetter(0), name_list))
            detail['C'] = name_list
            name_list = Submission_info.query.filter(Submission_info.id.in_(D_id_list)). \
                with_entities(Submission_info.submitter_name). \
                all()
            name_list = list(map(itemgetter(0), name_list))
            detail['D'] = name_list

            choice_statistics[f'question_{seq}'] = detail

    # 查找问卷题
    qnaire_qlist = Question_info.query.filter(Question_info.collection_id == collection_id,
                                              Question_info.question_type.in_(qnaire_qtype)). \
        with_entities(Question_info.id, Question_info.question_title).all()

    # 判断是否存在问卷题
    if len(qnaire_qlist) == 0 or len(submission) == 0:
        qnaire_statistics = None
    else:
        qnaire_statistics = {}
        seq = 0
        for id, title in qnaire_qlist:
            seq += 1
            detail = {}
            detail['questionName'] = title
            option_list = Option_info.query.filter_by(question_id=id). \
                with_entities(Option_info.option_sn, Option_info.option_content). \
                all()
            detail['optionNumber'] = len(option_list)
            submit_content_list = Submit_Content_info.query.filter_by(question_id=id). \
                with_entities(Submit_Content_info.submission_id, Submit_Content_info.result). \
                all()
            for sn, content in option_list:
                option = {}
                option['optionName'] = content
                submission_list = list(filter(lambda x: str(sn + 1) in x[1], submit_content_list))
                submission_id_list = list(map(itemgetter(0), submission_list))
                name_list = Submission_info.query.filter(Submission_info.id.in_(submission_id_list)). \
                    with_entities(Submission_info.submitter_name). \
                    all()
                name_list = list(map(itemgetter(0), name_list))
                option['peopleNumber'] = len(name_list)
                option['people'] = name_list
                detail[f'option_{sn + 1}'] = option

            qnaire_statistics[f'question_{seq}'] = detail

    print("选择题数据统计:", choice_statistics)
    print("问卷题数据统计:", qnaire_statistics)
    return choice_statistics, qnaire_statistics