分析 rust 实现 TCP idle 连接池

简介

通常用 C 语言实现 TCP idle 连接池,是将 idle fd 放到 epoll_wait 中等待事件通知(对端主动断开链接等事件)。而更高级的语言如 go/rust 如果照搬 epoll_wait 实现,获取 inner fd 会失去语言封装的特性。

最近在阅读开源项目源码的时候,看到了 rust 实现的 ureq 库中的 TCP idle 连接池的实现,可以当作高级语言实现连接池的参考。

结构体

ureq 连接池的实现在 pool.rs 中,连接池结构体定义:

pub(crate) struct ConnectionPool {
    inner: Mutex<Inner>,
    max_idle_connections: usize,
    max_idle_connections_per_host: usize,
}

struct Inner {
    // the actual pooled connection. however only one per hostname:port.
    recycle: HashMap<PoolKey, VecDeque<Stream>>,
    // This is used to keep track of which streams to expire when the
    // pool reaches MAX_IDLE_CONNECTIONS. The corresponding PoolKeys for
    // recently used Streams are added to the back of the queue;
    // old streams are removed from the front.
    lru: VecDeque<PoolKey>,
}

空闲连接由 HashMap<PoolKey, VecDeque<String>> 存放,host:port 作为 key,连接存放到队列中。

空闲连接获取

ureq crate 从 connect_socket 接口获取 TCP 连接,如果 use_pooled 参数传递 true,就从连接池中获取连接。

/// Connect the socket, either by using the pool or grab a new one.
fn connect_socket(unit: &Unit, hostname: &str, use_pooled: bool) -> Result<(Stream, bool), Error> {
    ...

    if use_pooled {
        let pool = &unit.agent.state.pool;
        let proxy = &unit.agent.config.proxy;
        // The connection may have been closed by the server
        // due to idle timeout while it was sitting in the pool.
        // Loop until we find one that is still good or run out of connections.
        while let Some(stream) = pool.try_get_connection(&unit.url, proxy.clone()) {
            let server_closed = stream.server_closed()?;
            if !server_closed {
                return Ok((stream, true));
            }
            debug!("dropping stream from pool; closed by server: {:?}", stream);
        }
    }
    let stream = match unit.url.scheme() {
        "http" => stream::connect_http(unit, hostname),
        "https" => stream::connect_https(unit, hostname),
        "test" => connect_test(unit),
        scheme => Err(ErrorKind::UnknownScheme.msg(format!("unknown scheme {}", scheme))),
    };
    Ok((stream?, false))
}

函数循环从池子中获取连接,并调用 server_closed 判断空闲连接是否可用(没有被对端断开,没有残留数据)。

空闲连接需要判断是否断开的逻辑:

    // Check if the server has closed a stream by performing a one-byte
    // non-blocking read. If this returns EOF, the server has closed the
    // connection: return true. If this returns a successful read, there are
    // some bytes on the connection even though there was no inflight request.
    // For plain HTTP streams, that might mean an HTTP 408 was pushed; it
    // could also mean a buggy server that sent more bytes than a response's
    // Content-Length. For HTTPS streams, that might mean a close_notify alert,
    // which is the proper way to shut down an idle stream.
    // Either way, bytes available on the stream before we've made a request
    // means the stream is not usable, so we should discard it.
    // If this returns WouldBlock (aka EAGAIN),
    // that means the connection is still open: return false. Otherwise
    // return an error.
    fn serverclosed_stream(stream: &std::net::TcpStream) -> io::Result<bool> {
        let mut buf = [0; 1];
        stream.set_nonblocking(true)?;

        let result = match stream.peek(&mut buf) {
            Ok(n) => {
                debug!(
                    "peek on reused connection returned {}, not WouldBlock; discarding",
                    n
                );
                Ok(true)
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(false),
            Err(e) => Err(e),
        };
        stream.set_nonblocking(false)?;

        result
    }

将 stream 设置成非 blocking,调用 peak 判断是否对端已经断开连接或者有残留数据。

参考

https://github.com/algesten/ureq/blob/main/src/unit.rs

https://github.com/algesten/ureq/blob/main/src/stream.rs