Skip to content

Commit 6534739

Browse files
authored
fix mqtt multiple client bug (#79)
1 parent 8ab0501 commit 6534739

6 files changed

Lines changed: 185 additions & 47 deletions

File tree

app/src/main/AndroidManifest.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
<uses-permission android:name="android.permission.ACCESS_COARSE_LOCATION" />
2424
<uses-permission android:name="android.permission.ACCESS_BACKGROUND_LOCATION" />
2525
<uses-permission android:name="android.permission.READ_PHONE_STATE" />
26+
<uses-permission android:name="android.permission.MANAGE_EXTERNAL_STORAGE" />
2627
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
2728
<uses-permission android:name="android.permission.INTERNET" />
2829
<uses-permission android:name="android.permission.HIDE_OVERLAY_WINDOWS" />

app/src/main/java/de/fraunhofer/fokus/OpenMobileNetworkToolkit/MQTT/MQTTService.java

Lines changed: 169 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package de.fraunhofer.fokus.OpenMobileNetworkToolkit.MQTT;
1010

1111
import android.app.Notification;
12+
import android.app.NotificationChannel;
1213
import android.app.NotificationManager;
1314
import android.app.PendingIntent;
1415
import android.app.Service;
@@ -20,6 +21,7 @@
2021
import android.os.Handler;
2122
import android.os.IBinder;
2223
import android.util.Log;
24+
import android.widget.Toast;
2325

2426
import androidx.annotation.Nullable;
2527
import androidx.core.app.NotificationCompat;
@@ -29,19 +31,27 @@
2931
import androidx.work.multiprocess.RemoteWorkManager;
3032

3133

34+
import com.hivemq.client.mqtt.MqttClientState;
3235
import com.hivemq.client.mqtt.datatypes.MqttQos;
36+
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext;
37+
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
3338
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
3439
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
3540
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
3641
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator;
3742

43+
import org.jetbrains.annotations.NotNull;
44+
3845
import java.net.InetSocketAddress;
46+
import java.net.URI;
3947
import java.nio.charset.StandardCharsets;
4048
import java.util.ArrayList;
49+
import java.util.EnumSet;
4150
import java.util.HashMap;
4251
import java.util.UUID;
4352
import java.util.concurrent.CompletableFuture;
4453
import java.util.concurrent.TimeUnit;
54+
import java.util.function.Consumer;
4555

4656
import de.fraunhofer.fokus.OpenMobileNetworkToolkit.CustomEventListener;
4757
import de.fraunhofer.fokus.OpenMobileNetworkToolkit.MQTT.Handler.Iperf3Handler;
@@ -77,69 +87,152 @@ private void setupSharedPreferences(){
7787
mqttSP = spg.getSharedPreference(SPType.MQTT);
7888
mqttSP.registerOnSharedPreferenceChangeListener((sharedPreferences, key) -> {
7989
if(key == null) return;
80-
if (key.equals("mqtt_host")) {
81-
Log.d(TAG, "MQTT Host update: " + sharedPreferences.getString("mqtt_host", ""));
82-
client.disconnect();
83-
createClient();
84-
createNotification();
90+
isEnabled = sharedPreferences.getBoolean("enable_mqtt", false);
91+
switch (key){
92+
case "mqtt_host":
93+
if(!isEnabled) return;
94+
Log.d(TAG, "mqtt_host: " + sharedPreferences.getString("mqtt_host", ""));
95+
disconnectClient();
96+
createClient();
97+
createNotification();
98+
break;
99+
case "enable_mqtt":
100+
Log.d(TAG, "enable_mqtt: " + isEnabled);
101+
if(!isEnabled && client != null){
102+
this.onDestroy();
103+
}
104+
break;
85105
}
106+
86107
});
87108
}
88-
89-
public void createClient(){
90-
String addressString = mqttSP.getString("mqtt_host", "localhost:1883");
91-
String host = null;
92-
int port = -1;
109+
private boolean isValidUrl(String addressString) {
93110
try {
94-
host = addressString.split(":")[0];
95-
port = Integer.parseInt(addressString.split(":")[1]);
111+
new java.net.URL(addressString);
112+
return true;
96113
} catch (Exception e) {
97-
Log.e(TAG, "createClient: Invalid address string: " + addressString);
114+
return false;
115+
}
116+
}
117+
118+
private boolean isProtocolIpPort(String addressString) {
119+
// Example: mqtt://192.168.1.1:1883
120+
String regex = "^[\\d.]+:\\d+$";
121+
return addressString.matches(regex);
122+
}
123+
124+
125+
public String mQTTClientStateToString(MqttClientState state) {
126+
switch (state) {
127+
case CONNECTED:
128+
return "Connected";
129+
case CONNECTING:
130+
return "Connecting";
131+
case DISCONNECTED:
132+
return "Disconnected";
133+
case DISCONNECTED_RECONNECT:
134+
return "Disconnected_Reconnecting";
135+
case CONNECTING_RECONNECT:
136+
return "Connecting_Reconnecting";
137+
default:
138+
return "Unknown";
139+
}
140+
}
141+
142+
143+
public void createClient() {
144+
String addressString = mqttSP.getString("mqtt_host", "");
145+
Log.d(TAG, "createClient: creating client...");
146+
if (addressString.isBlank()) {
147+
Log.e(TAG, "createClient: MQTT Host is empty");
148+
spg.getSharedPreference(SPType.MQTT).edit().putBoolean("enable_mqtt", false).apply();
149+
client = null;
98150
return;
99151
}
100-
if(host == null || port == -1){
101-
Log.e(TAG, "createClient: Invalid address string: " + addressString);
152+
153+
if (!isValidUrl(addressString) && !isProtocolIpPort(addressString)) {
154+
Log.e(TAG, "createClient: MQTT Host is not a valid URL or IP:Port");
155+
Toast.makeText(context, "MQTT Host is not a valid URL or IP:Port", Toast.LENGTH_SHORT).show();
156+
spg.getSharedPreference(SPType.MQTT).edit().putBoolean("enable_mqtt", false).apply();
157+
client = null;
158+
return;
159+
}
160+
161+
String host;
162+
int port;
163+
164+
try {
165+
if (isProtocolIpPort(addressString)) {
166+
// Case: raw host:port
167+
String[] hostPort = addressString.split(":");
168+
host = hostPort[0];
169+
port = Integer.parseInt(hostPort[1]);
170+
} else {
171+
// Case: URL with scheme
172+
URI uri = new URI(addressString);
173+
host = uri.getHost();
174+
port = uri.getPort() == -1 ? 1883 : uri.getPort(); // default MQTT port
175+
}
176+
} catch (Exception e) {
177+
Log.e(TAG, "createClient: Invalid MQTT address", e);
178+
spg.getSharedPreference(SPType.MQTT).edit().putBoolean("enable_mqtt", false).apply();
179+
client = null;
102180
return;
103181
}
104-
InetSocketAddress address = new InetSocketAddress(host, port);
182+
183+
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port);
184+
if(client != null){
185+
disconnectClient();
186+
client = null;
187+
}
105188
client = Mqtt5Client.builder()
106189
.identifier(deviceName)
107190
.serverAddress(address)
108191
.automaticReconnect()
109192
.initialDelay(5, TimeUnit.SECONDS)
110193
.maxDelay(30, TimeUnit.SECONDS)
111194
.applyAutomaticReconnect()
112-
.addConnectedListener(context -> {
113-
Log.i(TAG, "createClient: Connected to MQTT server");
114-
createNotification();
195+
.addConnectedListener(ctx -> {
196+
Log.i(TAG, "addConnectedListener: Connected to MQTT server");
197+
createNotification(null, ctx);
115198
publishToTopic(String.format("device/%s/status", deviceName), "1", false);
199+
Log.d(TAG, "addConnectedListener: "+mQTTClientStateToString(client.getState()));
116200
})
117-
.addDisconnectedListener(context -> {
118-
Log.i(TAG, "createClient: Disconnected from MQTT server");
119-
createNotification();
201+
.addDisconnectedListener(ctx -> {
202+
Log.i(TAG, "addDisconnectedListener: Disconnected from MQTT server");
203+
createNotification(ctx, null);
204+
120205
})
121206
.willPublish()
122-
.topic(String.format("device/%s/status", deviceName))
123-
.qos(MqttQos.EXACTLY_ONCE)
124-
.payload("0".getBytes())
125-
.retain(true)
126-
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
127-
.contentType("text/plain")
128-
.noMessageExpiry()
129-
.applyWillPublish()
207+
.topic(String.format("device/%s/status", deviceName))
208+
.qos(MqttQos.EXACTLY_ONCE)
209+
.payload("0".getBytes())
210+
.retain(true)
211+
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
212+
.contentType("text/plain")
213+
.noMessageExpiry()
214+
.applyWillPublish()
130215
.buildAsync();
131-
132-
Log.i(TAG, "createClient: Client created with address: " + addressString);
216+
Log.i(TAG, "createClient: Client created with address: " + host + ":" + port);
133217
}
134218

135-
private void createNotification(){
219+
private void createNotification() {
220+
createNotification(null, null);
221+
}
222+
private void createNotification(MqttClientDisconnectedContext mqttClientDisconnectedContext,
223+
MqttClientConnectedContext mqttClientConnectedContext) {
136224
StringBuilder s = new StringBuilder();
137225
String address = spg.getSharedPreference(SPType.MQTT).getString("mqtt_host", "None");
138226
if(address.equals("None")){
139227
s.append("MQTT Host: None\n");
140228
} else {
141229
s.append("Host: ").append(address).append("\n");
142230
s.append("State: ").append(client.getState().toString()).append("\n");
231+
if(mqttClientDisconnectedContext != null){
232+
if(mqttClientDisconnectedContext.getCause() != null){
233+
s.append("Cause: ").append(mqttClientDisconnectedContext.getCause().getMessage()).append("\n");
234+
}
235+
}
143236
}
144237
builder.setStyle(new NotificationCompat.BigTextStyle()
145238
.bigText(s));
@@ -149,9 +242,21 @@ private void createNotification(){
149242
@Override
150243
public void onCreate() {
151244
super.onCreate();
245+
Log.d(TAG, "onCreate: Creating MQTTService");
152246
nm = getSystemService(NotificationManager.class);
153247
Intent notificationIntent = new Intent(this, MainActivity.class);
154248
PendingIntent pendingIntent = PendingIntent.getActivity(this, 0, notificationIntent, PendingIntent.FLAG_IMMUTABLE);
249+
250+
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
251+
NotificationChannel channel = new NotificationChannel(
252+
"OMNT_notification_channel",
253+
"OMNT MQTT Service",
254+
NotificationManager.IMPORTANCE_MAX
255+
);
256+
nm.createNotificationChannel(channel);
257+
}
258+
setupSharedPreferences();
259+
155260
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.S) {
156261
// create notification
157262
builder = new NotificationCompat.Builder(this, "OMNT_notification_channel")
@@ -184,19 +289,33 @@ public void publishToTopic(String topic, String message, boolean retain){
184289
.retain(retain)
185290
.send();
186291
}
187-
292+
private boolean isConnected(){
293+
if(client == null){
294+
Log.e(TAG, "isConnected: Client is null");
295+
return false;
296+
}
297+
return client.getState().isConnected();
298+
}
188299
public void disconnectClient(){
189-
CompletableFuture<Void> disconnect = client.disconnect();
190-
disconnect.whenComplete((aVoid, throwable) -> {
191-
if(throwable != null){
192-
Log.e(TAG, "disconnectClient: Error disconnecting from MQTT server: " + throwable.getMessage());
193-
} else {
194-
Log.i(TAG, "disconnectClient: Disconnected from MQTT server");
195-
}
196-
});
300+
Log.d(TAG, "disconnectClient: starting to disconnect client....");
301+
if(isConnected()){
302+
303+
CompletableFuture<Void> disconnect = client.disconnect();
304+
disconnect.whenComplete((aVoid, throwable) -> {
305+
if(throwable != null){
306+
Log.e(TAG, "disconnectClient: Error disconnecting from MQTT server: " + throwable.getMessage());
307+
} else {
308+
Log.i(TAG, "disconnectClient: Disconnected from MQTT server");
309+
}
310+
311+
});
312+
}
313+
client = null;
314+
nm.cancel(3);
197315
}
198316

199317
public void connectClient(){
318+
Log.d(TAG, "connectClient: Connecting to MQTT server...");
200319

201320
CompletableFuture<Mqtt5ConnAck> connAck = client.connectWith()
202321
.keepAlive(1)
@@ -435,16 +554,21 @@ private void subscribeToAllTopics(){
435554
subsribetoTopic(String.format("device/%s/#", deviceName));
436555
}
437556

557+
public void onDestroy(){
558+
disconnectClient();
559+
client = null;
560+
Log.d(TAG, "onDestroy: Destroying MQTTService");
561+
super.onDestroy();
562+
563+
}
438564

439565

440566
@Override
441567
public int onStartCommand(Intent intent, int flags, int startId) {
442-
Log.d(TAG, "onStartCommand: Start MQTT service");
568+
Log.d(TAG, "onStartCommand: Start MQTTservice");
443569
context = getApplicationContext();
444-
mqttSP = SharedPreferencesGrouper.getInstance(context).getSharedPreference(SPType.MQTT);
445570
deviceName = SharedPreferencesGrouper.getInstance(context).getSharedPreference(SPType.MAIN).getString("device_name", "null").strip();
446571
startForeground(3, builder.build());
447-
setupSharedPreferences();
448572
createClient();
449573
if(client == null){
450574
Log.e(TAG, "onStartCommand: Client is null");

app/src/main/java/de/fraunhofer/fokus/OpenMobileNetworkToolkit/SettingPreferences/MQTTSettingsFragment.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public void onSharedPreferenceChanged(SharedPreferences sharedPreferences, @Null
4343
if (key.equals("enable_mqtt")) {
4444
boolean logger = sharedPreferences.getBoolean("enable_mqtt", false);
4545
Log.d(TAG, "Logger update: " + logger);
46+
_switch.setChecked(logger);
4647
}
4748

4849

app/src/main/java/de/fraunhofer/fokus/OpenMobileNetworkToolkit/SettingPreferences/SettingsFragment.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import android.os.Build;
1313
import android.os.Bundle;
1414
import android.telephony.SubscriptionInfo;
15+
import android.util.Log;
1516
import android.widget.Toast;
1617

1718
import androidx.activity.OnBackPressedCallback;
@@ -33,6 +34,8 @@
3334

3435
public class SettingsFragment extends PreferenceFragmentCompat {
3536

37+
private static final String TAG = "SettingsFragment";
38+
3639
@Override
3740
public void onCreatePreferences(Bundle savedInstanceState, String rootKey) {
3841
SharedPreferencesGrouper spg = SharedPreferencesGrouper.getInstance(requireContext());
@@ -64,6 +67,15 @@ public void onCreatePreferences(Bundle savedInstanceState, String rootKey) {
6467
}
6568
}
6669

70+
for (String key : pfm.getSharedPreferences().getAll().keySet()) {
71+
Preference pref = pfm.findPreference(key);
72+
if (pref != null) {
73+
pref.setOnPreferenceChangeListener((preference, newValue) -> {
74+
Log.d(TAG, "Preference changed: " + preference.getKey() + " -> " + newValue);
75+
return true;
76+
});
77+
}
78+
}
6779
Preference button = pfm.findPreference("reset_modem");
6880
if (button != null) {
6981
if (GlobalVars.getInstance().isCarrier_permissions()) {

app/src/main/res/values/strings.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@
121121
<string name="mqtt_client_password">MQTT Client Password</string>
122122
<string name="mqtt_host_dialog_title">MQTT-Broker Address, including Port</string>
123123
<string name="mqtt_host">MQTT-Broker Address</string>
124-
<string name="mqtt_host_hint">tcp://192.168.213.89:1883</string>
124+
<string name="mqtt_host_hint">192.168.213.89:1883</string>
125125
<string name="mqtt_service_running">MQTT Service</string>
126126

127127

docs/preferences.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ _Section to set Credentials for MQTT._
9292

9393
| Key | Title | Summary | Default Value |
9494
| --- | ----- | ------- | ------------- |
95-
| **mqtt_host** | MQTT-Broker Address | MQTT Broker Address | `tcp://192.168.213.89:1883` |
95+
| **mqtt_host** | MQTT-Broker Address | MQTT Broker Address | `192.168.213.89:1883` |
9696
| **mqtt_client_username** | MQTT Client Username | MQTT Username | `USERNAME` |
9797
| **mqtt_client_password** | MQTT Client Password | MQTT Client Password. | `PASSWORD` |
9898

0 commit comments

Comments
 (0)