background
netty — это платформа уровня асинхронной сетевой связи, управляемая событиями, и ее официальная документация объясняется как
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
Мы выбрали netty в качестве базовой структуры сетевой коммуникации в Sailfish, системе рассылки новостей Университета Синьмей (средняя ежедневная отправка 5 миллиардов сообщений), и shark, системе оптимизации мобильных прокси Университета Синьмэй (средняя ежедневная пропускная способность 3). млрд).
Поскольку netty используется в основе двух таких важных систем, необходимо знать механизм netty и даже исходный код, поэтому родилась серия статей об исходном коде netty. Позже я представлю вам то, что я узнал из исходного кода netty без оговорок через серию тем, исходный код основан на 4.1.6.Final
why netty
Нижний уровень netty основан на JDK NIO, почему бы нам напрямую не использовать JDK NIO или другие фреймворки NIO? Вот что я придумал
1. Использование nio, поставляемого с jdk, требует понимания слишком многих концепций, а программирование сложное. 2. Базовая модель ввода-вывода netty может быть переключена по желанию, и все это требует лишь незначительных изменений. 3. Собственная распаковка и распаковка Netty, обнаружение аномалий и другие механизмы освобождают вас от тяжелых деталей nio, так что вам нужно заботиться только о бизнес-логике 4.netty решает многие ошибки jdk, включая обучение пустой ротации 5. Нижний слой netty сделал множество небольших оптимизаций для потоков и селекторов, а тщательно разработанные потоки-реакторы обеспечивают очень эффективную параллельную обработку. 6. Поставляется с различными стеками протоколов, что позволяет вам работать с любым общим протоколом без необходимости делать это самостоятельно. 7. Сообщество netty активно, список рассылки или проблема всякий раз, когда вы сталкиваетесь с проблемами 8. Netty прошла обширную проверку в Интернете основных инфраструктур RPC, промежуточного программного обеспечения сообщений и промежуточного программного обеспечения распределенной связи, и ее надежность чрезвычайно высока.
dive into netty
Зная так много, сегодня мы начнем наше путешествие по исходному коду netty с примера.
В этой статье в основном описывается, как netty связывает порты и запускает службы. В процессе запуска сервиса вы узнаете об основных основных компонентах netty, я не буду подробно рассказывать об этих компонентах, но расскажу, как основные компоненты нанизаны вместе, чтобы сформировать ядро netty.
example
Вот очень простой код запуска сервера
public final class SimpleServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new SimpleServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded");
}
}
}
Несколько строк кода могут запустить сервер, порт привязан к 8888 и используется режим nio.Подробности обработки каждого шага описаны ниже.
EventLoopGroup
я был вДругие статьиОн был подробно проанализирован в , и, грубо говоря, это бесконечный цикл, постоянно обнаруживающий события ввода-вывода, обрабатывая события ввода-вывода и выполняя задачи.
ServerBootstrap
Это вспомогательный класс запуска сервера, который связывает порт для запуска службы, устанавливая для него ряд параметров.
group(bossGroup, workerGroup)
Нам нужны два типа людей для работы, один босс, а другой рабочий.Босс отвечает за то, чтобы забрать работу со стороны, и передать полученную работу рабочим, положить ее сюда,bossGroup
Роль состоит в том, чтобы постоянно принимать новые соединения и создавать новые соединения дляworkerGroup
иметь дело с
.channel(NioServerSocketChannel.class)
Указывает, что сервер запускает канал, связанный с нио. Канал является основной концепцией в Netty. Можно понимать, что канал представляет собой соединение или действие на стороне сервера, которое будет подробно описано позже.
.handler(new SimpleServerHandler()
Указывает, какие процессы необходимо пройти в процессе запуска сервера, здесьSimpleServerHandler
Окончательный интерфейс верхнего уровняChannelHander
, является основной концепцией netty, представляющей процессор, через который проходит поток данных, который можно понимать как каждый уровень конвейера.
childHandler(new ChannelInitializer<SocketChannel>)...
Указывает, что делать после прихода нового соединения, то есть, как было сказано выше, как начальник распределяет рабочие места
ChannelFuture f = b.bind(8888).sync();
Вот реальный процесс запуска, привяжите порт 8888 и подождите, пока сервер запустится, прежде чем вводить код нисходящего канала.
f.channel().closeFuture().sync();
Ожидание закрытия сокета сервером
bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
Закрыть два набора бесконечных циклов
Код можно легко повторно запустить локально, окончательный вывод в консоль:
handlerAdded
channelRegistered
channelActive
Что касается того, почему они выводятся последовательно, на самом деле это очень просто после глубокого анализа.
Подробная информация
ServerBootstrap
О серии настроек параметров говорить, собственно, не о чем, это не что иное, как использованиеmethod chainingспособ сохранения параметров, необходимых для запуска сервера, в файл. Наше внимание падает на следующий код
b.bind(8888).sync();
Вот слово: мы только начали смотреть на исходный код. Если мы не так ясны в деталях, мы можем использовать функцию отладки IDE, шаг за шагом, один шаг, один тест или метод двухточечного теста, чтобы определить, какая строка кода является последней записью для запуска службы. , здесь мы определили, что метод привязки является записью, мы следим, анализируем
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
Создав номер портаInetSocketAddress
Затем продолжайте связывать
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
validate()
Проверьте необходимые параметры, необходимые для запуска службы, а затем вызовитеdoBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
//...
final ChannelFuture regFuture = initAndRegister();
//...
final Channel channel = regFuture.channel();
//...
doBind0(regFuture, channel, localAddress, promise);
//...
return promise;
}
Здесь я убрал мелочи и сосредоточился на основном методе.На самом деле есть два ядра.initAndRegister()
,а такжеdoBind0()
На самом деле из названия метода уже видно, init->initialization, register->registration, так что же именно нужно регистрировать? Обратитесь в регистрацию поллера в nio, может быть что-то прописать на селекторе после его инициализации, и ваще привязать, вроде привязать номер порта локально, с этими догадками давайте углубимся
initAndRegister()
final ChannelFuture initAndRegister() {
Channel channel = null;
// ...
channel = channelFactory.newChannel();
//...
init(channel);
//...
ChannelFuture regFuture = config().group().register(channel);
//...
return regFuture;
}
Мы по-прежнему сосредоточены на основном коде, оставляя в стороне остатки, как видим.initAndRegister()
сделал несколько вещей
1. новый канал
2. запустите этот канал
3. Зарегистрируйте этот канал на объекте
Разбираем эти три вещи шаг за шагом
1. новый канал
Сначала мы должны понять определение канала.Официальное описание канала Нетти выглядит следующим образом.
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind
Поскольку канал здесь создается при запуске службы, мы можем соответствовать ServerSocket в обычном программировании Socket, представляя конвейер, который проходит, когда сервер привязан.
Мы обнаружили, что этот канал проходит черезchannelFactory
новинка,channelFactory
Интерфейс очень простой
public interface ChannelFactory<T extends Channel> extends io.netty.bootstrap.ChannelFactory<T> {
/**
* Creates a new channel.
*/
@Override
T newChannel();
}
Это способ просмотра назначенного места channelFactory
AbstractBootstrap.java
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}
Назначенный здесь, мы откатились слой за слоем, посмотрели, где была вызвана функция, и обнаружили, что именно в этой функции ChannelFactory была новой.
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
Здесь наша демонстрационная программа вызываетchannel(channelClass)
метод,channelClass
в видеReflectiveChannelFactory
Конструктор создаетReflectiveChannelFactory
Код на демо-стороне выглядит следующим образом:
.channel(NioServerSocketChannel.class);
Затем вернитесь к началу этого раздела.
channelFactory.newChannel();
Мы можем сделать вывод, что последний вызовReflectiveChannelFactory.newChannel()
метод, продолжение
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
Видетьclazz.newInstance();
, мы понимаем, что объект создается отражением, и этот класс — это то, что у нас есть вServerBootstrap
входящийNioServerSocketChannel.class
В результате после круга окончательное создание канала эквивалентно вызову конструктора по умолчанию new для создания нового каналаNioServerSocketChannel
объект
Здесь упоминается, что есть два способа прочитать детали исходного кода. Один из них является возвращение. Например, когда используется объект, он может быть прослеженный слой по слою, а блок кода, который был изначально создан для объекта, будет Найдено. Одним из способов является сверху вниз, слоистый анализ, обычно используется для анализа определенного способа и, наконец, сплавливая полный процесс
Далее мы можем сосредоточиться наNioServerSocketChannel
конструктор по умолчанию для
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
//...
return provider.openServerSocketChannel();
}
пройти черезSelectorProvider.openServerSocketChannel()
Создайте канал на стороне сервера, а затем введите следующий метод
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
Первая строка кода здесь идет к родительскому классу, а вторая строка, новая выходитNioServerSocketChannelConfig
, чей интерфейс верхнего уровняChannelConfig
, официальное описание netty выглядит следующим образом
A set of configuration properties of a Channel.
В принципе, можно определить, чтоChannelConfig
Это также основной модуль ядра в netty. Впервые глядя на исходный код, когда мы видим это, нам не нужно глубоко копаться в этом объекте, а возвращаться к нему, когда мы его используем. Просто помните, что этот объект создается.NioServerSocketChannel
объект создается, когда
мы продолжаем отслеживатьNioServerSocketChannel
родительский класс
AbstractNioMessageChannel.java
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
продолжай преследовать
AbstractNioChannel.java
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
//...
ch.configureBlocking(false);
//...
}
Здесь просто поставьте переднюю частьprovider.openServerSocketChannel();
созданныйServerSocketChannel
сохранить в переменную-член, а затем вызватьch.configureBlocking(false);
Установите канал в неблокирующий режим, стандартный метод программирования jdk nio.
здесьreadInterestOp
То есть предыдущие слои входящегоSelectionKey.OP_ACCEPT
, а затем сосредоточьтесь на анализеsuper(parent);
(родительский элемент здесь на самом деле нулевой, который передается из предыдущей записи)
AbstractChannel.java
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
На этом этапе создаются три новых компонента, которые назначаются переменным-членам, которые
id = newId();
protected ChannelId newId() {
return DefaultChannelId.newInstance();
}
id — это уникальный идентификатор каждого канала в netty, который здесь не раскрывается, а затем
unsafe = newUnsafe();
protected abstract AbstractUnsafe newUnsafe();
См. определение небезопасно
Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport, and must be invoked from an I/O thread
Чтобы успешно захватить еще один важный компонент netty, мы можем игнорировать то, что делает TA, просто знайтеnewUnsafe
Методы заканчиваются в классахNioServerSocketChannel
середина
Наконец
pipeline = newChannelPipeline();
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
Если вы смотрите на этот код в первый раз, вы можете не знатьDefaultChannelPipeline
Для чего это?Мы по-прежнему используем вышеуказанный метод для просмотра интерфейса верхнего уровняChannelPipeline
Определение
A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel
Как видно из документации этого класса, этот интерфейс в основном снова является основным модулем netty.
На данный момент мы, наконец, создали канал на стороне сервера.Когда мы соединяем эти детали вместе, мы извлекаем несколько основных компонентов netty, которые резюмируются следующим образом.
- Channel
- ChannelConfig
- ChannelId
- Unsafe
- Pipeline
- ChannelHander
Когда я впервые вижу код, наша цель - проследить строку кода, которую запустил сервер. Давайте напишем вышеуказанные компоненты, подождем, пока код будет готов, мы можем его проанализировать, и я поставлю его. Переходим к каждому компоненту в серия обратного источника
Подводя итог, пользователь вызывает методBootstrap.bind(port)
Первый шаг — создать новый через отражениеNioServerSocketChannel
объект, и создал ряд основных компонентов в процессе нового, вот и все, ничего больше, нам нужно продолжать следить за реальным запуском
2. запустите этот канал
На этом этапе вам лучше перейти к началу статьи, чтобы вспомнить, первый шаг newChannel выполнен, вот инициализация для этого канала, что делает метод init, давайте углубимся
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
Когда вы впервые видите этот метод, вы можете подумать: «Вау, это старо, как вы можете видеть это таким образом?» Помните, что мы говорили ранее, Паодин Цзе Ню, шаг за шагом разбирая и, наконец, объединяя, следующие мои шаги по разборке
1. Установите опцию и атрибут
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
Отсюда видно, что мы звоним сюда первымoptions0()
а такжеattrs0()
, а потом полученные options и attrs инжектить в channelConfig или channel.Насчет того, для чего нужны options и attr, на самом деле, не нужно сейчас так глубоко разбираться, достаточно взглянуть на интерфейс верхнего уровняChannelOption
И посмотрите на конкретные отношения наследования канала, вы можете понять, я поставлю эти два в серию анализа исходного кода позже.
2. Установите параметр и атрибут нового канала доступа.
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
Здесь он аналогичен предыдущему, за исключением того, что вместо установки этих двух свойств текущего канала он соответствует каналу, соответствующему новому входящему соединению.Поскольку в этой статье речь идет только о том, как запустить сервер, соединение доступа будет быть размещены в следующей статье.
3. Добавьте новый обработчик соединения
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
На последнем шагеp.addLast()
Добавлен обработчик конвейера в serverChannel.ServerBootstrapAcceptor
, как видно из названия, это аксессор, который принимает новые запросы и отправляет новые запросы в зацикливатель событий.Сначала мы не будем слишком много анализировать.
Подведем итоги, мы обнаружили, что init не запускал службу, а инициализировал некоторые базовые конфигурации и свойства, а также добавил аксессор в пайплайн, чтобы специально принимать новые подключения, нам нужно продолжать следить
3. Зарегистрируйте этот канал на объекте
На этом этапе мы анализируем следующие методы
ChannelFuture regFuture = config().group().register(channel);
позвонитьNioEventLoop
серединаregister
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
Ну, в этот момент, помните здесьunsafe()
Каким должен быть возвращаемый объект? Если вы не помните, вы можете прочитать предыдущее описание небезопасности, или самый быстрый способ — выполнить отладку здесь, следовать методу регистрации и посмотреть, какой это тип небезопасности.
Мы проверили и обнаружили, что это
AbstractUnsafe.java
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// ...
AbstractChannel.this.eventLoop = eventLoop;
// ...
register0(promise);
}
Здесь нам нужно еще только сосредоточиться, сначала привязать цикл событий EventLoop к NioServerSocketChannel, а затем вызватьregister0()
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
Этот абзац на самом деле очень ясен, первый звонокdoRegister();
, я расскажу об этом позже, а потом позвонюinvokeHandlerAddedIfNeeded()
, поэтому первая строка, напечатанная в консоли, будет
handlerAdded
Что касается того, как он наконец называется, мы поговорим об этом позже, когда будем подробно разбирать пайплайн.
тогда позвониpipeline.fireChannelRegistered();
После звонка дисплей консоли
handlerAdded
channelRegistered
продолжайте следить
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
Читая это, вы можете считать само собой разумеющимся, что последняя строка консоли
pipeline.fireChannelActive();
Вывод из этой строки кода, давайте сначала посмотримisActive()
метод
@Override
public boolean isActive() {
return javaChannel().socket().isBound();
}
Наконец позвоните в jdk
ServerSocket.java
/**
* Returns the binding state of the ServerSocket.
*
* @return true if the ServerSocket succesfuly bound to an address
* @since 1.4
*/
public boolean isBound() {
// Before 1.3 ServerSockets were always bound during creation
return bound || oldImpl;
}
здесьisBound()
Возвращает false, но из текущего процесса, за которым мы следили, мы не привязали ServerSocket к адресу, поэтомуisActive()
вернуть false, мы не успешно вошли вpipeline.fireChannelActive();
метод, то кто является выходом последней строки, мы немного сумасшедшие, на самом деле, если вы умело используете IDE, очень просто найти стек вызовов функций
Ниже приведен конкретный метод, который я использую для поиска вызова функции с помощью intellij.
Сначала мы нажимаем точку останова на строке кода, которая, наконец, выводит текст, затем отлаживаем, запускаем эту строку, intellij автоматически подтягивает для нас стек вызовов, единственное, что нам нужно сделать, это двигать клавиши со стрелками, вы можете см. Полную цепочку вызовов функции
Если вы видите, что самым последним инициированием метода является метод Run, запускаемый потоком, то сделайте точку останова в месте отправки метода объекта Runnable, удалите другие точки останова, выполните повторную отладку, например, наша первая отладка обнаруживает хранилище в стеке. Runnable следующим образом
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
мы остановились на этой линииpipeline.fireChannelActive();
, Если мы хотим увидеть начальный вызов, мы должны выскочить и нажать точку останова.if (!wasActive && isActive())
, потому что выполнение многих задач в netty вызывается асинхронными потоками, то есть потоками-реакторами (подробности см. в трилогии о потоках-реакторах.Последняя песня), если мы хотим увидеть первый вызов метода, мы должны посмотреть место, где был отправлен Runnable, и рекурсивно мы можем найти строку «исчезающего кода»
Наконец, таким образом, наконец, нашелpipeline.fireChannelActive();
К сожалению, код, который инициирует вызов, выглядит следующим образом:doBind0()
метод
doBind0()
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
Мы обнаружили, что при вызовеdoBind0(...)
Когда метод используется, он является асинхронным, обертывая Runnable.Об асинхронных задачах вы можете прочитать мою предыдущую статью.Анализ исходного кода Netty раскрывает завесу потока реактора (3)
Хорошо, пойдем кchannel.bind()
метод
AbstractChannel.java
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
Найдено, позвонив на метод BING
@Override
public final ChannelFuture bind(SocketAddress localAddress) {
return tail.bind(localAddress);
}
Я полагаю, что вы не очень много знаете о том, что такое хвост. Вы можете обратиться к самому началу. Хвост появился, когда вы создали конвейер. О классах, соответствующих конвейеру и хвосту, я подробно объясню позже в серии исходных кодов. Здесь вы хотите узнать следующий код. Единственный лучший способ - ввести отладку шаг за шагом. Я не буду подробно раскрывать его из соображений экономии места.
Наконец, мы пришли к следующему району
HeadContext.java
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
Небезопасным здесь является тот, который упоминался ранееAbstractUnsafe
, если быть точным, должно бытьNioMessageUnsafe
Вводим его метод привязки
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// ...
boolean wasActive = isActive();
// ...
doBind(localAddress);
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
Очевидно, согласно нормальному процессу, мы уже проанализировалиisActive();
Метод возвращает ложь и входит вdoBind()
После этого, если канал активирован, инициироватьpipeline.fireChannelActive();
Вызов, наконец, вызывает пользовательский метод и выводит последнюю строку на консоль, поэтому на этом этапе вы должны знать, почему три строки будут последовательно выведены на консоль.
doBind()
Способ тоже очень простой
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
//noinspection Since15
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
Наконец, передается метод привязки в jdk.После этой строчки кода, в нормальных условиях, порт фактически привязывается.
Кроме того, посредством нисходящего анализа при вызовеpipeline.fireChannelActive();
будет вызван следующий метод
HeadContext.java
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
ВходитьreadIfIsAutoRead
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
анализироватьisAutoRead
метод
private volatile int autoRead = 1;
public boolean isAutoRead() {
return autoRead == 1;
}
Отсюда видно, чтоisAutoRead
Метод возвращает true по умолчанию, поэтому введите следующий метод
public Channel read() {
pipeline.read();
return this;
}
наконец позвонить
AbstractNioUnsafe.java
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
здесьthis.selectionKey
Это объект, который мы вернули на предыдущем шаге регистрации. Когда мы регистрировались ранее, тестовая операция регистрации была равна 0.
Не забудьте зарегистрироваться
AbstractNioChannel
selectionKey = javaChannel().register(eventLoop().selector, 0, this)
Это эквивалентно удалению зарегистрированных операций, передаче условия if и последующему вызову
selectionKey.interestOps(interestOps | readInterestOp);
А вотreadInterestOp
Он был передан, когда ранее был представлен newChannel.SelectionKey.OP_ACCEPT
, снова стандартный геймплей jdk nio. На данный момент детали, которые вам нужно знать, почти такие же, так что давайте закончим так!
summary
Наконец, давайте подытожим процесс, через который проходит netty для запуска службы. 1. Задайте параметры класса запуска, самое главное выставить канал 2. Создайте канал, соответствующий серверу, и создайте основные компоненты, включая ChannelConfig, ChannelId, ChannelPipeline, ChannelHandler, Unsafe и т. д. 3. Инициализируйте канал, соответствующий серверу, установите атрибут, параметр и установите атрибут и параметр подканала, добавьте новый метод доступа к каналу на канале сервера и отправьте такие события, как addHandler и register. 4. Вызовите нижний уровень jdk, чтобы выполнить привязку порта, и инициируйте активное событие.Когда активное событие запускается, привязка сервисного порта действительно выполняется.
Кроме того, подробные идеи о чтении исходного кода в статье также могут вам помочь.
Если вы хотите изучить Нетти, моя маленькая книга«Введение и практика Netty: имитация системы обмена мгновенными сообщениями WeChat IM»Это может вам помочь.Если вы хотите систематически изучать принципы Netty, то вы не должны пропустить мою серию видеороликов по анализу исходного кода Netty:coding.IMO OC.com/class/230, Также…