+ * We use this {@link PriorityQueue} to hold values until they are in order
+ */
+ final PriorityQueue This class will use the framework provided to by AbstractQueuedSynchronizer. AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers. This class works just like CountDownLatch, with the difference you can also increase the counter It could be used for sync points when one process is feeding the latch while another will wait when everything is done. (e.g. waiting IO completions to finish) On ActiveMQ Artemis we have the requirement of increment and decrement a counter until the user fires a ready event (commit). At that point we just act as a regular countDown. Note: This latch is reusable. Once it reaches zero, you can call up again, and reuse it on further waits. For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.
+ * It holds native data, and it will share a libaio queue that can be used by multiple files.
+ *
+ * You need to use the poll methods to read the result of write and read submissions.
+ *
+ * You also need to use the special buffer created by {@link LibaioFile} as you need special alignments
+ * when dealing with O_DIRECT files.
+ *
+ * A Single controller can server multiple files. There's no need to create one controller per file.
+ *
+ * Interesting reading for this.
+ */
+public class LibaioContext
+ * It is unsafe to close the controller while you have pending writes or files open as
+ * this could cause core dumps or VM crashes.
+ */
+ @Override
+ public void close() {
+ if (!closed.getAndSet(true)) {
+
+ if (ioSpace != null) {
+ try {
+ ioSpace.tryAcquire(queueSize, 10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ totalMaxIO.addAndGet(-queueSize);
+
+ if (ioContext != null) {
+ deleteContext(ioContext);
+ }
+ contexts.decrementAndGet();
+ checkShutdown();
+ }
+ }
+
+ /**
+ * It will open a file. If you set the direct flag = false then you won't need to use the special buffer.
+ * Notice: This will create an empty file if the file doesn't already exist.
+ *
+ * @param file the file to be open.
+ * @param direct will set ODIRECT.
+ * @return It will return a LibaioFile instance.
+ * @throws IOException in case of error.
+ */
+ public LibaioFile
+ * This shouldn't be called concurrently. You should provide your own synchronization if you need more than one
+ * Thread polling for any reason.
+ *
+ * Notice that the native layer will invoke {@link SubmitInfo#onError(int, String)} in case of failures,
+ * but it won't call done method for you.
+ *
+ * @param callbacks area to receive the callbacks passed on submission.The size of this callback has to
+ * be greater than the parameter max.
+ * @param min the minimum number of elements to receive. It will block until this is achieved.
+ * @param max The maximum number of elements to receive.
+ * @return Number of callbacks returned.
+ * @see LibaioFile#write(long, int, ByteBuffer, SubmitInfo)
+ * @see LibaioFile#read(long, int, ByteBuffer, SubmitInfo)
+ */
+ public int poll(Callback[] callbacks, int min, int max) {
+ int released = poll(ioContext, callbacks, min, max);
+ if (ioSpace != null) {
+ if (released > 0) {
+ ioSpace.release(released);
+ }
+ }
+ return released;
+ }
+
+ /**
+ * It will start polling and will keep doing until the context is closed.
+ * This will call callbacks on {@link SubmitInfo#onError(int, String)} and
+ * {@link SubmitInfo#done()}.
+ * In case of error, both {@link SubmitInfo#onError(int, String)} and
+ * {@link SubmitInfo#done()} are called.
+ */
+ public void poll() {
+ if (!closed.get()) {
+ blockedPoll(ioContext, useFdatasync);
+ }
+ }
+
+ private void releaseSemaphore() {
+ if (ioSpace != null) {
+ ioSpace.release();
+ }
+ }
+
+ /**
+ * This is the queue for libaio, initialized with queueSize.
+ */
+ private IOControl
+ * Documented at {@link LibaioFile#newBuffer(int)}.
+ *
+ * @param size needs to be % alignment
+ * @param alignment the alignment used at the dispositive
+ * @return a new native buffer used with posix_memalign
+ */
+ public static MemorySegment newAlignedBuffer(int size, int alignment) {
+ return FFMNativeHelper.newAlignedBuffer(size, alignment);
+ }
+
+ /**
+ * This will call posix free to release the inner buffer allocated at {@link #newAlignedBuffer(int, int)}.
+ *
+ * @param buffer a native buffer allocated with {@link #newAlignedBuffer(int, int)}.
+ */
+ public static void freeBuffer(MemorySegment buffer) {
+ FFMNativeHelper.freeBuffer(buffer);
+ }
+
+ /**
+ * Documented at {@link LibaioFile#write(long, int, ByteBuffer, SubmitInfo)}.
+ */
+ void submitWrite(int fd,
+ IOControl ioControl,
+ long position,
+ int size,
+ ByteBuffer bufferWrite,
+ Callback callback) throws IOException {
+ this.ffmNativeHelper.submitWrite(fd, ioControl, position, size, bufferWrite, callback);
+ }
+
+ /**
+ * Documented at {@link LibaioFile#read(long, int, ByteBuffer, SubmitInfo)}.
+ */
+ void submitRead(int fd,
+ IOControl ioControl,
+ long position,
+ int size,
+ ByteBuffer bufferWrite,
+ Callback callback) throws IOException {
+ this.ffmNativeHelper.submitRead(fd, ioControl, position, size, bufferWrite, callback);
+ }
+
+ /**
+ * Note: this shouldn't be done concurrently.
+ * This method will block until the min condition is satisfied on the poll.
+ *
+ * The callbacks will include the original callback sent at submit (read or write).
+ */
+ int poll(IOControl ioControl, Callback[] callbacks, int min, int max) {
+ return this.ffmNativeHelper.poll(ioControl, callbacks, min, max);
+ }
+
+ /**
+ * This method will block as long as the context is open.
+ */
+ void blockedPoll(IOControl
+ * Notice: this won't hold a global reference on buffer, callback should hold a reference towards bufferWrite.
+ * And don't free the buffer until the callback was called as this could crash the VM.
+ *
+ * @param position The position on the file to write. Notice this has to be a multiple of 512.
+ * @param size The size of the buffer to use while writing.
+ * @param buffer if you are using O_DIRECT the buffer here needs to be allocated by {@link #newBuffer(int)}.
+ * @param callback A callback to be returned on the poll method.
+ * @throws IOException in case of error
+ */
+ public void write(long position, int size, ByteBuffer buffer, Callback callback) throws IOException {
+ Objects.requireNonNull(callback, "Callback cannot be null");
+ ctx.submitWrite(fd, position, size, buffer, callback);
+ }
+
+ /**
+ * It will submit a read to the queue. The callback sent here will be received on the
+ * {@link LibaioContext#poll(SubmitInfo[], int, int)}.
+ * In case of the libaio queue is full (e.g. returning E_AGAIN) this method will return false.
+ *
+ * Notice: this won't hold a global reference on buffer, callback should hold a reference towards bufferWrite.
+ * And don't free the buffer until the callback was called as this could crash the VM.
+ * *
+ *
+ * @param position The position on the file to read. Notice this has to be a multiple of 512.
+ * @param size The size of the buffer to use while reading.
+ * @param buffer if you are using O_DIRECT the buffer here needs to be allocated by {@link #newBuffer(int)}.
+ * @param callback A callback to be returned on the poll method.
+ * @throws IOException in case of error
+ * @see LibaioContext#poll(SubmitInfo[], int, int)
+ */
+ public void read(long position, int size, ByteBuffer buffer, Callback callback) throws IOException {
+ Objects.requireNonNull(callback, "Callback cannot be null");
+ ctx.submitRead(fd, position, size, buffer, callback);
+ }
+
+ /**
+ * It will allocate a buffer to be used on libaio operations.
+ * Buffers here are allocated with posix_memalign.
+ *
+ * You need to explicitly free the buffer created from here using the
+ * {@link LibaioContext#freeBuffer(MemorySegment)}.
+ *
+ * @param size the size of the buffer.
+ * @return the buffer allocated.
+ */
+ public MemorySegment newBuffer(int size) {
+ return LibaioContext.newAlignedBuffer(size, 4 * 1024);
+ }
+
+ /**
+ * It will preallocate the file with a given size.
+ *
+ * @param size number of bytes to be filled on the file
+ */
+ public void fill(int alignment, long size) throws IOException {
+ try {
+ LibaioContext.fill(fd, alignment, size);
+ } catch (OutOfMemoryError e) {
+ logger.warn("Did not have enough memory to allocate " + size + " bytes in memory while filling the file, using simple fallocate");
+ LibaioContext.fallocate(fd, size);
+ }
+ }
+
+ /**
+ * It will use fallocate to initialize a file.
+ *
+ * @param size number of bytes to be filled on the file
+ */
+ public void fallocate(long size) throws IOException {
+ LibaioContext.fallocate(fd, size);
+ }
+
+}
diff --git a/artemis-journal/src/main/java24/org/apache/artemis/nativo/jlibaio/ffm/AIORing.java b/artemis-journal/src/main/java24/org/apache/artemis/nativo/jlibaio/ffm/AIORing.java
new file mode 100644
index 00000000000..fcf54a08a31
--- /dev/null
+++ b/artemis-journal/src/main/java24/org/apache/artemis/nativo/jlibaio/ffm/AIORing.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.artemis.nativo.jlibaio.ffm;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.foreign.MemoryLayout;
+import java.lang.foreign.MemorySegment;
+import java.lang.foreign.StructLayout;
+import java.lang.foreign.ValueLayout;
+import java.lang.invoke.VarHandle;
+
+import static org.apache.artemis.nativo.jlibaio.ffm.Constants.AIO_RING_INCOMPAT_FEATURES;
+import static org.apache.artemis.nativo.jlibaio.ffm.Constants.AIO_RING_MAGIC;
+import static org.apache.artemis.nativo.jlibaio.ffm.IOEvent.IO_EVENT_LAYOUT;
+
+public class AIORing {
+
+ private static final Logger logger = LoggerFactory.getLogger(AIORing.class);
+
+ /**
+ * There is no defined aio_ring anywhere in an include,
+ * This is an implementation detail, that is a binary contract.
+ * it is safe to use the feature though.
+ */
+ static final StructLayout AIO_RING_LAYOUT = MemoryLayout.structLayout(
+ // Fixed header (32 bytes)
+ ValueLayout.JAVA_INT.withName("id"), /* kernel internal index number */
+ ValueLayout.JAVA_INT.withName("nr"), /* number of io_events */
+ ValueLayout.JAVA_INT.withName("head"), ValueLayout.JAVA_INT.withName("tail"), ValueLayout.JAVA_INT.withName("magic"), ValueLayout.JAVA_INT.withName("compat_features"), ValueLayout.JAVA_INT.withName("incompat_features"), ValueLayout.JAVA_INT.withName("header_length") /* size of aio_ring */).withName("aio_ring");
+
+ public static final long AIO_RING_HEADER_SIZE = AIO_RING_LAYOUT.byteSize();
+
+ public static final VarHandle AIO_RING_NR_VH = AIO_RING_LAYOUT.varHandle(MemoryLayout.PathElement.groupElement("nr"));
+ public static final VarHandle AIO_RING_HEAD_VH = AIO_RING_LAYOUT.varHandle(MemoryLayout.PathElement.groupElement("head"));
+ public static final VarHandle AIO_RING_TAIL_VH = AIO_RING_LAYOUT.varHandle(MemoryLayout.PathElement.groupElement("tail"));
+ public static final VarHandle AIO_RING_MAGIC_VH = AIO_RING_LAYOUT.varHandle(MemoryLayout.PathElement.groupElement("magic"));
+ public static final VarHandle AIO_RING_INCOMPAT_FEATURES_VH = AIO_RING_LAYOUT.varHandle(MemoryLayout.PathElement.groupElement("incompat_features"));
+
+ // Check if the implementation supports AIO_RING by checking this number directly.
+ public static boolean hasUsableRing(MemorySegment ring) {
+ if (ring == null || ring.address() == 0L || ring.byteSize() < AIO_RING_HEADER_SIZE) {
+ return false;
+ }
+
+ MemorySegment header = ring.asSlice(0, AIO_RING_HEADER_SIZE);
+ int magic = (int) AIO_RING_MAGIC_VH.getAcquire(header, 0L);
+ int incompat = (int) AIO_RING_INCOMPAT_FEATURES_VH.getAcquire(header, 0L);
+ int nr = (int) AIO_RING_NR_VH.getAcquire(header, 0L);
+ if (logger.isTraceEnabled()) {
+ logger.trace("nr={}, magic={}, incompat={}", nr, magic, incompat);
+ }
+
+ return magic == AIO_RING_MAGIC && incompat == AIO_RING_INCOMPAT_FEATURES && nr > 0;
+ }
+
+ // Newer versions of the kernel (newer here being a relative word, a couple years already at the time
+ // I am writing this), will have io_context_t as an opaque type, and the real type being the aio_ring.
+ public static MemorySegment toAioRing(MemorySegment aioCtx) {
+ if (aioCtx == null || aioCtx.address() == 0L) {
+ return MemorySegment.NULL;
+ }
+
+ MemorySegment header = aioCtx.reinterpret(AIO_RING_HEADER_SIZE);
+
+ if (!hasUsableRing(header)) {
+ return MemorySegment.NULL;
+ }
+
+ int nr = (int) AIO_RING_NR_VH.getAcquire(header, 0L);
+ long eventBytesize = IO_EVENT_LAYOUT.byteSize();
+ long fullSize;
+
+ try {
+ fullSize = Math.addExact(AIO_RING_HEADER_SIZE, Math.multiplyExact((long) nr, eventBytesize));
+ } catch (ArithmeticException e) {
+ logger.warn("toAioRing: overflow computing ring size (nr={}, eventBytes={})", nr, eventBytesize);
+ return MemorySegment.NULL;
+ }
+
+ if (fullSize <= AIO_RING_HEADER_SIZE) {
+ return MemorySegment.NULL;
+ }
+
+ return aioCtx.reinterpret(fullSize);
+ }
+}
diff --git a/artemis-journal/src/main/java24/org/apache/artemis/nativo/jlibaio/ffm/Constants.java b/artemis-journal/src/main/java24/org/apache/artemis/nativo/jlibaio/ffm/Constants.java
new file mode 100644
index 00000000000..cf719c67494
--- /dev/null
+++ b/artemis-journal/src/main/java24/org/apache/artemis/nativo/jlibaio/ffm/Constants.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.artemis.nativo.jlibaio.ffm;
+
+public final class Constants {
+
+ private Constants() {
+ }
+
+ static final long ONE_MEGA = 1048576L;
+
+ //These should be used to check if the user-space io_getevents is supported:
+ //Linux ABI for the ring buffer: https://elixir.bootlin.com/linux/v4.20.13/source/fs/aio.c#L54
+ //aio_read_events_ring: https://elixir.bootlin.com/linux/v4.20.13/source/fs/aio.c#L1148
+
+ // NOTE: if the kernel ever updates the structure, the RING-MAGIC will change and the code will switch back to normal IO calls
+ static final int AIO_RING_MAGIC = 0xa10a10a1;
+ static final int AIO_RING_INCOMPAT_FEATURES = 0;
+
+ // set this to false if you want to stop using ring reaping
+ static final boolean RING_REAPER = true;
+
+ static final int PERMISSION_MODE = 0666;
+ static final int O_RDWR = 0x0002;
+ static final int O_CREAT = 0x0040;
+ static final int O_DIRECT;
+
+ static final int LOCK_EX = 2; // Exclusive lock
+ static final int LOCK_NB = 4; // Non-blocking lock
+
+ static {
+ O_DIRECT = detectODirectFlag();
+ }
+
+ /*
+ * Detecting OS Architecture and setting O_DIRECT
+ *
+ * */
+ private static int detectODirectFlag() {
+ String arch = System.getProperty("os.arch");
+ if ("aarch64".equals(arch) || "arm64".equals(arch) || "arm".equals(arch)) {
+ return 0x10000;
+ } else if ("ppc64le".equals(arch) || "ppc64".equals(arch) || "ppc".equals(arch)) {
+ return 0x8000;
+ }
+ // amd64, x86_64
+ return 0x4000;
+ }
+}
diff --git a/artemis-journal/src/main/java24/org/apache/artemis/nativo/jlibaio/ffm/FFMHandles.java b/artemis-journal/src/main/java24/org/apache/artemis/nativo/jlibaio/ffm/FFMHandles.java
new file mode 100644
index 00000000000..b04306e7837
--- /dev/null
+++ b/artemis-journal/src/main/java24/org/apache/artemis/nativo/jlibaio/ffm/FFMHandles.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.artemis.nativo.jlibaio.ffm;
+
+import java.lang.foreign.Arena;
+import java.lang.foreign.FunctionDescriptor;
+import java.lang.foreign.Linker;
+import java.lang.foreign.MemoryLayout;
+import java.lang.foreign.StructLayout;
+import java.lang.foreign.SymbolLookup;
+import java.lang.foreign.ValueLayout;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.VarHandle;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FFMHandles {
+
+ private static final Logger logger = LoggerFactory.getLogger(FFMHandles.class);
+ static final Linker LINKER = Linker.nativeLinker();
+ static final SymbolLookup STDLIB = setStdLib();
+ public static final SymbolLookup LIBAIO = setLibaio();
+
+ static final ReentrantLock oneMegaMutex = new ReentrantLock();
+
+ static final StructLayout CAPTURE_STATE_LAYOUT = Linker.Option.captureStateLayout();
+ static final VarHandle ERRNO_VH = CAPTURE_STATE_LAYOUT.varHandle(MemoryLayout.PathElement.groupElement("errno"));
+
+ private static final Linker.Option captureCallState = Linker.Option.captureCallState("errno");
+
+ static final MethodHandle WRITE_HANDLE = LINKER.downcallHandle(STDLIB.find("write").orElseThrow(() -> new UnsatisfiedLinkError("write not found in STDLIB")), FunctionDescriptor.of(ValueLayout.JAVA_LONG, ValueLayout.JAVA_INT, ValueLayout.ADDRESS, ValueLayout.JAVA_LONG), captureCallState);
+
+ static final MethodHandle OPEN_HANDLE = LINKER.downcallHandle(STDLIB.find("open").orElseThrow(() -> new UnsatisfiedLinkError("open not found in STDLIB")), FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.ADDRESS, // pathName
+ ValueLayout.JAVA_INT, // flags
+ ValueLayout.JAVA_INT), // mode
+ captureCallState);
+
+ static final MethodHandle CLOSE_HANDLE = LINKER.downcallHandle(STDLIB.find("close").orElseThrow(() -> new UnsatisfiedLinkError("close not found in STDLIB")), FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.JAVA_INT), captureCallState);
+
+ static final MethodHandle FALLOCATE_HANDLE = LINKER.downcallHandle(STDLIB.find("fallocate").orElseThrow(() -> new UnsatisfiedLinkError("fallocate not found in STDLIB")), FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.JAVA_INT, ValueLayout.JAVA_INT, ValueLayout.JAVA_LONG, ValueLayout.JAVA_LONG), captureCallState);
+
+ static final MethodHandle FSYNC_HANDLE = LINKER.downcallHandle(STDLIB.find("fsync").orElseThrow(() -> new UnsatisfiedLinkError("fsync not found in STDLIB")), FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.JAVA_INT), captureCallState);
+
+ static final MethodHandle LSEEK_HANDLE = LINKER.downcallHandle(STDLIB.find("lseek").orElseThrow(() -> new UnsatisfiedLinkError("lseek not found in STDLIB")), FunctionDescriptor.of(ValueLayout.JAVA_LONG, ValueLayout.JAVA_INT, ValueLayout.JAVA_LONG, ValueLayout.JAVA_INT), captureCallState);
+
+ static final MethodHandle FSTAT_HANDLE = LINKER.downcallHandle(STDLIB.find("fstat").orElseThrow(() -> new UnsatisfiedLinkError("fstat not found in STDLIB")), FunctionDescriptor.of(ValueLayout.JAVA_INT, // return
+ ValueLayout.JAVA_INT, // fd
+ ValueLayout.ADDRESS), // struct stat
+ captureCallState);
+
+ // for x86_64 - stat
+ static final MethodHandle STAT_HANDLE = LINKER.downcallHandle(STDLIB.find("stat").orElseThrow(() -> new UnsatisfiedLinkError("stat not found in STDLIB")), FunctionDescriptor.of(ValueLayout.JAVA_INT, // return
+ ValueLayout.ADDRESS, // pathname
+ ValueLayout.ADDRESS), // struct stat
+ captureCallState);
+
+ static final MethodHandle IO_GETEVENTS_HANDLE = LINKER.downcallHandle(LIBAIO.find("io_getevents").orElseThrow(() -> new UnsatisfiedLinkError("io_getevents not found in LIBAIO")), FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.ADDRESS, ValueLayout.JAVA_LONG, ValueLayout.JAVA_LONG, ValueLayout.ADDRESS, ValueLayout.ADDRESS), captureCallState);
+
+ static final MethodHandle IO_SUBMIT_HANDLE = LINKER.downcallHandle(LIBAIO.find("io_submit").orElseThrow(() -> new UnsatisfiedLinkError("io_submit not found in LIBAIO")), FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.ADDRESS, ValueLayout.JAVA_LONG, ValueLayout.ADDRESS)).asFixedArity();
+
+ static final MethodHandle FREE_BUF_HANDLE = LINKER.downcallHandle(STDLIB.find("free").orElseThrow(() -> new UnsatisfiedLinkError("free not found in STDLIB")), FunctionDescriptor.ofVoid(ValueLayout.ADDRESS));
+
+ static final MethodHandle FLOCK_HANDLE = LINKER.downcallHandle(STDLIB.find("flock").orElseThrow(() -> new UnsatisfiedLinkError("flock not found in STDLIB")), FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.JAVA_INT, ValueLayout.JAVA_INT), captureCallState);
+
+ static final MethodHandle IO_QUEUE_INIT_HANDLE = LINKER.downcallHandle(LIBAIO.find("io_queue_init").orElseThrow(() -> new UnsatisfiedLinkError("io_queue_init not found in LIBAIO")), FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.JAVA_INT, ValueLayout.ADDRESS), captureCallState);
+
+ static final MethodHandle IO_QUEUE_RELEASE_HANDLE = LINKER.downcallHandle(LIBAIO.find("io_queue_release").orElseThrow(() -> new UnsatisfiedLinkError("io_queue_release not found in LIBAIO")), FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.ADDRESS), captureCallState);
+
+ static final MethodHandle FDATASYNC_HANDLE = LINKER.downcallHandle(STDLIB.find("fdatasync").orElseThrow(() -> new UnsatisfiedLinkError("fdatasync not found in STDLIB")), FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.JAVA_INT), captureCallState);
+
+ static final MethodHandle MEMSET_HANDLE = LINKER.downcallHandle(STDLIB.find("memset").orElseThrow(() -> new UnsatisfiedLinkError("memset not found in STDLIB")), FunctionDescriptor.of(ValueLayout.ADDRESS, ValueLayout.ADDRESS, ValueLayout.JAVA_INT, ValueLayout.JAVA_LONG));
+
+ static final MethodHandle POSIX_MEMALIGN_HANDLE = LINKER.downcallHandle(STDLIB.find("posix_memalign").orElseThrow(() -> new UnsatisfiedLinkError("posix_memalign not found in STDLIB")), FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.ADDRESS, ValueLayout.JAVA_LONG, ValueLayout.JAVA_LONG), Linker.Option.captureCallState("errno"));
+
+ private static SymbolLookup setStdLib() {
+ String[] libcPaths = {"/lib64/libc.so.6", "/usr/lib64/libc.so.6", "/lib/x86_64-linux-gnu/libc.so.6", "libc.so.6"};
+ for (String path : libcPaths) {
+ try {
+ SymbolLookup loopup = SymbolLookup.libraryLookup(path, Arena.global());
+ if (loopup != null) {
+ logger.info("libc.so.6 found at {}", path);
+ return loopup;
+ }
+ } catch (IllegalArgumentException | SecurityException e) {
+ logger.warn("libc.so.6 not found", e);
+ }
+ }
+ logger.warn("libc.so.6 not found");
+ throw new RuntimeException("libc.so.6 not found");
+ }
+
+ private static SymbolLookup setLibaio() {
+ String[] paths = {System.getProperty("libaio.path"), "/usr/lib64/libaio.so.1", "/usr/lib/x86_64-linux-gnu/libaio.so.1", "/lib64/libaio.so.1", "/usr/lib/libaio.so.1", "libaio.so.1"};
+ for (String path : paths) {
+ if (path != null && !path.isEmpty()) {
+ try {
+ SymbolLookup lookup = SymbolLookup.libraryLookup(path, Arena.global());
+ if (lookup != null) {
+ logger.info("libaio.so.1 found at {}", path);
+ return lookup;
+ }
+ } catch (IllegalArgumentException | SecurityException e) {
+ logger.warn("libaio.so.1 not found", e);
+ }
+ }
+ }
+ logger.warn("libaio.so.1 not found");
+ throw new RuntimeException("libaio.so.1 not found");
+ }
+}
diff --git a/artemis-journal/src/main/java24/org/apache/artemis/nativo/jlibaio/ffm/FFMNativeHelper.java b/artemis-journal/src/main/java24/org/apache/artemis/nativo/jlibaio/ffm/FFMNativeHelper.java
new file mode 100644
index 00000000000..1959a22d318
--- /dev/null
+++ b/artemis-journal/src/main/java24/org/apache/artemis/nativo/jlibaio/ffm/FFMNativeHelper.java
@@ -0,0 +1,1134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.artemis.nativo.jlibaio.ffm;
+
+import org.apache.artemis.nativo.jlibaio.SubmitInfo;
+import org.apache.artemis.nativo.jlibaio.ffm.ReleaseCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.foreign.Arena;
+import java.lang.foreign.MemorySegment;
+import java.lang.foreign.ValueLayout;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.artemis.nativo.jlibaio.ffm.AIORing.AIO_RING_HEADER_SIZE;
+import static org.apache.artemis.nativo.jlibaio.ffm.AIORing.AIO_RING_HEAD_VH;
+import static org.apache.artemis.nativo.jlibaio.ffm.AIORing.AIO_RING_NR_VH;
+import static org.apache.artemis.nativo.jlibaio.ffm.AIORing.AIO_RING_TAIL_VH;
+import static org.apache.artemis.nativo.jlibaio.ffm.AIORing.hasUsableRing;
+import static org.apache.artemis.nativo.jlibaio.ffm.AIORing.toAioRing;
+import static org.apache.artemis.nativo.jlibaio.ffm.Constants.LOCK_EX;
+import static org.apache.artemis.nativo.jlibaio.ffm.Constants.LOCK_NB;
+import static org.apache.artemis.nativo.jlibaio.ffm.Constants.ONE_MEGA;
+import static org.apache.artemis.nativo.jlibaio.ffm.Constants.O_CREAT;
+import static org.apache.artemis.nativo.jlibaio.ffm.Constants.O_DIRECT;
+import static org.apache.artemis.nativo.jlibaio.ffm.Constants.O_RDWR;
+import static org.apache.artemis.nativo.jlibaio.ffm.Constants.PERMISSION_MODE;
+import static org.apache.artemis.nativo.jlibaio.ffm.Constants.RING_REAPER;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.CAPTURE_STATE_LAYOUT;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.CLOSE_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.ERRNO_VH;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.FALLOCATE_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.FDATASYNC_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.FLOCK_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.FREE_BUF_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.FSTAT_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.FSYNC_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.IO_GETEVENTS_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.IO_QUEUE_INIT_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.IO_QUEUE_RELEASE_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.IO_SUBMIT_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.LSEEK_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.MEMSET_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.OPEN_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.POSIX_MEMALIGN_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.STAT_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.WRITE_HANDLE;
+import static org.apache.artemis.nativo.jlibaio.ffm.FFMHandles.oneMegaMutex;
+import static org.apache.artemis.nativo.jlibaio.ffm.IOCBInit.IOCB_LAYOUT_SIZE;
+import static org.apache.artemis.nativo.jlibaio.ffm.IOEvent.IO_EVENT_LAYOUT;
+import static org.apache.artemis.nativo.jlibaio.ffm.Stat.STAT_LAYOUT;
+
+public class FFMNativeHelper