лучшие практики netty4+protobuf3

задняя часть Netty API protobuf

Ключевые моменты этой статьи:

  1. Реализация многотипной передачи Netty4+protobuf3
  2. Элегантная реализация рассылки сообщений

Выполнение фоновых служб часто имеет такой процесс:

2eeae9b5-4a34-4171-b0e8-4f695c29d2d9.png
Как завершить этот процесс изящно? Поделиться ниже на основеnetty+protobufпрограмма:

Первое, что нужно решить, это какnetty+protobufПринятое здесь решение заключается в использовании класса в качестве решения для описания протокола, то есть решения, требующего вторичного декодирования.Файл IDL выглядит следующим образом:

syntax = "proto3";
option java_package = "com.nonpool.proto";
option java_multiple_files = true;

message Frame {
   string messageName = 1;
   
   bytes payload = 15;
}

message TextMessage {
   string  text = 1;
}

Фрейм — это протокол описания.Все сообщения сериализуются в байтовые массивы и записываются в полезную нагрузку фрейма при отправке.messageNameсогласился быть отправленнымmessage的类名, устанавливается при генерацииjava_multiple_files = trueКлассы можно генерировать отдельно, что нагляднее и удобнее использовать рефлексию для получения этих классов позже.

После создания protobuf наш процесс распаковки должен выглядеть так:

71d1665d-db17-48bf-a7b2-86f4c2faf652.png
Сериализация и десериализация protobuf netty уже написала за нас соответствующий декодер/кодировщик, который можно вызвать напрямую, осталось написать вторичный декодер/кодировщик:

public class SecondProtobufCodec extends MessageToMessageCodec<Frame, MessageLite> {

    @Override
    protected void encode(ChannelHandlerContext ctx, MessageLite msg, List<Object> out) throws Exception {
    
        out.add(Frame.newBuilder()
                .setMessageType(msg.getClass().getSimpleName())
                .setPayload(msg.toByteString())
                .build());
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, Frame msg, List<Object> out) throws Exception {
        out.add(ParseFromUtil.parse(msg));
    }

}
public abstract class ParseFromUtil {

    private final static ConcurrentMap<String, Method> methodCache = new ConcurrentHashMap<>();

    static {
        //找到指定包下所有protobuf实体类
        List<Class> classes = ClassUtil.getAllClassBySubClass(MessageLite.class, true, "com.nonpool.proto");
        classes.stream()
                .filter(protoClass -> !Objects.equals(protoClass, Frame.class))
                .forEach(protoClass -> {
                    try {
                        //反射获取parseFrom方法并缓存到map
                        methodCache.put(protoClass.getSimpleName(), protoClass.getMethod("parseFrom", ByteString.class));
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException(e);
                    }
                });
    }
    
    /**
     * 根据Frame类解析出其中的body
     *
     * @param msg
     * @return
     */
    public static MessageLite parse(Frame msg) throws InvocationTargetException, IllegalAccessException {
        String type = msg.getMessageType();
        ByteString body = msg.getPayload();

        Method method = methodCache.get(type);
        if (method == null) {
            throw new RuntimeException("unknown Message type :" + type);
        }
        return (MessageLite) method.invoke(null, body);
    }
}

До сих пор декодирование/кодирование данных, которые мы отправляем и получаем, выполнялось с помощью встроенного декодирования/кодирования.pipelineЦепочка обработки такая:

public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new ProtobufVarint32FrameDecoder())
                                    .addLast(new ProtobufDecoder(Frame.getDefaultInstance()))
                                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                                    .addLast(new ProtobufEncoder())
                                    .addLast(new SecondProtobufCodec())
                            ;

После того, как данные отправлены и получены, следующим шагом является распределение сообщения по соответствующему методу обработки. Метод обработки также использует полиморфизм + дженерики + аннотации для достижения элегантного распределения реализации. Сначала определите общий интерфейс:interface DataHandler<T extends MessageLite>, определите метод на интерфейсеvoid handler(T t, ChannelHandlerContext ctx), а затем каждый тип класса обработки реализует интерфейс с обрабатываемым типом и использует пользовательскую аннотацию для сопоставления типа обработки. Все классы, реализующие этот интерфейс, сканируются при запуске проекта, и эти классы обработки кэшируются с использованием того же метода, что и выше для разбора типа Message (одна разница в том, что здесь нужно кэшировать только экземпляр класса обработки вместо метода , так какparseFromдаstaticнельзя назвать единообразно), после этого мы можем написать нашpipelineПоследний процессор включен:

public class DispatchHandler extends ChannelInboundHandlerAdapter {

    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        HandlerUtil.getHandlerInstance(msg.getClass().getSimpleName()).handler((MessageLite) msg,ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}
public abstract class HandlerUtil {

    private final static ConcurrentMap<String,DataHandler> instanceCache = new ConcurrentHashMap<>();

    static {
        try {
            List<Class> classes = ClassUtil.getAllClassBySubClass(DataHandler.class, true,"com.onescorpion");
            for (Class claz : classes) {
                HandlerMapping annotation = (HandlerMapping) claz.getAnnotation(HandlerMapping.class);
                instanceCache.put(annotation.value(), (DataHandler) claz.newInstance());
            }
            System.out.println("handler init success handler Map: " +  instanceCache);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static DataHandler getHandlerInstance(String name) {
        return instanceCache.get(name);
    }
}

Такое изящное распределение обработки сделано. Из-за сильной регулярности кода все классы обработчиков могут быть сгенерированы с использованием шаблонов Полный код см.здесь

ps: На самом деле, поскольку в этом примере используется protobuf, остается сильный контракт, поэтому теоретически пользовательские аннотации на каждом процессоре сообщений не нужны.Вы можете получить реальный тип универсального типа, но аннотацию можно значительно повысить гибкость обработчика, и более ценно использовать другие решения (например, json).