python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > python聊天室

基于python与streamlit构建简单的内网智能聊天室应用

作者:Atlas Shepherd

这篇文章主要为大家详细介绍了如何基于python与streamlit构建简单的内网智能聊天室应用,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下

这是一个基于Streamlit 构建的内网智能聊天室应用。最后界面效果图如下:

核心功能

1.数据存储配置

SAVE_PATH = Path(r"D:\daku\内网共享")  # 数据保存路径
CHAT_HISTORY_FILE = SAVE_PATH / "chat_history.json"  # 聊天记录
ONLINE_STATUS_FILE = SAVE_PATH / "online_status.json"  # 在线状态
USER_PROFILES_FILE = SAVE_PATH / "user_profiles.json"  # 用户资料
FILES_DIR = SAVE_PATH / "uploaded_files"  # 上传文件目录

2.主要特性

用户系统

聊天功能

文件共享

界面设计

技术实现亮点

数据安全处理

def save_json_file(self, file_path: Path, data):
    # 使用临时文件+备份机制,避免写入中断导致数据损坏
    temp_file → 备份原文件 → 替换新文件

消息格式兼容

def fix_historical_messages(self):
    # 自动修复历史消息格式,确保向后兼容
    # 添加缺失的日期、时间、类型字段

自动清理机制

界面布局结构

侧边栏(Sidebar)

主区域(Main)

核心业务流程

用户登录→ 更新在线状态

发送消息→ 保存到JSON → 显示更新

文件上传→ 保存到本地 → 生成下载链接

自动刷新→ 每分钟更新界面状态

安全与稳定性

输入验证:用户名和消息内容过滤

错误恢复:备份文件自动恢复机制

权限检查:目录写入权限验证

资源管理:文件大小格式化显示

特色功能

完整代码 

import streamlit as st
import os
import json
import time
import datetime
from pathlib import Path
import hashlib
import uuid
import mimetypes
from typing import Dict, List, Optional
import sys
import shutil

# 配置保存路径
SAVE_PATH = Path(r"D:\daku\内网共享")
SAVE_PATH.mkdir(parents=True, exist_ok=True)

# 文件路径配置
CHAT_HISTORY_FILE = SAVE_PATH / "chat_history.json"
ONLINE_STATUS_FILE = SAVE_PATH / "online_status.json"
USER_PROFILES_FILE = SAVE_PATH / "user_profiles.json"
FILES_DIR = SAVE_PATH / "uploaded_files"
FILES_DIR.mkdir(exist_ok=True)


class EnhancedChatApplication:
    def __init__(self):
        self.setup_page_config()
        self.load_data()
        self.inject_custom_css()

    def setup_page_config(self):
        """设置页面配置"""
        st.set_page_config(
            page_title="内网智能聊天室",
            page_icon="🚀",
            layout="wide",
            initial_sidebar_state="expanded",
            menu_items={
                'Get Help': 'https',
                'Report a bug': 'https',
                'About': '# 内网智能聊天应用 v4.0'
            }
        )

    def inject_custom_css(self):
        """注入自定义CSS样式"""
        st.markdown("""
        <style>
        /* 主容器样式 */
        .main-container {
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        }

        /* 聊天消息样式 */
        .user-message {
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            color: white;
            padding: 12px;
            border-radius: 18px 18px 0px 18px;
            margin: 8px 0;
            max-width: 80%;
            margin-left: auto;
        }

        .other-message {
            background: #f0f2f6;
            color: #333;
            padding: 12px;
            border-radius: 18px 18px 18px 0px;
            margin: 8px 0;
            max-width: 80%;
        }

        /* 在线用户样式 */
        .online-user {
            background: #e8f5e8;
            padding: 8px;
            margin: 4px 0;
            border-radius: 8px;
            border-left: 4px solid #4CAF50;
        }

        /* 文件卡片样式 */
        .file-card {
            background: white;
            padding: 12px;
            margin: 8px 0;
            border-radius: 12px;
            border: 1px solid #e0e0e0;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }

        /* 按钮样式 */
        .stButton>button {
            width: 100%;
            border-radius: 8px;
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            color: white;
            border: none;
            padding: 12px;
            font-weight: bold;
        }

        /* 输入框样式 */
        .stTextArea textarea {
            border-radius: 8px;
            border: 2px solid #e0e0e0;
        }

        /* 侧边栏样式 */
        .css-1d391kg {
            background: #f8f9fa;
        }

        /* 消息气泡样式 */
        .message-bubble-own {
            background: linear-gradient(135deg, #667eea, #764ba2);
            color: white;
            padding: 12px 16px;
            border-radius: 18px 18px 0 18px;
            max-width: 70%;
            margin-left: auto;
            margin-bottom: 10px;
        }

        .message-bubble-other {
            background: #f0f2f6;
            color: #333;
            padding: 12px 16px;
            border-radius: 18px 18px 18px 0;
            max-width: 70%;
            margin-bottom: 10px;
        }
        </style>
        """, unsafe_allow_html=True)

    def load_data(self):
        """加载所有数据"""
        self.chat_history = self.load_json_file(CHAT_HISTORY_FILE, [])
        self.online_users = self.load_json_file(ONLINE_STATUS_FILE, {})
        self.user_profiles = self.load_json_file(USER_PROFILES_FILE, {})
        self.user_files = self.load_file_list()

        # 修复历史消息数据格式
        self.fix_historical_messages()

    def fix_historical_messages(self):
        """修复历史消息数据格式,确保所有消息都有必要的字段"""
        fixed_count = 0
        for msg in self.chat_history:
            # 确保消息有必要的字段
            if 'date_str' not in msg:
                if 'timestamp' in msg:
                    msg['date_str'] = datetime.datetime.fromtimestamp(msg['timestamp']).strftime('%Y-%m-%d')
                else:
                    msg['date_str'] = datetime.datetime.now().strftime('%Y-%m-%d')
                fixed_count += 1

            if 'time_str' not in msg:
                if 'timestamp' in msg:
                    msg['time_str'] = datetime.datetime.fromtimestamp(msg['timestamp']).strftime('%H:%M:%S')
                else:
                    msg['time_str'] = datetime.datetime.now().strftime('%H:%M:%S')
                fixed_count += 1

            if 'type' not in msg:
                msg['type'] = 'text'
                fixed_count += 1

        if fixed_count > 0:
            st.info(f"修复了 {fixed_count} 条历史消息的格式")
            self.save_json_file(CHAT_HISTORY_FILE, self.chat_history)

    def load_json_file(self, file_path: Path, default):
        """加载JSON文件"""
        if file_path.exists():
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except Exception as e:
                st.error(f"加载文件失败 {file_path}: {e}")
                # 如果文件损坏,尝试备份
                backup_path = file_path.with_suffix('.bak')
                if backup_path.exists():
                    try:
                        with open(backup_path, 'r', encoding='utf-8') as f:
                            data = json.load(f)
                        st.success("从备份文件恢复数据成功")
                        return data
                    except:
                        pass
        return default

    def save_json_file(self, file_path: Path, data):
        """保存JSON文件 - 修复备份逻辑"""
        try:
            # 创建临时文件
            temp_file = file_path.with_suffix('.tmp')

            # 先写入临时文件
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)

            # 如果原文件存在,先备份
            if file_path.exists():
                backup_path = file_path.with_suffix('.bak')
                # 如果备份文件已存在,先删除
                if backup_path.exists():
                    backup_path.unlink()
                # 重命名原文件为备份文件
                file_path.rename(backup_path)

            # 重命名临时文件为目标文件
            temp_file.rename(file_path)

            return True
        except Exception as e:
            st.error(f"保存文件失败 {file_path}: {e}")
            # 清理临时文件
            if 'temp_file' in locals() and temp_file.exists():
                try:
                    temp_file.unlink()
                except:
                    pass
            return False

    def load_file_list(self) -> List[Dict]:
        """加载文件列表"""
        files = []
        for file_path in FILES_DIR.glob("*"):
            if file_path.is_file():
                file_info = {
                    'filename': file_path.name,
                    'size': file_path.stat().st_size,
                    'upload_time': datetime.datetime.fromtimestamp(
                        file_path.stat().st_mtime
                    ).strftime('%Y-%m-%d %H:%M:%S'),
                    'path': file_path,
                    'file_id': file_path.stem.split('_')[0] if '_' in file_path.stem else file_path.stem
                }
                files.append(file_info)
        return sorted(files, key=lambda x: x['upload_time'], reverse=True)

    def update_online_status(self, username: str):
        """更新用户在线状态"""
        current_time = time.time()
        self.online_users[username] = current_time

        # 清理超时用户(10分钟无活动)
        timeout = 600
        current_users = list(self.online_users.keys())
        for user in current_users:
            if current_time - self.online_users[user] > timeout:
                del self.online_users[user]

        self.save_json_file(ONLINE_STATUS_FILE, self.online_users)

    def send_message(self, username: str, message: str, file_data=None, message_type: str = "text"):
        """发送消息"""
        message_id = str(uuid.uuid4())
        timestamp = time.time()

        chat_message = {
            'id': message_id,
            'username': username,
            'message': message,
            'type': message_type,
            'timestamp': timestamp,
            'time_str': datetime.datetime.now().strftime('%H:%M:%S'),
            'date_str': datetime.datetime.now().strftime('%Y-%m-%d'),
            'has_file': file_data is not None
        }

        if file_data is not None:
            # 保存文件(无大小限制)
            filename = file_data.name
            file_ext = Path(filename).suffix
            file_path = FILES_DIR / f"{message_id}_{filename}"

            try:
                with open(file_path, 'wb') as f:
                    f.write(file_data.getvalue())

                chat_message.update({
                    'filename': filename,
                    'file_path': str(file_path),
                    'file_size': file_path.stat().st_size,
                    'file_type': mimetypes.guess_type(filename)[0] or 'application/octet-stream'
                })
                st.success(f"文件 '{filename}' 上传成功!")
            except Exception as e:
                st.error(f"文件上传失败: {e}")
                return False

        self.chat_history.append(chat_message)

        # 限制聊天记录数量,保存最近的2000条
        if len(self.chat_history) > 2000:
            self.chat_history = self.chat_history[-2000:]

        return self.save_json_file(CHAT_HISTORY_FILE, self.chat_history)

    def get_online_users(self) -> List[str]:
        """获取在线用户列表"""
        current_time = time.time()
        online = []
        for user, last_seen in self.online_users.items():
            if current_time - last_seen < 600:  # 10分钟内活跃
                online.append(user)
        return sorted(online)

    def display_login_section(self):
        """显示登录区域"""
        st.sidebar.title("🚀 内网智能聊天室")

        with st.sidebar.expander("👤 用户登录", expanded=True):
            if 'username' not in st.session_state:
                username = st.text_input("用户名:", placeholder="请输入您的用户名", key="username_input")
                if st.button("进入聊天室", type="primary", key="login_btn"):
                    if username and username.strip():
                        st.session_state.username = username.strip()
                        st.rerun()
                    else:
                        st.warning("请输入有效的用户名")
                return False
            return True

    def display_online_users(self):
        """显示在线用户"""
        st.sidebar.subheader("🟢 在线用户")
        online_users = self.get_online_users()
        current_user = st.session_state.username

        for user in online_users:
            is_current = user == current_user
            status_emoji = "🟢" if is_current else "🔵"
            user_display = f"**{user}**" if is_current else user

            with st.sidebar.container():
                col1, col2 = st.sidebar.columns([1, 4])
                with col1:
                    st.write(status_emoji)
                with col2:
                    st.write(user_display)

            if is_current:
                st.sidebar.caption("在线时长: " + self.format_duration(
                    time.time() - self.online_users[user]
                ))

        if not online_users:
            st.sidebar.info("暂无其他在线用户")

    def display_file_section(self):
        """显示文件传输区域"""
        with st.sidebar.expander("📁 文件传输", expanded=True):
            # 文件上传
            uploaded_file = st.file_uploader(
                "选择要发送的文件:",
                type=None,
                key="file_uploader",
                help="支持任意类型和大小的文件"
            )

            if uploaded_file is not None:
                file_size = len(uploaded_file.getvalue())
                st.info(f"文件: {uploaded_file.name} | 大小: {self.format_file_size(file_size)}")

                if st.button("📤 发送文件", key="send_file"):
                    if self.send_message(st.session_state.username,
                                         f"分享了文件: {uploaded_file.name}",
                                         uploaded_file, "file"):
                        st.success("文件发送成功!")
                        st.rerun()

            # 文件列表
            st.subheader("共享文件库")
            self.user_files = self.load_file_list()

            if not self.user_files:
                st.info("暂无共享文件")
            else:
                for file_info in self.user_files[:10]:  # 显示最近10个文件
                    with st.container():
                        col1, col2 = st.columns([3, 1])
                        with col1:
                            st.write(f"**{file_info['filename']}**")
                            st.caption(f"{self.format_file_size(file_info['size'])} | {file_info['upload_time']}")
                        with col2:
                            with open(file_info['path'], 'rb') as f:
                                st.download_button(
                                    label="📥",
                                    data=f,
                                    file_name=file_info['filename'],
                                    key=f"dl_{file_info['file_id']}_{uuid.uuid4()}",
                                    use_container_width=True
                                )

    def display_chat_interface(self):
        """显示主聊天界面"""
        st.header("💬 智能聊天室")

        # 创建标签页布局
        tab1, tab2 = st.tabs(["📝 聊天室", "⚙️ 设置"])

        with tab1:
            self.display_chat_messages()
            self.display_message_input()

        with tab2:
            self.display_settings()

    def display_chat_messages(self):
        """显示聊天消息"""
        chat_container = st.container(height=500)

        with chat_container:
            if not self.chat_history:
                st.info("暂无聊天记录,开始第一个对话吧!")
                return

            current_user = st.session_state.username

            for i, msg in enumerate(self.chat_history[-100:]):  # 显示最近100条消息
                # 安全地获取消息字段,提供默认值
                date_str = msg.get('date_str', '未知日期')
                time_str = msg.get('time_str', '未知时间')
                username = msg.get('username', '未知用户')
                message_content = msg.get('message', '')

                is_current_user = username == current_user

                # 显示日期分隔(安全地比较日期)
                if i > 0:
                    prev_date_str = self.chat_history[i - 1].get('date_str', '未知日期')
                    if date_str != prev_date_str:
                        st.markdown(f"---\n**{date_str}**\n---")

                # 使用更安全的消息显示方式
                col1, col2 = st.columns([1, 20])
                with col2:
                    if is_current_user:
                        # 自己的消息靠右显示
                        message_html = f"""
                        <div style="display: flex; justify-content: flex-end; margin: 10px 0;">
                            <div class="message-bubble-own">
                                <div><strong>{username}</strong></div>
                                <div>{message_content}</div>
                                <div style="font-size: 0.8em; opacity: 0.8; text-align: right;">{time_str}</div>
                            </div>
                        </div>
                        """
                    else:
                        # 他人的消息靠左显示
                        message_html = f"""
                        <div style="display: flex; justify-content: flex-start; margin: 10px 0;">
                            <div class="message-bubble-other">
                                <div><strong>{username}</strong></div>
                                <div>{message_content}</div>
                                <div style="font-size: 0.8em; opacity: 0.8;">{time_str}</div>
                            </div>
                        </div>
                        """

                    st.markdown(message_html, unsafe_allow_html=True)

                    # 文件下载按钮(安全地处理文件信息)
                    if msg.get('has_file'):
                        filename = msg.get('filename', '文件')
                        file_size = msg.get('file_size', 0)
                        file_info = f"{filename} ({self.format_file_size(file_size)})"
                        file_path_str = msg.get('file_path', '')

                        if file_path_str:
                            file_path = Path(file_path_str)
                            if file_path.exists():
                                with open(file_path, 'rb') as f:
                                    st.download_button(
                                        label=f"📎 下载附件: {file_info}",
                                        data=f,
                                        file_name=filename,
                                        key=f"file_{msg.get('id', str(i))}_{uuid.uuid4()}",
                                        use_container_width=True
                                    )

    def display_message_input(self):
        """显示消息输入区域"""
        col1, col2 = st.columns([5, 1])

        with col1:
            message = st.text_area(
                "输入消息:",
                placeholder="输入您要发送的消息...",
                height=100,
                key="message_input",
                label_visibility="collapsed"
            )

        with col2:
            send_col1, send_col2 = st.columns(2)
            with send_col1:
                if st.button("🚀 发送", use_container_width=True, key="send_msg_btn"):
                    if message and message.strip():
                        if self.send_message(st.session_state.username, message.strip()):
                            # 清空输入框
                            st.session_state.message_input = ""
                            st.rerun()
                    else:
                        st.warning("请输入消息内容")

            with send_col2:
                if st.button("🔄 刷新", use_container_width=True, key="refresh_btn"):
                    st.rerun()

    def display_settings(self):
        """显示设置页面"""
        st.subheader("应用设置")

        col1, col2 = st.columns(2)

        with col1:
            st.metric("在线用户", len(self.get_online_users()))
            st.metric("总消息数", len(self.chat_history))
            st.metric("共享文件", len(self.user_files))

        with col2:
            if st.button("清空聊天记录", type="secondary", key="clear_chat_btn"):
                if st.checkbox("确认清空(此操作不可逆)", key="confirm_clear"):
                    if st.button("确认清空", type="primary", key="confirm_clear_btn"):
                        self.chat_history = []
                        self.save_json_file(CHAT_HISTORY_FILE, [])
                        st.success("聊天记录已清空")
                        st.rerun()

            if st.button("导出聊天记录", key="export_chat_btn"):
                self.export_chat_history()

            if st.button("清理备份文件", key="cleanup_backups_btn"):
                self.cleanup_backup_files()

        # 系统信息
        st.subheader("系统信息")
        st.info(f"数据保存路径: {SAVE_PATH}")
        st.info(f"最后更新: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        # 调试信息
        with st.expander("调试信息"):
            st.json({
                "chat_history_count": len(self.chat_history),
                "online_users_count": len(self.online_users),
                "files_count": len(self.user_files)
            })

    def export_chat_history(self):
        """导出聊天记录"""
        export_data = {
            "export_time": datetime.datetime.now().isoformat(),
            "total_messages": len(self.chat_history),
            "online_users": self.get_online_users(),
            "chat_history": self.chat_history
        }

        export_file = SAVE_PATH / f"chat_export_{int(time.time())}.json"
        if self.save_json_file(export_file, export_data):
            st.success(f"聊天记录已导出到: {export_file}")

    def cleanup_backup_files(self):
        """清理备份文件"""
        backup_files = list(SAVE_PATH.glob("*.bak"))
        cleaned_count = 0

        for backup_file in backup_files:
            try:
                backup_file.unlink()
                cleaned_count += 1
            except Exception as e:
                st.error(f"删除备份文件失败 {backup_file}: {e}")

        if cleaned_count > 0:
            st.success(f"已清理 {cleaned_count} 个备份文件")
        else:
            st.info("没有找到可清理的备份文件")

    def format_file_size(self, size_bytes: int) -> str:
        """格式化文件大小显示"""
        if size_bytes == 0:
            return "0 B"
        size_names = ["B", "KB", "MB", "GB", "TB"]
        i = 0
        while size_bytes >= 1024 and i < len(size_names) - 1:
            size_bytes /= 1024.0
            i += 1
        return f"{size_bytes:.1f} {size_names[i]}"

    def format_duration(self, seconds: float) -> str:
        """格式化时间间隔"""
        if seconds < 60:
            return f"{int(seconds)}秒"
        elif seconds < 3600:
            return f"{int(seconds / 60)}分钟"
        else:
            hours = int(seconds / 3600)
            minutes = int((seconds % 3600) / 60)
            return f"{hours}小时{minutes}分钟"

    def run(self):
        """运行应用"""
        # 自动刷新逻辑
        if 'last_refresh' not in st.session_state:
            st.session_state.last_refresh = time.time()

        # 每分钟自动刷新一次
        if time.time() - st.session_state.last_refresh > 60:
            st.session_state.last_refresh = time.time()
            st.rerun()

        # 主界面逻辑
        if not self.display_login_section():
            return

        username = st.session_state.username
        self.update_online_status(username)

        # 主布局
        self.display_online_users()
        self.display_file_section()
        self.display_chat_interface()


def main():
    """主函数"""
    # 检查保存目录权限
    if not SAVE_PATH.exists():
        try:
            SAVE_PATH.mkdir(parents=True, exist_ok=True)
            st.success(f"创建保存目录: {SAVE_PATH}")
        except Exception as e:
            st.error(f"创建目录失败: {e}")
            return

    # 检查写入权限
    try:
        test_file = SAVE_PATH / "test_write.txt"
        with open(test_file, 'w') as f:
            f.write("test")
        test_file.unlink()
    except PermissionError:
        st.error(f"没有写入权限: {SAVE_PATH}")
        st.info("请检查目录权限或更换保存路径")
        return

    # 运行应用
    app = EnhancedChatApplication()
    app.run()


if __name__ == "__main__":
    main()

到此这篇关于基于python与streamlit构建简单的内网聊天应用的文章就介绍到这了,更多相关python streamlit内网聊天内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文