简易聊天系统-聊天服务

聊天负责私人聊天,群组聊天。私人聊天接受信息后保存至数据库再转发给目标用户。群组聊天当前没有离线消息保存,也就是用户登录后无法知道多少消息未读,而是直接拉取指定数量群聊天。当有成员发送后会将聊天信息存储数据库(没有缓存进redis,因为在线用户会直接发送,目前没有这个优化必要),从redis中检索所有群组在线用户并通过消息队列发送至对应网关。大致代码如下://处理群消息funcDealGroupMsg(delivery*amqp.Delivery,transfer1*transfer)

大家好,又见面了,我是你们的朋友全栈君。

聊天负责私人聊天,群组聊天。私人聊天接受信息后保存至数据库再转发给目标用户。群组聊天当前没有离线消息保存,也就是用户登录后无法知道多少消息未读,而是直接拉取指定数量群聊天。当有成员发送后会将聊天信息存储数据库(没有缓存进redis,因为在线用户会直接发送,目前没有这个优化必要),从redis中检索所有群组在线用户并通过消息队列发送至对应网关。

大致代码如下:

// 处理群消息
func DealGroupMsg(delivery *amqp.Delivery, transfer1 *transfer) {
	now := time.Now()

	Userid := transfer1.Id
	var mess SendMessage
	err := mapstructure.Decode(transfer1.Data.Data, &mess)
	if err != nil {
		fmt.Println("DealGroupMsg json err :", err)
		return
	}
	if mess.UserId != Userid {
		fmt.Println("DealGroupMsg 发送者id不一致 ", mess.UserId, "--", Userid)
		//	delivery.Ack(true)
		return
	}

	ItemId := mess.To
	if mess.MsgType != GroupMessage {
		fmt.Println("DealGroupMsg 信息类型不一致 ", mess.MsgType, "--", Userid)
		//	delivery.Ack(true)
		return
	}
	// 判断用户是否为群成员
	r, err := IsGroupMember(Userid, ItemId)
	if err != nil {
		fmt.Println("DealGroupMsg json err :", err)
		return
	}
	if !r {
		fmt.Println("DealGroupMsg 不属于群成员 ", mess.To, "--", Userid)
		//	delivery.Ack(true)
		return
	}
	// 获取全局id
	id, err := redisconn.GetGlobalID()
	if err != nil {
		fmt.Println("DealGroupMsg json err :", err)
		return
	}

	//fmt.Println(id)
	//	time.Sleep(4 * time.Second)
	// 获取当前时间戳
	ti := time.Now().UnixNano() / 1e6
	Msgid1 := strconv.FormatInt(ti, 10) + fmt.Sprintf("%06d", id)
	mess.Id = Msgid1
	// 将id 发送给发信息者
	var sendmessagereply SendMessageReply
	sendmessagereply.Id = Msgid1
	sendmessagereply.MsgReplyID = mess.MsgReplyID
	sendmessagereply.To = mess.To
	sendmessagereply.MsgType = GroupMessage
	nowtime := time.Now().UnixNano() / 1e6
	// 如果发送时间差值小于2S 选用发送者时间 否则选择后台时间
	UserSendTime, err := strconv.ParseInt(mess.SendTime, 10, 64)
	if err != nil {
		fmt.Println("DealGroupMsg", err)
		return
	}
	var ReplyTime string
	if Abs(UserSendTime-nowtime) < 1000*2 {
		ReplyTime = strconv.FormatInt(UserSendTime, 10)
	} else {
		ReplyTime = strconv.FormatInt(nowtime, 10)
	}
	sendmessagereply.ReplyTime = ReplyTime
	var json = jsoniter.ConfigCompatibleWithStandardLibrary
	data2, err := json.Marshal(sendmessagereply)
	if err != nil {
		fmt.Println("DealGroupMsg", err)
		return
	}
	time11 := time.Now()
	err = RabbitMqPublish(mq, data2, Userid, SendGroupMsgAckReply, transfer1.From)
	if err != nil {
		fmt.Println("DealGroupMsg", err)
		return
	}
	fmt.Println("RabbitMqPublish", time.Now().Sub(time11))
	// 查询群成员
	//	delivery.Ack(true)
	var rmsg ReceiveMessage
	rmsg.Id = Msgid1
	rmsg.MsgData = mess.MsgData
	rmsg.MsgDataType = mess.MsgDataType
	rmsg.MsgType = GroupMessage
	rmsg.To = mess.To
	rmsg.UserId = mess.UserId
	rmsg.SendTime = ReplyTime
	// 使用Mysql存储起来
	go InsertMessages(rmsg)
	// err = msql.InsertChatMessage(Msgid1, mess.UserId, mess.To, mess.MsgData, mess.MsgType, mess.MsgDataType, ReplyTime)
	// if err != nil {
	// 	fmt.Println("DealGroupMsg e", Msgid1, err)
	// 	return
	// }
	fmt.Println("DealGroupMsg", time.Now().Sub(now))
	go DealGroupMessage(ItemId, rmsg)
}

// 群成员发送信息后后续处理
func DealGroupMessage(GroupId string, Msg ReceiveMessage) {
	now := time.Now()

	// 获取群成员 在线则发送
	members, err := GetGroupMemberListSimply(GroupId)
	if err != nil {
		fmt.Println("DealGroupMessage", err)
		return
	}

	data2, err := json.Marshal(Msg)
	if err != nil {
		fmt.Println("DealGroupMessage", err)
		return
	}

	for _, member := range members {
		// 获取用户信息
		u, err := redisconn.RedisGetUser(member.User.Userid)
		if err != nil {
			fmt.Println("DealGroupMessage", err)
			continue
		}
		//如果群成员在线则发送
		if u.Status == OnLine {
			//		fmt.Println("Send")
			err = RabbitMqPublish(mq, data2, u.Userid, ReceiveGroupMsg, u.GateWay)
			if err != nil {
				fmt.Println("AddUserDeal", err)
				return
			}
		}
	}
	fmt.Println("DealGroupMessage", time.Now().Sub(now))
}

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/124739.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • linux发起iscsi_iscsi自动连接

    linux发起iscsi_iscsi自动连接1、存储介质1)磁盘阵列:磁盘阵列是一种采用RAID技术、冗余技术和在线维护技术制造的一种高性能、高可用的磁盘存储设备。2)IP-SAN存储:SAN(StorageAreaNetwork-存储区域网络):是计算机信息处理技术中的一种架构,它将服务器和远程的计算机存储设备(如磁盘阵列、磁带库)连接起来,使得这些存储设备看起来就像是本地一样。SAN就理解成存储虚拟化,而IP-SAN就是采

    2022年8月23日
    8
  • 什么是框架(包括前端框架和后端框架)[通俗易懂]

    什么是框架(包括前端框架和后端框架)[通俗易懂]什么是框架

    2022年5月31日
    35
  • cvpr目标检测_目标检测指标

    cvpr目标检测_目标检测指标论文年份:2016,论文被引:12032(2022/05/03)

    2025年6月20日
    3
  • 磁盘加密

    磁盘加密

    2022年3月12日
    43
  • java中判断字符串是否日期格式的方法建议收藏

    大家好,又见面了,我是全栈君,今天给大家准备了Idea注册码。此处内容已经被作者隐藏,请输入验证码查看内容验证码:请关注本站微信公众号,回复“验证码”,获取验证码。在微信里搜索“全…

    未分类 2021年12月18日
    59
  • CPU流水线技术演进「建议收藏」

    CPU流水线技术演进「建议收藏」一.三级线性流水线每个流水级的结构是:逻辑电路+寄存器我们可以将流水线往下细分,使得各个流水级足够小(CPU执行时间少),就可以通过提高系统时钟频率来提高CPU的处理速度。二.多级线性流水线(这里以5级为例)注意:我们把5级以上的流水线称为超流水线结构。三.muti-多级线性流水线(这里以5级为例)四.多级非线性流水线(乱序执行部件)五.超线程处理器多级非线性流水线(虚拟处理器共用乱序执行部件)拥有超线程的处理器将两个虚拟的处理器暴露给共享的乱..

    2022年8月20日
    5

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号