Skip to content

Commit 49f0e22

Browse files
Alex Heneveldahgittin
authored andcommitted
cleanup, and allow TaskFactory to be supplied as a config and other ValueResolver input
the TF will create a task which will then be used for evaluation. much cleaner semantics than setting tasks as values: tasks evaluate once and remember their result, whereas task factory spawns a new task each time. furthermore, the former cannot be interrupted without making the value _never_ resolvable (which was the case prior to the previous commit) so it is left running if immediate eval is done, whereas the latter can be safely cancelled.
1 parent f84d886 commit 49f0e22

10 files changed

Lines changed: 114 additions & 47 deletions

File tree

camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,15 @@ public Maybe<Entity> getImmediately() {
203203
}
204204
}
205205

206+
@Override
207+
public Entity get() {
208+
try {
209+
return call();
210+
} catch (Exception e) {
211+
throw Exceptions.propagate(e);
212+
}
213+
}
214+
206215
@Override
207216
public Entity call() throws Exception {
208217
return callImpl(false).get();
@@ -304,10 +313,11 @@ protected Maybe<Entity> callImpl(boolean immediate) throws Exception {
304313
return Maybe.of(result.get());
305314
}
306315

307-
// TODO may want to block and repeat on new entities joining?
308-
throw new NoSuchElementException("No entity matching id " + desiredComponentId+
316+
// could be nice if DSL has an extra .block() method to allow it to wait for a matching entity.
317+
// previously we threw if nothing existed; now we return an absent with a detailed error
318+
return Maybe.absent(new NoSuchElementException("No entity matching id " + desiredComponentId+
309319
(scope==Scope.GLOBAL ? "" : ", in scope "+scope+" wrt "+entity+
310-
(scopeComponent!=null ? " ("+scopeComponent+" from "+entity()+")" : "")));
320+
(scopeComponent!=null ? " ("+scopeComponent+" from "+entity()+")" : ""))));
311321
}
312322

313323
private ExecutionContext getExecutionContext() {

camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.brooklyn.util.guava.Maybe;
4949
import org.apache.brooklyn.util.text.Identifiers;
5050
import org.apache.brooklyn.util.time.Duration;
51+
import org.testng.Assert;
5152
import org.testng.annotations.AfterMethod;
5253
import org.testng.annotations.BeforeMethod;
5354
import org.testng.annotations.Test;
@@ -296,14 +297,13 @@ public void testUrlEncode() throws Exception {
296297
@Test
297298
public void testEntityNotFound() throws Exception {
298299
BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.entity("myIdDoesNotExist");
300+
Maybe<?> actualValue = execDslImmediately(dsl, Entity.class, app, true);
301+
Assert.assertTrue(actualValue.isAbsent());
299302
try {
300-
Maybe<?> actualValue = execDslImmediately(dsl, Entity.class, app, true);
303+
actualValue.get();
301304
Asserts.shouldHaveFailedPreviously("actual="+actualValue);
302305
} catch (Exception e) {
303-
NoSuchElementException nsee = Exceptions.getFirstThrowableOfType(e, NoSuchElementException.class);
304-
if (nsee == null) {
305-
throw e;
306-
}
306+
Asserts.expectedFailureOfType(e, NoSuchElementException.class);
307307
}
308308
}
309309

@@ -365,6 +365,7 @@ public DslTestWorker satisfiedAsynchronously(boolean val) {
365365
return this;
366366
}
367367

368+
@SuppressWarnings("unused") // kept in case useful for additional tests
368369
public DslTestWorker wrapInTaskForImmediately(boolean val) {
369370
wrapInTaskForImmediately = val;
370371
return this;

core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import javax.annotation.Nullable;
3131

3232
import org.apache.brooklyn.api.mgmt.ExecutionContext;
33+
import org.apache.brooklyn.api.mgmt.TaskFactory;
3334
import org.apache.brooklyn.api.objs.BrooklynObject;
3435
import org.apache.brooklyn.config.ConfigInheritance;
3536
import org.apache.brooklyn.config.ConfigInheritances;
@@ -231,7 +232,7 @@ public Maybe<Object> getConfigRaw(ConfigKey<?> key, boolean includeInherited) {
231232
}
232233

233234
protected Object coerceConfigVal(ConfigKey<?> key, Object v) {
234-
if ((v instanceof Future) || (v instanceof DeferredSupplier)) {
235+
if ((v instanceof Future) || (v instanceof DeferredSupplier) || (v instanceof TaskFactory)) {
235236
// no coercion for these (coerce on exit)
236237
return v;
237238
} else if (key instanceof StructuredConfigKey) {

core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.slf4j.LoggerFactory;
4747

4848
import com.google.common.base.Function;
49-
import com.google.common.base.Supplier;
5049
import com.google.common.collect.Iterables;
5150

5251
/**
@@ -99,7 +98,8 @@ public ExecutionManager getExecutionManager() {
9998
@Override
10099
public Set<Task<?>> getTasks() { return executionManager.getTasksWithAllTags(tags); }
101100

102-
/** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context */
101+
/** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context;
102+
* currently supports suppliers or callables */
103103
@SuppressWarnings("unchecked")
104104
@Override
105105
public <T> Maybe<T> getImmediately(Object callableOrSupplier) {
@@ -110,17 +110,15 @@ public <T> Maybe<T> getImmediately(Object callableOrSupplier) {
110110

111111
Task<?> previousTask = BasicExecutionManager.getPerThreadCurrentTask().get();
112112
if (previousTask!=null) fakeTaskForContext.setSubmittedByTask(previousTask);
113+
fakeTaskForContext.cancel();
113114
try {
114115
BasicExecutionManager.getPerThreadCurrentTask().set(fakeTaskForContext);
115116

116-
if ((callableOrSupplier instanceof Supplier) && !(callableOrSupplier instanceof ImmediateSupplier)) {
117-
callableOrSupplier = new InterruptingImmediateSupplier<>((Supplier<Object>)callableOrSupplier);
117+
if (!(callableOrSupplier instanceof ImmediateSupplier)) {
118+
callableOrSupplier = InterruptingImmediateSupplier.of(callableOrSupplier);
118119
}
119-
if (callableOrSupplier instanceof ImmediateSupplier) {
120-
return ((ImmediateSupplier<T>)callableOrSupplier).getImmediately();
121-
}
122-
// TODO could add more types here
123-
throw new IllegalArgumentException("Type "+callableOrSupplier.getClass()+" not supported for getImmediately (instance "+callableOrSupplier+")");
120+
return ((ImmediateSupplier<T>)callableOrSupplier).getImmediately();
121+
124122
} finally {
125123
BasicExecutionManager.getPerThreadCurrentTask().set(previousTask);
126124
}

core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020

2121
import org.apache.brooklyn.util.guava.Maybe;
2222

23+
import com.google.common.base.Supplier;
24+
2325
/**
24-
* A class that supplies objects of a single type, without blocking for any significant length
25-
* of time.
26+
* A {@link Supplier} that has an extra method capable of supplying a value immediately or an absent if definitely not available,
27+
* or throwing an {@link ImmediateUnsupportedException} if it cannot determine whether a value is immediately available.
2628
*/
27-
public interface ImmediateSupplier<T> {
29+
public interface ImmediateSupplier<T> extends Supplier<T> {
2830

2931
/**
3032
* Indicates that a supplier does not support immediate evaluation,

core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818
*/
1919
package org.apache.brooklyn.util.core.task;
2020

21-
import java.util.NoSuchElementException;
21+
import java.util.concurrent.Callable;
2222
import java.util.concurrent.Semaphore;
2323

2424
import org.apache.brooklyn.util.exceptions.Exceptions;
2525
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
2626
import org.apache.brooklyn.util.guava.Maybe;
2727

28+
import com.google.common.annotations.Beta;
2829
import com.google.common.base.Supplier;
2930

3031
/**
@@ -40,6 +41,7 @@
4041
* will throw if the thread is interrupted. Typically there are workarounds, for instance:
4142
* <code>if (semaphore.tryAcquire()) semaphore.acquire();</code>.
4243
*/
44+
@Beta
4345
public class InterruptingImmediateSupplier<T> implements ImmediateSupplier<T>, DeferredSupplier<T> {
4446

4547
final Supplier<T> nestedSupplier;
@@ -69,6 +71,33 @@ public Maybe<T> getImmediately() {
6971
public T get() {
7072
return nestedSupplier.get();
7173
}
72-
74+
75+
@SuppressWarnings("unchecked")
76+
public static <T> InterruptingImmediateSupplier<T> of(final Object o) {
77+
if (o instanceof Supplier) {
78+
return new InterruptingImmediateSupplier<T>((Supplier<T>)o);
79+
} else if (o instanceof Callable) {
80+
return new InterruptingImmediateSupplier<T>(new Supplier<T>() {
81+
@Override
82+
public T get() {
83+
try {
84+
return ((Callable<T>)o).call();
85+
} catch (Exception e) {
86+
throw Exceptions.propagate(e);
87+
}
88+
}
89+
});
90+
} else if (o instanceof Runnable) {
91+
return new InterruptingImmediateSupplier<T>(new Supplier<T>() {
92+
@Override
93+
public T get() {
94+
((Runnable)o).run();
95+
return null;
96+
}
97+
});
98+
} else {
99+
throw new UnsupportedOperationException("Type "+o.getClass()+" not supported as InterruptingImmediateSupplier (instance "+o+")");
100+
}
101+
}
73102

74103
}

core/src/main/java/org/apache/brooklyn/util/core/task/TaskTags.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public static boolean isInessential(Task<?> task) {
6262
}
6363

6464
public static boolean hasTag(Task<?> task, Object tag) {
65+
if (task==null) return false;
6566
return task.getTags().contains(tag);
6667
}
6768

core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.brooklyn.api.mgmt.ExecutionContext;
3030
import org.apache.brooklyn.api.mgmt.Task;
3131
import org.apache.brooklyn.api.mgmt.TaskAdaptable;
32+
import org.apache.brooklyn.api.mgmt.TaskFactory;
3233
import org.apache.brooklyn.core.entity.EntityInternal;
3334
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
3435
import org.apache.brooklyn.util.core.flags.TypeCoercions;
@@ -322,6 +323,10 @@ public Maybe<T> getMaybe() {
322323
return result;
323324
}
324325

326+
protected boolean isEvaluatingImmediately() {
327+
return immediately || BrooklynTaskTags.hasTag(Tasks.current(), BrooklynTaskTags.IMMEDIATE_TASK_TAG);
328+
}
329+
325330
@SuppressWarnings({ "unchecked", "rawtypes" })
326331
protected Maybe<T> getMaybeInternal() {
327332
if (started.getAndSet(true))
@@ -352,11 +357,11 @@ protected Maybe<T> getMaybeInternal() {
352357

353358
//if the expected type is a closure or map and that's what we have, we're done (or if it's null);
354359
//but not allowed to return a future or DeferredSupplier as the resolved value
355-
if (v==null || (!forceDeep && type.isInstance(v) && !Future.class.isInstance(v) && !DeferredSupplier.class.isInstance(v)))
360+
if (v==null || (!forceDeep && type.isInstance(v) && !Future.class.isInstance(v) && !DeferredSupplier.class.isInstance(v) && !TaskFactory.class.isInstance(v)))
356361
return Maybe.of((T) v);
357362

358363
try {
359-
if (immediately && v instanceof ImmediateSupplier) {
364+
if (isEvaluatingImmediately() && v instanceof ImmediateSupplier) {
360365
final ImmediateSupplier<Object> supplier = (ImmediateSupplier<Object>) v;
361366
try {
362367
Maybe<Object> result = exec.getImmediately(supplier);
@@ -366,12 +371,23 @@ protected Maybe<T> getMaybeInternal() {
366371
? recursive
367372
? new ValueResolver(result.get(), type, this).getMaybe()
368373
: result
369-
: Maybe.<T>absent();
374+
: result;
370375
} catch (ImmediateSupplier.ImmediateUnsupportedException e) {
371376
log.debug("Unable to resolve-immediately for "+description+" ("+v+"); falling back to executing with timeout", e);
372377
}
373378
}
374379

380+
// TODO if evaluating immediately should use a new ExecutionContext.submitImmediate(...)
381+
// and sets a timeout but which wraps a task but does not spawn a new thread
382+
383+
if ((v instanceof TaskFactory<?>) && !(v instanceof DeferredSupplier)) {
384+
v = ((TaskFactory<?>)v).newTask();
385+
BrooklynTaskTags.setTransient(((TaskAdaptable<?>)v).asTask());
386+
if (isEvaluatingImmediately()) {
387+
BrooklynTaskTags.addTagDynamically( ((TaskAdaptable<?>)v).asTask(), BrooklynTaskTags.IMMEDIATE_TASK_TAG );
388+
}
389+
}
390+
375391
//if it's a task or a future, we wait for the task to complete
376392
if (v instanceof TaskAdaptable<?>) {
377393
//if it's a task, we make sure it is submitted
@@ -382,7 +398,7 @@ protected Maybe<T> getMaybeInternal() {
382398
}
383399
if (!task.getTags().contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) {
384400
// mark this non-transient, because this value is usually something set e.g. in config
385-
// (ideally we'd discourage this in favour of task factories which can be transiently interrupted)
401+
// (should discourage this in favour of task factories which can be transiently interrupted?)
386402
BrooklynTaskTags.addTagDynamically(task, BrooklynTaskTags.NON_TRANSIENT_TASK_TAG);
387403
}
388404
exec.submit(task);

core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -313,12 +313,12 @@ private DeferredSupplier<String> deferredSupplier() {
313313
return new DeferredSupplier<String>() {
314314
@Override public String get() {
315315
try {
316-
log.info("acquiring");
316+
log.trace("acquiring");
317317
if (!latch.tryAcquire()) latch.acquire();
318318
latch.release();
319-
log.info("acquired and released");
319+
log.trace("acquired and released");
320320
} catch (InterruptedException e) {
321-
log.info("interrupted");
321+
log.trace("interrupted");
322322
throw Exceptions.propagate(e);
323323
}
324324
return "myval";
@@ -333,21 +333,21 @@ protected void runGetConfigNonBlockingInKey() throws Exception {
333333
TestEntity entity = (TestEntity) mgmt.getEntityManager().createEntity(EntitySpec.create(TestEntity.class)
334334
.configure((ConfigKey<Object>)(ConfigKey<?>)TestEntity.CONF_NAME, blockingVal));
335335

336-
log.info("get non-blocking");
336+
log.trace("get non-blocking");
337337
// Will initially return absent, because task is not done
338338
assertTrue(entity.config().getNonBlocking(TestEntity.CONF_NAME).isAbsent());
339-
log.info("got absent");
339+
log.trace("got absent");
340340

341341
latch.release();
342342

343343
// Can now finish task, so will return expectedVal
344-
log.info("get blocking");
344+
log.trace("get blocking");
345345
assertEquals(entity.config().get(TestEntity.CONF_NAME), expectedVal);
346-
log.info("got blocking");
346+
log.trace("got blocking");
347347
assertEquals(entity.config().getNonBlocking(TestEntity.CONF_NAME).get(), expectedVal);
348348

349349
latch.acquire();
350-
log.info("finished");
350+
log.trace("finished");
351351
}
352352

353353
protected void runGetConfigNonBlockingInMap() throws Exception {
@@ -526,7 +526,7 @@ public String call() {
526526
assertEquals(getConfigFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS), "abc");
527527
}
528528

529-
@Test
529+
@Test(groups="Integration") // takes 0.5s
530530
public void testGetConfigWithExecutedTaskWaitsForResult() throws Exception {
531531
LatchingCallable<String> work = new LatchingCallable<String>("abc");
532532
Task<String> task = executionManager.submit(work);
@@ -548,7 +548,7 @@ public String call() {
548548
assertEquals(work.callCount.get(), 1);
549549
}
550550

551-
@Test
551+
@Test(groups="Integration") // takes 0.5s
552552
public void testGetConfigWithUnexecutedTaskIsExecutedAndWaitsForResult() throws Exception {
553553
LatchingCallable<String> work = new LatchingCallable<String>("abc");
554554
Task<String> task = new BasicTask<String>(work);

0 commit comments

Comments
 (0)