Skip to content

Commit 5e00113

Browse files
committed
Fix interception of outbound publish packets
The publish inbound interceptor usage was incorrect, the publish outbound interceptor ensures packet delivery can be prevented.
1 parent 2723551 commit 5e00113

7 files changed

Lines changed: 57 additions & 142 deletions

File tree

hivemq-krb/src/main/java/uk/co/amrc/factoryplus/hivemq_auth_krb/FPKrbAuthorizer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ public void authorizeSubscribe(
8484
.subscribe(result -> {
8585
if (result) {
8686
log.info("Successfully authorized subscription client {} for topic {}.", clientUsername, topic);
87-
provider.context.storeTopicMapping(clientUsername, topic);
8887
output.authorizeSuccessfully();
8988
} else {
9089
log.info("Subscription permission denied for user {} topic {}", clientUsername, topic);

hivemq-krb/src/main/java/uk/co/amrc/factoryplus/hivemq_auth_krb/FPKrbContext.java

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
public class FPKrbContext {
1818
private final ConcurrentHashMap<String, String> sessionMap = new ConcurrentHashMap<>();
19-
private final ConcurrentHashMap<String, Set<String>> topicToSubscribers = new ConcurrentHashMap<>();
2019
private static final @NotNull Logger log = LoggerFactory.getLogger(FPKrbMain.class);
2120
public FPServiceClient fplus;
2221

@@ -55,48 +54,6 @@ public void removeClientUserNameMapping(String clientId) {
5554
sessionMap.remove(clientId);
5655
}
5756

58-
/**
59-
*
60-
* @param clientId
61-
* @param topic
62-
*/
63-
public void storeTopicMapping(String clientId,String topic) {
64-
topicToSubscribers
65-
.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet())
66-
.add(clientId);
67-
}
68-
69-
/**
70-
*
71-
* @param topic
72-
* @return
73-
*/
74-
public Set<String> getTopicMapping(String topic){
75-
Set<String> subscribers = new HashSet<>();
76-
77-
topicToSubscribers.forEachKey(1, k-> {
78-
if(matchesPermission(k, topic)){
79-
// Exact match subscribers
80-
Set<String> exactSubscribers = topicToSubscribers.get(k);
81-
if (exactSubscribers != null) {
82-
subscribers.addAll(exactSubscribers);
83-
}
84-
}
85-
});
86-
87-
return subscribers;
88-
}
89-
90-
/**
91-
*
92-
* @param clientId
93-
*/
94-
public void removeClientFromTopicMapping(String clientId){
95-
for (Set<String> subscribers : topicToSubscribers.values()) {
96-
subscribers.remove(clientId);
97-
}
98-
}
99-
10057
private FPServiceClient startServiceClient ()
10158
{
10259
var fplus = new FPServiceClient();

hivemq-krb/src/main/java/uk/co/amrc/factoryplus/hivemq_auth_krb/interceptors/FPKrbClientInitializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ public FPKrbClientInitializer(FPKrbContext context) {
1818

1919
@Override
2020
public void initialize(@NotNull InitializerInput initializerInput, @NotNull ClientContext clientContext) {
21-
clientContext.addPublishInboundInterceptor(new FPKrbInboundPublishInterceptor(this));
2221
clientContext.addUnsubscribeInboundInterceptor(new FPKrbUnsubscribeInboundInterceptor(this));
2322
clientContext.addDisconnectInboundInterceptor(new FPKrbDisconnectInboundInterceptor(this));
23+
clientContext.addPublishOutboundInterceptor(new FPKrbOutboundPublishInterceptor(this));
2424
}
2525
}

hivemq-krb/src/main/java/uk/co/amrc/factoryplus/hivemq_auth_krb/interceptors/FPKrbDisconnectInboundInterceptor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ public void onInboundDisconnect(@NotNull DisconnectInboundInput disconnectInboun
2929
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(() -> {
3030
try {
3131
var clientId = disconnectInboundInput.getClientInformation().getClientId();
32-
initializer.context.removeClientUserNameMapping(initializer.context.getUsername(clientId));
33-
initializer.context.removeClientFromTopicMapping(initializer.context.getUsername(clientId));
32+
initializer.context.removeClientUserNameMapping(clientId);
3433
} catch (final Exception e) {
3534
log.error("Disconnect inbound interception failed:", e);
3635
}

hivemq-krb/src/main/java/uk/co/amrc/factoryplus/hivemq_auth_krb/interceptors/FPKrbInboundPublishInterceptor.java

Lines changed: 0 additions & 93 deletions
This file was deleted.
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package uk.co.amrc.factoryplus.hivemq_auth_krb.interceptors;
2+
3+
import com.hivemq.extension.sdk.api.annotations.NotNull;
4+
import com.hivemq.extension.sdk.api.auth.parameter.TopicPermission;
5+
import com.hivemq.extension.sdk.api.interceptor.publish.PublishOutboundInterceptor;
6+
import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishOutboundInput;
7+
import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishOutboundOutput;
8+
import com.hivemq.extension.sdk.api.async.Async;
9+
import com.hivemq.extension.sdk.api.async.TimeoutFallback;
10+
import com.hivemq.extension.sdk.api.services.Services;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
import uk.co.amrc.factoryplus.hivemq_auth_krb.FPKrbAuthenticator;
14+
15+
import java.time.Duration;
16+
17+
import static uk.co.amrc.factoryplus.hivemq_auth_krb.AuthUtils.getACLforPrincipal;
18+
import static uk.co.amrc.factoryplus.hivemq_auth_krb.AuthUtils.isPermissionAllowed;
19+
20+
public class FPKrbOutboundPublishInterceptor implements PublishOutboundInterceptor {
21+
private FPKrbClientInitializer initializer;
22+
private static final @NotNull Logger log = LoggerFactory.getLogger(FPKrbAuthenticator.class);
23+
24+
public FPKrbOutboundPublishInterceptor(FPKrbClientInitializer initializer) {
25+
this.initializer = initializer;
26+
}
27+
28+
@Override
29+
public void onOutboundPublish(@NotNull PublishOutboundInput input, @NotNull PublishOutboundOutput output) {
30+
final Async<PublishOutboundOutput> async = output.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
31+
Services.extensionExecutorService().submit(() -> {
32+
final String clientId = input.getClientInformation().getClientId();
33+
final String username = initializer.context.getUsername(clientId);
34+
final String topic = input.getPublishPacket().getTopic();
35+
isPermissionAllowed(getACLforPrincipal(username, initializer.fplus), topic, TopicPermission.MqttActivity.SUBSCRIBE)
36+
.subscribe(result -> {
37+
if (result) {
38+
log.info("Successfully intercepted outbound subscription client {} for topic {}.", username, topic);
39+
} else {
40+
log.info("Subscription outbound intercepted! permission denied for user {} topic {}", username, topic);
41+
output.preventPublishDelivery();
42+
// Clean up.
43+
initializer.context.removeClientUserNameMapping(clientId);
44+
System.out.println("Client " + username + " was kicked due to policy violation.");
45+
}
46+
async.resume();
47+
},
48+
error -> {
49+
log.error("Error occurred: {}", error.getMessage());
50+
async.resume();
51+
});
52+
});
53+
}
54+
}

hivemq-krb/src/main/java/uk/co/amrc/factoryplus/hivemq_auth_krb/interceptors/FPKrbUnsubscribeInboundInterceptor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ public void onInboundUnsubscribe(@NotNull UnsubscribeInboundInput unsubscribeInb
3030
try {
3131
// Remove the client from our context storage.
3232
log.info("Unsubscribe from {}", clientId);
33-
initializer.context.removeClientUserNameMapping(initializer.context.getUsername(clientId));
34-
initializer.context.removeClientFromTopicMapping(initializer.context.getUsername(clientId));
33+
initializer.context.removeClientUserNameMapping(clientId);
3534
} catch (final Exception e) {
3635
log.info("Unsubscribe inbound interception failed:", e);
3736
}

0 commit comments

Comments
 (0)