Мертвый нижний слой java (2) - служба сообщений

Java задняя часть API модульный тест

Этот раздел является продолжением предыдущего раздела о многопоточности.Сначала поговорим о родной очереди блокировки Java (Blocking Queue), а затем поговорим о JMS (Java Messaging Service, служба сообщений java) и одной из ее реализаций, очереди сообщений ActiveMQ, так что все Слилось в обсуждение службы сообщений.

1. Очередь на блокировку

BlockingQueue также является интерфейсом в java.util.concurrent, который решает проблему эффективной передачи данных в многопоточности.Благодаря этим эффективным и потокобезопасным классам мы можем создавать высококачественные многопоточные программы. Инструмент, в основном используемый для управления синхронизацией потоков. BlockingQueue — это интерфейс со следующими методами:

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);
    boolean offer(E e);
    void put(E e) throws InterruptedException;
    boolean offer(E e, long timeout, TimeUnit unit);
    E take() throws InterruptedException;
    E poll(long timeout, TimeUnit unit)
    int remainingCapacity();
    boolean remove(Object o);
    public boolean contains(Object o);
    int drainTo(Collection<? super E> c);
    int drainTo(Collection<? super E> c, int maxElements);
}
  • вставлять:
  1. add(anObject): добавить объект в BlockingQueue, то есть, если BlockingQueue может его разместить, вернуть true, в противном случае выдать исключение, нехорошо
  2. offer(anObject): Указывает, что если возможно, добавьте объект в BlockingQueue, то есть, если BlockingQueue может его разместить, вернуть true, в противном случае вернуть false.
  3. put(anObject): добавьте объект в BlockingQueue. Если в BlockQueue нет места, поток, вызывающий этот метод, будет заблокирован до тех пор, пока в BlockingQueue не будет места для продолжения. Если есть блокировка, подождите, если вы не можете поместить его в .
  • читать:
  1. poll(time): взять объект, занимающий первое место в BlockingQueue. Если вы не можете взять его немедленно, вы можете подождать время, указанное параметром времени. Если вы не можете взять его, верните ноль; если вы можете не бери, верни ноль
  2. take(): взять первый объект в BlockingQueue. Если BlockingQueue пуст, заблокируйте и войдите в состояние ожидания, пока новый объект не будет добавлен в Blocking; заблокируйте, подождите, пока вы не сможете его получить
  • разное
  1. int restCapacity(); Возвращает оставшуюся емкость очереди, которая используется при вставке и получении очереди, и данные могут быть неточными.
  2. boolean remove(Object o); удаляет элементы из очереди, если она существует, удаляет один или несколько, очередь изменилась и возвращает true
  3. public boolean contains(Object o); Проверить, существует ли этот элемент в очереди, вернуть true, если он существует
  4. intdrinkTo(Collectionc); Удаляет все доступные элементы из этой очереди и добавляет их в данную коллекцию. (то есть вынуть и положить в коллекцию)
  5. Отличие метода intdrrainTo(Collectionc, intmaxElements); от метода выше в том, что указывается количество ходов; (вынуть указанное число и положить в коллекцию) Основные методы: поставить, взять пару блокирующего доступа, добавить, опросить пару неблокирующего доступа. Как упоминалось выше, BlockingQueue — это интерфейс, имеющий четыре конкретных класса реализации, два из которых широко используются:
  6. ArrayBlockingQueue: ограниченная очередь блокировки, поддерживаемая массивом, BlockingQueue с указанным размером, его конструктор должен принимать параметр int, чтобы указать его размер.Содержащиеся в нем объекты сортируются в порядке FIFO (первым пришел, первым вышел).
  7. LinkedBlockingQueue: BlockingQueue переменного размера. Его конструктор может указать емкость или нет. Если не указано, максимальное значение по умолчанию — Integer.MAX_VALUE. В основном используются методы put и take. Метод put будет блокироваться до тех пор, пока очередь не будет заполнена. Когда элемент очереди потребляется, метод take будет блокироваться, когда очередь пуста, пока член очереди не будет добавлен. Разница между LinkedBlockingQueue и ArrayBlockingQueue: По сравнению с ArrayBlockingQueue, LinkedBlockingQueue и ArrayBlockingQueue имеют разные структуры данных, поэтому пропускная способность LinkedBlockingQueue выше, чем у ArrayBlockingQueue, но предсказуемость его производительности ниже, чем у ArrayBlockingQueue, при большом количестве потоков. Вот пример производителя и потребителя, реализованных с помощью BlockingQueue: Продукт производителя:
public class Product implements Runnable{

    BlockingQueue<String> queue;
    public Product(BlockingQueue<String> queue) {
        //创建对象时就传入一个阻塞队列
        this.queue = queue;
    }
    @Override
    public void run(){
        try {
            System.out.println(Thread.currentThread().getName()+"开始生产");
            String temp =  Thread.currentThread().getName()+":生产线程";
            queue.put(temp);//向队列中放数据,如果队列是满的话,会阻塞当前线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

Потребитель Потребитель:

public class Consumer implements Runnable{
    BlockingQueue<String> queue;
    public Consumer(BlockingQueue<String> queue) {
        //使用有参构造函数的目的是我在创建这个消费者对象的时候就可以传进来一个队列
        this.queue = queue;
    }
    @Override
    public void run() {
        Random random = new Random();
        try {
            while(true){
                Thread.sleep(random.nextInt(10));
                System.out.println(Thread.currentThread().getName()+ "准备消费...");
                String temp = queue.take();//从队列中取任务消费,如果队列为空,会阻塞当前线程
                System.out.println(Thread.currentThread().getName() + " 获取到工作任务==== " +temp);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Тестовый класс TestQueue:

public class TestQueue {
    public static void main(String[] args) {
        //新建一个阻塞队列,队列长度是5
        BlockingQueue<String> queue = new LinkedBlockingDeque<String>(5);
        //BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5);
        Consumer consumer = new Consumer(queue);
        Product product = new Product(queue);

        for(int i = 0;i<3;i++){
            new Thread(product,"product"+i).start();
        }

        //for (int i = 0;i<5;i++){
            new Thread(consumer,"consumer").start();
        //}
    }
}

Смысл всего набора кода в том, чтобы инициализировать очередь сообщений, поместить в нее тип String, длина очереди 5, использовать поток производителя для имитации запроса трех пользователей, временно поместить данные запроса пользователя в очередь BlockingQueue, а затем поток-потребитель продолжает брать задачи из очереди для обработки бизнес-логики до тех пор, пока в очереди не останется ничего для потребления. Отсюда видно, что очередь сообщений имеет две характеристики: развязку и сглаживание пиков и заполнение впадин. Между производителем и потребителем нет грубой связи.Производитель помещает данные в очередь, а потребитель берет данные из очереди.Все они устанавливают связь с очередью и развязаны, если параллелизм производителя высок, он просто ставит данные первыми.В очереди потребители могут есть медленно, и на самом деле, сервер не будет тянуть вниз сразу. Справочный адрес: http://blog.csdn.net/ghsau/article/details/8108292

2. Служба сообщений Java

2.1 Введение в JMS

JMS (служба сообщений Java) используется для отправки сообщений между двумя приложениями или распределенными системами для асинхронной связи. JMS — это независимый от поставщика (или платформы) API. Подобно JDBC (подключение к базе данных Java): здесь JDBC — это API, который можно использовать для доступа ко многим различным реляционным базам данных, в то время как JMS предоставляет те же независимые от поставщика методы доступа для доступа к службам обмена сообщениями. Многие поставщики поддерживают JMS, в том числе MQSeries от IBM, сервис Weblogic JMS от BEA и SonicMQ от Progress. JMS позволяет отправлять сообщения от одного клиента JMS другому через службу обмена сообщениями. Сообщение — это объект типа в JMS, который состоит из двух частей: заголовка и тела сообщения. Заголовок состоит из маршрутной информации и метаданных о сообщении, тело сообщения содержит данные приложения или полезную нагрузку. По типу полезной нагрузки сообщения можно разделить на несколько типов, которые несут в себе: простой текст (TextMessage), сериализуемый объект (ObjectMessage), набор свойств (MapMessage), поток байтов (BytesMessage), необработанный поток значений. (StreamMessage) и сообщение без полезной нагрузки (Message).

2.2 Состав JMS

JMS состоит из следующих элементов: Поставщик провайдера JMS: ПО промежуточного слоя, ориентированное на сообщения, реализация спецификации JMS. Поставщик может быть реализацией JMS для платформы Java или ориентированным на сообщения адаптером промежуточного программного обеспечения для платформы, отличной от Java. Клиент JMS: Создает или использует Java-приложения или объекты на основе сообщений (т. е. как производители, так и потребители вместе называются клиентами JMS). JMS Producer: клиент JMS, который создает и отправляет сообщения. JMS Consumer: клиент JMS, который получает сообщения. Сообщения JMS: объекты, которые могут передавать данные между клиентами JMS. Очередь JMS: область, в которой хранятся отправленные сообщения, ожидающие прочтения. Если сообщение прочитано, оно будет удалено из очереди. Темы JMS: механизм, поддерживающий отправку сообщений нескольким подписчикам.

2.3 Модель службы сообщений Java

  • одноранговая модель В модели одноранговой очереди производитель публикует сообщения в определенной очереди, а потребитель читает сообщения из этой очереди. Здесь производитель знает очередь потребителя и отправляет сообщения непосредственно в очередь потребителя. Этот режим имеет следующие характеристики: Сообщение получит только один потребитель; Производителю не нужно, чтобы потребитель работал во время потребления сообщения, и потребителю также не нужно работать, когда производитель отправляет сообщение; Каждое успешно обработанное сообщение подписывается потребителем.
  • Модель издателя/подписчика Модель издатель/подписчик поддерживаетТема сообщениясделать объявление. В этой модели издатели и подписчики не знают друг о друге, подобно анонимной доске объявлений. Этот режим имеет следующие характеристики: Несколько потребителей могут получать сообщения; Между издателями и подписчиками существует временная зависимость. Издатели должны установить подписку, чтобы потребители могли подписаться. Подписчики должны оставаться постоянно активными, чтобы получать сообщения, если подписчик не устанавливает постоянную подписку.

2.4 Очередь сообщений (ActiveMQ)

ActiveMQ — это реализация спецификации JMS, вот как ее использовать

  • Скачать ActiveMQ Перейдите на официальный сайт для загрузки: http://activemq.apache.org/
  • запустить ActiveMQ Разархивируйте apache-activemq-5.5.1-bin.zip (аналогично Tomcat, его можно использовать после распаковки), некоторые люди, которых я искал в Интернете, изменили адрес подключения и протокол в файле конфигурации activeMQ.xml, я не изменить его во время теста Вы также можете успешно протестировать.Если ваш тест не пройден, вы можете изменить его следующим образом:
<transportConnectors>
   <transportConnector name="openwire" uri="tcp://localhost:61616"/>
   <transportConnector name="ssl"     uri="ssl://localhost:61617"/>
   <transportConnector name="stomp"   uri="stomp://localhost:61613"/>
   <transportConnector uri="http://localhost:8081"/>
   <transportConnector uri="udp://localhost:61618"/>
</transportConnectors>

Код теста выглядит следующим образом: Продукт производителя:

public class Product {

    private String username = ActiveMQConnectionFactory.DEFAULT_USER;
    private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
    private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;

    private Connection connection = null;
    private Session session = null;
    private String subject = "myQueue";
    private Destination destination = null;
    private MessageProducer producer = null;
    /**
     * @Description 初始化方法
     * @Author 刘俊重
     * @Date 2017/12/20
     */
    private void init() throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username,password,url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }

    public void productMessage(String message) throws JMSException {
        this.init();
        TextMessage textMessage = session.createTextMessage(message);
        connection.start();
        System.out.println("生产者准备发送消息:"+textMessage);
        producer.send(textMessage);
        System.out.println("生产者已发送完毕消息。。。");
    }

    public void close() throws JMSException {
        System.out.println("生产者开始关闭连接");
        if(null!=producer){
            producer.close();
        }
        if(null!=session){
            session.close();
        }
        if(null!=connection){
            connection.close();
        }
    }
}

Потребитель Потребитель:

public class Consumer implements MessageListener,ExceptionListener{
    private String name = ActiveMQConnectionFactory.DEFAULT_USER;
    private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
    private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
    private ActiveMQConnectionFactory connectionFactory = null;
    private Connection connection = null;
    private Session session = null;
    private String subject = "myQueue";
    private Destination destination = null;
    private MessageConsumer consumer = null;

    public static Boolean isconnection=false;
    /**
     * @Description 连接ActiveMQ
     * @Author 刘俊重
     * @Date 2017/12/20
     */
    private void init() throws JMSException {
        connectionFactory = new ActiveMQConnectionFactory(name,password,url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        consumer = session.createConsumer(destination);
    }

    public void consumerMessage() throws JMSException {
        this.init();
        connection.start();

        //设置消息监听和异常监听
        consumer.setMessageListener(this);
        connection.setExceptionListener(this);
        System.out.println("消费者开始监听....");
        isconnection = true;
        //Message receive = consumer.receive();
    }

    public void close() throws JMSException {
        if(null!=consumer){
            consumer.close();
        }
        if(null!=session){
            session.close();
        }
        if(null!=connection){
            connection.close();
        }
    }
    /**
     * 异常处理函数
     */
    @Override
    public void onException(JMSException exception) {
        //发生异常关闭连接
        isconnection = false;
    }

    /**
     * 消息处理函数
     */
    @Override
    public void onMessage(Message message) {
        try {
            if(message instanceof TextMessage){
                TextMessage textMsg = (TextMessage) message;
                String text = textMsg.getText();
                System.out.println("消费者接收到的消息======="+text);
            }else {
                System.out.println("接收的消息不符合");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

Примечание. Потребителям необходимо реализовать MessageListener и ExceptionListener, чтобы контролировать обработку полученных сообщений и ошибок. Тестовый класс производителя TestProduct:

public class TestProduct {
    public static void main(String[] args) throws JMSException {
        for(int i=0;i<100;i++){
            Product product = new Product();
            product.productMessage("Hello World!"+i);
            product.close();
        }
    }
}

TestProduct используется для имитации генерации 100 сообщений и записи их в очередь ActiveMQ. Потребительский тестовый класс TestConsumer:

public class TestConsumer implements Runnable {
    static Thread thread = null;
    public static void main(String[] args) throws InterruptedException {
        thread = new Thread(new TestConsumer());
        thread.start();
        while (true){
            //时刻监听消息队列,如果线程死了,则新建一个线程
            boolean alive = thread.isAlive();
            System.out.println("当前线程状态:"+alive);
            if(!alive){
                thread = new Thread(new TestConsumer());
                thread.start();
                System.out.println("线程重启完成");
            }
            Thread.sleep(1000);
        }
    }
    @Override
    public void run() {
        try {
            Consumer consumer = new Consumer();
            consumer.consumerMessage();
            while (Consumer.isconnection) {
                //System.out.println(123);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

TestConsumer использует здесь многопоточность, чтобы гарантировать наличие живого потока, ожидающего получения очереди сообщений ActiveMQ и вызова обработки потребителя в любое время. Резюме. Насколько я понимаю, межпотоковое взаимодействие использует очередь, например BlockingQueue, а межпроцессное взаимодействие использует JMS, например ActiveMQ. Прилагается статья об очереди сообщений, написанная архитектором 58 Shen Jian, которую можно использовать в качестве ссылки: http://dwz.cn/78yLxL Следует подчеркнуть, что любая ссылка на технологию должна служить решению бизнес-задачи, а не просто демонстрации мастерства. Например, возьмем службу сообщений в качестве примера. Например, когда пользователь регистрируется на определенном веб-сайте, после завершения регистрации я позвоню службе электронной почты и SMS, чтобы отправить ему уведомление. Я также могу использовать информацию, которую он заполняет, чтобы порекомендовать некоторых людей, которых он может знать.Пользователь, тогда основным бизнесом здесь является регистрация, другие пользователи уведомлений и рекомендаций могут быть обработаны в очереди сообщений, сначала ответить на регистрационную информацию, а затем вызвать другие службы для обработки двух служб уведомлений и рекомендаций пользователей. Тем не менее, количество пользователей на ранней стадии веб-сайта может быть относительно небольшим, и я могу удовлетворить свои потребности без очереди сообщений.Использование очереди сообщений увеличит сложность проекта, поэтому использование новых технологий необходимо быть решать бизнес-задачи, а не просто хвастаться навыками. Справочная документация: http://blog.csdn.net/fanzhigang0/article/details/43764121 http://blog.csdn.net/u010702229/article/details/18085263

Прикрепите личную общедоступную учетную запись WeChat, добро пожаловать на общение со мной.

Java开发日记