@@ -54,6 +54,22 @@ export default class MqttClient implements ProtocolClient {
5454
5555 private client ?: mqtt . MqttClient ;
5656
57+ private getBrokerUri ( href : string ) : string {
58+ const requestUri = new URL ( href ) ;
59+
60+ if ( href . startsWith ( "ws://" ) || href . startsWith ( "wss://" ) ) {
61+ return `${ requestUri . protocol } //${ requestUri . host } ` ;
62+ }
63+
64+ const compositeMatch = href . match ( / ^ ( [ a - z ] + ) \+ ( [ a - z ] + ) : \/ \/ / i) ;
65+ if ( compositeMatch ) {
66+ const transportScheme = compositeMatch [ 2 ] ;
67+ return `${ transportScheme } ://${ requestUri . host } ` ;
68+ }
69+
70+ return `${ this . scheme } ://${ requestUri . host } ` ;
71+ }
72+
5773 public async subscribeResource (
5874 form : MqttForm ,
5975 next : ( value : Content ) => void ,
@@ -62,7 +78,7 @@ export default class MqttClient implements ProtocolClient {
6278 ) : Promise < Subscription > {
6379 const contentType = form . contentType ?? ContentSerdes . DEFAULT ;
6480 const requestUri = new url . URL ( form . href ) ;
65- const brokerUri : string = ` ${ this . scheme } ://` + requestUri . host ;
81+ const brokerUri : string = this . getBrokerUri ( form . href ) ;
6682 // Keeping the path as the topic for compatibility reasons.
6783 // Current specification allows only form["mqv:filter"]
6884 const filter = requestUri . pathname . slice ( 1 ) ?? form [ "mqv:filter" ] ;
@@ -92,7 +108,7 @@ export default class MqttClient implements ProtocolClient {
92108 public async readResource ( form : MqttForm ) : Promise < Content > {
93109 const contentType = form . contentType ?? ContentSerdes . DEFAULT ;
94110 const requestUri = new url . URL ( form . href ) ;
95- const brokerUri : string = ` ${ this . scheme } ://` + requestUri . host ;
111+ const brokerUri : string = this . getBrokerUri ( form . href ) ;
96112 // Keeping the path as the topic for compatibility reasons.
97113 // Current specification allows only form["mqv:filter"]
98114 const filter = requestUri . pathname . slice ( 1 ) ?? form [ "mqv:filter" ] ;
@@ -124,7 +140,7 @@ export default class MqttClient implements ProtocolClient {
124140
125141 public async writeResource ( form : MqttForm , content : Content ) : Promise < void > {
126142 const requestUri = new url . URL ( form . href ) ;
127- const brokerUri = ` ${ this . scheme } :// ${ requestUri . host } ` ;
143+ const brokerUri = this . getBrokerUri ( form . href ) ;
128144 const topic = requestUri . pathname . slice ( 1 ) ?? form [ "mqv:topic" ] ;
129145
130146 let pool = this . pools . get ( brokerUri ) ;
@@ -147,7 +163,7 @@ export default class MqttClient implements ProtocolClient {
147163 public async invokeResource ( form : MqttForm , content : Content ) : Promise < Content > {
148164 const requestUri = new url . URL ( form . href ) ;
149165 const topic = requestUri . pathname . slice ( 1 ) ;
150- const brokerUri = ` ${ this . scheme } :// ${ requestUri . host } ` ;
166+ const brokerUri = this . getBrokerUri ( form . href ) ;
151167
152168 let pool = this . pools . get ( brokerUri ) ;
153169
@@ -170,7 +186,7 @@ export default class MqttClient implements ProtocolClient {
170186
171187 public async unlinkResource ( form : Form ) : Promise < void > {
172188 const requestUri = new url . URL ( form . href ) ;
173- const brokerUri : string = ` ${ this . scheme } ://` + requestUri . host ;
189+ const brokerUri : string = this . getBrokerUri ( form . href ) ;
174190 const topic = requestUri . pathname . slice ( 1 ) ;
175191
176192 const pool = this . pools . get ( brokerUri ) ;
0 commit comments