Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ if-addrs = { version = "0.10", features = ["link-local"] } # get local IP addres
log = { version = "0.4", optional = true } # logging
polling = "2.1" # select/poll sockets
socket2 = { version = "0.5.5", features = ["all"] } # socket APIs
socket-pktinfo = { version = "0.2.1" } # socket packet info extension

[dev-dependencies]
fastrand = "1.8"
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ This implementation is based on the following RFCs:

This is still beta software. We focus on the common use cases at hand. And we tested with some existing common tools (e.g. `Avahi` on Linux, `dns-sd` on MacOS, and `Bonjour` library on iOS) to verify the basic compatibility.

Currently this library has the following limitations:
- Only support multicast, no unicast send/recv.

## License

Licensed under either of
Expand Down
7 changes: 7 additions & 0 deletions src/dns_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub(crate) const MAX_MSG_ABSOLUTE: usize = 8972;
pub(crate) const FLAGS_QR_MASK: u16 = 0x8000; // mask for query/response bit
pub(crate) const FLAGS_QR_QUERY: u16 = 0x0000;
pub(crate) const FLAGS_QR_RESPONSE: u16 = 0x8000;
pub(crate) const FLAGS_UNICAST_RESPONSE_MASK: u16 = 0x8000; // mask for question unicast response bit
Comment thread
pixsperdavid marked this conversation as resolved.
Outdated
pub(crate) const FLAGS_AA: u16 = 0x0400; // mask for Authoritative answer bit

pub(crate) type DnsRecordBox = Box<dyn DnsRecordExt + Send>;
Expand Down Expand Up @@ -77,6 +78,12 @@ pub(crate) struct DnsQuestion {
pub(crate) entry: DnsEntry,
}

impl DnsQuestion {
pub fn is_unicast_response_requested(&self) -> bool {
self.entry.class & FLAGS_UNICAST_RESPONSE_MASK == FLAGS_UNICAST_RESPONSE_MASK
}
}

/// A DNS Resource Record - like a DNS entry, but has a TTL.
/// RFC: https://www.rfc-editor.org/rfc/rfc1035#section-3.2.1
/// https://www.rfc-editor.org/rfc/rfc1035#section-4.1.3
Expand Down
157 changes: 129 additions & 28 deletions src/service_daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ use crate::{
Receiver,
};
use flume::{bounded, Sender, TrySendError};
use if_addrs::Interface;
use if_addrs::{IfAddr, Interface};
use polling::Poller;
use socket2::{SockAddr, Socket};
use socket2::SockAddr;
use socket_pktinfo::PktInfoUdpSocket;
use std::cmp::min;
use std::{
cmp,
collections::{HashMap, HashSet},
fmt,
io::Read,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
str, thread,
time::Duration,
Expand All @@ -75,6 +76,8 @@ const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

const RESOLVE_WAIT_IN_MILLIS: u64 = 500;

const LEGACY_UNICAST_RR_TTL: u32 = 10;
Comment thread
pixsperdavid marked this conversation as resolved.
Outdated

/// Response status code for the service `unregister` call.
#[derive(Debug)]
pub enum UnregisterStatus {
Expand Down Expand Up @@ -105,6 +108,8 @@ enum Counter {
UnregisterResend,
Browse,
Respond,
RespondUnicast,
RespondLegacyUnicast,
CacheRefreshQuery,
}

Expand All @@ -117,6 +122,8 @@ impl fmt::Display for Counter {
Counter::UnregisterResend => write!(f, "unregister-resend"),
Counter::Browse => write!(f, "browse"),
Counter::Respond => write!(f, "respond"),
Counter::RespondUnicast => write!(f, "respond-unicast"),
Counter::RespondLegacyUnicast => write!(f, "respond-legacy-unicast"),
Counter::CacheRefreshQuery => write!(f, "cache-refresh"),
}
}
Expand Down Expand Up @@ -691,7 +698,7 @@ impl ServiceDaemon {
}

/// Creates a new UDP socket that uses `intf` to send and recv multicast.
fn new_socket_bind(intf: &Interface) -> Result<Socket> {
fn new_socket_bind(intf: &Interface) -> Result<PktInfoUdpSocket> {
// Use the same socket for receiving and sending multicast packets.
// Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
let intf_ip = &intf.ip();
Expand Down Expand Up @@ -738,31 +745,30 @@ fn new_socket_bind(intf: &Interface) -> Result<Socket> {

/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
/// `non_block` indicates whether to set O_NONBLOCK for the socket.
fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
let domain = match addr {
SocketAddr::V4(_) => socket2::Domain::IPV4,
SocketAddr::V6(_) => socket2::Domain::IPV6,
};

let fd = Socket::new(domain, socket2::Type::DGRAM, None)
.map_err(|e| e_fmt!("create socket failed: {}", e))?;
let sock = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;

fd.set_reuse_address(true)
sock.set_reuse_address(true)
.map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
#[cfg(unix)] // this is currently restricted to Unix's in socket2
fd.set_reuse_port(true)
sock.set_reuse_port(true)
.map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;

if non_block {
fd.set_nonblocking(true)
sock.set_nonblocking(true)
.map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
}

fd.bind(&addr.into())
sock.bind(&addr.into())
.map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;

debug!("new socket bind to {}", &addr);
Ok(fd)
Ok(sock)
}

/// Specify a UNIX timestamp in millis to run `command` for the next time.
Expand All @@ -777,7 +783,7 @@ struct ReRun {
#[derive(Debug)]
struct IntfSock {
intf: Interface,
sock: Socket,
sock: PktInfoUdpSocket,
}

/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
Expand Down Expand Up @@ -882,7 +888,7 @@ struct Zeroconf {
/// Next poll id value
poll_id_count: usize,

/// Local registered services, keyed by service full names.
/// Local registered services,Keyed by service full names.
my_services: HashMap<String, ServiceInfo>,

cache: DnsCache,
Expand Down Expand Up @@ -1128,7 +1134,7 @@ impl Zeroconf {

fn add_new_interface(&mut self, intf: Interface) {
// Bind the new interface.
let new_ip = intf.ip();
let new_ip = intf.addr.ip();
let sock = match new_socket_bind(&intf) {
Ok(s) => s,
Err(e) => {
Expand Down Expand Up @@ -1389,6 +1395,7 @@ impl Zeroconf {
Some(if_sock) => if_sock,
None => return false,
};

let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];

// Read the next mDNS UDP datagram.
Expand All @@ -1397,8 +1404,8 @@ impl Zeroconf {
// be truncated by the socket layer depending on the platform's libc.
// In any case, such large datagram will not be decoded properly and
// this function should return false but should not crash.
let sz = match intf_sock.sock.read(&mut buf) {
Ok(sz) => sz,
let (sz, pktinfo) = match intf_sock.sock.recv(&mut buf) {
Ok(r) => r,
Err(e) => {
if e.kind() != std::io::ErrorKind::WouldBlock {
error!("listening socket read failed: {}", e);
Expand Down Expand Up @@ -1430,12 +1437,48 @@ impl Zeroconf {

buf.truncate(sz); // reduce potential processing errors

let is_unicast = !pktinfo.addr_dst.is_multicast();

// Ignore unicast packets outside the local link
if is_unicast {
let should_respond = match (pktinfo.addr_src.ip(), &intf_sock.intf.addr) {
Comment thread
pixsperdavid marked this conversation as resolved.
(IpAddr::V4(src_ip), IfAddr::V4(intf)) => {
if src_ip.is_loopback() {
true
} else {
let src_ip: u32 = src_ip.into();
let intf_ip: u32 = intf.ip.into();
let intf_netmask: u32 = intf.netmask.into();
// Is src_ip in local subnet?
(intf_ip & intf_netmask) == (src_ip & intf_netmask)
}
}
(IpAddr::V6(src_ip), &IfAddr::V6(_)) => {
if src_ip.is_loopback() {
true
} else {
// Does src_ip have on-link prefix?
src_ip.segments()[0] & 0xffc0 == 0xfe80
}
}
// Interface and source message IP versions do not match
_ => false,
};
if !should_respond {
return true;
}
};

match DnsIncoming::new(buf) {
Ok(msg) => {
if msg.is_query() {
self.handle_query(msg, ip);
self.handle_query(msg, ip, pktinfo.addr_src, is_unicast);
} else if msg.is_response() {
self.handle_response(msg);
if !is_unicast {
self.handle_response(msg);
} else {
error!("Invalid message: unrequested unicast response");
Comment thread
keepsimple1 marked this conversation as resolved.
}
} else {
error!("Invalid message: not query and not response");
}
Expand Down Expand Up @@ -1724,12 +1767,24 @@ impl Zeroconf {
}
}

fn handle_query(&mut self, msg: DnsIncoming, ip: &IpAddr) {
fn handle_query(
&mut self,
msg: DnsIncoming,
ip: &IpAddr,
src_addr: SocketAddr,
is_unicast_query: bool,
) {
let intf_sock = match self.intf_socks.get(ip) {
Some(sock) => sock,
None => return,
};
let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
let is_legacy_unicast = is_unicast_query && src_addr.port() != MDNS_PORT;
Comment thread
pixsperdavid marked this conversation as resolved.
let is_unicast_reply = is_unicast_query
&& msg
.questions
.iter()
.all(|q| q.is_unicast_response_requested());

// Special meta-query "_services._dns-sd._udp.<Domain>".
// See https://datatracker.ietf.org/doc/html/rfc6763#section-9
Expand All @@ -1755,7 +1810,11 @@ impl Zeroconf {
&question.entry.name,
TYPE_PTR,
CLASS_IN,
service.get_other_ttl(),
if is_legacy_unicast {
min(LEGACY_UNICAST_RR_TTL, service.get_host_ttl())
} else {
service.get_other_ttl()
},
service.get_type().to_string(),
)),
);
Expand Down Expand Up @@ -1792,7 +1851,11 @@ impl Zeroconf {
&question.entry.name,
t,
CLASS_IN | CLASS_UNIQUE,
service.get_host_ttl(),
if is_legacy_unicast {
min(LEGACY_UNICAST_RR_TTL, service.get_host_ttl())
} else {
service.get_host_ttl()
},
address,
)),
);
Expand All @@ -1813,7 +1876,11 @@ impl Zeroconf {
Box::new(DnsSrv::new(
&question.entry.name,
CLASS_IN | CLASS_UNIQUE,
service.get_host_ttl(),
if is_legacy_unicast {
min(LEGACY_UNICAST_RR_TTL, service.get_host_ttl())
} else {
service.get_host_ttl()
},
service.get_priority(),
service.get_weight(),
service.get_port(),
Expand All @@ -1829,7 +1896,11 @@ impl Zeroconf {
&question.entry.name,
TYPE_TXT,
CLASS_IN | CLASS_UNIQUE,
service.get_host_ttl(),
if is_legacy_unicast {
min(LEGACY_UNICAST_RR_TTL, service.get_host_ttl())
} else {
service.get_host_ttl()
},
service.generate_txt(),
)),
);
Expand All @@ -1853,7 +1924,11 @@ impl Zeroconf {
service.get_hostname(),
t,
CLASS_IN | CLASS_UNIQUE,
service.get_host_ttl(),
if is_legacy_unicast {
min(LEGACY_UNICAST_RR_TTL, service.get_host_ttl())
} else {
service.get_host_ttl()
},
Comment thread
pixsperdavid marked this conversation as resolved.
Outdated
address,
)));
}
Expand All @@ -1863,9 +1938,18 @@ impl Zeroconf {

if !out.answers.is_empty() {
out.id = msg.id;
broadcast_dns_on_intf(&out, intf_sock);

self.increase_counter(Counter::Respond, 1);
if is_unicast_reply {
if is_legacy_unicast {
unicast_dns_on_intf(&out, src_addr.ip(), intf_sock);
self.increase_counter(Counter::RespondLegacyUnicast, 1);
} else {
unicast_dns_on_intf(&out, src_addr.ip(), intf_sock);
Comment thread
pixsperdavid marked this conversation as resolved.
Outdated
self.increase_counter(Counter::RespondUnicast, 1);
}
} else {
broadcast_dns_on_intf(&out, intf_sock);
self.increase_counter(Counter::Respond, 1);
}
}
}

Expand Down Expand Up @@ -2282,6 +2366,23 @@ fn broadcast_on_intf<'a>(packet: &'a [u8], intf: &IntfSock) -> &'a [u8] {
packet
}

/// Send an outgoing unicast DNS query or response, and returns the packet bytes.
fn unicast_dns_on_intf(out: &DnsOutgoing, ip_addr: IpAddr, intf: &IntfSock) -> Vec<u8> {
let qtype = if out.is_query() { "query" } else { "response" };
debug!(
"Unicasting ({}) {}: {} questions {} answers {} authorities {} additional",
ip_addr,
qtype,
out.questions.len(),
out.answers.len(),
out.authorities.len(),
out.additionals.len()
);
let packet = out.to_packet_data();
send_packet(&packet[..], SocketAddr::new(ip_addr, MDNS_PORT), intf);
packet
}

/// Sends out `packet` to `addr` on the socket in `intf_sock`.
fn send_packet(packet: &[u8], addr: SocketAddr, intf_sock: &IntfSock) {
let sockaddr = SockAddr::from(addr);
Expand Down