java WebSocket客户端断线重连 | 实用代码框架「建议收藏」

java WebSocket客户端断线重连 | 实用代码框架「建议收藏」在工作中是否会遇到实用websocket客户端连接服务端的时候,网络波动,服务端断连的情况。会导致客户端被动断开连接。为了解决这个问题,需要对被动断开连接的情况进行捕获,并重新创建连接。这篇文章主要是提供可以直接使用的断线重连websocket客户端代码。

大家好,又见面了,我是你们的朋友全栈君。

目录

前言

Maven依赖

代码

总结


前言

在工作中是否会遇到实用websocket客户端连接服务端的时候,网络波动服务端断连的情况。会导致客户端被动断开连接。为了解决这个问题,需要对被动断开连接的情况进行捕获,并重新创建连接。这篇文章主要是提供可以直接使用的断线重连websocket客户端代码。

Maven依赖

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.java-websocket</groupId>
            <artifactId>Java-WebSocket</artifactId>
            <version>1.5.1</version>
        </dependency>

代码

不废话,上代码。

package ai.guiji.csdn.ws.client;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ServerHandshake;

import javax.net.ssl.*;
import java.net.Socket;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** @Author huyi @Date 2021/10/15 20:03 @Description: 重连websocket客户端 */
@Slf4j
public class ReConnectWebSocketClient {
  /** 字符串消息回调 */
  private Consumer<String> msgStr;
  /** 字节流消息回调 */
  private Consumer<ByteBuffer> msgByte;
  /** 异常回调 */
  private Consumer<Exception> error;
  /** 连接标识 */
  private String key;
  /** ws服务端连接 */
  private URI serverUri;
  /** 尝试重连标识 */
  private AtomicBoolean tryReconnect;
  /** 需要ping标识 */
  private AtomicBoolean needPing;
  /** websocket连接实体 */
  private WebSocketClient webSocketClient;
  /** 重连次数 */
  private AtomicInteger reConnectTimes;
  /** 连接结束标识 */
  private AtomicBoolean end;
  /** 连接后初始发送报文,这里也可以不需要,如果服务端主动断开连接,重连后可以继续推送报文的话。 */
  private String initReConnectReq;
  /** 结束回调 */
  private Consumer<String> endConsumer;

  public ReConnectWebSocketClient(
      URI serverUri,
      String key,
      Consumer<String> msgStr,
      Consumer<ByteBuffer> msgByte,
      Consumer<Exception> error) {
    this.msgStr = msgStr;
    this.msgByte = msgByte;
    this.error = error;
    this.key = key;
    this.serverUri = serverUri;
    this.tryReconnect = new AtomicBoolean(false);
    this.needPing = new AtomicBoolean(true);
    this.reConnectTimes = new AtomicInteger(0);
    this.end = new AtomicBoolean(false);
    this.endConsumer = this::close;
    init();
  }

  /** 初始化连接 */
  public void init() {
    // 创建连接
    createWebSocketClient();
    // ping线程
    circlePing();
  }

  private void needReconnect() throws Exception {
    ThreadUtil.sleep(10, TimeUnit.SECONDS);
    int cul = reConnectTimes.incrementAndGet();
    if (cul > 3) {
      close("real stop");
      throw new Exception("服务端断连,3次重连均失败");
    }
    log.warn("[{}]第[{}]次断开重连", key, cul);
    if (tryReconnect.get()) {
      log.error("[{}]第[{}]次断开重连结果 -> 连接正在重连,本次重连请求放弃", key, cul);
      needReconnect();
      return;
    }
    try {
      tryReconnect.set(true);

      if (webSocketClient.isOpen()) {
        log.warn("[{}]第[{}]次断开重连,关闭旧连接", key, cul);
        webSocketClient.closeConnection(2, "reconnect stop");
      }
      webSocketClient = null;
      createWebSocketClient();
      connect();
      if (!StrUtil.hasBlank(initReConnectReq)) {
        send(initReConnectReq);
      }
    } catch (Exception exception) {
      log.error("[{}]第[{}]次断开重连结果 -> 连接正在重连,重连异常:[{}]", key, cul, exception.getMessage());
      needReconnect();
    } finally {
      tryReconnect.set(false);
    }
  }

  private void createWebSocketClient() {
    webSocketClient =
        new WebSocketClient(serverUri) {
          @Override
          public void onOpen(ServerHandshake serverHandshake) {
            log.info("[{}]ReConnectWebSocketClient [onOpen]连接成功{}", key, getRemoteSocketAddress());
            tryReconnect.set(false);
          }

          @Override
          public void onMessage(String text) {
            log.info("[{}]ReConnectWebSocketClient [onMessage]接收到服务端数据:text={}", key, text);
            msgStr.accept(text);
          }

          @Override
          public void onMessage(ByteBuffer bytes) {
            log.info("[{}]ReConnectWebSocketClient [onMessage]接收到服务端数据:bytes={}", key, bytes);
            msgByte.accept(bytes);
          }

          @Override
          public void onWebsocketPong(WebSocket conn, Framedata f) {
            log.info(
                "[{}]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode={}",
                key,
                f.getOpcode());
          }

          @Override
          public void onClose(int i, String s, boolean b) {
            log.info("[{}]ReConnectWebSocketClient [onClose]关闭,s={},b={}", key, s, b);
            if (StrUtil.hasBlank(s) || s.contains("https")) {
              if (end.get()) {
                return;
              }
              try {
                needReconnect();
              } catch (Exception exception) {
                endConsumer.accept("reconnect error");
                error.accept(exception);
              }
            }
          }

          @Override
          public void onError(Exception e) {
            log.info("[{}]ReConnectWebSocketClient [onError]异常,e={}", key, e);
            endConsumer.accept("error close");
            error.accept(e);
          }
        };
    if (serverUri.toString().contains("wss://")) {
      trustAllHosts(webSocketClient);
    }
  }

  public void circlePing() {
    new Thread(
            () -> {
              while (needPing.get()) {
                if (webSocketClient.isOpen()) {
                  webSocketClient.sendPing();
                }
                ThreadUtil.sleep(5, TimeUnit.SECONDS);
              }
              log.warn("[{}]Ping循环关闭", key);
            })
        .start();
  }

  /**
   * 连接
   *
   * @throws Exception 异常
   */
  public void connect() throws Exception {
    webSocketClient.connectBlocking(10, TimeUnit.SECONDS);
  }

  /**
   * 发送
   *
   * @param msg 消息
   * @throws Exception 异常
   */
  public void send(String msg) throws Exception {
    this.initReConnectReq = msg;
    if (webSocketClient.isOpen()) {
      webSocketClient.send(msg);
    }
  }

  /**
   * 关闭
   *
   * @param msg 关闭消息
   */
  public void close(String msg) {
    needPing.set(false);
    end.set(true);
    if (webSocketClient != null) {
      webSocketClient.closeConnection(3, msg);
    }
  }

  /**
   * 忽略证书
   *
   * @param client
   */
  public void trustAllHosts(WebSocketClient client) {
    TrustManager[] trustAllCerts =
        new TrustManager[] {
          new X509ExtendedTrustManager() {

            @Override
            public void checkClientTrusted(
                X509Certificate[] x509Certificates, String s, Socket socket)
                throws CertificateException {}

            @Override
            public void checkServerTrusted(
                X509Certificate[] x509Certificates, String s, Socket socket)
                throws CertificateException {}

            @Override
            public void checkClientTrusted(
                X509Certificate[] x509Certificates, String s, SSLEngine sslEngine)
                throws CertificateException {}

            @Override
            public void checkServerTrusted(
                X509Certificate[] x509Certificates, String s, SSLEngine sslEngine)
                throws CertificateException {}

            @Override
            public void checkClientTrusted(X509Certificate[] x509Certificates, String s)
                throws CertificateException {}

            @Override
            public void checkServerTrusted(X509Certificate[] x509Certificates, String s)
                throws CertificateException {}

            @Override
            public X509Certificate[] getAcceptedIssuers() {
              return null;
            }
          }
        };

    try {
      SSLContext ssl = SSLContext.getInstance("SSL");
      ssl.init(null, trustAllCerts, new java.security.SecureRandom());
      SSLSocketFactory socketFactory = ssl.getSocketFactory();
      client.setSocketFactory(socketFactory);
    } catch (Exception e) {
      log.error("ReConnectWebSocketClient trustAllHosts 异常,e={0}", e);
    }
  }

}

代码说明:

1、参数的重连次数可以配置。

2、增加异步pingpong线程,一旦结束连接会自动关闭。

3、对字符串、字节流、异常都有回调措施。

测试代码方法

  public static void main(String[] args) throws Exception {
    ReConnectWebSocketClient client =
        new ReConnectWebSocketClient(
            new URI(String.format("wss://192.168.1.77:24009")),
            "test",
            // 字符串消息处理
            msg -> {
              // todo 字符串消息处理
              System.out.println("字符串消息:" + msg);
            },
            null,
            // 异常回调
            error -> {
              // todo 字符串消息处理
              System.out.println("异常:" + error.getMessage());
            });
    client.connect();
    client.send("haha");
  }

验证结果

16:08:54.468 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]连接成功/192.168.1.77:24009
16:08:54.475 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服务端数据:text=connect success from tcp4:192.168.6.63:11018!
字符串消息:connect success from tcp4:192.168.6.63:11018!
16:08:56.080 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]关闭,s=,b=true
16:09:06.097 [WebSocketConnectReadThread-12] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[1]次断开重连
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]连接成功/192.168.1.77:24009
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服务端数据:text=connect success from tcp4:192.168.6.63:11038!
字符串消息:connect success from tcp4:192.168.6.63:11038!
16:09:09.369 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:14.370 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:19.371 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:24.379 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:29.382 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:34.398 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:39.402 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:44.404 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:49.415 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:54.429 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:59.437 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:04.449 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:06.154 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:09.455 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:14.462 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:19.468 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:19.644 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]关闭,s=,b=true
16:10:29.654 [WebSocketConnectReadThread-16] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[2]次断开重连
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onError]异常,e={}
java.net.ConnectException: Connection refused: connect
	at java.net.DualStackPlainSocketImpl.connect0(Native Method)
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:673)
	at org.java_websocket.client.WebSocketClient.run(WebSocketClient.java:461)
	at java.lang.Thread.run(Thread.java:748)
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]关闭,s=error close,b=false
异常:Connection refused: connect
16:10:34.473 [Thread-0] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]Ping循环关闭

这里我才用的是手动关闭服务端方式触发,客户端被动断连情况。重连两次,第二次服务端还未启动导致异常触发。

总结

没啥好总结的,代码注释比较清楚。

如果对你有用,一健三连走一波!

java WebSocket客户端断线重连 | 实用代码框架「建议收藏」

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/157472.html原文链接:https://javaforall.net

(1)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • c语言三目运算符例子_单目运算符与双目运算符

    c语言三目运算符例子_单目运算符与双目运算符1.运算方向从左往右,从右往左结合,只有一个表达式被计算百度百科:三目运算符,又称条件运算符,是计算机语言(c,c++,java等)的重要组成部分。它是唯一有3个操作数的运算符,所以有时又称为三元运算符。一般来说,三目运算符的结合性是右结合的。对于条件表达式b?x:y,先计算条件b,然后进行判断。如果b的值为true,计算x的值,运算结果为x的值;否则,计算y的值,运算结果为y的值…

    2022年9月28日
    3
  • eclipse创建springboot项目的三种方法[通俗易懂]

    eclipse创建springboot项目的三种方法[通俗易懂]方法一安装STS插件安装插件导向窗口完成后,在eclipse右下角将会出现安装插件的进度,等插件安装完成后重启eclipse生效 新建springboot项目 项目启动 方法二1.创建Maven项目2.选择项目类型3.选择项目4.编写项目组和名称-finish即可5.修改pom.xml文件&lt;!–…

    2022年10月13日
    4
  • mac sh文件_android获取apk位置

    mac sh文件_android获取apk位置在嵌入百度地图SDK的时候需要配置“发布版SHA1”和“开发版SHA1”,这两个是不一样的。这里是基于mac电脑,开发工具androidstudio.1、首先打开Terminal窗口,会直接定位到当前的目录下。2、获取开发版的sha1,直接输入命令,获取到sha1:keytool-list-v-keystore~/.android/debug…

    2022年8月10日
    6
  • docker设置端口2375

    docker设置端口2375一、系统环境:   在Windows764位上,采用Vmwareworkstation12安装了CenOS7.564位。二、问题   在CentOS7.5里安装了Docker,启动docker服务,输入dockerversion,则出现错误信息:    CannotconnecttotheDockerdatemonattcp://0.0.0…

    2022年6月4日
    42
  • 2020美赛A题翻译

    2020美赛A题翻译对于一些生活在海洋里的物种,全球海洋温度影响它们栖息地的质量。当对于它们的后续繁衍来说温度改变太大的时候,这些物种就会迁徙去寻找新的栖息地去更好地安放当前和未来的生殖繁衍。一个例子就是美国缅因州的龙虾的种群,它缓慢地往加拿大北部迁徙,那六更低的温度提供一个合适的栖息地环境。这种地理上的人口转移可能会对那些依赖海洋生物稳定性的公司的生计造成严重的破坏。您的团队已被苏格兰北大西洋渔业…

    2022年5月25日
    35
  • 我要自学编程,Java和C语言相比哪个好?[通俗易懂]

    我要自学编程,Java和C语言相比哪个好?[通俗易懂]JavaJava是一种可以撰写跨平台应用软件的面向对象的程序设计语言。Java技术具有卓越的通用性、高效性、平台移植性和安全性,广泛应用于PC、数据中心、游戏控制台、科学超级计算机、移动电话和互联网,同时拥有全球最大的开发者专业社群。C语言学习C语言是一种计算机程序设计语言,属高级语言范畴。它既具有高级语言的特点,又具有汇编语言的特点。它可以作为工作系统设计语言,编写系统应用程序,也可以作为应用程序设计语言,编写不依赖计算机硬件的应用程序,代码清晰精简,十分灵活。语言没有好坏之分,无论学习哪个语言

    2022年7月7日
    37

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号