
Implementing Distributed WebSocket with a Message Queue

Last week a member of my Knowledge Planet (知识星球) community asked a question about WebSocket, roughly as follows:

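In short: suppose the project runs in a distributed environment, and Nginx's reverse proxy spreads logged-in users across several servers. How can a user whose WebSocket connection lives on one server send a message to a user whose WebSocket connection lives on another server?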

Today's post answers that question. Solving it requires a distributed WebSocket setup, which is usually built in one of two ways:

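Option 1: Publish every message (<user id, message content>) to a single topic on a message queue (Redis, Kafka, etc.), and have every application node subscribe to that topic. When a node receives a message, it reads the receiver's user ID or username from it and checks whether it holds a connection for that user. If it does, it pushes the message; otherwise it discards the message, because the application node that does hold the receiver's connection will handle it.

Option 2: After a user establishes a WebSocket connection, record in a Redis cache which application node holds that connection, then use the message queue to push each message only to the receiver's node. This is more complex to implement than option 1, but it generates less network traffic.

Implementation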

The rest of this article implements the first option, step by step.
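Before the Spring code, the core idea of the first option can be sketched without any framework. The following is only an illustration under stated assumptions: `Node`, `Topic`, and `BroadcastRoutingSketch` are hypothetical names, and an in-memory list stands in for the shared message-queue topic. Every node sees every message, and only the node that holds the receiver's connection delivers it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Framework-free sketch of option 1: broadcast to all nodes, deliver only where the receiver is connected. */
class BroadcastRoutingSketch {

    /** One application node and the users whose WebSocket connections live on it (hypothetical type). */
    static class Node {
        final String name;
        final Set<String> localUsers = new HashSet<>();
        final List<String> delivered = new ArrayList<>();

        Node(String name, String... users) {
            this.name = name;
            for (String user : users) {
                localUsers.add(user);
            }
        }

        /** Invoked for every message published on the shared topic. */
        void onMessage(String receiver, String content) {
            if (localUsers.contains(receiver)) {
                // The receiver is connected here, so this node pushes the message.
                delivered.add(receiver + " <- " + content);
            }
            // Otherwise the message is dropped: the node that owns the connection handles it.
        }
    }

    /** In-memory stand-in for the message-queue topic: every subscriber sees every message. */
    static class Topic {
        final List<Node> subscribers = new ArrayList<>();

        void publish(String receiver, String content) {
            for (Node node : subscribers) {
                node.onMessage(receiver, content);
            }
        }
    }

    public static void main(String[] args) {
        Node a = new Node("node-a", "alice");
        Node b = new Node("node-b", "bob");
        Topic topic = new Topic();
        topic.subscribers.add(a);
        topic.subscribers.add(b);

        // alice (on node-a) sends to bob (on node-b) through the shared topic.
        topic.publish("bob", "hello");

        System.out.println("node-a delivered: " + a.delivered);
        System.out.println("node-b delivered: " + b.delivered);
    }
}
```

The cost of this design is visible in `publish`: every node receives every message, which is why the second option trades extra bookkeeping for less traffic.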

1. Define a WebSocket channel enum

    import org.apache.commons.lang3.StringUtils;

    public enum WebSocketChannelEnum {

        // Simple point-to-point chat, used for testing
        CHAT("CHAT", "Simple point-to-point chat for testing", "/topic/reply");

        WebSocketChannelEnum(String code, String description, String subscribeUrl) {
            this.code = code;
            this.description = description;
            this.subscribeUrl = subscribeUrl;
        }

        /** Unique code */
        private final String code;

        /** Description */
        private final String description;

        /** URL that the WebSocket client subscribes to */
        private final String subscribeUrl;

        public String getCode() {
            return code;
        }

        public String getDescription() {
            return description;
        }

        public String getSubscribeUrl() {
            return subscribeUrl;
        }

        /** Look up an enum constant by its code; returns null if there is no match. */
        public static WebSocketChannelEnum fromCode(String code) {
            if (StringUtils.isNoneBlank(code)) {
                for (WebSocketChannelEnum channelEnum : values()) {
                    if (channelEnum.code.equals(code)) {
                        return channelEnum;
                    }
                }
            }
            return null;
        }
    }

2. Configure a Redis-based message queue

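Note that a Redis-based message queue is not recommended for medium and large production projects: in testing it proved not especially reliable. Consider a dedicated message-queue middleware such as Kafka or RabbitMQ instead.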

    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisClusterConfiguration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    import redis.clients.jedis.JedisCluster;
    import redis.clients.jedis.JedisPoolConfig;

    import java.util.Arrays;

    @Configuration
    @ConditionalOnClass({JedisCluster.class})
    public class RedisConfig {

        @Value("${spring.redis.timeout}")
        private String timeOut;

        @Value("${spring.redis.cluster.nodes}")
        private String nodes;

        @Value("${spring.redis.cluster.max-redirects}")
        private int maxRedirects;

        @Value("${spring.redis.jedis.pool.max-active}")
        private int maxActive;

        @Value("${spring.redis.jedis.pool.max-wait}")
        private int maxWait;

        @Value("${spring.redis.jedis.pool.max-idle}")
        private int maxIdle;

        @Value("${spring.redis.jedis.pool.min-idle}")
        private int minIdle;

        @Value("${spring.redis.message.topic-name}")
        private String topicName;

        @Bean
        public JedisPoolConfig jedisPoolConfig() {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxTotal(maxActive);
            config.setMaxIdle(maxIdle);
            config.setMinIdle(minIdle);
            config.setMaxWaitMillis(maxWait);
            return config;
        }

        @Bean
        public RedisClusterConfiguration redisClusterConfiguration() {
            // "nodes" is one comma-separated string; split it into individual host:port entries
            RedisClusterConfiguration configuration =
                    new RedisClusterConfiguration(Arrays.asList(nodes.split(",")));
            configuration.setMaxRedirects(maxRedirects);
            return configuration;
        }

        /** JedisConnectionFactory */
        @Bean
        public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration,
                                                             JedisPoolConfig jedisPoolConfig) {
            return new JedisConnectionFactory(configuration, jedisPoolConfig);
        }

        /** Serialize objects with Jackson */
        @Bean
        public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer() {
            Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            serializer.setObjectMapper(objectMapper);
            return serializer;
        }

        /** RedisTemplate */
        @Bean
        public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory factory,
                                                           Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
            RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
            redisTemplate.setConnectionFactory(factory);

            // Serialize keys as strings
            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
            redisTemplate.setKeySerializer(stringRedisSerializer);
            redisTemplate.setHashKeySerializer(stringRedisSerializer);

            // Serialize values as JSON
            redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
            redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

            redisTemplate.afterPropertiesSet();
            return redisTemplate;
        }

        /** Message listener */
        @Bean
        MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver,
                                                      Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
            // The message receiver and its default handler method
            MessageListenerAdapter messageListenerAdapter =
                    new MessageListenerAdapter(messageReceiver, "receiveMessage");
            // How incoming messages are deserialized
            messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);
            return messageListenerAdapter;
        }

        /** Message listener container */
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                MessageListenerAdapter messageListenerAdapter) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            // Register the message listener on the configured topic
            container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName));
            return container;
        }
    }
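One detail of the cluster configuration is worth isolating. The `spring.redis.cluster.nodes` property is injected into a plain `String` field, so it arrives as one comma-separated value, while `RedisClusterConfiguration` takes a collection of individual `host:port` entries. A minimal sketch of the splitting step follows; the helper name `parseNodes` and the class name are hypothetical, and trimming whitespace is an extra safety measure that the article's code does not include.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: turn a comma-separated "host:port,host:port" string into individual entries. */
class ClusterNodesSketch {

    /** Hypothetical helper; skips empty entries and trims surrounding whitespace. */
    static List<String> parseNodes(String nodes) {
        List<String> result = new ArrayList<>();
        for (String part : nodes.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same node names as the article's configuration.
        List<String> nodes = parseNodes("namenode22:6379,datanode23:6379,datanode24:6379");
        for (String node : nodes) {
            System.out.println(node);
        }
    }
}
```

Passing the unsplit string as a single list element would hand the cluster configuration one malformed entry instead of three nodes.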

Note that the configuration used here looks like this:

    spring:
      ...
      # redis
      redis:
        cluster:
          nodes: namenode22:6379,datanode23:6379,datanode24:6379
          max-redirects: 6
        timeout: 300000
        jedis:
          pool:
            max-active: 8
            max-wait: 100000
            max-idle: 8
            min-idle: 0
        # Custom topic to listen on
        message:
          topic-name: topic-test

3. Define a handler for Redis messages

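    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.simp.SimpMessagingTemplate;
    import org.springframework.messaging.simp.user.SimpUser;
    import org.springframework.messaging.simp.user.SimpUserRegistry;
    import org.springframework.stereotype.Component;

    import java.text.MessageFormat;

    // RedisWebsocketMsg is a project class that the article does not show.
    @Component
    public class MessageReceiver {

        private final Logger logger = LoggerFactory.getLogger(getClass());

        @Autowired
        private SimpMessagingTemplate messagingTemplate;

        @Autowired
        private SimpUserRegistry userRegistry;

        /** Handle a WebSocket message received from Redis */
        public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {
            logger.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));

            // 1. Take the username and check whether that user is connected to this node's WebSocket
            SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
            if (simpUser != null && StringUtils.isNoneBlank(simpUser.getName())) {
                // 2. Look up the destination that the WebSocket client subscribes to
                WebSocketChannelEnum channelEnum =
                        WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
                if (channelEnum != null) {
                    // 3. Send the message to the WebSocket client
                    messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(),
                            channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
                }
            }
        }
    }

4. Send WebSocket messages from a Controller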

    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.messaging.simp.SimpMessagingTemplate;
    import org.springframework.messaging.simp.user.SimpUser;
    import org.springframework.messaging.simp.user.SimpUserRegistry;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;

    import javax.annotation.Resource;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpSession;
    import java.text.MessageFormat;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // RedisService, RedisWebsocketMsg, SpringContextUtils, Constants, User, HelloMessage,
    // JsonUtils and ExpireEnum are project classes that the article does not show.
    @Controller
    @RequestMapping("/wsTemplate")
    public class RedisMessageController {

        private final Logger logger = LoggerFactory.getLogger(getClass());

        @Value("${spring.redis.message.topic-name}")
        private String topicName;

        @Autowired
        private SimpMessagingTemplate messagingTemplate;

        @Autowired
        private SimpUserRegistry userRegistry;

        @Resource(name = "redisServiceImpl")
        private RedisService redisService;

        /** Send a WebSocket message to a specific user */
        @PostMapping("/sendToUser")
        @ResponseBody
        public String chat(HttpServletRequest request) {
            // Message receiver
            String receiver = request.getParameter("receiver");
            // Message content
            String msg = request.getParameter("msg");

            HttpSession session = SpringContextUtils.getSession();
            User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

            HelloMessage resultData =
                    new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
            this.sendToUser(loginUser.getUsername(), receiver,
                    WebSocketChannelEnum.CHAT.getSubscribeUrl(), JsonUtils.toJson(resultData));
            return "ok";
        }

        /**
         * Send a message to a specific user, handling the case where the receiver is offline.
         *
         * @param sender      message sender
         * @param receiver    message receiver
         * @param destination destination
         * @param payload     message body
         */
        private void sendToUser(String sender, String receiver, String destination, String payload) {
            SimpUser simpUser = userRegistry.getUser(receiver);

            // The receiver is connected to this node: send the message directly
            if (simpUser != null && StringUtils.isNoneBlank(simpUser.getName())) {
                messagingTemplate.convertAndSendToUser(receiver, destination, payload);
            }
            // The receiver is online but connected to another node in the cluster:
            // notify that node, through the topic, to deliver the message
            else if (redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, receiver)) {
                RedisWebsocketMsg<String> redisWebsocketMsg =
                        new RedisWebsocketMsg<>(receiver, WebSocketChannelEnum.CHAT.getCode(), payload);
                redisService.convertAndSend(topicName, redisWebsocketMsg);
            }
            // Otherwise store the message in Redis; the user pulls unread messages after coming online
            else {
                // Name of the Redis list that stores the message
                String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
                logger.info(MessageFormat.format(
                        "Receiver {0} has no WebSocket connection yet; message [{2}] from {1} will be stored in Redis list [{3}]",
                        receiver, sender, payload, listKey));
                // Store the message in Redis
                redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);
            }
        }

        /**
         * Pull the unread WebSocket messages for a given destination.
         *
         * @param destination the destination being listened to
         * @return java.util.Map<java.lang.String,java.lang.Object>
         */
        @PostMapping("/pullUnreadMessage")
        @ResponseBody
        public Map<String, Object> pullUnreadMessage(String destination) {
            Map<String, Object> result = new HashMap<>();
            try {
                HttpSession session = SpringContextUtils.getSession();
                // Currently logged-in user
                User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
                // Name of the Redis list that stores the messages
                String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
                // Pull all unread messages from Redis
                List<Object> messageList = redisService.rangeList(listKey, 0, -1);

                result.put("code", "200");
                if (messageList != null && messageList.size() > 0) {
                    // Delete this unread-message list from Redis
                    redisService.delete(listKey);
                    // Add the data to the response for the front-end page to display
                    result.put("result", messageList);
                }
            } catch (Exception e) {
                result.put("code", "500");
                result.put("msg", e.getMessage());
            }
            return result;
        }
    }

5. WebSocket configuration

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.simp.config.ChannelRegistration;
    import org.springframework.messaging.simp.config.MessageBrokerRegistry;
    import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
    import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
    import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

    // AuthHandshakeInterceptor, MyHandshakeHandler and MyChannelInterceptor are project
    // classes that the article does not show.
    @Configuration
    @EnableWebSocketMessageBroker
    public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

        @Autowired
        private AuthHandshakeInterceptor authHandshakeInterceptor;

        @Autowired
        private MyHandshakeHandler myHandshakeHandler;

        @Autowired
        private MyChannelInterceptor myChannelInterceptor;

        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/chat-websocket")
                    .addInterceptors(authHandshakeInterceptor)
                    .setHandshakeHandler(myHandshakeHandler)
                    .withSockJS();
        }

        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            // Clients send messages to /message/xxx
            registry.setApplicationDestinationPrefixes("/message");
            // Prefix for server broadcasts; clients subscribe to /topic/yyy accordingly
            registry.enableSimpleBroker("/topic");
            // Prefix for messages sent to a specific user; the default is /user/
            registry.setUserDestinationPrefix("/user/");
        }

        @Override
        public void configureClientInboundChannel(ChannelRegistration registration) {
            registration.interceptors(myChannelInterceptor);
        }
    }

6. Sample page

    <!DOCTYPE html>
    <html xmlns:th="http://www.thymeleaf.org">
    <head>
        <meta content="text/html;charset=UTF-8"/>
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
        <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
        <meta name="viewport" content="width=device-width, initial-scale=1"/>
        <title>Chat With STOMP Message</title>
        <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
        <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
        <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
        <script th:src="@{/layui/layui.js}"></script>
        <script th:src="@{/layui/lay/modules/layer.js}"></script>
        <link th:href="@{/layui/css/layui.css}" rel="stylesheet">
        <link th:href="@{/layui/css/modules/layer/default/layer.css}" rel="stylesheet">
        <link th:href="@{/css/style.css}" rel="stylesheet">
        <style type="text/css">
            #connect-container {
                margin: 0 auto;
                width: 400px;
            }

            #connect-container div {
                padding: 5px;
                margin: 0 7px 10px 0;
            }

            .message input {
                padding: 5px;
                margin: 0 7px 10px 0;
            }

            .layui-btn {
                display: inline-block;
            }
        </style>
        <script type="text/javascript">
            var stompClient = null;

            $(function () {
                var target = $("#target");
                if (window.location.protocol === 'http:') {
                    target.val('http://' + window.location.host + target.val());
                } else {
                    target.val('https://' + window.location.host + target.val());
                }
            });

            function setConnected(connected) {
                var connect = $("#connect");
                var disconnect = $("#disconnect");
                var echo = $("#echo");

                if (connected) {
                    connect.addClass("layui-btn-disabled");
                    disconnect.removeClass("layui-btn-disabled");
                    echo.removeClass("layui-btn-disabled");
                } else {
                    connect.removeClass("layui-btn-disabled");
                    disconnect.addClass("layui-btn-disabled");
                    echo.addClass("layui-btn-disabled");
                }

                connect.attr("disabled", connected);
                disconnect.attr("disabled", !connected);
                echo.attr("disabled", !connected);
            }

            // Connect
            function connect() {
                var target = $("#target").val();
                var ws = new SockJS(target);
                stompClient = Stomp.over(ws);

                stompClient.connect({}, function () {
                    setConnected(true);
                    log('Info: STOMP connection opened.');

                    // After connecting, pull unread messages right away
                    pullUnreadMessage("/topic/reply");

                    // Subscribe to the server's /topic/reply destination
                    stompClient.subscribe("/user/topic/reply", function (response) {
                        log(JSON.parse(response.body).content);
                    })
                }, function () {
                    // Handle disconnection
                    setConnected(false);
                    log('Info: STOMP connection closed.');
                });
            }

            // Disconnect
            function disconnect() {
                if (stompClient != null) {
                    stompClient.disconnect();
                    stompClient = null;
                }
                setConnected(false);
                log('Info: STOMP connection closed.');
            }

            // Send a message to a specific user
            function sendMessage() {
                if (stompClient != null) {
                    var receiver = $("#receiver").val();
                    var msg = $("#message").val();
                    log('Sent: ' + JSON.stringify({receiver: receiver, msg: msg}));

                    $.ajax({
                        url: "/wsTemplate/sendToUser",
                        type: "POST",
                        dataType: "json",
                        async: true,
                        data: {
                            "receiver": receiver,
                            "msg": msg
                        },
                        success: function (data) {
                        }
                    });
                } else {
                    layer.msg('STOMP connection not established, please connect.', {
                        offset: 'auto',
                        icon: 2
                    });
                }
            }

            // Pull unread messages from the server
            function pullUnreadMessage(destination) {
                $.ajax({
                    url: "/wsTemplate/pullUnreadMessage",
                    type: "POST",
                    dataType: "json",
                    async: true,
                    data: {
                        "destination": destination
                    },
                    success: function (data) {
                        if (data.result != null) {
                            $.each(data.result, function (i, item) {
                                log(JSON.parse(item).content);
                            })
                        } else if (data.code != null && data.code == "500") {
                            layer.msg(data.msg, {
                                offset: 'auto',
                                icon: 2
                            });
                        }
                    }
                });
            }

            // Log output
            function log(message) {
                console.debug(message);
            }
        </script>
    </head>
    <body>
        <noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being enabled. Please enable Javascript and reload this page!</h2></noscript>
        <div>
            <div id="connect-container" class="layui-elem-field">
                <legend>Chat With STOMP Message</legend>
                <div>
                    <input id="target" type="text" class="layui-input" size="40" style="width: 350px" value="/chat-websocket"/>
                </div>
                <div>
                    <button id="connect" class="layui-btn layui-btn-normal" onclick="connect();">Connect</button>
                    <button id="disconnect" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled" onclick="disconnect();">Disconnect</button>
                </div>
                <div class="message">
                    <input id="receiver" type="text" class="layui-input" size="40" style="width: 350px" placeholder="Receiver name" value=""/>
                    <input id="message" type="text" class="layui-input" size="40" style="width: 350px" placeholder="Message content" value=""/>
                </div>
                <div>
                    <button id="echo" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled" onclick="sendMessage();">Send Message</button>
                </div>
            </div>
        </div>
    </body>
    </html>
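To recap how the pieces fit together, the delivery decision made in the controller's `sendToUser` method and the unread-message pull that the page triggers after connecting can be sketched without Spring or Redis. This is only an illustration under stated assumptions: in-memory collections stand in for the Redis set and lists, and the class name, the `Route` enum, and the `unread:` key prefix are hypothetical (the article does not show the value of `Constants.REDIS_UNREAD_MSG_PREFIX`).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** In-memory sketch of the three delivery branches and the unread-message pull. */
class DeliveryDecisionSketch {

    enum Route { LOCAL_PUSH, PUBLISH_TO_TOPIC, STORE_UNREAD }

    /** Users whose WebSocket connection lives on this node. */
    final Set<String> localUsers = new HashSet<>();
    /** Stand-in for the Redis set of users connected anywhere in the cluster. */
    final Set<String> onlineUsers = new HashSet<>();
    /** Stand-in for the Redis lists that hold unread messages. */
    final Map<String, List<String>> unread = new HashMap<>();

    /** Hypothetical prefix; the real constant's value is not shown in the article. */
    static final String UNREAD_PREFIX = "unread:";

    static String unreadKey(String user, String destination) {
        return UNREAD_PREFIX + user + ":" + destination;
    }

    /** Decide how a message reaches its receiver; stores it when the receiver is offline. */
    Route send(String receiver, String destination, String payload) {
        if (localUsers.contains(receiver)) {
            return Route.LOCAL_PUSH;          // connected to this node: push directly
        }
        if (onlineUsers.contains(receiver)) {
            return Route.PUBLISH_TO_TOPIC;    // connected elsewhere: let that node deliver
        }
        unread.computeIfAbsent(unreadKey(receiver, destination), k -> new ArrayList<>())
              .add(payload);
        return Route.STORE_UNREAD;            // offline: keep for a later pull
    }

    /** Return all unread messages for the user and delete the list, as the controller does. */
    List<String> pullUnread(String user, String destination) {
        List<String> messages = unread.remove(unreadKey(user, destination));
        return messages == null ? new ArrayList<>() : messages;
    }

    public static void main(String[] args) {
        DeliveryDecisionSketch node = new DeliveryDecisionSketch();
        node.localUsers.add("alice");
        node.onlineUsers.add("alice");
        node.onlineUsers.add("bob");

        System.out.println(node.send("alice", "/topic/reply", "hi alice"));
        System.out.println(node.send("bob", "/topic/reply", "hi bob"));
        System.out.println(node.send("carol", "/topic/reply", "hi carol"));
        System.out.println(node.pullUnread("carol", "/topic/reply"));
    }
}
```

Keying the unread list by both user and destination mirrors the controller, so each subscription can pull its own backlog independently.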

