Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions sqlx-postgres/src/advisory_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use crate::Either;
use crate::PgConnection;
use hkdf::Hkdf;
use sha2::Sha256;
use sqlx_core::executor::Executor;
use sqlx_core::sql_str::SqlSafeStr;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::OnceLock;
Expand Down Expand Up @@ -199,27 +201,36 @@ impl PgAdvisoryLock {
/// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
///
/// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
///
/// # Cancel Safety
///
/// This method is cancel safe. If the future is dropped before the query completes, a
/// `pg_advisory_unlock()` call is queued and run the next time the connection is used.
pub async fn acquire<C: AsMut<PgConnection>>(
&self,
mut conn: C,
) -> Result<PgAdvisoryLockGuard<C>> {
let query = match &self.key {
PgAdvisoryLockKey::BigInt(_) => "SELECT pg_advisory_lock($1)",
PgAdvisoryLockKey::IntPair(_, _) => "SELECT pg_advisory_lock($1, $2)",
};

let stmt = conn.as_mut().prepare(query.into_sql_str()).await?;
let query = crate::query::query_statement(&stmt);

// We're wrapping the connection in a `PgAdvisoryLockGuard` early here on purpose. If this
// future is dropped, the lock will be released in the drop impl.
let mut guard = PgAdvisoryLockGuard::new(self.clone(), conn);
let conn = guard.conn.as_mut().unwrap();

match &self.key {
PgAdvisoryLockKey::BigInt(key) => {
crate::query::query("SELECT pg_advisory_lock($1)")
.bind(key)
.execute(conn.as_mut())
.await?;
}
PgAdvisoryLockKey::IntPair(key1, key2) => {
crate::query::query("SELECT pg_advisory_lock($1, $2)")
.bind(key1)
.bind(key2)
.execute(conn.as_mut())
.await?;
}
PgAdvisoryLockKey::BigInt(key) => query.bind(key),
PgAdvisoryLockKey::IntPair(key1, key2) => query.bind(key1).bind(key2),
}
.execute(conn.as_mut())
.await?;

Ok(PgAdvisoryLockGuard::new(self.clone(), conn))
Ok(guard)
}

/// Acquires an exclusive lock using `pg_try_advisory_lock()`, returning immediately
Expand All @@ -242,6 +253,12 @@ impl PgAdvisoryLock {
/// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
///
/// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
///
/// # Cancel Safety
///
/// This method is **not** cancel safe. If the future is dropped while the query is in-flight,
/// it is not possible to know whether the lock was acquired, so it cannot be safely released.
/// The lock may remain held until the connection is closed.
pub async fn try_acquire<C: AsMut<PgConnection>>(
&self,
mut conn: C,
Expand Down
Loading