目录

大橙子

VX:ZzzChChen
Phone:13403656751
Email:zxydczzs@gmail.com

X

Stomp协议 关键点(持续更新)

监听

如果需要添加监听,我们的监听类需要实现ChannelInterceptor接口,在 springframework包5.0.7之前这一步我们一般是实现ChannelInterceptorAdapter 抽象类,不过这个类已经废弃了,文档也推荐直接实现接口。

https://www.jianshu.com/p/4762494d42f1

https://www.jianshu.com/p/9103c9c7e128

https://spring.io/guides/gs/messaging-stomp-websocket/

package org.springframework.messaging.support;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

public interface ChannelInterceptor {
    // 在消息发送之前调用,方法中可以对消息进行修改,如果此方法返回值为空,则不会发生实际的消息发送调用
    Message<?> preSend(Message<?> var1, MessageChannel var2);

    // 在消息发送后立刻调用,boolean值参数表示该调用的返回值
    void postSend(Message<?> var1, MessageChannel var2, boolean var3);

    /*
     * 1. 在消息发送完成后调用,而不管消息发送是否产生异常,在次方法中,我们可以做一些资源释放清理的工作
     * 2. 此方法的触发必须是preSend方法执行成功,且返回值不为null,发生了实际的消息推送,才会触发
     */
    void afterSendCompletion(Message<?> var1, MessageChannel var2, boolean var3, Exception var4);

    /* 1. 在消息被实际检索之前调用,如果返回false,则不会对检索任何消息,只适用于(PollableChannels),
     * 2. 在websocket的场景中用不到
     */
    boolean preReceive(MessageChannel var1);

    /*
     * 1. 在检索到消息之后,返回调用方之前调用,可以进行信息修改,如果返回null,就不会进行下一步操作
     * 2. 适用于PollableChannels,轮询场景
     */
    Message<?> postReceive(Message<?> var1, MessageChannel var2);

    /*
     * 1. 在消息接收完成之后调用,不管发生什么异常,可以用于消息发送后的资源清理
     * 2. 只有当preReceive 执行成功,并返回true才会调用此方法
     * 2. 适用于PollableChannels,轮询场景
     */
    void afterReceiveCompletion(Message<?> var1, MessageChannel var2, Exception var3);
}
package com.wzh.demo.websocket.interceptor;

import com.wzh.demo.domain.WebSocketUserAuthentication;
import org.apache.log4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;

import javax.servlet.http.HttpSession;

import static org.springframework.messaging.simp.stomp.StompCommand.CONNECT;

/**
 * <websocke消息监听,用于监听websocket用户连接情况>
 * <功能详细描述>
 * @author wzh
 * @version 2018-08-25 23:39
 * @see [相关类/方法] (可选)
 **/
public class WebSocketChannelInterceptor implements ChannelInterceptor {

    public WebSocketChannelInterceptor() {
    }

    Logger log = Logger.getLogger(WebSocketChannelInterceptor.class);

    // 在消息发送之前调用,方法中可以对消息进行修改,如果此方法返回值为空,则不会发生实际的消息发送调用
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {

        StompHeaderAccessor accessor =  MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        /**
         * 1. 判断是否为首次连接请求,如果已经连接过,直接返回message
         * 2. 网上有种写法是在这里封装认证用户的信息,本文是在http阶段,websockt 之前就做了认证的封装,所以这里直接取的信息
         */
        if(StompCommand.CONNECT.equals(accessor.getCommand()))
        {
            /*
             * 1. 这里获取就是JS stompClient.connect(headers, function (frame){.......}) 中header的信息
             * 2. JS中header可以封装多个参数,格式是{key1:value1,key2:value2}
             * 3. header参数的key可以一样,取出来就是list
             * 4. 样例代码header中只有一个token,所以直接取0位
             */
            String token = accessor.getNativeHeader("token").get(0);

            /*
             * 1. 这里直接封装到StompHeaderAccessor 中,可以根据自身业务进行改变
             * 2. 封装大搜StompHeaderAccessor中后,可以在@Controller / @MessageMapping注解的方法中直接带上StompHeaderAccessor
             *    就可以通过方法提供的 getUser()方法获取到这里封装user对象
             * 2. 例如可以在这里拿到前端的信息进行登录鉴权
             */
            WebSocketUserAuthentication user = (WebSocketUserAuthentication) accessor.getUser();

            System.out.println("认证用户:" + user.toString() + " 页面传递令牌" + token);

        }else if (StompCommand.DISCONNECT.equals(accessor.getCommand()))
        {

        }
        return message;
    }

    // 在消息发送后立刻调用,boolean值参数表示该调用的返回值
    @Override
    public void postSend(Message<?> message, MessageChannel messageChannel, boolean b) {

        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

        /*
         * 拿到消息头对象后,我们可以做一系列业务操作
         * 1. 通过getSessionAttributes()方法获取到websocketSession,
         *    就可以取到我们在WebSocketHandshakeInterceptor拦截器中存在session中的信息
         * 2. 我们也可以获取到当前连接的状态,做一些统计,例如统计在线人数,或者缓存在线人数对应的令牌,方便后续业务调用
         */
        HttpSession httpSession = (HttpSession) accessor.getSessionAttributes().get("HTTP_SESSION");

        // 这里只是单纯的打印,可以根据项目的实际情况做业务处理
        log.info("postSend 中获取httpSession key:" + httpSession.getId());

        // 忽略心跳消息等非STOMP消息
        if(accessor.getCommand() == null)
        {
            return;
        }

        // 根据连接状态做处理,这里也只是打印了下,可以根据实际场景,对上线,下线,首次成功连接做处理
        System.out.println(accessor.getCommand());
        switch (accessor.getCommand())
        {
            // 首次连接
            case CONNECT:
                log.info("httpSession key:" + httpSession.getId() + " 首次连接");
                break;
            // 连接中
            case CONNECTED:
                break;
            // 下线
            case DISCONNECT:
                log.info("httpSession key:" + httpSession.getId() + " 下线");
                break;
             default:
                break;
        }


    }

    /*
     * 1. 在消息发送完成后调用,而不管消息发送是否产生异常,在次方法中,我们可以做一些资源释放清理的工作
     * 2. 此方法的触发必须是preSend方法执行成功,且返回值不为null,发生了实际的消息推送,才会触发
     */
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel messageChannel, boolean b, Exception e) {

    }

    /* 1. 在消息被实际检索之前调用,如果返回false,则不会对检索任何消息,只适用于(PollableChannels),
     * 2. 在websocket的场景中用不到
     */
    @Override
    public boolean preReceive(MessageChannel messageChannel) {
        return true;
    }

    /*
     * 1. 在检索到消息之后,返回调用方之前调用,可以进行信息修改,如果返回null,就不会进行下一步操作
     * 2. 适用于PollableChannels,轮询场景
     */
    @Override
    public Message<?> postReceive(Message<?> message, MessageChannel messageChannel) {
        return message;
    }

    /*
     * 1. 在消息接收完成之后调用,不管发生什么异常,可以用于消息发送后的资源清理
     * 2. 只有当preReceive 执行成功,并返回true才会调用此方法
     * 2. 适用于PollableChannels,轮询场景
     */
    @Override
    public void afterReceiveCompletion(Message<?> message, MessageChannel messageChannel, Exception e) {

    }
}


image.png

订阅完成后推送消息

需要创建Controller
@RestController
public class WebSocketController {

    @Autowired
    private BaseSocketService baseSocketService;

    @SubscribeMapping({"订阅地址"})
    public void subscribe(MsgPrincipal msgPrincipal){
        baseSocketService.connectSuccessPush(msgPrincipal);
    }
}

标题:Stomp协议 关键点(持续更新)
作者:zzzzchen
地址:https://www.dczzs.com/articles/2021/12/06/1638762439583.html