feat: implement efficient upsert operation for create_or_update
Some checks failed
CI / tests (push) Failing after 33s
Some checks failed
CI / tests (push) Failing after 33s
Replace the existing two-query create_or_update implementation with a single atomic PostgreSQL upsert using ON CONFLICT clause to eliminate race conditions and improve performance. Race condition fix: The previous implementation had a critical race condition where multiple concurrent requests could: 1. Both call find() and get None (record doesn't exist) 2. Both call create() and the second one fails with duplicate key error 3. Or between find() and create(), another transaction inserts the record This created unreliable behavior in high-concurrency scenarios. Changes: - Add generate_upsert_query function in trait_implementation.rs - Generate SQL with INSERT ... ON CONFLICT ... DO UPDATE SET pattern - Remove default trait implementation that used separate find/create/update calls - Update derive_trait to include upsert query generation - Convert create_or_update from default implementation to required trait method The new implementation eliminates race conditions while reducing database round trips from 2-3 queries down to 1, significantly improving both reliability and performance.
This commit is contained in:
parent
fb20c7a0aa
commit
9d5139d03d
@ -506,7 +506,6 @@ Georm is designed for zero runtime overhead:
|
|||||||
|
|
||||||
### High Priority
|
### High Priority
|
||||||
- **Transaction Support**: Comprehensive transaction handling with atomic operations
|
- **Transaction Support**: Comprehensive transaction handling with atomic operations
|
||||||
- **Race Condition Fix**: Database-native UPSERT operations to replace current `create_or_update`
|
|
||||||
|
|
||||||
### Medium Priority
|
### Medium Priority
|
||||||
- **Multi-Database Support**: MySQL and SQLite support with feature flags
|
- **Multi-Database Support**: MySQL and SQLite support with feature flags
|
||||||
|
@ -97,6 +97,47 @@ fn generate_delete_query(table: &str, id: &GeormField) -> proc_macro2::TokenStre
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn generate_upsert_query(
|
||||||
|
table: &str,
|
||||||
|
fields: &[GeormField],
|
||||||
|
id: &GeormField,
|
||||||
|
) -> proc_macro2::TokenStream {
|
||||||
|
let inputs: Vec<String> = (1..=fields.len()).map(|num| format!("${num}")).collect();
|
||||||
|
let columns = fields
|
||||||
|
.iter()
|
||||||
|
.map(|f| f.ident.to_string())
|
||||||
|
.collect::<Vec<String>>()
|
||||||
|
.join(", ");
|
||||||
|
|
||||||
|
// For ON CONFLICT DO UPDATE, exclude the ID field from updates
|
||||||
|
let update_assignments = fields
|
||||||
|
.iter()
|
||||||
|
.filter(|f| !f.id)
|
||||||
|
.map(|f| format!("{} = EXCLUDED.{}", f.ident, f.ident))
|
||||||
|
.collect::<Vec<String>>()
|
||||||
|
.join(", ");
|
||||||
|
|
||||||
|
let upsert_string = format!(
|
||||||
|
"INSERT INTO {table} ({columns}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {update_assignments} RETURNING *",
|
||||||
|
inputs.join(", "),
|
||||||
|
id.ident
|
||||||
|
);
|
||||||
|
|
||||||
|
let field_idents: Vec<syn::Ident> = fields.iter().map(|f| f.ident.clone()).collect();
|
||||||
|
|
||||||
|
quote! {
|
||||||
|
async fn create_or_update(&self, pool: &::sqlx::PgPool) -> ::sqlx::Result<Self> {
|
||||||
|
::sqlx::query_as!(
|
||||||
|
Self,
|
||||||
|
#upsert_string,
|
||||||
|
#(self.#field_idents),*
|
||||||
|
)
|
||||||
|
.fetch_one(pool)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn generate_get_id(id: &GeormField) -> proc_macro2::TokenStream {
|
fn generate_get_id(id: &GeormField) -> proc_macro2::TokenStream {
|
||||||
let ident = &id.ident;
|
let ident = &id.ident;
|
||||||
let ty = &id.ty;
|
let ty = &id.ty;
|
||||||
@ -125,6 +166,7 @@ pub fn derive_trait(
|
|||||||
let find_query = generate_find_query(table, id);
|
let find_query = generate_find_query(table, id);
|
||||||
let create_query = generate_create_query(table, fields);
|
let create_query = generate_create_query(table, fields);
|
||||||
let update_query = generate_update_query(table, fields, id);
|
let update_query = generate_update_query(table, fields, id);
|
||||||
|
let upsert_query = generate_upsert_query(table, fields, id);
|
||||||
let delete_query = generate_delete_query(table, id);
|
let delete_query = generate_delete_query(table, id);
|
||||||
quote! {
|
quote! {
|
||||||
impl #impl_generics Georm<#ty> for #ident #type_generics #where_clause {
|
impl #impl_generics Georm<#ty> for #ident #type_generics #where_clause {
|
||||||
@ -133,6 +175,7 @@ pub fn derive_trait(
|
|||||||
#find_query
|
#find_query
|
||||||
#create_query
|
#create_query
|
||||||
#update_query
|
#update_query
|
||||||
|
#upsert_query
|
||||||
#delete_query
|
#delete_query
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -50,18 +50,9 @@ pub trait Georm<Id> {
|
|||||||
fn create_or_update(
|
fn create_or_update(
|
||||||
&self,
|
&self,
|
||||||
pool: &sqlx::PgPool,
|
pool: &sqlx::PgPool,
|
||||||
) -> impl ::std::future::Future<Output = sqlx::Result<Self>>
|
) -> impl std::future::Future<Output = sqlx::Result<Self>> + Send
|
||||||
where
|
where
|
||||||
Self: Sized,
|
Self: Sized;
|
||||||
{
|
|
||||||
async {
|
|
||||||
if Self::find(pool, self.get_id()).await?.is_some() {
|
|
||||||
self.update(pool).await
|
|
||||||
} else {
|
|
||||||
self.create(pool).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Delete the entity from the database if it exists.
|
/// Delete the entity from the database if it exists.
|
||||||
///
|
///
|
||||||
|
Loading…
x
Reference in New Issue
Block a user