Skip to content

Commit be90ac4

Browse files
ARTEMIS-5573 and ARTEMIS-5975 Improve AMQP Size estimation and make it static
1 parent 730940c commit be90ac4

10 files changed

Lines changed: 410 additions & 149 deletions

File tree

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ public Message getMessage() {
8080

8181
private boolean reencoded = false;
8282

83+
private int applicationPropertiesSize;
84+
8385
/**
8486
* AMQPLargeMessagePersister will save the buffer here.
8587
*/
@@ -264,7 +266,9 @@ protected void readSavedEncoding(ByteBuf buf) {
264266
applicationPropertiesPosition = buf.readInt();
265267
remainingBodyPosition = buf.readInt();
266268

269+
int applicationPropertiesInitialPosition = buf.readerIndex();
267270
applicationProperties = (ApplicationProperties)TLSEncode.getDecoder().readObject();
271+
this.applicationPropertiesSize = buf.readerIndex() - applicationPropertiesInitialPosition;
268272

269273
if (properties != null && properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) {
270274
if (!expirationReload) {
@@ -412,6 +416,16 @@ private void genericParseLargeMessage() {
412416
}
413417
}
414418

419+
@Override
420+
protected ApplicationProperties readApplicationProperties(ReadableBuffer data, int position) {
421+
applicationProperties = super.readApplicationProperties(data, position);
422+
if (applicationProperties != null) {
423+
this.applicationPropertiesSize = data.position() - position;
424+
}
425+
return applicationProperties;
426+
}
427+
428+
415429
protected void parseLargeMessage(ReadableBuffer data) {
416430
MessageDataScanningStatus status = getDataScanningStatus();
417431
if (status == MessageDataScanningStatus.NOT_SCANNED) {
@@ -604,8 +618,7 @@ public long getWholeMessageSize() {
604618
@Override
605619
public synchronized int getMemoryEstimate() {
606620
if (memoryEstimate == -1) {
607-
memoryEstimate = memoryOffset * 2 + (extraProperties != null ? extraProperties.getEncodeSize() : 0);
608-
originalEstimate = memoryEstimate;
621+
memoryEstimate = AMQP_OFFSET + (extraProperties != null ? extraProperties.getEncodeSize() : 0) + applicationPropertiesSize * 4;
609622
}
610623
return memoryEstimate;
611624
}

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java

Lines changed: 23 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.apache.activemq.artemis.api.core.SimpleString;
4545
import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
4646
import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
47-
import org.apache.activemq.artemis.core.paging.PagingStore;
4847
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
4948
import org.apache.activemq.artemis.core.persistence.Persister;
5049
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -119,6 +118,10 @@
119118
*/
120119
public abstract class AMQPMessage extends RefCountMessage implements org.apache.activemq.artemis.api.core.Message {
121120

121+
// how much an AMQP Message takes more in the memory, beyond the Message.offset.
122+
// this is an estimate, and it's based on testing
123+
public static final int AMQP_OFFSET = 1300;
124+
122125
private static final SimpleString ANNOTATION_AREA_PREFIX = SimpleString.of("m.");
123126

124127
protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -146,7 +149,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
146149
* developing purposes.
147150
*/
148151
public enum MessageDataScanningStatus {
149-
NOT_SCANNED(0), RELOAD_PERSISTENCE(1), SCANNED(2);
152+
NOT_SCANNED(0), SCANNED(1);
150153

151154
private static final MessageDataScanningStatus[] STATES;
152155

@@ -205,7 +208,6 @@ private static void checkCode(int code) {
205208
protected long messageID;
206209
protected SimpleString address;
207210
protected volatile int memoryEstimate = -1;
208-
protected volatile int originalEstimate = -1;
209211
protected long expiration;
210212
protected boolean expirationReload = false;
211213
protected long scheduledTime = -1;
@@ -546,36 +548,27 @@ protected ApplicationProperties lazyDecodeApplicationProperties() {
546548
// need to synchronize access to lazyDecodeApplicationProperties to avoid clashes with getMemoryEstimate
547549
protected synchronized ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) {
548550
if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
549-
applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class);
550-
if (owner != null && memoryEstimate != -1) {
551-
// the memory has already been tracked and needs to be updated to reflect the new decoding
552-
int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data);
553-
554-
// it is difficult to track the updates for paged messages
555-
// for that reason we won't do it if paged
556-
// we also only do the update if the message was previously routed
557-
// so if a debug method or an interceptor changed the size before routing we would get a different size
558-
if (!isPaged && routed) {
559-
((PagingStore) owner).addSize(addition, false);
560-
final int updatedEstimate = memoryEstimate + addition;
561-
memoryEstimate = updatedEstimate;
562-
}
563-
}
551+
readApplicationProperties(data, applicationPropertiesPosition);
564552
}
565553

566554
return applicationProperties;
567555
}
568556

557+
protected ApplicationProperties readApplicationProperties(ReadableBuffer data, int position) {
558+
applicationProperties = scanForMessageSection(data, position, ApplicationProperties.class);
559+
return applicationProperties;
560+
}
561+
569562
protected int unmarshalledApplicationPropertiesMemoryEstimateFromData(ReadableBuffer data) {
570-
if (applicationProperties != null) {
571-
// they have been unmarshalled, estimate memory usage based on their encoded size
572-
if (remainingBodyPosition != VALUE_NOT_PRESENT) {
573-
return remainingBodyPosition - applicationPropertiesPosition;
574-
} else {
575-
return data.capacity() - applicationPropertiesPosition;
576-
}
563+
// no need to rescan if it's from RELOAD_PERSISTENCE
564+
ensureScanning();
565+
566+
// they have been unmarshalled, estimate memory usage based on their encoded size
567+
if (remainingBodyPosition != VALUE_NOT_PRESENT) {
568+
return remainingBodyPosition - applicationPropertiesPosition;
569+
} else {
570+
return data.capacity() - applicationPropertiesPosition;
577571
}
578-
return 0;
579572
}
580573

581574
@SuppressWarnings("unchecked")
@@ -661,9 +654,6 @@ protected synchronized void ensureMessageDataScanned() {
661654
case NOT_SCANNED:
662655
scanMessageData();
663656
break;
664-
case RELOAD_PERSISTENCE:
665-
lazyScanAfterReloadPersistence();
666-
break;
667657
case SCANNED:
668658
// NO-OP
669659
break;
@@ -686,7 +676,6 @@ protected synchronized void resetMessageData() {
686676
priority = DEFAULT_MESSAGE_PRIORITY;
687677
encodedHeaderSize = 0;
688678
memoryEstimate = -1;
689-
originalEstimate = -1;
690679
scheduledTime = -1;
691680
encodedDeliveryAnnotationsSize = 0;
692681
headerPosition = VALUE_NOT_PRESENT;
@@ -885,12 +874,8 @@ public final void receiveBuffer(ByteBuf buffer) {
885874

886875
@Override
887876
public int getOriginalEstimate() {
888-
if (originalEstimate < 0) {
889-
// getMemoryEstimate should initialize originalEstimate
890-
return getMemoryEstimate();
891-
} else {
892-
return originalEstimate;
893-
}
877+
// getMemoryEstimate should initialize originalEstimate
878+
return getMemoryEstimate();
894879
}
895880

896881
@Override
@@ -1033,13 +1018,9 @@ protected int internalPersistSize() {
10331018
public abstract void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);
10341019

10351020
protected synchronized void lazyScanAfterReloadPersistence() {
1036-
assert messageDataScanned == MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
10371021
scanMessageData();
10381022
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
10391023
modified = false;
1040-
// reinitialise memory estimate as message will already be on a queue
1041-
// and lazy decode will want to update
1042-
getMemoryEstimate();
10431024
}
10441025

10451026
@Override
@@ -1223,9 +1204,8 @@ public boolean isDurable() {
12231204
if (header != null && header .getDurable() != null) {
12241205
return header.getDurable();
12251206
} else {
1226-
// if header == null and scanningStatus=RELOAD_PERSISTENCE, it means the message can only be durable
1227-
// even though the parsing hasn't happened yet
1228-
return getDataScanningStatus() == MessageDataScanningStatus.RELOAD_PERSISTENCE;
1207+
// we will assume it's non persistent if no header
1208+
return false;
12291209
}
12301210
}
12311211

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java

Lines changed: 3 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,8 @@
3939
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
4040
import org.apache.qpid.proton.amqp.messaging.Properties;
4141
import org.apache.qpid.proton.amqp.messaging.Section;
42-
import org.apache.qpid.proton.codec.DecodeException;
43-
import org.apache.qpid.proton.codec.DecoderImpl;
4442
import org.apache.qpid.proton.codec.EncoderImpl;
45-
import org.apache.qpid.proton.codec.EncodingCodes;
4643
import org.apache.qpid.proton.codec.ReadableBuffer;
47-
import org.apache.qpid.proton.codec.TypeConstructor;
4844
import org.apache.qpid.proton.codec.WritableBuffer;
4945

5046
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
@@ -192,14 +188,7 @@ protected ReadableBuffer getData() {
192188
@Override
193189
public synchronized int getMemoryEstimate() {
194190
if (memoryEstimate == -1) {
195-
if (isPaged) {
196-
// When the message is paged, we don't take the unmarshalled application properties because it could be
197-
// updated at different places. We just keep the estimate simple when paging.
198-
memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0);
199-
} else {
200-
memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0);
201-
}
202-
originalEstimate = memoryEstimate;
191+
memoryEstimate = AMQP_OFFSET + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) * 4 : 0);
203192
}
204193

205194
return memoryEstimate;
@@ -237,79 +226,10 @@ public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pool
237226

238227
// Message state is now that the underlying buffer is loaded, but the contents not yet scanned
239228
resetMessageData();
240-
recoverHeaderDataFromEncoding();
229+
scanMessageData(data);
241230

242231
modified = false;
243-
messageDataScanned = MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
244-
}
245-
246-
private void recoverHeaderDataFromEncoding() {
247-
final DecoderImpl decoder = TLSEncode.getDecoder();
248-
decoder.setBuffer(data);
249-
250-
try {
251-
// At one point the broker could write the header and delivery annotations out of order
252-
// which means a full scan is required for maximum compatibility with that older data
253-
// where delivery annotations could be found ahead of the Header in the encoding.
254-
//
255-
// We manually extract the priority from the Header encoding if present to ensure we do
256-
// not create any unneeded GC overhead during load from storage. We don't directly store
257-
// other values from the header except for a value that is computed based on TTL and or
258-
// absolute expiration time in the Properties section, but that value is stored in the
259-
// data of the persisted message.
260-
for (int section = 0; section < 2 && data.hasRemaining(); section++) {
261-
final TypeConstructor<?> constructor = decoder.readConstructor();
262-
263-
if (Header.class.equals(constructor.getTypeClass())) {
264-
final byte typeCode = data.get();
265-
266-
@SuppressWarnings("unused")
267-
int size = 0;
268-
int count = 0;
269-
270-
switch (typeCode) {
271-
case EncodingCodes.LIST0:
272-
break;
273-
case EncodingCodes.LIST8:
274-
size = data.get() & 0xff;
275-
count = data.get() & 0xff;
276-
break;
277-
case EncodingCodes.LIST32:
278-
size = data.getInt();
279-
count = data.getInt();
280-
break;
281-
default:
282-
throw new DecodeException("Incorrect type found in Header encoding: " + typeCode);
283-
}
284-
285-
// Priority is stored in the second slot of the Header list encoding if present
286-
if (count >= 2) {
287-
decoder.readBoolean(false); // Discard durable for now, it is computed elsewhere.
288-
289-
final byte encodingCode = data.get();
290-
final int priority = switch (encodingCode) {
291-
case EncodingCodes.UBYTE -> data.get() & 0xff;
292-
case EncodingCodes.NULL -> DEFAULT_MESSAGE_PRIORITY;
293-
default ->
294-
throw new DecodeException("Expected UnsignedByte type but found encoding: " + EncodingCodes.toString(encodingCode));
295-
};
296-
297-
// Scaled here so do not call setPriority as that will store the set value in the AMQP header
298-
// and we don't want to create that Header instance at this stage.
299-
this.priority = (byte) Math.min(priority, MAX_MESSAGE_PRIORITY);
300-
}
301-
302-
return;
303-
} else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
304-
constructor.skipValue();
305-
} else {
306-
return;
307-
}
308-
}
309-
} finally {
310-
decoder.setBuffer(null);
311-
data.rewind(); // Ensure next scan start at the beginning.
312-
}
232+
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
313233
}
314234

315235
@Override

artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,11 @@ public void testHasScheduledDeliveryTimeReloadPersistence() {
216216
// Now reload from encoded data
217217
message.reloadPersistence(encoded, null);
218218

219-
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
219+
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
220220

221221
assertTrue(message.hasScheduledDeliveryTime());
222222

223-
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
223+
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
224224

225225
message.getHeader();
226226

@@ -249,11 +249,11 @@ public void testHasScheduledDeliveryDelayReloadPersistence() {
249249
// Now reload from encoded data
250250
message.reloadPersistence(encoded, null);
251251

252-
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
252+
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
253253

254254
assertTrue(message.hasScheduledDeliveryTime());
255255

256-
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
256+
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
257257

258258
message.getHeader();
259259

@@ -279,11 +279,11 @@ public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() {
279279
// Now reload from encoded data
280280
message.reloadPersistence(encoded, null);
281281

282-
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
282+
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
283283

284284
assertFalse(message.hasScheduledDeliveryTime());
285285

286-
assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
286+
assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
287287

288288
message.getHeader();
289289

@@ -331,12 +331,7 @@ private void testGetMemoryEstimateWithDecodedApplicationProperties(boolean paged
331331
}
332332

333333
assertEquals(TEST_APPLICATION_PROPERTY_VALUE, decodedWithApplicationPropertiesUnmarshalled.getStringProperty(TEST_APPLICATION_PROPERTY_KEY));
334-
335-
if (paged) {
336-
assertEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate());
337-
} else {
338-
assertNotEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate());
339-
}
334+
assertEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate());
340335
}
341336

342337
//----- Test Connection ID access -----------------------------------------//

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,10 +303,14 @@ protected void sendMessages(String destinationName,
303303
}
304304

305305
protected void sendMessages(String destinationName, int count, boolean durable) throws Exception {
306-
sendMessages(destinationName, count, durable, null);
306+
sendMessages(destinationName, count, durable, null, null);
307307
}
308308

309309
protected void sendMessages(String destinationName, int count, boolean durable, byte[] payload) throws Exception {
310+
sendMessages(destinationName, count, durable, payload, null);
311+
}
312+
313+
protected void sendMessages(String destinationName, int count, boolean durable, byte[] payload, Map<String, Object> properties) throws Exception {
310314
AmqpClient client = createAmqpClient();
311315
AmqpConnection connection = addConnection(client.connect());
312316
try {
@@ -320,6 +324,9 @@ protected void sendMessages(String destinationName, int count, boolean durable,
320324
if (payload != null) {
321325
message.setBytes(payload);
322326
}
327+
if (properties != null) {
328+
properties.forEach((a, b) -> message.setApplicationProperty(a, b));
329+
}
323330
sender.send(message);
324331
}
325332
} finally {

0 commit comments

Comments
 (0)