Skip to content

Commit 732ca71

Browse files
committed
integration of frigate ephemeral scanning
1 parent b9a4fb7 commit 732ca71

5 files changed

Lines changed: 268 additions & 2 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/v2/src/main.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use bdk_sp::{
1414
hex::{DisplayHex, FromHex},
1515
key::Secp256k1,
1616
script::PushBytesBuf,
17-
secp256k1::{PublicKey, Scalar},
17+
secp256k1::{PublicKey, Scalar, SecretKey},
1818
Address, Amount, Block, BlockHash, FeeRate, Network, OutPoint, PrivateKey, ScriptBuf,
1919
Sequence, Transaction, TxOut, Txid,
2020
},
@@ -34,6 +34,7 @@ use bdk_sp_oracles::{
3434
TrustedPeer, UnboundedReceiver, Warning,
3535
},
3636
filters::kyoto::{FilterEvent, FilterSubscriber},
37+
frigate::{FrigateClient, FrigateListener, History, SubscribeRequest},
3738
tweaks::blindbit::{BlindbitSubscriber, TweakEvent},
3839
};
3940
use bdk_sp_wallet::{
@@ -161,6 +162,24 @@ pub enum Commands {
161162
#[clap(long)]
162163
hash: Option<BlockHash>,
163164
},
165+
166+
ScanFrigate {
167+
#[clap(flatten)]
168+
rpc_args: RpcArgs,
169+
/// The scan private key for which outputs will be scanned for.
170+
#[clap(long)]
171+
scan_priv_key: SecretKey,
172+
/// The spend public key for which outputs will be scanned for.
173+
#[clap(long)]
174+
spend_pub_key: PublicKey,
175+
/// An optional start parameter from where the scanning will start. When not specified it starts from Taproot activation height.
176+
#[clap(long)]
177+
start: Option<u32>,
178+
/// Optional list of labels to scan for. It always scan for change index with label 0.
179+
#[clap(long)]
180+
labels: Option<Vec<u32>>,
181+
},
182+
164183
Create {
165184
/// Network
166185
#[clap(long, short, default_value = "signet")]
@@ -567,6 +586,64 @@ async fn main() -> anyhow::Result<()> {
567586
);
568587
}
569588
}
589+
Commands::ScanFrigate {
590+
rpc_args,
591+
scan_priv_key,
592+
spend_pub_key,
593+
start,
594+
labels,
595+
} => {
596+
let client = FrigateClient::new(
597+
&rpc_args.url,
598+
rpc_args.rpc_user.as_deref(),
599+
rpc_args.rpc_password.as_deref(),
600+
)?;
601+
602+
let rpc_client = rpc_args.new_client()?;
603+
// send a subscribe request
604+
let subscribe_params = SubscribeRequest {
605+
scan_priv_key,
606+
spend_pub_key: spend_pub_key.into(),
607+
start_height: start,
608+
labels,
609+
};
610+
611+
let (history_tx, mut history_rx) =
612+
tokio::sync::mpsc::unbounded_channel::<(Vec<History>, f32)>();
613+
let frigate_listener = std::sync::Arc::new(FrigateListener::new(client, history_tx));
614+
let frigate_listener_task = frigate_listener.clone();
615+
tokio::task::spawn(async move { frigate_listener_task.run(subscribe_params).await });
616+
617+
tracing::info!("Starting frigate scanning loop... {rpc_args:#?}");
618+
loop {
619+
if let Some(history) = history_rx.recv().await {
620+
// Group by block height, then iterate over by fetching block details and apply
621+
let mut secrets_by_height: HashMap<u32, HashMap<Txid, PublicKey>> =
622+
HashMap::new();
623+
624+
history.0.iter().for_each(|h| {
625+
secrets_by_height
626+
.entry(h.height)
627+
.and_modify(|v| {
628+
v.insert(h.tx_hash, h.tweak_key);
629+
})
630+
.or_insert(HashMap::from([(h.tx_hash, h.tweak_key)]));
631+
});
632+
633+
// Filter when the height is 0, because that would mean mempool transaction
634+
for secret in secrets_by_height.into_iter().filter(|v| v.0 > 0) {
635+
let block_hash = rpc_client.get_block_hash(secret.0 as u64)?;
636+
let block = rpc_client.get_block(&block_hash)?;
637+
wallet.apply_block_relevant(&block, secret.1, secret.0);
638+
}
639+
640+
// Check the progress
641+
if history.1 >= 1.0 {
642+
break;
643+
}
644+
}
645+
}
646+
}
570647
Commands::Balance => {
571648
fn print_balances<'a>(
572649
title_str: &'a str,

oracles/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ redb = "2.4.0"
1313
rayon = "1.11.0"
1414
reqwest = { version = "0.12.23", features = ["json", "rustls-tls", "http2", "charset"], default-features = false }
1515
serde = { version = "1.0.219", features = ["serde_derive"] }
16-
serde_json = "1.0.142"
16+
serde_json = { version = "1.0.142", features = ["raw_value"]}
1717
url = "2.5.4"
1818
tracing = "0.1.41"
19+
jsonrpc = "=0.18.0"
1920

2021
[lints]
2122
workspace = true

oracles/src/frigate/mod.rs

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
use bip157::tokio;
2+
use bip157::tokio::io::{AsyncReadExt, AsyncWriteExt};
3+
use bip157::tokio::sync::mpsc::UnboundedSender;
4+
use bitcoin::Txid;
5+
use bitcoin::secp256k1::{PublicKey, SecretKey};
6+
use jsonrpc::simple_http::{self, SimpleHttpTransport};
7+
use jsonrpc::Client;
8+
use serde::{Deserialize, Serialize};
9+
use serde_json::value::to_raw_value;
10+
use serde_json::Value;
11+
12+
#[derive(Debug)]
13+
pub enum FrigateError {
14+
JsonRpc(jsonrpc::Error),
15+
ParseUrl(url::ParseError),
16+
Serde(serde_json::Error),
17+
}
18+
19+
impl From<serde_json::Error> for FrigateError {
20+
fn from(value: serde_json::Error) -> Self {
21+
FrigateError::Serde(value)
22+
}
23+
}
24+
25+
impl From<url::ParseError> for FrigateError {
26+
fn from(value: url::ParseError) -> Self {
27+
Self::ParseUrl(value)
28+
}
29+
}
30+
31+
impl From<jsonrpc::Error> for FrigateError {
32+
fn from(value: jsonrpc::Error) -> Self {
33+
Self::JsonRpc(value)
34+
}
35+
}
36+
37+
pub struct FrigateClient {
38+
pub host_url: String,
39+
client: Client,
40+
}
41+
42+
#[derive(Serialize, Deserialize)]
43+
pub struct History {
44+
pub height: u32,
45+
pub tx_hash: Txid,
46+
pub tweak_key: PublicKey,
47+
pub progress: f32,
48+
}
49+
50+
#[derive(Serialize, Deserialize)]
51+
pub struct NotifPayload {
52+
scan_private_key: SecretKey,
53+
spend_public_key: PublicKey,
54+
address: String,
55+
labels: Option<Vec<u32>>,
56+
start_height: u32,
57+
progress: f32,
58+
history: Vec<History>,
59+
}
60+
61+
#[derive(Serialize, Deserialize)]
62+
pub struct SubscribeRequest {
63+
pub scan_priv_key: SecretKey,
64+
pub spend_pub_key: PublicKey,
65+
pub start_height: Option<u32>,
66+
pub labels: Option<Vec<u32>>,
67+
}
68+
69+
#[derive(Serialize, Deserialize)]
70+
pub struct UnsubscribeRequest {
71+
pub scan_privkey: SecretKey,
72+
pub spend_pubkey: PublicKey,
73+
}
74+
75+
#[derive(Serialize, Deserialize)]
76+
pub struct GetRequest {
77+
pub tx_hash: Txid,
78+
}
79+
80+
const SUBSCRIBE_RPC_METHOD: &str = "blockchain.silentpayments.subscribe";
81+
const UNSUBSCRIBE_RPC_METHOD: &str = "blockchain.silentpayments.unsubscribe";
82+
const GET_RPC_METHOD: &str = "blockchain.transaction.get";
83+
const SUBSCRIBE_OWNED_OUTPUTS: &str = "blockchain.scripthash.subscribe";
84+
85+
impl FrigateClient {
86+
pub fn new(
87+
host_url: &str,
88+
user: Option<&str>,
89+
password: Option<&str>,
90+
) -> Result<Self, simple_http::Error> {
91+
let transport = SimpleHttpTransport::builder()
92+
.url(host_url)?
93+
.auth(user.unwrap_or(""), password)
94+
.build();
95+
96+
Ok(Self {
97+
host_url: host_url.to_string(),
98+
client: Client::with_transport(transport),
99+
})
100+
}
101+
102+
pub fn subscribe(&self, req: &SubscribeRequest) -> Result<String, FrigateError> {
103+
let params = to_raw_value(&serde_json::json!(req))?;
104+
let req = self
105+
.client
106+
.build_request(SUBSCRIBE_RPC_METHOD, Some(&params));
107+
let res = self.client.send_request(req)?;
108+
Ok(res.result()?)
109+
}
110+
111+
pub fn unsubscribe(&self, req: &UnsubscribeRequest) -> Result<String, FrigateError> {
112+
let params = to_raw_value(&serde_json::json!(req))?;
113+
let req = self
114+
.client
115+
.build_request(UNSUBSCRIBE_RPC_METHOD, Some(&params));
116+
let res = self.client.send_request(req)?;
117+
Ok(res.result()?)
118+
}
119+
120+
pub fn get(&self, req: &GetRequest) -> Result<String, FrigateError> {
121+
let params = to_raw_value(req)?;
122+
let req = self.client.build_request(GET_RPC_METHOD, Some(&params));
123+
let response = self.client.send_request(req)?;
124+
Ok(response.result()?)
125+
}
126+
}
127+
128+
pub struct FrigateListener {
129+
pub frigate_client: FrigateClient,
130+
sender: UnboundedSender<(Vec<History>, f32)>,
131+
}
132+
133+
impl FrigateListener {
134+
pub fn new(frigate_client: FrigateClient, sender: UnboundedSender<(Vec<History>, f32)>) -> Self {
135+
Self {
136+
frigate_client,
137+
sender,
138+
}
139+
}
140+
141+
pub async fn run(&self, params: SubscribeRequest) -> Result<(), FrigateError> {
142+
let raw_subscribe_params = serde_json::json!(params);
143+
let mut tcp_stream = tokio::net::TcpStream::connect(self.frigate_client.host_url.clone())
144+
.await
145+
.map_err(|e| FrigateError::JsonRpc(jsonrpc::Error::Transport(Box::new(e))))?;
146+
let (mut reader, mut writer) = tcp_stream.split();
147+
148+
writer
149+
.write_all(raw_subscribe_params.to_string().as_bytes())
150+
.await
151+
.map_err(|e| FrigateError::JsonRpc(jsonrpc::Error::Transport(Box::new(e))))?;
152+
153+
loop {
154+
let mut buffer = vec![0; 1024];
155+
let n = reader
156+
.read(&mut buffer)
157+
.await
158+
.map_err(|e| FrigateError::JsonRpc(jsonrpc::Error::Transport(Box::new(e))))?;
159+
if n == 0 {
160+
break;
161+
}
162+
let response_str = String::from_utf8_lossy(&buffer[..n]);
163+
let result: Value = serde_json::from_str(&response_str).map_err(FrigateError::Serde)?;
164+
165+
if result["result"].is_string() {
166+
tracing::info!("Subscribed to silent payment address: {:?}", result);
167+
} else if result["params"].is_object() {
168+
let histories: Vec<History> =
169+
serde_json::from_value(result["params"]["history"].clone())
170+
.map_err(FrigateError::Serde)?;
171+
172+
let progress = result["params"]["progress"].as_f64().unwrap_or(0.0) as f32;
173+
174+
self.sender
175+
.send((histories, progress))
176+
.map_err(|e| FrigateError::JsonRpc(jsonrpc::Error::Transport(Box::new(e))))?;
177+
178+
if progress >= 1.0 {
179+
tracing::info!("Scanning completed");
180+
break;
181+
}
182+
}
183+
}
184+
Ok(())
185+
}
186+
}

oracles/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod filters;
2+
pub mod frigate;
23
pub mod tweaks;
34
pub use bip157;

0 commit comments

Comments
 (0)