在消息传递的进程中,某些情况下比如网络闪断、丢包等会致使消息永久性丢失,这时候消费者是接收不到消息的,这样就会造成数据不1致的问题。那末我们怎样才能保证消息1定能发送给消费者呢?怎样才能避免数据不1致呢?又比如我们发送多条消息,有时候我们期望都发送成功但实际上其中1部份发送成功,另外一部份发送失败了,没到达我们的预期效果,那末我们怎样解决这个问题呢?
前1种问题我们通过消息确认机制来解决,它分为几种模式,需要在创建session时指定是不是开启事务和确认模式,像下面这样:
<span style="font-size:12px;">ActiveMQSession session = connection.createSession(true,Session.CLIENT_ACKNOWLEDGE);</span>
然后我们来看在PubSubscribe模式下消息的全部从发送到消费确认的流程来了解消息的确认机制和事务。首先看看producer怎样发送消息的:
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
//检查session状态,如果session已关闭则抛出状态异常
checkClosed();
//检查destination类型,如果不符合要求就转变成ActiveMQDestination
if (destination == null) {
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
}
throw new InvalidDestinationException("Don't understand null destinations");
}
ActiveMQDestination dest;
if (destination.equals(info.getDestination())) {
dest = (ActiveMQDestination)destination;
} else if (info.getDestination() == null) {
dest = ActiveMQDestination.transform(destination);
} else {
throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
}
if (dest == null) {
throw new JMSException("No destination specified");
}
if (transformer != null) {
//把各种不同的message转换成ActiveMQMessage
Message transformedMessage = transformer.producerTransform(session, this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}
if (producerWindow != null) {
try {
producerWindow.waitForSpace();
} catch (InterruptedException e) {
throw new JMSException("Send aborted due to thread interrupt.");
}
}
//发送消息到broker中的topic
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
//消息计数
stats.onMessage();
}
我们以ActiveMQSession的send为例再来看看session是怎样发送消息的:
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
//检查session状态如果closed抛出状态异常
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {
throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
}
//竞争锁(互斥信号量),如果1个session的多个producer发送消息这里会保证有序性
synchronized (sendMutex) {
// tell the Broker we are about to start a new transaction
//告知broker开始1个新事务,只有session的确认模式是SESSION_TRANSACTED时事务上下网才会开启事务
doStartTransaction();
//从事务上下文中获得事务id
TransactionId txid = transactionContext.getTransactionId();
long sequenceNumber = producer.getMessageSequence();
//Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
//在jms协议头中设置传输模式即消息是不是需要持久化
message.setJMSDeliveryMode(deliveryMode);
long expiration = 0L;
//检查producer中的message是不是过期
if (!producer.getDisableMessageTimestamp()) {
//message获得时间戳
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
//设置过期时间
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
//设置消息过期时间
message.setJMSExpiration(expiration);
//设置消息优先级
message.setJMSPriority(priority);
//设置消息是非重发的
message.setJMSRedelivered(false);
// transform to our own message format here
//将消息转化成ActiveMQMessage,message针对不同的数据格式有很多种,比如map message,blob message
ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
//设置目的地,这里是1个topic
msg.setDestination(destination);
//设置消息id
msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
// Set the message id.
//如果消息是经过转换的,那末原消息更新新的id
if (msg != message) {
message.setJMSMessageID(msg.getMessageId().toString());
// Make sure the JMS destination is set on the foreign messages too.
//设置目的地
message.setJMSDestination(destination);
}
//clear the brokerPath in case we are re-sending this message
//清除brokerpath
msg.setBrokerPath(null);
//设置事务id
msg.setTransactionId(txid);
//
if (connection.isCopyMessageOnSend()) {
msg = (ActiveMQMessage)msg.copy();
}
//设置连接器
msg.setConnection(connection);
//把消息的属性和消息体都设置为只读,避免被修改
msg.onSend();
//生产者id
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isTraceEnabled()) {
LOG.trace(getSessionId() + " sending message: " + msg);
}
//如果onComplete没设置,且发送超时时间小于0,且消息不需要反馈,且连接器不是同步发送模式,且(消息非持久化或连接器是异步发送模式或存在事务id的情况下)异步发送,否则同步发送
if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
//异步发送走transport的oneway通道
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
// Since we defer lots of the marshaling till we hit the
// wire, this might not
// provide and accurate size. We may change over to doing
// more aggressive marshaling,
// to get more accurate sizes.. this is more important once
// users start using producer window
// flow control.
int size = msg.getSize();
producerWindow.increaseUsage(size);
}
} else {
if (sendTimeout > 0 && onComplete==null) {
//同步发送走transport的request和asyncrequest通道
this.connection.syncSendPacket(msg,sendTimeout);
}else {
this.connection.syncSendPacket(msg, onComplete);
}
}
}
}
这样消息就被发送到broker的topic中了,接下来broker中会根据topic下的subscriber的id找出定阅者,并向这些消费者发送消息,消费者接收到消息后会消费消息,我们接下来看看消费者怎样消费消息的。
下面是ActiveMQConsumer和ActiveMQSession中的方法,session没创建1个consumer便可能会重启session线程,session线程的run中会调用message的listener中的onMessage方法
public void setMessageListener(MessageListener listener) throws JMSException {
checkClosed();
if (info.getPrefetchSize() == 0) {
throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
}
if (listener != null) {
boolean wasRunning = session.isRunning();
//停止session线程
if (wasRunning) {
session.stop();
}
this.messageListener.set(listener);
//session重新分发未消费的message
session.redispatch(this, unconsumedMessages);
//开启session线程
if (wasRunning) {
session.start();
}
} else {
this.messageListener.set(null);
}
}
public void run() {
MessageDispatch messageDispatch;
while ((messageDispatch = executor.dequeueNoWait()) != null) {
final MessageDispatch md = messageDispatch;
final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
MessageAck earlyAck = null;
//如果消息过期创建新的确认消息
if (message.isExpired()) {
earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
earlyAck.setFirstMessageId(message.getMessageId());
} else if (connection.isDuplicate(ActiveMQSession.this, message)) {
LOG.debug("{} got duplicate: {}", this, message.getMessageId());
earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
earlyAck.setFirstMessageId(md.getMessage().getMessageId());
earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
}
//如果消息已过期,或消息有冲突则发送确认消息重新开始while循环
if (earlyAck != null) {
try {
asyncSendPacket(earlyAck);
} catch (Throwable t) {
LOG.error("error dispatching ack: {} ", earlyAck, t);
connection.onClientInternalException(t);
} finally {
continue;
}
}
//如果是确认模式是CLIENT_ACKNOWLEDGE或INDIVIDUAL_ACKONWLEDGE则设置空回调函数,这样consumer确认消息后会履行回调函数
if (isClientAcknowledge()||isIndividualAcknowledge()) {
message.setAcknowledgeCallback(new Callback() {
@Override
public void execute() throws Exception {
}
});
}
//在发送前调用途理函数
if (deliveryListener != null) {
deliveryListener.beforeDelivery(this, message);
}
//设置delivery id
md.setDeliverySequenceId(getNextDeliveryId());
lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();
final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
/*
* The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched.
* We dont want the after deliver being called after the redeliver as it may cause some weird stuff.
* */
synchronized (redeliveryGuard) {
try {
ack.setFirstMessageId(md.getMessage().getMessageId());
//如果是事务模式则开启事务
doStartTransaction();
ack.setTransactionId(getTransactionContext().getTransactionId());
if (ack.getTransactionId() != null) {
//事务状态下添加1个匿名同步器,用于处理同步事务比如回滚
getTransactionContext().addSynchronization(new Synchronization() {
final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
@Override
public void beforeEnd() throws Exception {
// validate our consumer so we don't push stale acks that get ignored
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
}
LOG.trace("beforeEnd ack {}", ack);
sendAck(ack);
}
@Override
public void afterRollback() throws Exception {
LOG.trace("rollback {}", ack, new Throwable("here"));
// ensure we don't filter this as a duplicate
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
// don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
if (clearRequestsCounter.get() > clearRequestCount) {
LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
// validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
// We need to NACK the messages so that they get
// sent to the
// DLQ.
// Acknowledge the last message.
MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
asyncSendPacket(ack);
} else {
MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
asyncSendPacket(ack);
// Figure out how long we should wait to resend
// this message.
long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
for (int i = 0; i < redeliveryCounter; i++) {
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
}
/*
* If we are a non blocking delivery then we need to stop the executor to avoid more
* messages being delivered, once the message is redelivered we can restart it.
* */
if (!connection.isNonBlockingRedelivery()) {
LOG.debug("Blocking session until re-delivery...");
executor.stop();
}
connection.getScheduler().executeAfterDelay(new Runnable() {
@Override
public void run() {
/*
* wait for the first delivery to be complete, i.e. after delivery has been called.
* */
synchronized (redeliveryGuard) {
/*
* If its non blocking then we can just dispatch in a new session.
* */
if (connection.isNonBlockingRedelivery()) {
((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
} else {
/*
* If there has been an error thrown during afterDelivery then the
* endpoint will be marked as dead so redelivery will fail (and eventually
* the session marked as stale), in this case we can only call dispatch
* which will create a new session with a new endpoint.
* */
if (afterDeliveryError.get()) {
((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
} else {
executor.executeFirst(md);
executor.start();
}
}
}
}
}, redeliveryDelay);
}
md.getMessage().onMessageRolledBack();
}
});
}
LOG.trace("{} onMessage({})", this, message.getMessageId());
//触发消息事件监听函数
messageListener.onMessage(message);
} catch (Throwable e) {
LOG.error("error dispatching message: ", e);
// A problem while invoking the MessageListener does not
// in general indicate a problem with the connection to the broker, i.e.
// it will usually be sufficient to let the afterDelivery() method either
// commit or roll back in order to deal with the exception.
// However, we notify any registered client internal exception listener
// of the problem.
connection.onClientInternalException(e);
} finally {
//发送确认消息
if (ack.getTransactionId() == null) {
try {
asyncSendPacket(ack);
} catch (Throwable e) {
connection.onClientInternalException(e);
}
}
}
//触发投递事件监听函数
if (deliveryListener != null) {
try {
deliveryListener.afterDelivery(this, message);
} catch (Throwable t) {
LOG.debug("Unable to call after delivery", t);
afterDeliveryError.set(true);
throw new RuntimeException(t);
}
}
}
/*
* this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
* It also needs to be outside the redelivery guard.
* */
try {
executor.waitForQueueRestart();
} catch (InterruptedException ex) {
connection.onClientInternalException(ex);
}
}
}
总结
消息确认机制
消费者和broker通讯终究实现消息确认,消息确认机制1共有5种,4种jms的和1种activemq补充的,AUTO_ACKNOWLEDGE(自动确认)、CLIENT_ACKNOWLEDGE(客户确认)、DUPS_OK_ACKNOWLEDGE(批量确认)、SESSION_TRANSACTED(事务确认)、INDIVIDUAL_ACKNOWLEDGE(单条确认),consumer在不同的模式下会发不同的命令到broker,broker再根据不同的命令进行操作,如果consumer正常发送ack命令给broker,broker会从topic移除消息并烧毁,如果未从消费者接遭到确认命令,broker会将消息转移到dlq队列(dead
letter queue),并根据delivery mode进行重试或报异常。
消息事务
消息事务是在生产者producer到broker或broker到consumer进程中同1个session中产生的,保证几条消息在发送进程中的原子性。可以在connection的createSession方法中指定1个布尔值开启,如果消息确认机制是事务确认,那末在发送message的进程中session就会开启事务(实际上broker的),不用用户显示调用 beginTransaction,这时候所有通过session发送的消息都被缓存下来,用户调用session.commit时会发送所有消息,当发送出现异常时用户可以调用rollback进行回滚操作,只有在开启事务状态下有效。
最后附上1张他人画的activemq消息处理的流转图。