#![feature(proc_macro_hygiene, decl_macro)] // Macros #[macro_use] extern crate lazy_static; #[macro_use] extern crate log; // External crates use axum::{ extract::MatchedPath, http::{HeaderValue, Method, Request, StatusCode}, response::Response, routing::get, Router, }; use sea_orm::DatabaseConnection; use std::time::Duration; use tokio::signal; use tower_http::{classify::ServerErrorsFailureClass, cors::CorsLayer, trace::TraceLayer}; use tracing::{info, info_span, Span}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; // Local crates use migration::MigratorTrait; use route::{company, in_process_transaction, transaction}; mod amf; mod db; mod env; mod error; mod model; mod repo; mod route; mod task; use crate::task::run_tasks; // Module imports use env::Config; lazy_static! { /// Contains variables defined in .env file static ref CONFIG: Config = Config::new(); } async fn fallback() -> (StatusCode, &'static str) { (StatusCode::NOT_FOUND, "Not Found") } #[derive(Clone)] pub struct AppState { pub db: DatabaseConnection, } #[tokio::main] pub async fn main() -> Result<(), Box> { tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| "info,tower_http=info".into()), ) .with(tracing_subscriber::fmt::layer()) .init(); let shared_state = AppState { db: db::init().await?, }; let _ = migration::Migrator::up(&shared_state.db, None).await; let app = Router::new() .route("/company", get(company::get_all)) .route("/company/:name", get(company::get_by_name)) .route("/transaction", get(transaction::get_all)) .route( "/transaction/latest", get(transaction::get_latest_transactions), ) .route( "/transaction/aggregated", get(transaction::get_aggregated_transactions), ) .route( "/in_process_transaction", get(in_process_transaction::get_all), ) .route( "/in_process_transaction/:foreign_id/retry", get(in_process_transaction::retry_failed_transaction), ) .route( "/in_process_transaction/retry_all", get(in_process_transaction::retry_all), ) .with_state(shared_state.clone()) .fallback(fallback) .layer( TraceLayer::new_for_http() .make_span_with(|request: &Request<_>| { let matched_path = request .extensions() .get::() .map(MatchedPath::as_str); info_span!( "http_request", method = ?request.method(), full_path = ?request.uri(), matched_path, ) }) .on_request(|_request: &Request<_>, _span: &Span| { info!("New request"); }) .on_response(|response: &Response, latency: Duration, _span: &Span| { info!( "Response, status {}, time {}ms", response.status(), latency.as_millis() ); }) .on_failure( |error: ServerErrorsFailureClass, latency: Duration, _span: &Span| { error!("There was an error answering this request, the server nonetheless answered in {}ms, error: {}", latency.as_millis(), error); }, ), ) .layer( CorsLayer::new() .allow_origin("*".parse::().unwrap()) .allow_methods([Method::GET]) ); // Run tasks tokio::task::spawn(async move { run_tasks(&shared_state.db).await }); let addr = CONFIG.server_address; info!("Server will start listening on {}", addr); axum::Server::bind(&addr) .serve(app.into_make_service()) .with_graceful_shutdown(shutdown_signal()) .await?; Ok(()) } async fn shutdown_signal() { let ctrl_c = async { signal::ctrl_c() .await .expect("failed to install Ctrl+C handler"); }; #[cfg(unix)] let terminate = async { signal::unix::signal(signal::unix::SignalKind::terminate()) .expect("failed to install signal handler") .recv() .await; }; tokio::select! { _ = ctrl_c => {}, _ = terminate => {}, } info!("Starting graceful shutdown"); }