python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > python tkinter、socket库开发tcp的客户端和服务端

python的tkinter、socket库开发tcp的客户端和服务端详解

作者:_BooI

本文介绍了TCP通讯流程和开发步骤,包括客户端和服务端的实现,客户端使用Python的tkinter库实现图形化界面,服务端使用socket库监听连接并处理消息,文章还提供了客户端和服务端的代码示例

一、tcp通讯流程和开发步骤

1、tcp客户端和服务端通讯流程图

套接字是通讯的利器,连接时要经过三次握手建立连接,断开连接要经过四次挥手断开连接。

2、客户端开发流程

3、服务端开发流程

二、客户端的实现

使用python的tkinter库实现图形化界面,使用类来实现tcp的客户端和服务端

1、构造方法__init__初始化图形界面

设置了三个按钮

2、连接服务器的函数

参数1:ipv4协议,参数2:表示tcp协议

3、发送消息函数

参数1:target=要执行的函数,参数2:守护主线程。

4、接收消息函数

5、断开连接函数

三、服务端的实现

1、构造方法__init__初始化图形界面

设置了四个按钮:

2、启动服务端函数

3、接收连接函数

4、发送消息函数

5、接收消息函数

四、效果图

使用说明:

五、代码

1、客户端

import socket
import tkinter as tk
import tkinter.messagebox
from threading import Thread


# 创建tcp客户端
class TcpClientSocket:
    def __init__(self):
        self.tcp_client_socket = None
        self.root = tk.Tk()
        self.root.title('tcp客户端')
        self.root.geometry('500x350')

        # 连接
        self.con_btn = tk.Button(self.root, text="连接服务端", width=10, command=self.connect)
        self.con_btn.place(x=100, y=20)

        # 断开连接
        self.discon_btn = tk.Button(self.root, text="断开连接", width=10, command=self.disconnect)
        self.discon_btn.place(x=300, y=20)

        # 文本输入框
        self.send_entry = tk.Entry(self.root, width=50)
        self.send_entry.place(x=50, y=80)
        # 发送消息按钮
        self.send_btn = tk.Button(self.root, text="发送消息", command=self.send)
        self.send_btn.place(x=420, y=75)

        # 消息框
        self.msg_test = tk.Text(self.root, width=70, height=16)
        self.msg_test.place(x=2, y=125)

        self.root.mainloop()

    # 连接服务端
    def connect(self):
        try:
            if self.tcp_client_socket:
                tk.messagebox.showinfo('提示', '已是连接状态')
            # 每次连接服务端就将上一次的记录清除
            self.msg_test.delete(1.0, tk.END)
            self.send_entry.delete(0, tk.END)
            # 创建客户端socket
            self.tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # 连接,端口可以改变
            self.tcp_client_socket.connect(('127.0.0.1', 8888))
            self.msg_test.insert(tk.END, "连接到服务端, 请输入信息...\n")
        except Exception as e:
            self.msg_test.insert(tk.END, f"连接失败的原因是: {e}")

    # 发送消息
    def send(self):
        # 从输入框获取数据
        input_data = self.send_entry.get()
        self.tcp_client_socket.send(input_data.encode(encoding='utf-8'))
        self.msg_test.insert(tk.END, f"客户端:{str(input_data)}\n")
        # 使用线程,防止未响应
        receive_thread = Thread(target=self.receive, daemon=True)
        receive_thread.start()

    def receive(self):
        if self.tcp_client_socket:
            rec_msg = self.tcp_client_socket.recv(1024).decode(encoding='utf-8')
            self.msg_test.insert(tk.END, f"服务端:{str(rec_msg)}\n")

    # 断开连接
    def disconnect(self):
        if self.tcp_client_socket:
            self.send_entry.delete(0, tk.END)
            self.msg_test.insert(tk.END, "断开连接")
            self.tcp_client_socket.send('#'.encode(encoding='utf-8'))  # 发送'#‘断开连接
            self.tcp_client_socket.close()  # 关闭客户端
        else:
            tk.messagebox.showinfo('提示', '未连接到服务端')


if __name__ == '__main__':
    TcpClientSocket()

2、服务端

import socket

import tkinter as tk
from threading import Thread
import tkinter.messagebox


class TcpServerSocket:
    def __init__(self):
        self.conn_socket = None
        self.tcp_server_socket = None
        self.root = tk.Tk()
        self.root.title('tcp服务端')
        self.root.geometry('500x350')

        # 连接
        self.con_btn = tk.Button(self.root, text="启动服务端", width=10, command=self.start_server)
        self.con_btn.place(x=100, y=20)

        # 接收消息
        self.status_btn = tk.Button(self.root, text="接收连接", width=10, command=self.status)
        self.status_btn.place(x=200, y=20)
        # 接收消息
        self.rev_btn = tk.Button(self.root, text="接收消息", command=self.receive)
        self.rev_btn.place(x=300, y=20)
        # 文本输入框
        self.send_entry = tk.Entry(self.root, width=50)
        self.send_entry.place(x=50, y=80)
        # 发送消息按钮
        self.send_btn = tk.Button(self.root, text="发送消息", command=self.send)
        self.send_btn.place(x=420, y=75)
        # 消息框
        self.msg_test = tk.Text(self.root, width=70, height=16)
        self.msg_test.place(x=2, y=125)
        self.root.mainloop()

    # 启动tcp服务端
    def start_server(self):
        try:
            # 每次连接服务端就将上一次的记录清除
            self.send_entry.delete(0, tk.END)
            self.msg_test.delete(1.0, tk.END)

            # 创建服务端socket
            self.tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.tcp_server_socket.bind(('', 8888))  # 绑定
            self.tcp_server_socket.listen(5)  # 监听的最大连接数
            self.msg_test.insert(tk.END, f"服务端启动成功!\n")
        except Exception as e:
            self.msg_test.insert(tk.END, f"启动失败的原因是: {e}")

    def status(self):
        if self.conn_socket is None:
            self.conn_socket, ip_port = self.tcp_server_socket.accept()
        elif self.conn_socket:
            self.msg_test.insert(tk.END, "客户端已连接\n")
        else:
            tk.messagebox.showinfo('提示', "客户端未连接")

    def send(self):
        try:
            input_data = self.send_entry.get()
            self.conn_socket.send(input_data.encode(encoding='utf-8'))
            self.msg_test.insert(tk.END, f"服务端:{str(input_data)}\n")
        except Exception as e:
            self.msg_test.insert(tk.END, f"发送失败的原因: {e}")

    def receive(self):
        rec_msg = self.conn_socket.recv(1024).decode(encoding='utf-8')
        print(rec_msg)
        # 获取到#断开连接
        if rec_msg == '#':
            self.conn_socket.close()
            self.tcp_server_socket.close()
            self.msg_test.insert(tk.END, f"客户端断开连接\n")
            return
        self.msg_test.insert(tk.END, f"客户端:{str(rec_msg)}\n")


if __name__ == '__main__':
    TcpServerSocket()

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文