diff --git a/.build/build-bench.xml b/.build/build-bench.xml index 4ea0ff571e7d..7d6f564bbc0e 100644 --- a/.build/build-bench.xml +++ b/.build/build-bench.xml @@ -93,6 +93,7 @@ + diff --git a/src/java/org/apache/cassandra/concurrent/CassandraThread.java b/src/java/org/apache/cassandra/concurrent/CassandraThread.java new file mode 100644 index 000000000000..4b9f7c9eca28 --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/CassandraThread.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +import org.apache.cassandra.metrics.ThreadLocalMetrics; + +import io.netty.util.concurrent.FastThreadLocalThread; + +public class CassandraThread extends FastThreadLocalThread +{ + private ThreadLocalMetrics threadLocalMetrics; + private ExecutorLocals executorLocals; + + public CassandraThread(ThreadGroup group, Runnable target, String name) + { + super(group, target, name); + } + + public CassandraThread() + { + super(); + } + + public ThreadLocalMetrics getThreadLocalMetrics() + { + ThreadLocalMetrics current = threadLocalMetrics; + if (current != null) + return current; + + threadLocalMetrics = ThreadLocalMetrics.create(); + return threadLocalMetrics; + } + + public ExecutorLocals getExecutorLocals() + { + ExecutorLocals current = executorLocals; + if (current != null) + return current; + + executorLocals = ExecutorLocals.none(); + return executorLocals; + } + + public void setExecutorLocals(ExecutorLocals executorLocals) + { + this.executorLocals = executorLocals; + } + + public ExecutorLocals replaceExecutorLocals(ExecutorLocals newExecutorLocals) + { + ExecutorLocals current = executorLocals; + if (current != newExecutorLocals) + executorLocals = newExecutorLocals; + + return current != null ? current : ExecutorLocals.none(); + } + + public void doRun() + { + } + + + // final to avoid skipping of the cleanup logic + final public void run() + { + try + { + super.run(); + doRun(); + } + finally + { + if (threadLocalMetrics != null) + threadLocalMetrics.release(); + } + + } +} diff --git a/src/java/org/apache/cassandra/concurrent/CassandraThreadFactory.java b/src/java/org/apache/cassandra/concurrent/CassandraThreadFactory.java new file mode 100644 index 000000000000..d7fd907bc0f4 --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/CassandraThreadFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +import io.netty.util.concurrent.DefaultThreadFactory; + +public class CassandraThreadFactory extends DefaultThreadFactory +{ + public CassandraThreadFactory(String poolName, boolean daemon) + { + super(poolName, daemon); + } + + protected Thread newThread(Runnable r, String name) + { + return new CassandraThread(this.threadGroup, r, name); + } +} diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java index 55c6ee24642a..b4b7f094cc0d 100644 --- a/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java +++ b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java @@ -48,8 +48,8 @@ public static class Impl @SuppressWarnings("resource") protected static void set(TraceState traceState, ClientWarn.State clientWarnState, boolean eligibleForArtificialLatency) { - if (traceState == null && clientWarnState == null && !eligibleForArtificialLatency) locals.set(none); - else locals.set(new ExecutorLocals(traceState, clientWarnState, eligibleForArtificialLatency)); + if (traceState == null && clientWarnState == null && !eligibleForArtificialLatency) setLocal(none); + else setLocal(new ExecutorLocals(traceState, clientWarnState, eligibleForArtificialLatency)); } } @@ -64,12 +64,46 @@ protected ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState this.eligibleForArtificialLatency = eligibleForArtificialLatency; } + private static void setLocal(ExecutorLocals executorLocals) + { + Thread currentThread = Thread.currentThread(); + if (currentThread instanceof CassandraThread) + ((CassandraThread) currentThread).setExecutorLocals(executorLocals); + else + locals.set(executorLocals); + } + + private ExecutorLocals replace() + { + Thread currentThread = Thread.currentThread(); + if (currentThread instanceof CassandraThread) + { + return ((CassandraThread) currentThread).replaceExecutorLocals(this); + } + else + { + ExecutorLocals old = locals.get(); + if (old != this) + locals.set(this); + return old; + } + } + /** * @return an ExecutorLocals object which has the current trace state and client warn state. */ public static ExecutorLocals current() { - return locals.get(); + Thread currentThread = Thread.currentThread(); + if (currentThread instanceof CassandraThread) + return ((CassandraThread) currentThread).getExecutorLocals(); + else + return locals.get(); + } + + public static ExecutorLocals none() + { + return none; } /** @@ -84,13 +118,13 @@ public static WithResources propagate() public static ExecutorLocals create(TraceState traceState) { - ExecutorLocals current = locals.get(); + ExecutorLocals current = current(); return current.traceState == traceState ? current : new ExecutorLocals(traceState, current.clientWarnState, current.eligibleForArtificialLatency); } public static void clear() { - locals.set(none); + setLocal(none); } /** @@ -98,15 +132,11 @@ public static void clear() */ public ExecutorLocals get() { - // TODO (desired): add compareAndSet to save one thread local round trip - ExecutorLocals old = current(); - if (old != this) - locals.set(this); - return old; + return replace(); } public void close() { - locals.set(this); + setLocal(this); } } diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java index de43f33e7ebe..e2b2c970d1e5 100644 --- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java +++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java @@ -26,8 +26,6 @@ import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.utils.JVMStabilityInspector; -import io.netty.util.concurrent.FastThreadLocalThread; - /** * This class is an implementation of the ThreadFactory interface. This * is useful to give Java threads meaningful names which is useful when using @@ -169,12 +167,12 @@ public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, St if (PRESERVE_THREAD_CREATION_STACKTRACE) thread = new InspectableFastThreadLocalThread(threadGroup, runnable, threadName); else - thread = new FastThreadLocalThread(threadGroup, runnable, threadName); + thread = new CassandraThread(threadGroup, runnable, threadName); thread.setDaemon(daemon); return thread; } - public static class InspectableFastThreadLocalThread extends FastThreadLocalThread + public static class InspectableFastThreadLocalThread extends CassandraThread { public StackTraceElement[] creationTrace; @@ -184,22 +182,8 @@ private void setStack() creationTrace = Arrays.copyOfRange(creationTrace, 2, creationTrace.length); } - public InspectableFastThreadLocalThread() { super(); setStack(); } - - public InspectableFastThreadLocalThread(Runnable target) { super(target); setStack(); } - - public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target) { super(group, target); setStack(); } - - public InspectableFastThreadLocalThread(String name) { super(name); setStack(); } - - public InspectableFastThreadLocalThread(ThreadGroup group, String name) { super(group, name); setStack(); } - - public InspectableFastThreadLocalThread(Runnable target, String name) { super(target, name); setStack(); } - public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target, String name) { super(group, target, name); setStack(); } - public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) { super(group, target, name, stackSize); setStack(); } - } public static T setupThread(T thread, int priority, ClassLoader contextClassLoader, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java index 8f1a5e9aa285..8eb34303e27d 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java +++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java @@ -27,8 +27,6 @@ import org.apache.cassandra.utils.JVMStabilityInspector; -import io.netty.util.concurrent.FastThreadLocalThread; - import static org.apache.cassandra.concurrent.SEPExecutor.TakeTaskPermitResult.RETURNED_WORK_PERMIT; import static org.apache.cassandra.concurrent.SEPExecutor.TakeTaskPermitResult.TOOK_PERMIT; import static org.apache.cassandra.config.CassandraRelevantProperties.SET_SEP_THREAD_NAME; @@ -60,7 +58,7 @@ final class SEPWorker extends AtomicReference implements Runnabl this.pool = pool; this.workerId = workerId; this.workerIdThreadSuffix = '-' + workerId.toString(); - thread = new FastThreadLocalThread(threadGroup, this, threadGroup.getName() + "-Worker-" + workerId); + thread = new CassandraThread(threadGroup, this, threadGroup.getName() + "-Worker-" + workerId); thread.setDaemon(true); set(initialState); thread.start(); diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index 8c9b2f979621..7cf48d30ebb8 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -26,6 +26,7 @@ import com.google.common.base.Throwables; +import org.apache.cassandra.concurrent.CassandraThread; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.SerializationHeader; @@ -40,8 +41,6 @@ import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; -import io.netty.util.concurrent.FastThreadLocalThread; - import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue; /** @@ -210,11 +209,11 @@ public static class SyncException extends RuntimeException //// typedef static class Buffer extends TreeMap {} - private class DiskWriter extends FastThreadLocalThread + private class DiskWriter extends CassandraThread { volatile Throwable exception = null; - public void run() + public void doRun() { while (true) { diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java b/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java index 0135522ac051..a7eaa6b42905 100644 --- a/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java @@ -36,6 +36,7 @@ import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.concurrent.CassandraThread; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Shutdownable; @@ -84,10 +85,7 @@ public class ThreadLocalMetrics @Override protected ThreadLocalMetrics initialValue() { - ThreadLocalMetrics result = new ThreadLocalMetrics(); - allThreadLocalMetrics.add(result); - destroyWhenUnreachable(Thread.currentThread(), result::release); - return result; + return create(); } // this method is invoked when a thread is going to finish, but it works only for FastThreadLocalThread @@ -99,6 +97,14 @@ protected void onRemoval(ThreadLocalMetrics value) } }; + public static ThreadLocalMetrics create() + { + ThreadLocalMetrics result = new ThreadLocalMetrics(); + allThreadLocalMetrics.add(result); + destroyWhenUnreachable(Thread.currentThread(), result::release); + return result; + } + private static volatile AtomicLongArray summaryValues = new AtomicLongArray(INITIAL_COUNTERS_CAPACITY); private static final Shutdownable cleaner; @@ -182,7 +188,7 @@ public static void shutdownCleaner(long timeout, TimeUnit unit) throws Interrupt shutdownAndWait(timeout, unit, of(cleaner)); } - private void release() + public void release() { // Using this lock while moving we want to avoid races with readers in getCount // such races can cause a transfered value lost or its double-counting by a reader @@ -275,6 +281,9 @@ public static long getCountAndReset(int metricId) } public static ThreadLocalMetrics get() { + Thread currentThread = Thread.currentThread(); + if (currentThread instanceof CassandraThread) + return ((CassandraThread)currentThread).getThreadLocalMetrics(); return threadLocalMetricsCurrent.get(); } diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index 5d1c005153b9..032e96a98786 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -50,6 +50,7 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.UpdateBuilder; import org.apache.cassandra.Util; +import org.apache.cassandra.concurrent.CassandraThread; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; @@ -70,8 +71,6 @@ import org.apache.cassandra.security.EncryptionContext; import org.apache.cassandra.security.EncryptionContextGenerator; -import io.netty.util.concurrent.FastThreadLocalThread; - import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; @Ignore @@ -403,7 +402,7 @@ private static ByteBuffer randomBytes(int quantity, Random tlr) return slice; } - public class CommitlogThread extends FastThreadLocalThread + public class CommitlogThread extends CassandraThread { final AtomicLong counter = new AtomicLong(); int hash = 0; @@ -421,7 +420,7 @@ public class CommitlogThread extends FastThreadLocalThread this.random = rand; } - public void run() + public void doRun() { Thread.currentThread().setName("CommitLogThread-" + threadID.getAndIncrement()); RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null; diff --git a/test/microbench/org/apache/cassandra/test/microbench/CassandraThreadLocalBench.java b/test/microbench/org/apache/cassandra/test/microbench/CassandraThreadLocalBench.java new file mode 100644 index 000000000000..d93e4e1d2bcf --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/CassandraThreadLocalBench.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import org.apache.cassandra.metrics.ThreadLocalMetrics; + +import io.netty.util.concurrent.FastThreadLocal; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Warmup(iterations = 4, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 2, time = 30, timeUnit = TimeUnit.SECONDS) +@Fork(value = 2, + jvmArgsAppend = { "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"}) +@Threads(4) +@State(Scope.Benchmark) +public class CassandraThreadLocalBench +{ + private static final FastThreadLocal threadLocalMetricsCurrent = new FastThreadLocal<>() + { + + @Override + protected ThreadLocalMetrics initialValue() + { + return ThreadLocalMetrics.create(); + } + }; + + @Benchmark + public void netty() + { + threadLocalMetricsCurrent.get(); + } + + @Benchmark + public void cassandra() + { + ThreadLocalMetrics.get(); + } + +} diff --git a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java index 03ea710abcea..58f753924f30 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java +++ b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java @@ -20,7 +20,7 @@ import java.util.concurrent.ThreadPoolExecutor; -import io.netty.util.concurrent.DefaultThreadFactory; +import org.apache.cassandra.concurrent.CassandraThreadFactory; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue; @@ -35,6 +35,6 @@ public class FastThreadExecutor extends ThreadPoolExecutor { public FastThreadExecutor(int size, String name) { - super(size, size, 10, SECONDS, newBlockingQueue(), new DefaultThreadFactory(name, true)); + super(size, size, 10, SECONDS, newBlockingQueue(), new CassandraThreadFactory(name, true)); } } diff --git a/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java b/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java index 8a514b554b77..a4d6c13b29e9 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java +++ b/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java @@ -18,8 +18,6 @@ package org.apache.cassandra.test.microbench; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.atomic.LongAdder; @@ -44,26 +42,29 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) @Warmup(iterations = 4, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 8, time = 2, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 2, time = 60, timeUnit = TimeUnit.SECONDS) @Fork(value = 2, jvmArgsAppend = { "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"}) @Threads(4) @State(Scope.Benchmark) public class ThreadLocalMetricsBench { - @Param({"LongAdder", "PlainArray"}) + @Param({"LongAdder", "ThreadLocalCounter"}) private String type; + @Param({"true", "false"}) + private boolean polluteCpuCaches; + @Param({"50", "100"}) private int metricsCount; - private List counters; + private Counter[] counters; @Setup(Level.Trial) public void setup() throws Throwable { - counters = new ArrayList<>(metricsCount); + counters = new Counter[metricsCount]; for (int i = 0; i < metricsCount; i++) { Counter counter; @@ -72,13 +73,13 @@ public void setup() throws Throwable case "LongAdder": counter = new LongAdderCounter(); break; - case "PlainArray": + case "ThreadLocalCounter": counter = new ThreadLocalCounter(); break; default: throw new UnsupportedOperationException(); } - counters.add(counter); + counters[i] = counter; } } @@ -87,8 +88,9 @@ public void setup() throws Throwable @Setup(Level.Invocation) public void polluteCpuCaches() { - for (int i = 0; i < anotherMemory.length(); i++) - anotherMemory.incrementAndGet(i); + if (polluteCpuCaches) + for (int i = 0; i < anotherMemory.length(); i++) + anotherMemory.incrementAndGet(i); } @Benchmark