目录

chen 的个人博客

VX:ZzzChChen
Phone:13403656751
Email:zxydczzs@gmail.com

X

Stomp协议 关键点(持续更新)

监听

如果需要添加监听,我们的监听类需要实现 ChannelInterceptor 接口,在 springframework 包 5.0.7 之前这一步我们一般是实现 ChannelInterceptorAdapter 抽象类,不过这个类已经废弃了,文档也推荐直接实现接口。

https://www.jianshu.com/p/4762494d42f1

https://www.jianshu.com/p/9103c9c7e128

https://spring.io/guides/gs/messaging-stomp-websocket/

 1package org.springframework.messaging.support;
 2
 3import org.springframework.messaging.Message;
 4import org.springframework.messaging.MessageChannel;
 5
 6public interface ChannelInterceptor {
 7    // 在消息发送之前调用,方法中可以对消息进行修改,如果此方法返回值为空,则不会发生实际的消息发送调用
 8    Message<?> preSend(Message<?> var1, MessageChannel var2);
 9
10    // 在消息发送后立刻调用,boolean值参数表示该调用的返回值
11    void postSend(Message<?> var1, MessageChannel var2, boolean var3);
12
13    /*
14     * 1. 在消息发送完成后调用,而不管消息发送是否产生异常,在次方法中,我们可以做一些资源释放清理的工作
15     * 2. 此方法的触发必须是preSend方法执行成功,且返回值不为null,发生了实际的消息推送,才会触发
16     */
17    void afterSendCompletion(Message<?> var1, MessageChannel var2, boolean var3, Exception var4);
18
19    /* 1. 在消息被实际检索之前调用,如果返回false,则不会对检索任何消息,只适用于(PollableChannels),
20     * 2. 在websocket的场景中用不到
21     */
22    boolean preReceive(MessageChannel var1);
23
24    /*
25     * 1. 在检索到消息之后,返回调用方之前调用,可以进行信息修改,如果返回null,就不会进行下一步操作
26     * 2. 适用于PollableChannels,轮询场景
27     */
28    Message<?> postReceive(Message<?> var1, MessageChannel var2);
29
30    /*
31     * 1. 在消息接收完成之后调用,不管发生什么异常,可以用于消息发送后的资源清理
32     * 2. 只有当preReceive 执行成功,并返回true才会调用此方法
33     * 2. 适用于PollableChannels,轮询场景
34     */
35    void afterReceiveCompletion(Message<?> var1, MessageChannel var2, Exception var3);
36}
  1package com.wzh.demo.websocket.interceptor;
  2
  3import com.wzh.demo.domain.WebSocketUserAuthentication;
  4import org.apache.log4j.Logger;
  5import org.springframework.messaging.Message;
  6import org.springframework.messaging.MessageChannel;
  7import org.springframework.messaging.simp.stomp.StompCommand;
  8import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
  9import org.springframework.messaging.support.ChannelInterceptor;
 10import org.springframework.messaging.support.MessageHeaderAccessor;
 11
 12import javax.servlet.http.HttpSession;
 13
 14import static org.springframework.messaging.simp.stomp.StompCommand.CONNECT;
 15
 16/**
 17 * <websocke消息监听,用于监听websocket用户连接情况>
 18 * <功能详细描述>
 19 * @author wzh
 20 * @version 2018-08-25 23:39
 21 * @see [相关类/方法] (可选)
 22 **/
 23public class WebSocketChannelInterceptor implements ChannelInterceptor {
 24
 25    public WebSocketChannelInterceptor() {
 26    }
 27
 28    Logger log = Logger.getLogger(WebSocketChannelInterceptor.class);
 29
 30    // 在消息发送之前调用,方法中可以对消息进行修改,如果此方法返回值为空,则不会发生实际的消息发送调用
 31    @Override
 32    public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
 33
 34        StompHeaderAccessor accessor =  MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
 35        /**
 36         * 1. 判断是否为首次连接请求,如果已经连接过,直接返回message
 37         * 2. 网上有种写法是在这里封装认证用户的信息,本文是在http阶段,websockt 之前就做了认证的封装,所以这里直接取的信息
 38         */
 39        if(StompCommand.CONNECT.equals(accessor.getCommand()))
 40        {
 41            /*
 42             * 1. 这里获取就是JS stompClient.connect(headers, function (frame){.......}) 中header的信息
 43             * 2. JS中header可以封装多个参数,格式是{key1:value1,key2:value2}
 44             * 3. header参数的key可以一样,取出来就是list
 45             * 4. 样例代码header中只有一个token,所以直接取0位
 46             */
 47            String token = accessor.getNativeHeader("token").get(0);
 48
 49            /*
 50             * 1. 这里直接封装到StompHeaderAccessor 中,可以根据自身业务进行改变
 51             * 2. 封装大搜StompHeaderAccessor中后,可以在@Controller / @MessageMapping注解的方法中直接带上StompHeaderAccessor
 52             *    就可以通过方法提供的 getUser()方法获取到这里封装user对象
 53             * 2. 例如可以在这里拿到前端的信息进行登录鉴权
 54             */
 55            WebSocketUserAuthentication user = (WebSocketUserAuthentication) accessor.getUser();
 56
 57            System.out.println("认证用户:" + user.toString() + " 页面传递令牌" + token);
 58
 59        }else if (StompCommand.DISCONNECT.equals(accessor.getCommand()))
 60        {
 61
 62        }
 63        return message;
 64    }
 65
 66    // 在消息发送后立刻调用,boolean值参数表示该调用的返回值
 67    @Override
 68    public void postSend(Message<?> message, MessageChannel messageChannel, boolean b) {
 69
 70        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
 71
 72        /*
 73         * 拿到消息头对象后,我们可以做一系列业务操作
 74         * 1. 通过getSessionAttributes()方法获取到websocketSession,
 75         *    就可以取到我们在WebSocketHandshakeInterceptor拦截器中存在session中的信息
 76         * 2. 我们也可以获取到当前连接的状态,做一些统计,例如统计在线人数,或者缓存在线人数对应的令牌,方便后续业务调用
 77         */
 78        HttpSession httpSession = (HttpSession) accessor.getSessionAttributes().get("HTTP_SESSION");
 79
 80        // 这里只是单纯的打印,可以根据项目的实际情况做业务处理
 81        log.info("postSend 中获取httpSession key:" + httpSession.getId());
 82
 83        // 忽略心跳消息等非STOMP消息
 84        if(accessor.getCommand() == null)
 85        {
 86            return;
 87        }
 88
 89        // 根据连接状态做处理,这里也只是打印了下,可以根据实际场景,对上线,下线,首次成功连接做处理
 90        System.out.println(accessor.getCommand());
 91        switch (accessor.getCommand())
 92        {
 93            // 首次连接
 94            case CONNECT:
 95                log.info("httpSession key:" + httpSession.getId() + " 首次连接");
 96                break;
 97            // 连接中
 98            case CONNECTED:
 99                break;
100            // 下线
101            case DISCONNECT:
102                log.info("httpSession key:" + httpSession.getId() + " 下线");
103                break;
104             default:
105                break;
106        }
107
108
109    }
110
111    /*
112     * 1. 在消息发送完成后调用,而不管消息发送是否产生异常,在次方法中,我们可以做一些资源释放清理的工作
113     * 2. 此方法的触发必须是preSend方法执行成功,且返回值不为null,发生了实际的消息推送,才会触发
114     */
115    @Override
116    public void afterSendCompletion(Message<?> message, MessageChannel messageChannel, boolean b, Exception e) {
117
118    }
119
120    /* 1. 在消息被实际检索之前调用,如果返回false,则不会对检索任何消息,只适用于(PollableChannels),
121     * 2. 在websocket的场景中用不到
122     */
123    @Override
124    public boolean preReceive(MessageChannel messageChannel) {
125        return true;
126    }
127
128    /*
129     * 1. 在检索到消息之后,返回调用方之前调用,可以进行信息修改,如果返回null,就不会进行下一步操作
130     * 2. 适用于PollableChannels,轮询场景
131     */
132    @Override
133    public Message<?> postReceive(Message<?> message, MessageChannel messageChannel) {
134        return message;
135    }
136
137    /*
138     * 1. 在消息接收完成之后调用,不管发生什么异常,可以用于消息发送后的资源清理
139     * 2. 只有当preReceive 执行成功,并返回true才会调用此方法
140     * 2. 适用于PollableChannels,轮询场景
141     */
142    @Override
143    public void afterReceiveCompletion(Message<?> message, MessageChannel messageChannel, Exception e) {
144
145    }
146}
147
148

image.png

订阅完成后推送消息

需要创建 Controller
 1@RestController
 2public class WebSocketController {
 3
 4    @Autowired
 5    private BaseSocketService baseSocketService;
 6
 7    @SubscribeMapping({"订阅地址"})
 8    public void subscribe(MsgPrincipal msgPrincipal){
 9        baseSocketService.connectSuccessPush(msgPrincipal);
10    }
11}

标题:Stomp协议 关键点(持续更新)
作者:zzzzchen
地址:https://www.dczzs.com/articles/2021/12/06/1638762439583.html