Эта статья подробно объяснит духовный путь постепенной эволюции от BIO к NIO, прокладывая путь к объяснению библиотеки Reactor-Netty.
оJava编程方法论-Reactor与Webflux
Совместное использование видео Rxjava и Reactor завершено. Адрес станции b выглядит следующим образом:
Интерпретация исходного кода Rxjava и совместное использование:вооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооо
Интерпретация и совместное использование исходного кода Reactor:вооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооо
вводить
Мы демонстрируем его использование через BIO Demo:
//服务端
public class BIOServer {
public void initBIOServer(int port)
{
ServerSocket serverSocket = null;//服务端Socket
Socket socket = null;//客户端socket
BufferedReader reader = null;
String inputContent;
int count = 0;
try {
serverSocket = new ServerSocket(port);
System.out.println(stringNowTime() + ": serverSocket started");
while(true)
{
socket = serverSocket.accept();
System.out.println(stringNowTime() + ": id为" + socket.hashCode()+ "的Clientsocket connected");
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while ((inputContent = reader.readLine()) != null) {
System.out.println("收到id为" + socket.hashCode() + " "+inputContent);
count++;
}
System.out.println("id为" + socket.hashCode()+ "的Clientsocket "+stringNowTime()+"读取结束");
}
} catch (IOException e) {
e.printStackTrace();
}finally{
try {
reader.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public String stringNowTime()
{
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return format.format(new Date());
}
public static void main(String[] args) {
BIOServer server = new BIOServer();
server.initBIOServer(8888);
}
}
// 客户端
public class BIOClient {
public void initBIOClient(String host, int port) {
BufferedReader reader = null;
BufferedWriter writer = null;
Socket socket = null;
String inputContent;
int count = 0;
try {
reader = new BufferedReader(new InputStreamReader(System.in));
socket = new Socket(host, port);
writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
System.out.println("clientSocket started: " + stringNowTime());
while (((inputContent = reader.readLine()) != null) && count < 2) {
inputContent = stringNowTime() + ": 第" + count + "条消息: " + inputContent + "\n";
writer.write(inputContent);//将消息发送给服务端
writer.flush();
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
socket.close();
reader.close();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return format.format(new Date());
}
public static void main(String[] args) {
BIOClient client = new BIOClient();
client.initBIOClient("127.0.0.1", 8888);
}
}
Из приведенного выше примера мы можем знать, что, будь то сервер или клиент, некоторые операции, на которых мы сосредоточимся, основаны на сервере.serverSocket = new ServerSocket(port)
serverSocket.accept()
, на базе клиентаSocket socket = new Socket(host, port);
И у обоих есть способ чтения и записи данных Socket, то есть чтение и запись через потоки, причем это чтение и запись неизбежно осуществляется через буфер промежуточного байтового массива.
Интерпретация привязки в ServerSocket
Поэтому мы смотрим на эти соответствующие логики через исходный код. давайте сначала посмотримServerSocket.java
Соответствующий код для этого класса.
мы смотримServerSocket.java
Конструктор может знать, что он все равно вызовет свой конструктор в концеbind
метод:
//java.net.ServerSocket#ServerSocket(int)
public ServerSocket(int port) throws IOException {
this(port, 50, null);
}
public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
setImpl();
if (port < 0 || port > 0xFFFF)
throw new IllegalArgumentException(
"Port value out of range: " + port);
if (backlog < 1)
backlog = 50;
try {
bind(new InetSocketAddress(bindAddr, port), backlog);
} catch(SecurityException e) {
close();
throw e;
} catch(IOException e) {
close();
throw e;
}
}
Согласно нашей демонстрации и приведенному выше исходному коду, параметр endpoint, переданный здесь, не будет нулевым, в то же время он принадлежитInetSocketAddress
Типа, размер бэклога 50, поэтому основная логика кода, на которую стоит обратить внимание, этоgetImpl().bind(epoint.getAddress(), epoint.getPort());
:
public void bind(SocketAddress endpoint, int backlog) throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!oldImpl && isBound())
throw new SocketException("Already bound");
if (endpoint == null)
endpoint = new InetSocketAddress(0);
if (!(endpoint instanceof InetSocketAddress))
throw new IllegalArgumentException("Unsupported address type");
InetSocketAddress epoint = (InetSocketAddress) endpoint;
if (epoint.isUnresolved())
throw new SocketException("Unresolved address");
if (backlog < 1)
backlog = 50;
try {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkListen(epoint.getPort());
// 我们应该关注的主要逻辑
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);
bound = true;
} catch(SecurityException e) {
bound = false;
throw e;
} catch(IOException e) {
bound = false;
throw e;
}
}
здесьgetImpl()
, из реализации вышеуказанного конструктора мы виделиsetImpl();
, известно, чтоfactory
Значение по умолчанию равно null, поэтому здесь мы сосредоточимся наSocksSocketImpl
этого класса, создайте его объекты и установите текущийServerSocket
Настройка объекта, исходный код этой настройки можно найти вSocksSocketImpl
родительский классjava.net.SocketImpl
Посмотреть в.
Тогда getImpl становится ясным. На самом деле это класс сущностей, соответствующий базовой реализации нашего сокета. Поскольку разные ядра операционных систем отличаются, их реализация сокета, конечно, будет разной. Мы должны обратить на это внимание. система под win.
/**
* The factory for all server sockets.
*/
private static SocketImplFactory factory = null;
private void setImpl() {
if (factory != null) {
impl = factory.createSocketImpl();
checkOldImpl();
} else {
// No need to do a checkOldImpl() here, we know it's an up to date
// SocketImpl!
impl = new SocksSocketImpl();
}
if (impl != null)
impl.setServerSocket(this);
}
/**
* Get the {@code SocketImpl} attached to this socket, creating
* it if necessary.
*
* @return the {@code SocketImpl} attached to that ServerSocket.
* @throws SocketException if creation fails.
* @since 1.4
*/
SocketImpl getImpl() throws SocketException {
if (!created)
createImpl();
return impl;
}
/**
* Creates the socket implementation.
*
* @throws IOException if creation fails
* @since 1.4
*/
void createImpl() throws SocketException {
if (impl == null)
setImpl();
try {
impl.create(true);
created = true;
} catch (IOException e) {
throw new SocketException(e.getMessage());
}
}
ПосмотримSocksSocketImpl
Реализован метод привязки, а затем получить его конец — не что иное, как вызов нативного методаbind0
.
//java.net.AbstractPlainSocketImpl#bind
/**
* Binds the socket to the specified address of the specified local port.
* @param address the address
* @param lport the port
*/
protected synchronized void bind(InetAddress address, int lport)
throws IOException
{
synchronized (fdLock) {
if (!closePending && (socket == null || !socket.isBound())) {
NetHooks.beforeTcpBind(fd, address, lport);
}
}
socketBind(address, lport);
if (socket != null)
socket.setBound();
if (serverSocket != null)
serverSocket.setBound();
}
//java.net.PlainSocketImpl#socketBind
@Override
void socketBind(InetAddress address, int port) throws IOException {
int nativefd = checkAndReturnNativeFD();
if (address == null)
throw new NullPointerException("inet address argument is null.");
if (preferIPv4Stack && !(address instanceof Inet4Address))
throw new SocketException("Protocol family not supported");
bind0(nativefd, address, port, useExclusiveBind);
if (port == 0) {
localport = localPort0(nativefd);
} else {
localport = port;
}
this.address = address;
}
//java.net.PlainSocketImpl#bind0
static native void bind0(int fd, InetAddress localAddress, int localport,
boolean exclBind)
throws IOException;
Здесь нам также нужно понимать, что использование многопоточности может реализовать только многопоточность «обработки бизнес-логики», но все же необходимо получать пакеты данных один за другим, что мы видели в демонстрации выше. . Как и проблема блокировки метода чтения, многопоточность вообще не может быть решена. Во-первых, давайте посмотрим, почему accept вызывает блокировку. Функция метода accept — запросить у операционной системы, есть ли новая информация о сокете Socket. отправлено из порта XXX. Обратите внимание, что здесь запрашивается операционная система, то есть поддержка режима ввода-вывода сокета сокета основана на операционной системе. Если операционная система не находит, что сокет подключен из указанного порт XXX, то операционная система будет ждать, поэтому accept. Метод будет заблокирован, а его внутренняя реализация использует синхронный ввод-вывод на уровне операционной системы.
Интерпретация принятия в ServerSocket
Итак, давайте проанализируемServerSocket.accept
Процесс исходного кода метода:
public Socket accept() throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!isBound())
throw new SocketException("Socket is not bound yet");
Socket s = new Socket((SocketImpl) null);
implAccept(s);
return s;
}
Сначала делаются какие-то суждения, затем создается объект Socket (почему здесь создается объект Socket, о чем пойдет речь далее), выполняется метод implAccept, и давайте взглянем на метод implAccept:
/**
* Subclasses of ServerSocket use this method to override accept()
* to return their own subclass of socket. So a FooServerSocket
* will typically hand this method an <i>empty</i> FooSocket. On
* return from implAccept the FooSocket will be connected to a client.
*
* @param s the Socket
* @throws java.nio.channels.IllegalBlockingModeException
* if this socket has an associated channel,
* and the channel is in non-blocking mode
* @throws IOException if an I/O error occurs when waiting
* for a connection.
* @since 1.1
* @revised 1.4
* @spec JSR-51
*/
protected final void implAccept(Socket s) throws IOException {
SocketImpl si = null;
try {
if (s.impl == null)
s.setImpl();
else {
s.impl.reset();
}
si = s.impl;
s.impl = null;
si.address = new InetAddress();
si.fd = new FileDescriptor();
getImpl().accept(si); // <1>
SocketCleanable.register(si.fd); // raw fd has been set
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkAccept(si.getInetAddress().getHostAddress(),
si.getPort());
}
} catch (IOException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
} catch (SecurityException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
}
s.impl = si;
s.postAccept();
}
После выполнения метода accept getImpl в выше мы находим метод accept в AbstractPlainSocketImpl:
//java.net.AbstractPlainSocketImpl#accept
/**
* Accepts connections.
* @param s the connection
*/
protected void accept(SocketImpl s) throws IOException {
acquireFD();
try {
socketAccept(s);
} finally {
releaseFD();
}
}
Видно, что он вызвал метод socketAccept, потому что реализация Socket в каждой операционной системе разная, поэтому здесь метод socketAccept в нашем PlainSocketImpl выполняется под Windows:
// java.net.PlainSocketImpl#socketAccept
@Override
void socketAccept(SocketImpl s) throws IOException {
int nativefd = checkAndReturnNativeFD();
if (s == null)
throw new NullPointerException("socket is null");
int newfd = -1;
InetSocketAddress[] isaa = new InetSocketAddress[1];
if (timeout <= 0) { //<1>
newfd = accept0(nativefd, isaa); // <2>
} else {
configureBlocking(nativefd, false);
try {
waitForNewConnection(nativefd, timeout);
newfd = accept0(nativefd, isaa); // <3>
if (newfd != -1) {
configureBlocking(newfd, true);
}
} finally {
configureBlocking(nativefd, true);
}
} // <4>
/* Update (SocketImpl)s' fd */
fdAccess.set(s.fd, newfd);
/* Update socketImpls remote port, address and localport */
InetSocketAddress isa = isaa[0];
s.port = isa.getPort();
s.address = isa.getAddress();
s.localport = localport;
if (preferIPv4Stack && !(s.address instanceof Inet4Address))
throw new SocketException("Protocol family not supported");
}
//java.net.PlainSocketImpl#accept0
static native int accept0(int fd, InetSocketAddress[] isaa) throws IOException;
Здесь от до — это код, который нас интересует, и выполняют метод accept0, который является собственным методом, в частности, он взаимодействует с операционной системой, чтобы отслеживать, есть ли клиент на указанный порт Терминальный доступ, именно потому, что accept0 всегда будет заблокирован, когда нет доступа клиента, поэтому наш метод accept на уровне программы заблокирован.Конечно, для блокировки на уровне программы мы можем избежать этого, то есть мы Можно изменить метод accept, чтобы он был неблокирующим, но пока мы не можем изменить блокировку, вызванную accept0.Блокировка на уровне операционной системы на самом деле является тем, что мы обычно называем синхронизацией в синхронном и асинхронном режимах. Как было сказано ранее, мы можем изменить блокировку accept на программном уровне, как этого добиться? На самом деле это реализуется путем оценки значения тайм-аута в методе socketAccept выше.Если значение тайм-аута оценивается как , если оно меньше или равно 0, то непосредственно выполняется метод accept0, который всегда будет быть в состоянии блокировки, но если мы установим тайм-аут Если значение тайм-аута больше 0, программа вернется после ожидания установленного нами времени.Обратите внимание, что если newfd здесь равен -1, это означает, что accept делает не найти никаких данных, возвращенных с нижнего слоя, тогда значение тайм-аута находится в какой настройке? Мы можем установить его с помощью метода setSoTimeout ServerSocket, давайте взглянем на этот метод:
/**
* Enable/disable {@link SocketOptions#SO_TIMEOUT SO_TIMEOUT} with the
* specified timeout, in milliseconds. With this option set to a non-zero
* timeout, a call to accept() for this ServerSocket
* will block for only this amount of time. If the timeout expires,
* a <B>java.net.SocketTimeoutException</B> is raised, though the
* ServerSocket is still valid. The option <B>must</B> be enabled
* prior to entering the blocking operation to have effect. The
* timeout must be {@code > 0}.
* A timeout of zero is interpreted as an infinite timeout.
* @param timeout the specified timeout, in milliseconds
* @exception SocketException if there is an error in
* the underlying protocol, such as a TCP error.
* @since 1.1
* @see #getSoTimeout()
*/
public synchronized void setSoTimeout(int timeout) throws SocketException {
if (isClosed())
throw new SocketException("Socket is closed");
getImpl().setOption(SocketOptions.SO_TIMEOUT, timeout);
}
Он выполняет метод setOption getImpl и устанавливает время ожидания Здесь мы видим его из AbstractPlainSocketImpl:
//java.net.AbstractPlainSocketImpl#setOption
public void setOption(int opt, Object val) throws SocketException {
if (isClosedOrPending()) {
throw new SocketException("Socket Closed");
}
boolean on = true;
switch (opt) {
/* check type safety b4 going native. These should never
* fail, since only java.Socket* has access to
* PlainSocketImpl.setOption().
*/
case SO_LINGER:
if (val == null || (!(val instanceof Integer) && !(val instanceof Boolean)))
throw new SocketException("Bad parameter for option");
if (val instanceof Boolean) {
/* true only if disabling - enabling should be Integer */
on = false;
}
break;
case SO_TIMEOUT: //<1>
if (val == null || (!(val instanceof Integer)))
throw new SocketException("Bad parameter for SO_TIMEOUT");
int tmp = ((Integer) val).intValue();
if (tmp < 0)
throw new IllegalArgumentException("timeout < 0");
timeout = tmp;
break;
case IP_TOS:
if (val == null || !(val instanceof Integer)) {
throw new SocketException("bad argument for IP_TOS");
}
trafficClass = ((Integer)val).intValue();
break;
case SO_BINDADDR:
throw new SocketException("Cannot re-bind socket");
case TCP_NODELAY:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for TCP_NODELAY");
on = ((Boolean)val).booleanValue();
break;
case SO_SNDBUF:
case SO_RCVBUF:
if (val == null || !(val instanceof Integer) ||
!(((Integer)val).intValue() > 0)) {
throw new SocketException("bad parameter for SO_SNDBUF " +
"or SO_RCVBUF");
}
break;
case SO_KEEPALIVE:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_KEEPALIVE");
on = ((Boolean)val).booleanValue();
break;
case SO_OOBINLINE:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_OOBINLINE");
on = ((Boolean)val).booleanValue();
break;
case SO_REUSEADDR:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_REUSEADDR");
on = ((Boolean)val).booleanValue();
break;
case SO_REUSEPORT:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_REUSEPORT");
if (!supportedOptions().contains(StandardSocketOptions.SO_REUSEPORT))
throw new UnsupportedOperationException("unsupported option");
on = ((Boolean)val).booleanValue();
break;
default:
throw new SocketException("unrecognized TCP option: " + opt);
}
socketSetOption(opt, on, val);
}
Этот метод относительно длинный, мы смотрим только наtimeout
Соответствующий код, код в . На самом деле здесь просто устанавливается значение тайм-аута, переданное в нашем setOption, в глобальную переменную timeout AbstractPlainSocketImpl.
Таким образом, мы можем сделать метод accept неблокирующим на программном уровне, но метод чтения все равно будет блокирующим, то есть метод чтения нам нужно преобразовать позже, а также сделать его неблокирующим на программном уровне. уровень.
Неблокирующая реализация принятия через демонстрационную трансформацию
Перед формальным преобразованием нам необходимо объяснить синхронность/асинхронность и блокировку/неблокировку в Socket:
Синхронизация/асинхронность относится к уровню операционной системы, что означает, что после того, как операционная система получит запрошенный программой ввод-вывод, если ресурс ввода-вывода не готов, как реагировать на проблему программы, если она синхронна, она будет не отвечать до тех пор, пока ресурс ввода-вывода не будет готов; и асинхронно он вернет программе флаг, который используется для отправки содержимого, отправленного через механизм событий, когда ресурс ввода-вывода будет готов.
Блокировка/неблокировка относится к программному уровню, это означает, что когда программа запрашивает операционную систему для выполнения операций ввода-вывода, если ресурсы ввода-вывода не готовы, что должна делать программа?Если она блокируется, программа ничего не делает и ждет до IO Если ресурс готов, программа будет продолжать работать, если она не блокируется, но будет время от времени проверять, готов ли IO или нет;
BIO, который мы обычно видим, является синхронной блокировкой. Если это синхронно, это означает, что нижний уровень операционной системы ожидает подготовки ресурсов ввода-вывода до тех пор, пока все в порядке. Если он заблокирован, сама программа также ожидает ресурсов ввода-вывода. быть готовым до тех пор, пока все в порядке. В частности, блокировка на уровне программы Это вызвано принятием и чтением. Мы можем сделать его неблокирующим, преобразовав его, но мы не можем изменить блокировку на уровне операционной системы.
Наш NIO является синхронным и неблокирующим.На самом деле, его неблокирующий принцип реализации аналогичен тому, что мы объяснили выше.Он заключается в улучшении явления блокировки, вызванного методами принятия и чтения, поэтому мы представилиChannel
иBuffer
Концепция чего-либо.
Что ж, мы улучшим нашу демонстрацию, чтобы решить проблему блокировки, вызванную принятием (асинхронная обработка для нескольких клиентских подключений, я не буду здесь много объяснять, читатели могут подумать сами, если это невозможно, перейдите к моему связанному видео. Найдите соответствующее интерпретация):
public class BIOProNotB {
public void initBIOServer(int port) {
ServerSocket serverSocket = null;//服务端Socket
Socket socket = null;//客户端socket
ExecutorService threadPool = Executors.newCachedThreadPool();
ClientSocketThread thread = null;
try {
serverSocket = new ServerSocket(port);
serverSocket.setSoTimeout(1000);
System.out.println(stringNowTime() + ": serverSocket started");
while (true) {
try {
socket = serverSocket.accept();
} catch (SocketTimeoutException e) {
//运行到这里表示本次accept是没有收到任何数据的,服务端的主线程在这里可以做一些其他事情
System.out.println("now time is: " + stringNowTime());
continue;
}
System.out.println(stringNowTime() + ": id为" + socket.hashCode() + "的Clientsocket connected");
thread = new ClientSocketThread(socket);
threadPool.execute(thread);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
return format.format(new Date());
}
class ClientSocketThread extends Thread {
public Socket socket;
public ClientSocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader reader = null;
String inputContent;
int count = 0;
try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while ((inputContent = reader.readLine()) != null) {
System.out.println("收到id为" + socket.hashCode() + " " + inputContent);
count++;
}
System.out.println("id为" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "读取结束");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
reader.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
BIOProNotB server = new BIOProNotB();
server.initBIOServer(8888);
}
}
Для нашего ServerSocket установлено время тайм-аута, так что при вызове метода accept он будет просыпаться каждую 1с, а не быть там все время, и информация будет возвращаться только при доступе клиента; давайте запустим его и увидеть результат:
2019-01-02 17:28:43:362: serverSocket started
now time is: 2019-01-02 17:28:44:363
now time is: 2019-01-02 17:28:45:363
now time is: 2019-01-02 17:28:46:363
now time is: 2019-01-02 17:28:47:363
now time is: 2019-01-02 17:28:48:363
now time is: 2019-01-02 17:28:49:363
now time is: 2019-01-02 17:28:50:363
now time is: 2019-01-02 17:28:51:364
now time is: 2019-01-02 17:28:52:365
now time is: 2019-01-02 17:28:53:365
now time is: 2019-01-02 17:28:54:365
now time is: 2019-01-02 17:28:55:365
now time is: 2019-01-02 17:28:56:365 // <1>
2019-01-02 17:28:56:911: id为1308927845的Clientsocket connected
now time is: 2019-01-02 17:28:57:913 // <2>
now time is: 2019-01-02 17:28:58:913
Видно, что когда у нас нет клиентского доступа в начале, мы выполнимSystem.out.println("now time is: " + stringNowTime());
Следует также отметить, что если вы внимательно посмотрите на теги и приведенного выше вывода, вы обнаружите, что значение времени в не равно 17:28:57:365, причина в том, что если If accept возвращает нормальное значение, оператор catch не будет выполнен.
Неблокирующая реализация демо-преобразования чтения
В этом случае мы преобразовали принимающую часть в неблокирующий тип, так можно ли преобразовать читаемую часть? Конечно, метод преобразования очень похож на accept.Когда мы читаем, мы будем вызыватьjava.net.AbstractPlainSocketImpl#getInputStream
:
/**
* Gets an InputStream for this socket.
*/
protected synchronized InputStream getInputStream() throws IOException {
synchronized (fdLock) {
if (isClosedOrPending())
throw new IOException("Socket Closed");
if (shut_rd)
throw new IOException("Socket input is shutdown");
if (socketInputStream == null)
socketInputStream = new SocketInputStream(this);
}
return socketInputStream;
}
Это создалоSocketInputStream
объект, текущийAbstractPlainSocketImpl
Объект передается, поэтому при чтении данных мы будем вызывать следующий метод:
public int read(byte b[], int off, int length) throws IOException {
return read(b, off, length, impl.getTimeout());
}
int read(byte b[], int off, int length, int timeout) throws IOException {
int n;
// EOF already encountered
if (eof) {
return -1;
}
// connection reset
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
}
// bounds check
if (length <= 0 || off < 0 || length > b.length - off) {
if (length == 0) {
return 0;
}
throw new ArrayIndexOutOfBoundsException("length == " + length
+ " off == " + off + " buffer length == " + b.length);
}
// acquire file descriptor and do the read
FileDescriptor fd = impl.acquireFD();
try {
n = socketRead(fd, b, off, length, timeout);
if (n > 0) {
return n;
}
} catch (ConnectionResetException rstExc) {
impl.setConnectionReset();
} finally {
impl.releaseFD();
}
/*
* If we get here we are at EOF, the socket has been closed,
* or the connection has been reset.
*/
if (impl.isClosedOrPending()) {
throw new SocketException("Socket closed");
}
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
}
eof = true;
return -1;
}
private int socketRead(FileDescriptor fd,
byte b[], int off, int len,
int timeout)
throws IOException {
return socketRead0(fd, b, off, len, timeout);
}
Здесь мы видим, что socketRead также устанавливает тайм-аут, и этот тайм-аут — это то, что мы создали.SocketInputStream
переданный объектAbstractPlainSocketImpl
объект для управления, поэтому нам нужно только установитьserverSocket.setSoTimeout(1000)
Вот и все.
Мы снова изменяем код сервера (всего код задается дважды, первый раз — это уровень ServerSocket, а второй раз — задается Socket, возвращаемый клиентским соединением, они разные):
public class BIOProNotBR {
public void initBIOServer(int port) {
ServerSocket serverSocket = null;//服务端Socket
Socket socket = null;//客户端socket
ExecutorService threadPool = Executors.newCachedThreadPool();
ClientSocketThread thread = null;
try {
serverSocket = new ServerSocket(port);
serverSocket.setSoTimeout(1000);
System.out.println(stringNowTime() + ": serverSocket started");
while (true) {
try {
socket = serverSocket.accept();
} catch (SocketTimeoutException e) {
//运行到这里表示本次accept是没有收到任何数据的,服务端的主线程在这里可以做一些其他事情
System.out.println("now time is: " + stringNowTime());
continue;
}
System.out.println(stringNowTime() + ": id为" + socket.hashCode() + "的Clientsocket connected");
thread = new ClientSocketThread(socket);
threadPool.execute(thread);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
return format.format(new Date());
}
class ClientSocketThread extends Thread {
public Socket socket;
public ClientSocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader reader = null;
String inputContent;
int count = 0;
try {
socket.setSoTimeout(1000);
} catch (SocketException e1) {
e1.printStackTrace();
}
try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while (true) {
try {
while ((inputContent = reader.readLine()) != null) {
System.out.println("收到id为" + socket.hashCode() + " " + inputContent);
count++;
}
} catch (Exception e) {
//执行到这里表示read方法没有获取到任何数据,线程可以执行一些其他的操作
System.out.println("Not read data: " + stringNowTime());
continue;
}
//执行到这里表示读取到了数据,我们可以在这里进行回复客户端的工作
System.out.println("id为" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "读取结束");
sleep(1000);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
reader.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
BIOProNotBR server = new BIOProNotBR();
server.initBIOServer(8888);
}
}
Выполните следующее:
2019-01-02 17:59:03:713: serverSocket started
now time is: 2019-01-02 17:59:04:714
now time is: 2019-01-02 17:59:05:714
now time is: 2019-01-02 17:59:06:714
2019-01-02 17:59:06:932: id为1810132623的Clientsocket connected
now time is: 2019-01-02 17:59:07:934
Not read data: 2019-01-02 17:59:07:935
now time is: 2019-01-02 17:59:08:934
Not read data: 2019-01-02 17:59:08:935
now time is: 2019-01-02 17:59:09:935
Not read data: 2019-01-02 17:59:09:936
收到id为1810132623 2019-01-02 17:59:09: 第0条消息: ccc // <1>
now time is: 2019-01-02 17:59:10:935
Not read data: 2019-01-02 17:59:10:981 // <2>
收到id为1810132623 2019-01-02 17:59:11: 第1条消息: bbb
now time is: 2019-01-02 17:59:11:935
Not read data: 2019-01-02 17:59:12:470
now time is: 2019-01-02 17:59:12:935
id为1810132623的Clientsocket 2019-01-02 17:59:13:191读取结束
now time is: 2019-01-02 17:59:13:935
id为1810132623的Clientsocket 2019-01-02 17:59:14:192读取结束
Среди них часть вывода непрочитанных данных решает нашу проблему блокировки чтения.Она будет пробуждать нашу операцию чтения каждую 1 с.Если данные не будут прочитаны в течение 1 с, она будет выполнена.System.out.println("Not read data: " + stringNowTime())
, здесь мы можем выполнить некоторые другие операции, чтобы избежать явления блокировки текущего потока.Когда у нас есть данные для отправки, вывод будет в , потому что чтение получает вывод, поэтому оператор catch больше не выполняется. часть, поэтому вы обнаружите, что время вывода в отличается от времени в на 1 секунду, а не на 1 секунду от предыдущего 17:59:09:936;
Таким образом, мы решаем проблему блокировки, вызванную приемом и чтением, и в то же время создаем поток для каждого клиента на сервере для обработки собственной бизнес-логики, что в основном решает проблему блокировки. начальная версия NIO, но создание потока для каждого клиента это действительно головная боль, особенно если клиентов слишком много, это пустая трата ресурсов сервера, плюс переключение между потоками Накладные расходы еще хуже.Даже если ввести пул потоков технология управления количеством потоков, когда клиентов становится больше, очередь BlockingQueue пула потоков будет становиться все больше и больше. Тогда NIO в это время можно использовать для нас. Чтобы решить эту проблему, он не создает поток для каждого клиента существует только один поток на стороне сервера, и для каждого клиента создается канал.
Мысли о некоторых кодовых точках accept()
accept(), мы можем попытаться увидеть соответствующую интерпретацию Linux:
#include <sys/types.h>
#include <sys/socket.h>
int accept(int sockfd,struct sockaddr *addr,socklen_t *addrlen);
Системный вызов accept() в основном используется для типов сокетов на основе соединения, таких как SOCK_STREAM и SOCK_SEQPACKET. Он извлекает первый запрос на соединение из очереди ожидающих соединений прослушивающего сокета,создать новый сокети возвращает файловый дескриптор, указывающий на сокет. Вновь установленный сокет не находится в состоянии прослушивания, а исходный сокет прослушивания не затрагивается системным вызовом.
Примечания: вновь созданный сокет готов к отправке send() и приему данных recv().
параметр:
sockfd, который использует дескриптор сокета, установленный системным вызовом socket(), связывается с локальным адресом (обычно сокетом сервера) через bind() и продолжает прослушивать соединения через listen().
addr, указатель на struct sockaddr, структура заполняется адресом однорангового сокета сервера коммуникационного уровня (обычно это адрес клиента), а точный формат возвращаемого адреса addr определяется типом адреса сокета ( например, TCP или UDP); если адрес равен NULL и нет действительного адреса для заполнения, в этом случае адрес addrlen не используется и должен быть установлен равным NULL;
Примечания: addr — это указатель на локальную структуру данных sockaddr_in, которая является локальным сокетом (адресом и указателем) информации, к которой требуется получить доступ.
addrlen, параметр результата значения, вызывающая функция должна быть инициализирована значением, содержащим размер структуры, на которую указывает addr, и функция возвращает фактическое значение адреса однорангового узла (обычно адрес сервера);
Примечание. addrlen — это локальная целочисленная переменная, для которой задано значение sizeof(struct sockaddr_in).
Если в очереди нет ожидающих соединений и сокет не помечен как неблокирующий, accept() будет блокировать вызывающую функцию до тех пор, пока не появится соединение; если сокет помечен как неблокирующий и в очереди нет ожидающих соединений. очередь, accept() возвращает ошибку EAGAIN или EWOULDBLOCK.
Примечания: Вообще говоря, accept() является блокирующей функцией при реализации.Когда слушающий сокет вызывает accept(), он сначала обращается к своему собственному Receive_buf, чтобы проверить, есть ли пакет данных соединения; если да, скопируйте данные и удалите полученные данные, если нет пакета данных, создать новый сокет для установления соединения с адресом, отправленным клиентом, если нет, заблокировать и ждать;
Чтобы получать уведомления, когда сокет имеет входящее соединение, вы можете использоватьselect()илиpoll(). При попытке установить новое соединение система отправляет читаемое событие, а затем вызывает метод accept() для получения сокета для этого соединения. Другой способ — настроить сокет на отправку сигнала SIGIO при поступлении соединения на сокет.
возвращаемое значение При успешном выполнении возвращается неотрицательное целое число, являющееся дескриптором полученного сокета, при возникновении ошибки возвращается -1 и соответствующим образом устанавливается глобальная переменная errno.
Итак, в нашем исходном коде раздела Java (java.net.ServerSocket#accept) создаст новый сокет, так что информация дескриптора файла нового сокета, полученная после соединения, может быть установлена на новый сокет, который мы создали.java.net.PlainSocketImpl#socketAccept
Это особенно очевидно в , читатели могут просмотреть соответствующий исходный код.
Ссылаться на :Linux.drop.net/full/2/AC оценка…