Контейнер соединителя в основном отвечает за анализ запросов сокетов.Исходный код в tomcat расположен по путям пакетов org.apache.catalina.connector и org.apache.coyote; из анализа предыдущих двух разделов мы знаем, что соединитель подконтейнер службы, а служба также является подконтейнером сервера. Настраивается в файле server.xml, а затем создается через Digester в классе Catalina. Два типа реализации коннектора настроены по умолчанию в файле server.xml, которые используются для обработки запросов Http и запросов AJP соответственно. На самом деле существует три реализации Connector:
1. Http-коннектор: анализирует HTTP-запросы и делится на Http-коннектор BIO и Http-коннектор NIO, а именно блокирующий IO-коннектор и неблокирующий IO-коннектор. В этой статье в основном анализируется процесс реализации NIO Http Connector.
2. AJP: основанный на протоколе AJP, это настраиваемый протокол для связи между Tomcat и HTTP-сервером, который может обеспечить более высокую скорость и эффективность связи. Этот протокол используется при интеграции с сервером Apache.
3. HTTP-коннектор APR: реализован на C и вызывается через JNI. Это в основном улучшает производительность доступа к статическим ресурсам (таким как HTML, изображения, CSS, JS и т. д.).
Какой соединитель использовать, можно настроить с помощью свойства протокола в файле server.xml следующим образом:
<Connector port="8080" protocol="org.apache.coyote.http11.Http11AprProtocol"
connectionTimeout="20000"
redirectPort="8443" />
Затем взгляните на конструктор соединителя:
//默认connector为HTTP/1.1 NIO
public Connector() {
this("org.apache.coyote.http11.Http11NioProtocol");
}
//根据protocol实现Connector
public Connector(String protocol) {
boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
AprLifecycleListener.getUseAprConnector();
if ("HTTP/1.1".equals(protocol) || protocol == null) {
if (aprConnector) {
protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol";
} else {
protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
}
} else if ("AJP/1.3".equals(protocol)) {
if (aprConnector) {
protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol";
} else {
protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol";
}
} else {
protocolHandlerClassName = protocol;
}
// 通过反射实例化一个protocolHandle,之后对请求数据的解析都由该protocolHandle完成,例如Http11AprProtocol
ProtocolHandler p = null;
try {
Class<?> clazz = Class.forName(protocolHandlerClassName);
p = (ProtocolHandler) clazz.getConstructor().newInstance();
} catch (Exception e) {
log.error(sm.getString(
"coyoteConnector.protocolHandlerInstantiationFailed"), e);
} finally {
this.protocolHandler = p;
}
// Default for Connector depends on this system property
setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"));
}
Анализируя исходный код конструктора коннектора, мы можем узнать, что каждому коннектору соответствует обработчик протоколов, который предназначен для прослушивания сетевых запросов на определенном порту сервера, но не отвечает за обработку запросов (обработка запросов завершается компонентом «Контейнер»). Давайте возьмем Http11NioProtocol в качестве примера для анализа процесса разбора HTTP-запросов.
ProtocolHandler запускается в методе startInterval соединителя, и код выглядит следующим образом:
protected void startInternal() throws LifecycleException {
// Validate settings before starting
if (getPort() < 0) {
throw new LifecycleException(sm.getString(
"coyoteConnector.invalidPort", Integer.valueOf(getPort())));
}
setState(LifecycleState.STARTING);
try {
protocolHandler.start(); //启动protocolHandler
} catch (Exception e) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
}
}
Http11NioProtocol создает экземпляр org.apache.tomcat.util.net.NioEndpoint, а затем делегирует работу по прослушиванию порта и разбору запроса реализации NioEndpoint. Tomcat проектирует в общей сложности три потока при использовании Http11NioProtocol для анализа HTTP-запросов, а именно Acceptor, Poller и Worker.
1. Акцепторная резьба
Acceptor реализует интерфейс Runnable. По его имени мы знаем, что он является приемником и отвечает за получение сокетов. Его метод приема — serverSocket.accept(), получает объект SocketChannel, а затем инкапсулирует его в настроенную для tomcat организацию. .apache.tomcat.util .net.NioChannel. Хоть это и Nio, но он все же использует традиционный метод при получении сокетов, который реализуется блокировкой. Акцептор создается и управляется в виде пула потоков.Запуск Акцептора осуществляется в методе startInternal() NioEndpoint.Исходный код выглядит следующим образом:
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());
// Create worker collection
if (getExecutor() == null) {
createExecutor();
}
//设置最大连接数,默认值为maxConnections = 10000,通过同步器AQS实现。
initializeConnectionLatch();
//创建、配置并启动线程Pooler
pollers = new Poller[getPollerThreadCount()];
for (int i = 0; i < pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
startAcceptorThreads(); //启动Acceptor线程
}
}
Продолжайте отслеживать исходный код startAcceptorThreads.
protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount(); //默认值为1
acceptors = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
Acceptor<U> acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor-" + i;
acceptor.setThreadName(threadName);
acceptors.add(acceptor);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
Основной код потока Acceptor находится в его методе запуска.
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (endpoint.isRunning()) {
// endpoint阻塞
while (endpoint.isPaused() && endpoint.isRunning()) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!endpoint.isRunning()) {
break;
}
state = AcceptorState.RUNNING;
try {
//连接数到达最大值时,await等待释放connection,在Endpoint的startInterval方法中设置了最大连接数
endpoint.countUpOrAwaitConnection();
// Endpoint might have been paused while waiting for latch
// If that is the case, don't accept new connections
if (endpoint.isPaused()) {
continue;
}
//U是一个socketChannel
U socket = null;
try {
//接收socket请求
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
endpoint.countDownConnection();
if (endpoint.isRunning()) {
errorDelay = handleExceptionWithDelay(errorDelay);
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
// endpoint的setSocketOptions方法对socket进行配置
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
String msg = sm.getString("endpoint.accept.fail");
// APR specific.
// Could push this down but not sure it is worth the trouble.
if (t instanceof Error) {
Error e = (Error) t;
if (e.getError() == 233) {
// Not an error on HP-UX so log as a warning
// so it can be filtered out on that platform
// See bug 50273
log.warn(msg, t);
} else {
log.error(msg, t);
}
} else {
log.error(msg, t);
}
}
}
state = AcceptorState.ENDED;
}
Акцептор завершает прием запроса сокета, а затем передает его NioEndpoint для настройки и продолжает отслеживать метод setSocketOptions конечной точки.
protected boolean setSocketOptions(SocketChannel socket) {
try {
//设置为非阻塞
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
getPoller0().register(channel); //调用Poller的register方法,完成channel的注册。
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("", t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
Анализируя исходный код setSocketOptions, мы можем узнать, что основная функция этого метода заключается в том, чтобы использовать входящие параметры SocketChannel для генерации SecureNioChannel или NioChannel, а затем зарегистрировать его в селекторе потока Poller.Вы можете узнать больше о Java nio и иметь более глубокое понимание этого содержания.
2. Пул или нить
Pollor также реализует интерфейс Runnable, который является внутренним классом класса NioEndpoint. Поток опроса создается, настраивается и запускается в методе startInterval конечной точки, как показано в листинге 4. Основная обязанность Poolor — постоянно опрашивать свой селектор, проверять готовые сокеты (с данными, доступными для чтения или записи) и реализовывать мультиплексирование ввода-вывода. Его конструктор инициализирует селектор.
public Poller() throws IOException {
this.selector = Selector.open();
}
При анализе Acceptor упоминается, что после того, как Acceptor получает запрос сокета, он вызывает метод setSocketOptions NioEndpoint (листинг 6), который генерирует NioChannel, а затем вызывает метод register Pollor для генерации PoolorEvent и добавления его в очередь событий. исходный код метода регистрации выглядит следующим образом:
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
//生成PoolorEvent并加入到Eventqueue
if (r == null) r = new PollerEvent(socket, ka, OP_REGISTER);
else r.reset(socket, ka, OP_REGISTER);
addEvent(r);
}
Основной код Pollor также находится в методе run.
public void run() {
// 调用了destroy()方法后终止此循环
while (true) {
boolean hasEvents = false;
try {
if (!close) {
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//非阻塞的 select
keyCount = selector.selectNow();
} else {
//阻塞selector,直到有准备就绪的socket
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
//该方法遍历了eventqueue中的所有PollorEvent,然后依次调用PollorEvent的run,将socket注册到selector中。
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("", x);
continue;
}
//either we timed out or we woke up, process events first
if (keyCount == 0) hasEvents = (hasEvents | events());
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// 遍历就绪的socket
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper) sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
//调用processKey方法对有数据读写的socket进行处理,在分析Worker线程时会分析该方法
iterator.remove();
processKey(sk, attachment);
}
}
//process timeouts
timeout(keyCount, hasEvents);
}//while
getStopLatch().countDown();
}
Метод событий вызывается в методе запуска:
public boolean events() {
boolean result = false;
PollerEvent pe = null;
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) {
result = true;
try {
pe.run(); //将pollerEvent中的每个socketChannel注册到selector中
pe.reset();
if (running && !paused) {
eventCache.push(pe); //将注册了的pollerEvent加到endPoint.eventCache
}
} catch (Throwable x) {
log.error("", x);
}
}
return result;
}
Продолжайте следить за методом запуска PollerEvent:
public void run() {
if (interestOps == OP_REGISTER) {
try {
//将SocketChannel注册到selector中,注册时间为SelectionKey.OP_READ读事件
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
if (key == null) {
socket.socketWrapper.getEndpoint().countDownConnection();
} else {
final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
if (socketWrapper != null) {
//we are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
socketWrapper.interestOps(ops);
key.interestOps(ops);
} else {
socket.getPoller().cancelledKey(key);
}
}
} catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key);
} catch (Exception ignore) {
}
}
}
}
3. Рабочий поток
Рабочий поток, SocketProcessor, используется для обработки запросов Socket. SocketProcessor также является внутренним классом Endpoint. В методе run Pollor (листинг 8) при отслеживании готового сокета для обработки вызывается метод processKey:
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if (close) {
cancelledKey(sk);
} else if (sk.isValid() && attachment != null) {
//有读写事件就绪时
if (sk.isReadable() || sk.isWritable()) {
if (attachment.getSendfileData() != null) {
processSendfile(sk, attachment, false);
} else {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// socket可读时,先处理读事件
if (sk.isReadable()) {
//调用processSocket方法进一步处理
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
//写事件
if (!closeSocket && sk.isWritable()) {
//调用processSocket方法进一步处理
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch (CancelledKeyException ckx) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("", t);
}
}
Продолжайте отслеживать метод processSocket:
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
// 尝试循环利用之前回收的SocketProcessor对象,如果没有可回收利用的则
// 创建新的SocketProcessor对象
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
创建SocketProcessor,即Worker线程,基于线程池模式进行创建和管理
sc = createSocketProcessor(socketWrapper, event);
} else {
// 循环利用回收的SocketProcessor对象
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
//SocketProcessor实现了Runneble接口,可以直接传入execute方法进行处理
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
//NioEndpoint中createSocketProcessor创建一个SocketProcessor。
protected SocketProcessorBase<NioChannel> createSocketProcessor(
SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
return new SocketProcessor(socketWrapper, event);
}
Резюме: Http11NioProtocol реализован на основе Java Nio, создавая потоки Acceptor, Pollor и Worker для достижения мультиплексирования ввода-вывода. Взаимосвязь между тремя типами потоков показана на следующем рисунке:
Отношения между Acceptor и Pollor представляют собой режим производитель-потребитель: Acceptor постоянно добавляет PollorEvent в EventQueue, Pollor опрашивает, чтобы проверить готовность PollorEvent в EventQueue, а затем отправляет его в рабочий поток для обработки.После анализа соединителя в следующей статье будет продолжен анализ другого основного компонента, соединителя.
анализ исходного кода tomcat (первый анализ исходного кода после tomcat (первый, который начинается с общей архитектуры))
Анализ исходного кода tomcat (подробное объяснение второго процесса запуска tomcat)
Анализ исходного кода tomcat (Часть 4 анализа принципа обработки запросов tomcat — Анализ исходного кода контейнера)