diff --git a/.build/build-bench.xml b/.build/build-bench.xml
index 4ea0ff571e7d..7d6f564bbc0e 100644
--- a/.build/build-bench.xml
+++ b/.build/build-bench.xml
@@ -93,6 +93,7 @@
+
diff --git a/src/java/org/apache/cassandra/concurrent/CassandraThread.java b/src/java/org/apache/cassandra/concurrent/CassandraThread.java
new file mode 100644
index 000000000000..4b9f7c9eca28
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/CassandraThread.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import org.apache.cassandra.metrics.ThreadLocalMetrics;
+
+import io.netty.util.concurrent.FastThreadLocalThread;
+
+public class CassandraThread extends FastThreadLocalThread
+{
+ private ThreadLocalMetrics threadLocalMetrics;
+ private ExecutorLocals executorLocals;
+
+ public CassandraThread(ThreadGroup group, Runnable target, String name)
+ {
+ super(group, target, name);
+ }
+
+ public CassandraThread()
+ {
+ super();
+ }
+
+ public ThreadLocalMetrics getThreadLocalMetrics()
+ {
+ ThreadLocalMetrics current = threadLocalMetrics;
+ if (current != null)
+ return current;
+
+ threadLocalMetrics = ThreadLocalMetrics.create();
+ return threadLocalMetrics;
+ }
+
+ public ExecutorLocals getExecutorLocals()
+ {
+ ExecutorLocals current = executorLocals;
+ if (current != null)
+ return current;
+
+ executorLocals = ExecutorLocals.none();
+ return executorLocals;
+ }
+
+ public void setExecutorLocals(ExecutorLocals executorLocals)
+ {
+ this.executorLocals = executorLocals;
+ }
+
+ public ExecutorLocals replaceExecutorLocals(ExecutorLocals newExecutorLocals)
+ {
+ ExecutorLocals current = executorLocals;
+ if (current != newExecutorLocals)
+ executorLocals = newExecutorLocals;
+
+ return current != null ? current : ExecutorLocals.none();
+ }
+
+ public void doRun()
+ {
+ }
+
+
+ // final to avoid skipping of the cleanup logic
+ final public void run()
+ {
+ try
+ {
+ super.run();
+ doRun();
+ }
+ finally
+ {
+ if (threadLocalMetrics != null)
+ threadLocalMetrics.release();
+ }
+
+ }
+}
diff --git a/src/java/org/apache/cassandra/concurrent/CassandraThreadFactory.java b/src/java/org/apache/cassandra/concurrent/CassandraThreadFactory.java
new file mode 100644
index 000000000000..d7fd907bc0f4
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/CassandraThreadFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+public class CassandraThreadFactory extends DefaultThreadFactory
+{
+ public CassandraThreadFactory(String poolName, boolean daemon)
+ {
+ super(poolName, daemon);
+ }
+
+ protected Thread newThread(Runnable r, String name)
+ {
+ return new CassandraThread(this.threadGroup, r, name);
+ }
+}
diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java
index 55c6ee24642a..b4b7f094cc0d 100644
--- a/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java
+++ b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java
@@ -48,8 +48,8 @@ public static class Impl
@SuppressWarnings("resource")
protected static void set(TraceState traceState, ClientWarn.State clientWarnState, boolean eligibleForArtificialLatency)
{
- if (traceState == null && clientWarnState == null && !eligibleForArtificialLatency) locals.set(none);
- else locals.set(new ExecutorLocals(traceState, clientWarnState, eligibleForArtificialLatency));
+ if (traceState == null && clientWarnState == null && !eligibleForArtificialLatency) setLocal(none);
+ else setLocal(new ExecutorLocals(traceState, clientWarnState, eligibleForArtificialLatency));
}
}
@@ -64,12 +64,46 @@ protected ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState
this.eligibleForArtificialLatency = eligibleForArtificialLatency;
}
+ private static void setLocal(ExecutorLocals executorLocals)
+ {
+ Thread currentThread = Thread.currentThread();
+ if (currentThread instanceof CassandraThread)
+ ((CassandraThread) currentThread).setExecutorLocals(executorLocals);
+ else
+ locals.set(executorLocals);
+ }
+
+ private ExecutorLocals replace()
+ {
+ Thread currentThread = Thread.currentThread();
+ if (currentThread instanceof CassandraThread)
+ {
+ return ((CassandraThread) currentThread).replaceExecutorLocals(this);
+ }
+ else
+ {
+ ExecutorLocals old = locals.get();
+ if (old != this)
+ locals.set(this);
+ return old;
+ }
+ }
+
/**
* @return an ExecutorLocals object which has the current trace state and client warn state.
*/
public static ExecutorLocals current()
{
- return locals.get();
+ Thread currentThread = Thread.currentThread();
+ if (currentThread instanceof CassandraThread)
+ return ((CassandraThread) currentThread).getExecutorLocals();
+ else
+ return locals.get();
+ }
+
+ public static ExecutorLocals none()
+ {
+ return none;
}
/**
@@ -84,13 +118,13 @@ public static WithResources propagate()
public static ExecutorLocals create(TraceState traceState)
{
- ExecutorLocals current = locals.get();
+ ExecutorLocals current = current();
return current.traceState == traceState ? current : new ExecutorLocals(traceState, current.clientWarnState, current.eligibleForArtificialLatency);
}
public static void clear()
{
- locals.set(none);
+ setLocal(none);
}
/**
@@ -98,15 +132,11 @@ public static void clear()
*/
public ExecutorLocals get()
{
- // TODO (desired): add compareAndSet to save one thread local round trip
- ExecutorLocals old = current();
- if (old != this)
- locals.set(this);
- return old;
+ return replace();
}
public void close()
{
- locals.set(this);
+ setLocal(this);
}
}
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index de43f33e7ebe..e2b2c970d1e5 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -26,8 +26,6 @@
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.utils.JVMStabilityInspector;
-import io.netty.util.concurrent.FastThreadLocalThread;
-
/**
* This class is an implementation of the ThreadFactory interface. This
* is useful to give Java threads meaningful names which is useful when using
@@ -169,12 +167,12 @@ public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, St
if (PRESERVE_THREAD_CREATION_STACKTRACE)
thread = new InspectableFastThreadLocalThread(threadGroup, runnable, threadName);
else
- thread = new FastThreadLocalThread(threadGroup, runnable, threadName);
+ thread = new CassandraThread(threadGroup, runnable, threadName);
thread.setDaemon(daemon);
return thread;
}
- public static class InspectableFastThreadLocalThread extends FastThreadLocalThread
+ public static class InspectableFastThreadLocalThread extends CassandraThread
{
public StackTraceElement[] creationTrace;
@@ -184,22 +182,8 @@ private void setStack()
creationTrace = Arrays.copyOfRange(creationTrace, 2, creationTrace.length);
}
- public InspectableFastThreadLocalThread() { super(); setStack(); }
-
- public InspectableFastThreadLocalThread(Runnable target) { super(target); setStack(); }
-
- public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target) { super(group, target); setStack(); }
-
- public InspectableFastThreadLocalThread(String name) { super(name); setStack(); }
-
- public InspectableFastThreadLocalThread(ThreadGroup group, String name) { super(group, name); setStack(); }
-
- public InspectableFastThreadLocalThread(Runnable target, String name) { super(target, name); setStack(); }
-
public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target, String name) { super(group, target, name); setStack(); }
- public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) { super(group, target, name, stackSize); setStack(); }
-
}
public static T setupThread(T thread, int priority, ClassLoader contextClassLoader, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
{
diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
index 8f1a5e9aa285..8eb34303e27d 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
@@ -27,8 +27,6 @@
import org.apache.cassandra.utils.JVMStabilityInspector;
-import io.netty.util.concurrent.FastThreadLocalThread;
-
import static org.apache.cassandra.concurrent.SEPExecutor.TakeTaskPermitResult.RETURNED_WORK_PERMIT;
import static org.apache.cassandra.concurrent.SEPExecutor.TakeTaskPermitResult.TOOK_PERMIT;
import static org.apache.cassandra.config.CassandraRelevantProperties.SET_SEP_THREAD_NAME;
@@ -60,7 +58,7 @@ final class SEPWorker extends AtomicReference implements Runnabl
this.pool = pool;
this.workerId = workerId;
this.workerIdThreadSuffix = '-' + workerId.toString();
- thread = new FastThreadLocalThread(threadGroup, this, threadGroup.getName() + "-Worker-" + workerId);
+ thread = new CassandraThread(threadGroup, this, threadGroup.getName() + "-Worker-" + workerId);
thread.setDaemon(true);
set(initialState);
thread.start();
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 8c9b2f979621..7cf48d30ebb8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -26,6 +26,7 @@
import com.google.common.base.Throwables;
+import org.apache.cassandra.concurrent.CassandraThread;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.SerializationHeader;
@@ -40,8 +41,6 @@
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
-import io.netty.util.concurrent.FastThreadLocalThread;
-
import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue;
/**
@@ -210,11 +209,11 @@ public static class SyncException extends RuntimeException
//// typedef
static class Buffer extends TreeMap {}
- private class DiskWriter extends FastThreadLocalThread
+ private class DiskWriter extends CassandraThread
{
volatile Throwable exception = null;
- public void run()
+ public void doRun()
{
while (true)
{
diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java b/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java
index 0135522ac051..a7eaa6b42905 100644
--- a/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java
@@ -36,6 +36,7 @@
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.concurrent.CassandraThread;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Shutdownable;
@@ -84,10 +85,7 @@ public class ThreadLocalMetrics
@Override
protected ThreadLocalMetrics initialValue()
{
- ThreadLocalMetrics result = new ThreadLocalMetrics();
- allThreadLocalMetrics.add(result);
- destroyWhenUnreachable(Thread.currentThread(), result::release);
- return result;
+ return create();
}
// this method is invoked when a thread is going to finish, but it works only for FastThreadLocalThread
@@ -99,6 +97,14 @@ protected void onRemoval(ThreadLocalMetrics value)
}
};
+ public static ThreadLocalMetrics create()
+ {
+ ThreadLocalMetrics result = new ThreadLocalMetrics();
+ allThreadLocalMetrics.add(result);
+ destroyWhenUnreachable(Thread.currentThread(), result::release);
+ return result;
+ }
+
private static volatile AtomicLongArray summaryValues = new AtomicLongArray(INITIAL_COUNTERS_CAPACITY);
private static final Shutdownable cleaner;
@@ -182,7 +188,7 @@ public static void shutdownCleaner(long timeout, TimeUnit unit) throws Interrupt
shutdownAndWait(timeout, unit, of(cleaner));
}
- private void release()
+ public void release()
{
// Using this lock while moving we want to avoid races with readers in getCount
// such races can cause a transfered value lost or its double-counting by a reader
@@ -275,6 +281,9 @@ public static long getCountAndReset(int metricId)
}
public static ThreadLocalMetrics get() {
+ Thread currentThread = Thread.currentThread();
+ if (currentThread instanceof CassandraThread)
+ return ((CassandraThread)currentThread).getThreadLocalMetrics();
return threadLocalMetricsCurrent.get();
}
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 5d1c005153b9..032e96a98786 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -50,6 +50,7 @@
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.CassandraThread;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
@@ -70,8 +71,6 @@
import org.apache.cassandra.security.EncryptionContext;
import org.apache.cassandra.security.EncryptionContextGenerator;
-import io.netty.util.concurrent.FastThreadLocalThread;
-
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
@Ignore
@@ -403,7 +402,7 @@ private static ByteBuffer randomBytes(int quantity, Random tlr)
return slice;
}
- public class CommitlogThread extends FastThreadLocalThread
+ public class CommitlogThread extends CassandraThread
{
final AtomicLong counter = new AtomicLong();
int hash = 0;
@@ -421,7 +420,7 @@ public class CommitlogThread extends FastThreadLocalThread
this.random = rand;
}
- public void run()
+ public void doRun()
{
Thread.currentThread().setName("CommitLogThread-" + threadID.getAndIncrement());
RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
diff --git a/test/microbench/org/apache/cassandra/test/microbench/CassandraThreadLocalBench.java b/test/microbench/org/apache/cassandra/test/microbench/CassandraThreadLocalBench.java
new file mode 100644
index 000000000000..d93e4e1d2bcf
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/CassandraThreadLocalBench.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.util.concurrent.TimeUnit;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import org.apache.cassandra.metrics.ThreadLocalMetrics;
+
+import io.netty.util.concurrent.FastThreadLocal;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = 4, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 2, time = 30, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 2,
+ jvmArgsAppend = { "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"})
+@Threads(4)
+@State(Scope.Benchmark)
+public class CassandraThreadLocalBench
+{
+ private static final FastThreadLocal threadLocalMetricsCurrent = new FastThreadLocal<>()
+ {
+
+ @Override
+ protected ThreadLocalMetrics initialValue()
+ {
+ return ThreadLocalMetrics.create();
+ }
+ };
+
+ @Benchmark
+ public void netty()
+ {
+ threadLocalMetricsCurrent.get();
+ }
+
+ @Benchmark
+ public void cassandra()
+ {
+ ThreadLocalMetrics.get();
+ }
+
+}
diff --git a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
index 03ea710abcea..58f753924f30 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
@@ -20,7 +20,7 @@
import java.util.concurrent.ThreadPoolExecutor;
-import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.cassandra.concurrent.CassandraThreadFactory;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue;
@@ -35,6 +35,6 @@ public class FastThreadExecutor extends ThreadPoolExecutor
{
public FastThreadExecutor(int size, String name)
{
- super(size, size, 10, SECONDS, newBlockingQueue(), new DefaultThreadFactory(name, true));
+ super(size, size, 10, SECONDS, newBlockingQueue(), new CassandraThreadFactory(name, true));
}
}
diff --git a/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java b/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java
index 8a514b554b77..a4d6c13b29e9 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.test.microbench;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.LongAdder;
@@ -44,26 +42,29 @@
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 4, time = 1, timeUnit = TimeUnit.SECONDS)
-@Measurement(iterations = 8, time = 2, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 2, time = 60, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2,
jvmArgsAppend = { "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"})
@Threads(4)
@State(Scope.Benchmark)
public class ThreadLocalMetricsBench
{
- @Param({"LongAdder", "PlainArray"})
+ @Param({"LongAdder", "ThreadLocalCounter"})
private String type;
+ @Param({"true", "false"})
+ private boolean polluteCpuCaches;
+
@Param({"50", "100"})
private int metricsCount;
- private List counters;
+ private Counter[] counters;
@Setup(Level.Trial)
public void setup() throws Throwable
{
- counters = new ArrayList<>(metricsCount);
+ counters = new Counter[metricsCount];
for (int i = 0; i < metricsCount; i++)
{
Counter counter;
@@ -72,13 +73,13 @@ public void setup() throws Throwable
case "LongAdder":
counter = new LongAdderCounter();
break;
- case "PlainArray":
+ case "ThreadLocalCounter":
counter = new ThreadLocalCounter();
break;
default:
throw new UnsupportedOperationException();
}
- counters.add(counter);
+ counters[i] = counter;
}
}
@@ -87,8 +88,9 @@ public void setup() throws Throwable
@Setup(Level.Invocation)
public void polluteCpuCaches()
{
- for (int i = 0; i < anotherMemory.length(); i++)
- anotherMemory.incrementAndGet(i);
+ if (polluteCpuCaches)
+ for (int i = 0; i < anotherMemory.length(); i++)
+ anotherMemory.incrementAndGet(i);
}
@Benchmark