Знать сокет
Сокет, также известный как сокет, представляет собой протокол, соглашение или спецификацию для сетевого взаимодействия между различными процессами.
Для программирования сокетов это чаще всего уровень инкапсуляции или абстракции на основе таких протоколов, как TCP/UDP, который представляет собой интерфейс, предоставляемый системой для программирования, связанного с сетевым взаимодействием.
Основной процесс установки сокета
Давайте возьмем базовый API, предоставляемый операционной системой Linux, в качестве примера, чтобы понять основной процесс установления связи через сокет:
Видно, что по сути сокет — это упрощение и абстракция протоколов tcp-соединения (конечно, это может быть и udp и другое соединение) протоколов на уровне программирования.
1. Самая простая демонстрация сокета
1.1 Односторонняя связь
Во-первых, мы начнем с базового кода сокета, который отправляет и получает сообщения только один раз:
Сервер:
package com.marklux.socket.base;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* The very basic socket server that only listen one single message.
*/
public class BaseSocketServer {
private ServerSocket server;
private Socket socket;
private int port;
private InputStream inputStream;
private static final int MAX_BUFFER_SIZE = 1024;
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public BaseSocketServer(int port) {
this.port = port;
}
public void runServerSingle() throws IOException {
this.server = new ServerSocket(this.port);
System.out.println("base socket server started.");
// the code will block here till the request come.
this.socket = server.accept();
this.inputStream = this.socket.getInputStream();
byte[] readBytes = new byte[MAX_BUFFER_SIZE];
int msgLen;
StringBuilder stringBuilder = new StringBuilder();
while ((msgLen = inputStream.read(readBytes)) != -1) {
stringBuilder.append(new String(readBytes,0,msgLen,"UTF-8"));
}
System.out.println("get message from client: " + stringBuilder);
inputStream.close();
socket.close();
server.close();
}
public static void main(String[] args) {
BaseSocketServer bs = new BaseSocketServer(9799);
try {
bs.runServerSingle();
}catch (IOException e) {
e.printStackTrace();
}
}
}
Клиент:
package com.marklux.socket.base;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.Socket;
/**
* The very basic socket client that only send one single message.
*/
public class BaseSocketClient {
private String serverHost;
private int serverPort;
private Socket socket;
private OutputStream outputStream;
public BaseSocketClient(String host, int port) {
this.serverHost = host;
this.serverPort = port;
}
public void connetServer() throws IOException {
this.socket = new Socket(this.serverHost, this.serverPort);
this.outputStream = socket.getOutputStream();
// why the output stream?
}
public void sendSingle(String message) throws IOException {
try {
this.outputStream.write(message.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
System.out.println(e.getMessage());
}
this.outputStream.close();
this.socket.close();
}
public static void main(String[] args) {
BaseSocketClient bc = new BaseSocketClient("127.0.0.1",9799);
try {
bc.connetServer();
bc.sendSingle("Hi from mark.");
}catch (IOException e) {
e.printStackTrace();
}
}
}
Сначала запустите сервер, затем запустите клиент, вы увидите эффект.
- Обратите внимание на реализацию операции ввода-вывода здесь, мы используем размер
MAX_BUFFER_SIZE
Массив байтов используется как буфер, затем байты берутся из входного потока и помещаются в буфер, а затем байты берутся из буфера и встраиваются в строку, Это очень полезно, когда файл входного потока На самом деле на этой идее основан и упомянутый позже NIO.
1.2 Двусторонняя связь
В приведенном выше примере реализована только односторонняя связь, что, очевидно, является пустой тратой каналов. Соединение через сокет поддерживает полнодуплексную двустороннюю связь (нижний уровень — tcp).В следующем примере сервер вернет квитанцию клиенту после получения сообщения от клиента.
И мы используем некоторые упакованные методы java.io, чтобы упростить весь процесс коммуникации (т.к. длина сообщения не большая, буферы больше не используются).
Сервер:
public void runServer() throws IOException {
this.serverSocket = new ServerSocket(port);
this.socket = serverSocket.accept();
this.inputStream = socket.getInputStream();
String message = new String(inputStream.readAllBytes(), "UTF-8");
System.out.println("received message: " + message);
this.socket.shutdownInput(); // 告诉客户端接收已经完毕,之后只能发送
// write the receipt.
this.outputStream = this.socket.getOutputStream();
String receipt = "We received your message: " + message;
outputStream.write(receipt.getBytes("UTF-8"));
this.outputStream.close();
this.socket.close();
}
Клиент:
public void sendMessage(String message) throws IOException {
this.socket = new Socket(host,port);
this.outputStream = socket.getOutputStream();
this.outputStream.write(message.getBytes("UTF-8"));
this.socket.shutdownOutput(); // 告诉服务器,所有的发送动作已经结束,之后只能接收
this.inputStream = socket.getInputStream();
String receipt = new String(inputStream.readAllBytes(), "UTF-8");
System.out.println("got receipt: " + receipt);
this.inputStream.close();
this.socket.close();
}
-
Обратите внимание, что здесь мы вызываем после того, как сервер получает сообщение, а клиент отправляет сообщение соответственно.
shutdownInput()
иshutdownOutput()
Вместо того, чтобы напрямую закрывать соответствующий поток, это связано с тем, что закрытие любого потока напрямую приведет к закрытию сокета, и последующая квитанция не может быть отправлена. -
Но учтите, что вызов
shutdownInput()
иshutdownOutput()
После этого соответствующий поток также будет закрыт и его нельзя будет повторно отправить/записать в сокет.
2. Отправка большего количества сообщений: определение конца
В только что приведенных двух примерах при каждом открытии потока может выполняться только одна операция записи/чтения, после закрытия соответствующий поток не может быть повторно записан или прочитан.
В этом случае, если вы хотите отправить два сообщения, вы должны установить два сокета, что потребляет ресурсы и создает проблемы. На самом деле мы можем вообще не закрывать соответствующий поток, пока сообщение пишется поэтапно.
Но в этом случае мы должны столкнуться с другой проблемой: как судить об окончании отправки сообщения?
2.1 Использование специальных символов
Самый простой способ — использовать специальные символы для обозначения завершения передачи.Сервер может завершить чтение, пока он считывает соответствующие символы, а затем выполнять соответствующие операции обработки.
В следующем примере мы используем символ новой строки\n
Чтобы отметить конец передачи, сервер печатает сообщение каждый раз, когда получает сообщение, и использует сканер для упрощения операции:
Сервер:
public void runServer() throws IOException {
this.server = new ServerSocket(this.port);
System.out.println("base socket server started.");
this.socket = server.accept();
// the code will block here till the request come.
this.inputStream = this.socket.getInputStream();
Scanner sc = new Scanner(this.inputStream);
while (sc.hasNextLine()) {
System.out.println("get info from client: " + sc.nextLine());
} // 循环接收并输出消息内容
this.inputStream.close();
socket.close();
}
Клиент:
public void connetServer() throws IOException {
this.socket = new Socket(this.serverHost, this.serverPort);
this.outputStream = socket.getOutputStream();
}
public void send(String message) throws IOException {
String sendMsg = message + "\n"; // we mark \n as a end of line.
try {
this.outputStream.write(sendMsg.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
System.out.println(e.getMessage());
}
// this.outputStream.close();
// this.socket.shutdownOutput();
}
public static void main(String[] args) {
CycleSocketClient cc = new CycleSocketClient("127.0.0.1", 9799);
try {
cc.connetServer();
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
String line = sc.nextLine();
cc.send(line);
}
}catch (IOException e) {
e.printStackTrace();
}
}
Эффект после запуска заключается в том, что каждый раз, когда клиент вводит строку текста и нажимает Enter, сервер распечатывает соответствующую запись о прочтении сообщения.
2.2 Определяется длиной
Вернемся к происхождению. Причина, по которой мы не можем определить, когда сообщение заканчивается, заключается в том, что мы не можем определить длину каждого сообщения.
На самом деле длина сообщения может быть отправлена первой.Когда серверу известна длина сообщения, он может завершить прием сообщения.
В общем, отправка сообщения становится двумя шагами
- длина отправленного сообщения
- Отправить сообщение
Последняя проблема заключается в том, что количество байтов, отправленных на этапе «отправить длину сообщения», должно быть фиксированным, иначе мы все равно будем в тупике.
Вообще говоря, мы можем использовать фиксированное количество байтов для сохранения длины сообщения.Например, первые 2 байта указаны как длина сообщения, но максимальная длина сообщения, которое мы можем передать, также фиксирована, с Например, раздел 2 слова, максимальная длина отправляемого сообщения не превышает 2^16 байт или 64 КБ.
Если вы понимаете кодировку некоторых символов, вы знаете, что мы можем использовать пространство переменной длины для хранения длины сообщения, например:
第一个字节首位为0:即0XXXXXXX,表示长度就一个字节,最大128,表示128B
第一个字节首位为110,那么附带后面一个字节表示长度:即110XXXXX 10XXXXXX,最大2048,表示2K
第一个字节首位为1110,那么附带后面二个字节表示长度:即110XXXXX 10XXXXXX 10XXXXXX,最大131072,表示128K
依次类推
Конечно, это будет сложнее реализовать, поэтому в следующем примере мы по-прежнему используем фиксированные два байта для записи длины сообщения.
Сервер:
public void runServer() throws IOException {
this.serverSocket = new ServerSocket(this.port);
this.socket = serverSocket.accept();
this.inputStream = socket.getInputStream();
byte[] bytes;
while (true) {
// 先读第一个字节
int first = inputStream.read();
if (first == -1) {
// 如果是-1,说明输入流已经被关闭了,也就不需要继续监听了
this.socket.close();
break;
}
// 读取第二个字节
int second = inputStream.read();
int length = (first << 8) + second; // 用位运算将两个字节拼起来成为真正的长度
bytes = new byte[length]; // 构建指定长度的字节大小来储存消息即可
inputStream.read(bytes);
System.out.println("receive message: " + new String(bytes,"UTF-8"));
}
}
Клиент:
public void connetServer() throws IOException {
this.socket = new Socket(host,port);
this.outputStream = socket.getOutputStream();
}
public void sendMessage(String message) throws IOException {
// 首先要把message转换成bytes以便处理
byte[] bytes = message.getBytes("UTF-8");
// 接下来传输两个字节的长度,依然使用移位实现
int length = bytes.length;
this.outputStream.write(length >> 8); // write默认一次只传输一个字节
this.outputStream.write(length);
// 传输完长度后,再正式传送消息
this.outputStream.write(bytes);
}
public static void main(String[] args) {
LengthSocketClient lc = new LengthSocketClient("127.0.0.1",9799);
try {
lc.connetServer();
Scanner sc = new Scanner(System.in);
while (sc.hasNextLine()) {
lc.sendMessage(sc.nextLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}
3. Обрабатывайте больше подключений: многопоточность
3.1 Отправлять и получать сообщения одновременно
Прежде чем рассматривать, как сервер обрабатывает несколько подключений, давайте сначала рассмотрим использование многопоточности для преобразования исходного экземпляра диалога один к одному.
В исходном примере получатель сообщения не может активно отправить сообщение другой стороне. Другими словами, мы не достигли реального диалога друг с другом. Это в основном потому, что два действия отправки и получения сообщений не могутв то же времяПоэтому нам нужно использовать два потока: один для прослушивания ввода с клавиатуры и записи его в сокет, а другой для прослушивания сокета и отображения полученного сообщения.
Для простоты мы напрямую позволяем основному потоку отвечать за мониторинг клавиатуры и отправку сообщений, и в то же время открываем другой поток для получения сообщений и их отображения.
Поток получения сообщений ListenThread.java
public class ListenThread implements Runnable {
private Socket socket;
private InputStream inputStream;
public ListenThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() throws RuntimeException{
try {
this.inputStream = socket.getInputStream();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
while (true) {
try {
int first = this.inputStream.read();
if (first == -1) {
// 输入流已经被关闭,无需继续读取
throw new RuntimeException("disconnected.");
}
int second = this.inputStream.read();
int msgLength = (first<<8) + second;
byte[] readBuffer = new byte[msgLength];
this.inputStream.read(readBuffer);
System.out.println("message from [" + socket.getInetAddress() + "]: " + new String(readBuffer,"UTF-8"));
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
}
}
Основной поток, который выбирается пользователем в качестве сервера или клиента при запуске:
public class ChatSocket {
private String host;
private int port;
private Socket socket;
private ServerSocket serverSocket;
private OutputStream outputStream;
// 以服务端形式启动,创建会话
public void runAsServer(int port) throws IOException {
this.serverSocket = new ServerSocket(port);
System.out.println("[log] server started at port " + port);
// 等待客户端的加入
this.socket = serverSocket.accept();
System.out.println("[log] successful connected with " + socket.getInetAddress());
// 启动监听线程
Thread listenThread = new Thread(new ListenThread(this.socket));
listenThread.start();
waitAndSend();
}
// 以客户端形式启动,加入会话
public void runAsClient(String host, int port) throws IOException {
this.socket = new Socket(host, port);
System.out.println("[log] successful connected to server " + socket.getInetAddress());
Thread listenThread = new Thread(new ListenThread(this.socket));
listenThread.start();
waitAndSend();
}
public void waitAndSend() throws IOException {
this.outputStream = this.socket.getOutputStream();
Scanner sc = new Scanner(System.in);
while (sc.hasNextLine()) {
this.sendMessage(sc.nextLine());
}
}
public void sendMessage(String message) throws IOException {
byte[] msgBytes = message.getBytes("UTF-8");
int length = msgBytes.length;
outputStream.write(length>>8);
outputStream.write(length);
outputStream.write(msgBytes);
}
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
ChatSocket chatSocket = new ChatSocket();
System.out.println("select connect type: 1 for server and 2 for client");
int type = Integer.parseInt(scanner.nextLine().toString());
if (type == 1) {
System.out.print("input server port: ");
int port = scanner.nextInt();
try {
chatSocket.runAsServer(port);
} catch (IOException e) {
e.printStackTrace();
}
}else if (type == 2) {
System.out.print("input server host: ");
String host = scanner.nextLine();
System.out.print("input server port: ");
int port = scanner.nextInt();
try {
chatSocket.runAsClient(host, port);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
3.2 Используйте пулы потоков для оптимизации возможностей параллелизма на стороне сервера
Как сервер, если вы устанавливаете соединение сокета только с одним клиентом за раз, это будет слишком расточительно для ресурсов, поэтому мы можем полностью разрешить серверу устанавливать несколько сокетов с несколькими клиентами.
Затем, поскольку вам приходится иметь дело с несколькими подключениями, вам приходится сталкиваться с проблемой параллелизма (конечно, вы также можете написать цикл для его обработки по очереди). Мы можем использовать несколько потоков для обработки параллелизма, но создание и уничтожение потоков будет потреблять много ресурсов и времени, поэтому лучше всего сделать это за один шаг и использовать для реализации пул потоков.
Ниже приведен пример кода на стороне сервера:
public class SocketServer {
public static void main(String args[]) throws Exception {
// 监听指定的端口
int port = 55533;
ServerSocket server = new ServerSocket(port);
// server将一直等待连接的到来
System.out.println("server将一直等待连接的到来");
//如果使用多线程,那就需要线程池,防止并发过高时创建过多线程耗尽资源
ExecutorService threadPool = Executors.newFixedThreadPool(100);
while (true) {
Socket socket = server.accept();
Runnable runnable=()->{
try {
// 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[1024];
int len;
StringBuilder sb = new StringBuilder();
while ((len = inputStream.read(bytes)) != -1) {
// 注意指定编码格式,发送方和接收方一定要统一,建议使用UTF-8
sb.append(new String(bytes, 0, len, "UTF-8"));
}
System.out.println("get message from client: " + sb);
inputStream.close();
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
};
threadPool.submit(runnable);
}
}
}
4. Поддержание связи
Я думаю, вам не составит труда найти проблему, то есть после того, как соединение по сокету будет успешно установлено, если посередине возникнет исключение и одна из сторон отключится, другую сторону в это время не найти. за исключением.
Проще говоря, поддерживаемое нами сокетное соединение является длинным соединением, но мы не гарантируем егоСвоевременность, он может быть доступен в одну секунду, но не в следующую.
4.1 Использование пакетов Heartbeat
Самый распространенный способ обеспечить постоянную доступность соединения — отправлять пакеты пульса через регулярные промежутки времени, чтобы определить, является ли соединение нормальным. Это по-прежнему очень важно для сервисов с высокими требованиями к работе в режиме реального времени (таких как отправка сообщений).
Общий план таков:
- Обе стороны договариваются о формате пакета пульса, который должен отличать его от обычных сообщений.
- Клиент через равные промежутки времени отправляет на сервер пакет сердцебиения.
- Каждый раз, когда сервер получает пакет сердцебиения, он отбрасывает его.
- Если пакет пульса клиента не может быть отправлен, можно сделать вывод, что соединение было разорвано.
- Если требования к реальному времени очень высоки, сервер также может регулярно проверять частоту пакетов пульса, отправляемых клиентом.
4.2 Повторное подключение при отключении
Использование пакетов Heartbeat неизбежно увеличит нагрузку на полосу пропускания и производительность.Для обычных приложений нам фактически не нужно использовать эту схему.Если при отправке сообщения возникает исключение соединения, просто попробуйте переподключиться напрямую.
По сравнению с приведенной выше схемой, фактически сообщение, выдающее исключение, действует как пакет пульса.
В целом, необходимо гибко обдумывать и настраивать в соответствии с конкретными бизнес-сценариями вопрос о том, нужно ли поддерживать соединение и как его поддерживать.