
Commit b5bf987

[BAHIR-183] HDFS based MQTT client persistence
Parent: c51853d

15 files changed

Lines changed: 351 additions & 725 deletions

README.md

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ Furthermore, to generate scaladocs for each module:
 
 `$ mvn package`
 
-Scaladocs is generated in, `MODULE_NAME/target/site/scaladocs/index.html`. __ Where `MODULE_NAME` is one of, `sql-streaming-mqtt`, `streaming-akka`, `streaming-mqtt`, `streaming-zeromq`, `streaming-twitter`. __
+Scaladocs is generated in `${MODULE_NAME}/target/site/scaladocs/index.html`, where `MODULE_NAME` is one of `sql-streaming-mqtt`, `streaming-akka`, `streaming-mqtt`, `streaming-zeromq` or `streaming-twitter`.
 
 ## A note about Apache Spark integration
 

sql-streaming-mqtt/README.md

Lines changed: 25 additions & 25 deletions
Large diffs are not rendered by default.

sql-streaming-mqtt/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

Lines changed: 1 addition & 2 deletions
@@ -16,5 +16,4 @@
 #
 
 org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider
-org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider
-org.apache.spark.sql.mqtt.HDFSMQTTSourceProvider
+org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider
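This service file is a standard java.util.ServiceLoader registration, which is how Spark discovers DataSourceRegister implementations on the classpath; with the separate HDFS provider dropped (its source file is deleted at the end of this commit), only the two Bahir providers remain registered. As a quick check, a sketch of listing what is actually registered (illustrative code, not part of the commit):

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.DataSourceRegister

// Prints every DataSourceRegister implementation visible on the classpath.
// After this commit, the Bahir MQTT sink and source providers should be the
// only entries contributed by sql-streaming-mqtt.
ServiceLoader.load(classOf[DataSourceRegister]).asScala
  .map(_.getClass.getName)
  .foreach(println)
```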

sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ private[mqtt] object CachedMQTTClient extends Logging {
 
   private def createMqttClient(config: Map[String, String]):
       (MqttClient, MqttClientPersistence) = {
-    val (brokerUrl, clientId, _, persistence, mqttConnectOptions, _, _, _, _) =
+    val (brokerUrl, clientId, _, persistence, mqttConnectOptions, _) =
       MQTTUtils.parseConfigParams(config)
     val client = new MqttClient(brokerUrl, clientId, persistence)
     val callback = new MqttCallbackExtended() {

sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ class MQTTDataWriter(config: mutable.Map[String, String]) extends DataWriter[Int
   private lazy val publishBackoff: Long =
     SparkEnv.get.conf.getTimeAsMs("spark.mqtt.client.publish.backoff", "5s")
 
-  private lazy val (_, _, topic, _, _, qos, _, _, _) = MQTTUtils.parseConfigParams(config.toMap)
+  private lazy val (_, _, topic, _, _, qos) = MQTTUtils.parseConfigParams(config.toMap)
 
   override def write(record: InternalRow): Unit = {
     val client = CachedMQTTClient.getOrCreate(config.toMap)

sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala

Lines changed: 1 addition & 5 deletions
@@ -100,9 +100,6 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
   private var startOffset: OffsetV2 = _
   private var endOffset: OffsetV2 = _
 
-  /* Older than last N messages, will not be checked for redelivery. */
-  val backLog = options.getInt("autopruning.backlog", 500)
-
   private[mqtt] val store = new LocalMessageStore(persistence)
 
   private[mqtt] val messages = new TrieMap[Long, MQTTMessage]
@@ -231,7 +228,6 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
   /** Stop this source. */
   override def stop(): Unit = synchronized {
     client.disconnect()
-    persistence.close()
     client.close()
   }
 

@@ -250,7 +246,7 @@ class MQTTStreamSourceProvider extends DataSourceV2
     }
 
     import scala.collection.JavaConverters._
-    val (brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos, _, _, _) =
+    val (brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos) =
       MQTTUtils.parseConfigParams(collection.immutable.HashMap() ++ parameters.asMap().asScala)
 
     new MQTTStreamSource(parameters, brokerUrl, persistence, topic, clientId,

sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala

Lines changed: 11 additions & 8 deletions
@@ -19,6 +19,7 @@ package org.apache.bahir.sql.streaming.mqtt
 
 import java.util.Properties
 
+import org.apache.hadoop.conf.Configuration
 import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttConnectOptions}
 import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
 
@@ -45,7 +46,7 @@ object MQTTUtils extends Logging {
   )
 
   def parseConfigParams(config: Map[String, String]):
-      (String, String, String, MqttClientPersistence, MqttConnectOptions, Int, Long, Long, Int) = {
+      (String, String, String, MqttClientPersistence, MqttConnectOptions, Int) = {
     def e(s: String) = new IllegalArgumentException(s)
     val parameters = CaseInsensitiveMap(config)
 
@@ -54,6 +55,14 @@ object MQTTUtils extends Logging {
 
     val persistence: MqttClientPersistence = parameters.get("persistence") match {
       case Some("memory") => new MemoryPersistence()
+      case Some("hdfs") =>
+        val hadoopConfig = new Configuration
+        for (parameter <- parameters) {
+          if (parameter._1.startsWith("hdfs.")) {
+            hadoopConfig.set(parameter._1.replaceFirst("hdfs.", ""), parameter._2)
+          }
+        }
+        new HdfsMqttClientPersistence( hadoopConfig )
       case _ => val localStorage: Option[String] = parameters.get("localStorage")
         localStorage match {
           case Some(x) => new MqttDefaultFilePersistence(x)
@@ -83,11 +92,6 @@ object MQTTUtils extends Logging {
     val autoReconnect: Boolean = parameters.getOrElse("autoReconnect", "false").toBoolean
     val maxInflight: Int = parameters.getOrElse("maxInflight", "60").toInt
 
-    val maxBatchMessageNum = parameters.getOrElse("maxBatchMessageNum", s"${Long.MaxValue}").toLong
-    val maxBatchMessageSize = parameters.getOrElse("maxBatchMessageSize",
-      s"${Long.MaxValue}").toLong
-    val maxRetryNumber = parameters.getOrElse("maxRetryNum", "3").toInt
-
     val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
     mqttConnectOptions.setAutomaticReconnect(autoReconnect)
     mqttConnectOptions.setCleanSession(cleanSession)
@@ -109,7 +113,6 @@ object MQTTUtils extends Logging {
     })
     mqttConnectOptions.setSSLProperties(sslProperties)
 
-    (brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos,
-      maxBatchMessageNum, maxBatchMessageSize, maxRetryNumber)
+    (brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos)
   }
 }
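Taken together, the parsing changes above mean the HDFS backend is selected through the existing `persistence` option, and every option whose key starts with `hdfs.` has that prefix stripped and is copied into the Hadoop `Configuration`. A minimal usage sketch against the structured-streaming source (the broker URL, topic, client id and the `dfs.replication` value are illustrative placeholders; `spark` is an existing SparkSession):

```scala
// Sketch only: option names follow MQTTUtils.parseConfigParams above; values are
// placeholders, not part of the commit.
val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("persistence", "hdfs")             // selects HdfsMqttClientPersistence
  .option("hdfs.dfs.replication", "1")       // "hdfs." is stripped -> dfs.replication on the Hadoop Configuration
  .option("topic", "sensors/#")              // MQTT topic filter
  .option("clientId", "bahir-hdfs-example")  // stable client id, used in the HDFS root path
  .load("tcp://broker.example.org:1883")     // MQTT broker URL
```

Unlike the deleted HDFSMQTTSourceProvider path, HDFS persistence is now just another value of the `persistence` option handled by the regular MQTTStreamSourceProvider.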

sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MessageStore.scala

Lines changed: 115 additions & 12 deletions
@@ -21,7 +21,11 @@ package org.apache.bahir.sql.streaming.mqtt
 import java.io._
 import java.util
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
 import org.eclipse.paho.client.mqttv3.{MqttClientPersistence, MqttPersistable, MqttPersistenceException}
+import org.eclipse.paho.client.mqttv3.internal.MqttPersistentData
 import scala.util.Try
 
 import org.apache.bahir.utils.Logging
@@ -45,24 +49,16 @@ trait MessageStore {
 }
 
 private[mqtt] class MqttPersistableData(bytes: Array[Byte]) extends MqttPersistable {
-
   override def getHeaderLength: Int = bytes.length
-
   override def getHeaderOffset: Int = 0
-
   override def getPayloadOffset: Int = 0
-
   override def getPayloadBytes: Array[Byte] = null
-
   override def getHeaderBytes: Array[Byte] = bytes
-
   override def getPayloadLength: Int = 0
 }
 
 trait Serializer {
-
   def deserialize[T](x: Array[Byte]): T
-
   def serialize[T](x: T): Array[Byte]
 }
 
@@ -94,17 +90,14 @@ class JavaSerializer extends Serializer with Logging {
       null
     }
   }
+
 }
 
 object JavaSerializer {
-
   private lazy val instance = new JavaSerializer()
-
   def getInstance(): JavaSerializer = instance
-
 }
 
-
 /**
  * A message store to persist messages received. This is not intended to be thread safe.
  * It uses `MqttDefaultFilePersistence` for storing messages on disk locally on the client.
@@ -148,3 +141,113 @@ private[mqtt] class LocalMessageStore(val persistentStore: MqttClientPersistence
 }
 
 }
+
+private[mqtt] class HdfsMqttClientPersistence(config: Configuration)
+  extends MqttClientPersistence {
+
+  var rootPath: Path = _
+  var fileSystem: FileSystem = _
+
+  override def open(clientId: String, serverURI: String): Unit = {
+    try {
+      rootPath = new Path("mqtt/" + clientId + "/" + serverURI.replaceAll("[^a-zA-Z0-9]", "_"))
+      fileSystem = FileSystem.get(config)
+      if (!fileSystem.exists(rootPath)) {
+        fileSystem.mkdirs(rootPath)
+      }
+    }
+    catch {
+      case e: Exception => throw new MqttPersistenceException(e)
+    }
+  }
+
+  override def close(): Unit = {
+    try {
+      fileSystem.close()
+    }
+    catch {
+      case e: Exception => throw new MqttPersistenceException(e)
+    }
+  }
+
+  override def put(key: String, persistable: MqttPersistable): Unit = {
+    try {
+      val path = getPath(key)
+      val output = fileSystem.create(path)
+      output.writeInt(persistable.getHeaderLength)
+      if (persistable.getHeaderLength > 0) {
+        output.write(persistable.getHeaderBytes)
+      }
+      output.writeInt(persistable.getPayloadLength)
+      if (persistable.getPayloadLength > 0) {
+        output.write(persistable.getPayloadBytes)
+      }
+      output.close()
+    }
+    catch {
+      case e: Exception => throw new MqttPersistenceException(e)
+    }
+  }
+
+  override def get(key: String): MqttPersistable = {
+    try {
+      val input = fileSystem.open(getPath(key))
+      val headerLength = input.readInt()
+      val headerBytes: Array[Byte] = new Array[Byte](headerLength)
+      input.read(headerBytes)
+      val payloadLength = input.readInt()
+      val payloadBytes: Array[Byte] = new Array[Byte](payloadLength)
+      input.read(payloadBytes)
+      input.close()
+      new MqttPersistentData(
+        key, headerBytes, 0, headerBytes.length, payloadBytes, 0, payloadBytes.length
+      )
+    }
+    catch {
+      case e: Exception => throw new MqttPersistenceException(e)
+    }
+  }
+
+  override def remove(key: String): Unit = {
+    try {
+      fileSystem.delete(getPath(key), false)
+    }
+    catch {
+      case e: Exception => throw new MqttPersistenceException(e)
+    }
+  }
+
+  override def keys(): util.Enumeration[String] = {
+    try {
+      val iterator = fileSystem.listFiles(rootPath, false)
+      new util.Enumeration[String]() {
+        override def hasMoreElements: Boolean = iterator.hasNext
+        override def nextElement(): String = iterator.next().getPath.getName
+      }
+    }
+    catch {
+      case e: Exception => throw new MqttPersistenceException(e)
+    }
+  }
+
+  override def clear(): Unit = {
+    try {
+      fileSystem.delete(rootPath, true)
+    }
+    catch {
+      case e: Exception => throw new MqttPersistenceException(e)
+    }
+  }
+
+  override def containsKey(key: String): Boolean = {
+    try {
+      fileSystem.isFile(getPath(key))
+    }
+    catch {
+      case e: Exception => throw new MqttPersistenceException(e)
+    }
+  }
+
+  private def getPath(key: String): Path = new Path(rootPath + "/" + key)
+
+}
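Because HdfsMqttClientPersistence is a drop-in MqttClientPersistence implementation, it can also be exercised directly with a plain Paho client, which is a handy way to test the storage layout in isolation. A minimal sketch, assuming the code lives in the same package (the class is private[mqtt]) and that the broker URL, client id and topic are placeholders; the default Hadoop Configuration will resolve whatever filesystem is configured on the classpath:

```scala
package org.apache.bahir.sql.streaming.mqtt

import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttConnectOptions}

// Sketch only: wires the persistence added above into a plain Paho client.
object HdfsPersistenceSmokeTest {
  def main(args: Array[String]): Unit = {
    val persistence = new HdfsMqttClientPersistence(new Configuration())
    // The Paho client calls open(clientId, serverURI), which becomes the
    // "mqtt/<clientId>/<sanitized serverURI>" root path on the filesystem.
    val client = new MqttClient("tcp://broker.example.org:1883", "bahir-smoke-client", persistence)
    client.connect(new MqttConnectOptions())
    client.publish("bahir/test", "hello".getBytes(StandardCharsets.UTF_8), 1, false)
    client.disconnect()
    client.close()
  }
}
```

With QoS 1, the in-flight publish should pass through put() and be removed once the broker acknowledges it, so message files appear briefly under the mqtt/ root.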

sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HDFSMQTTSourceProvider.scala

Lines changed: 0 additions & 64 deletions
This file was deleted.
