feat: implement efficient upsert operation for create_or_update
All checks were successful
CI / tests (push) Successful in 5m17s

Replace the existing two-query create_or_update implementation with a
single atomic PostgreSQL upsert using ON CONFLICT clause to eliminate
race conditions and improve performance.

Race condition fix:
The previous implementation had a critical race condition where
multiple concurrent requests could:
1. Both call find() and get None (record doesn't exist)
2. Both call create() and the second one fails with duplicate key
   error
3. Or between find() and create(), another transaction inserts the
   record

This created unreliable behavior in high-concurrency scenarios.

Changes:
- Add generate_upsert_query function in trait_implementation.rs
- Generate SQL with INSERT ... ON CONFLICT ... DO UPDATE SET pattern
- Remove default trait implementation that used separate
  find/create/update calls
- Update derive_trait to include upsert query generation
- Convert create_or_update from default implementation to required
  trait method

The new implementation eliminates race conditions while reducing
database round trips from 2-3 queries down to 1, significantly
improving both reliability and performance.
This commit is contained in:
Lucien Cartier-Tilet 2025-06-05 18:37:53 +02:00
parent 6e1376e04a
commit a899dd46fe
Signed by: phundrak
SSH Key Fingerprint: SHA256:CE0HPsbW3L2YiJETx1zYZ2muMptaAqTN2g3498KrMkc
3 changed files with 45 additions and 12 deletions

View File

@ -506,7 +506,6 @@ Georm is designed for zero runtime overhead:
### High Priority ### High Priority
- **Transaction Support**: Comprehensive transaction handling with atomic operations - **Transaction Support**: Comprehensive transaction handling with atomic operations
- **Race Condition Fix**: Database-native UPSERT operations to replace current `create_or_update`
### Medium Priority ### Medium Priority
- **Multi-Database Support**: MySQL and SQLite support with feature flags - **Multi-Database Support**: MySQL and SQLite support with feature flags

View File

@ -97,6 +97,47 @@ fn generate_delete_query(table: &str, id: &GeormField) -> proc_macro2::TokenStre
} }
} }
fn generate_upsert_query(
table: &str,
fields: &[GeormField],
id: &GeormField,
) -> proc_macro2::TokenStream {
let inputs: Vec<String> = (1..=fields.len()).map(|num| format!("${num}")).collect();
let columns = fields
.iter()
.map(|f| f.ident.to_string())
.collect::<Vec<String>>()
.join(", ");
// For ON CONFLICT DO UPDATE, exclude the ID field from updates
let update_assignments = fields
.iter()
.filter(|f| !f.id)
.map(|f| format!("{} = EXCLUDED.{}", f.ident, f.ident))
.collect::<Vec<String>>()
.join(", ");
let upsert_string = format!(
"INSERT INTO {table} ({columns}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {update_assignments} RETURNING *",
inputs.join(", "),
id.ident
);
let field_idents: Vec<syn::Ident> = fields.iter().map(|f| f.ident.clone()).collect();
quote! {
async fn create_or_update(&self, pool: &::sqlx::PgPool) -> ::sqlx::Result<Self> {
::sqlx::query_as!(
Self,
#upsert_string,
#(self.#field_idents),*
)
.fetch_one(pool)
.await
}
}
}
fn generate_get_id(id: &GeormField) -> proc_macro2::TokenStream { fn generate_get_id(id: &GeormField) -> proc_macro2::TokenStream {
let ident = &id.ident; let ident = &id.ident;
let ty = &id.ty; let ty = &id.ty;
@ -125,6 +166,7 @@ pub fn derive_trait(
let find_query = generate_find_query(table, id); let find_query = generate_find_query(table, id);
let create_query = generate_create_query(table, fields); let create_query = generate_create_query(table, fields);
let update_query = generate_update_query(table, fields, id); let update_query = generate_update_query(table, fields, id);
let upsert_query = generate_upsert_query(table, fields, id);
let delete_query = generate_delete_query(table, id); let delete_query = generate_delete_query(table, id);
quote! { quote! {
impl #impl_generics Georm<#ty> for #ident #type_generics #where_clause { impl #impl_generics Georm<#ty> for #ident #type_generics #where_clause {
@ -133,6 +175,7 @@ pub fn derive_trait(
#find_query #find_query
#create_query #create_query
#update_query #update_query
#upsert_query
#delete_query #delete_query
} }
} }

View File

@ -50,18 +50,9 @@ pub trait Georm<Id> {
fn create_or_update( fn create_or_update(
&self, &self,
pool: &sqlx::PgPool, pool: &sqlx::PgPool,
) -> impl ::std::future::Future<Output = sqlx::Result<Self>> ) -> impl std::future::Future<Output = sqlx::Result<Self>> + Send
where where
Self: Sized, Self: Sized;
{
async {
if Self::find(pool, self.get_id()).await?.is_some() {
self.update(pool).await
} else {
self.create(pool).await
}
}
}
/// Delete the entity from the database if it exists. /// Delete the entity from the database if it exists.
/// ///