分析 rust 实现 TCP idle 连接池
分析 rust 实现 TCP idle 连接池
简介
通常用 C 语言实现 TCP idle 连接池,是将 idle fd 放到 epoll_wait 中等待事件通知(对端主动断开链接等事件)。而更高级的语言如 go/rust 如果照搬 epoll_wait 实现,获取 inner fd 会失去语言封装的特性。
最近在阅读开源项目源码的时候,看到了 rust 实现的 ureq 库中的 TCP idle 连接池的实现,可以当作高级语言实现连接池的参考。
结构体
ureq 连接池的实现在 pool.rs 中,连接池结构体定义:
pub(crate) struct ConnectionPool {
inner: Mutex<Inner>,
max_idle_connections: usize,
max_idle_connections_per_host: usize,
}
struct Inner {
// the actual pooled connection. however only one per hostname:port.
recycle: HashMap<PoolKey, VecDeque<Stream>>,
// This is used to keep track of which streams to expire when the
// pool reaches MAX_IDLE_CONNECTIONS. The corresponding PoolKeys for
// recently used Streams are added to the back of the queue;
// old streams are removed from the front.
lru: VecDeque<PoolKey>,
}
空闲连接由 HashMap<PoolKey, VecDeque<String>>
存放,host:port
作为 key,连接存放到队列中。
空闲连接获取
ureq crate 从 connect_socket
接口获取 TCP 连接,如果 use_pooled
参数传递 true
,就从连接池中获取连接。
/// Connect the socket, either by using the pool or grab a new one.
fn connect_socket(unit: &Unit, hostname: &str, use_pooled: bool) -> Result<(Stream, bool), Error> {
...
if use_pooled {
let pool = &unit.agent.state.pool;
let proxy = &unit.agent.config.proxy;
// The connection may have been closed by the server
// due to idle timeout while it was sitting in the pool.
// Loop until we find one that is still good or run out of connections.
while let Some(stream) = pool.try_get_connection(&unit.url, proxy.clone()) {
let server_closed = stream.server_closed()?;
if !server_closed {
return Ok((stream, true));
}
debug!("dropping stream from pool; closed by server: {:?}", stream);
}
}
let stream = match unit.url.scheme() {
"http" => stream::connect_http(unit, hostname),
"https" => stream::connect_https(unit, hostname),
"test" => connect_test(unit),
scheme => Err(ErrorKind::UnknownScheme.msg(format!("unknown scheme {}", scheme))),
};
Ok((stream?, false))
}
函数循环从池子中获取连接,并调用 server_closed
判断空闲连接是否可用(没有被对端断开,没有残留数据)。
空闲连接需要判断是否断开的逻辑:
// Check if the server has closed a stream by performing a one-byte
// non-blocking read. If this returns EOF, the server has closed the
// connection: return true. If this returns a successful read, there are
// some bytes on the connection even though there was no inflight request.
// For plain HTTP streams, that might mean an HTTP 408 was pushed; it
// could also mean a buggy server that sent more bytes than a response's
// Content-Length. For HTTPS streams, that might mean a close_notify alert,
// which is the proper way to shut down an idle stream.
// Either way, bytes available on the stream before we've made a request
// means the stream is not usable, so we should discard it.
// If this returns WouldBlock (aka EAGAIN),
// that means the connection is still open: return false. Otherwise
// return an error.
fn serverclosed_stream(stream: &std::net::TcpStream) -> io::Result<bool> {
let mut buf = [0; 1];
stream.set_nonblocking(true)?;
let result = match stream.peek(&mut buf) {
Ok(n) => {
debug!(
"peek on reused connection returned {}, not WouldBlock; discarding",
n
);
Ok(true)
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(false),
Err(e) => Err(e),
};
stream.set_nonblocking(false)?;
result
}
将 stream 设置成非 blocking,调用 peak
判断是否对端已经断开连接或者有残留数据。