死磕java底层(二)—消息服务

这一节作为上一节多线程的延续,先说一下java原生的阻塞队列(Blocking Queue),之后再说一下JMS(Java Messaging Service,java消息服务)以及它的实现之一ActiveMQ消息队列,所以都归并到消息服务中讨论。

1.阻塞队列(Blocking Queue)

BlockingQueue也是java.util.concurrent下的接口,它解决了多线程中如何高效传输数据的问题,通过这些高效并且线程安全的类,我们可以搭建高质量的多线程程序。 主要用来控制线程同步的工具。
BlockingQueue是一个接口,里面的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit);
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}

  • 插入:
  1. add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则抛出异常,不好
  2. offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.
  3. put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续, 有阻塞, 放不进去就等待
  • 读取:
  1. poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null; 取不到返回null
  2. take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止; 阻塞, 取不到就一直等
  • 其他
  1. int remainingCapacity();返回队列剩余的容量,在队列插入和获取的时候使用,数据可能不准。
  2. boolean remove(Object o); 从队列移除元素,如果存在,即移除一个或者更多,队列改变了返回true
  3. public boolean contains(Object o); 查看队列是否存在这个元素,存在返回true
  4. int drainTo(Collection<? super E> c); 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。(即取出放到集合中)
  5. int drainTo(Collection<? super E> c, int maxElements); 和上面方法的区别在于,指定了移动的数量; (取出指定个数放到集合)
    主要的方法是:put、take一对阻塞存取;add、poll一对非阻塞存取。
    上面说了BlockingQueue是一个接口,它有四个具体的实现类,常用的有两个:
  6. ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。
  7. LinkedBlockingQueue:大小不定的BlockingQueue,其构造函数中可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。
    LinkedBlockingQueue和ArrayBlockingQueue区别:
    LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.
    下面是是用BlockingQueue实现的生产者和消费者的示例:
    生产者Product:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public class Product implements Runnable{

    BlockingQueue<String> queue;
    public Product(BlockingQueue<String> queue) {
    //创建对象时就传入一个阻塞队列
    this.queue = queue;
    }
    @Override
    public void run(){
    try {
    System.out.println(Thread.currentThread().getName()+"开始生产");
    String temp = Thread.currentThread().getName()+":生产线程";
    queue.put(temp);//向队列中放数据,如果队列是满的话,会阻塞当前线程
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    }
    }

消费者Consumer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Consumer implements Runnable{
BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue) {
//使用有参构造函数的目的是我在创建这个消费者对象的时候就可以传进来一个队列
this.queue = queue;
}
@Override
public void run() {
Random random = new Random();
try {
while(true){
Thread.sleep(random.nextInt(10));
System.out.println(Thread.currentThread().getName()+ "准备消费...");
String temp = queue.take();//从队列中取任务消费,如果队列为空,会阻塞当前线程
System.out.println(Thread.currentThread().getName() + " 获取到工作任务==== " +temp);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

测试类TestQueue:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TestQueue {
public static void main(String[] args) {
//新建一个阻塞队列,队列长度是5
BlockingQueue<String> queue = new LinkedBlockingDeque<String>(5);
//BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5);
Consumer consumer = new Consumer(queue);
Product product = new Product(queue);

for(int i = 0;i<3;i++){
new Thread(product,"product"+i).start();
}

//for (int i = 0;i<5;i++){
new Thread(consumer,"consumer").start();
//}
}
}

整套代码的意思就是初始化一个消息队列,里面放String类型,队列长度是5,使用生产者线程来模拟三个用户发出请求,把用户的请求数据暂时放在BlockingQueue队列里面,随后消费者线程不断的从队列里面取任务进行业务逻辑处理,直到队列里面消费的什么都不剩了。由此可以看出消息队列有两大特点:解耦和削峰填谷。生产者和消费者毛关系没有,生产者往队列里放数据,消费者从队列里取数据,它们都跟队列建立关系,解耦;生产者如果并发量很高也只不过是把数据先放到队列里,消费者可以慢慢吃,实际中不会立马拖垮服务端。
参考地址:http://blog.csdn.net/ghsau/article/details/8108292

2.Java消息服务

2.1JMS简介

JMS即Java消息服务(Java Message Service)用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JMS是一种与厂商(或者说是平台)无关的 API。类似于JDBC(Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。
许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ等等。 JMS 可以让你通过消息收发服务从一个 JMS 客户机向另一个 JMS客户机发送消息。
消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成;消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。

2.2JMS的组成

JMS由以下元素组成:
JMS提供者provider:面向消息中间件的,JMS规范的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
JMS客户:生产或消费基于消息的Java应用程序或对象(即生产者和消费者都统称JMS客户)。
JMS生产者:创建并发送消息的JMS客户。
JMS消费者:接收消息的JMS客户。
JMS消息:可以在JMS客户之间传递数据的对象
JMS队列:一个容纳被发送的正在等待阅读的消息的区域。一个消息如果被阅读,它将被从队列中移走。
JMS主题:一种支持发送消息给多个订阅者的机制。

2.3Java消息服务模型

  • 点对点模型
    在点对点队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。
    这种模式有如下特点:
    只有一个消费者将获得消息;
    生产者不需要消费者在消费该消息期间处于运行状态,消费者也同样不需要在生产者在发送消息时处于运行状态;
    每一个成功处理的消息都由消费者者签收。
  • 发布者/订阅者模型
    发布者/订阅者模型支持向一个特定的消息主题发布消息。在这种模型下,发布者和订阅者彼此不知道对方,类似于匿名公告板。
    这种模式有如下特点:
    多个消费者可以获得消息;
    在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便消费者能够订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。

    2.4消息队列(ActiveMQ)

    ActiveMQ是JMS规范的一种实现,下面说如何使用
  • 下载ActiveMQ
    去官方网站下载:http://activemq.apache.org/
  • 运行ActiveMQ
    解压apache-activemq-5.5.1-bin.zip(类似于Tomcat,解压即可用),我在网上搜的有的人修改了配置文件activeMQ.xml中连接的地址和协议,我在测试时没有修改也可以测试成功,如果你测试不成功可以修改如下:
    1
    2
    3
    4
    5
    6
    7
    <transportConnectors>
    <transportConnector name="openwire" uri="tcp://localhost:61616"/>
    <transportConnector name="ssl" uri="ssl://localhost:61617"/>
    <transportConnector name="stomp" uri="stomp://localhost:61613"/>
    <transportConnector uri="http://localhost:8081"/>
    <transportConnector uri="udp://localhost:61618"/>
    </transportConnectors>

测试代码如下:
生产者Product:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Product {

private String username = ActiveMQConnectionFactory.DEFAULT_USER;
private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;

private Connection connection = null;
private Session session = null;
private String subject = "myQueue";
private Destination destination = null;
private MessageProducer producer = null;
/**
* @Description 初始化方法
* @Author 刘俊重
* @Date 2017/12/20
*/
private void init() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username,password,url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}

public void productMessage(String message) throws JMSException {
this.init();
TextMessage textMessage = session.createTextMessage(message);
connection.start();
System.out.println("生产者准备发送消息:"+textMessage);
producer.send(textMessage);
System.out.println("生产者已发送完毕消息。。。");
}

public void close() throws JMSException {
System.out.println("生产者开始关闭连接");
if(null!=producer){
producer.close();
}
if(null!=session){
session.close();
}
if(null!=connection){
connection.close();
}
}
}

消费者Consumer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class Consumer implements MessageListener,ExceptionListener{
private String name = ActiveMQConnectionFactory.DEFAULT_USER;
private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
private ActiveMQConnectionFactory connectionFactory = null;
private Connection connection = null;
private Session session = null;
private String subject = "myQueue";
private Destination destination = null;
private MessageConsumer consumer = null;

public static Boolean isconnection=false;
/**
* @Description 连接ActiveMQ
* @Author 刘俊重
* @Date 2017/12/20
*/
private void init() throws JMSException {
connectionFactory = new ActiveMQConnectionFactory(name,password,url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}

public void consumerMessage() throws JMSException {
this.init();
connection.start();

//设置消息监听和异常监听
consumer.setMessageListener(this);
connection.setExceptionListener(this);
System.out.println("消费者开始监听....");
isconnection = true;
//Message receive = consumer.receive();
}

public void close() throws JMSException {
if(null!=consumer){
consumer.close();
}
if(null!=session){
session.close();
}
if(null!=connection){
connection.close();
}
}
/**
* 异常处理函数
*/
@Override
public void onException(JMSException exception) {
//发生异常关闭连接
isconnection = false;
}

/**
* 消息处理函数
*/
@Override
public void onMessage(Message message) {
try {
if(message instanceof TextMessage){
TextMessage textMsg = (TextMessage) message;
String text = textMsg.getText();
System.out.println("消费者接收到的消息======="+text);
}else {
System.out.println("接收的消息不符合");
}
} catch (JMSException e) {
e.printStackTrace();
}

}
}

注意:消费者需要实现MessageListener和ExceptionListener来监听收到消息和出错时的处理。
生产者测试类TestProduct:

1
2
3
4
5
6
7
8
9
public class TestProduct {
public static void main(String[] args) throws JMSException {
for(int i=0;i<100;i++){
Product product = new Product();
product.productMessage("Hello World!"+i);
product.close();
}
}
}

TestProduct是用来模拟生成100条消息,写入到ActiveMQ队列中。
消费者测试类TestConsumer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TestConsumer implements Runnable {
static Thread thread = null;
public static void main(String[] args) throws InterruptedException {
thread = new Thread(new TestConsumer());
thread.start();
while (true){
//时刻监听消息队列,如果线程死了,则新建一个线程
boolean alive = thread.isAlive();
System.out.println("当前线程状态:"+alive);
if(!alive){
thread = new Thread(new TestConsumer());
thread.start();
System.out.println("线程重启完成");
}
Thread.sleep(1000);
}
}
@Override
public void run() {
try {
Consumer consumer = new Consumer();
consumer.consumerMessage();
while (Consumer.isconnection) {
//System.out.println(123);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

TestConsumer这里用了多线程,保证时刻有个线程活着等着接收ActiveMQ的消息队列并调用消费者处理。
总结:我的理解是线程间通信使用queue,如BlockingQueue,进程间通信使用JMS,如ActiveMQ。
另附上一篇将58架构师沈剑老师写的消息队列的文章,可以作为参考:http://dwz.cn/78yLxL
需要强调的是任何一项技术的引用都要为解决业务问题服务,而不能是单纯的炫技。举个例子,就拿消息服务来说,比如用户注册某个网站,注册完了之后我要调用邮件和短信服务给他发通知,我可能还要通过他填的信息,给他推荐一下可能认识的用户,那么这里核心业务是注册,其它的发通知和推荐用户就可以放在消息队列里处理,先响应注册信息,随后调用其它服务来处理发通知和推荐用户这两个业务。但是网站前期可能用户量比较少,不用消息队列就能满足我的需求了,引用消息队列反而会增加项目的复杂性,所以新技术的使用一定是需要解决业务的问题,而不是单纯的炫技。
参考文档:
http://blog.csdn.net/fanzhigang0/article/details/43764121
http://blog.csdn.net/u010702229/article/details/18085263

刘俊重 wechat
欢迎关注我的微信公众号
坚持原创技术分享