Skip to content

Commit 3a6a2ee

Browse files
committed
This closes #480
2 parents b3397aa + e6fd10c commit 3a6a2ee

21 files changed

Lines changed: 942 additions & 110 deletions

File tree

api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import java.util.concurrent.Executor;
2525

2626
import org.apache.brooklyn.api.entity.Entity;
27+
import org.apache.brooklyn.util.guava.Maybe;
28+
29+
import com.google.common.annotations.Beta;
2730

2831
/**
2932
* This is a Brooklyn extension to the Java {@link Executor}.
@@ -64,4 +67,18 @@ public interface ExecutionContext extends Executor {
6467

6568
boolean isShutdown();
6669

70+
/**
71+
* Gets the value promptly, or returns {@link Maybe#absent()} if the value is not yet available.
72+
* It may throw an error if it cannot be determined whether a value is available immediately or not.
73+
* <p>
74+
* Implementations will typically attempt to execute in the current thread, with appropriate
75+
* tricks to make it look like it is in a sub-thread, and will attempt to be non-blocking but
76+
* if needed they may block.
77+
* <p>
78+
* Supports {@link Callable} and {@link Runnable} and some {@link Task} targets to be evaluated with "immediate" semantics.
79+
*/
80+
// TODO reference ImmediateSupplier when that class is moved to utils project
81+
@Beta
82+
<T> Maybe<T> getImmediately(Object callableOrSupplierOrTask);
83+
6784
}

camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java

Lines changed: 69 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.brooklyn.util.core.task.DeferredSupplier;
4949
import org.apache.brooklyn.util.core.task.ImmediateSupplier;
5050
import org.apache.brooklyn.util.core.task.TaskBuilder;
51+
import org.apache.brooklyn.util.core.task.TaskTags;
5152
import org.apache.brooklyn.util.core.task.Tasks;
5253
import org.apache.brooklyn.util.exceptions.Exceptions;
5354
import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
@@ -206,6 +207,15 @@ public Maybe<Entity> getImmediately() {
206207
}
207208
}
208209

210+
@Override
211+
public Entity get() {
212+
try {
213+
return call();
214+
} catch (Exception e) {
215+
throw Exceptions.propagate(e);
216+
}
217+
}
218+
209219
@Override
210220
public Entity call() throws Exception {
211221
return callImpl(false).get();
@@ -219,7 +229,7 @@ protected Maybe<Entity> getEntity(boolean immediate) {
219229
return Maybe.of(scopeComponent.get());
220230
}
221231
} else {
222-
return Maybe.<Entity>of(entity());
232+
return Maybe.<Entity>ofDisallowingNull(entity()).or(Maybe.<Entity>absent("Context entity not available when trying to evaluate Brooklyn DSL"));
223233
}
224234
}
225235

@@ -311,10 +321,11 @@ protected Maybe<Entity> callImpl(boolean immediate) throws Exception {
311321
return Maybe.of(result.get());
312322
}
313323

314-
// TODO may want to block and repeat on new entities joining?
315-
throw new NoSuchElementException("No entity matching id " + desiredComponentId+
324+
// could be nice if DSL has an extra .block() method to allow it to wait for a matching entity.
325+
// previously we threw if nothing existed; now we return an absent with a detailed error
326+
return Maybe.absent(new NoSuchElementException("No entity matching id " + desiredComponentId+
316327
(scope==Scope.GLOBAL ? "" : ", in scope "+scope+" wrt "+entity+
317-
(scopeComponent!=null ? " ("+scopeComponent+" from "+entity()+")" : "")));
328+
(scopeComponent!=null ? " ("+scopeComponent+" from "+entity()+")" : ""))));
318329
}
319330

320331
private ExecutionContext getExecutionContext() {
@@ -538,14 +549,11 @@ protected String resolveKeyName(boolean immediately) {
538549

539550
@Override
540551
public final Maybe<Object> getImmediately() {
541-
Maybe<Entity> targetEntityMaybe = component.getImmediately();
542-
if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available");
543-
EntityInternal targetEntity = (EntityInternal) targetEntityMaybe.get();
544-
545-
String keyNameS = resolveKeyName(true);
546-
ConfigKey<?> key = targetEntity.getEntityType().getConfigKey(keyNameS);
547-
Maybe<?> result = targetEntity.config().getNonBlocking(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyNameS));
548-
return Maybe.<Object>cast(result);
552+
Maybe<Object> maybeWrappedMaybe = findExecutionContext(this).getImmediately(newCallableReturningImmediateMaybeOrNonImmediateValue(true));
553+
// the answer will be wrapped twice due to the callable semantics;
554+
// the inner present/absent is important; it will only get an outer absent if interrupted
555+
if (maybeWrappedMaybe.isAbsent()) return maybeWrappedMaybe;
556+
return Maybe.<Object>cast( (Maybe<?>) maybeWrappedMaybe.get() );
549557
}
550558

551559
@Override
@@ -554,15 +562,55 @@ public Task<Object> newTask() {
554562
.displayName("retrieving config for "+keyName)
555563
.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
556564
.dynamic(false)
557-
.body(new Callable<Object>() {
558-
@Override
559-
public Object call() throws Exception {
560-
Entity targetEntity = component.get();
561-
String keyNameS = resolveKeyName(true);
562-
ConfigKey<?> key = targetEntity.getEntityType().getConfigKey(keyNameS);
563-
return targetEntity.getConfig(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyNameS));
564-
}})
565-
.build();
565+
.body(newCallableReturningImmediateMaybeOrNonImmediateValue(false)).build();
566+
}
567+
568+
private Callable<Object> newCallableReturningImmediateMaybeOrNonImmediateValue(final boolean immediate) {
569+
return new Callable<Object>() {
570+
@Override
571+
public Object call() throws Exception {
572+
Entity targetEntity;
573+
if (immediate) {
574+
Maybe<Entity> targetEntityMaybe = component.getImmediately();
575+
if (targetEntityMaybe.isAbsent()) return Maybe.<Object>cast(targetEntityMaybe);
576+
targetEntity = (EntityInternal) targetEntityMaybe.get();
577+
} else {
578+
targetEntity = component.get();
579+
}
580+
581+
// this is always run in a new dedicated task (possibly a fake task if immediate), so no need to clear
582+
checkAndTagForRecursiveReference(targetEntity);
583+
584+
String keyNameS = resolveKeyName(true);
585+
ConfigKey<?> key = targetEntity.getEntityType().getConfigKey(keyNameS);
586+
if (key==null) key = ConfigKeys.newConfigKey(Object.class, keyNameS);
587+
if (immediate) {
588+
return ((EntityInternal)targetEntity).config().getNonBlocking(key);
589+
} else {
590+
return targetEntity.getConfig(key);
591+
}
592+
}
593+
};
594+
}
595+
596+
private void checkAndTagForRecursiveReference(Entity targetEntity) {
597+
String tag = "DSL:entity('"+targetEntity.getId()+"').config('"+keyName+"')";
598+
Task<?> ancestor = Tasks.current();
599+
if (ancestor!=null) {
600+
// don't check on ourself; only look higher in hierarchy;
601+
// this assumes impls always spawn new tasks (which they do, just maybe not always in new threads)
602+
// but it means it does not rely on tag removal to prevent weird errors,
603+
// and more importantly it makes the strategy idempotent
604+
ancestor = ancestor.getSubmittedByTask();
605+
}
606+
while (ancestor!=null) {
607+
if (TaskTags.hasTag(ancestor, tag)) {
608+
throw new IllegalStateException("Recursive config reference "+tag);
609+
}
610+
ancestor = ancestor.getSubmittedByTask();
611+
}
612+
613+
Tasks.addTagDynamically(tag);
566614
}
567615

568616
@Override

camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,14 @@
2828

2929
import org.apache.brooklyn.api.entity.Entity;
3030
import org.apache.brooklyn.core.config.ConfigKeys;
31+
import org.apache.brooklyn.core.entity.Entities;
32+
import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
3133
import org.apache.brooklyn.core.sensor.Sensors;
3234
import org.apache.brooklyn.core.test.entity.TestEntity;
35+
import org.apache.brooklyn.test.Asserts;
36+
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
37+
import org.apache.brooklyn.util.time.Duration;
38+
import org.apache.brooklyn.util.time.Time;
3339
import org.slf4j.Logger;
3440
import org.slf4j.LoggerFactory;
3541
import org.testng.annotations.AfterMethod;
@@ -44,7 +50,6 @@
4450

4551
public class ConfigYamlTest extends AbstractYamlTest {
4652

47-
@SuppressWarnings("unused")
4853
private static final Logger LOG = LoggerFactory.getLogger(ConfigYamlTest.class);
4954

5055
private ExecutorService executor;
@@ -91,6 +96,61 @@ public void testConfigInConfigBlock() throws Exception {
9196
assertNull(entity.getMyField()); // field with @SetFromFlag
9297
assertNull(entity.getMyField2()); // field with @SetFromFlag("myField2Alias"), set using alias
9398
}
99+
100+
101+
@Test
102+
public void testRecursiveConfigFailsGracefully() throws Exception {
103+
doTestRecursiveConfigFailsGracefully(false);
104+
}
105+
106+
@Test
107+
public void testRecursiveConfigImmediateFailsGracefully() throws Exception {
108+
doTestRecursiveConfigFailsGracefully(true);
109+
}
110+
111+
protected void doTestRecursiveConfigFailsGracefully(boolean immediate) throws Exception {
112+
String yaml = Joiner.on("\n").join(
113+
"services:",
114+
"- type: org.apache.brooklyn.core.test.entity.TestEntity",
115+
" brooklyn.config:",
116+
" infinite_loop: $brooklyn:config(\"infinite_loop\")");
117+
118+
final Entity app = createStartWaitAndLogApplication(yaml);
119+
TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren());
120+
121+
Thread t = new Thread(new Runnable() {
122+
@Override
123+
public void run() {
124+
try {
125+
Time.sleep(Duration.FIVE_SECONDS);
126+
// error, loop wasn't interrupted or detected
127+
LOG.warn("Timeout elapsed, destroying items; usage: "+
128+
((LocalManagementContext)mgmt()).getGarbageCollector().getUsageString());
129+
Entities.destroy(app);
130+
} catch (RuntimeInterruptedException e) {
131+
// expected on normal execution; clear the interrupted flag to prevent ugly further warnings being logged
132+
Thread.interrupted();
133+
}
134+
}
135+
});
136+
t.start();
137+
try {
138+
String c;
139+
if (immediate) {
140+
// this should throw rather than return "absent", because the error is definitive (absent means couldn't resolve in time)
141+
c = entity.config().getNonBlocking(ConfigKeys.newStringConfigKey("infinite_loop")).or("FAILED");
142+
} else {
143+
c = entity.config().get(ConfigKeys.newStringConfigKey("infinite_loop"));
144+
}
145+
Asserts.shouldHaveFailedPreviously("Expected recursive error, instead got: "+c);
146+
} catch (Exception e) {
147+
Asserts.expectedFailureContainsIgnoreCase(e, "infinite_loop", "recursive");
148+
} finally {
149+
if (!Entities.isManaged(app)) {
150+
t.interrupt();
151+
}
152+
}
153+
}
94154

95155
@Test
96156
public void testConfigAtTopLevel() throws Exception {

camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.brooklyn.util.guava.Maybe;
4949
import org.apache.brooklyn.util.text.Identifiers;
5050
import org.apache.brooklyn.util.time.Duration;
51+
import org.testng.Assert;
5152
import org.testng.annotations.AfterMethod;
5253
import org.testng.annotations.BeforeMethod;
5354
import org.testng.annotations.Test;
@@ -296,14 +297,13 @@ public void testUrlEncode() throws Exception {
296297
@Test
297298
public void testEntityNotFound() throws Exception {
298299
BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.entity("myIdDoesNotExist");
300+
Maybe<?> actualValue = execDslImmediately(dsl, Entity.class, app, true);
301+
Assert.assertTrue(actualValue.isAbsent());
299302
try {
300-
Maybe<?> actualValue = execDslImmediately(dsl, Entity.class, app, true);
303+
actualValue.get();
301304
Asserts.shouldHaveFailedPreviously("actual="+actualValue);
302305
} catch (Exception e) {
303-
NoSuchElementException nsee = Exceptions.getFirstThrowableOfType(e, NoSuchElementException.class);
304-
if (nsee == null) {
305-
throw e;
306-
}
306+
Asserts.expectedFailureOfType(e, NoSuchElementException.class);
307307
}
308308
}
309309

@@ -365,7 +365,7 @@ public DslTestWorker satisfiedAsynchronously(boolean val) {
365365
return this;
366366
}
367367

368-
@SuppressWarnings("unused") // included for completeness?
368+
@SuppressWarnings("unused") // kept in case useful for additional tests, for completeness
369369
public DslTestWorker wrapInTaskForImmediately(boolean val) {
370370
wrapInTaskForImmediately = val;
371371
return this;

core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import javax.annotation.Nullable;
3131

3232
import org.apache.brooklyn.api.mgmt.ExecutionContext;
33+
import org.apache.brooklyn.api.mgmt.TaskFactory;
3334
import org.apache.brooklyn.api.objs.BrooklynObject;
3435
import org.apache.brooklyn.config.ConfigInheritance;
3536
import org.apache.brooklyn.config.ConfigInheritances;
@@ -231,7 +232,7 @@ public Maybe<Object> getConfigRaw(ConfigKey<?> key, boolean includeInherited) {
231232
}
232233

233234
protected Object coerceConfigVal(ConfigKey<?> key, Object v) {
234-
if ((v instanceof Future) || (v instanceof DeferredSupplier)) {
235+
if ((v instanceof Future) || (v instanceof DeferredSupplier) || (v instanceof TaskFactory)) {
235236
// no coercion for these (coerce on exit)
236237
return v;
237238
} else if (key instanceof StructuredConfigKey) {

core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ public class BrooklynTaskTags extends TaskTags {
8080
* and that it need not appear in some task lists;
8181
* often used for framework lifecycle events and sensor polling */
8282
public static final String TRANSIENT_TASK_TAG = "TRANSIENT";
83+
/** marks that a task is meant to return immediately, without blocking (or if absolutely necessary blocking for a short while) */
84+
public static final String IMMEDIATE_TASK_TAG = "IMMEDIATE";
8385

8486
// ------------- entity tags -------------------------
8587

core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ protected <T> Maybe<T> getNonBlockingResolvingSimple(ConfigKey<T> key) {
146146
.immediately(true)
147147
.deep(true)
148148
.context(getContext())
149-
.swallowExceptions()
150149
.get();
151150
return (resolved != marker)
152151
? TypeCoercions.tryCoerce(resolved, key.getTypeToken())

core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,13 @@
4141
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
4242
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
4343
import org.apache.brooklyn.util.collections.MutableMap;
44+
import org.apache.brooklyn.util.core.task.ImmediateSupplier.ImmediateUnsupportedException;
45+
import org.apache.brooklyn.util.guava.Maybe;
4446
import org.slf4j.Logger;
4547
import org.slf4j.LoggerFactory;
4648

4749
import com.google.common.base.Function;
50+
import com.google.common.base.Supplier;
4851
import com.google.common.collect.Iterables;
4952

5053
/**
@@ -96,7 +99,55 @@ public ExecutionManager getExecutionManager() {
9699
/** returns tasks started by this context (or tasks which have all the tags on this object) */
97100
@Override
98101
public Set<Task<?>> getTasks() { return executionManager.getTasksWithAllTags(tags); }
99-
102+
103+
/** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context;
104+
* currently supports {@link Supplier}, {@link Callable}, {@link Runnable}, or {@link Task} instances;
105+
* with tasks if it is submitted or in progress,
106+
* it fails if not completed; with unsubmitted, unqueued tasks, it gets the {@link Callable} job and
107+
* uses that; with such a job, or any other callable/supplier/runnable, it runs that
108+
* in an {@link InterruptingImmediateSupplier}, with as much metadata as possible (eg task name if
109+
* given a task) set <i>temporarily</i> in the current thread context */
110+
@SuppressWarnings("unchecked")
111+
@Override
112+
public <T> Maybe<T> getImmediately(Object callableOrSupplier) {
113+
BasicTask<?> fakeTaskForContext;
114+
if (callableOrSupplier instanceof BasicTask) {
115+
fakeTaskForContext = (BasicTask<?>)callableOrSupplier;
116+
if (fakeTaskForContext.isQueuedOrSubmitted()) {
117+
if (fakeTaskForContext.isDone()) {
118+
return Maybe.of((T)fakeTaskForContext.getUnchecked());
119+
} else {
120+
throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+fakeTaskForContext);
121+
}
122+
}
123+
callableOrSupplier = fakeTaskForContext.getJob();
124+
} else {
125+
fakeTaskForContext = new BasicTask<Object>(MutableMap.of("displayName", "immediate evaluation"));
126+
}
127+
fakeTaskForContext.tags.addAll(tags);
128+
fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG);
129+
fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
130+
131+
Task<?> previousTask = BasicExecutionManager.getPerThreadCurrentTask().get();
132+
BasicExecutionContext oldExecutionContext = getCurrentExecutionContext();
133+
registerPerThreadExecutionContext();
134+
135+
if (previousTask!=null) fakeTaskForContext.setSubmittedByTask(previousTask);
136+
fakeTaskForContext.cancel();
137+
try {
138+
BasicExecutionManager.getPerThreadCurrentTask().set(fakeTaskForContext);
139+
140+
if (!(callableOrSupplier instanceof ImmediateSupplier)) {
141+
callableOrSupplier = InterruptingImmediateSupplier.of(callableOrSupplier);
142+
}
143+
return ((ImmediateSupplier<T>)callableOrSupplier).getImmediately();
144+
145+
} finally {
146+
BasicExecutionManager.getPerThreadCurrentTask().set(previousTask);
147+
perThreadExecutionContext.set(oldExecutionContext);
148+
}
149+
}
150+
100151
@SuppressWarnings({ "unchecked", "rawtypes" })
101152
@Override
102153
protected <T> Task<T> submitInternal(Map<?,?> propertiesQ, final Object task) {

0 commit comments

Comments
 (0)