feat: implement efficient upsert operation for create_or_update
Some checks failed
CI / tests (push) Failing after 31s

Replace the existing two-query create_or_update implementation with a
single atomic PostgreSQL upsert using ON CONFLICT clause to eliminate
race conditions and improve performance.

Race condition fix:
The previous implementation had a critical race condition where
multiple concurrent requests could:
1. Both call find() and get None (record doesn't exist)
2. Both call create() and the second one fails with duplicate key
   error
3. Or between find() and create(), another transaction inserts the
   record

This created unreliable behavior in high-concurrency scenarios.

Changes:
- Add generate_upsert_query function in trait_implementation.rs
- Generate SQL with INSERT ... ON CONFLICT ... DO UPDATE SET pattern
- Remove default trait implementation that used separate
  find/create/update calls
- Update derive_trait to include upsert query generation
- Convert create_or_update from default implementation to required
  trait method

The new implementation eliminates race conditions while reducing
database round trips from 2-3 queries down to 1, significantly
improving both reliability and performance.
This commit is contained in:
Lucien Cartier-Tilet 2025-06-05 18:37:53 +02:00
parent fb20c7a0aa
commit 3415e18287
Signed by: phundrak
SSH Key Fingerprint: SHA256:CE0HPsbW3L2YiJETx1zYZ2muMptaAqTN2g3498KrMkc
2 changed files with 45 additions and 11 deletions

View File

@ -97,6 +97,47 @@ fn generate_delete_query(table: &str, id: &GeormField) -> proc_macro2::TokenStre
}
}
fn generate_upsert_query(
table: &str,
fields: &[GeormField],
id: &GeormField,
) -> proc_macro2::TokenStream {
let inputs: Vec<String> = (1..=fields.len()).map(|num| format!("${num}")).collect();
let columns = fields
.iter()
.map(|f| f.ident.to_string())
.collect::<Vec<String>>()
.join(", ");
// For ON CONFLICT DO UPDATE, exclude the ID field from updates
let update_assignments = fields
.iter()
.filter(|f| !f.id)
.map(|f| format!("{} = EXCLUDED.{}", f.ident, f.ident))
.collect::<Vec<String>>()
.join(", ");
let upsert_string = format!(
"INSERT INTO {table} ({columns}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {update_assignments} RETURNING *",
inputs.join(", "),
id.ident
);
let field_idents: Vec<syn::Ident> = fields.iter().map(|f| f.ident.clone()).collect();
quote! {
async fn create_or_update(&self, pool: &::sqlx::PgPool) -> ::sqlx::Result<Self> {
::sqlx::query_as!(
Self,
#upsert_string,
#(self.#field_idents),*
)
.fetch_one(pool)
.await
}
}
}
fn generate_get_id(id: &GeormField) -> proc_macro2::TokenStream {
let ident = &id.ident;
let ty = &id.ty;
@ -125,6 +166,7 @@ pub fn derive_trait(
let find_query = generate_find_query(table, id);
let create_query = generate_create_query(table, fields);
let update_query = generate_update_query(table, fields, id);
let upsert_query = generate_upsert_query(table, fields, id);
let delete_query = generate_delete_query(table, id);
quote! {
impl #impl_generics Georm<#ty> for #ident #type_generics #where_clause {
@ -133,6 +175,7 @@ pub fn derive_trait(
#find_query
#create_query
#update_query
#upsert_query
#delete_query
}
}

View File

@ -50,18 +50,9 @@ pub trait Georm<Id> {
fn create_or_update(
&self,
pool: &sqlx::PgPool,
) -> impl ::std::future::Future<Output = sqlx::Result<Self>>
) -> impl std::future::Future<Output = sqlx::Result<Self>> + Send
where
Self: Sized,
{
async {
if Self::find(pool, self.get_id()).await?.is_some() {
self.update(pool).await
} else {
self.create(pool).await
}
}
}
Self: Sized;
/// Delete the entity from the database if it exists.
///