feat: enable transaction support via sqlx::Executor

This commit abstracts the database operations to use the generic
`sqlx::Executor` trait instead of a concrete `&sqlx::PgPool`.

This change allows all generated methods (find, create, update,
delete, and relationships) to be executed within a
`sqlx::Transaction`, in addition to a connection pool. This is a
crucial feature for ensuring atomic operations and data consistency.

The public-facing traits `Georm` and `Defaultable` have been updated
to require `sqlx::Executor`, and the documentation has been updated to
reflect this new capability.
This commit is contained in:
2025-08-09 12:19:06 +02:00
parent 3307aa679d
commit 49c7d86102
19 changed files with 230 additions and 112 deletions

View File

@@ -61,7 +61,8 @@ async fn create_comment(
pool: &sqlx::PgPool,
) -> Result {
let prompt = "Who is creating the comment?";
let user = User::get_user_by_username_or_select(username.as_deref(), prompt, pool).await?;
let mut tx = pool.begin().await?;
let user = User::get_user_by_username_or_select(username.as_deref(), prompt, &mut *tx).await?;
let content = match text {
Some(text) => text,
None => inquire::Text::new("Content of the comment:")
@@ -73,29 +74,33 @@ async fn create_comment(
content,
id: None,
};
let comment = comment.create(pool).await?;
let comment = comment.create(&mut *tx).await?;
tx.commit().await?;
println!("Successfuly created comment:\n{comment}");
Ok(())
}
async fn remove_comment(id: Option<i32>, pool: &sqlx::PgPool) -> Result {
let prompt = "Select the comment to remove:";
let mut tx = pool.begin().await?;
let comment = match id {
Some(id) => Comment::find(pool, &id)
Some(id) => Comment::find(&mut *tx, &id)
.await
.map_err(UserInputError::DatabaseError)?
.ok_or(UserInputError::CommentDoesNotExist)?,
None => Comment::select_comment(prompt, pool).await?,
None => Comment::select_comment(prompt, &mut *tx).await?,
};
comment.delete(pool).await?;
comment.delete(&mut *tx).await?;
tx.commit().await?;
Ok(())
}
async fn remove_user_comment(username: Option<String>, pool: &sqlx::PgPool) -> Result {
let mut tx = pool.begin().await?;
let prompt = "Select user whose comment you want to delete:";
let user = User::get_user_by_username_or_select(username.as_deref(), prompt, pool).await?;
let user = User::get_user_by_username_or_select(username.as_deref(), prompt, &mut *tx).await?;
let comments: HashMap<String, Comment> = user
.get_comments(pool)
.get_comments(&mut *tx)
.await?
.into_iter()
.map(|comment| (comment.content.clone(), comment))
@@ -105,7 +110,8 @@ async fn remove_user_comment(username: Option<String>, pool: &sqlx::PgPool) -> R
.prompt()
.map_err(UserInputError::InquireError)?;
let comment: &Comment = comments.get(&selected_comment_content).unwrap();
comment.delete(pool).await?;
comment.delete(&mut *tx).await?;
tx.commit().await?;
Ok(())
}

View File

@@ -53,16 +53,17 @@ async fn follow_user(
followed: Option<String>,
pool: &sqlx::PgPool,
) -> Result {
let mut tx = pool.begin().await?;
let follower = User::get_user_by_username_or_select(
follower.as_deref(),
"Select who will be following someone:",
pool,
&mut *tx,
)
.await?;
let followed = User::get_user_by_username_or_select(
followed.as_deref(),
"Select who will be followed:",
pool,
&mut *tx,
)
.await?;
let follow = FollowerDefault {
@@ -70,17 +71,22 @@ async fn follow_user(
follower: follower.id,
followed: followed.id,
};
follow.create(pool).await?;
follow.create(&mut *tx).await?;
tx.commit().await?;
println!("User {follower} now follows {followed}");
Ok(())
}
async fn unfollow_user(follower: Option<String>, pool: &sqlx::PgPool) -> Result {
let follower =
User::get_user_by_username_or_select(follower.as_deref(), "Select who is following", pool)
.await?;
let mut tx = pool.begin().await?;
let follower = User::get_user_by_username_or_select(
follower.as_deref(),
"Select who is following",
&mut *tx,
)
.await?;
let followed_list: HashMap<String, User> = follower
.get_followed(pool)
.get_followed(&mut *tx)
.await?
.iter()
.map(|person| (person.username.clone(), person.clone()))
@@ -97,8 +103,9 @@ async fn unfollow_user(follower: Option<String>, pool: &sqlx::PgPool) -> Result
follower.id,
followed.id
)
.execute(pool)
.execute(&mut *tx)
.await?;
tx.commit().await?;
println!("User {follower} unfollowed {followed}");
Ok(())
}

View File

@@ -18,8 +18,11 @@ pub struct Comment {
}
impl Comment {
pub async fn select_comment(prompt: &str, pool: &sqlx::PgPool) -> Result<Self> {
let comments: HashMap<String, Self> = Self::find_all(pool)
pub async fn select_comment<'e, E>(prompt: &str, executor: E) -> Result<Self>
where
E: sqlx::Executor<'e, Database = sqlx::Postgres>,
{
let comments: HashMap<String, Self> = Self::find_all(executor)
.await?
.into_iter()
.map(|comment| (comment.content.clone(), comment))

View File

@@ -38,7 +38,10 @@ impl Profile {
self.bio.clone().unwrap_or_default()
}
pub async fn try_new(user_id: i32, pool: &sqlx::PgPool) -> Result<Self> {
pub async fn try_new<'e, E>(user_id: i32, executor: E) -> Result<Self>
where
E: sqlx::Executor<'e, Database = sqlx::Postgres>,
{
let profile = ProfileDefault {
user_id,
id: None,
@@ -46,20 +49,23 @@ impl Profile {
display_name: None,
};
profile
.create(pool)
.create(executor)
.await
.map_err(UserInputError::DatabaseError)
}
pub async fn update_interactive(
pub async fn update_interactive<'e, E>(
&mut self,
display_name: Option<String>,
bio: Option<String>,
pool: &sqlx::PgPool,
) -> Result<Self> {
executor: E
) -> Result<Self>
where
E: sqlx::Executor<'e, Database = sqlx::Postgres>,
{
self.display_name = display_name;
self.bio = bio;
self.update(pool)
self.update(executor)
.await
.map_err(UserInputError::DatabaseError)
}

View File

@@ -50,8 +50,11 @@ impl From<&str> for UserDefault {
}
impl User {
async fn select_user(prompt: &str, pool: &sqlx::PgPool) -> Result<Self> {
let users: HashMap<String, Self> = Self::find_all(pool)
async fn select_user<'e, E>(prompt: &str, executor: E) -> Result<Self>
where
E: sqlx::Executor<'e, Database = sqlx::Postgres>,
{
let users: HashMap<String, Self> = Self::find_all(executor)
.await?
.into_iter()
.map(|user| (user.username.clone(), user))
@@ -63,41 +66,50 @@ impl User {
Ok(user.clone())
}
pub async fn get_user_by_id_or_select(
pub async fn get_user_by_id_or_select<'e, E>(
id: Option<i32>,
prompt: &str,
pool: &sqlx::PgPool,
) -> Result<Self> {
executor: E
) -> Result<Self>
where
E: sqlx::Executor<'e, Database = sqlx::Postgres>,
{
let user = match id {
Some(id) => Self::find(pool, &id)
Some(id) => Self::find(executor, &id)
.await?
.ok_or(UserInputError::UserDoesNotExist)?,
None => Self::select_user(prompt, pool).await?,
None => Self::select_user(prompt, executor).await?,
};
Ok(user)
}
pub async fn get_user_by_username_or_select(
pub async fn get_user_by_username_or_select<'e, E>(
username: Option<&str>,
prompt: &str,
pool: &sqlx::PgPool,
) -> Result<Self> {
executor: E,
) -> Result<Self>
where
E: sqlx::Executor<'e, Database = sqlx::Postgres>,
{
let user = match username {
Some(username) => Self::find_by_username(username, pool)
Some(username) => Self::find_by_username(username, executor)
.await?
.ok_or(UserInputError::UserDoesNotExist)?,
None => Self::select_user(prompt, pool).await?,
None => Self::select_user(prompt, executor).await?,
};
Ok(user)
}
pub async fn find_by_username(username: &str, pool: &sqlx::PgPool) -> Result<Option<Self>> {
pub async fn find_by_username<'e, E>(username: &str, executor: E) -> Result<Option<Self>>
where
E: sqlx::Executor<'e, Database = sqlx::Postgres>,
{
sqlx::query_as!(
Self,
"SELECT * FROM Users u WHERE u.username = $1",
username
)
.fetch_optional(pool)
.fetch_optional(executor)
.await
.map_err(UserInputError::DatabaseError)
}
@@ -116,7 +128,8 @@ impl User {
Ok(user)
}
pub async fn update_profile(id: Option<i32>, pool: &sqlx::PgPool) -> Result<(User, Profile)> {
pub async fn update_profile(id: Option<i32>, pool: &sqlx::PgPool) -> Result<(User, Profile)>
{
let prompt = "Select the user whose profile you want to update";
let user = Self::get_user_by_id_or_select(id, prompt, pool).await?;
let profile = match user.get_profile(pool).await? {