diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java index 067e685683a..03a32927e3c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java @@ -64,6 +64,14 @@ public synchronized void purge() throws Exception { LOG.info("{} purge of {} messages", destination.getActiveMQDestination().getQualifiedName(), originalMessageCount); } + public synchronized void purge(long numberOfMessages) throws Exception { + final long originalMessageCount = destination.getDestinationStatistics().getMessages().getCount(); + + ((Queue)destination).purge(numberOfMessages); + + LOG.info("{} purge {} of {} messages", destination.getActiveMQDestination().getQualifiedName(), numberOfMessages, originalMessageCount); + } + public synchronized boolean removeMessage(String messageId) throws Exception { return ((Queue)destination).removeMessage(messageId); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java index 27ef61c9f0f..67175d46374 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java @@ -70,6 +70,14 @@ public interface QueueViewMBean extends DestinationViewMBean { @MBeanInfo("Removes all of the messages in the queue.") void purge() throws Exception; + /** + * Removes the first number of messages in the queue. + * + * @throws Exception + */ + @MBeanInfo("Removes the first number of messages in the queue.") + void purge(long numberOfMessages) throws Exception; + /** * Copies a given message to another destination. * diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 048512cafbc..cd534c8a354 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1320,9 +1320,20 @@ public QueueMessageReference getMessage(String id) { } public void purge() throws Exception { + purge(this.destinationStatistics.getMessages().getCount()); + } + + public void purge(long numberOfMessages) throws Exception { + + if (numberOfMessages <= 0) { + return; + } + ConnectionContext c = createConnectionContext(); List list = null; sendLock.lock(); + + long purgeCount = 0L; try { long originalMessageCount = this.destinationStatistics.getMessages().getCount(); do { @@ -1330,24 +1341,32 @@ public void purge() throws Exception { pagedInMessagesLock.readLock().lock(); try { list = new ArrayList(pagedInMessages.values()); - }finally { + } finally { pagedInMessagesLock.readLock().unlock(); } - for (MessageReference ref : list) { + int deleteCount = list.size(); + if ((numberOfMessages - purgeCount) < list.size()) { + deleteCount = (int)(numberOfMessages - purgeCount); + } + + for (int n=0; n < deleteCount; n++) { try { - QueueMessageReference r = (QueueMessageReference) ref; + QueueMessageReference r = (QueueMessageReference) list.get(n); removeMessage(c, r); messages.rollback(r.getMessageId()); + purgeCount++; } catch (IOException e) { } } // don't spin/hang if stats are out and there is nothing left in the // store - } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0); + } while (!list.isEmpty() && + this.destinationStatistics.getMessages().getCount() > 0 && + purgeCount < numberOfMessages); - if (this.destinationStatistics.getMessages().getCount() > 0) { - LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount()); + if (numberOfMessages == originalMessageCount && this.destinationStatistics.getMessages().getCount() > 0) { + LOG.warn("{} after purge {} of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), numberOfMessages, originalMessageCount, this.destinationStatistics.getMessages().getCount()); } } finally { sendLock.unlock(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java index 780a7f5aa0e..7aef8b2be4d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java @@ -112,6 +112,39 @@ public void testPurge() throws Exception { producer.close(); } + public void testPurgeCount() throws Exception { + // Send some messages + int messagesSent = 1_000; + int messagesPurge = 200; + + connection = connectionFactory.createConnection(); + connection.setClientID(clientID); + connection.start(); + Session session = connection.createSession(transacted, authMode); + destination = createDestination(); + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < messagesSent; i++) { + Message message = session.createTextMessage("Message: " + i); + producer.send(message); + } + + // Now get the QueueViewMBean and purge + String objectNameStr = broker.getBrokerObjectName().toString(); + objectNameStr += ",destinationType=Queue,destinationName="+getDestinationString(); + ObjectName queueViewMBeanName = assertRegisteredObjectName(objectNameStr); + QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + long count = proxy.getQueueSize(); + assertEquals("Queue size", count, messagesSent); + + for (int i = 1; i <= 5; i++) { + proxy.purge(messagesPurge); + count = proxy.getQueueSize(); + assertEquals("Queue size", count, messagesSent - (messagesPurge * i)); + } + producer.close(); + } + public void initCombosForTestDelete() { addCombinationValues("persistenceAdapter", new Object[] {new MemoryPersistenceAdapter(), new KahaDBPersistenceAdapter()}); }