Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,26 @@
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSException;
import jakarta.jms.JMSRuntimeException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.Message;

import org.apache.activemq.util.JMSExceptionSupport;

public class ActiveMQConsumer implements JMSConsumer {

private final ActiveMQContext activemqContext;
private final MessageConsumer activemqMessageConsumer;
private final ActiveMQMessageConsumer consumer;

ActiveMQConsumer(ActiveMQContext activemqContext, MessageConsumer activemqMessageConsumer) {
ActiveMQConsumer(ActiveMQContext activemqContext, MessageConsumer consumer) {
this.activemqContext = activemqContext;
this.activemqMessageConsumer = activemqMessageConsumer;
this.consumer = (ActiveMQMessageConsumer) consumer;
}

@Override
public String getMessageSelector() {
try {
return activemqMessageConsumer.getMessageSelector();
return consumer.getMessageSelector();
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
Expand All @@ -47,7 +47,7 @@ public String getMessageSelector() {
@Override
public MessageListener getMessageListener() throws JMSRuntimeException {
try {
return activemqMessageConsumer.getMessageListener();
return consumer.getMessageListener();
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
Expand All @@ -56,7 +56,7 @@ public MessageListener getMessageListener() throws JMSRuntimeException {
@Override
public void setMessageListener(MessageListener listener) throws JMSRuntimeException {
try {
activemqMessageConsumer.setMessageListener(listener);
consumer.setMessageListener(listener);
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
Expand All @@ -65,7 +65,7 @@ public void setMessageListener(MessageListener listener) throws JMSRuntimeExcept
@Override
public Message receive() {
try {
return activemqMessageConsumer.receive();
return consumer.receive();
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
Expand All @@ -74,7 +74,7 @@ public Message receive() {
@Override
public Message receive(long timeout) {
try {
return activemqMessageConsumer.receive(timeout);
return consumer.receive(timeout);
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
Expand All @@ -83,7 +83,7 @@ public Message receive(long timeout) {
@Override
public Message receiveNoWait() {
try {
return activemqMessageConsumer.receiveNoWait();
return consumer.receiveNoWait();
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
Expand All @@ -92,25 +92,37 @@ public Message receiveNoWait() {
@Override
public void close() {
try {
activemqMessageConsumer.close();
consumer.close();
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
}
}

@Override
public <T> T receiveBody(Class<T> c) {
throw new UnsupportedOperationException("receiveBody(Class<T>) is not supported");
try {
return consumer.receiveBody(c);
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
}

@Override
public <T> T receiveBody(Class<T> c, long timeout) {
throw new UnsupportedOperationException("receiveBody(Class<T>, long) is not supported");
try {
return consumer.receiveBody(c, timeout);
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
}

@Override
public <T> T receiveBodyNoWait(Class<T> c) {
throw new UnsupportedOperationException("receiveBodyNoWait(Class<T>) is not supported");
try {
return consumer.receiveBodyNoWait(c);
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageFormatException;
import jakarta.jms.MessageListener;
import jakarta.jms.TransactionRolledBackException;

Expand All @@ -43,6 +44,7 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConsumerId;
Expand Down Expand Up @@ -713,6 +715,221 @@ public Message receiveNoWait() throws JMSException {
return createActiveMQMessage(md);
}

/**
* Receives the next message produced for this message consumer and returns
* its body as an object of the specified type. This call blocks
* indefinitely until a message is produced or until this message consumer
* is closed.
* <p>
* If the message is not of a type for which the body can be assigned to
* the specified type, a {@code MessageFormatException} is thrown. The
* subsequent behaviour depends on the session's acknowledge mode:
* <ul>
* <li>{@code AUTO_ACKNOWLEDGE} / {@code DUPS_OK_ACKNOWLEDGE}: the
* message is not acknowledged and will be delivered again before any
* subsequent messages. This is not considered redelivery and does not
* cause the {@code JMSRedelivered} header or
* {@code JMSXDeliveryCount} property to be updated.</li>
* <li>{@code CLIENT_ACKNOWLEDGE}: the message is treated as delivered.
* The application must call {@code session.recover()} to have it
* redelivered.</li>
* <li>Transacted session: the message is treated as delivered within the
* transaction. The application must call {@code session.rollback()}
* to have it redelivered.</li>
* </ul>
* <p>
* This method cannot be used to receive {@code Message} or
* {@code StreamMessage} objects; a {@code MessageFormatException} will
* always be thrown for these types.
*
* @param c the type to which the body of the next message should be
* assigned
* @return the body of the next message, or null if this message consumer
* is concurrently closed
* @throws MessageFormatException if the message body cannot be assigned to
* the specified type, or if the message is a {@code Message} or
* {@code StreamMessage}
* @throws JMSException if the JMS provider fails to receive the next
* message due to some internal error
*/
public <T> T receiveBody(Class<T> c) throws JMSException {
checkClosed();
checkMessageListener();

sendPullCommand(0);
MessageDispatch md = dequeue(-1);
if (md == null) {
return null;
}

return doReceiveBody(md, c);
}

/**
* Receives the next message produced for this message consumer and returns
* its body as an object of the specified type, blocking up to the
* specified timeout. A {@code timeout} of zero never expires and the call
* blocks indefinitely.
* <p>
* If the message is not of a type for which the body can be assigned to
* the specified type, a {@code MessageFormatException} is thrown. The
* subsequent behaviour depends on the session's acknowledge mode:
* <ul>
* <li>{@code AUTO_ACKNOWLEDGE} / {@code DUPS_OK_ACKNOWLEDGE}: the
* message is not acknowledged and will be delivered again before any
* subsequent messages. This is not considered redelivery and does not
* cause the {@code JMSRedelivered} header or
* {@code JMSXDeliveryCount} property to be updated.</li>
* <li>{@code CLIENT_ACKNOWLEDGE}: the message is treated as delivered.
* The application must call {@code session.recover()} to have it
* redelivered.</li>
* <li>Transacted session: the message is treated as delivered within the
* transaction. The application must call {@code session.rollback()}
* to have it redelivered.</li>
* </ul>
* <p>
* This method cannot be used to receive {@code Message} or
* {@code StreamMessage} objects; a {@code MessageFormatException} will
* always be thrown for these types.
*
* @param c the type to which the body of the next message should be
* assigned
* @param timeout the timeout value (in milliseconds), a timeout of zero
* never expires
* @return the body of the next message, or null if the timeout expires or
* this message consumer is concurrently closed
* @throws MessageFormatException if the message body cannot be assigned to
* the specified type, or if the message is a {@code Message} or
* {@code StreamMessage}
* @throws JMSException if the JMS provider fails to receive the next
* message due to some internal error
*/
public <T> T receiveBody(Class<T> c, long timeout) throws JMSException {
checkClosed();
checkMessageListener();
if (timeout == 0) {
return this.receiveBody(c);
}

sendPullCommand(timeout);
while (timeout > 0) {
MessageDispatch md;
if (info.getPrefetchSize() == 0) {
md = dequeue(-1);
} else {
md = dequeue(timeout);
}

if (md == null) {
return null;
}

return doReceiveBody(md, c);
}
return null;
}

/**
* Receives the next message produced for this message consumer and returns
* its body as an object of the specified type if one is immediately
* available.
* <p>
* If the message is not of a type for which the body can be assigned to
* the specified type, a {@code MessageFormatException} is thrown. The
* subsequent behaviour depends on the session's acknowledge mode:
* <ul>
* <li>{@code AUTO_ACKNOWLEDGE} / {@code DUPS_OK_ACKNOWLEDGE}: the
* message is not acknowledged and will be delivered again before any
* subsequent messages. This is not considered redelivery and does not
* cause the {@code JMSRedelivered} header or
* {@code JMSXDeliveryCount} property to be updated.</li>
* <li>{@code CLIENT_ACKNOWLEDGE}: the message is treated as delivered.
* The application must call {@code session.recover()} to have it
* redelivered.</li>
* <li>Transacted session: the message is treated as delivered within the
* transaction. The application must call {@code session.rollback()}
* to have it redelivered.</li>
* </ul>
* <p>
* This method cannot be used to receive {@code Message} or
* {@code StreamMessage} objects; a {@code MessageFormatException} will
* always be thrown for these types.
*
* @param c the type to which the body of the next message should be
* assigned
* @return the body of the next message, or null if one is not immediately
* available
* @throws MessageFormatException if the message body cannot be assigned to
* the specified type, or if the message is a {@code Message} or
* {@code StreamMessage}
* @throws JMSException if the JMS provider fails to receive the next
* message due to some internal error
*/
public <T> T receiveBodyNoWait(Class<T> c) throws JMSException {
checkClosed();
checkMessageListener();
sendPullCommand(-1);

MessageDispatch md;
if (info.getPrefetchSize() == 0) {
md = dequeue(-1);
} else {
md = dequeue(0);
}

if (md == null) {
return null;
}

return doReceiveBody(md, c);
}

/**
* Checks that the message body can be assigned to the requested type,
* acknowledges the message, and returns its body. If the body cannot be
* assigned, the handling depends on the session's acknowledge mode:
* <ul>
* <li>AUTO_ACKNOWLEDGE / DUPS_OK_ACKNOWLEDGE: the message is re-enqueued
* without acknowledgement so that it remains available for a subsequent
* {@code receive} or {@code receiveBody} call.</li>
* <li>CLIENT_ACKNOWLEDGE / TRANSACTED: the message is treated as delivered
* (not re-enqueued). The application may call {@code session.recover()}
* or {@code session.rollback()} respectively to redeliver.</li>
* </ul>
* <p>
* Per Jakarta Messaging 3.1, {@code receiveBody} must always throw
* {@code MessageFormatException} for plain {@code Message} and
* {@code StreamMessage} types, regardless of what
* {@code isBodyAssignableTo} returns.
*/
private <T> T doReceiveBody(MessageDispatch md, Class<T> c) throws JMSException {
ActiveMQMessage message = createActiveMQMessage(md);

// Jakarta Messaging 3.1: receiveBody must always fail for Message and StreamMessage.
// Note: Message.isBodyAssignableTo() returns true for any type per spec,
// which conflicts with receiveBody's requirement, so we must check explicitly.
boolean bodyNotAssignable = message.getClass() == ActiveMQMessage.class
|| message instanceof ActiveMQStreamMessage
|| !message.isBodyAssignableTo(c);

if (bodyNotAssignable) {
if (session.isAutoAcknowledge() || session.isDupsOkAcknowledge()) {
// re-enqueue for redelivery on next receive/receiveBody call
unconsumedMessages.enqueueFirst(md);
} else {
// CLIENT_ACKNOWLEDGE or TRANSACTED: message is considered delivered,
// application must use session.recover() or session.rollback()
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, false);
}
throw new MessageFormatException("Message body cannot be read as type: " + c);
}

beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, false);
return message.getBody(c);
}

/**
* Closes the message consumer.
* <P>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,15 @@ public void initTransients() {
}

@Override
public boolean isBodyAssignableTo(Class c) throws JMSException {
final Serializable object = getObject();
public boolean isBodyAssignableTo(Class c) {
final Serializable object;
try {
object = getObject();
} catch (JMSException e) {
// Per Jakarta Messaging 3.1: if the message is an ObjectMessage
// and object deserialization fails then false is returned.
return false;
}
if (object == null) {
return true;
}
Expand Down
Loading