TIP

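A heartbeat is a special packet that the client and the server send each other periodically over a long-lived TCP connection. It tells the peer "I am still online" and so keeps the TCP connection valid. Heartbeats have a second, often overlooked purpose: firewalls and routers may drop a connection that has been idle for a long time, and regular heartbeat traffic prevents that. Suggestion: type in the code below yourself, which is the best way to get a good grasp of the whole flow.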

# 1. The core heartbeat handler

In Netty, the key to implementing a heartbeat mechanism is `IdleStateHandler`. How is this handler used? Let's start with its constructor:

```java
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
```

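The three parameters mean the following:
【1】readerIdleTimeSeconds: read timeout. If no data is read from the Channel within the given interval, an `IdleStateEvent` with state `READER_IDLE` is fired.
【2】writerIdleTimeSeconds: write timeout. If no data is written to the Channel within the given interval, an `IdleStateEvent` with state `WRITER_IDLE` is fired.
【3】allIdleTimeSeconds: read/write timeout. If there is neither a read nor a write within the given interval, an `IdleStateEvent` with state `ALL_IDLE` is fired.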

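In this constructor the three values are in seconds. To use a different time unit, call `IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)`, which is what the three-argument constructor delegates to. You can also call `IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)`. A value of 0 disables the corresponding check.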
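The model below makes the three timeouts concrete. It is a minimal, JDK-only model of the decision `IdleStateHandler` makes, not Netty's implementation:

- The class `IdleTracker` and its methods are hypothetical.
- Time comes from a caller-supplied millisecond clock.
- The real handler fires events from timer tasks on the event loop rather than being polled.

As in Netty, a timeout of 0 disables that check. All-idle is measured from the later of the last read and the last write.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the three IdleStateHandler timeouts.
// Netty's real handler schedules timer tasks; here we compare timestamps
// against a caller-supplied clock in milliseconds.
class IdleTracker {
    enum Idle { READER_IDLE, WRITER_IDLE, ALL_IDLE }

    private final long readerIdleMs, writerIdleMs, allIdleMs; // 0 disables a check
    private long lastReadMs, lastWriteMs;

    IdleTracker(long readerIdleMs, long writerIdleMs, long allIdleMs, long nowMs) {
        this.readerIdleMs = readerIdleMs;
        this.writerIdleMs = writerIdleMs;
        this.allIdleMs = allIdleMs;
        this.lastReadMs = nowMs;   // treat creation time as the last activity
        this.lastWriteMs = nowMs;
    }

    void onRead(long nowMs)  { lastReadMs = nowMs; }
    void onWrite(long nowMs) { lastWriteMs = nowMs; }

    // Returns every idle state whose timeout has elapsed at time nowMs.
    List<Idle> check(long nowMs) {
        List<Idle> events = new ArrayList<>();
        if (readerIdleMs > 0 && nowMs - lastReadMs >= readerIdleMs) {
            events.add(Idle.READER_IDLE);
        }
        if (writerIdleMs > 0 && nowMs - lastWriteMs >= writerIdleMs) {
            events.add(Idle.WRITER_IDLE);
        }
        long lastActivity = Math.max(lastReadMs, lastWriteMs);
        if (allIdleMs > 0 && nowMs - lastActivity >= allIdleMs) {
            events.add(Idle.ALL_IDLE);
        }
        return events;
    }
}
```

For example, with a 4 s writer timeout and no writes, `check(4000)` reports only `WRITER_IDLE`. That is the event the client-side trigger turns into a heartbeat.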


TIP

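Below, `IdleStateHandler` is used to implement the heartbeat. After the client connects to the server, it repeats one task in a loop: wait a random number of seconds, then ping the server, i.e. send a heartbeat packet. If the wait exceeds the allowed time, the send fails, because the server has already closed the connection by then.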
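The outcome of this loop can be reasoned about with a small simulation. The sketch below is JDK-only and hypothetical (`PingSimulation.pingsDelivered` is not part of the article's code). It takes the random waits the client picks and the server's 5 s reader-idle timeout, and counts how many pings arrive before the server closes the connection. A wait equal to the timeout is treated as a failure, since in practice that case is a race.

```java
// Hypothetical helper: counts the pings that reach the server before its
// reader-idle timeout closes the connection.
class PingSimulation {
    static int pingsDelivered(int[] waitsSeconds, int readerIdleSeconds) {
        int delivered = 0;
        for (int wait : waitsSeconds) {
            if (wait >= readerIdleSeconds) {
                break; // the server closed the channel first, so this ping fails
            }
            delivered++; // the ping arrives and resets the server's read timer
        }
        return delivered;
    }
}
```

With waits of 1, 3, 4, 2 and then 7 seconds, four pings get through and the fifth fails. This is the same pattern the test section describes.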

# Client side

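`ConnectorIdleStateTrigger` (heartbeat trigger): `ConnectorIdleStateTrigger` is also a handler. It only overrides `userEventTriggered`, where it catches `IdleState.WRITER_IDLE` events (nothing has been sent to the server within the configured time) and then sends a heartbeat packet to the server.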

```java
package com.yunda.netty.adml;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;

/**
 * @description: Catches {@link IdleState#WRITER_IDLE} events (nothing sent to the server within the configured time) and then sends a heartbeat packet to the server.
 * @author: zzx
 * @createDate: 2020/9/27
 * @version: 1.0
 */
@ChannelHandler.Sharable
public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {

    // Shared heartbeat payload: unreleasableBuffer keeps writes from releasing it,
    // and each write sends a duplicate() so the shared reader index is never consumed
    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF_8));

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                // write heartbeat to server
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
```
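Why `HEARTBEAT_SEQUENCE.duplicate()`?

- Once a `ByteBuf` has been written, Netty advances its reader index and releases it.
- `Unpooled.unreleasableBuffer` prevents the release.
- `duplicate()` gives each write its own reader/writer indexes over the same bytes, so the static buffer can be reused for every heartbeat.

The JDK's `java.nio.ByteBuffer` has the same duplicate semantics (shared content, independent position and limit). The sketch below therefore illustrates the idea without Netty; `SharedHeartbeat` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// JDK analogue (not Netty code) of HEARTBEAT_SEQUENCE.duplicate(): one shared,
// read-only buffer holds the heartbeat bytes; every send reads from a duplicate.
class SharedHeartbeat {
    private static final ByteBuffer HEARTBEAT =
            ByteBuffer.wrap("Heartbeat".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();

    // Simulates writing the heartbeat: drains a duplicate and returns its text.
    static String send() {
        ByteBuffer view = HEARTBEAT.duplicate(); // same bytes, independent position
        byte[] out = new byte[view.remaining()];
        view.get(out);                           // advances only the duplicate's position
        return new String(out, StandardCharsets.UTF_8);
    }

    // Bytes still readable in the shared buffer (never consumed by send()).
    static int sharedRemaining() {
        return HEARTBEAT.remaining();
    }
}
```

Without the duplicate, the first send would drain the shared buffer and every later heartbeat would be empty.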

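Next comes the key part. We need a class that watches whether the link has dropped and, if it has, keeps trying to reconnect. Here is the complete code of this link watcher, `ConnectionWatch`: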

```java
package com.yunda.netty.adml;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * @description: Reconnect watcher: when the current link becomes unstable and is closed, try to reconnect up to 12 times
 * @author: zzx
 * @createDate: 2020/9/27
 * @version: 1.0
 */
@ChannelHandler.Sharable
public class ConnectionWatch extends ChannelInboundHandlerAdapter implements TimerTask {
    /**
     * Logger
     */
    private static final Logger logger = LoggerFactory.getLogger(ConnectionWatch.class);

    /**
     * Bootstrap used to connect, passed in by the client (HeartBeatsClient)
     */
    private final Bootstrap bootstrap;

    /**
     * Netty timer used to schedule reconnect attempts (HeartBeatsClient passes a HashedWheelTimer)
     */
    private final Timer timer;

    /**
     * Server port, passed in by the client
     */
    private final int port;

    /**
     * Server host, passed in by the client
     */
    private final String host;

    /**
     * Whether to reconnect
     */
    private volatile boolean reconnect = true;

    /**
     * Number of reconnect attempts so far
     */
    private int attempts;

    /**
     * Constructor with all connection details
     */
    public ConnectionWatch(Bootstrap bootstrap, Timer timer, int port, String host, boolean reconnect) {
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.port = port;
        this.host = host;
        this.reconnect = reconnect;
    }

    /**
     * Every time the channel becomes active, reset the attempt counter to 0
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("Link is active, resetting reconnect attempts to 0");
        attempts = 0;
        ctx.fireChannelActive();
    }

    /**
     * Called when the connection is closed
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (reconnect) {
            logger.info("Connection closed, reconnecting");
            if (attempts < 12) {
                attempts++;
                // The interval doubles on every attempt: 4 ms, 8 ms, ..., 8192 ms
                int timeout = 2 << attempts;
                // Schedule a timer task that performs the reconnect
                timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
            }
        }
        ctx.fireChannelInactive();
    }

    /**
     * Runs when the timer task fires
     */
    @Override
    public void run(Timeout timeout) throws Exception {

        ChannelFuture future;
        // The bootstrap is already configured; only the handlers need to be set
        synchronized (bootstrap) {
            bootstrap.handler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(handlers());
                }
            });
            future = bootstrap.connect(host, port);
        }
        // Check the result of the connect attempt
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                boolean succeed = f.isSuccess();

                // On failure, fire channelInactive again to trigger the next reconnect;
                // after 12 failed attempts no further reconnect is scheduled
                if (!succeed) {
                    logger.info("Reconnect failed");
                    f.channel().pipeline().fireChannelInactive();
                } else {
                    logger.info("Reconnect succeeded");
                }
            }
        });
    }

    /**
     * Hook method: subclasses return all handlers of the pipeline (must be overridden)
     */
    public ChannelHandler[] handlers() {
        return null;
    }
}
```
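The reconnect delay in `channelInactive` is `2 << attempts` milliseconds. Because `attempts` is incremented first, the 12 attempts wait 4 ms, 8 ms, ..., 8192 ms. The JDK-only helper below (hypothetical, not part of the article) reproduces that arithmetic. The total scheduled delay over all 12 attempts is 16,380 ms, roughly 16 s. `HashedWheelTimer` ticks every 100 ms by default, so the smallest delays are in practice rounded up to about one tick.

```java
// Reproduces the delay formula of ConnectionWatch.channelInactive:
// attempts is incremented first, then the timer delay is (2 << attempts) ms.
class WatchBackoff {
    static final int MAX_ATTEMPTS = 12;

    // attempt = 1..12; 2 << attempt equals 2^(attempt + 1)
    static int delayMs(int attempt) {
        return 2 << attempt;
    }

    // Sum of all scheduled delays if every attempt fails.
    static long totalDelayMs() {
        long total = 0;
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            total += delayMs(attempt);
        }
        return total;
    }
}
```

So if the server stays down for more than about 16 seconds (plus connect time), this watcher gives up. A larger base delay, or a capped policy such as the one in part 2, may fit better.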

`HeartBeatClientHandler`: the business handler installed on the client, used for application logic.

```java
package com.yunda.netty;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.util.Date;

/**
 * @description: Business handler installed on the client
 * @author: zzx
 * @createDate: 2020/9/27
 * @version: 1.0
 */
@ChannelHandler.Sharable
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Activated at: " + new Date());
        System.out.println("HeartBeatClientHandler channelActive");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Stopped at: " + new Date());
        System.out.println("HeartBeatClientHandler channelInactive");
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // StringDecoder earlier in the pipeline has already turned the bytes into a String
        String message = (String) msg;
        System.out.println(message);
        if (message.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        // No-op for a String; only matters if a reference-counted message arrives here
        ReferenceCountUtil.release(msg);
    }
}
```

`HeartBeatsClient`: the TCP client

```java
package com.yunda.netty;

import com.yunda.netty.adml.ConnectionWatch;
import com.yunda.netty.adml.ConnectorIdleStateTrigger;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.HashedWheelTimer;

import java.util.concurrent.TimeUnit;

/**
 * @description: TCP client with heartbeat and reconnect support
 * @author: zzx
 * @createDate: 2020/9/27
 * @version: 1.0
 */
public class HeartBeatsClient {

    /**
     * Timer that schedules the reconnect tasks
     */
    protected final HashedWheelTimer timer = new HashedWheelTimer();

    /**
     * Client bootstrap
     */
    private Bootstrap bootstrap;

    /**
     * Heartbeat trigger (@Sharable, so one instance serves every reconnected channel)
     */
    private final ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger();

    /**
     * Connects to the server
     * @param port port
     * @param host host
     * @throws Exception if the initial connection fails
     */
    public void connect(int port, String host) throws Exception {
        // NIO event loop group
        EventLoopGroup group = new NioEventLoopGroup();
        // Client bootstrap
        bootstrap = new Bootstrap();
        // Event loop group, channel type and a logging handler (replaced by the initializer below)
        bootstrap.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO));

        // Create the watcher and hand it all connection details
        final ConnectionWatch watchdog = new ConnectionWatch(bootstrap, timer, port, host, true) {
            @Override
            public ChannelHandler[] handlers() {
                return new ChannelHandler[] {
                        this,
                        new IdleStateHandler(4, 4, 4, TimeUnit.SECONDS),
                        idleStateTrigger,
                        new StringDecoder(),
                        new StringEncoder(),
                        new HeartBeatClientHandler()
                };
            }
        };

        // Result of the connect call
        ChannelFuture future;
        // Connect and initialize the handler pipeline
        try {
            synchronized (bootstrap) {
                bootstrap.handler(new ChannelInitializer<Channel>() {
                    // Initialize the channel
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(watchdog.handlers());
                    }
                });
                // Connect
                future = bootstrap.connect(host, port);
            }

            // Waiting on the future outside the synchronized block is safe
            future.sync();
        } catch (Throwable t) {
            throw new Exception("connect to " + host + ":" + port + " failed", t);
        }
    }

    /**
     * @param args optional port number
     * @throws Exception if the connection fails
     */
    public static void main(String[] args) throws Exception {
        int port = 1123;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // keep the default port
            }
        }
        new HeartBeatsClient().connect(port, "127.0.0.1");
    }
}
```

# Server side

`AcceptorIdleStateTrigger`: the disconnect trigger

```java
package com.dahua.netty.zzx;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * @description: If no packet arrives from the client within the configured time, actively close the connection
 * @author: zzx
 * @createDate: 2020/9/27
 * @version: 1.0
 */
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                // Travels down the pipeline to HeartBeatServerHandler.exceptionCaught, which closes the channel
                throw new Exception("idle exception");
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
```

`HeartBeatServerHandler`: the server-side business handler

```java
package com.dahua.netty.zzx;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @description: Prints the messages sent by the client
 * @author: zzx
 * @createDate: 2020/9/27
 * @version: 1.0
 */
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server channelRead..");
        System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Also reached when AcceptorIdleStateTrigger reports a read timeout
        cause.printStackTrace();
        ctx.close();
    }
}
```

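`TestBootStrap`: the server. Its `IdleStateHandler` is configured with a 5-second reader-idle timeout. If no packet at all (heartbeats included) arrives from a client within 5 seconds, `AcceptorIdleStateTrigger` makes the server actively close its connection to that client.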

```java
package com.dahua.netty.zzx;

import com.dahua.netty.NettyServer;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.nio.charset.Charset;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;

/**
 * @description: Starts the test server
 * @author: zzx
 * @createDate: 2020/9/16
 * @version: 1.0
 */
public class TestBootStrap {
    public static void main(String[] args) {
        /*------------------------------- Test code: start the server ---------------------------------*/
        // Port used by the subsystem interface
        NettyServer nettyServer = new NettyServer(1123);
        // Handlers in pipeline order. IdleStateHandler is not @Sharable: if NettyServer (not shown)
        // adds these same instances to every accepted channel, a second connection will fail.
        LinkedHashMap<String, ChannelHandlerAdapter> subChannelHandlerAdapterHashMap = new LinkedHashMap<>();
        // 5 s reader-idle timeout; writer and all-idle checks disabled (0)
        subChannelHandlerAdapterHashMap.put("idleStateHandler", new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
        subChannelHandlerAdapterHashMap.put("acceptorIdleStateTrigger", new AcceptorIdleStateTrigger());
        subChannelHandlerAdapterHashMap.put("stringDecoder", new StringDecoder(Charset.forName("utf-8")));
        subChannelHandlerAdapterHashMap.put("stringEncoder", new StringEncoder(Charset.forName("utf-8")));
        subChannelHandlerAdapterHashMap.put("heartBeatServerHandler", new HeartBeatServerHandler());

        try {
            nettyServer.start(subChannelHandlerAdapterHashMap);
        } catch (Exception e) {
            e.printStackTrace();
        }
        /*---------------------------------- End of test code -------------------------------------------*/
    }
}
```

# Testing

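Start the server first, then the client. This first version has no retry for the initial connection, so a client started first fails in `future.sync()`. Once both are running, the client console prints logs similar to the following: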

(Screenshot: client log output)

On the server side, the console prints logs similar to the following:

(Screenshot: server log output)

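The client sends 4 heartbeat packets. Before the 5th, it waits so long that the connection is already closed when the packet is actually sent. The server, after receiving the client's 4 heartbeat packets, waits in vain for the next one and therefore closes the connection.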

# Exceptions

During testing, the following may occur:

(Screenshot: exception stack trace)

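The cause is that a heartbeat is sent to the server after the connection has already been closed. `channel.isActive()` is checked before each heartbeat is sent, but the connection can be active at the moment of the check and close just before the packet goes out. No elegant way to handle this has been found yet.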

# 2. Reconnecting after a disconnect

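Approach: when the client detects that its connection to the server has dropped, or it cannot connect in the first place, it reconnects according to a given retry policy. It keeps going until the connection is re-established or the retries are exhausted. The disconnect is detected by overriding `ChannelInboundHandler#channelInactive`. This method is called when the connection becomes unavailable, so it is the place to start reconnecting.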
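Stripped of Netty, the idea is a loop: try to connect, and on failure ask the retry policy whether another attempt is allowed and how long to wait. The sketch below models that loop synchronously with plain JDK functional interfaces. `ReconnectLoop.run` and its parameters are hypothetical. The real client runs each step asynchronously from `channelInactive` and the event loop instead of sleeping in a loop. The retry counter starts at 0 for the first retry, matching the `retryCount` convention of the `RetryPolicy` interface in this section.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntPredicate;
import java.util.function.IntToLongFunction;
import java.util.function.LongConsumer;

// Simplified, synchronous model of "reconnect until success or retries exhausted".
// The connect attempt, retry policy and sleep are plugged in as functions.
class ReconnectLoop {
    /**
     * @return the 1-based number of the attempt that succeeded,
     *         or -1 if the policy stopped allowing retries first
     */
    static int run(BooleanSupplier connect, IntPredicate allowRetry,
                   IntToLongFunction sleepMsForRetry, LongConsumer sleeper) {
        int retries = 0; // 0 before the first retry, like RetryPolicy#allowRetry
        while (true) {
            if (connect.getAsBoolean()) {
                return retries + 1;
            }
            if (!allowRetry.test(retries)) {
                return -1; // give up
            }
            sleeper.accept(sleepMsForRetry.applyAsLong(retries)); // wait before retrying
            retries++;
        }
    }
}
```

Take a connect attempt that only succeeds on the third try, with a doubling 1 s delay. The loop sleeps 1000 ms and then 2000 ms, and reports attempt 3.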

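Implementation: the code below modifies or extends the heartbeat code above. Reconnecting is the client's job, so only the client code needs to change.

Retry policy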

【1】`RetryPolicy`: the retry policy interface

```java
public interface RetryPolicy {

    /**
     * Called when an operation has failed for some reason. This method should return
     * true to make another attempt.
     *
     * @param retryCount the number of times retried so far (0 the first time)
     * @return true/false
     */
    boolean allowRetry(int retryCount);

    /**
     * get sleep time in ms of current retry count.
     *
     * @param retryCount current retry count
     * @return the time to sleep
     */
    long getSleepTimeMs(int retryCount);
}
```

【2】`ExponentialBackOffRetry`: the default implementation of the retry policy
```java
import java.util.Random;

/**
 * <p>Retry policy that retries a set number of times with increasing sleep time between retries</p>
 */
public class ExponentialBackOffRetry implements RetryPolicy {

    // Keeps 1 << retryCount safely inside the int range
    private static final int MAX_RETRIES_LIMIT = 29;
    private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;

    private final Random random = new Random();
    private final long baseSleepTimeMs;
    private final int maxRetries;
    private final int maxSleepMs;

    public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries) {
        this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
    }

    public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) {
        this.maxRetries = maxRetries;
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxSleepMs = maxSleepMs;
    }

    @Override
    public boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    @Override
    public long getSleepTimeMs(int retryCount) {
        if (retryCount < 0) {
            throw new IllegalArgumentException("retries count must not be negative.");
        }
        if (retryCount > MAX_RETRIES_LIMIT) {
            System.out.println(String.format("retryCount too large (%d). Pinning to %d", retryCount, MAX_RETRIES_LIMIT));
            retryCount = MAX_RETRIES_LIMIT;
        }
        // Random multiplier in [1, 2^retryCount - 1]: the upper bound grows exponentially
        long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << retryCount));
        if (sleepMs > maxSleepMs) {
            System.out.println(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
            sleepMs = maxSleepMs;
        }
        return sleepMs;
    }
}
```

【3】`ReconnectHandler`: the reconnect handler

```java
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;

import java.util.concurrent.TimeUnit;

@ChannelHandler.Sharable
public class ReconnectHandler extends ChannelInboundHandlerAdapter {

    private int retries = 0;
    private RetryPolicy retryPolicy;

    private TcpClient tcpClient;

    public ReconnectHandler(TcpClient tcpClient) {
        this.tcpClient = tcpClient;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Successfully established a connection to the server.");
        // A successful connection resets the retry counter
        retries = 0;
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // retries == 0: a working connection was lost (or the very first connect failed)
        if (retries == 0) {
            System.err.println("Lost the TCP connection with the server.");
            ctx.close();
        }

        boolean allowRetry = getRetryPolicy().allowRetry(retries);
        if (allowRetry) {

            long sleepTimeMs = getRetryPolicy().getSleepTimeMs(retries);

            System.out.println(String.format("Try to reconnect to the server after %dms. Retry count: %d.", sleepTimeMs, ++retries));

            // Schedule the reconnect on the channel's event loop instead of blocking it
            final EventLoop eventLoop = ctx.channel().eventLoop();
            eventLoop.schedule(() -> {
                System.out.println("Reconnecting ...");
                tcpClient.connect();
            }, sleepTimeMs, TimeUnit.MILLISECONDS);
        }
        ctx.fireChannelInactive();
    }


    private RetryPolicy getRetryPolicy() {
        if (this.retryPolicy == null) {
            this.retryPolicy = tcpClient.getRetryPolicy();
        }
        return this.retryPolicy;
    }
}
```

【4】`ClientHandlersInitializer`: the earlier version with the reconnect handler `ReconnectHandler` added

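```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.util.Objects;

public class ClientHandlersInitializer extends ChannelInitializer<SocketChannel> {

    private ReconnectHandler reconnectHandler;
    // EchoHandler and Pinger come from the earlier version and are not shown here
    private EchoHandler echoHandler;

    public ClientHandlersInitializer(TcpClient tcpClient) {
        Objects.requireNonNull(tcpClient, "TcpClient can not be null.");
        // One @Sharable ReconnectHandler is reused for every reconnected channel
        this.reconnectHandler = new ReconnectHandler(tcpClient);
        this.echoHandler = new EchoHandler();
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(this.reconnectHandler);
        // 4-byte length-prefix framing: the decoder strips the prefix, the prepender adds it
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new Pinger());
    }
}
```

【5】`TcpClient`: the earlier version with support for reconnecting and for retry policies added.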
```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TcpClient {

    private String host;
    private int port;
    private Bootstrap bootstrap;
    /** Retry policy */
    private RetryPolicy retryPolicy;
    /** Keep the <code>Channel</code> so data can also be sent from places other than handlers */
    private Channel channel;

    public TcpClient(String host, int port) {
        // Default policy: 1 s base, effectively unlimited retries, sleep capped at 60 s
        this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000));
    }

    public TcpClient(String host, int port, RetryPolicy retryPolicy) {
        this.host = host;
        this.port = port;
        this.retryPolicy = retryPolicy;
        init();
    }

    /**
     * Requests a connection to the remote TCP server
     */
    public void connect() {
        synchronized (bootstrap) {
            ChannelFuture future = bootstrap.connect(host, port);
            future.addListener(getConnectionListener());
            this.channel = future.channel();
        }
    }

    public RetryPolicy getRetryPolicy() {
        return retryPolicy;
    }

    private void init() {
        EventLoopGroup group = new NioEventLoopGroup();
        // The bootstrap is reusable, so it only needs to be set up once, when TcpClient is created
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientHandlersInitializer(TcpClient.this));
    }

    private ChannelFutureListener getConnectionListener() {
        return new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    // The channel was never active, so fire channelInactive by hand to let ReconnectHandler retry
                    future.channel().pipeline().fireChannelInactive();
                }
            }
        };
    }

    public static void main(String[] args) {
        TcpClient tcpClient = new TcpClient("localhost", 2222);
        tcpClient.connect();
    }

}
```
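`ClientHandlersInitializer` frames every outgoing message with `LengthFieldPrepender(4)`. Incoming data goes through `LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)`, which means:

- a 4-byte length field at offset 0,
- no length adjustment,
- the 4 length bytes stripped from each decoded frame.

The JDK-only sketch below shows that wire format. `LengthFraming` is a hypothetical helper, not a Netty class. It assumes the big-endian length field that both Netty handlers use by default.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// JDK-only sketch of the length-prefix framing used by the client pipeline:
// [4-byte big-endian length][payload bytes]
class LengthFraming {
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length) // length field: 4 bytes, big-endian
                .put(payload)
                .array();
    }

    // Decodes every complete frame in 'in'; a trailing partial frame is left unread.
    static List<String> decode(ByteBuffer in) {
        List<String> messages = new ArrayList<>();
        while (in.remaining() >= 4) {
            int length = in.getInt(in.position()); // peek at the length field (offset 0)
            if (in.remaining() < 4 + length) {
                break;                              // wait for more bytes
            }
            in.position(in.position() + 4);         // strip the 4 length bytes
            byte[] payload = new byte[length];
            in.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        return messages;
    }
}
```

Consider a buffer holding one full frame plus the first 3 bytes of the next. It yields one message, and the 3 trailing bytes stay unread until more data arrives. This is what lets the decoder cope with TCP splitting or merging packets.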

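【6】Testing: before testing, make a small change to avoid the `Connection reset by peer` exception. Add an `if (second == 5)` check to the `ping()` method of `Pinger`, as shown below. A 5-second wait coincides with the server's 5-second reader-idle timeout, so the heartbeat and the server-side close would race.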

```java
private void ping(Channel channel) {
    int second = Math.max(1, random.nextInt(baseRandom));
    // Avoid waiting exactly 5 s, which races with the server's 5 s reader-idle timeout
    if (second == 5) {
        second = 6;
    }
    System.out.println("next heart beat will send after " + second + "s.");
    ScheduledFuture<?> future = channel.eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
            if (channel.isActive()) {
                System.out.println("sending heart beat to the server...");
                channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
            } else {
                System.err.println("The connection had broken, cancel the task that will send a heart beat.");
                // Fail this scheduled task so the listener below does not schedule another ping
                throw new RuntimeException();
            }
        }
    }, second, TimeUnit.SECONDS);

    future.addListener(new GenericFutureListener() {
        @Override
        public void operationComplete(Future future) throws Exception {
            // Schedule the next ping only if this one ran successfully
            if (future.isSuccess()) {
                ping(channel);
            }
        }
    });
}
```
# Starting the client

First start only the client and watch its console. You will see logs similar to the following:

(Screenshot: client console output)

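The client cannot reach the server, so it keeps trying to reconnect. The retry interval grows with the number of retries, but it should not grow forever, so a ceiling is set, here 60 s. When the computed time for the next retry exceeds 60 s, the client prints `Sleep extension too large (*). Pinning to 60000` (in ms), as the screenshot shows. This means the computed sleep time exceeded the ceiling (60 s), so the actual sleep time was clamped to the ceiling.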
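Because the sleep is random, this log line does not appear at a fixed retry, but its earliest possible point can be computed. In `ExponentialBackOffRetry.getSleepTimeMs`:

- `random.nextInt(1 << retryCount)` lies in [0, 2^retryCount - 1].
- `Math.max(1, ...)` lifts 0 to 1.
- So the sleep is at most `base * max(1, 2^retryCount - 1)` before capping.

The JDK-only helper below (hypothetical names) computes that bound. With the default `TcpClient` settings (base 1000 ms, cap 60000 ms), the bound first exceeds the cap at `retryCount` 6. `Sleep extension too large` can therefore only be printed from the seventh retry onward.

```java
// Upper bound of ExponentialBackOffRetry.getSleepTimeMs(retryCount), before and after capping.
class BackoffBound {
    static final int MAX_RETRIES_LIMIT = 29; // same pinning as ExponentialBackOffRetry

    static long uncappedBoundMs(long baseSleepMs, int retryCount) {
        int r = Math.min(retryCount, MAX_RETRIES_LIMIT);
        // nextInt(1 << r) is at most 2^r - 1; Math.max(1, ...) makes the multiplier >= 1
        return baseSleepMs * Math.max(1L, (1L << r) - 1);
    }

    static long maxSleepMs(long baseSleepMs, int retryCount, long capMs) {
        return Math.min(uncappedBoundMs(baseSleepMs, retryCount), capMs);
    }

    // First retry count whose bound can exceed the cap (i.e. where the log line may appear).
    static int firstCappedRetry(long baseSleepMs, long capMs) {
        int r = 0;
        while (r < MAX_RETRIES_LIMIT && uncappedBoundMs(baseSleepMs, r) <= capMs) {
            r++;
        }
        return r;
    }
}
```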

# Starting the server

Next, start the server and keep watching the client console:

(Screenshot: client console output after the server starts)

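Here the server was started after the 9th retry failed and before the 10th, so the 10th reconnect succeeded and printed `Successfully established a connection to the server.` After that, the client still pings the server at random intervals, so the connection keeps cycling through disconnect and reconnect.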

TIP

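Different environments may need different reconnect behavior. To get a different one, implement the `RetryPolicy` interface yourself and pass it in when creating the `TcpClient` to override the default policy.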
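As an example of such a custom policy, here is a hypothetical `FixedIntervalRetry` that allows a bounded number of retries with a constant delay. It is a sketch. The `RetryPolicy` interface is repeated only so the snippet compiles on its own; in the project you would implement the existing interface instead.

```java
// Copy of the article's RetryPolicy interface, repeated so this snippet is self-contained.
interface RetryPolicy {
    boolean allowRetry(int retryCount);
    long getSleepTimeMs(int retryCount);
}

// Hypothetical policy: at most maxRetries retries, always waiting intervalMs.
class FixedIntervalRetry implements RetryPolicy {
    private final int maxRetries;
    private final long intervalMs;

    FixedIntervalRetry(int maxRetries, long intervalMs) {
        if (maxRetries < 0 || intervalMs < 0) {
            throw new IllegalArgumentException("maxRetries and intervalMs must not be negative");
        }
        this.maxRetries = maxRetries;
        this.intervalMs = intervalMs;
    }

    @Override
    public boolean allowRetry(int retryCount) {
        return retryCount < maxRetries; // retryCount is 0 before the first retry
    }

    @Override
    public long getSleepTimeMs(int retryCount) {
        return intervalMs; // constant delay, no backoff
    }
}
```

It would then be passed to the three-argument constructor, e.g. `new TcpClient("localhost", 2222, new FixedIntervalRetry(10, 5000))`, which retries every 5 s up to 10 times.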
