Merge pull request 'Switch to axum' (#30) from axum into master

Reviewed-on: #30
pull/33/head
alban 3 years ago
commit b65cde32e4

1414
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -8,7 +8,8 @@ db_entities:
sea-orm-cli generate entity --database-url $(DATABASE_URL) -o src/model --with-serde both sea-orm-cli generate entity --database-url $(DATABASE_URL) -o src/model --with-serde both
server-dev: server-dev:
cargo run --bin server cd server && \
cargo run
tailwind: tailwind:
cd client && \ cd client && \

@ -14,7 +14,9 @@ dotenvy = { workspace = true }
envy = { workspace = true } envy = { workspace = true }
tokio = { version = "^1.20.1", features = ["full"] } tokio = { version = "^1.20.1", features = ["full"] }
reqwest = { version = "0.11", features = ["json", "rustls-tls"] } reqwest = { version = "0.11", features = ["json", "rustls-tls"] }
rocket = { version = "0.5.0-rc.2", features = ["json"] } axum = "0.6.12"
hyper = { version = "0.14.25", features = ["full"] }
tower = "0.4"
sea-orm = { version = "0.11.0", features = [ sea-orm = { version = "0.11.0", features = [
"runtime-tokio-rustls", "runtime-tokio-rustls",
"macros", "macros",
@ -26,8 +28,9 @@ lazy_static = "1.4.0"
pretty_env_logger = "0.4.0" pretty_env_logger = "0.4.0"
log = "0.4.17" log = "0.4.17"
futures = "0.3.25" futures = "0.3.25"
rocket_contrib = "0.4.11"
async-trait = "0.1.61" async-trait = "0.1.61"
sea-orm-rocket = "0.5.2"
thiserror = "1.0.38" thiserror = "1.0.38"
slug = "0.1.4" slug = "0.1.4"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tower-http = { version = "0.4", features = ["trace", "cors"] }
tracing = "0.1"

@ -1,25 +0,0 @@
use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::Header;
use rocket::{Request, Response};
pub struct Cors;
#[rocket::async_trait]
impl Fairing for Cors {
fn info(&self) -> Info {
Info {
name: "Add CORS headers to responses",
kind: Kind::Response,
}
}
async fn on_response<'r>(&self, _request: &'r Request<'_>, response: &mut Response<'r>) {
response.set_header(Header::new("Access-Control-Allow-Origin", "*"));
response.set_header(Header::new(
"Access-Control-Allow-Methods",
"POST, GET, PATCH, OPTIONS",
));
response.set_header(Header::new("Access-Control-Allow-Headers", "*"));
response.set_header(Header::new("Access-Control-Allow-Credentials", "true"));
}
}

@ -1,47 +1,24 @@
use std::time::Duration; use std::time::Duration;
use crate::CONFIG;
use async_trait::async_trait;
use sea_orm::{ConnectOptions, DatabaseConnection}; use sea_orm::{ConnectOptions, DatabaseConnection};
use sea_orm_rocket::rocket;
use sea_orm_rocket::{rocket::figment::Figment, Database};
use crate::CONFIG;
pub mod paginate; pub mod paginate;
pub mod slug; pub mod slug;
#[derive(Database, Debug)] pub async fn init() -> Result<DatabaseConnection, sea_orm::DbErr> {
#[database("fast_insiders")] let config = &CONFIG;
pub struct Db(SeaOrmPool); let mut options: ConnectOptions = ConnectOptions::new(config.database_url.to_owned());
options
#[derive(Debug, Clone)] .max_connections(config.max_connections)
pub struct SeaOrmPool { .min_connections(config.min_connections)
pub conn: DatabaseConnection, .connect_timeout(Duration::from_secs(config.connect_timeout))
} .acquire_timeout(Duration::from_secs(config.acquire_timeout))
.idle_timeout(Duration::from_secs(config.idle_timeout))
#[async_trait] .max_lifetime(Duration::from_secs(config.max_lifetime))
impl sea_orm_rocket::Pool for SeaOrmPool { .sqlx_logging(config.sqlx_logging);
type Error = sea_orm::DbErr;
let conn = sea_orm::Database::connect(options).await?;
type Connection = DatabaseConnection;
Ok(conn)
async fn init(_figment: &Figment) -> Result<Self, Self::Error> {
let config = &CONFIG;
let mut options: ConnectOptions = ConnectOptions::new(config.database_url.to_owned());
options
.max_connections(config.max_connections)
.min_connections(config.min_connections)
.connect_timeout(Duration::from_secs(config.connect_timeout))
.acquire_timeout(Duration::from_secs(config.acquire_timeout))
.idle_timeout(Duration::from_secs(config.idle_timeout))
.max_lifetime(Duration::from_secs(config.max_lifetime))
.sqlx_logging(config.sqlx_logging);
let conn = sea_orm::Database::connect(options).await?;
Ok(SeaOrmPool { conn })
}
fn borrow(&self) -> &Self::Connection {
&self.conn
}
} }

@ -1,11 +1,8 @@
use sea_orm::{ use sea_orm::{
error::DbErr, prelude::*, sea_query::SimpleExpr, FromQueryResult, Order, QueryOrder, error::DbErr, prelude::*, sea_query::SimpleExpr, FromQueryResult, Order, QueryOrder,
}; };
use sea_orm_rocket::Connection;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use super::Db;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PaginatedResponse<M> { pub struct PaginatedResponse<M> {
pub count: u64, pub count: u64,
@ -14,7 +11,7 @@ pub struct PaginatedResponse<M> {
} }
pub async fn paginate<E, M, C>( pub async fn paginate<E, M, C>(
conn: Connection<'_, Db>, db: &DatabaseConnection,
page: Option<u64>, page: Option<u64>,
size: Option<u64>, size: Option<u64>,
column: Option<C>, column: Option<C>,
@ -25,7 +22,6 @@ where
M: Send + Sync + FromQueryResult, M: Send + Sync + FromQueryResult,
C: ColumnTrait, C: ColumnTrait,
{ {
let db = conn.into_inner();
let s = size.unwrap_or(20).min(50); let s = size.unwrap_or(20).min(50);
let selector; let selector;
if let (Some(col), Some(ord)) = (column, order) { if let (Some(col), Some(ord)) = (column, order) {
@ -53,7 +49,7 @@ where
} }
pub async fn paginate_also_related<E, R, T, K, C>( pub async fn paginate_also_related<E, R, T, K, C>(
conn: Connection<'_, Db>, db: &DatabaseConnection,
page: Option<u64>, page: Option<u64>,
size: Option<u64>, size: Option<u64>,
column: Option<C>, column: Option<C>,
@ -67,7 +63,6 @@ where
K: Sync + Send + FromQueryResult, K: Sync + Send + FromQueryResult,
C: ColumnTrait, C: ColumnTrait,
{ {
let db = conn.into_inner();
let s = size.unwrap_or(20).min(50); let s = size.unwrap_or(20).min(50);
let mut selector; let mut selector;

@ -0,0 +1,49 @@
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
Json,
};
use sea_orm::DbErr;
use serde_json::json;
use crate::repo::in_process_transaction::InProcessTransactionError;
pub enum AppError {
DbErr(DbErr),
InProcessTransaction(InProcessTransactionError),
NotFound(String),
}
impl From<DbErr> for AppError {
fn from(inner: DbErr) -> Self {
AppError::DbErr(inner)
}
}
impl From<InProcessTransactionError> for AppError {
fn from(inner: InProcessTransactionError) -> Self {
AppError::InProcessTransaction(inner)
}
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, error_message) = match self {
AppError::DbErr(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("The database retruned an error: {}", e),
),
AppError::InProcessTransaction(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Error in the in process transaction repo: {}", e),
),
AppError::NotFound(e) => (StatusCode::NOT_FOUND, format!("Not found error: {}", e)),
};
let body = Json(json!({
"error": error_message,
}));
(status, body).into_response()
}
}

@ -1,85 +0,0 @@
#![feature(proc_macro_hygiene, decl_macro)]
// Macros
#[macro_use]
extern crate rocket;
#[macro_use]
extern crate lazy_static;
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
// External crates
use rocket::{Build, Rocket};
use sea_orm_rocket::rocket::fairing::{self, AdHoc};
use sea_orm_rocket::Database;
// Local crates
use migration::MigratorTrait;
mod amf;
mod cors;
mod db;
mod env;
mod logger;
mod model;
mod repo;
mod route;
mod task;
use crate::task::run_tasks;
// Module imports
use crate::db::Db;
use env::Config;
lazy_static! {
/// Contains variables defind in .env file
static ref CONFIG: Config = Config::new();
}
async fn run_migrations(rocket: Rocket<Build>) -> fairing::Result {
let conn = &Db::fetch(&rocket).unwrap().conn;
let _ = migration::Migrator::up(conn, None).await;
Ok(rocket)
}
async fn start_rocket() -> Result<(), sea_orm_rocket::rocket::Error> {
rocket::build()
.attach(Db::init())
.attach(AdHoc::try_on_ignite("Migrations", run_migrations))
.attach(crate::cors::Cors)
.mount(
"/v1",
routes![
route::company::get_all,
route::company::get_by_isin,
route::transaction::get_transactions,
route::transaction::get_aggregated_transactions,
route::transaction::get_latest_transactions,
route::in_process_transaction::get_all,
route::in_process_transaction::retry_failed_transaction,
route::in_process_transaction::retry_all
],
)
.launch()
.await
.map(|_| ())
}
#[rocket::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
logger::init_log()?;
// Run tasks
tokio::task::spawn(async { run_tasks().await });
let result = start_rocket().await;
info!("Rocket: deorbit.");
if let Some(err) = result.err() {
println!("Error: {}", err);
}
Ok(())
}

@ -0,0 +1,169 @@
#![feature(proc_macro_hygiene, decl_macro)]
// Macros
#[macro_use]
extern crate lazy_static;
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
// External crates
use axum::{
extract::MatchedPath,
http::{HeaderValue, Method, Request, StatusCode},
response::Response,
routing::get,
Router,
};
use sea_orm::DatabaseConnection;
use std::{net::SocketAddr, time::Duration};
use tokio::signal;
use tower_http::{classify::ServerErrorsFailureClass, cors::CorsLayer, trace::TraceLayer};
use tracing::{info, info_span, Span};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
// Local crates
use migration::MigratorTrait;
use route::{company, in_process_transaction, transaction};
mod amf;
mod db;
mod env;
mod error;
mod logger;
mod model;
mod repo;
mod route;
mod task;
use crate::task::run_tasks;
// Module imports
use env::Config;
lazy_static! {
/// Contains variables defined in .env file
static ref CONFIG: Config = Config::new();
}
async fn fallback() -> (StatusCode, &'static str) {
(StatusCode::NOT_FOUND, "Not Found")
}
#[derive(Clone)]
pub struct AppState {
pub db: DatabaseConnection,
}
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info,tower_http=info".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let shared_state = AppState {
db: db::init().await?,
};
let _ = migration::Migrator::up(&shared_state.db, None).await;
let app = Router::new()
.route("/company", get(company::get_all))
.route("/company/:name", get(company::get_by_name))
.route("/transaction", get(transaction::get_all))
.route(
"/transaction/latest",
get(transaction::get_latest_transactions),
)
.route(
"/transaction/aggregated",
get(transaction::get_aggregated_transactions),
)
.route(
"/in_process_transaction",
get(in_process_transaction::get_all),
)
.route(
"/in_process_transaction/:foreign_id/retry",
get(in_process_transaction::retry_failed_transaction),
)
.route(
"/in_process_transaction/retry_all",
get(in_process_transaction::retry_all),
)
.with_state(shared_state.clone())
.fallback(fallback)
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &Request<_>| {
let matched_path = request
.extensions()
.get::<MatchedPath>()
.map(MatchedPath::as_str);
info_span!(
"http_request",
method = ?request.method(),
full_path = ?request.uri(),
matched_path,
)
})
.on_request(|_request: &Request<_>, _span: &Span| {
info!("New request");
})
.on_response(|response: &Response, latency: Duration, _span: &Span| {
info!(
"Response, status {}, time {}ms",
response.status(),
latency.as_millis()
);
})
.on_failure(
|error: ServerErrorsFailureClass, latency: Duration, _span: &Span| {
error!("There was an error answering this request, the server nonetheless answered in {}ms, error: {}", latency.as_millis(), error);
},
),
)
.layer(
CorsLayer::new()
.allow_origin("*".parse::<HeaderValue>().unwrap())
.allow_methods([Method::GET])
);
// Run tasks
tokio::task::spawn(async move { run_tasks(&shared_state.db).await });
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
axum::Server::bind(&addr)
.serve(app.into_make_service())
.with_graceful_shutdown(shutdown_signal())
.await?;
Ok(())
}
async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
info!("Starting graceful shutdown");
}

@ -1,50 +1,41 @@
use rocket::{http::Status, response::status::Custom}; use axum::extract::{Path, Query, State};
use axum::Json;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect}; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use sea_orm_rocket::rocket::serde::json::Json;
use sea_orm_rocket::Connection;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::db::paginate::{paginate, PaginatedResponse}; use crate::db::paginate::{paginate, PaginatedResponse};
use crate::db::Db; use crate::error::AppError;
use crate::model::{self, company}; use crate::model::{self, company};
use crate::route::Pagination;
use crate::AppState;
use super::Limit;
#[get("/company?<page>&<size>")]
pub async fn get_all( pub async fn get_all(
conn: Connection<'_, Db>, Query(pagination): Query<Pagination>,
page: Option<u64>, State(state): State<AppState>,
size: Option<u64>, ) -> Result<Json<PaginatedResponse<company::Model>>, AppError> {
) -> Result<Json<PaginatedResponse<company::Model>>, Custom<String>> { let page = pagination.page;
let size = pagination.size;
let db = &state.db;
let res = let res =
paginate::<company::Entity, company::Model, company::Column>(conn, page, size, None, None) paginate::<company::Entity, company::Model, company::Column>(db, page, size, None, None)
.await .await?;
.map_err(|e| {
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?;
Ok(Json(res)) Ok(Json(res))
} }
#[get("/company/<name>?<limit>")] pub async fn get_by_name(
pub async fn get_by_isin( Query(Limit { limit }): Query<Limit>,
conn: Connection<'_, Db>, State(state): State<AppState>,
name: String, Path(name): Path<String>,
limit: Option<u64>, ) -> Result<Json<Vec<company::Model>>, AppError> {
) -> Result<Json<Vec<company::Model>>, Custom<String>> { let conn = state.db;
let db = conn.into_inner();
let res = company::Entity::find() let res = company::Entity::find()
.filter(company::Column::Name.contains(&name)) .filter(company::Column::Name.contains(&name))
.limit(limit.unwrap_or(10)) .limit(limit.unwrap_or(10).min(10))
.all(db) .all(&conn)
.await .await?;
.map_err(|e| {
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?;
Ok(Json(res)) Ok(Json(res))
} }

@ -1,120 +1,76 @@
use rocket::http::Status; use axum::extract::{Path, Query, State};
use rocket::response::status::Custom; use axum::Json;
use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, TransactionTrait}; use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, TransactionTrait};
use sea_orm_rocket::rocket::serde::json::Json;
use sea_orm_rocket::Connection;
use crate::db::paginate::{paginate, PaginatedResponse}; use crate::db::paginate::{paginate, PaginatedResponse};
use crate::db::Db; use crate::error::AppError;
use crate::model::transaction; use crate::model::transaction;
use crate::model::{company, in_process_transaction}; use crate::model::{company, in_process_transaction};
use crate::AppState;
use super::Pagination;
#[get("/in_process_transaction?<page>&<size>")]
pub async fn get_all( pub async fn get_all(
conn: Connection<'_, Db>, state: State<AppState>,
page: Option<u64>, Query(pagination): Query<Pagination>,
size: Option<u64>, ) -> Result<Json<PaginatedResponse<in_process_transaction::Model>>, AppError> {
) -> Result<Json<PaginatedResponse<in_process_transaction::Model>>, Custom<String>> { let db = &state.db;
let page = pagination.page;
let size = pagination.size;
let res = paginate::< let res = paginate::<
in_process_transaction::Entity, in_process_transaction::Entity,
in_process_transaction::Model, in_process_transaction::Model,
in_process_transaction::Column, in_process_transaction::Column,
>( >(
conn, db,
page, page,
size, size,
Some(in_process_transaction::Column::CreatedAt), Some(in_process_transaction::Column::CreatedAt),
Some(Order::Asc), Some(Order::Asc),
) )
.await .await?;
.map_err(|e| {
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?;
Ok(Json(res)) Ok(Json(res))
} }
#[get("/in_process_transaction/<foreign_id>/retry")]
pub async fn retry_failed_transaction( pub async fn retry_failed_transaction(
conn: Connection<'_, Db>, state: State<AppState>,
foreign_id: String, Path(foreign_id): Path<String>,
) -> Result<Json<(transaction::Model, Option<company::Model>)>, Custom<String>> { ) -> Result<Json<(transaction::Model, Option<company::Model>)>, AppError> {
let db = conn.into_inner(); let db = &state.db;
let txn = db.begin().await.map_err(|e| { let txn = db.begin().await?;
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?;
let mut tr = in_process_transaction::Entity::find() let mut tr = in_process_transaction::Entity::find()
.filter(in_process_transaction::Column::Failed.eq(1)) .filter(in_process_transaction::Column::Failed.eq(1))
.filter(in_process_transaction::Column::ForeignId.eq(foreign_id.to_owned())) .filter(in_process_transaction::Column::ForeignId.eq(foreign_id.to_owned()))
.one(&txn) .one(&txn)
.await .await?
.map_err(|e| {
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?
.ok_or_else(|| { .ok_or_else(|| {
Custom( AppError::NotFound(format!("Failed transaction {} doesn't exist", foreign_id))
Status::NotFound,
format!("Failed transaction {} doesn't exist", foreign_id),
)
})?; })?;
tr.process(&txn).await.map_err(|e| { tr.process(&txn).await?;
Custom(
Status::InternalServerError,
format!("Error retrying failed transaction {}", e),
)
})?;
let res = transaction::Entity::find() let res = transaction::Entity::find()
.filter(transaction::Column::ForeignId.eq(foreign_id.to_owned())) .filter(transaction::Column::ForeignId.eq(foreign_id.to_owned()))
.find_also_related(company::Entity) .find_also_related(company::Entity)
.one(&txn) .one(&txn)
.await .await?
.map_err(|e| {
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?
.ok_or_else(|| { .ok_or_else(|| {
Custom( AppError::NotFound("Failed to fetch just created transaction".to_string())
Status::NotFound,
"Failed to fetch just created transaction".to_string(),
)
})?; })?;
txn.commit().await.map_err(|e| { txn.commit().await?;
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?;
Ok(Json(res)) Ok(Json(res))
} }
#[get("/in_process_transaction/retry_all")] pub async fn retry_all(state: State<AppState>) -> Result<(), AppError> {
pub async fn retry_all(conn: Connection<'_, Db>) -> Result<(), Custom<String>> { let db = &state.db;
let db = conn.into_inner();
let list = in_process_transaction::Entity::find() let list = in_process_transaction::Entity::find()
.all(db) .all(db)
.await .await
.map_err(|e| { .map_err(|e| AppError::NotFound(format!("Database error: {}", e)))?;
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?;
let mut res_list = vec![]; let mut res_list = vec![];
for mut tr in list { for mut tr in list {

@ -1,3 +1,43 @@
use std::{fmt, str::FromStr};
use serde::{de, Deserialize, Deserializer};
pub mod company; pub mod company;
pub mod in_process_transaction; pub mod in_process_transaction;
pub mod transaction; pub mod transaction;
/// Struct to deserialize paginated routes query parameters
#[derive(Deserialize)]
pub struct Pagination {
#[serde(default, deserialize_with = "empty_string_as_none")]
pub page: Option<u64>,
#[serde(default, deserialize_with = "empty_string_as_none")]
pub size: Option<u64>,
}
/// Struct to deserialize a company slug as a query parameters
#[derive(Deserialize)]
pub struct CompanySlug {
#[serde(default, deserialize_with = "empty_string_as_none")]
pub company_slug: Option<String>,
}
/// Struct to deserialize a limit as a query parameters
#[derive(Deserialize)]
pub struct Limit {
#[serde(default, deserialize_with = "empty_string_as_none")]
pub limit: Option<u64>,
}
fn empty_string_as_none<'de, D, T>(de: D) -> Result<Option<T>, D::Error>
where
D: Deserializer<'de>,
T: FromStr,
T::Err: fmt::Display,
{
let opt = Option::<String>::deserialize(de)?;
match opt.as_deref() {
None | Some("") => Ok(None),
Some(s) => FromStr::from_str(s).map_err(de::Error::custom).map(Some),
}
}

@ -1,14 +1,16 @@
use chrono::{Duration, NaiveDate, NaiveDateTime, Utc}; use axum::extract::{Json, Path, Query, State};
use rocket::{http::Status, response::status::Custom}; use chrono::{NaiveDate, NaiveDateTime};
use sea_orm::{ use sea_orm::{
prelude::*, DbBackend, FromQueryResult, ItemsAndPagesNumber, JoinType, Order, QueryOrder, prelude::*, DbBackend, FromQueryResult, ItemsAndPagesNumber, JoinType, Order, QueryOrder,
QuerySelect, Statement, QuerySelect, Statement,
}; };
use sea_orm_rocket::{rocket::serde::json::Json, Connection};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::db::paginate::{paginate_also_related, PaginatedResponse}; use crate::db::paginate::{paginate_also_related, PaginatedResponse};
use crate::{db::Db, model}; use crate::error::AppError;
use crate::{model, AppState};
use super::{CompanySlug, Pagination};
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TransactionCompany { pub struct TransactionCompany {
@ -27,27 +29,17 @@ pub struct TransactionCompany {
pub company: Option<model::company::Model>, pub company: Option<model::company::Model>,
} }
#[get("/transaction?<company_slug>&<page>&<size>&<hours>")] pub async fn get_all(
pub async fn get_transactions( state: State<AppState>,
conn: Connection<'_, Db>, Query(CompanySlug { company_slug }): Query<CompanySlug>,
company_slug: Option<String>, Query(Pagination { page, size }): Query<Pagination>,
hours: Option<i64>, ) -> Result<Json<PaginatedResponse<TransactionCompany>>, AppError> {
page: Option<u64>, let db = &state.db;
size: Option<u64>,
) -> Result<Json<PaginatedResponse<TransactionCompany>>, Custom<String>> {
let mut filters = vec![]; let mut filters = vec![];
if let Some(c) = company_slug { if let Some(c) = company_slug {
filters.push(model::company::Column::Slug.eq(c)) filters.push(model::company::Column::Slug.eq(c))
} }
if let Some(h) = hours {
filters.push(
model::transaction::Column::CreatedAtUtc.gte(
Utc::now()
.naive_utc()
.checked_sub_signed(Duration::hours(h)),
),
)
}
let res = paginate_also_related::< let res = paginate_also_related::<
model::transaction::Entity, model::transaction::Entity,
@ -56,20 +48,14 @@ pub async fn get_transactions(
model::company::Model, model::company::Model,
model::transaction::Column, model::transaction::Column,
>( >(
conn, db,
page, page,
size, size,
Some(model::transaction::Column::DatePublished), Some(model::transaction::Column::DatePublished),
Some(Order::Desc), Some(Order::Desc),
Some(filters), Some(filters),
) )
.await .await?;
.map_err(|e| {
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?;
let list = res let list = res
.list .list
@ -108,14 +94,12 @@ pub struct LatestTransaction {
total: f32, total: f32,
} }
#[get("/transaction/latest?<page>&<size>")]
pub async fn get_latest_transactions( pub async fn get_latest_transactions(
conn: Connection<'_, Db>, state: State<AppState>,
page: Option<u64>, Query(pagination): Query<Pagination>,
size: Option<u64>, ) -> Result<Json<PaginatedResponse<LatestTransaction>>, AppError> {
) -> Result<Json<PaginatedResponse<LatestTransaction>>, Custom<String>> { let db = &state.db;
let db = conn.into_inner(); let s = pagination.size.unwrap_or(20).min(50);
let s = size.unwrap_or(20).min(50);
let query_raw = "SELECT let query_raw = "SELECT
company.name as company_name, company.name as company_name,
@ -141,21 +125,11 @@ pub async fn get_latest_transactions(
let ItemsAndPagesNumber { let ItemsAndPagesNumber {
number_of_items: count, number_of_items: count,
number_of_pages: num_pages, number_of_pages: num_pages,
} = pages.num_items_and_pages().await.map_err(|e| { } = pages.num_items_and_pages().await?;
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?;
let p = page.unwrap_or(0).min(num_pages); let p = pagination.page.unwrap_or(0).min(num_pages);
let list = pages.fetch_page(p).await.map_err(|e| { let list = pages.fetch_page(p).await?;
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?;
let res = PaginatedResponse { let res = PaginatedResponse {
count, count,
@ -166,16 +140,13 @@ pub async fn get_latest_transactions(
Ok(Json(res)) Ok(Json(res))
} }
#[get("/transaction/aggregated?<page>&<size>&<hours>")]
pub async fn get_aggregated_transactions( pub async fn get_aggregated_transactions(
conn: Connection<'_, Db>, state: State<AppState>,
hours: Option<i64>, pagination: Query<Pagination>,
page: Option<u64>, ) -> Result<Json<PaginatedResponse<TransactionsAggregated>>, AppError> {
size: Option<u64>, let db = &state.db;
) -> Result<Json<PaginatedResponse<TransactionsAggregated>>, Custom<String>> { let s = pagination.0.size.unwrap_or(20).min(50);
let db = conn.into_inner(); let query = model::company::Entity::find()
let s = size.unwrap_or(20).min(50);
let mut query = model::company::Entity::find()
.select_only() .select_only()
.join( .join(
JoinType::InnerJoin, JoinType::InnerJoin,
@ -186,16 +157,6 @@ pub async fn get_aggregated_transactions(
.column(model::company::Column::Slug) .column(model::company::Column::Slug)
.column_as(model::transaction::Column::Id.count(), "transaction_count"); .column_as(model::transaction::Column::Id.count(), "transaction_count");
if let Some(h) = hours {
query = query.filter(
model::transaction::Column::CreatedAtUtc.gte(
Utc::now()
.naive_utc()
.checked_sub_signed(Duration::hours(h)),
),
)
}
let pages = query let pages = query
.group_by(model::company::Column::Name) .group_by(model::company::Column::Name)
.order_by(model::transaction::Column::Id.count(), Order::Desc) .order_by(model::transaction::Column::Id.count(), Order::Desc)
@ -205,21 +166,11 @@ pub async fn get_aggregated_transactions(
let ItemsAndPagesNumber { let ItemsAndPagesNumber {
number_of_items: count, number_of_items: count,
number_of_pages: num_pages, number_of_pages: num_pages,
} = pages.num_items_and_pages().await.map_err(|e| { } = pages.num_items_and_pages().await?;
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?;
let p = page.unwrap_or(0).min(num_pages); let p = pagination.0.page.unwrap_or(0).min(num_pages);
let list = pages.fetch_page(p).await.map_err(|e| { let list = pages.fetch_page(p).await?;
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?;
let res = PaginatedResponse { let res = PaginatedResponse {
count, count,

@ -134,7 +134,7 @@ impl GetAMFTransactions {
for tr in page.iter() { for tr in page.iter() {
futures.push(async move { tr.clone().process(db).await }); futures.push(async move { tr.clone().process(db).await });
} }
let stream = futures::stream::iter(futures).buffer_unordered(10); let stream = futures::stream::iter(futures).buffer_unordered(100);
let _results: Vec<Result<(), InProcessTransactionError>> = stream.collect().await; let _results: Vec<Result<(), InProcessTransactionError>> = stream.collect().await;
} }

@ -1,6 +1,6 @@
use std::time::Duration; use std::time::Duration;
use sea_orm::ConnectOptions; use sea_orm::DatabaseConnection;
use crate::{task::get_amf_transactions::GetAMFTransactions, CONFIG}; use crate::{task::get_amf_transactions::GetAMFTransactions, CONFIG};
@ -14,20 +14,8 @@ trait TaskTrait {
fn run(&self) -> Result<(), Self::Err>; fn run(&self) -> Result<(), Self::Err>;
} }
pub async fn run_tasks() -> Result<(), sea_orm::DbErr> { pub async fn run_tasks(tasks_pool: &DatabaseConnection) -> Result<(), sea_orm::DbErr> {
let config = &CONFIG; let config = &CONFIG;
info!("Creating task db pool");
let mut options: ConnectOptions = ConnectOptions::new(config.database_url.to_owned());
options
.max_connections(config.max_connections)
.min_connections(config.min_connections)
.connect_timeout(Duration::from_secs(config.connect_timeout))
.acquire_timeout(Duration::from_secs(config.acquire_timeout))
.idle_timeout(Duration::from_secs(config.idle_timeout))
.max_lifetime(Duration::from_secs(config.max_lifetime))
.sqlx_logging(config.sqlx_logging);
let tasks_pool = sea_orm::Database::connect(options).await?;
let mut inter = tokio::time::interval(Duration::from_secs(config.get_amf_transaction_interval)); let mut inter = tokio::time::interval(Duration::from_secs(config.get_amf_transaction_interval));
loop { loop {

@ -1,4 +0,0 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
server::main()?;
Ok(())
}
Loading…
Cancel
Save