Эта статья - первый публичный аккаунт и личный блог:Java Cat Talk и блог Uncle Cat | MySelf, пожалуйста, укажите источник для перепечатки.
Адрес проекта GitHub
Облегченная, высокоэффективная асинхронная среда связи сетевых приложений, поддерживающая многотерминальный (прикладной и аппаратный IoT)
предисловие
Эта статья предполагает, что читатель уже обладает определенными базовыми знаниями о Netty и может построить их самостоятельно.Служба связи Netty (включая клиент и сервер). Тогда вы, должно быть, использовали Channel, который является примером инкапсуляции ссылок Netty для традиционных JavaIO и NIO.
Итак, давайте узнаем оОчистка данных канала и безопасность потоковБар.
Шаги для сброса данных
1. Получите экземпляр ссылки
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取链接实例
Channel channel = ctx.channel();
}
Я помещаю случай в наиболее знакомый новичкам метод ChannelRead, которыйСпособ получения данных, метод, который нам нужно переопределить, когда мы реализуем интерфейс обработки сообщений Netty. То есть после того, как клиент отправит сообщение, этот метод будет запущен и вызван, поэтому мы объясним это содержание в этом методе.
Из предыдущего фрагмента кода, на самом деле, в настоящее время все еще очень просто, мы используемChannelHandlerContext (это объект, с которым ChannelHandler взаимодействует и подключается к ChannelPipeline. Ниже приводится объяснение исходного кода) для получения текущего экземпляра ссылки Channel.
/* Enables a {@link ChannelHandler} to interact with its {@link ChannelPipeline}
* and other handlers. Among other things a handler can notify the next {@link ChannelHandler} in the
* {@link ChannelPipeline} as well as modify the {@link ChannelPipeline} it belongs to dynamically.
*/
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
//......
}
2. Создайте ByteBuf, который содержит данные
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取链接实例
Channel channel = ctx.channel();
//创建一个持有数据的ByteBuf
ByteBuf buf = Unpooled.copiedBuffer("data", CharsetUtil.UTF_8);
}
Что такое Байтбуф?
Он инкапсулирован самой инфраструктурой Netty.персонаж, лежащий в основе объекта, является абстрактным классом для byte[] и ByteBuffer NIO, более официальный веб-сайт представляет собой «случайную и последовательно доступную последовательность из нуля или более байтов». Ниже приводится объяснение исходного кода.
/**
* A random and sequential accessible sequence of zero or more bytes (octets).
* This interface provides an abstract view for one or more primitive byte
* arrays ({@code byte[]}) and {@linkplain ByteBuffer NIO buffers}.
*/
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> {
//......
}
Как видно из предыдущего исходного кода, ByteBuf является абстрактным классом, поэтому мы не можем передатьnewформа для создания нового объекта ByteBuf. Тогда мы можем предоставить через NettyПоследний служебный класс Unpooled(Просто подумайте об этом как о служебном классе для создания ByteBufs).
/**
* Creates a new {@link ByteBuf} by allocating new space or by wrapping
* or copying existing byte arrays, byte buffers and a string.
*/
public final class Unpooled {
//......
}
Это был интересный процесс, так что нам просто нужно посмотреть еще разcopiedBufferЭтот метод. Этот метод относительно прост, мы создадимновый буфер, содержимое которого представляет собой «данные», указанные в указанной нами кодировке набора символов UTF-8, а индекс чтения и индекс записи этого нового буфера равны 0 и длине строки соответственно.
3. Сброс данных
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取链接实例
Channel channel = ctx.channel();
//创建一个持有数据的ByteBuf
ByteBuf buf = Unpooled.copiedBuffer("data", CharsetUtil.UTF_8);
//数据冲刷
channel.writeAndFlush(buf);
}
Я полагаю, что большинство людей пишут это напрямую, потому что мы часто запускаем тесты как должное и получаем это сообщение «данные» на стороне клиента. Итак, мы должны обратить внимание,Какое значение вернет этот сброс данных?, как мы можем узнать на стороне сервера,Является ли этот сброс данных успешным или неудачным?
Так что на самом деле фреймворк Netty учел это, и мы получим ChannelFuture в этом сбросе данных.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取链接实例
Channel channel = ctx.channel();
//创建一个持有数据的ByteBuf
ByteBuf buf = Unpooled.copiedBuffer("data", CharsetUtil.UTF_8);
//数据冲刷
ChannelFuture cf = channel.writeAndFlush(buf);
}
Да, он является результатом асинхронной операции ввода-вывода Channel, которая является интерфейсом и наследует Future. (Ниже приводится объяснение исходного кода)
/**
* The result of an asynchronous {@link Channel} I/O operation.
*/
public interface ChannelFuture extends Future<Void> {
//......
}
В этом случае мы, очевидно, можем знать, что можем добавить к нему соответствующий мониторинг.
4. Мониторинг результатов асинхронного обратного вызова
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取链接实例
Channel channel = ctx.channel();
//创建一个持有数据的ByteBuf
ByteBuf buf = Unpooled.copiedBuffer("data", CharsetUtil.UTF_8);
//数据冲刷
ChannelFuture cf = channel.writeAndFlush(buf);
//添加ChannelFutureListener以便在写操作完成后接收通知
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
//写操作完成,并没有错误发生
if (future.isSuccess()){
System.out.println("successful");
}else{
//记录错误
System.out.println("error");
future.cause().printStackTrace();
}
}
});
}
Хорошо, мы можем просто понять из кода, что мы получим результат этого запуска, отслеживая результат асинхронного ввода-вывода. Я думаю, что это относительно полныйСброс данных (writeAndFlush).
Процесс тестирования безопасности потоков
Для многопоточного тестирования мы будем моделировать несколько потоков для выполнения операций сброса данных, мы можем использоватьExecutor.
мы можем понять этоExecutor,ДаСпособ опустить включение потоков и планирование, вам просто нужно пройтиRunnableПросто дайте это, вам больше не нужно начинать поток. (Ниже приводится объяснение исходного кода)
/**
* An object that executes submitted {@link Runnable} tasks. This
* interface provides a way of decoupling task submission from the
* mechanics of how each task will be run, including details of thread
* use, scheduling, etc. An {@code Executor} is normally used
* instead of explicitly creating threads. For example, rather than
* invoking {@code new Thread(new(RunnableTask())).start()} for each
* of a set of tasks, you might use:...
*/
public interface Executor {
//......
}
Итак, наш тестовый код примерно такой.
final Channel channel = ctx.channel();
//创建要写数据的ByteBuf
final ByteBuf buf = Unpooled.copiedBuffer("data",CharsetUtil.UTF_8).retain();
//创建将数据写到Channel的Runnable
Runnable writer = new Runnable() {
@Override
public void run() {
ChannelFuture cf = channel.writeAndFlush(buf.duplicate());
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
//写操作完成,并没有错误发生
if (future.isSuccess()){
System.out.println("successful");
}else{
//记录错误
System.out.println("error");
future.cause().printStackTrace();
}
}
});
}
};
//获取到线程池的Executor的引用
Executor executor = Executors.newCachedThreadPool();
//提交到某个线程中执行
executor.execute(writer);
//提交到另一个线程中执行
executor.execute(writer);
Здесь нам нужноУведомлениеявляется:
СоздайтеByteBuf, мы использовалиretainЭтот метод, он же ByteBuf, который мы сгенерировализарезервированная операция.
В ByteBuf есть такая область:Несохраняемые и зарезервированные производные буферы.
Это немного сложно, мы можем просто понять, что если вызывается функция сохранения, то данные будут сохранены в производном буфере, если не вызывается, то символьные данные будут удалены после вызова. (Ниже приводится объяснение исходного кода ByteBuf)
/*<h4>Non-retained and retained derived buffers</h4>
*
* Note that the {@link #duplicate()}, {@link #slice()}, {@link #slice(int, int)} and {@link #readSlice(int)} does NOT
* call {@link #retain()} on the returned derived buffer, and thus its reference count will NOT be increased. If you
* need to create a derived buffer with increased reference count, consider using {@link #retainedDuplicate()},
* {@link #retainedSlice()}, {@link #retainedSlice(int, int)} and {@link #readRetainedSlice(int)} which may return
* a buffer implementation that produces less garbage.
*/
Ладно, думаю можно и самому протестировать, лучше всего посмотреть исходники, чтобы углубить впечатление о принципе реализации.
Пул потоков здесь на самом деле не потокобезопасен, а предназначен для тестирования многопоточности Реализация канала Netty является потокобезопасной, поэтому мы можем хранить ссылку на канал и всякий раз, когда нам нужно записать данные на удаленный узел, можем его использовать. , и сообщения гарантированно отправляются по порядку, даже если в это время его использует множество потоков.
Эпилог
Наконец, позвольте мне представить личный проект с открытым исходным кодом, основанный на Netty:InChat
Облегченная, высокоэффективная асинхронная среда связи сетевых приложений, поддерживающая многотерминальный (прикладной и аппаратный IoT)
Ссылка: "Нетти бой"
Публичный номер: Java-кот говорит
Нынешний архитектурный дизайнер (код-фермер) и предприимчивый консультант по технологиям, раскованный и посредственный, любит открытый исходный код и рассказывает о жизни программы и нестандартных галантереях.