1.wxpython GUI编程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 | # -*- coding: UTF-8 -*- from socket import * import time import threading import wx import MySQLdb import xlwt from clientthread import ClientThread class Server(wx.Frame): def __init__( self ,parent = None , id = - 1 ,title = '服务器' ,pos = wx.DefaultPosition,size = ( 500 , 300 )): '''窗口''' wx.Frame.__init__( self ,parent, id ,title,pos,size = ( 400 , 470 )) pl = wx.Panel( self ) con = wx.BoxSizer(wx.VERTICAL) subcon = wx.FlexGridSizer(wx.HORIZONTAL) sta = wx.Button(pl , size = ( 133 , 40 ),label = '启动服务器' ) end = wx.Button(pl, size = ( 133 , 40 ), label = '关闭服务器' ) hist = wx.Button(pl,size = ( 133 , 40 ),label = '导出聊天记录' ) subcon.Add(sta, 1 , wx.BOTTOM) subcon.Add(hist, 1 , wx.BOTTOM) subcon.Add(end, 1 , wx.BOTTOM) con.Add(subcon, 1 ,wx.ALIGN_CENTRE|wx.BOTTOM) self .Text = wx.TextCtrl(pl, size = ( 400 , 250 ),style = wx.TE_MULTILINE|wx.TE_READONLY) con.Add( self .Text, 1 , wx.ALIGN_CENTRE) self .ttex = wx.TextCtrl(pl, size = ( 400 , 100 ),style = wx.TE_MULTILINE) con.Add( self .ttex, 1 , wx.ALIGN_CENTRE) sub2 = wx.FlexGridSizer(wx.HORIZONTAL) clear = wx.Button(pl, size = ( 200 , 40 ), label = '清空' ) send = wx.Button(pl, size = ( 200 , 40 ), label = '发送' ) sub2.Add(clear, 1 , wx.TOP | wx.LEFT) sub2.Add(send, 1 , wx.TOP | wx.RIGHT) con.Add(sub2, 1 , wx.ALIGN_CENTRE) pl.SetSizer(con) '''窗口''' '''绑定''' self .Bind(wx.EVT_BUTTON, self .EditClear, clear) self .Bind(wx.EVT_BUTTON, self .SendMessage, send) self .Bind(wx.EVT_BUTTON, self .Start, sta) self .Bind(wx.EVT_BUTTON, self .Break, end) self .Bind(wx.EVT_BUTTON, self .WriteToExcel, hist) '''绑定''' '''服务器准备工作''' self .UserThreadList = [] self .onServe = False addr = ('', 21567 ) self .ServeSock = socket(AF_INET, SOCK_STREAM) self .ServeSock.bind(addr) self .ServeSock.listen( 10 ) '''服务器准备工作''' '''数据库准备工作,用于存储聊天记录''' self .db = MySQLdb.connect( 'localhost' , 'root' , '123456' , 'user_info' ) self .cursor = self .db.cursor() self .cursor.execute( "select * from history order by time" ) self .Text.SetValue('') for data in self .cursor.fetchall(): #加载历史聊天记录 self .Text.AppendText( '%s said:\n%s\nwhen %s\n\n' % (data[ 0 ], data[ 2 ], data[ 1 ])) '''数据库准备工作,用于存储聊天记录''' #将聊天记录导出到EXCEl表中 def WriteToExcel( self ,event): wbk = xlwt.Workbook() sheet = wbk.add_sheet( 'sheet 1' ) self .cursor.execute( "select * from history order by time" ) sheet.write( 0 , 0 , "User" ) sheet.write( 0 , 1 , "Datetime" ) sheet.write( 0 , 5 , "Message" ) index = 0 for data in self .cursor.fetchall(): index = index + 1 Time = '%s' % data[ 1 ] #将datetime转成字符形式,否则直接写入Excel会变成时间戳 sheet.write(index, 0 ,data[ 0 ]) sheet.write(index, 1 ,Time) #写进EXCEL会变成时间戳 sheet.write(index, 5 ,data[ 2 ]) 'D:\History_Dialog.xls' ) #启动服务器的服务线程 def Start( self ,event): if not self .onServe: '''启动服务线程''' self .onServe = True mainThread = threading.Thread(target = self .on_serving, args = ()) mainThread.setDaemon( True ) # 解决父线程结束,子线程还继续运行的问题 mainThread.start() '''启动服务线程''' #关闭服务器 def Break( self ,event): self .onServe = False #服务器主循环 def on_serving( self ): print '...On serving...' while self .onServe: UserSocket, UserAddr = self .ServeSock.accept() username = UserSocket.recv( 1024 ).decode(encoding = 'utf-8' ) #接收用户名 userthread = ClientThread(UserSocket, username, self ) self .UserThreadList.append(userthread) #将用户线程加到队列中 userthread.start() self .ServeSock.close() #绑定发送按钮 def SendMessage( self ,event): if self .onServe and cmp ( self .ttex.GetValue(),''): data = self .ttex.GetValue() self .AddText( 'Server' ,data,time.strftime( "%Y-%m-%d %H:%M:%S" , time.localtime())) self .ttex.SetValue('') # 向所有客户端(包括自己)发送信息,同时更新到数据库 def AddText( self , source, data,Time): self .cursor.execute( "insert into history values(\"%s\",\"%s\",\"%s\")" % (source,Time,data)) #双引号里面有双引号,bug:句子不能有双引号、以及中文 self .db.commit() sendData = '%s said:\n%s\nwhen %s\n' % (source,data,Time) self .Text.AppendText( '%s\n' % sendData) for user in self .UserThreadList: #bug:客户端关闭了仍然在队列中。如果客户端关闭了,那怎么在服务器判断是否已经关闭了?客户端在关闭之前发一条信息给服务器? user.UserSocket.send(sendData.encode(encoding = 'utf-8' )) #绑定清空按钮 def EditClear( self ,event): self .ttex.Clear() def main(): app = wx.App( False ) Server().Show() app.MainLoop() if __name__ = = '__main__' : main() |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | # -*- coding: UTF-8 -*- import threading import time class ClientThread(threading.Thread): def __init__( self ,UserSocket, Username,server): threading.Thread.__init__( self ) self .UserSocket = UserSocket self .Username = Username self .server = server self .Loadhist() # 加载历史聊天记录 def Loadhist( self ): self .server.cursor.execute( "select * from history order by time" ) for data in self .server.cursor.fetchall(): time.sleep( 0.6 ) #几条信息同时发,会造成末尾回车键的丢失,所以要有时间间隔 sendData = '%s said:\n%s\nwhen %s\n' % (data[ 0 ], data[ 2 ], data[ 1 ]) self .UserSocket.send(sendData.encode(encoding = 'utf-8' )) #方法重写,线程的入口 def run( self ): size = 1024 while True : data = self .UserSocket.recv(size) #未解决:客户端断开连接后这里会报错 self .server.AddText( self .Username,data.decode(encoding = 'utf-8' ),time.strftime( "%Y-%m-%d %H:%M:%S" , time.localtime())) self .UserSocket.close() #这里都执行不到 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | # -*- coding: UTF-8 -*- from socket import * import wx import MySQLdb from client import Client class LogFrame(wx.Frame): def __init__( self ,parent = None , id = - 1 ,title = '登录窗口' ,pos = wx.DefaultPosition,size = ( 500 , 300 )): '''窗口''' wx.Frame.__init__( self ,parent, id ,title,pos,size = ( 400 , 280 )) self .pl = wx.Panel( self ) con = wx.BoxSizer(wx.VERTICAL) subcon = wx.FlexGridSizer( 2 , 2 , 10 , 10 ) username = wx.StaticText( self .pl, label = "Username:" ,style = wx.ALIGN_LEFT) password = wx.StaticText( self .pl, label = "Password:" ,style = wx.ALIGN_LEFT) self .tc1 = wx.TextCtrl( self .pl,size = ( 180 , 20 )) self .tc2 = wx.TextCtrl( self .pl,size = ( 180 , 20 ),style = wx.TE_PASSWORD) subcon.Add(username,wx.TE_LEFT) subcon.Add( self .tc1, 1 ,wx.EXPAND) subcon.Add(password) subcon.Add( self .tc2, 1 ,wx.EXPAND) con.Add(subcon, 1 ,wx.ALIGN_CENTER) subcon2 = wx.FlexGridSizer( 1 , 2 , 10 , 10 ) register = wx.Button( self .pl,label = 'Register' ) login = wx.Button( self .pl,label = 'Login' ) subcon2.Add(register, 1 , wx.TOP) subcon2.Add(login, 1 , wx.TOP) con.Add(subcon2, 1 ,wx.ALIGN_CENTRE) self .pl.SetSizer(con) self .Bind(wx.EVT_BUTTON, self .Register,register) self .Bind(wx.EVT_BUTTON, self .Login,login) '''窗口''' self .isConnected = False self .userSocket = None #连接到服务器 def ConnectToServer( self ): if not self .isConnected: ADDR = ( 'localhost' , 21567 ) self .userSocket = socket(AF_INET, SOCK_STREAM) try : self .userSocket.connect(ADDR) self .userSocket.send( self .tc1.GetValue().encode(encoding = 'utf-8' )) self .isConnected = True return True except Exception: return False else : return True #登录 def Login( self ,event): if not self .ConnectToServer(): err = wx.MessageDialog( None , '服务器未启动' , 'ERROR!' , wx.OK) err.ShowModal() err.Destroy() else : username = self .tc1.GetValue() password = self .tc2.GetValue() db = MySQLdb.connect( 'localhost' , 'root' , '123456' , 'user_info' ) cursor = db.cursor() cursor.execute( "select * from user_list where username='%s' and password='%s'" % (username,password)) if not cursor.fetchone(): err = wx.MessageDialog( None , '用户不存在或密码错误' , 'ERROR!' ,wx.OK) err.ShowModal() else : self .Close() Client(opSock = self .userSocket, username = username).Show() db.commit() db.close() #注册 def Register( self ,event): if not self .ConnectToServer(): err = wx.MessageDialog( None , '服务器未启动' , 'ERROR!' , wx.OK) err.ShowModal() err.Destroy() else : username = self .tc1.GetValue() password = self .tc2.GetValue() db = MySQLdb.connect( 'localhost' , 'root' , '123456' , 'user_info' ) cursor = db.cursor() cursor.execute( "select * from user_list where username='%s'" % username) if not cursor.fetchone(): cursor.execute( "insert into user_list(username,password) values('%s','%s')" % (username,password)) else : err = wx.MessageDialog( None , '用户已存在' , 'ERROR!' , wx.OK) err.ShowModal() db.commit() db.close() def main(): app = wx.App( False ) LogFrame().Show() app.MainLoop() if __name__ = = '__main__' : main() |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 | #/usr/bin/env python # -*- coding: UTF-8 -*- import wx import threading from time import ctime class Client(wx.Frame): def __init__( self ,opSock,username,parent = None , id = - 1 ,title = '客户端' ,pos = wx.DefaultPosition,size = ( 500 , 300 )): '''窗口''' wx.Frame.__init__( self ,parent, id ,title,pos,size = ( 400 , 470 )) self .opSock = opSock self .username = username pl = wx.Panel( self ) con = wx.BoxSizer(wx.VERTICAL) subcon = wx.FlexGridSizer(wx.HORIZONTAL) sta = wx.Button(pl, size = ( 200 , 40 ),label = '连接' ) end = wx.Button(pl, size = ( 200 , 40 ),label = '断开' ) subcon.Add(sta, 1 , wx.TOP|wx.LEFT) subcon.Add(end, 1 , wx.TOP|wx.RIGHT) con.Add(subcon, 1 ,wx.ALIGN_CENTRE) self .Text = wx.TextCtrl(pl, size = ( 400 , 250 ),style = wx.TE_MULTILINE|wx.TE_READONLY) con.Add( self .Text, 1 , wx.ALIGN_CENTRE) self .ttex = wx.TextCtrl(pl, size = ( 400 , 100 ),style = wx.TE_MULTILINE) con.Add( self .ttex, 1 , wx.ALIGN_CENTRE) sub2 = wx.FlexGridSizer(wx.HORIZONTAL) clear = wx.Button(pl, size = ( 200 , 40 ), label = '清空' ) send = wx.Button(pl, size = ( 200 , 40 ), label = '发送' ) sub2.Add(clear, 1 , wx.TOP | wx.LEFT) sub2.Add(send, 1 , wx.TOP | wx.RIGHT) con.Add(sub2, 1 , wx.ALIGN_CENTRE) pl.SetSizer(con) '''窗口''' '''绑定''' self .Bind(wx.EVT_BUTTON, self .EditClear, clear) self .Bind(wx.EVT_BUTTON, self .Send, send) self .Bind(wx.EVT_BUTTON, self .Login, sta) self .Bind(wx.EVT_BUTTON, self .Logout, end) '''绑定''' self .isConnected = False #登录 def Login( self ,event): '''客户端准备工作''' self .isConnected = True t = threading.Thread(target = self .Receive, args = ()) t.setDaemon( True ) t.start() '''客户端准备工作''' #退出 def Logout( self ,event): self .isConnected = False #绑定发送按钮 def Send( self ,event): if self .isConnected and cmp ( self .ttex.GetValue(),''): self .opSock.send( self .ttex.GetValue().encode(encoding = 'utf-8' )) self .ttex.SetValue('') #绑定清空按钮 def EditClear( self ,event): self .ttex.Clear() #接收客户端的信息(独立一个线程) def Receive( self ): while self .isConnected: data = self .opSock.recv( 1024 ).decode(encoding = 'utf-8' ) self .Text.AppendText( '%s\n' % data) |
更多关于python聊天功能的精彩文章请点击专题: python聊天功能汇总
微信公众号搜索 “ 脚本之家 ” ,选择关注
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 进行投诉反馈,一经查实,立即处理!
深度解析Django REST Framework 批量操作
这篇文章主要介绍了深度解析Django REST Framework批量操作,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2021-05-05