use futures::StreamExt; use sea_orm::{ ColumnTrait, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter, }; use thiserror::Error; use crate::{ amf::service::{ information_req::{AMFRequest, AMFRequestType}, pdf::AMFPdfError, }, model::{in_process_transaction, transaction}, repo::in_process_transaction::{InProcessTransactionError, NewInProcessTransaction}, }; pub struct GetAMFTransactions { max_req_size: u32, } #[derive(Debug, Error)] pub enum GetAMFTransactionsError { #[error("The transaction {0} contains no pdf document")] NoDocument(String), #[error("Database error: {0}")] Database(DbErr), #[error("Error extracting information from pdf at {0} : {1}")] InformationExtraction(String, AMFPdfError), } impl Default for GetAMFTransactions { fn default() -> Self { GetAMFTransactions { max_req_size: 100 } } } impl GetAMFTransactions { pub fn new(max_req_size: u32) -> Self { GetAMFTransactions { max_req_size } } /// This task will extract the information from all AMF transactions until one is already found /// in the database. Another function should be used to download individual records. pub async fn run(&self, db: &DatabaseConnection) -> Result<(), GetAMFTransactionsError> { info!("Starting AMF transaction download task"); let mut from = 0; let req = AMFRequest::new(AMFRequestType::DD, 0, 10); match req.get_list().await { Ok(_) => (), Err(e) => warn!("The AMF transaction public api is not available: {}", e), }; let mut req = AMFRequest::new(AMFRequestType::DD, from, self.max_req_size); let mut tr_to_process = Vec::new(); 'outer: while let Ok(resp) = req.get_list().await { info!( "Downloading hit list from {} to {}", from, self.max_req_size + from ); let list = resp.get_hits(); for hit in list.iter() { let number = &hit.source.numero; if transaction::Entity::find() .filter(transaction::Column::ForeignId.eq(number.to_owned())) .one(db) .await .map_err(GetAMFTransactionsError::Database)? .is_some() { // We've saved this transaction before, so we stop here. break 'outer; } if in_process_transaction::Entity::find() .filter(in_process_transaction::Column::ForeignId.eq(number.to_owned())) .one(db) .await .map_err(GetAMFTransactionsError::Database)? .is_some() { // We've registered this transaction before, so we stop here. break 'outer; } tr_to_process.push( NewInProcessTransaction::new( &hit.get_foreign_id(), &hit.get_documents()[0].path, false, None, ) .into_active_model(), ); } from += self.max_req_size; req = AMFRequest::new(AMFRequestType::DD, from, self.max_req_size); } if !tr_to_process.is_empty() { in_process_transaction::Entity::insert_many(tr_to_process) .exec(db) .await .map_err(GetAMFTransactionsError::Database)?; } let mut to_run = in_process_transaction::Entity::find() .filter(in_process_transaction::Column::Failed.eq(0)) .paginate(db, 100); let n_not_failed = to_run .num_items() .await .map_err(GetAMFTransactionsError::Database)?; if n_not_failed == 0 { info!("No new transactions to process since last run"); return Ok(()); } info!("{} transactions will be processed", n_not_failed); while let Some(page) = to_run .fetch_and_next() .await .map_err(GetAMFTransactionsError::Database)? { let mut futures = Vec::new(); for tr in page.iter() { futures.push(async move { tr.clone().process(db).await }); } let stream = futures::stream::iter(futures).buffer_unordered(100); let _results: Vec> = stream.collect().await; } info!("AMF transactions download task finished execution"); Ok(()) } }