Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .build/build-bench.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
<jvmarg line="${java-jvmargs}"/>
<jvmarg line="${_std-test-jvmargs}"/>
<jvmarg line="${test.jvm.args}"/>
<jvmarg line="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints"/>

<!-- total memory must fit within the pod constraints, see comments in .jenkins/Jenkinsfile and dind's container resourceRequestMemory in .jenkins/k8s/jenkins-deployment.yaml -->
<!-- note! this is used for both the JMH runner and VMH fork -->
Expand Down
94 changes: 94 additions & 0 deletions src/java/org/apache/cassandra/concurrent/CassandraThread.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.concurrent;

import org.apache.cassandra.metrics.ThreadLocalMetrics;

import io.netty.util.concurrent.FastThreadLocalThread;

public class CassandraThread extends FastThreadLocalThread
{
private ThreadLocalMetrics threadLocalMetrics;
private ExecutorLocals executorLocals;

public CassandraThread(ThreadGroup group, Runnable target, String name)
{
super(group, target, name);
}

public CassandraThread()
{
super();
}

public ThreadLocalMetrics getThreadLocalMetrics()
{
ThreadLocalMetrics current = threadLocalMetrics;
if (current != null)
return current;

threadLocalMetrics = ThreadLocalMetrics.create();
return threadLocalMetrics;
}

public ExecutorLocals getExecutorLocals()
{
ExecutorLocals current = executorLocals;
if (current != null)
return current;

executorLocals = ExecutorLocals.none();
return executorLocals;
}

public void setExecutorLocals(ExecutorLocals executorLocals)
{
this.executorLocals = executorLocals;
}

public ExecutorLocals replaceExecutorLocals(ExecutorLocals newExecutorLocals)
{
ExecutorLocals current = executorLocals;
if (current != newExecutorLocals)
executorLocals = newExecutorLocals;

return current != null ? current : ExecutorLocals.none();
}

public void doRun()
{
}


// final to avoid skipping of the cleanup logic
final public void run()
{
try
{
super.run();
doRun();
}
finally
{
if (threadLocalMetrics != null)
threadLocalMetrics.release();
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.concurrent;

import io.netty.util.concurrent.DefaultThreadFactory;

public class CassandraThreadFactory extends DefaultThreadFactory
{
public CassandraThreadFactory(String poolName, boolean daemon)
{
super(poolName, daemon);
}

protected Thread newThread(Runnable r, String name)
{
return new CassandraThread(this.threadGroup, r, name);
}
}
52 changes: 41 additions & 11 deletions src/java/org/apache/cassandra/concurrent/ExecutorLocals.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public static class Impl
@SuppressWarnings("resource")
protected static void set(TraceState traceState, ClientWarn.State clientWarnState, boolean eligibleForArtificialLatency)
{
if (traceState == null && clientWarnState == null && !eligibleForArtificialLatency) locals.set(none);
else locals.set(new ExecutorLocals(traceState, clientWarnState, eligibleForArtificialLatency));
if (traceState == null && clientWarnState == null && !eligibleForArtificialLatency) setLocal(none);
else setLocal(new ExecutorLocals(traceState, clientWarnState, eligibleForArtificialLatency));
}
}

Expand All @@ -64,12 +64,46 @@ protected ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState
this.eligibleForArtificialLatency = eligibleForArtificialLatency;
}

private static void setLocal(ExecutorLocals executorLocals)
{
Thread currentThread = Thread.currentThread();
if (currentThread instanceof CassandraThread)
((CassandraThread) currentThread).setExecutorLocals(executorLocals);
else
locals.set(executorLocals);
}

private ExecutorLocals replace()
{
Thread currentThread = Thread.currentThread();
if (currentThread instanceof CassandraThread)
{
return ((CassandraThread) currentThread).replaceExecutorLocals(this);
}
else
{
ExecutorLocals old = locals.get();
if (old != this)
locals.set(this);
return old;
}
}

/**
* @return an ExecutorLocals object which has the current trace state and client warn state.
*/
public static ExecutorLocals current()
{
return locals.get();
Thread currentThread = Thread.currentThread();
if (currentThread instanceof CassandraThread)
return ((CassandraThread) currentThread).getExecutorLocals();
else
return locals.get();
}

public static ExecutorLocals none()
{
return none;
}

/**
Expand All @@ -84,29 +118,25 @@ public static WithResources propagate()

public static ExecutorLocals create(TraceState traceState)
{
ExecutorLocals current = locals.get();
ExecutorLocals current = current();
return current.traceState == traceState ? current : new ExecutorLocals(traceState, current.clientWarnState, current.eligibleForArtificialLatency);
}

public static void clear()
{
locals.set(none);
setLocal(none);
}

/**
* Overwrite current locals, and return the previous ones
*/
public ExecutorLocals get()
{
// TODO (desired): add compareAndSet to save one thread local round trip
ExecutorLocals old = current();
if (old != this)
locals.set(this);
return old;
return replace();
}

public void close()
{
locals.set(this);
setLocal(this);
}
}
20 changes: 2 additions & 18 deletions src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.utils.JVMStabilityInspector;

import io.netty.util.concurrent.FastThreadLocalThread;

/**
* This class is an implementation of the <i>ThreadFactory</i> interface. This
* is useful to give Java threads meaningful names which is useful when using
Expand Down Expand Up @@ -169,12 +167,12 @@ public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, St
if (PRESERVE_THREAD_CREATION_STACKTRACE)
thread = new InspectableFastThreadLocalThread(threadGroup, runnable, threadName);
else
thread = new FastThreadLocalThread(threadGroup, runnable, threadName);
thread = new CassandraThread(threadGroup, runnable, threadName);
thread.setDaemon(daemon);
return thread;
}

public static class InspectableFastThreadLocalThread extends FastThreadLocalThread
public static class InspectableFastThreadLocalThread extends CassandraThread
{
public StackTraceElement[] creationTrace;

Expand All @@ -184,22 +182,8 @@ private void setStack()
creationTrace = Arrays.copyOfRange(creationTrace, 2, creationTrace.length);
}

public InspectableFastThreadLocalThread() { super(); setStack(); }

public InspectableFastThreadLocalThread(Runnable target) { super(target); setStack(); }

public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target) { super(group, target); setStack(); }

public InspectableFastThreadLocalThread(String name) { super(name); setStack(); }

public InspectableFastThreadLocalThread(ThreadGroup group, String name) { super(group, name); setStack(); }

public InspectableFastThreadLocalThread(Runnable target, String name) { super(target, name); setStack(); }

public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target, String name) { super(group, target, name); setStack(); }

public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) { super(group, target, name, stackSize); setStack(); }

}
public static <T extends Thread> T setupThread(T thread, int priority, ClassLoader contextClassLoader, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
{
Expand Down
4 changes: 1 addition & 3 deletions src/java/org/apache/cassandra/concurrent/SEPWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@

import org.apache.cassandra.utils.JVMStabilityInspector;

import io.netty.util.concurrent.FastThreadLocalThread;

import static org.apache.cassandra.concurrent.SEPExecutor.TakeTaskPermitResult.RETURNED_WORK_PERMIT;
import static org.apache.cassandra.concurrent.SEPExecutor.TakeTaskPermitResult.TOOK_PERMIT;
import static org.apache.cassandra.config.CassandraRelevantProperties.SET_SEP_THREAD_NAME;
Expand Down Expand Up @@ -60,7 +58,7 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
this.pool = pool;
this.workerId = workerId;
this.workerIdThreadSuffix = '-' + workerId.toString();
thread = new FastThreadLocalThread(threadGroup, this, threadGroup.getName() + "-Worker-" + workerId);
thread = new CassandraThread(threadGroup, this, threadGroup.getName() + "-Worker-" + workerId);
thread.setDaemon(true);
set(initialState);
thread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.google.common.base.Throwables;

import org.apache.cassandra.concurrent.CassandraThread;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.SerializationHeader;
Expand All @@ -40,8 +41,6 @@
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;

import io.netty.util.concurrent.FastThreadLocalThread;

import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue;

/**
Expand Down Expand Up @@ -210,11 +209,11 @@ public static class SyncException extends RuntimeException
//// typedef
static class Buffer extends TreeMap<DecoratedKey, PartitionUpdate.Builder> {}

private class DiskWriter extends FastThreadLocalThread
private class DiskWriter extends CassandraThread
{
volatile Throwable exception = null;

public void run()
public void doRun()
{
while (true)
{
Expand Down
19 changes: 14 additions & 5 deletions src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.concurrent.CassandraThread;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Shutdownable;

Expand Down Expand Up @@ -84,10 +85,7 @@ public class ThreadLocalMetrics
@Override
protected ThreadLocalMetrics initialValue()
{
ThreadLocalMetrics result = new ThreadLocalMetrics();
allThreadLocalMetrics.add(result);
destroyWhenUnreachable(Thread.currentThread(), result::release);
return result;
return create();
}

// this method is invoked when a thread is going to finish, but it works only for FastThreadLocalThread
Expand All @@ -99,6 +97,14 @@ protected void onRemoval(ThreadLocalMetrics value)
}
};

public static ThreadLocalMetrics create()
{
ThreadLocalMetrics result = new ThreadLocalMetrics();
allThreadLocalMetrics.add(result);
destroyWhenUnreachable(Thread.currentThread(), result::release);
return result;
}

private static volatile AtomicLongArray summaryValues = new AtomicLongArray(INITIAL_COUNTERS_CAPACITY);

private static final Shutdownable cleaner;
Expand Down Expand Up @@ -182,7 +188,7 @@ public static void shutdownCleaner(long timeout, TimeUnit unit) throws Interrupt
shutdownAndWait(timeout, unit, of(cleaner));
}

private void release()
public void release()
{
// Using this lock while moving we want to avoid races with readers in getCount
// such races can cause a transfered value lost or its double-counting by a reader
Expand Down Expand Up @@ -275,6 +281,9 @@ public static long getCountAndReset(int metricId)
}

public static ThreadLocalMetrics get() {
Thread currentThread = Thread.currentThread();
if (currentThread instanceof CassandraThread)
return ((CassandraThread)currentThread).getThreadLocalMetrics();
return threadLocalMetricsCurrent.get();
}

Expand Down
Loading