Высокий параллелизм с нуля (8) --- Простая реализация инфраструктуры RPC

RPC

предисловие

Резюме предыдущей ситуации

В прошлой статье мы кратко рассмотрели, что такое RPC, три процесса, зачем он нам нужен, его характеристики и применимые сценарии, определение процесса и протокола RPC, а также немного познакомились с его структурой. Теория часто делает людей сонливыми и неразборчивыми. Если это можно объяснить с помощью некоторого кода, это будет легче понять.

предыдущая ссылка

Высокий параллелизм с нуля (1) --- основная концепция Zookeeper

Высокий параллелизм с нуля (2) --- Zookeeper реализует распределенные блокировки

Высокий параллелизм с нуля (3) --- Создание кластера Zookeeper и выбор лидера

Высокий параллелизм с нуля (4) --- распределенная очередь Zookeeper

Высокий параллелизм с нуля (5) --- Приложение центра конфигурации Zookeeper

Высокий параллелизм с нуля (6) --- Выборы мастера Zookeeper и краткий обзор его официального сайта

High Concurrency from Scratch (7) --- Введение в RPC, протокол и инфраструктуру

Содержание 1: Процесс и задачи RPC

1. RPC-процесс

На самом деле об этом уже упоминалось в 2 - ① предыдущей статьи, если забудете, не беда, скопирую еще раз.

заглушка: заглушка в распределенных вычислениях — это фрагмент кода, который преобразует параметры, передаваемые между клиентом и сервером во время удаленных вызовов процедур.

1. Клиентская заглушка вызывается во время обработки клиента (так же, как вызов локального метода), и параметры передаются в

2. Клиентская заглушка маршалирует параметры в сообщения, а затем отправляет сообщения на сервер через системные вызовы.

3. Клиентская локальная операционная система отправляет сообщение с клиентского компьютера на серверный компьютер.

4. Операционная система сервера передает полученный пакет данных на клиентскую заглушку

5. Сообщение о рассортировке серверной заглушки является параметром

6. Серверная заглушка снова вызывает серверный процесс, а результат выполнения процесса отвечает клиенту теми же шагами в обратном направлении

2. Начните анализ с точки зрения пользователя

1.定义过程接口
2.服务端实现接口的整个过程
3.客户端使用生成的stub代理对象

Содержание 2: Проектирование и реализация RPC Framework

1. Подготовьте класс сущности Student и базовый интерфейс

Клиент создает прокси-объект интерфейса процесса.При разработке клиентской прокси-фабрики прокси-объект интерфейса можно создать с помощью динамического прокси-сервера JDK.

① Определите интерфейс StudentService

Учебный класс имеет три атрибута: имя (строка), возраст (int), секс (строка). Чтобы сохранить место, я не буду публиковать код, просто предоставляющую методы Gotter, Setter и ToString.

public interface StudentService {
	/**
	 * 获取信息
	 * @return
	 */
	public Student getInfo();
	
	//打印student的信息并返回一个boolean值
	public boolean printInfo(Student student);
}

И предоставить простую реализацию, по сути, заключается в том, чтобы распечатать информацию студента

@Service(StudentService.class)
public class StudentServiceImpl implements StudentService {

	public Student getInfo() {
		Student person = new Student();
    	person.setAge(25);
		person.setName("说出你的愿望吧~");
		person.setSex("男");
		return person;
	}

	public boolean printInfo(Student person) {
		if (person != null) {
			System.out.println(person);
			return true;
		}
		return false;
	}
	
	public static void main(String[] args) {
		new Thread(()->{
			System.out.println("111");
		}).start();;
	}
}

2. Строительство клиента

① Учитесь на тестовом классе

Во-первых, клиент получает прокси-класс нашего StudentService через наш локальный прокси.В настоящее время точно нет локальной реализации StudentService на нашей стороне клиента.В это время мы напрямую даем адрес.

public class ClientTest {
	@Test
	public void test() {
		// 本地没有接口实现,通过代理获得接口实现实例
		RpcClientProxy proxy = new RpcClientProxy("192.168.80.1", 9998);
		StudentService service = proxy.getProxy(StudentService.class);
		
		System.out.println(service.getInfo());
		
		Student student = new Student();
		student.setAge(23);
		student.setName("hashmap");
		student.setSex("男");
		System.out.println(service.printInfo(student));
	}
}

На этом этапе наше внимание обращается на то, как клиент помогает нам проксировать

② RpcClientProxy, реализующий интерфейс InvocationHandler.

/**
 * RpcClientProxy
 * 客户端代理服务,客户端往服务端发起的调用将通过客户端代理来发起
 */
public class RpcClientProxy implements InvocationHandler{
	private String host;	// 服务端地址
	private int port;	// 服务端口号
	
	public RpcClientProxy(String host, int port){
		this.host = host;
		this.port = port;
	}
	
	/**
	 * 生成业务接口的代理对象,代理对象做的事情,在invoke方法中。
	 * @param clazz 代理类型(接口)
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public <T> T getProxy(Class<T> clazz){
		// clazz 不是接口不能使用JDK动态代理
		return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{ clazz }, RpcClientProxy.this);
	}
	
	/**
	 * 动态代理做的事情,接口的实现不在本地,在网络中的其他进程中,我们通过实现了Rpc客户端的对象来发起远程服务的调用。
	 */
	public Object invoke(Object obj, Method method, Object[] params) throws Throwable {
		// 调用前
		System.out.println("执行远程方法前,可以做些事情");
		
		// 调用远程服务,需要封装参数,类似于序列化的过程
		RpcRequest request = new RpcRequest();
		request.setClassName(method.getDeclaringClass().getName());
		request.setMethodName(method.getName());
		request.setParamTypes(method.getParameterTypes());
		request.setParams(params);
		// 链接服务器调用服务
		RpcClient client = new RpcClient();
		Object rst = client.start(request, host, port);
		
		// 调用后
		System.out.println("执行远程方法后,也可以做些事情");
		return rst;
	}
}

JDK предоставляет класс Proxy для достижения нашего динамического прокси, вы можете создать экземпляр прокси-объекта newProxyInstance (ClassLoader var0, Class > [] Var1, InvocationHandler var2), на этот раз мы передаем параметр, указанный должен быть интерфейсом, интерфейс нельзя использовать, если нет динамических прокси JDK

Третий параметр RpcClientProxy.Это метод newProxyInstance().Хотя метод помогает нам создать экземпляр, конкретное действие после создания экземпляра должно быть обеспечено этим InvocationHandler.

Есть только один Object invoke(Object var1, Method var2, Object[] var3) бросает Throwable в интерфейсе InvocationHandler.Параметры этого метода понять не сложно.Первый это прокси объект,второй метод выполнения, а третий - обязательный набор параметров

Возвращаясь к нашему коду только что, когда я выполняю оператор System.out.println(service.getInfo()), наша логика перейдет к реализации invoke() в аннотации метода invoke(). Процесс также подробно объясняется.Во-первых, нам нужно вызвать удаленную службу, инкапсулировать параметр, а затем установить сетевое подключение для отправки этих параметров на наш сервер.На данный момент нам нужно использовать RpcClient.

③ RpcClient

В методе start() наш запрос RpcRequest реализует интерфейс Serializable, поэтому в это время инкапсулированные данные будут преобразованы в двоичный файл, а затем сброшены().В это время наше сообщение было отправлено, и нам нужно дождаться ответ сервера. , в ответ нам нужно получить входной поток через наш серверный ObjectOutputStream

/**
 * RpcClient
 * Rpc客户端,代表业务代码作为客户端,往远端服务发起请求。
 */
public class RpcClient {
	
	/**
	 * 通过网络IO,打开远端服务连接,将请求数据写入网络中,并获得响应结果。
	 * 
	 * @param request 将要发送的请求数据
	 * @param host 远端服务域名或者ip地址
	 * @param port 远端服务端口号
	 * @return 服务端响应结果
	 * @throws Throwable 抛出的异常
	 */
	public Object start(RpcRequest request, String host, int port) throws Throwable{
		// 打开远端服务连接
		Socket server = new Socket(host, port);
		
		ObjectInputStream oin = null;
		ObjectOutputStream oout = null;
		
		try {
			// 1. 服务端输出流,写入请求数据,发送请求数据
			oout = new ObjectOutputStream(server.getOutputStream());
			oout.writeObject(request);
			oout.flush();
			
			// 2. 服务端输入流,获取返回数据,转换参数类型
			// 类似于反序列化的过程
			oin = new ObjectInputStream(server.getInputStream());
			Object res = oin.readObject();
			RpcResponse response = null;
			if(!(res instanceof RpcResponse)){
				throw new InvalidClassException("返回参数不正确,应当为:"+RpcResponse.class+" 类型");
			}else{
				response = (RpcResponse) res;
			}
			
			// 3. 返回服务端响应结果
			if(response.getError() != null){ // 服务器产生异常
				throw response.getError();
			}
			return response.getResult();
		}finally{
			try {	// 清理资源,关闭流
				if(oin != null) oin.close();
				if(oout != null) oout.close();
				if(server != null) server.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
 
}

④ RpcRequest для инкапсуляции параметров

/**
 * RpcRequest
 * Rpc请求对象,请求远端服务服务的内容,在网络上进行传输。
 */
public class RpcRequest implements Serializable{
	// 需要请求的类名
	private String className;
	
	// 需求请求的方法名
    private String methodName;
    
    // 请求方法的参数类型
    private Class<?>[] paramTypes;
    
    // 请求的参数值
    private Object[] params;
	
	public String getClassName() {
		return className;
	}
	public void setClassName(String className) {
		this.className = className;
	}
	public String getMethodName() {
		return methodName;
	}
	public void setMethodName(String methodName) {
		this.methodName = methodName;
	}
	public Class<?>[] getParamTypes() {
		return paramTypes;
	}
	public void setParamTypes(Class<?>[] paramTypes) {
		this.paramTypes = paramTypes;
	}
	public Object[] getParams() {
		return params;
	}
	public void setParams(Object[] params) {
		this.params = params;
	}
 
}

⑤ Результат ответа Rpc-сервера оборачивает RpcResponse

Он также реализует сериализацию JDK по умолчанию Serializable.

/**
 * RpcResponse
 * Rpc服务端响应结果包装类,在网络上进行传输。
 */
public class RpcResponse implements Serializable {
	// 可能抛出的异常
	private Throwable error;
	// 响应的内容或结果
        private Object result;
    
	public Throwable getError() {
		return error;
	}
	public void setError(Throwable error) {
		this.error = error;
	}
	public Object getResult() {
		return result;
	}
	public void setResult(Object result) {
		this.result = result;
	}
    
    
}

3. Строительство сервера

① Моделирование на стороне сервера ServerTest

public class ServerTest {

	@Test
	public void startServer() {
		RpcServer server = new RpcServer();
		server.start(9998, "rpc.simple.RpcServer");
	}
	
	public static void main(String[] args) {
	}
}

Учитывая номер порта с пакетом в параметре, функция должна сканировать сервисы в пакете

② Реализация метода start()

Создайте коллекцию сервисов типа Map для хранения сканирований классов, предоставляющих сервисы rpc.В настоящее время, поскольку они не размещены в реестре, адресация отсутствует. Позже он будет внесен в реестр зоопарка.

В getService() разве мы не предоставили имя пакета в ServerTest? В этот раз мы сначала пошли, чтобы найти все их классы (пожалуйста, обратитесь к методу getClasses()). В getClasses() мы в основном сначала основывались на предоставленное имя пакета.Посмотрите вниз, если есть проблема с каталогом, будет выброшено исключение.Если проблем нет, он начнет обход всех файлов в этом каталоге.Если в результате обхода будет обнаружено, что файл является файлом класса, он будет создан и оценен.Если есть пользовательская аннотация @service, класс, отмеченный этой аннотацией, является классом реализации службы RPC. Если эта аннотация существует, это служба rpc, которую нам нужно найти, и она загружается в классы набора результатов.Если каталог все еще является каталогом, вызывайте себя, пока не увидите файл класса.

Когда мы найдем все классы, вернемся к методу getService(), поместим их все в classList, а затем отобразим, то есть используем имя интерфейса в качестве ключа и экземпляр в качестве значения (services.put( cla.getAnnotation(Service.class).value().getName(), obj)).

Наконец, вернитесь к start(), и после сканирования службы будет обработан RpcServerHandler.

/**
 * RpcServer
 * Rpc服务提供者
 */
public class RpcServer {
	
	/**
	 * 启动指定的网络端口号服务,并监听端口上的请求数据。获得请求数据以后将请求信息委派给服务处理器,放入线程池中执行。
	 * @param port 监听端口
	 * @param clazz 服务类所在包名,多个用英文逗号隔开
	 */
	public void start(int port, String clazz) {
		ServerSocket server = null;
		try {
			// 1. 创建服务端指定端口的socket连接
			server = new ServerSocket(port);
			// 2. 获取所有rpc服务类
			Map<String, Object> services = getService(clazz);
			// 3. 创建线程池
			Executor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
			while(true){
				// 4. 获取客户端连接
				Socket client = server.accept();
				// 5. 放入线程池中执行
				RpcServerHandler service = new RpcServerHandler(client, services);
				executor.execute(service);
			}
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			//关闭监听
			if(server != null)
				try {
					server.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
		}
	}
	
	/**
	 * 实例化所有rpc服务类,也可用于暴露服务信息到注册中心。
	 * @param clazz 服务类所在包名,多个用英文逗号隔开
	 * @return
	 */
	public Map<String,Object> getService(String clazz){
		try {
			Map<String, Object> services = new HashMap<String, Object>();
			// 获取所有服务类
			String[] clazzes = clazz.split(",");
			List<Class<?>> classes = new ArrayList<Class<?>>();
			for(String cl : clazzes){
				List<Class<?>> classList = getClasses(cl);
				classes.addAll(classList);
			}
			// 循环实例化
			for(Class<?> cla:classes){
				Object obj = cla.newInstance();
				services.put(cla.getAnnotation(Service.class).value().getName(), obj);
			}
			return services;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
	
	/**
	 * 获取包下所有有@Sercive注解的类
	 * @param pckgname
	 * @return
	 * @throws ClassNotFoundException
	 */
	public static List<Class<?>> getClasses(String pckgname) throws ClassNotFoundException {
		// 需要查找的结果
		List<Class<?>> classes = new ArrayList<Class<?>>();
		// 找到指定的包目录
		File directory = null;
		try {
			ClassLoader cld = Thread.currentThread().getContextClassLoader();
			if (cld == null)
				throw new ClassNotFoundException("无法获取到ClassLoader");
			String path = pckgname.replace('.', '/');
			URL resource = cld.getResource(path);
			if (resource == null)
				throw new ClassNotFoundException("没有这样的资源:" + path);
			directory = new File(resource.getFile());
		} catch (NullPointerException x) {
			throw new ClassNotFoundException(pckgname + " (" + directory + ") 不是一个有效的资源");
		}
		if (directory.exists()) {
			// 获取包目录下的所有文件
			String[] files = directory.list();
			File[] fileList = directory.listFiles();
			// 获取包目录下的所有文件
			for (int i = 0; fileList != null && i < fileList.length; i++) {
				File file = fileList[i];
				//判断是否是Class文件
				if (file.isFile() && file.getName().endsWith(".class")) {
					Class<?> clazz = Class.forName(pckgname + '.' + files[i].substring(0, files[i].length() - 6));
					if(clazz.getAnnotation(Service.class) != null){
						classes.add(clazz);
					}
				}else if(file.isDirectory()){ //如果是目录,递归查找
					List<Class<?>> result = getClasses(pckgname+"."+file.getName());
					if(result != null && result.size() != 0){
						classes.addAll(result);
					}
				}
			}
		} else{
			throw new ClassNotFoundException(pckgname + "不是一个有效的包名");
		}
		return classes;
	}
}

③ RpcServerHandler для обработки

Он очень похож на RpcClient только что. Это процесс сериализации и десериализации. В основном это процесс вызова метода invoke() после получения экземпляра, метода и его параметров на третьем шаге и помещения результата в отклик.

/**
 * RpcServerHandler
 * 服务端请求处理,处理来自网络IO的服务请求,并响应结果给网络IO。
 */
public class RpcServerHandler implements Runnable {
	
	// 客户端网络请求socket,可以从中获得网络请求信息
	private Socket clientSocket;
	
	// 服务端提供处理请求的类集合
	private Map<String, Object> serviceMap;
	
	/**
	 * @param client 客户端socket
	 * @param services 所有服务
	 */
	public RpcServerHandler(Socket client, Map<String, Object> services) {
		this.clientSocket = client;
		this.serviceMap = services;
	}
 
 
	/**
	 * 读取网络中客户端请求的信息,找到请求的方法,执行本地方法获得结果,写入网络IO输出中。
	 * 
	 */
	public void run() {
		ObjectInputStream oin = null;
		ObjectOutputStream oout = null;
		RpcResponse response = new RpcResponse();
		try {
			// 1. 获取流以待操作
			oin = new ObjectInputStream(clientSocket.getInputStream());
			oout = new ObjectOutputStream(clientSocket.getOutputStream());
			
			// 2. 从网络IO输入流中请求数据,强转参数类型
			Object param = oin.readObject();
			RpcRequest  request = null;
			if(!(param instanceof RpcRequest)){
				response.setError(new Exception("参数错误"));
				oout.writeObject(response);
				oout.flush();
				return;
			}else{
				// 反序列化RpcRequest
				request = (RpcRequest) param;
			}
			
			// 3. 查找并执行服务方法
			Object service = serviceMap.get(request.getClassName());
			Class<?> clazz= service.getClass();
			Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());
			Object result = method.invoke(service, request.getParams());
			
			// 4. 返回RPC响应,序列化RpcResponse
			response.setResult(result);
			// 序列化结果
			oout.writeObject(response);
			oout.flush();
			return;
		} catch (Exception e) {
			try {	//异常处理
				if(oout != null){
					response.setError(e);
					oout.writeObject(response);
					oout.flush();
				}
			} catch (Exception e1) {
				e1.printStackTrace();
			}
			return;
		}finally{
			try {	// 回收资源,关闭流
				if(oin != null) oin.close();
				if(oout != null) oout.close();
				if(clientSocket != null) clientSocket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
 
}

4. Запуск результатов

Сначала откройте ServerTest, а затем откройте ClientTest, это просто и быстро.Будьте осторожны, чтобы не щелкнуть правой кнопкой мыши, чтобы запустить основной метод.

Содержание 3: Меры по оптимизации клиента

1. Представление первооткрывателя

При разработке клиента две вещи, которые необходимо сделать в ClientStubInvocationHandler, — это сортировка сообщений и отправка сетевых запросов, а сортировка содержимого запроса в сообщения передается прокси-серверу-заглушке клиента, в дополнение к протоколу сообщений и сетевому уровню. транзакции Кроме того, может быть обнаружение служебной информации. Кроме того, протокол сообщений также может измениться. Нам также необходимо поддерживать несколько протоколов. На самом деле это связано с широтой поддержки протокола фреймворком. Например, по сравнению с Spring Cloud поддержка протоколов в dubbo относительно гибкая.

На этом этапе нам нужно знать, какой протокол использует служба, поэтому нам нужно ввести средство поиска служб.

2. Уровень протокола

Мы хотим поддерживать несколько протоколов, как должен быть спроектирован класс (ориентированный на интерфейс, шаблон стратегии, комбинация)

В настоящее время наш протокол должен быть абстрагирован, а содержимое протокола должно быть упорядочено и неупорядочено.Например, две разные реализации JSON и HTTP, которые мы предоставили выше, и в настоящее время заглушка клиента нуждается не только сервис-поиск, но и сервис-поиск.Нам нужна наша поддержка для этого протокола

① Дополнение: Как получить регистрационную информацию от zookeeper

В основном смотрим на метод register().Мы склеили служебную информацию при регистрации и создали ее как временную ноду, а родительская нода — это персистентная нода. servicePath представляет собой структуру каталогов, аналогичную dubbo, корневой каталог /rpc+имя сервиса serviceName+service и метод loadServiceResouces() для получения сервиса не составляет труда, получить дочерние узлы под ними по этим адресам, и загрузить все URL для вызывающего абонента

public class RegistCenter {
	ZkClient client = new ZkClient("localhost:2181");
	
	private String centerRootPath = "/rpc";
	
	public RegistCenter() {
		client.setZkSerializer(new MyZkSerializer());
	}
	
	public void regist(ServiceResource serviceResource) {
		String serviceName = serviceResource.getServiceName();
		String uri = JsonMapper.toJsonString(serviceResource);
		try {
			uri = URLEncoder.encode(uri, "UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		String servicePath = centerRootPath + "/"+serviceName+"/service";
		if(! client.exists(servicePath)) {
			client.createPersistent(servicePath, true);
		}
		String uriPath = servicePath+"/"+uri;
		client.createEphemeral(uriPath);
	}
	
	/**
	 * 加载配置中心中服务资源信息
	 * @param serviceName
	 * @return
	 */
	public List<ServiceResource> loadServiceResouces(String serviceName) {
		String servicePath = centerRootPath + "/"+serviceName+"/service";
		List<String> children = client.getChildren(servicePath);
		List<ServiceResource> resources = new ArrayList<ServiceResource>();
		for(String ch : children) {
			try {
				String deCh = URLDecoder.decode(ch, "UTF-8");
				ServiceResource r = JsonMapper.fromJsonString(deCh, ServiceResource.class);
				resources.add(r);
			} catch (UnsupportedEncodingException e) {
				e.printStackTrace();
			}
		}
		return resources;
	}
	
	private void sub(String serviceName, ChangeHandler handler) {
		/*
		String path = centerRootPath + "/"+serviceName+"/service";
		client.subscribeChildChanges(path, new IZkChildListener() {
			@Override
			public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
				handler();
			}
		});
		client.subscribeDataChanges(path, new IZkDataListener() {
			@Override
			public void handleDataDeleted(String dataPath) throws Exception {
				handler();
			}
			
			@Override
			public void handleDataChange(String dataPath, Object data) throws Exception {
				handler();
			}
		});
		*/
	}
	
	interface ChangeHandler {
		/**
		 * 发生变化后给一个完整的属性对象
		 * @param resource
		 */
		void itemChange(ServiceResource resource);
	}
}

② Фабрика КлиентСтубПрокси

/**
 * ClientStubProxyFactory
  *   客户端存根代理工厂
 */
public class ClientStubProxyFactory {

	private ServiceInfoDiscoverer sid;

	private Map<String, MessageProtocol> supportMessageProtocols;

	private NetClient netClient;

	private Map<Class<?>, Object> objectCache = new HashMap<>();
	
	/**
	 * 
	 * 
	 * @param <T>
	 * @param interf
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public <T> T getProxy(Class<T> interf) {
		T obj = (T) this.objectCache.get(interf);
		if (obj == null) {
			obj = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] { interf },
					new ClientStubInvocationHandler(interf));
			this.objectCache.put(interf, obj);
		}

		return obj;
	}

	public ServiceInfoDiscoverer getSid() {
		return sid;
	}

	public void setSid(ServiceInfoDiscoverer sid) {
		this.sid = sid;
	}

	public Map<String, MessageProtocol> getSupportMessageProtocols() {
		return supportMessageProtocols;
	}

	public void setSupportMessageProtocols(Map<String, MessageProtocol> supportMessageProtocols) {
		this.supportMessageProtocols = supportMessageProtocols;
	}

	public NetClient getNetClient() {
		return netClient;
	}

	public void setNetClient(NetClient netClient) {
		this.netClient = netClient;
	}
	
	/**
	 * ClientStubInvocationHandler
	 * 客户端存根代理调用实现
	 * @date 2019年4月12日 下午2:38:30
	 */
	private class ClientStubInvocationHandler implements InvocationHandler {
		private Class<?> interf;

		public ClientStubInvocationHandler(Class<?> interf) {
			super();
			this.interf = interf;
		}

		@Override
		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

			// 1、获得服务信息
			String serviceName = this.interf.getName();
			ServiceInfo sinfo = sid.getServiceInfo(serviceName);

			if (sinfo == null) {
				throw new Exception("远程服务不存在!");
			}

			// 2、构造request对象
			Request req = new Request();
			req.setServiceName(sinfo.getName());
			req.setMethod(method.getName());
			req.setPrameterTypes(method.getParameterTypes());
			req.setParameters(args);

			// 3、协议层编组
			// 获得该方法对应的协议
			MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
			// 编组请求
			byte[] data = protocol.marshallingRequest(req);

			// 4、调用网络层发送请求
			byte[] repData = netClient.sendRequest(data, sinfo);

			// 5解组响应消息
			Response rsp = protocol.unmarshallingResponse(repData);

			// 6、结果处理
			if (rsp.getException() != null) {
				throw rsp.getException();
			}

			return rsp.getReturnValue();
		}
	}
}

В ClientStub есть две ссылки в ClientStub, одна - это сервисный интерфейс Discovery Service, который используется для получения информации о удаленной службе в соответствии с именем службы, предоставляя метод ServiceInfo GESTERVICEICINFO (STRING STRING), а также поддерживаемыеProtocols для разных протоколов. Мы также определяем сообщениеProtocol Интерфейс, этот интерфейс должен быть более подробным, закодированным в дополнительную систему и декодировать в запросе и т. Д., Один и тот же процесс для ответа

/**
 * 通信协议接口
 * MessageProtocol
 */
public interface MessageProtocol {
	/**
	 * 编组请求消息
	 * @param req
	 * @return
	 */
	byte[] marshallingRequest(Request req);
	
	/**
	 * 解编组请求消息
	 * @param data
	 * @return
	 */
	Request unmarshallingRequest(byte[] data);
	
	/**
	 * 编组响应消息
	 * @param rsp
	 * @return
	 */
	byte[] marshallingResponse(Response rsp);
	
	/**
	 * 解编组响应消息
	 * @param data
	 * @return
	 */
	Response unmarshallingResponse(byte[] data);
}

В настоящее время есть некоторые проблемы, полагаться исключительно на метод сортировки и демаршаллинга недостаточно, целевая группа операции сортирует запросы на согласование, ответы, но их содержание отличается, тогда нам нужно определить рамки стандартного запроса. -класс ответа

Запрос имеет конкретное имя службы, метод службы, заголовок сообщения, тип параметра и параметры, и тот же ответ также имеет статус (через перечисление), заголовок сообщения, возвращаемое значение и тип, а также наличие исключения.

В настоящее время уровень протокола расширен до 4 методов.

Разделите протокол сообщений на уровень, который должен использовать как клиент, так и сервер.

3. Сетевой уровень

Работа сетевого уровня заключается в основном в отправке запросов и получении ответов. В настоящее время, если нам нужно инициировать сетевой запрос, мы должны сначала узнать адрес службы. В настоящее время мы используем объект serviceInfo на рисунке ниже как необходимая зависимость.В методе setRequest() будут данные для отправки.И кому отправлять,на данный момент даны две реализации BIO и Netty

Итак, три зависимости, которые нам нужны, выходят, одна — обнаружение службы, одна — поддержка, затем NetClient нашего сетевого уровня.

4. Общий чертеж

Фиолетовый представляет часть клиентского прокси, светло-зеленый относится к обнаружению службы, а светло-синий относится к части протокола.

5. Часть кода (можно игнорировать напрямую)

Поскольку эти коды не имеют ничего общего с основными идеями, это всего лишь некоторые функциональные коды, поэтому их можно напрямую игнорировать. Если вы действительно хотите работать самостоятельно, вы также можете попросить у меня образец.

① по-прежнему вернитесь к нашей ClientStubProxyFactory

Его можно сравнить с RpcClientProxy второго контента, и на исходной основе добавлены три зависимости ServiceInfoDiscoverer, supportMessageProtocols, netClient.

Кэш для объекта в ClientStubProxyFactory, если этот кеш был возвращен напрямую, в кеш не добавляются слова, а затем New — это просто небольшая разница.

② Изменение метода invoke()

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

	// 1、获得服务信息
	String serviceName = this.interf.getName();
	ServiceInfo sinfo = sid.getServiceInfo(serviceName);

	if (sinfo == null) {
		throw new Exception("远程服务不存在!");
	}

	// 2、构造request对象
	Request req = new Request();
	req.setServiceName(sinfo.getName());
	req.setMethod(method.getName());
	req.setPrameterTypes(method.getParameterTypes());
	req.setParameters(args);

	// 3、协议层编组
	// 获得该方法对应的协议
	MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
	// 编组请求
	byte[] data = protocol.marshallingRequest(req);

	// 4、调用网络层发送请求
	byte[] repData = netClient.sendRequest(data, sinfo);

	// 5、解组响应消息
	Response rsp = protocol.unmarshallingResponse(repData);

	// 6、结果处理
	if (rsp.getException() != null) {
		throw rsp.getException();
	}

	return rsp.getReturnValue();
}

Во-первых, это обнаружение службы.Когда мы выполняем метод getProxy(), упомянутый в ①, нам напрямую сообщается интерфейс прокси-сервера, поэтому мы напрямую получаем информацию об интерфейсе, а затем вызываем метод getName() для получения имя интерфейса, через имя интерфейса вызовите метод getServiceInfo(), предоставленный средством поиска служб ServiceInfo, для получения конкретной информации о службе, а затем поместите ее в запрос параметра запроса, а затем присвойте значения различным атрибуты запроса

После этого мы начали искать протокол, соответствующий этому сервису.После получения протокола мы можем получить объект поддержки протокола, а затем сделать маршалинговый запрос, преобразовать его в бинарный, отправить через netClient и отдать вместе с информация о сервере. Получите результат repData для десортировки (двоичный код обратно в ответ), а затем обработайте результат.

③ Внедрение поиска услуг

Как упоминалось ранее, средство обнаружения служб ServiceInfoDiscoverer предоставляет метод getServiceInfo() в качестве интерфейса.

Есть две разные реализации.Для локальной реализации мы можем загрузить файл конфигурации самостоятельно и получить соответствующую служебную информацию.

Обнаружение услуг Zookeeker реализуется следующим образом, аналогично содержанию зоопарка, который мы добавили в 2 - ① в начале

public class ZookeeperServiceInfoDiscoverer implements ServiceInfoDiscoverer {
	ZkClient client = new ZkClient("localhost:2181");
	
	private String centerRootPath = "/rpc";
	
	public ZookeeperServiceInfoDiscoverer() {
		client.setZkSerializer(new MyZkSerializer());
	}
	
	public void regist(ServiceInfo serviceResource) {
		String serviceName = serviceResource.getName();
		String uri = JSON.toJSONString(serviceResource);
		try {
			uri = URLEncoder.encode(uri, "UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		String servicePath = centerRootPath + "/"+serviceName+"/service";
		if(! client.exists(servicePath)) {
			client.createPersistent(servicePath, true);
		}
		String uriPath = servicePath+"/"+uri;
		client.createEphemeral(uriPath);
	}
	
	/**
	 * 加载配置中心中服务资源信息
	 * @param serviceName
	 * @return
	 */
	public List<ServiceInfo> loadServiceResouces(String serviceName) {
		String servicePath = centerRootPath + "/"+serviceName+"/service";
		List<String> children = client.getChildren(servicePath);
		List<ServiceInfo> resources = new ArrayList<ServiceInfo>();
		for(String ch : children) {
			try {
				String deCh = URLDecoder.decode(ch, "UTF-8");
				ServiceInfo r = JSON.parseObject(deCh, ServiceInfo.class);
				resources.add(r);
			} catch (UnsupportedEncodingException e) {
				e.printStackTrace();
			}
		}
		return resources;
	}
	
	@Override
	public ServiceInfo getServiceInfo(String name) {
		List<ServiceInfo> list = loadServiceResouces(name);
		ServiceInfo info = list.get(0);
		list.forEach((e)->{
			if(e != info) {
				info.addAddress(e.getAddress().get(0));
			}
		});
		return info;
	}

}

Поддержка 4 протоколов

Здесь реализован только JSON, который реализован через fastJSON

public class JSONMessageProtocol implements MessageProtocol {

	@Override
	public byte[] marshallingRequest(Request req) {
		Request temp = new Request();
		temp.setServiceName(req.getServiceName());
		temp.setMethod(req.getMethod());
		temp.setHeaders(req.getHeaders());
		temp.setPrameterTypes(req.getPrameterTypes());

		if (req.getParameters() != null) {
			Object[] params = req.getParameters();
			Object[] serizeParmas = new Object[params.length];
			for (int i = 0; i < params.length; i++) {
				serizeParmas[i] = JSON.toJSONString(params[i]);
			}

			temp.setParameters(serizeParmas);
		}

		return JSON.toJSONBytes(temp);
	}

	@Override
	public Request unmarshallingRequest(byte[] data) {
		Request req = JSON.parseObject(data, Request.class);
		if(req.getParameters() != null) {
			Object[] serizeParmas = req.getParameters();
			Object[] params = new Object[serizeParmas.length];
			for(int i = 0; i < serizeParmas.length; i++) {
				Object param = JSON.parseObject(serizeParmas[i].toString(), Object.class);
				params[i] = param;
			}
			req.setParameters(params);
		}
		return req;
	}

	@Override
	public byte[] marshallingResponse(Response rsp) {
		Response resp = new Response();
		resp.setHeaders(rsp.getHeaders());
		resp.setException(rsp.getException());
		resp.setReturnValue(rsp.getReturnValue());
		resp.setStatus(rsp.getStatus());
		return JSON.toJSONBytes(resp);
	}

	@Override
	public Response unmarshallingResponse(byte[] data) {
		return JSON.parseObject(data, Response.class);
	}

}

⑤ Связанный с NetClient

Он разделен на два режима: BIO и Netty, в netty используется EventLoopGroup.

BIO:
public class BioNetClient implements NetClient {
	
	@Override
	public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
		List<String> addressList = sinfo.getAddress();
		int randNum = new Random().nextInt(addressList.size());
		String address = addressList.get(randNum);
		String[] addInfoArray = address.split(":");
		try {
			return startSend(data, addInfoArray[0], Integer.valueOf(addInfoArray[1]));
		} catch (Throwable e) {
			e.printStackTrace();
		}
		return null;
	}
	
	/**
	 * 通过网络IO,打开远端服务连接,将请求数据写入网络中,并获得响应结果。
	 * 
	 * @param requestData 将要发送的请求数据
	 * @param host 远端服务域名或者ip地址
	 * @param port 远端服务端口号
	 * @return 服务端响应结果
	 * @throws Throwable 抛出的异常
	 */
	private byte[] startSend(byte[] requestData, String host, int port) throws Throwable{
		// 打开远端服务连接
		Socket serverSocket = new Socket(host, port);
		
		InputStream in = null;
		OutputStream out = null;
		
		try {
			// 1. 服务端输出流,写入请求数据,发送请求数据
			out = serverSocket.getOutputStream();
			out.write(requestData);
			out.flush();
			
			// 2. 服务端输入流,获取返回数据,转换参数类型
			// 类似于反序列化的过程
			in = serverSocket.getInputStream();
			byte[] res = new byte[1024];
			int readLen = -1;
			ByteArrayOutputStream baos = new ByteArrayOutputStream();
			while((readLen = in.read(res)) > 0) {
				baos.write(res, 0, readLen);
			}
			return baos.toByteArray();
		}finally{
			try {	// 清理资源,关闭流
				if(in != null) in.close();
				if(out != null) out.close();
				if(serverSocket != null) serverSocket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
сетевой режим:
public class NettyNetClient implements NetClient {
	private SendHandler sendHandler;
	private Map<String, SendHandler> sendHandlerMap = new ConcurrentHashMap<String, SendHandler>();
	
	@Override
	public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
		try {
			List<String> addressList = sinfo.getAddress();
			int randNum = new Random().nextInt(addressList.size());
			String address = addressList.get(randNum);
			String[] addInfoArray = address.split(":");
			SendHandler handler = sendHandlerMap.get(address);
			if(handler == null) {
				sendHandler = new SendHandler(data);
				new Thread(()->{
					try {
						connect(addInfoArray[0], Integer.valueOf(addInfoArray[1]));
					} catch (NumberFormatException e) {
						e.printStackTrace();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}).start();
			}
			byte[] respData = (byte[]) sendHandler.rspData();
			return respData;
		} catch (NumberFormatException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
	
	public void connect(String host, int port) throws Exception {
        // 配置客户端
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            //EchoClientHandler handler = new EchoClientHandler();
            
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(sendHandler);
                 }
             });

            // 启动客户端连接
            ChannelFuture f = b.connect(host, port).sync();
            // 等待客户端连接关闭
            f.channel().closeFuture().sync();
        } finally {
            // 释放线程组资源
            group.shutdownGracefully();
        }
    }
}

⑥ Результат операции

Может имитировать потребителя и самовоспроизведение продюсера, не опубликовано здесь

finally

Потом продолжу контент Dubbo

Далее: Высокий уровень параллелизма с нуля (9) --- Основные функции и протоколы Dubbo