Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Golang实现聊天室

Golang使用gin框架实现一个完整的聊天室功能

作者:paterl

由于我们项目的需要,我就研究了一下关于websocket的相关内容,去实现一个聊天室的功能,经过几天的探索,现在使用Gin框架实现了一个完整的聊天室+消息实时通知系统,感兴趣的小伙伴欢迎阅读本文

用到的技术

websocket、gin、mysql、redis、协程、通道

实现思路

说到聊天室可以有多种方法实现,例如:使用单纯的MySQL也可以实现,但是为什么要选择使用websocket去实现呢?有什么优势呢?

websocket是基于TCP/IP,独立的HTTP协议的双向通信协议,这就使实时的消息通知成为可能, 同时又符合Go高效处理高并发的语言特点,结合聊天室又是高并发的,所以采取的室websocket进行消息的转接,MySQL持久化聊天消息,redis用于做一些判断。

首先用户在进入App时,客户端和服务端建立一个websocket连接,并开启一个通道。

当服务端收到客户端的消息后,将消息写入通道里,服务端监听通道的消息,并将消息取出,使用接收人的websocket连接将消息广播到接收人那里。

实现代码

下面开始实现:

创建模型,用于关系的确立及数据的传输

//数据库存储消息结构体,用于持久化历史记录
type ChatMessage struct {
	gorm.Model
	Direction   string //这条消息是从谁发给谁的
	SendID      int    //发送者id
	RecipientID int    //接受者id
	GroupID     string //群id,该消息要发到哪个群里面去
	Content     string //内容
	Read        bool   //是否读了这条消息
}
//群聊结构体
type Group struct {
	ID           string ` gorm:"primaryKey"` //群id
	CreatedAt    time.Time
	UpdatedAt    time.Time
	DeletedAt    gorm.DeletedAt `gorm:"index"`
	GroupName    string         `json:"group_name"`    //群名
	GroupContent string         `json:"group_content"` //群签名
	GroupIcon    string         `json:"group_icon"`    //群头像
	GroupNum     int            //群人数
	GroupOwnerId int            //群主id
	Users        []User         `gorm:"many2many:users_groups;"` //群成员
}
type UsersGroup struct {
	GroupId string `json:"group_id"`
	UserId  int    `json:"user_id"`
}
// 用于处理请求后返回一些数据
type ReplyMsg struct {
	From    string `json:"from"`
	Code    int    `json:"code"`
	Content string `json:"content"`
}
// 发送消息的类型
type SendMsg struct {
	Type        int    `json:"type"`
	RecipientID int    `json:"recipient_id"` //接受者id
	Content     string `json:"content"`
}
// 用户类
type Client struct {
	ID          string          //消息的去向
	RecipientID int             //接受者id
	SendID      int             //发送人的id
	GroupID     string          //群聊id
	Socket      *websocket.Conn //websocket连接对象
	Send        chan []byte     //发送消息用的管道
}
// 广播类,包括广播内容和源用户
type Broadcast struct {
	Client  *Client
	Message []byte
	Type    int
}
// 用户管理,用于管理用户的连接及断开连接
type ClientManager struct {
	Clients    map[string]*Client
	Broadcast  chan *Broadcast
	Reply      chan *Client
	Register   chan *Client
	Unregister chan *Client
}
//创建一个用户管理对象
var Manager = ClientManager{
	Clients:    make(map[string]*Client), // 参与连接的用户,出于性能的考虑,需要设置最大连接数
	Broadcast:  make(chan *Broadcast),
	Register:   make(chan *Client), //新建立的连接访放入这里面
	Reply:      make(chan *Client),
	Unregister: make(chan *Client), //新断开的连接放入这里面
}

创建连接

func WsHandle(c *gin.Context) {
	myid := c.Query("myid")
	userid, err := strconv.Atoi(myid)
	if err != nil {
		zap.L().Error("转换失败", zap.Error(err))
		ResponseError(c, CodeParamError)
	}
	//将http协议升级为ws协议
	conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
		return true
	}}).Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		http.NotFound(c.Writer, c.Request)
		return
	}
	//创建一个用户客户端实例,用于记录该用户的连接信息
	client := new(model.Client)
	client = &model.Client{
		ID:     myid + "->",
		SendID: userid,
		Socket: conn,
		Send:   make(chan []byte),
	}
	//使用管道将实例注册到用户管理上
	model.Manager.Register <- client
	//开启两个协程用于读写消息
	go Read(client)
	go Write(client)
}
//用于读管道中的数据
func Read(c *model.Client) {
	//结束把通道关闭
	defer func() {
		model.Manager.Unregister <- c
		//关闭连接
		_ = c.Socket.Close()
	}()
	for {
		//先测试一下连接能不能连上
		c.Socket.PongHandler()
		sendMsg := new(model.SendMsg)
		err := c.Socket.ReadJSON(sendMsg)
		c.RecipientID = sendMsg.RecipientID
		if err != nil {
			zap.L().Error("数据格式不正确", zap.Error(err))
			model.Manager.Unregister <- c
			_ = c.Socket.Close()
			return
		}
		//根据要发送的消息类型去判断怎么处理
		//消息类型的后端调度
		switch sendMsg.Type {
		case 1: //私信
			SingleChat(c, sendMsg)
		case 2: //获取未读消息
			UnreadMessages(c)
		case 3: //拉取历史消息记录
			HistoryMsg(c, sendMsg)
		case 4: //群聊消息广播
			GroupChat(c, sendMsg)
		}
	}
}
//用于将数据写进管道中
func Write(c *model.Client) {
	defer func() {
		_ = c.Socket.Close()
	}()
	for {
		select {
		//读取管道里面的信息
		case message, ok := <-c.Send:
			//连接不到就返回消息
			if !ok {
				_ = c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			fmt.Println(c.ID+"接收消息:", string(message))
			replyMsg := model.ReplyMsg{
				Code:    int(CodeConnectionSuccess),
				Content: fmt.Sprintf("%s", string(message)),
			}
			msg, _ := json.Marshal(replyMsg)
			//将接收的消息发送到对应的websocket连接里
			rwLocker.Lock()
			_ = c.Socket.WriteMessage(websocket.TextMessage, msg)
			rwLocker.Unlock()
		}
	}
}

后端调度

//聊天的后端调度逻辑
//单聊
func SingleChat(c *model.Client, sendMsg *model.SendMsg) {
	//获取当前用户发出到固定用户的消息
	r1, _ := redis.REDIS.Get(context.Background(), c.ID).Result()
	//从redis中取出固定用户发给当前用户的消息
	id := CreateId(strconv.Itoa(c.RecipientID), strconv.Itoa(c.SendID))
	r2, _ := redis.REDIS.Get(context.Background(), id).Result()
	//根据redis的结果去做未关注聊天次数限制
	if r2 >= "3" && r1 == "" {
		ResponseWebSocket(c.Socket, CodeLimiteTimes, "未相互关注,限制聊天次数")
		return
	} else {
		//将消息写入redis
		redis.REDIS.Incr(context.Background(), c.ID)
		//设置消息的过期时间
		_, _ = redis.REDIS.Expire(context.Background(), c.ID, time.Hour*24*30*3).Result()
	}
	fmt.Println(c.ID+"发送消息:", sendMsg.Content)
	//将消息广播出去
	model.Manager.Broadcast <- &model.Broadcast{
		Client:  c,
		Message: []byte(sendMsg.Content),
	}
}
//查看未读消息
func UnreadMessages(c *model.Client) {
	//获取数据库中的未读消息
	msgs, err := mysql.GetMessageUnread(c.SendID)
	if err != nil {
		ResponseWebSocket(c.Socket, CodeServerBusy, "服务繁忙")
	}
	for i, msg := range msgs {
		replyMsg := model.ReplyMsg{
			From:    msg.Direction,
			Content: msg.Content,
		}
		message, _ := json.Marshal(replyMsg)
		_ = c.Socket.WriteMessage(websocket.TextMessage, message)
		//发送完后将消息设为已读
		msgs[i].Read = true
		err := mysql.UpdateMessage(&msgs[i])
		if err != nil {
			ResponseWebSocket(c.Socket, CodeServerBusy, "服务繁忙")
		}
	}
}
//拉取历史消息记录
func HistoryMsg(c *model.Client, sendMsg *model.SendMsg) {
	//拿到传过来的时间
	timeT := TimeStringToGoTime(sendMsg.Content)
	//查找聊天记录
	//做一个分页处理,一次查询十条数据,根据时间去限制次数
	//别人发给当前用户的
	direction := CreateId(strconv.Itoa(c.RecipientID), strconv.Itoa(c.SendID))
	//当前用户发出的
	id := CreateId(strconv.Itoa(c.SendID), strconv.Itoa(c.RecipientID))
	msgs, err := mysql.GetHistoryMsg(direction, id, timeT, 10)
	if err != nil {
		ResponseWebSocket(c.Socket, CodeServerBusy, "服务繁忙")
	}
	//把消息写给用户
	for _, msg := range *msgs {
		replyMsg := model.ReplyMsg{
			From:    msg.Direction,
			Content: msg.Content,
		}
		message, _ := json.Marshal(replyMsg)
		_ = c.Socket.WriteMessage(websocket.TextMessage, message)
		//发送完后将消息设为已读
		if err != nil {
			ResponseWebSocket(c.Socket, CodeServerBusy, "服务繁忙")
		}
	}
}
//群聊消息广播
func GroupChat(c *model.Client, sendMsg *model.SendMsg) {
	//根据消息类型判断是否为群聊消息
	//先去数据库查询该群下的所有用户
	users, err := mysql.GetAllGroupUser(strconv.Itoa(sendMsg.RecipientID))
	if err != nil {
		ResponseWebSocket(c.Socket, CodeServerBusy, "服务繁忙")
	}
	//向群里面的用户广播消息
	for _, user := range users {
		//获取群里每个用户的连接
		if int(user.ID) == c.SendID {
			continue
		}
		c.ID = strconv.Itoa(c.SendID) + "->"
		c.GroupID = strconv.Itoa(sendMsg.RecipientID)
		c.RecipientID = int(user.ID)
		model.Manager.Broadcast <- &model.Broadcast{
			Client:  c,
			Message: []byte(sendMsg.Content),
		}
	}
}

转发消息

//用于在启动时进行监听
func Start(manager *model.ClientManager) {
	for {
		fmt.Println("<-----监听通信管道----->")
		select {
		//监测model.Manager.Register这个的变化,有新的东西加入管道时会被监听到,从而建立连接
		case conn := <-model.Manager.Register:
			fmt.Println("建立新连接:", conn.ID)
			//将新建立的连接加入到用户管理的map中,用于记录连接对象,以连接人的id为键,以连接对象为值
			model.Manager.Clients[conn.ID] = conn
			//返回成功信息
			controller.ResponseWebSocket(conn.Socket, controller.CodeConnectionSuccess, "已连接至服务器")
		//断开连接,监测到变化,有用户断开连接
		case conn := <-model.Manager.Unregister:
			fmt.Println("连接失败:", conn.ID)
			if _, ok := model.Manager.Clients[conn.ID]; ok {
				controller.ResponseWebSocket(conn.Socket, controller.CodeConnectionBreak, "连接已断开")
			}
			//关闭当前用户使用的管道
			//close(conn.Send)
			//删除用户管理中的已连接的用户
			delete(model.Manager.Clients, conn.ID)
		case broadcast := <-model.Manager.Broadcast: //广播消息
			message := broadcast.Message
			recipientID := broadcast.Client.RecipientID
			//给一个变量用于确定状态
			flag := false
			contentid := createId(strconv.Itoa(broadcast.Client.SendID), strconv.Itoa(recipientID))
			rID := strconv.Itoa(recipientID) + "->"
			//遍历客户端连接map,查找该用户有没有在线,判断的是对方的连接例如:1要向2发消息,我现在是用户1,那么我需要判断2->1是否存在在用户管理中
			for id, conn := range model.Manager.Clients {
				//如果找不到就说明用户不在线,与接收人的id比较
				if id != rID {
					continue
				}
				//走到这一步,就说明用户在线,就把消息放入管道里面
				select {
				case conn.Send <- message:
					flag = true
				default: //否则就把该连接从用户管理中删除
					close(conn.Send)
					delete(model.Manager.Clients, conn.ID)
				}
			}
			//判断完之后就把将消息发给用户
			if flag {
				fmt.Println("用户在线应答")
				controller.ResponseWebSocket(model.Manager.Clients[rID].Socket, controller.CodeConnectionSuccess, string(message))
				//把消息插到数据库中
				msg := model.ChatMessage{
					Direction:   contentid,
					SendID:      broadcast.Client.SendID,
					RecipientID: recipientID,
					GroupID:     broadcast.Client.GroupID,
					Content:     string(message),
					Read:        true,
				}
				err := mysql.DB.Create(&msg).Error
				if err != nil {
					zap.L().Error("在线发送消息出现了错误", zap.Error(err))
				}
			} else { //如果不在线
				controller.ResponseWebSocket(broadcast.Client.Socket, controller.CodeConnectionSuccess, "对方不在线")
				//把消息插到数据库中
				msg := model.ChatMessage{
					Direction:   contentid,
					SendID:      broadcast.Client.SendID,
					RecipientID: recipientID,
					GroupID:     broadcast.Client.GroupID,
					Content:     string(message),
					Read:        false,
				}
				err := mysql.DB.Create(&msg).Error
				if err != nil {
					zap.L().Error("不在线发送消息出现了错误", zap.Error(err))
				}
			}
		}
	}
}
func createId(uid, toUid string) string {
	return uid + "->" + toUid
}

到此这篇关于Golang使用gin框架实现一个完整的聊天室功能的文章就介绍到这了,更多相关Golang实现聊天室内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文