Combined repo initial commit

pull/7/head
Miroito 3 years ago
commit 57f7b218fb

6
.gitignore vendored

@ -0,0 +1,6 @@
/target
.env
/client/.perseus
/client/pkg

4725
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -0,0 +1,22 @@
[package]
name = "fast-insiders"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
server = { version = "0.1.0", path = "./server" }
client = { version = "0.1.0", path = "./client" }
[workspace]
members = ["server", "client"]
[workspace.dependencies]
chrono = { version = "0.4.23", features = ["serde"] }
serde = { version = "1.0.152", features = ["derive"] }
dotenvy = "0.15.6"
envy = "0.4.2"
serde_json = "1.0.91"
# chrono = { workspace = true, features = ["serde"] }

2
client/.gitignore vendored

@ -0,0 +1,2 @@
.perseus/
pkg/

@ -0,0 +1,16 @@
[package]
name = "client"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
chrono = { workspace = true, features = ["serde"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
dotenvy = { workspace = true }
envy = { workspace = true }
perseus = { version = "0.3.6", features = ["hydrate"] }
sycamore = { version = "0.7.1", features = ["ssr", "serde", "futures"] }
reqwasm = "0.5.0"

@ -0,0 +1,9 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
</head>
<body></body>
</html>

@ -0,0 +1,2 @@
pub mod routes;
pub mod types;

@ -0,0 +1 @@
pub mod transaction;

@ -0,0 +1,19 @@
use crate::api::types::{paginated_response::PaginatedResponse, transaction::TransactionCompany};
/// Fetch one page of insider transactions from the backend API.
///
/// `page` is the zero-based page index and `size` the number of rows per
/// page. Network or deserialization failures are reported as `Err(())`
/// instead of panicking the WASM app (the previous version used `unwrap`
/// on both fallible steps even though the signature already promised a
/// `Result`).
pub async fn get_transactions(
    page: i64,
    size: i64,
) -> Result<PaginatedResponse<TransactionCompany>, ()> {
    // NOTE(review): the base URL is hard-coded; consider moving it to config.
    let url = format!(
        "http://localhost:8000/v1/transaction?page={}&size={}",
        page, size,
    );
    reqwasm::http::Request::get(&url)
        .send()
        .await
        .map_err(|_| ())?
        .json::<PaginatedResponse<TransactionCompany>>()
        .await
        .map_err(|_| ())
}

@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
/// A listed company as returned by the backend API.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Company {
    // Database primary key.
    pub id: i32,
    // Display name shown in the transaction table.
    pub name: String,
    // URL-friendly identifier used in routes (e.g. "index/{slug}").
    pub slug: String,
}

@ -0,0 +1,3 @@
pub mod company;
pub mod paginated_response;
pub mod transaction;

@ -0,0 +1,84 @@
use serde::{Deserialize, Serialize};
use sycamore::{prelude::GenericNode, view};
use crate::components::base_table::TableContent;
use super::transaction::TransactionCompany;
/// Conversion from an API payload into the header and cell views consumed
/// by the `BaseTable` component.
pub trait IntoTableData<G>
where
    G: GenericNode,
{
    // Consumes `self`: the payload's owned data is moved into the views.
    fn into_table_data(self) -> TableContent<G>;
}
/// Generic paginated API response.
#[derive(Clone, Serialize, Deserialize)]
pub struct PaginatedResponse<M> {
    // Total number of rows across all pages.
    pub count: i64,
    // Total number of pages available on the server.
    pub num_pages: i64,
    // The items of the current page only.
    pub list: Vec<M>,
}
impl<G> IntoTableData<G> for PaginatedResponse<TransactionCompany>
where
G: GenericNode,
{
fn into_table_data(self) -> TableContent<G> {
let headers_view = vec![
view! { "Company" },
view! { "Date published" },
view! { "Date executed" },
view! { "Person" },
view! { "Nature" },
view! { "ISIN" },
view! { "Instrument" },
view! { "Exchange" },
view! { "Volume" },
view! { "Unit price" },
view! { "Total" },
];
let data_view: Vec<Vec<view::View<G>>> = self
.list
.into_iter()
.map(|t| {
let mut res = vec![];
let t1 = t.clone();
res.push(view! {
a (href=format!("index/{}", t1.company.slug),
class="text-indigo-800 dark:text-indigo-300 hover:text-indigo-500 dark:hover:text-indigo-600 hover:underline",
) {
(t1.company.name.to_owned())
}
});
let t1 = t.clone();
res.push(view! { (t1.date_published.to_owned()) });
let t1 = t.clone();
res.push(view! { (t1.date_executed.to_owned()) });
let t1 = t.clone();
res.push(view! { (t1.person.to_owned()) });
let t1 = t.clone();
res.push(view! { (t1.nature.to_owned()) });
let t1 = t.clone();
res.push(view! { (t1.isin.to_owned().unwrap_or("-".to_string())) });
let t1 = t.clone();
res.push(view! { (t1.instrument.to_owned()) });
let t1 = t.clone();
res.push(view! { (t1.exchange.to_owned()) });
let t1 = t.clone();
res.push(view! { (t1.volume.to_owned()) });
let t1 = t.clone();
res.push(view! { (t1.unit_price.to_owned()) });
let t1 = t.clone();
res.push(view! { ((t1.volume as f32 * t1.unit_price).to_string()) });
res
})
.collect();
TableContent {
headers_view,
data_view,
}
}
}

@ -0,0 +1,36 @@
use chrono::NaiveDate;
use serde::{Deserialize, Serialize};
use super::company::Company;
/// A single insider transaction as stored in the database (flat row with a
/// `company_id` foreign key only).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Transaction {
    pub id: i32,
    // Foreign key to the company this transaction belongs to.
    pub company_id: i32,
    // Identifier from the upstream data source (unique per transaction —
    // see the migration's unique key on this column).
    pub foreign_id: String,
    pub date_published: NaiveDate,
    pub date_executed: NaiveDate,
    // Name of the insider who made the transaction.
    pub person: String,
    pub exchange: String,
    // Nature of the transaction (free text from the source).
    pub nature: String,
    // ISIN may be absent for some instruments, hence Option.
    pub isin: Option<String>,
    pub instrument: String,
    pub volume: i32,
    pub unit_price: f32,
}
/// The same transaction data with the related [`Company`] embedded instead
/// of a bare `company_id` — the shape returned by the API list endpoint and
/// rendered in the table.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TransactionCompany {
    pub id: i32,
    pub foreign_id: String,
    pub date_published: NaiveDate,
    pub date_executed: NaiveDate,
    pub person: String,
    pub exchange: String,
    pub nature: String,
    pub isin: Option<String>,
    pub instrument: String,
    pub volume: i32,
    pub unit_price: f32,
    // Embedded company record (joined server-side).
    pub company: Company,
}

@ -0,0 +1,94 @@
use sycamore::prelude::*;
/// Plain (non-reactive) table contents: one view per header cell and one
/// `Vec` of cell views per body row.
#[derive(Debug, Clone)]
pub struct TableContent<G>
where
    G: GenericNode,
{
    pub headers_view: Vec<View<G>>,
    pub data_view: Vec<Vec<View<G>>>,
}
/// Reactive counterpart of [`TableContent`]: the same data wrapped in
/// signals so the table re-renders when either part changes.
#[derive(Debug, Clone)]
pub struct TableContentRx<G>
where
    G: GenericNode,
{
    pub headers_view: Signal<Vec<View<G>>>,
    pub data_view: Signal<Vec<Vec<View<G>>>>,
}
/// Pairs a value (here: a view) with its positional index so it can be used
/// in a `Keyed` list: equality — and therefore the diffing key — is based on
/// the index alone, since views have no meaningful equality.
#[derive(Debug, Clone)]
struct PEqView<T>(usize, T);
impl<T> PartialEq<PEqView<T>> for PEqView<T> {
    // Compare by index only; the wrapped value is deliberately ignored.
    fn eq(&self, other: &PEqView<T>) -> bool {
        self.0 == other.0
    }
}
// Sound because `eq` on usize is a total equivalence relation.
impl<T> Eq for PEqView<T> {}
/// Reactive table component: renders `headers_view` as the header row and
/// each inner `Vec` of `data_view` as one body row, re-rendering whenever
/// either signal changes.
#[component(BaseTable<G>)]
pub fn the_header(
    TableContentRx {
        headers_view,
        data_view,
    }: TableContentRx<G>,
) -> View<G>
where
    G: GenericNode,
{
    // Index the headers so `Keyed` can diff them by position.
    let headers = create_memo(cloned!((headers_view) => move || {
        (*headers_view.get()).clone().into_iter().enumerate().map(|(idx, v)| PEqView(idx,v)).collect::<Vec<PEqView<View<G>>>>()
    }));
    let data = Signal::new(vec![]);
    create_effect(cloned!((data_view, data) => move || {
        // Build all rows first and set the signal exactly once. The previous
        // version called `data.set` per row, notifying subscribers O(n)
        // times and re-cloning the accumulated vector on every iteration.
        let v_table = data_view.get();
        let rows: Vec<PEqView<View<G>>> = v_table
            .iter()
            .enumerate()
            .map(|(idx, row)| {
                PEqView(idx, View::new_fragment(
                    row.iter().map(|cell| {
                        view!{ th (class="m-2 p-2 border-slate-500 border-x border-dashed") { (cell) } }
                    }).collect()
                ))
            })
            .collect();
        data.set(rows);
    }));
    view! {
        table (class="table-auto bg-slate-200 text-left dark:bg-slate-800 rounded-lg mx-auto my-2") {
            thead {
                tr (class="border-b-2 border-slate-500 text-center") {
                    Keyed(KeyedProps {
                        iterable: headers,
                        template: |v| {
                            view! {
                                th (class="m-2 p-2") { (v.1) }
                            }
                        },
                        key: |v| v.0,
                    })
                }
            }
            tbody {
                Keyed(KeyedProps {
                    iterable: data.handle(),
                    key: |x| x.0,
                    template: |t| {
                        view! {
                            tr (class="m-2 p-2 border-slate-500 border") {
                                (t.1)
                            }
                        }
                    },
                })
            }
        }
    }
}

@ -0,0 +1,19 @@
use sycamore::prelude::*;
/// Indeterminate loading spinner: a quarter-arc SVG path rotated
/// continuously via a SMIL `animateTransform`.
#[component(Loading<G>)]
pub fn component() -> View<G> {
    view! {
        svg (version="1.1", id="loader-1", xmlns="http://www.w3.org/2000/svg", xlink="http://www.w3.org/1999/xlink", x="0px", y="0px",
            width="40px", height="40px", viewBox="0 0 50 50", space="preserve") {
            path (fill="#000", d="M25.251,6.461c-10.318,0-18.683,8.365-18.683,18.683h4.068c0-8.071,6.543-14.615,14.615-14.615V6.461z"){
                animateTransform (attributeType="xml",
                    attributeName="transform",
                    type="rotate",
                    from="0 25 25",
                    to="360 25 25",
                    dur="0.6s",
                    repeatCount="indefinite") {}
            }
        }
    }
}

@ -0,0 +1,4 @@
pub mod base_table;
pub mod loading;
pub mod paginated_data_table;
pub mod the_header;

@ -0,0 +1,126 @@
use std::{marker::PhantomData, rc::Rc};
use serde::Deserialize;
use sycamore::prelude::*;
use crate::{
api::types::paginated_response::{IntoTableData, PaginatedResponse},
components::{
base_table::{BaseTable, TableContentRx},
loading::Loading,
},
};
/// Build-time state for the paginated table component: `req` is the base
/// request URL the table fetches pages from, and `ph_data` carries the row
/// type `M` without storing a value.
#[perseus::make_rx(PaginatedTableStateRx)]
pub struct PaginatedTableState<M>
where
    M: 'static,
{
    pub req: String,
    pub ph_data: PhantomData<M>,
}
/// Generic paginated table: fetches `PaginatedResponse<M>` pages from the
/// URL in `req`, renders them through `BaseTable`, and provides page-size
/// and prev/next controls. A `Loading` spinner is shown until the first
/// response arrives.
#[component(PaginatedTable<G>)]
pub fn component<M>(PaginatedTableStateRx { req, ph_data }: PaginatedTableStateRx<M>) -> View<G>
where
    M: 'static,
    PaginatedResponse<M>: IntoTableData<G> + Clone,
    for<'de> M: Deserialize<'de>,
{
    // Last response received; `None` until the first fetch completes.
    let paginated_data: Signal<Option<PaginatedResponse<M>>> = Signal::new(None);
    // Reactive table contents handed to BaseTable; filled by the effect below.
    let table_prop: TableContentRx<G> = TableContentRx {
        headers_view: Signal::new(vec![]),
        data_view: Signal::new(vec![vec![]]),
    };
    let table_prop2 = table_prop.clone();
    // Current zero-based page, total page count and total row count.
    let page: Signal<i64> = Signal::new(0);
    let n_page: Signal<i64> = Signal::new(1);
    let n_rows: Signal<i64> = Signal::new(0);
    // Advance one page, clamped to the last page reported by the server.
    let page_up = cloned!((page, paginated_data, n_page) => move |_| {
        n_page.set((*paginated_data.get()).as_ref().map_or(0, |t| t.num_pages));
        if *page.get() + 1 < *n_page.get() {
            page.set((*page.get()).min(*n_page.get() - 1) + 1)
        }
    });
    // Go back one page, never below zero.
    let page_down = cloned!((page) => move |_| {
        if *page.get() > 0 {
            page.set((*page.get()-1).max(0));
        }
    });
    // Page size is bound to a <select>, hence kept as a string.
    let page_size_string = Signal::new("20".to_string());
    let page_size_string2 = page_size_string.clone();
    // Re-fetch whenever the page or page size changes. Network access only
    // happens in the browser; during server-side rendering the guard below
    // skips the fetch.
    create_effect(
        cloned!((page_size_string, paginated_data, page, req, n_page, n_rows) => move || {
            let page = *page.get();
            let page_size_s = page_size_string.get();
            // Fall back to 20 rows if the <select> value is not a number.
            let page_size = page_size_s.parse().unwrap_or(20);
            let url = (*req.get()).clone();
            if G::IS_BROWSER {
                perseus::spawn_local(
                    cloned!((table_prop2, page, paginated_data, n_page, n_rows) => async move {
                        // NOTE(review): these `unwrap`s panic the WASM app on
                        // any network/deserialization failure — consider
                        // surfacing an error state in the UI instead.
                        let res = reqwasm::http::Request::get(&format!(
                            "{}?page={}&size={}",
                            url,
                            page, page_size,
                        ))
                        .send()
                        .await
                        .unwrap()
                        .json::<PaginatedResponse<M>>()
                        .await
                        .unwrap();
                        paginated_data.set(Some(res.clone()));
                        n_rows.set(res.count);
                        // Push the rendered headers/rows into BaseTable's signals.
                        let table_content = res.into_table_data();
                        table_prop2.data_view.set(table_content.data_view);
                        table_prop2.headers_view.set(table_content.headers_view);
                        n_page.set((*paginated_data.get()).as_ref().map_or(0, |t| t.num_pages));
                    }),
                );
            }
        }),
    );
    view! {
        (cloned!((n_rows, page_size_string, page_down, page_up, page, n_page, page_size_string2) =>
            view! {
                p (class="text-right") { (format!("{} transactions", n_rows.get())) }
                div (class="flex flex-row justify-between") {
                    select (bind:value=page_size_string,
                        class="p-2 justify-end text-slate-700 dark:text-slate-100 bg-slate-100 dark:bg-slate-800 rounded-md",
                        id="size-select",
                    ) {
                        option (value="20") { "20" }
                        option (value="10") { "10" }
                        option (value="30", selected=page_size_string2.get().eq(&Rc::new("30".to_string()))) { "30" }
                        option (value="40") { "40" }
                        option (value="50") { "50" }
                    }
                    div (class="flex flex-row p-2 bg-slate-200 dark:bg-slate-800 rounded-md") {
                        button (on:click=page_down,class="m-1 hover:font-bold") {
                            "<<"
                        }
                        div (class="m-1 align-middle text-center") {
                            (format!("{}/{}",(*page.get() + 1).to_string(), *n_page.get()) )
                        }
                        button (on:click=page_up, class="m-1 hover:font-bold") {
                            ">>"
                        }
                    }
                }
            }))
        (if paginated_data.get().is_some() {
            view! {
                BaseTable(table_prop.clone())
            }
        } else {
            view! {
                div (class="flex flex-row justify-center") {
                    Loading()
                }
            }
        })
    }
}
}

@ -0,0 +1,18 @@
use sycamore::prelude::*;
/// Placeholder site header.
///
/// The intended header (site title plus a dark-mode toggle) needs access to
/// global state from inside a component; until that is wired up, this only
/// renders a reminder string. See `index_page`, which currently inlines the
/// real header instead.
#[component(TheHeader<G>)]
pub fn the_header(_: ()) -> View<G> {
    view! {
        "Don't use until global state is recheable from a component"
    }
}

@ -0,0 +1,17 @@
use perseus::{ErrorPages, Html};
use sycamore::view;
/// Build the application's error pages: a catch-all that reports the HTTP
/// status, offending URL and error text, plus a dedicated page for 404s.
pub fn get_error_pages<G: Html>() -> ErrorPages<G> {
    // Fallback used for any status without a dedicated page.
    let mut pages = ErrorPages::new(|url, status, err, _| {
        view! {
            p { (format!("An error with HTTP code {} occurred at '{}': '{}'.", status, url, err)) }
        }
    });
    // Friendlier message for a plain "not found".
    pages.add_page(404, |_, _, _, _| {
        view! {
            p { "Page not found." }
        }
    });
    pages
}

@ -0,0 +1,15 @@
use perseus::{state::GlobalStateCreator, RenderFnResult};
/// Global-state creator: the app-wide state is produced once at build time
/// by `get_build_state` below.
pub fn get_global_state_creator() -> GlobalStateCreator {
    GlobalStateCreator::new().build_state_fn(get_build_state)
}
/// App-wide state shared by all templates.
#[perseus::make_rx(AppStateRx)]
pub struct AppState {
    // Whether the UI is in dark mode; toggled client-side from the header.
    pub dark_mode: bool,
}
/// Build the initial global state; dark mode defaults to enabled.
#[perseus::autoserde(global_build_state)]
pub async fn get_build_state() -> RenderFnResult<AppState> {
    Ok(AppState { dark_mode: true })
}

@ -0,0 +1,15 @@
use perseus::{Html, PerseusApp};
mod api;
mod components;
pub mod error_pages;
pub mod global_state;
pub mod templates;
/// Perseus application entry point: registers the index template, the
/// global (dark-mode) state creator, and the error pages.
#[perseus::main]
pub fn main<G: Html>() -> PerseusApp<G> {
    PerseusApp::new()
        .template(crate::templates::index::get_template)
        .global_state_creator(crate::global_state::get_global_state_creator())
        .error_pages(crate::error_pages::get_error_pages)
}

@ -0,0 +1,95 @@
use std::marker::PhantomData;
use perseus::{Html, RenderFnResult, RenderFnResultWithCause, SsrNode, Template};
use sycamore::{
prelude::{view, View},
reactive::{cloned, Signal},
};
use crate::{
api::types::transaction::TransactionCompany,
components::paginated_data_table::{PaginatedTable, PaginatedTableStateRx},
global_state::AppStateRx,
};
/// Per-path state for the index template: `req` is the API URL the
/// transaction table fetches from (set at build time per company slug).
#[perseus::make_rx(IndexPageStateRx)]
pub struct IndexPageState {
    pub req: String,
}
/// The index page: a header with a dark-mode toggle and the paginated
/// insider-transaction table. `req` comes from build state and scopes the
/// table to one company when the path carries a slug.
#[perseus::template_rx]
pub fn index_page(IndexPageStateRx { req }: IndexPageStateRx, global_state: AppStateRx) -> View<G> {
    let dark_mode = global_state.dark_mode;
    // Separate clones: one moved into the toggle closure, one read when
    // choosing the root `dark` class below.
    let dark_mode_2 = dark_mode.clone();
    let dark_mode_3 = dark_mode.clone();
    let toggle_dark_mode = cloned!(() => move |_| dark_mode_2.set(!*dark_mode.get()));
    let paginated_table_state: PaginatedTableStateRx<TransactionCompany> = PaginatedTableStateRx {
        req,
        // Marker only: fixes the table's row type to TransactionCompany.
        ph_data: Signal::new(PhantomData),
    };
    view! {
        main (class=if *dark_mode_3.get() { "dark" } else { "" }) {
            link (rel="stylesheet", href = "/.perseus/static/tailwind.css") {}
            div (class="bg-slate-200 dark:bg-slate-700 text-slate-700 dark:text-slate-100 font-sans") {
                header (class="shadow-md h-12 p-2 align-middle w-full bg-gray-100 dark:bg-slate-500/30 backdrop-blur-lg") {
                    div (class="flex flex-row justify-between") {
                        div (class="mr-10 align-middle") {
                            a (href="/", class="hover:underline") {
                                "Fast Insiders"
                            }
                        }
                        div (class="align-middle") {
                            button (on:click=toggle_dark_mode, class="mx-1 py-1 px-2 bg-slate-200 dark:bg-slate-800 rounded-full")
                            { "Toggle dark mode" }
                        }
                    }
                }
                div (class="flex flex-col items-center justify-center ") {
                    div (class="w-4/5 m-10 p-3 bg-slate-100 dark:bg-slate-600 rounded-lg items-center justify-center") {
                        a (class="hover:underline", href="/") {
                            h1 (
                                class="text-center text-lg"
                            ) {
                                "Insider Transactions published by the AMF"
                            }
                        }
                        PaginatedTable(paginated_table_state)
                    }
                }
            }
        }
    }
}
/// Template serving "index" and "index/{slug}" paths: build paths + per-path
/// build state, with incremental generation for slugs not built ahead of
/// time.
pub fn get_template<G: Html>() -> Template<G> {
    Template::new("index")
        .build_paths_fn(get_build_paths)
        .build_state_fn(get_build_state)
        .template(index_page)
        .incremental_generation()
        .head(head)
}
/// Document `<head>` for the index template — a static page title only.
#[perseus::head]
pub fn head(_props: IndexPageState) -> View<SsrNode> {
    view! {
        title { "Fast Insiders" }
    }
}
/// Compute the page state for a built path.
///
/// `path` is either "index" (the full listing) or "index/{company_slug}";
/// everything after the "index" prefix is appended to the API URL so the
/// table is scoped to that company.
#[perseus::autoserde(build_state)]
pub async fn get_build_state(
    path: String,
    _locale: String,
) -> RenderFnResultWithCause<IndexPageState> {
    // `strip_prefix` replaces the original clone + byte-indexed `drain`,
    // which would panic on any path shorter than "index". If the prefix is
    // somehow absent, fall back to the whole path instead of panicking.
    let company_slug = path.strip_prefix("index").unwrap_or(&path);
    let req = format!("http://localhost:8000/v1/transaction{}", company_slug);
    Ok(IndexPageState { req })
}
/// The only path built ahead of time is the root listing (the empty string
/// resolves to "index"); company pages are generated incrementally.
pub async fn get_build_paths() -> RenderFnResult<Vec<String>> {
    Ok(vec![String::new()])
}

@ -0,0 +1 @@
pub mod index;

@ -0,0 +1,3 @@
@tailwind base;
@tailwind components;
@tailwind utilities;

@ -0,0 +1,798 @@
/*
! tailwindcss v3.2.4 | MIT License | https://tailwindcss.com
*/
/*
1. Prevent padding and border from affecting element width. (https://github.com/mozdevs/cssremedy/issues/4)
2. Allow adding a border to an element by just adding a border-width. (https://github.com/tailwindcss/tailwindcss/pull/116)
*/
*,
::before,
::after {
box-sizing: border-box;
/* 1 */
border-width: 0;
/* 2 */
border-style: solid;
/* 2 */
border-color: #e5e7eb;
/* 2 */
}
::before,
::after {
--tw-content: '';
}
/*
1. Use a consistent sensible line-height in all browsers.
2. Prevent adjustments of font size after orientation changes in iOS.
3. Use a more readable tab size.
4. Use the user's configured `sans` font-family by default.
5. Use the user's configured `sans` font-feature-settings by default.
*/
html {
line-height: 1.5;
/* 1 */
-webkit-text-size-adjust: 100%;
/* 2 */
-moz-tab-size: 4;
/* 3 */
-o-tab-size: 4;
tab-size: 4;
/* 3 */
font-family: ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, "Noto Sans", sans-serif, "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji";
/* 4 */
font-feature-settings: normal;
/* 5 */
}
/*
1. Remove the margin in all browsers.
2. Inherit line-height from `html` so users can set them as a class directly on the `html` element.
*/
body {
margin: 0;
/* 1 */
line-height: inherit;
/* 2 */
}
/*
1. Add the correct height in Firefox.
2. Correct the inheritance of border color in Firefox. (https://bugzilla.mozilla.org/show_bug.cgi?id=190655)
3. Ensure horizontal rules are visible by default.
*/
hr {
height: 0;
/* 1 */
color: inherit;
/* 2 */
border-top-width: 1px;
/* 3 */
}
/*
Add the correct text decoration in Chrome, Edge, and Safari.
*/
abbr:where([title]) {
-webkit-text-decoration: underline dotted;
text-decoration: underline dotted;
}
/*
Remove the default font size and weight for headings.
*/
h1,
h2,
h3,
h4,
h5,
h6 {
font-size: inherit;
font-weight: inherit;
}
/*
Reset links to optimize for opt-in styling instead of opt-out.
*/
a {
color: inherit;
text-decoration: inherit;
}
/*
Add the correct font weight in Edge and Safari.
*/
b,
strong {
font-weight: bolder;
}
/*
1. Use the user's configured `mono` font family by default.
2. Correct the odd `em` font sizing in all browsers.
*/
code,
kbd,
samp,
pre {
font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace;
/* 1 */
font-size: 1em;
/* 2 */
}
/*
Add the correct font size in all browsers.
*/
small {
font-size: 80%;
}
/*
Prevent `sub` and `sup` elements from affecting the line height in all browsers.
*/
sub,
sup {
font-size: 75%;
line-height: 0;
position: relative;
vertical-align: baseline;
}
sub {
bottom: -0.25em;
}
sup {
top: -0.5em;
}
/*
1. Remove text indentation from table contents in Chrome and Safari. (https://bugs.chromium.org/p/chromium/issues/detail?id=999088, https://bugs.webkit.org/show_bug.cgi?id=201297)
2. Correct table border color inheritance in all Chrome and Safari. (https://bugs.chromium.org/p/chromium/issues/detail?id=935729, https://bugs.webkit.org/show_bug.cgi?id=195016)
3. Remove gaps between table borders by default.
*/
table {
text-indent: 0;
/* 1 */
border-color: inherit;
/* 2 */
border-collapse: collapse;
/* 3 */
}
/*
1. Change the font styles in all browsers.
2. Remove the margin in Firefox and Safari.
3. Remove default padding in all browsers.
*/
button,
input,
optgroup,
select,
textarea {
font-family: inherit;
/* 1 */
font-size: 100%;
/* 1 */
font-weight: inherit;
/* 1 */
line-height: inherit;
/* 1 */
color: inherit;
/* 1 */
margin: 0;
/* 2 */
padding: 0;
/* 3 */
}
/*
Remove the inheritance of text transform in Edge and Firefox.
*/
button,
select {
text-transform: none;
}
/*
1. Correct the inability to style clickable types in iOS and Safari.
2. Remove default button styles.
*/
button,
[type='button'],
[type='reset'],
[type='submit'] {
-webkit-appearance: button;
/* 1 */
background-color: transparent;
/* 2 */
background-image: none;
/* 2 */
}
/*
Use the modern Firefox focus style for all focusable elements.
*/
:-moz-focusring {
outline: auto;
}
/*
Remove the additional `:invalid` styles in Firefox. (https://github.com/mozilla/gecko-dev/blob/2f9eacd9d3d995c937b4251a5557d95d494c9be1/layout/style/res/forms.css#L728-L737)
*/
:-moz-ui-invalid {
box-shadow: none;
}
/*
Add the correct vertical alignment in Chrome and Firefox.
*/
progress {
vertical-align: baseline;
}
/*
Correct the cursor style of increment and decrement buttons in Safari.
*/
::-webkit-inner-spin-button,
::-webkit-outer-spin-button {
height: auto;
}
/*
1. Correct the odd appearance in Chrome and Safari.
2. Correct the outline style in Safari.
*/
[type='search'] {
-webkit-appearance: textfield;
/* 1 */
outline-offset: -2px;
/* 2 */
}
/*
Remove the inner padding in Chrome and Safari on macOS.
*/
::-webkit-search-decoration {
-webkit-appearance: none;
}
/*
1. Correct the inability to style clickable types in iOS and Safari.
2. Change font properties to `inherit` in Safari.
*/
::-webkit-file-upload-button {
-webkit-appearance: button;
/* 1 */
font: inherit;
/* 2 */
}
/*
Add the correct display in Chrome and Safari.
*/
summary {
display: list-item;
}
/*
Removes the default spacing and border for appropriate elements.
*/
blockquote,
dl,
dd,
h1,
h2,
h3,
h4,
h5,
h6,
hr,
figure,
p,
pre {
margin: 0;
}
fieldset {
margin: 0;
padding: 0;
}
legend {
padding: 0;
}
ol,
ul,
menu {
list-style: none;
margin: 0;
padding: 0;
}
/*
Prevent resizing textareas horizontally by default.
*/
textarea {
resize: vertical;
}
/*
1. Reset the default placeholder opacity in Firefox. (https://github.com/tailwindlabs/tailwindcss/issues/3300)
2. Set the default placeholder color to the user's configured gray 400 color.
*/
input::-moz-placeholder, textarea::-moz-placeholder {
opacity: 1;
/* 1 */
color: #9ca3af;
/* 2 */
}
input::placeholder,
textarea::placeholder {
opacity: 1;
/* 1 */
color: #9ca3af;
/* 2 */
}
/*
Set the default cursor for buttons.
*/
button,
[role="button"] {
cursor: pointer;
}
/*
Make sure disabled buttons don't get the pointer cursor.
*/
:disabled {
cursor: default;
}
/*
1. Make replaced elements `display: block` by default. (https://github.com/mozdevs/cssremedy/issues/14)
2. Add `vertical-align: middle` to align replaced elements more sensibly by default. (https://github.com/jensimmons/cssremedy/issues/14#issuecomment-634934210)
This can trigger a poorly considered lint error in some tools but is included by design.
*/
img,
svg,
video,
canvas,
audio,
iframe,
embed,
object {
display: block;
/* 1 */
vertical-align: middle;
/* 2 */
}
/*
Constrain images and videos to the parent width and preserve their intrinsic aspect ratio. (https://github.com/mozdevs/cssremedy/issues/14)
*/
img,
video {
max-width: 100%;
height: auto;
}
/* Make elements with the HTML hidden attribute stay hidden by default */
[hidden] {
display: none;
}
*, ::before, ::after {
--tw-border-spacing-x: 0;
--tw-border-spacing-y: 0;
--tw-translate-x: 0;
--tw-translate-y: 0;
--tw-rotate: 0;
--tw-skew-x: 0;
--tw-skew-y: 0;
--tw-scale-x: 1;
--tw-scale-y: 1;
--tw-pan-x: ;
--tw-pan-y: ;
--tw-pinch-zoom: ;
--tw-scroll-snap-strictness: proximity;
--tw-ordinal: ;
--tw-slashed-zero: ;
--tw-numeric-figure: ;
--tw-numeric-spacing: ;
--tw-numeric-fraction: ;
--tw-ring-inset: ;
--tw-ring-offset-width: 0px;
--tw-ring-offset-color: #fff;
--tw-ring-color: rgb(59 130 246 / 0.5);
--tw-ring-offset-shadow: 0 0 #0000;
--tw-ring-shadow: 0 0 #0000;
--tw-shadow: 0 0 #0000;
--tw-shadow-colored: 0 0 #0000;
--tw-blur: ;
--tw-brightness: ;
--tw-contrast: ;
--tw-grayscale: ;
--tw-hue-rotate: ;
--tw-invert: ;
--tw-saturate: ;
--tw-sepia: ;
--tw-drop-shadow: ;
--tw-backdrop-blur: ;
--tw-backdrop-brightness: ;
--tw-backdrop-contrast: ;
--tw-backdrop-grayscale: ;
--tw-backdrop-hue-rotate: ;
--tw-backdrop-invert: ;
--tw-backdrop-opacity: ;
--tw-backdrop-saturate: ;
--tw-backdrop-sepia: ;
}
::backdrop {
--tw-border-spacing-x: 0;
--tw-border-spacing-y: 0;
--tw-translate-x: 0;
--tw-translate-y: 0;
--tw-rotate: 0;
--tw-skew-x: 0;
--tw-skew-y: 0;
--tw-scale-x: 1;
--tw-scale-y: 1;
--tw-pan-x: ;
--tw-pan-y: ;
--tw-pinch-zoom: ;
--tw-scroll-snap-strictness: proximity;
--tw-ordinal: ;
--tw-slashed-zero: ;
--tw-numeric-figure: ;
--tw-numeric-spacing: ;
--tw-numeric-fraction: ;
--tw-ring-inset: ;
--tw-ring-offset-width: 0px;
--tw-ring-offset-color: #fff;
--tw-ring-color: rgb(59 130 246 / 0.5);
--tw-ring-offset-shadow: 0 0 #0000;
--tw-ring-shadow: 0 0 #0000;
--tw-shadow: 0 0 #0000;
--tw-shadow-colored: 0 0 #0000;
--tw-blur: ;
--tw-brightness: ;
--tw-contrast: ;
--tw-grayscale: ;
--tw-hue-rotate: ;
--tw-invert: ;
--tw-saturate: ;
--tw-sepia: ;
--tw-drop-shadow: ;
--tw-backdrop-blur: ;
--tw-backdrop-brightness: ;
--tw-backdrop-contrast: ;
--tw-backdrop-grayscale: ;
--tw-backdrop-hue-rotate: ;
--tw-backdrop-invert: ;
--tw-backdrop-opacity: ;
--tw-backdrop-saturate: ;
--tw-backdrop-sepia: ;
}
.static {
position: static;
}
.fixed {
position: fixed;
}
.m-2 {
margin: 0.5rem;
}
.m-1 {
margin: 0.25rem;
}
.m-10 {
margin: 2.5rem;
}
.mx-auto {
margin-left: auto;
margin-right: auto;
}
.my-2 {
margin-top: 0.5rem;
margin-bottom: 0.5rem;
}
.mx-1 {
margin-left: 0.25rem;
margin-right: 0.25rem;
}
.mb-20 {
margin-bottom: 5rem;
}
.mr-10 {
margin-right: 2.5rem;
}
.flex {
display: flex;
}
.table {
display: table;
}
.h-10 {
height: 2.5rem;
}
.h-12 {
height: 3rem;
}
.w-full {
width: 100%;
}
.w-4\/5 {
width: 80%;
}
.table-auto {
table-layout: auto;
}
.transform {
transform: translate(var(--tw-translate-x), var(--tw-translate-y)) rotate(var(--tw-rotate)) skewX(var(--tw-skew-x)) skewY(var(--tw-skew-y)) scaleX(var(--tw-scale-x)) scaleY(var(--tw-scale-y));
}
.flex-row {
flex-direction: row;
}
.flex-col {
flex-direction: column;
}
.items-center {
align-items: center;
}
.justify-end {
justify-content: flex-end;
}
.justify-center {
justify-content: center;
}
.justify-between {
justify-content: space-between;
}
.rounded-lg {
border-radius: 0.5rem;
}
.rounded-md {
border-radius: 0.375rem;
}
.rounded-full {
border-radius: 9999px;
}
.border {
border-width: 1px;
}
.border-x {
border-left-width: 1px;
border-right-width: 1px;
}
.border-b-2 {
border-bottom-width: 2px;
}
.border-dashed {
border-style: dashed;
}
.border-slate-500 {
--tw-border-opacity: 1;
border-color: rgb(100 116 139 / var(--tw-border-opacity));
}
.bg-slate-200 {
--tw-bg-opacity: 1;
background-color: rgb(226 232 240 / var(--tw-bg-opacity));
}
.bg-slate-100 {
--tw-bg-opacity: 1;
background-color: rgb(241 245 249 / var(--tw-bg-opacity));
}
.bg-gray-100 {
--tw-bg-opacity: 1;
background-color: rgb(243 244 246 / var(--tw-bg-opacity));
}
.bg-pink-200 {
--tw-bg-opacity: 1;
background-color: rgb(251 207 232 / var(--tw-bg-opacity));
}
.p-2 {
padding: 0.5rem;
}
.p-1 {
padding: 0.25rem;
}
.p-3 {
padding: 0.75rem;
}
.py-1 {
padding-top: 0.25rem;
padding-bottom: 0.25rem;
}
.px-2 {
padding-left: 0.5rem;
padding-right: 0.5rem;
}
.text-left {
text-align: left;
}
.text-center {
text-align: center;
}
.text-right {
text-align: right;
}
.align-middle {
vertical-align: middle;
}
.font-mono {
font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace;
}
.font-sans {
font-family: ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, "Noto Sans", sans-serif, "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji";
}
.text-lg {
font-size: 1.125rem;
line-height: 1.75rem;
}
.text-slate-700 {
--tw-text-opacity: 1;
color: rgb(51 65 85 / var(--tw-text-opacity));
}
.text-indigo-800 {
--tw-text-opacity: 1;
color: rgb(55 48 163 / var(--tw-text-opacity));
}
.shadow-md {
--tw-shadow: 0 4px 6px -1px rgb(0 0 0 / 0.1), 0 2px 4px -2px rgb(0 0 0 / 0.1);
--tw-shadow-colored: 0 4px 6px -1px var(--tw-shadow-color), 0 2px 4px -2px var(--tw-shadow-color);
box-shadow: var(--tw-ring-offset-shadow, 0 0 #0000), var(--tw-ring-shadow, 0 0 #0000), var(--tw-shadow);
}
.backdrop-blur-lg {
--tw-backdrop-blur: blur(16px);
-webkit-backdrop-filter: var(--tw-backdrop-blur) var(--tw-backdrop-brightness) var(--tw-backdrop-contrast) var(--tw-backdrop-grayscale) var(--tw-backdrop-hue-rotate) var(--tw-backdrop-invert) var(--tw-backdrop-opacity) var(--tw-backdrop-saturate) var(--tw-backdrop-sepia);
backdrop-filter: var(--tw-backdrop-blur) var(--tw-backdrop-brightness) var(--tw-backdrop-contrast) var(--tw-backdrop-grayscale) var(--tw-backdrop-hue-rotate) var(--tw-backdrop-invert) var(--tw-backdrop-opacity) var(--tw-backdrop-saturate) var(--tw-backdrop-sepia);
}
.hover\:font-bold:hover {
font-weight: 700;
}
.hover\:text-indigo-500:hover {
--tw-text-opacity: 1;
color: rgb(99 102 241 / var(--tw-text-opacity));
}
.hover\:underline:hover {
text-decoration-line: underline;
}
.dark .dark\:bg-slate-800 {
--tw-bg-opacity: 1;
background-color: rgb(30 41 59 / var(--tw-bg-opacity));
}
.dark .dark\:bg-slate-500\/30 {
background-color: rgb(100 116 139 / 0.3);
}
.dark .dark\:bg-pink-600 {
--tw-bg-opacity: 1;
background-color: rgb(219 39 119 / var(--tw-bg-opacity));
}
.dark .dark\:bg-slate-700 {
--tw-bg-opacity: 1;
background-color: rgb(51 65 85 / var(--tw-bg-opacity));
}
.dark .dark\:bg-slate-600 {
--tw-bg-opacity: 1;
background-color: rgb(71 85 105 / var(--tw-bg-opacity));
}
.dark .dark\:text-slate-100 {
--tw-text-opacity: 1;
color: rgb(241 245 249 / var(--tw-text-opacity));
}
.dark .dark\:text-indigo-300 {
--tw-text-opacity: 1;
color: rgb(165 180 252 / var(--tw-text-opacity));
}
.dark .dark\:hover\:text-indigo-600:hover {
--tw-text-opacity: 1;
color: rgb(79 70 229 / var(--tw-text-opacity));
}
@media (min-width: 640px) {
.sm\:p-2 {
padding: 0.5rem;
}
}

@ -0,0 +1,4 @@
module.exports = {
darkMode: 'class',
content: ["./src/**/*.rs", "./index.html"],
};

@ -0,0 +1,17 @@
include .env
# Migrations should be written first and the model files can be created using this command
db_entities:
cd server && \
sea-orm-cli generate entity --database-url $(DATABASE_URL) -o src/model --with-serde both
server-dev:
cargo run --bin server
tailwind:
cd client && \
tailwindcss -i static/style.css -o static/tailwind.css -w
serve:
cd client && \
perseus serve -w

@ -0,0 +1,35 @@
[package]
name = "server"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
migration = { version = "0.1.0", path = "./migration" }
chrono = { workspace = true, features = ["serde"] }
perseus = { version = "0.3.6", features = ["hydrate"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
dotenvy = { workspace = true }
envy = { workspace = true }
tokio = { version = "^1.20.1", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
rocket = { version = "0.5.0-rc.2", features = ["json"] }
sea-orm = { version = "0.10.7", features = [
"runtime-tokio-rustls",
"macros",
"sqlx-mysql",
] }
lopdf = "0.29.0"
bytes = { version = "1.3.0", features = ["serde"] }
lazy_static = "1.4.0"
pretty_env_logger = "0.4.0"
log = "0.4.17"
futures = "0.3.25"
rocket_contrib = "0.4.11"
async-trait = "0.1.61"
sea-orm-rocket = "0.5.2"
thiserror = "1.0.38"
sea-query = "^0.27.1"
slug = "0.1.4"

@ -0,0 +1 @@
/target

File diff suppressed because it is too large Load Diff

@ -0,0 +1,19 @@
[package]
name = "migration"
version = "0.1.0"
edition = "2021"
publish = false
[lib]
name = "migration"
path = "src/lib.rs"
[dependencies]
async-std = { version = "^1", features = ["attributes", "tokio1"] }
[dependencies.sea-orm-migration]
version = "^0.10.0"
features = [
"sqlx-mysql",
"runtime-tokio-rustls",
]

@ -0,0 +1,41 @@
# Running Migrator CLI
- Generate a new migration file
```sh
cargo run -- migrate generate MIGRATION_NAME
```
- Apply all pending migrations
```sh
cargo run
```
```sh
cargo run -- up
```
- Apply first 10 pending migrations
```sh
cargo run -- up -n 10
```
- Rollback last applied migrations
```sh
cargo run -- down
```
- Rollback last 10 applied migrations
```sh
cargo run -- down -n 10
```
- Drop all tables from the database, then reapply all migrations
```sh
cargo run -- fresh
```
- Rollback all applied migrations, then reapply all migrations
```sh
cargo run -- refresh
```
- Rollback all applied migrations
```sh
cargo run -- reset
```
- Check the status of all migrations
```sh
cargo run -- status
```

@ -0,0 +1,18 @@
pub use sea_orm_migration::prelude::*;
mod m20230112_115856_create_company_table;
mod m20230112_160440_create_transaction_table;
mod m20230119_112539_create_transactions_in_process_table;
/// Aggregates all schema migrations for the sea-orm migrator CLI/runtime.
pub struct Migrator;
#[async_trait::async_trait]
impl MigratorTrait for Migrator {
    // Chronological order; later migrations build on earlier ones.
    fn migrations() -> Vec<Box<dyn MigrationTrait>> {
        vec![
            Box::new(m20230112_115856_create_company_table::Migration),
            Box::new(m20230112_160440_create_transaction_table::Migration),
            Box::new(m20230119_112539_create_transactions_in_process_table::Migration),
        ]
    }
}

@ -0,0 +1,52 @@
use sea_orm_migration::prelude::*;
/// Creates the `company` table: an auto-increment id plus a unique slug and a
/// unique display name.
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_table(
                Table::create()
                    .table(Company::Table)
                    // Idempotent: re-running the migration is a no-op.
                    .if_not_exists()
                    .col(
                        ColumnDef::new(Company::Id)
                            .integer()
                            .not_null()
                            .auto_increment()
                            .primary_key(),
                    )
                    // URL-friendly identifier; uniqueness is enforced here and
                    // maintained by `ensure_unique_slug` at insert time.
                    .col(
                        ColumnDef::new(Company::Slug)
                            .string()
                            .unique_key()
                            .not_null(),
                    )
                    .col(
                        ColumnDef::new(Company::Name)
                            .string()
                            .unique_key()
                            .not_null(),
                    )
                    .to_owned(),
            )
            .await
    }
    /// Drops the table; note the dependent `transaction` table must be dropped
    /// first (its own migration's `down` handles that).
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_table(Table::drop().table(Company::Table).to_owned())
            .await
    }
}
/// Learn more at https://docs.rs/sea-query#iden
/// Column identifiers for the `company` table; `pub` so the transaction
/// migration can reference them in its foreign key.
#[derive(Iden)]
pub enum Company {
    Table,
    Id,
    Slug,
    Name,
}

@ -0,0 +1,78 @@
use crate::m20230112_115856_create_company_table as company;
use sea_orm_migration::prelude::*;
/// Creates the `transaction` table (one row per fully-parsed AMF declaration)
/// and its foreign key to `company`.
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_table(
                Table::create()
                    .table(Transaction::Table)
                    .if_not_exists()
                    .col(
                        ColumnDef::new(Transaction::Id)
                            .integer()
                            .not_null()
                            .auto_increment()
                            .primary_key(),
                    )
                    .col(ColumnDef::new(Transaction::CompanyId).integer().not_null())
                    // AMF-assigned declaration number; unique so re-ingesting
                    // the same declaration cannot create duplicates.
                    .col(
                        ColumnDef::new(Transaction::ForeignId)
                            .string()
                            .not_null()
                            .unique_key(),
                    )
                    .col(ColumnDef::new(Transaction::DatePublished).date().not_null())
                    .col(ColumnDef::new(Transaction::DateExecuted).date().not_null())
                    .col(ColumnDef::new(Transaction::Person).text().not_null())
                    .col(ColumnDef::new(Transaction::Exchange).string().not_null())
                    .col(ColumnDef::new(Transaction::Nature).string().not_null())
                    // Nullable: some declarations do not carry an ISIN code.
                    .col(ColumnDef::new(Transaction::Isin).string())
                    .col(ColumnDef::new(Transaction::Instrument).string().not_null())
                    .col(ColumnDef::new(Transaction::Volume).integer().not_null())
                    .col(ColumnDef::new(Transaction::UnitPrice).float().not_null())
                    .to_owned(),
            )
            .await?;
        // Foreign key created as a second step, after the table exists.
        manager
            .create_foreign_key(
                ForeignKey::create()
                    .name("FK_transaction_to_company")
                    .from(Transaction::Table, Transaction::CompanyId)
                    .to(company::Company::Table, company::Company::Id)
                    .on_update(ForeignKeyAction::Cascade)
                    // Deleting a company cascades to its transactions.
                    .on_delete(ForeignKeyAction::Cascade)
                    .to_owned(),
            )
            .await
    }
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_table(Table::drop().table(Transaction::Table).to_owned())
            .await
    }
}
/// Learn more at https://docs.rs/sea-query#iden
/// Column identifiers for the `transaction` table (private: no other
/// migration references them).
#[derive(Iden)]
enum Transaction {
    Table,
    Id,
    CompanyId,
    ForeignId,
    DatePublished,
    DateExecuted,
    Person,
    Exchange,
    Nature,
    Isin,
    Instrument,
    Volume,
    UnitPrice,
}

@ -0,0 +1,66 @@
use sea_orm_migration::prelude::*;
/// Creates the `in_process_transaction` work-queue table: declarations that
/// have been discovered but whose PDF has not yet been successfully parsed.
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_table(
                Table::create()
                    .table(InProcessTransaction::Table)
                    .if_not_exists()
                    .col(
                        ColumnDef::new(InProcessTransaction::Id)
                            .integer()
                            .not_null()
                            .auto_increment()
                            .primary_key(),
                    )
                    .col(
                        ColumnDef::new(InProcessTransaction::CreatedAt)
                            .date_time()
                            .not_null(),
                    )
                    // AMF declaration number — unique so a declaration is
                    // queued at most once.
                    .col(
                        ColumnDef::new(InProcessTransaction::ForeignId)
                            .string()
                            .unique_key()
                            .not_null(),
                    )
                    .col(
                        ColumnDef::new(InProcessTransaction::DocPath)
                            .string()
                            .unique_key()
                            .not_null(),
                    )
                    // Marks entries whose parsing failed and may be retried.
                    .col(
                        ColumnDef::new(InProcessTransaction::Failed)
                            .boolean()
                            .not_null(),
                    )
                    // Human-readable failure reason; NULL while unprocessed.
                    .col(ColumnDef::new(InProcessTransaction::ErrorString).text())
                    .to_owned(),
            )
            .await
    }
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_table(Table::drop().table(InProcessTransaction::Table).to_owned())
            .await
    }
}
/// Learn more at https://docs.rs/sea-query#iden
/// Column identifiers for the `in_process_transaction` table.
#[derive(Iden)]
enum InProcessTransaction {
    Table,
    Id,
    CreatedAt,
    ForeignId,
    DocPath,
    Failed,
    ErrorString,
}

@ -0,0 +1,6 @@
use sea_orm_migration::prelude::*;
/// Entry point for the migration CLI (`cargo run -- up|down|status|fresh|...`).
#[async_std::main]
async fn main() {
    let migrator = migration::Migrator;
    cli::run_cli(migrator).await;
}

@ -0,0 +1,11 @@
use crate::amf::types::transaction_data::TransactionData;
pub mod service;
pub mod types;
/// Anything that can be resolved into a complete [`TransactionData`] record,
/// e.g. an AMF search hit whose PDF still needs to be fetched and parsed.
#[async_trait::async_trait]
pub trait TransactionDataTrait {
    /// Error type produced while fetching/parsing the underlying document.
    type Err;

    /// Resolve this value into a full transaction record.
    /// (`self: &Self` replaced with the idiomatic `&self`; same signature.)
    async fn get_transaction_data(&self) -> Result<TransactionData, Self::Err>;
}

@ -0,0 +1,66 @@
use crate::{
amf::{
types::{
amf_response::{Document, Hit},
AMFResponse, TransactionData,
},
TransactionDataTrait,
},
task::get_amf_transactions::GetAMFTransactionsError,
};
use super::pdf::AMFPdf;
impl AMFResponse {
    /// Borrows the list of search hits inside the Elasticsearch-style envelope.
    pub fn get_hits(&self) -> &Vec<Hit> {
        let envelope = &self.hits;
        &envelope.hits
    }
}
#[async_trait::async_trait]
impl TransactionDataTrait for Hit {
    type Err = GetAMFTransactionsError;

    /// Downloads the hit's first PDF document and extracts a full transaction
    /// record from it.
    ///
    /// Only the first document is parsed; a warning is logged when a hit
    /// carries more than one (the AMF API is expected to return exactly one —
    /// see the `insure_single_document_per_hit` test).
    async fn get_transaction_data(&self) -> Result<TransactionData, Self::Err> {
        // Reuse the accessor instead of duplicating the raw field access.
        let foreign_id = self.get_foreign_id();
        let docs = self.get_documents();
        if docs.len() > 1 {
            warn!("Transaction number {} contains more than one document, only the first document will be parsed for information", foreign_id);
        }
        let doc_path = &docs
            .first()
            .ok_or(GetAMFTransactionsError::NoDocumentError(
                foreign_id.to_owned(),
            ))?
            .path;
        // `doc_path` is already a reference; passing `&doc_path` created a
        // needless `&&String` that only compiled via deref coercion.
        let amf_pdf = AMFPdf::new(doc_path);
        let info = amf_pdf.extract_info().await.map_err(|e| {
            GetAMFTransactionsError::InformationExtractionError(doc_path.to_string(), e)
        })?;
        Ok(TransactionData {
            foreign_id,
            company_name: info.company_name,
            isin: info.isin,
            person: info.person,
            date_published: info.date_published,
            date_executed: info.date_executed,
            exchange: info.exchange,
            nature: info.nature,
            instrument: info.instrument,
            volume: info.volume,
            unit_price: info.unit_price,
        })
    }
}
impl Hit {
    /// All documents attached to this search hit.
    pub fn get_documents(&self) -> &Vec<Document> {
        &self.source.documents
    }

    /// The AMF-assigned declaration number, used as our stable foreign id.
    pub fn get_foreign_id(&self) -> String {
        self.source.numero.clone()
    }
}

@ -0,0 +1,60 @@
use std::fmt;
use thiserror::Error;
use crate::amf::types::AMFResponse;
use crate::CONFIG;
/// Failure stages of an AMF "informations" request, kept separate so callers
/// can log exactly where the pipeline broke.
#[derive(Debug, Error)]
pub enum AMFRequestError {
    /// Transport-level failure (DNS, connection, timeout, ...).
    #[error("Request error: {0}")]
    RequestError(reqwest::Error),
    /// Response body received but could not be deserialized as `AMFResponse`.
    #[error("Json conversion error: {0}")]
    JsonConversionError(reqwest::Error),
    /// Server answered with a non-success HTTP status.
    #[error("Status code error: {0}")]
    StatusCodeError(reqwest::Error),
}
/// A fully-substituted AMF search URL, ready to be fetched with `get_list`.
pub struct AMFRequest {
    // Built from `CONFIG.amf_informations_req` by replacing its placeholders.
    url: String,
}
impl AMFRequest {
    /// Builds the AMF "informations" search URL from the configured template,
    /// substituting the information type and the pagination window
    /// (`{{inf_type}}`, `{{from}}`, `{{size}}`).
    pub fn new(inf_type: AMFRequestType, from: u32, size: u32) -> Self {
        let url = CONFIG
            .amf_informations_req
            .replace("{{inf_type}}", &inf_type.to_string())
            .replace("{{from}}", &from.to_string())
            .replace("{{size}}", &size.to_string());
        AMFRequest { url }
    }

    /// Fetches the search results and deserializes them.
    ///
    /// # Errors
    /// Distinguishes transport failures, non-2xx statuses and JSON decoding
    /// failures via [`AMFRequestError`].
    pub async fn get_list(&self) -> Result<AMFResponse, AMFRequestError> {
        // Variant constructors used directly as `map_err` arguments (clippy
        // `redundant_closure`); the final `Ok(expr?)` wrapper was redundant.
        reqwest::get(&self.url)
            .await
            .map_err(AMFRequestError::RequestError)?
            .error_for_status()
            .map_err(AMFRequestError::StatusCodeError)?
            .json::<AMFResponse>()
            .await
            .map_err(AMFRequestError::JsonConversionError)
    }
}
/// The AMF "type of information" filter used in search requests.
/// `DD` ("déclarations des dirigeants") is the only type this service
/// consumes today, and is therefore the default.
#[derive(Default)]
pub enum AMFRequestType {
    /// Executive transaction declarations.
    #[default]
    DD,
}

impl fmt::Display for AMFRequestType {
    /// Renders the value exactly as the AMF query string expects it.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AMFRequestType::DD => write!(f, "DD"),
        }
    }
}

@ -0,0 +1,6 @@
pub mod amf_response;
pub mod information_req;
pub mod pdf;
#[cfg(test)]
mod test;

@ -0,0 +1,237 @@
use bytes::Bytes;
use chrono::NaiveDate;
use std::num::ParseFloatError;
use thiserror::Error;
use crate::amf::types::date::naive_date_from_str;
use crate::amf::types::date::DateParseError;
use crate::CONFIG;
/// Handle on a single AMF declaration PDF, identified by its full download URL.
#[derive(Debug, Clone)]
pub struct AMFPdf {
    url: String,
}
/// One variant per label the PDF text scan can fail to locate; wrapped in
/// `AMFPdfError::PatternExtractionError` with the document path by callers.
#[derive(Debug, Error)]
pub enum PatternExtractionError {
    #[error("Person not found")]
    PersonNotFoundError,
    #[error("Date executed not found")]
    DateExecutedNotFoundError,
    #[error("Date published not found")]
    DatePublishedNotFoundError,
    #[error("Exchange not found")]
    ExchangeNotFoundError,
    #[error("Nature not found")]
    NatureNotFoundError,
    #[error("Instrument not found")]
    InstrumentNotFoundError,
    #[error("Coordonnees section not found")]
    CoordonneesNotFound,
    #[error("Company name not found")]
    CompanyNameNotFound,
    #[error("Aggregated informations not found")]
    AggregatedInformationNotFoundError,
    #[error("Volume not found")]
    VolumeNotFoundError,
    #[error("Price not found")]
    PriceNotFoundError,
}
/// Errors raised while downloading, loading, or mining an AMF PDF for data.
#[derive(Debug, Error)]
pub enum AMFPdfError {
    #[error("Download error: {0}")]
    DownloadError(reqwest::Error),
    #[error("Bytes conversion error: {0}")]
    BytesConversionError(reqwest::Error),
    #[error("Error loading pdf document: {0}")]
    DocumentLoadError(lopdf::Error),
    #[error("Error during lopdf text extraction: {0}")]
    PdfTextExtractionError(lopdf::Error),
    #[error("Error during extraction of information: {0}")]
    PatternExtractionError(PatternExtractionError),
    #[error("Error parsing date: {0}")]
    DateParseError(DateParseError),
    #[error("Error parsing unit price: {0}")]
    PriceParseError(ParseFloatError),
    #[error("Error parsing volume: {0}")]
    VolumeParseError(ParseFloatError),
}
/// The structured fields mined from one declaration PDF. Mirrors
/// `TransactionData` minus the foreign id (which comes from the API hit).
pub struct AMFPdfData {
    pub company_name: String,
    /// ISIN code; not every declaration carries one.
    pub isin: Option<String>,
    pub person: String,
    pub date_published: NaiveDate,
    pub date_executed: NaiveDate,
    pub exchange: String,
    pub nature: String,
    pub instrument: String,
    pub volume: f32,
    pub unit_price: f32,
}
impl AMFPdf {
    /// Builds the full document URL by appending `path` to the configured AMF
    /// documents base URL (normalized to end with '/' in `Config::new`).
    pub fn new(path: &String) -> AMFPdf {
        let mut url = (&CONFIG.amf_documents_path).to_string();
        url.push_str(&path);
        AMFPdf { url }
    }
    /// Downloads the raw PDF bytes for this document.
    async fn download(&self) -> Result<Bytes, AMFPdfError> {
        Ok(reqwest::get(&self.url)
            .await
            .map_err(|e| AMFPdfError::DownloadError(e))?
            .bytes()
            .await
            .map_err(|e| AMFPdfError::BytesConversionError(e))?)
    }
    /// Downloads the PDF and concatenates the extracted text of every page.
    async fn extract_text(&self) -> Result<String, AMFPdfError> {
        let bfile = self.download().await?;
        let pdf =
            lopdf::Document::load_mem(&bfile).map_err(|e| AMFPdfError::DocumentLoadError(e))?;
        let mut text = "".to_string();
        // lopdf addresses pages by 1-based page number; assumes pages are
        // numbered sequentially from 1 — TODO confirm for all AMF PDFs.
        for (idx, _) in pdf.page_iter().enumerate() {
            text.push_str(
                &pdf.extract_text(&[idx as u32 + 1])
                    .map_err(|e| AMFPdfError::PdfTextExtractionError(e))?,
            );
        }
        Ok(text)
    }
    /// Mines the PDF's text for the declaration's structured fields.
    ///
    /// The extraction is a sequence of label-based scans over the raw text.
    /// Statement order matters: the company name is searched in a *copy*
    /// starting at the "COORDONNEES" section, while the aggregated volume and
    /// price are searched after draining `text` up to "INFORMATIONS AGREGEES".
    pub async fn extract_info(&self) -> Result<AMFPdfData, AMFPdfError> {
        let mut text = self.extract_text().await?;
        debug!("Extracted text from document:\n{}", text);
        // ISIN: primary path reads the labelled field; fallback takes the
        // first token of the second non-blank-prefixed line, truncated to 12
        // characters (the length of an ISIN code).
        // NOTE(review): the two `unwrap()`s in the fallback panic on a
        // document with fewer than two lines — consider hardening.
        let isin = extract_pattern(
            &text,
            "CODE DIDENTIFICATION DE LINSTRUMENT FINANCIER : ",
            "\n",
        )
        .map_or_else(
            || {
                Some(
                    text.lines()
                        .skip_while(|l| l.is_empty())
                        .nth(1)
                        .unwrap()
                        .split(" ")
                        .next()
                        .unwrap()
                        .get(0..12)
                        .map(|t| t.to_string()),
                )
            },
            |t| Some(Some(t.get(0..12).unwrap_or(&t).to_string())),
        )
        .unwrap_or(None);
        let person = extract_pattern(
            &text,
            &"NOM /FONCTION DE LA PERSONNE EXERCANT DES RESPONSABILITES DIRIGEANTES OU DE LAPERSONNE ETROITEMENT LIEE :\n",
            "\n",
        )
        .ok_or(AMFPdfError::PatternExtractionError(
            PatternExtractionError::PersonNotFoundError,
        ))?;
        let date_published_raw =
            extract_pattern(&text, &"DATE DE RECEPTION DE LA NOTIFICATION : ", &"\n").ok_or(
                AMFPdfError::PatternExtractionError(
                    PatternExtractionError::DatePublishedNotFoundError,
                ),
            )?;
        // Dates are spelled out in French ("13 Janvier 2023").
        let date_published =
            naive_date_from_str(&date_published_raw).map_err(|e| AMFPdfError::DateParseError(e))?;
        let date_executed_raw = extract_pattern(&text, &"DATE DE LA TRANSACTION : ", &"\n").ok_or(
            AMFPdfError::PatternExtractionError(PatternExtractionError::DateExecutedNotFoundError),
        )?;
        let date_executed =
            naive_date_from_str(&date_executed_raw).map_err(|e| AMFPdfError::DateParseError(e))?;
        let exchange = extract_pattern(&text, &"LIEU DE LA TRANSACTION : ", &"\n").ok_or(
            AMFPdfError::PatternExtractionError(PatternExtractionError::ExchangeNotFoundError),
        )?;
        let nature = extract_pattern(&text, &"NATURE DE LA TRANSACTION : ", &"\n").ok_or(
            AMFPdfError::PatternExtractionError(PatternExtractionError::NatureNotFoundError),
        )?;
        let instrument = extract_pattern(&text, &"DESCRIPTION DE LINSTRUMENT FINANCIER : ", &"\n")
            .ok_or(AMFPdfError::PatternExtractionError(
                PatternExtractionError::InstrumentNotFoundError,
            ))?;
        // Company name lives under the "COORDONNEES DE LEMETTEUR" heading;
        // scan a copy truncated to start there so earlier "NOM : " labels
        // (e.g. the person block) cannot match.
        let inf_coordonnees =
            text.find("COORDONNEES DE LEMETTEUR")
                .ok_or(AMFPdfError::PatternExtractionError(
                    PatternExtractionError::CoordonneesNotFound,
                ))?;
        let mut text_cp = text.clone();
        text_cp.drain(0..inf_coordonnees);
        let company_name = extract_pattern(&text_cp, &"NOM : ", &"\n").ok_or(
            AMFPdfError::PatternExtractionError(PatternExtractionError::CompanyNameNotFound),
        )?;
        // Volume and price come from the "INFORMATIONS AGREGEES" section;
        // `text` itself is drained here, so no label above may be read after
        // this point.
        let inf_aggregees_idx =
            text.find("INFORMATIONS AGREGEES")
                .ok_or(AMFPdfError::PatternExtractionError(
                    PatternExtractionError::AggregatedInformationNotFoundError,
                ))?;
        text.drain(0..inf_aggregees_idx);
        // Thousands separators appear as spaces in the PDF text; strip them.
        let volume = extract_pattern(&text, &"VOLUME : ", &"\n")
            .ok_or(AMFPdfError::PatternExtractionError(
                PatternExtractionError::VolumeNotFoundError,
            ))?
            .replace(" ", "")
            .parse::<f32>()
            .map_err(|e| AMFPdfError::VolumeParseError(e))?;
        // Price may carry a trailing currency marker; keep only the leading
        // digits and decimal point.
        let unit_price = extract_pattern(&text, &"PRIX : ".to_string(), &"\n".to_string())
            .ok_or(AMFPdfError::PatternExtractionError(
                PatternExtractionError::PriceNotFoundError,
            ))?
            .replace(" ", "")
            .chars()
            .take_while(|c| c.is_digit(10) || c == &'.')
            .collect::<String>()
            .parse::<f32>()
            .map_err(|e| AMFPdfError::PriceParseError(e))?;
        Ok(AMFPdfData {
            company_name,
            isin,
            person,
            date_published,
            date_executed,
            exchange,
            nature,
            instrument,
            volume,
            unit_price,
        })
    }
}
/// Returns the substring of `s` strictly between the first occurrence of `p1`
/// and the next occurrence of `p2` after it. When `p2` never occurs, the rest
/// of the string after `p1` is returned. `None` when `p1` is absent.
///
/// Bug fix: the previous fallback used `unwrap_or(s.len())` for an index that
/// was *relative* to the text after `p1`, then added `idx1 + p1.len()` again —
/// producing an out-of-range end index (and a panic on the final `unwrap`)
/// whenever `p2` was missing, e.g. a label on the last line without a trailing
/// newline. The end index is now computed entirely within `rest`.
/// (`&String` relaxed to `&str`; every call site coerces.)
fn extract_pattern(s: &str, p1: &str, p2: &str) -> Option<String> {
    let start = s.find(p1)? + p1.len();
    let rest = &s[start..];
    let end = rest.find(p2).unwrap_or(rest.len());
    Some(rest[..end].to_string())
}

@ -0,0 +1,18 @@
use crate::amf::service::information_req::{AMFRequest, AMFRequestType};
/// This function will make sure that only one document is given by the AMF API in the last 100
/// records. The program will need a change otherwise.
///
/// Network-dependent: queries the live AMF endpoint.
/// (Renamed from the misspelled `insure_...`; tests have no callers, so the
/// rename is safe.)
#[tokio::test]
async fn ensure_single_document_per_hit() {
    dotenvy::dotenv().expect("Failed to load .env file");
    let req = AMFRequest::new(AMFRequestType::DD, 0, 100)
        .get_list()
        .await
        .expect("AMF Endpoint should be available");
    for hit in req.get_hits() {
        let n_docs = hit.get_documents().len();
        assert_eq!(n_docs, 1);
    }
}

@ -0,0 +1,2 @@
mod amf_response;
mod pdf;

@ -0,0 +1,35 @@
use futures::StreamExt;
use crate::{
amf::service::{
information_req::{AMFRequest, AMFRequestType},
pdf::AMFPdf,
},
env, logger,
};
/// Smoke test: the PDF information extractor must succeed on the 100 most
/// recent AMF "DD" documents. Network-dependent (live API).
#[tokio::test]
async fn read_data_from_last_100_amf_pdf() {
    env::load_env().expect("Failed to load .env");
    logger::init_log().expect("Failed to init log");
    let size = 100;
    let req = AMFRequest::new(AMFRequestType::DD, 1, size)
        .get_list()
        .await
        .expect("AMF Endpoint should be available");
    let mut futures = Vec::new();
    for hit in req.get_hits().iter() {
        let doc_path = &hit.get_documents()[0].path;
        // `AMFPdf::new` already yields an owned value; the previous
        // `.clone()` on it was dead work.
        futures.push(async move { AMFPdf::new(doc_path).extract_info().await });
    }
    // Cap concurrency at 10 simultaneous downloads to stay polite to the API.
    let stream = futures::stream::iter(futures).buffer_unordered(10);
    let results = stream.collect::<Vec<_>>().await;
    for res in results {
        assert!(res.is_ok());
    }
}

@ -0,0 +1,187 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
/// Top-level envelope returned by the AMF "informations" search endpoint
/// (Elasticsearch-style shape).
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AMFResponse {
    pub hits: Hits,
    pub aggregations: Aggregations,
}
/// Match-count metadata plus the returned page of hits.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Hits {
    pub total: Total,
    pub hits: Vec<Hit>,
}
/// Total number of matches; `relation` qualifies how exact `value` is
/// (semantics defined by the AMF API — presumably Elasticsearch's "eq"/"gte").
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Total {
    pub value: i64,
    pub relation: String,
}
/// One search hit; `_source` carries the actual information record.
/// Explicit renames are needed because the underscore-prefixed API keys do
/// not follow the struct-wide camelCase convention.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Hit {
    #[serde(rename = "_ignored")]
    pub ignored: Option<Vec<String>>,
    #[serde(rename = "_source")]
    pub source: Source,
    pub sort: Vec<i64>,
}
/// The information record itself. Field names mirror the French API schema;
/// fields this service never reads are kept as loosely-typed `Value`.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Source {
    pub date_creation: String,
    // camelCase would produce "numeroSoif"; the API key is "numeroSOIF".
    #[serde(rename = "numeroSOIF")]
    pub numero_soif: Value,
    /// AMF declaration number — our stable foreign id.
    pub numero: String,
    pub types_operation: Vec<Value>,
    /// Attached documents; expected to contain exactly one PDF per hit.
    pub documents: Vec<Document>,
    pub titre: Value,
    pub date_publication: String,
    pub role_regulateur: String,
    pub marche: Value,
    pub index_year: i64,
    pub sous_type_document: Value,
    pub date_action: Value,
    pub id: i64,
    pub instrument_financier: Value,
    pub numero_concatene: String,
    pub domaine: String,
    pub types_document: Vec<String>,
    pub types_information: Vec<String>,
    pub date_mise_en_ligne: String,
    pub date_information: String,
    pub langue: String,
    pub version: i64,
    pub regulateur: String,
    pub relations: Vec<Value>,
    pub societes: Vec<Societe>,
    pub annee_comptable: Value,
}
/// A document attached to a hit; `path` is relative to the configured AMF
/// documents base URL and is what `AMFPdf::new` consumes.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Document {
    pub accessible: bool,
    pub issuer_id: Option<String>,
    pub path: String,
    pub numero: Value,
    pub signature: Option<String>,
    pub format: Value,
    pub details: Details,
    pub doc_regulateur: bool,
    pub nom_fichier: String,
    pub date_reception: Value,
}
/// Document metadata; the API uses snake_case keys here, hence the explicit
/// renames overriding the struct-wide camelCase convention.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Details {
    pub date: String,
    #[serde(rename = "content_type")]
    pub content_type: String,
    pub language: String,
    pub title: String,
    #[serde(rename = "content_length")]
    pub content_length: i64,
}
/// A company attached to the declaration.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Societe {
    pub role: String,
    pub raison_sociale: String,
    pub jeton: String,
}
/// Facet aggregations that accompany the search results. Deserialized for
/// completeness; this service does not currently read them.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Aggregations {
    pub types_information: TypesInformation,
}
/// Bucketed facet over information types. The snake_case renames override the
/// struct-wide camelCase convention to match the raw API keys.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TypesInformation {
    #[serde(rename = "doc_count_error_upper_bound")]
    pub doc_count_error_upper_bound: i64,
    #[serde(rename = "sum_other_doc_count")]
    pub sum_other_doc_count: i64,
    pub buckets: Vec<Bucket>,
}
/// One information-type bucket, itself carrying nested sub-facets.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Bucket {
    pub key: String,
    #[serde(rename = "doc_count")]
    pub doc_count: i64,
    pub types_operation: TypesOperation,
    pub types_document: TypesDocument,
    pub instrument_financier: InstrumentFinancier,
    pub marche: Marche,
    pub annee_comptable: AnneeComptable,
}
/// Sub-facet over operation types (buckets left untyped — unused).
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TypesOperation {
    #[serde(rename = "doc_count_error_upper_bound")]
    pub doc_count_error_upper_bound: i64,
    #[serde(rename = "sum_other_doc_count")]
    pub sum_other_doc_count: i64,
    pub buckets: Vec<Value>,
}
/// Sub-facet over document types.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TypesDocument {
    #[serde(rename = "doc_count_error_upper_bound")]
    pub doc_count_error_upper_bound: i64,
    #[serde(rename = "sum_other_doc_count")]
    pub sum_other_doc_count: i64,
    pub buckets: Vec<Bucket2>,
}
/// A simple key/count bucket (no nested facets).
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Bucket2 {
    pub key: String,
    #[serde(rename = "doc_count")]
    pub doc_count: i64,
}
/// Sub-facet over financial instruments (buckets untyped — unused).
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InstrumentFinancier {
    #[serde(rename = "doc_count_error_upper_bound")]
    pub doc_count_error_upper_bound: i64,
    #[serde(rename = "sum_other_doc_count")]
    pub sum_other_doc_count: i64,
    pub buckets: Vec<Value>,
}
/// Sub-facet over markets (buckets untyped — unused).
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Marche {
    #[serde(rename = "doc_count_error_upper_bound")]
    pub doc_count_error_upper_bound: i64,
    #[serde(rename = "sum_other_doc_count")]
    pub sum_other_doc_count: i64,
    pub buckets: Vec<Value>,
}
/// Sub-facet over accounting years (buckets untyped — unused).
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AnneeComptable {
    #[serde(rename = "doc_count_error_upper_bound")]
    pub doc_count_error_upper_bound: i64,
    #[serde(rename = "sum_other_doc_count")]
    pub sum_other_doc_count: i64,
    pub buckets: Vec<Value>,
}

@ -0,0 +1,93 @@
use chrono::Month;
use chrono::NaiveDate;
use std::fmt;
use std::num::ParseIntError;
use std::str::FromStr;
/// Newtype over `chrono::Month` so a `FromStr` accepting French month names
/// can be implemented without colliding with chrono's own parsing.
struct FrenchMonth(Month);
/// Raised when a token is not a recognized French month name.
#[derive(Debug)]
pub enum FrenchMonthParseError {
    UnknownMonth(String),
}
impl fmt::Display for FrenchMonthParseError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self)
    }
}
impl std::error::Error for FrenchMonthParseError {}
impl FromStr for FrenchMonth {
    type Err = FrenchMonthParseError;

    /// Parses a French month name case-insensitively, accepting both accented
    /// and unaccented spellings.
    ///
    /// Uses Unicode `to_lowercase` instead of `to_ascii_lowercase` so
    /// fully-uppercased accented forms ("FÉVRIER", "AOÛT") also match — a
    /// strict superset of the previous behavior.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "janvier" => Ok(FrenchMonth(Month::January)),
            "fevrier" => Ok(FrenchMonth(Month::February)),
            "février" => Ok(FrenchMonth(Month::February)),
            "mars" => Ok(FrenchMonth(Month::March)),
            "avril" => Ok(FrenchMonth(Month::April)),
            "mai" => Ok(FrenchMonth(Month::May)),
            "juin" => Ok(FrenchMonth(Month::June)),
            "juillet" => Ok(FrenchMonth(Month::July)),
            "aout" => Ok(FrenchMonth(Month::August)),
            "août" => Ok(FrenchMonth(Month::August)),
            "septembre" => Ok(FrenchMonth(Month::September)),
            "octobre" => Ok(FrenchMonth(Month::October)),
            "novembre" => Ok(FrenchMonth(Month::November)),
            "decembre" => Ok(FrenchMonth(Month::December)),
            "décembre" => Ok(FrenchMonth(Month::December)),
            s => Err(FrenchMonthParseError::UnknownMonth(s.to_string())),
        }
    }
}
/// One variant per stage of French-date parsing that can fail, so callers can
/// report exactly which component was malformed.
#[derive(Debug)]
pub enum DateParseError {
    NotEnoughData(String),
    DayParseError(ParseIntError),
    MonthParseError(FrenchMonthParseError),
    YearParseError(ParseIntError),
    /// The components parsed but do not form a real calendar date
    /// (e.g. "29 février 2023").
    InvalidDateError(String),
}
impl fmt::Display for DateParseError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self)
    }
}
impl std::error::Error for DateParseError {}
/// This function will return a chrono naive date from an input with the following format:
/// Day, month and year are whitespace-separated and written exactly in that order.
/// The day and year are represented by numbers while the month is represented by its French name.
///
/// # Errors
/// Returns a [`DateParseError`] variant identifying the missing or malformed
/// component, or `InvalidDateError` for impossible dates.
pub fn naive_date_from_str(s: &str) -> Result<NaiveDate, DateParseError> {
    // `split_whitespace` tolerates repeated spaces (PDF text extraction is
    // noisy) and yields nothing for blank input, unlike `split(" ")`.
    let mut data = s.split_whitespace();
    let day = data.next().ok_or(DateParseError::NotEnoughData(
        "Given date string is empty".to_string(),
    ))?;
    let month = data.next().ok_or_else(|| {
        DateParseError::NotEnoughData(format!(
            "Given date string: \"{s}\" does not contain a complete date"
        ))
    })?;
    let year = data.next().ok_or_else(|| {
        DateParseError::NotEnoughData(format!(
            "Given date string: \"{s}\" does not contain a complete date"
        ))
    })?;
    let date = NaiveDate::from_ymd_opt(
        year.parse().map_err(DateParseError::YearParseError)?,
        // `number_from_month` is chrono's documented 1-based month number —
        // clearer and safer than relying on the enum discriminant plus one.
        month
            .parse::<FrenchMonth>()
            .map_err(DateParseError::MonthParseError)?
            .0
            .number_from_month(),
        day.parse().map_err(DateParseError::DayParseError)?,
    )
    .ok_or_else(|| {
        DateParseError::InvalidDateError(format!("Invalid date resulted from string {}", s))
    })?;
    Ok(date)
}

@ -0,0 +1,8 @@
pub mod amf_response;
pub use amf_response::AMFResponse;
pub mod date;
pub mod transaction_data;
pub use transaction_data::TransactionData;
#[cfg(test)]
mod test;

@ -0,0 +1,26 @@
use chrono::NaiveDate;
use super::date::*;
#[test]
fn test_naive_date_from_str() {
    // Capitalized month, unaccented.
    assert_eq!(
        naive_date_from_str("13 Janvier 2023").unwrap(),
        NaiveDate::from_ymd_opt(2023, 1, 13).unwrap()
    );
    // Accented month name.
    assert_eq!(
        naive_date_from_str("31 décembre 2023").unwrap(),
        NaiveDate::from_ymd_opt(2023, 12, 31).unwrap()
    );
    assert!(naive_date_from_str("").is_err());
    // Impossible calendar date.
    assert!(naive_date_from_str("29 février 2023").is_err());
    // Wrong format (ISO) must be rejected.
    assert!(naive_date_from_str("2023-01-12").is_err());
    // Missing year.
    assert!(naive_date_from_str("12 fevrier").is_err());
    // Non-integer day.
    assert!(naive_date_from_str("0.1 fevrier 2033").is_err());
    assert_eq!(
        naive_date_from_str("28 Fevrier 2023").unwrap(),
        NaiveDate::from_ymd_opt(2023, 2, 28).unwrap()
    );
}

@ -0,0 +1,16 @@
use chrono::NaiveDate;
/// A fully-parsed executive transaction declaration, combining the API hit's
/// id with the fields mined from its PDF; mirrors the `transaction` table.
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// AMF declaration number (unique per declaration).
    pub foreign_id: String,
    pub company_name: String,
    /// ISIN code; not every declaration carries one.
    pub isin: Option<String>,
    pub person: String,
    pub date_published: NaiveDate,
    pub date_executed: NaiveDate,
    pub exchange: String,
    pub nature: String,
    pub instrument: String,
    pub volume: f32,
    pub unit_price: f32,
}

@ -0,0 +1,25 @@
use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::Header;
use rocket::{Request, Response};
/// Rocket fairing that stamps permissive CORS headers on every response so the
/// browser-based client (served from another origin) can call this API.
pub struct CORS;

#[rocket::async_trait]
impl Fairing for CORS {
    fn info(&self) -> Info {
        Info {
            name: "Add CORS headers to responses",
            kind: Kind::Response,
        }
    }

    async fn on_response<'r>(&self, _request: &'r Request<'_>, response: &mut Response<'r>) {
        // All four headers are static; drive them from a table instead of
        // repeating the `set_header` call.
        let cors_headers = [
            ("Access-Control-Allow-Origin", "*"),
            ("Access-Control-Allow-Methods", "POST, GET, PATCH, OPTIONS"),
            ("Access-Control-Allow-Headers", "*"),
            ("Access-Control-Allow-Credentials", "true"),
        ];
        for (name, value) in cors_headers {
            response.set_header(Header::new(name, value));
        }
    }
}

@ -0,0 +1,47 @@
use std::time::Duration;
use crate::CONFIG;
use async_trait::async_trait;
use sea_orm::{ConnectOptions, DatabaseConnection};
use sea_orm_rocket::rocket;
use sea_orm_rocket::{rocket::figment::Figment, Database};
pub mod paginate;
pub mod slug;
/// Rocket-managed database handle, backed by a SeaORM connection pool.
#[derive(Database, Debug)]
#[database("fast_insiders")]
pub struct Db(SeaOrmPool);
/// Thin wrapper adapting a SeaORM `DatabaseConnection` to sea-orm-rocket's
/// `Pool` trait.
#[derive(Debug, Clone)]
pub struct SeaOrmPool {
    pub conn: DatabaseConnection,
}
#[async_trait]
impl sea_orm_rocket::Pool for SeaOrmPool {
    type Error = sea_orm::DbErr;
    type Connection = DatabaseConnection;
    // Configuration is read from the global CONFIG (env-driven), not from
    // Rocket's figment — hence the unused parameter.
    async fn init(_figment: &Figment) -> Result<Self, Self::Error> {
        let config = &CONFIG;
        let mut options: ConnectOptions = ConnectOptions::new(config.database_url.to_owned());
        // All timeouts below are in seconds (see the Config defaults).
        options
            .max_connections(config.max_connections)
            .min_connections(config.min_connections)
            .connect_timeout(Duration::from_secs(config.connect_timeout))
            .acquire_timeout(Duration::from_secs(config.acquire_timeout))
            .idle_timeout(Duration::from_secs(config.idle_timeout))
            .max_lifetime(Duration::from_secs(config.max_lifetime))
            .sqlx_logging(config.sqlx_logging);
        let conn = sea_orm::Database::connect(options).await?;
        Ok(SeaOrmPool { conn })
    }
    fn borrow(&self) -> &Self::Connection {
        &self.conn
    }
}

@ -0,0 +1,102 @@
use sea_orm::{error::DbErr, FromQueryResult};
use sea_orm::{prelude::*, Order, QueryOrder};
use sea_orm_rocket::Connection;
use sea_query::expr::SimpleExpr;
use serde::{Deserialize, Serialize};
use super::Db;
/// One page of results plus pagination metadata, serialized to API consumers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PaginatedResponse<M> {
    /// Total number of rows across all pages.
    pub count: u64,
    /// Total number of pages at the requested page size.
    pub num_pages: u64,
    pub list: Vec<M>,
}
/// Fetches one page of entity `E` as model `M`, optionally ordered by
/// `column`/`order`.
///
/// `size` defaults to 20 and is capped at 50. `page` is 0-based and clamped
/// to the *last* existing page: the previous `min(num_pages)` allowed
/// `p == num_pages`, i.e. one past the end, silently returning an empty page.
pub async fn paginate<E, M, C>(
    conn: Connection<'_, Db>,
    page: Option<u64>,
    size: Option<u64>,
    column: Option<C>,
    order: Option<Order>,
) -> Result<PaginatedResponse<M>, DbErr>
where
    E: EntityTrait,
    M: Send + Sync + FromQueryResult,
    C: ColumnTrait,
{
    let db = conn.into_inner();
    let s = size.unwrap_or(20).min(50);
    // Build the selector as an expression instead of a mutable placeholder.
    let selector = if let (Some(col), Some(ord)) = (column, order) {
        E::find().order_by(col, ord)
    } else {
        E::find()
    };
    let pages = selector.into_model().paginate(db, s);
    let count = pages.num_items().await?;
    let num_pages = pages.num_pages().await?;
    // Last valid page index is num_pages - 1; saturate for an empty table.
    let p = page.unwrap_or(0).min(num_pages.saturating_sub(1));
    let list = pages.fetch_page(p).await?;
    Ok(PaginatedResponse {
        count,
        num_pages,
        list,
    })
}
/// Like [`paginate`] but also loads the related entity `R` for each row
/// (`find_also_related`) and supports an optional filter expression.
///
/// Same page-clamp fix as `paginate`: `page` is clamped to the last existing
/// page instead of one past the end.
pub async fn paginate_also_related<E, R, T, K, C>(
    conn: Connection<'_, Db>,
    page: Option<u64>,
    size: Option<u64>,
    column: Option<C>,
    order: Option<Order>,
    filter: Option<SimpleExpr>,
) -> Result<PaginatedResponse<(T, Option<K>)>, DbErr>
where
    E: EntityTrait + Related<R>,
    R: EntityTrait,
    T: Sync + Send + FromQueryResult,
    K: Sync + Send + FromQueryResult,
    C: ColumnTrait,
{
    let db = conn.into_inner();
    let s = size.unwrap_or(20).min(50);
    // Mutable only because the optional filter is applied afterwards.
    let mut selector = if let (Some(col), Some(ord)) = (column, order) {
        E::find()
            .find_also_related::<R>(R::default())
            .order_by(col, ord)
    } else {
        E::find().find_also_related::<R>(R::default())
    };
    if let Some(fil) = filter {
        selector = selector.filter(fil);
    }
    let pages = selector.into_model().paginate(db, s);
    let count = pages.num_items().await?;
    let num_pages = pages.num_pages().await?;
    // Last valid page index is num_pages - 1; saturate for an empty result.
    let p = page.unwrap_or(0).min(num_pages.saturating_sub(1));
    let list = pages.fetch_page(p).await?;
    Ok(PaginatedResponse {
        count,
        num_pages,
        list,
    })
}

@ -0,0 +1,31 @@
use sea_orm::{error::DbErr, ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter};
use slug::slugify;
/// This generic function returns a slug that is not already used in the database to insert safely.
///
/// Collisions are resolved by appending `-1`, `-2`, ... to the base slug.
///
/// Bug fix: the previous code tried to strip the old counter with
/// `slug.drain(0..slug.len() - 1)`, which deletes everything *except* the
/// last character — so a second collision produced garbage like `"12"`
/// instead of `"base-2"`, and multi-digit counters were broken too.
/// Rebuilding from the base slug every round fixes both.
pub async fn ensure_unique_slug<E, C, DB>(s: &String, column: C, db: &DB) -> Result<String, DbErr>
where
    E: EntityTrait,
    C: ColumnTrait,
    DB: ConnectionTrait,
{
    let base = slugify(s);
    let mut slug = base.clone();
    let mut count: u32 = 0;
    // This is inefficient, we could search for all slugs that start with the new slug and take the
    // last one + 1
    while E::find()
        .filter(column.eq(slug.clone()))
        .one(db)
        .await?
        .is_some()
    {
        count += 1;
        slug = format!("{base}-{count}");
    }
    Ok(slug)
}

@ -0,0 +1,85 @@
use std::path::PathBuf;
use serde::Deserialize;
/// Process configuration, deserialized from environment variables by `envy`
/// (env var names are the SCREAMING_SNAKE_CASE of the field names).
/// Only `database_url` is required; everything else falls back to a default.
#[derive(Deserialize, Debug)]
pub struct Config {
    /// MySQL connection string (required).
    pub database_url: String,
    #[serde(default = "max_connections_default")]
    pub max_connections: u32,
    #[serde(default = "min_connections_default")]
    pub min_connections: u32,
    // All timeouts/lifetimes below are in seconds (passed through
    // `Duration::from_secs` when the pool is built).
    #[serde(default = "connect_timeout_default")]
    pub connect_timeout: u64,
    #[serde(default = "acquire_timeout_default")]
    pub acquire_timeout: u64,
    #[serde(default = "idle_timeout_default")]
    pub idle_timeout: u64,
    #[serde(default = "max_lifetime_default")]
    pub max_lifetime: u64,
    #[serde(default = "sqlx_logging_default")]
    pub sqlx_logging: bool,
    /// URL template for the AMF search endpoint; `{{inf_type}}`, `{{from}}`
    /// and `{{size}}` are substituted by `AMFRequest::new`.
    #[serde(default = "amf_informations_req")]
    pub amf_informations_req: String,
    /// Base URL for document downloads; normalized to end with '/'.
    #[serde(default = "amf_documents_path")]
    pub amf_documents_path: String,
    /// Seconds between runs of the AMF ingestion task.
    #[serde(default = "get_amf_transaction_interval")]
    pub get_amf_transaction_interval: u64,
}
// serde `default = "..."` requires a function path, hence these one-liners.
fn max_connections_default() -> u32 {
    100
}
fn min_connections_default() -> u32 {
    5
}
// Seconds.
fn connect_timeout_default() -> u64 {
    8
}
fn acquire_timeout_default() -> u64 {
    8
}
fn idle_timeout_default() -> u64 {
    8
}
fn max_lifetime_default() -> u64 {
    8
}
fn sqlx_logging_default() -> bool {
    false
}
fn amf_documents_path() -> String {
    "https://bdif.amf-france.org/back/api/v1/documents/".to_string()
}
fn amf_informations_req() -> String {
    "https://bdif.amf-france.org/back/api/v1/informations?rechercheTexte=&typesInformation={{inf_type}}&from={{from}}&size={{size}}".to_string()
}
// One hour, in seconds.
fn get_amf_transaction_interval() -> u64 {
    3600
}
impl Config {
    /// Loads `.env` and deserializes the process environment into a `Config`.
    ///
    /// # Panics
    /// Panics when `.env` is missing or a required variable (e.g.
    /// `DATABASE_URL`) is absent — configuration errors should stop startup.
    pub fn new() -> Self {
        dotenvy::dotenv().expect("Failed to load .env file");
        let mut config = envy::from_env::<Config>().expect("Failed to load env");
        // Normalize the documents base URL so appending a relative document
        // path is safe. (Empty string is left alone, matching the previous
        // `chars().last().unwrap_or('/')` behavior.)
        if !config.amf_documents_path.is_empty() && !config.amf_documents_path.ends_with('/') {
            config.amf_documents_path.push('/');
        }
        config
    }
}
/// Loads `.env` into the process environment, returning the path that was read.
pub fn load_env() -> Result<PathBuf, dotenvy::Error> {
    dotenvy::dotenv()
}

@ -0,0 +1,86 @@
#![feature(proc_macro_hygiene, decl_macro)]
// Macros
#[macro_use]
extern crate rocket;
#[macro_use]
extern crate lazy_static;
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
// External crates
use rocket::{Build, Rocket};
use sea_orm_rocket::rocket::fairing::{self, AdHoc};
use sea_orm_rocket::Database;
// Local crates
use migration;
use migration::MigratorTrait;
mod amf;
mod cors;
mod db;
mod env;
mod logger;
mod model;
mod repo;
mod route;
mod task;
use crate::task::run_tasks;
// Module imports
use crate::db::Db;
use env::Config;
lazy_static! {
    /// Global, lazily-initialized configuration built from variables defined in the .env file
    /// and the process environment (see `env::Config`).
    static ref CONFIG: Config = Config::new();
}
async fn run_migrations(rocket: Rocket<Build>) -> fairing::Result {
let conn = &Db::fetch(&rocket).unwrap().conn;
let _ = migration::Migrator::up(conn, None).await;
Ok(rocket)
}
/// Builds and launches the Rocket server: attaches the database pool, the
/// migration fairing and CORS, and mounts all API routes under `/v1`.
/// Blocks until the server shuts down.
async fn start_rocket() -> Result<(), sea_orm_rocket::rocket::Error> {
    rocket::build()
        .attach(Db::init())
        .attach(AdHoc::try_on_ignite("Migrations", run_migrations))
        .attach(crate::cors::CORS)
        .mount(
            "/v1",
            routes![
                route::company::get_all,
                route::company::get_by_isin,
                route::transaction::get_by_company_id,
                route::transaction::get_all,
                route::in_process_transaction::get_all,
                route::in_process_transaction::retry_failed_transaction,
                route::in_process_transaction::retry_all
            ],
        )
        .launch()
        .await
        // Callers only care about success/failure, not the built Rocket value.
        .map(|_| ())
}
#[rocket::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
env::load_env()?;
logger::init_log()?;
// Run tasks
tokio::task::spawn(async { run_tasks().await });
let result = start_rocket().await;
info!("Rocket: deorbit.");
if let Some(err) = result.err() {
println!("Error: {}", err);
}
Ok(())
}

@ -0,0 +1,7 @@
extern crate pretty_env_logger;
use log::SetLoggerError;
/// For this function to do anything the RUST_LOG environment variable should be set.
/// Initialises the timed `pretty_env_logger` backend.
///
/// For this function to do anything the RUST_LOG environment variable should be set.
/// Fails with `SetLoggerError` if a global logger was already installed.
pub fn init_log() -> Result<(), SetLoggerError> {
    pretty_env_logger::try_init_timed()
}

@ -0,0 +1,29 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
/// Row of the `company` table: a company referenced by insider transactions.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "company")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    // URL-friendly identifier generated from the name at creation time.
    #[sea_orm(unique)]
    pub slug: String,
    #[sea_orm(unique)]
    pub name: String,
}
/// A company owns many transactions.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    #[sea_orm(has_many = "super::transaction::Entity")]
    Transaction,
}
impl Related<super::transaction::Entity> for Entity {
    fn to() -> RelationDef {
        Relation::Transaction.def()
    }
}
// Default behaviour: no custom hooks on insert/update/delete.
impl ActiveModelBehavior for ActiveModel {}

@ -0,0 +1,24 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
/// Row of the `in_process_transaction` staging table: an AMF transaction
/// whose PDF still has to be parsed into a proper `transaction` row.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "in_process_transaction")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    pub created_at: DateTime,
    // AMF-side identifier of the transaction.
    #[sea_orm(unique)]
    pub foreign_id: String,
    // Path of the downloaded PDF document.
    #[sea_orm(unique)]
    pub doc_path: String,
    // 0/1 failure flag, converted from a bool (presumably a tinyint column —
    // TODO confirm against the migration).
    pub failed: i8,
    // Failure detail recorded when processing fails.
    #[sea_orm(column_type = "Text", nullable)]
    pub error_string: Option<String>,
}
// This staging table has no relations to other entities.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

@ -0,0 +1,7 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
pub mod prelude;
pub mod company;
pub mod in_process_transaction;
pub mod transaction;

@ -0,0 +1,5 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
pub use super::company::Entity as Company;
pub use super::in_process_transaction::Entity as InProcessTransaction;
pub use super::transaction::Entity as Transaction;

@ -0,0 +1,44 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
/// Row of the `transaction` table: one insider transaction extracted from an
/// AMF PDF, linked to its company.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "transaction")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    pub company_id: i32,
    // AMF-side identifier; uniqueness prevents importing the same transaction twice.
    #[sea_orm(unique)]
    pub foreign_id: String,
    pub date_published: Date,
    pub date_executed: Date,
    #[sea_orm(column_type = "Text")]
    pub person: String,
    pub exchange: String,
    pub nature: String,
    pub isin: Option<String>,
    pub instrument: String,
    pub volume: i32,
    pub unit_price: f32,
}
/// Each transaction belongs to one company; company updates and deletions
/// cascade to its transactions.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    #[sea_orm(
        belongs_to = "super::company::Entity",
        from = "Column::CompanyId",
        to = "super::company::Column::Id",
        on_update = "Cascade",
        on_delete = "Cascade"
    )]
    Company,
}
impl Related<super::company::Entity> for Entity {
    fn to() -> RelationDef {
        Relation::Company.def()
    }
}
// Default behaviour: no custom hooks on insert/update/delete.
impl ActiveModelBehavior for ActiveModel {}

@ -0,0 +1,49 @@
use crate::db::slug::ensure_unique_slug;
use crate::model;
use sea_orm::error::DbErr;
use sea_orm::ConnectionTrait;
use sea_orm::{ActiveModelTrait, DeriveIntoActiveModel, IntoActiveModel};
use serde::{Deserialize, Serialize};
use thiserror::Error;
type ActiveModel = model::company::ActiveModel;
/// Insertable payload for a new `company` row; the slug is generated during
/// `create` rather than supplied by the caller.
#[derive(Debug, PartialEq, Eq, Clone, DeriveIntoActiveModel, Serialize, Deserialize)]
pub struct NewCompany {
    pub name: String,
}
impl NewCompany {
    /// Builds a `NewCompany` payload from a company name.
    pub fn new(name: String) -> NewCompany {
        NewCompany { name }
    }

    /// Inserts the company after deriving a unique slug from its name.
    ///
    /// # Errors
    /// Returns `CompanyRepoError::CompanyCreationErr` when slug generation or
    /// the insert itself fails.
    pub async fn create<C>(&self, db: &C) -> Result<model::company::Model, CompanyRepoError>
    where
        C: ConnectionTrait,
    {
        // Derive a slug that does not collide with any existing company row.
        let unique_slug = ensure_unique_slug::<model::company::Entity, model::company::Column, _>(
            &self.name,
            model::company::Column::Slug,
            db,
        )
        .await
        .map_err(CompanyRepoError::CompanyCreationErr)?;

        let mut active = self.clone().into_active_model();
        active.set(model::company::Column::Slug, unique_slug.into());
        active
            .insert(db)
            .await
            .map_err(CompanyRepoError::CompanyCreationErr)
    }
}
/// Errors produced by the company repository.
#[derive(Debug, Error)]
pub enum CompanyRepoError {
    // Covers both slug-generation and insert failures in `NewCompany::create`.
    #[error("Error creating company record: {0}")]
    CompanyCreationErr(DbErr),
}

@ -0,0 +1,148 @@
use crate::amf::service::pdf::{AMFPdf, AMFPdfError};
use crate::amf::types::TransactionData;
use crate::amf::TransactionDataTrait;
use crate::model;
use chrono::NaiveDateTime;
use sea_orm::error::DbErr;
use sea_orm::{
AccessMode, ActiveModelTrait, ConnectionTrait, DeriveIntoActiveModel, IntoActiveModel,
IsolationLevel, TransactionTrait,
};
use sea_orm::DatabaseConnection;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use super::transaction::{NewTransactionFromTransactionData, TransactionRepoError};
type ActiveModel = model::in_process_transaction::ActiveModel;
/// Insertable payload for the `in_process_transaction` staging table.
#[derive(Debug, PartialEq, Clone, DeriveIntoActiveModel, Serialize, Deserialize)]
pub struct NewInProcessTransaction {
    pub created_at: NaiveDateTime,
    // AMF-side identifier of the transaction.
    pub foreign_id: String,
    // Path of the downloaded PDF document.
    pub doc_path: String,
    // 0/1 failure flag, converted from a bool in `new`.
    pub failed: i8,
    pub error_string: Option<String>,
}
impl NewInProcessTransaction {
    /// Builds a staging record timestamped with the current UTC time.
    ///
    /// Fix: parameters take `&str` instead of `&String` (existing `&String`
    /// call sites still compile via deref coercion).
    ///
    /// * `foreign_id` - AMF identifier of the source transaction.
    /// * `doc_path` - path of the downloaded PDF document.
    /// * `failed` - whether processing has already failed (stored as 0/1).
    /// * `error_string` - failure detail, if any.
    pub fn new(
        foreign_id: &str,
        doc_path: &str,
        failed: bool,
        error_string: Option<String>,
    ) -> Self {
        let created_at = chrono::Utc::now().naive_utc();
        NewInProcessTransaction {
            created_at,
            foreign_id: foreign_id.to_owned(),
            doc_path: doc_path.to_owned(),
            failed: failed.into(),
            error_string,
        }
    }

    /// Inserts this staging record and returns the persisted model.
    ///
    /// # Errors
    /// Returns `InProcessTransactionCreationError` on insert failure.
    pub async fn create(
        &self,
        db: &DatabaseConnection,
    ) -> Result<model::in_process_transaction::Model, InProcessTransactionError> {
        self.clone()
            .into_active_model()
            .insert(db)
            .await
            .map_err(InProcessTransactionError::InProcessTransactionCreationError)
    }
}
/// Errors raised while staging, processing, or retrying AMF transactions.
#[derive(Debug, Error)]
pub enum InProcessTransactionError {
    #[error("Error creating in process transaction record: {0}")]
    InProcessTransactionCreationError(DbErr),
    #[error("Error creating transaction: {0}")]
    TransactionCreateError(TransactionRepoError),
    #[error("Database error: {0}")]
    DatabaseError(DbErr),
    // NOTE(review): not constructed anywhere in this file — confirm it is
    // still needed.
    #[error("Error during data extraction")]
    DataExtractionError(AMFPdfError),
    // NOTE(review): not constructed anywhere in this file — confirm it is
    // still needed.
    #[error("Error deleting in process transaction record")]
    InProcessTransactionDeleteError(DbErr),
    #[error("No document error for id {0}")]
    NoDocumentError(String),
    #[error("Error while extracting information from doc {0}: {1}")]
    InformationExtractionError(String, AMFPdfError),
}
#[async_trait::async_trait]
impl TransactionDataTrait for model::in_process_transaction::Model {
    type Err = InProcessTransactionError;
    /// Extracts structured transaction data from this row's PDF document.
    ///
    /// Fix: `self: &Self` replaced by the idiomatic `&self` (same type,
    /// clippy `needless_arbitrary_self_type`).
    ///
    /// # Errors
    /// Returns `InformationExtractionError` carrying the document path when
    /// the PDF cannot be parsed.
    async fn get_transaction_data(&self) -> Result<TransactionData, Self::Err> {
        let amf_pdf = AMFPdf::new(&self.doc_path);
        let info = amf_pdf.extract_info().await.map_err(|e| {
            InProcessTransactionError::InformationExtractionError(self.doc_path.to_string(), e)
        })?;
        // Pair the extracted fields with this row's AMF identifier.
        Ok(TransactionData {
            foreign_id: self.foreign_id.to_owned(),
            company_name: info.company_name,
            isin: info.isin,
            person: info.person,
            date_published: info.date_published,
            date_executed: info.date_executed,
            exchange: info.exchange,
            nature: info.nature,
            instrument: info.instrument,
            volume: info.volume,
            unit_price: info.unit_price,
        })
    }
}
impl model::in_process_transaction::Model {
    /// Attempts to turn this staged row into a real `transaction` record.
    ///
    /// On successful extraction, the new transaction is created and this
    /// staging row is deleted, both inside one serializable database
    /// transaction. On extraction failure the row is kept, marked failed with
    /// the error message, and `Ok(())` is still returned — the failure lives
    /// on the row, not in the return value.
    pub async fn process<C>(&mut self, db: &C) -> Result<(), InProcessTransactionError>
    where
        C: ConnectionTrait + TransactionTrait,
    {
        match self.get_transaction_data().await {
            Ok(val) => {
                // Create-and-delete must be atomic: use a serializable txn.
                let txn = db
                    .begin_with_config(
                        Some(IsolationLevel::Serializable),
                        Some(AccessMode::ReadWrite),
                    )
                    .await
                    .map_err(|e| InProcessTransactionError::DatabaseError(e))?;
                NewTransactionFromTransactionData::new_from_transaction_data(val)
                    .create(&txn)
                    .await
                    .map_err(|e| InProcessTransactionError::TransactionCreateError(e))?;
                // The staged row has served its purpose; remove it.
                self.clone()
                    .into_active_model()
                    .delete(&txn)
                    .await
                    .map_err(|e| InProcessTransactionError::DatabaseError(e))?;
                txn.commit()
                    .await
                    .map_err(|e| InProcessTransactionError::DatabaseError(e))?;
            }
            Err(e) => {
                // Extraction failed: record the error on the row so it can be
                // retried later via the retry endpoints.
                self.error_string = Some(e.to_string());
                self.failed = 1;
                self.clone()
                    .into_active_model()
                    .update(db)
                    .await
                    .map_err(|e| InProcessTransactionError::DatabaseError(e))?;
            }
        };
        Ok(())
    }
}

@ -0,0 +1,3 @@
pub mod company;
pub mod in_process_transaction;
pub mod transaction;

@ -0,0 +1,138 @@
use crate::amf::types::TransactionData;
use crate::model;
use chrono::NaiveDate;
use sea_orm::error::DbErr;
use sea_orm::ConnectionTrait;
use sea_orm::{
ActiveModelTrait, ColumnTrait, DeriveIntoActiveModel, EntityTrait, IntoActiveModel, QueryFilter,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use super::company::{CompanyRepoError, NewCompany};
type ActiveModel = model::transaction::ActiveModel;
/// Insertable payload for the `transaction` table. Private: rows are only
/// created through `NewTransactionFromTransactionData::create`, which
/// resolves the `company_id` first.
#[derive(Debug, PartialEq, Clone, DeriveIntoActiveModel, Serialize, Deserialize)]
struct NewTransaction {
    // Foreign key to the owning company row.
    pub company_id: i32,
    pub foreign_id: String,
    pub date_published: NaiveDate,
    pub date_executed: NaiveDate,
    pub person: String,
    pub exchange: String,
    pub nature: String,
    pub isin: Option<String>,
    pub instrument: String,
    pub volume: i32,
    pub unit_price: f32,
}
/// Transaction payload keyed by company *name* rather than id; `create`
/// resolves (or creates) the matching company row to obtain the foreign key.
pub struct NewTransactionFromTransactionData {
    pub company_name: String,
    pub date_published: NaiveDate,
    pub date_executed: NaiveDate,
    pub person: String,
    pub exchange: String,
    pub nature: String,
    pub isin: Option<String>,
    pub instrument: String,
    pub volume: i32,
    pub unit_price: f32,
    // AMF-side identifier of the transaction.
    pub foreign_id: String,
}
impl NewTransactionFromTransactionData {
    /// Flattens extracted PDF data into an insertable payload.
    pub fn new_from_transaction_data(data: TransactionData) -> Self {
        NewTransactionFromTransactionData {
            foreign_id: data.foreign_id,
            company_name: data.company_name,
            date_published: data.date_published,
            date_executed: data.date_executed,
            person: data.person,
            exchange: data.exchange,
            nature: data.nature,
            isin: data.isin,
            instrument: data.instrument,
            volume: data.volume as i32,
            unit_price: data.unit_price,
        }
    }

    /// Inserts the transaction, creating its company first when it does not
    /// exist yet, and returns the stored transaction with its company.
    ///
    /// Fixes: `let comp; if let … else …` rewritten as a `match` expression,
    /// and the redundant `self.volume as i32` cast removed (`volume` is
    /// already `i32`).
    ///
    /// # Errors
    /// Fails on company lookup/creation errors, insert errors, or when the
    /// just-inserted transaction cannot be re-fetched.
    pub async fn create<C>(
        &self,
        db: &C,
    ) -> Result<(model::transaction::Model, Option<model::company::Model>), TransactionRepoError>
    where
        C: ConnectionTrait,
    {
        // Reuse the existing company row or create one on the fly.
        let comp = match model::company::Entity::find()
            .filter(model::company::Column::Name.eq(self.company_name.to_owned()))
            .one(db)
            .await
            .map_err(TransactionRepoError::DatabaseError)?
        {
            Some(c) => c,
            None => {
                let name = self.company_name.to_owned();
                info!("Company {} will be created", name);
                NewCompany::new(name)
                    .create(db)
                    .await
                    .map_err(TransactionRepoError::CompanyCreationError)?
            }
        };
        let new_tr = NewTransaction {
            company_id: comp.id,
            foreign_id: self.foreign_id.to_owned(),
            date_published: self.date_published,
            date_executed: self.date_executed,
            person: self.person.to_owned(),
            exchange: self.exchange.to_owned(),
            nature: self.nature.to_owned(),
            isin: self.isin.to_owned(),
            instrument: self.instrument.to_owned(),
            volume: self.volume,
            unit_price: self.unit_price,
        };
        let res = new_tr.create(db).await?;
        // Re-fetch with the related company so callers get both records.
        model::transaction::Entity::find_by_id(res.id)
            .find_also_related(model::company::Entity)
            .one(db)
            .await
            .map_err(TransactionRepoError::DatabaseError)?
            .ok_or(TransactionRepoError::CreatedTransactionNotFound)
    }
}
impl NewTransaction {
    /// Persists this row and returns the inserted model.
    async fn create<C>(&self, db: &C) -> Result<model::transaction::Model, TransactionRepoError>
    where
        C: ConnectionTrait,
    {
        let active = self.clone().into_active_model();
        active
            .insert(db)
            .await
            .map_err(TransactionRepoError::TransactionCreationError)
    }
}
/// Errors produced by the transaction repository.
#[derive(Debug, Error)]
pub enum TransactionRepoError {
    #[error("Error creating transaction record: {0}")]
    TransactionCreationError(DbErr),
    #[error("Error creating company record: {0}")]
    CompanyCreationError(CompanyRepoError),
    // NOTE(review): also used for the post-insert re-fetch, so the message is
    // slightly misleading — consider generalising it.
    #[error("Error finding company record: {0}")]
    DatabaseError(DbErr),
    #[error("The created transaction has not been found")]
    CreatedTransactionNotFound,
}

@ -0,0 +1,56 @@
use rocket::{http::Status, response::status::Custom};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use sea_orm_rocket::rocket::serde::json::Json;
use sea_orm_rocket::Connection;
use serde::{Deserialize, Serialize};
use crate::db::paginate::{paginate, PaginatedResponse};
use crate::db::Db;
use crate::model::{self, company};
/// `GET /v1/company?page=&size=` — paginated list of all companies.
#[get("/company?<page>&<size>")]
pub async fn get_all(
    conn: Connection<'_, Db>,
    page: Option<u64>,
    size: Option<u64>,
) -> Result<Json<PaginatedResponse<company::Model>>, Custom<String>> {
    let page_result =
        paginate::<company::Entity, company::Model, company::Column>(conn, page, size, None, None)
            .await;
    match page_result {
        Ok(body) => Ok(Json(body)),
        Err(e) => Err(Custom(
            Status::InternalServerError,
            format!("Database error: {}", e),
        )),
    }
}
/// `GET /v1/company/<name>` — all companies whose name contains `name`.
///
/// NOTE(review): despite the function name this searches by company *name*,
/// not ISIN. Renaming would also require updating the `routes![]` mount in
/// main.rs, so it is only flagged here.
#[get("/company/<name>")]
pub async fn get_by_isin(
    conn: Connection<'_, Db>,
    name: String,
) -> Result<Json<Vec<company::Model>>, Custom<String>> {
    let db = conn.into_inner();
    // Substring match on the company name.
    let res = company::Entity::find()
        .filter(company::Column::Name.contains(&name))
        .all(db)
        .await
        .map_err(|e| {
            Custom(
                Status::InternalServerError,
                format!("Database error: {}", e),
            )
        })?;
    Ok(Json(res))
}
/// Company together with its transactions, serialized for API consumers.
/// NOTE(review): not referenced by the handlers in this file — confirm it is
/// used elsewhere before removing.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct CompanyTransaction {
    id: i32,
    name: String,
    isin: String,
    transactions: Vec<model::transaction::Model>,
}

@ -0,0 +1,125 @@
use rocket::http::Status;
use rocket::response::status::Custom;
use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, TransactionTrait};
use sea_orm_rocket::rocket::serde::json::Json;
use sea_orm_rocket::Connection;
use crate::db::paginate::{paginate, PaginatedResponse};
use crate::db::Db;
use crate::model::transaction;
use crate::model::{company, in_process_transaction};
/// `GET /v1/in_process_transaction?page=&size=` — paginated staging rows,
/// oldest first (`created_at` ascending).
#[get("/in_process_transaction?<page>&<size>")]
pub async fn get_all(
    conn: Connection<'_, Db>,
    page: Option<u64>,
    size: Option<u64>,
) -> Result<Json<PaginatedResponse<in_process_transaction::Model>>, Custom<String>> {
    let page_result = paginate::<
        in_process_transaction::Entity,
        in_process_transaction::Model,
        in_process_transaction::Column,
    >(
        conn,
        page,
        size,
        Some(in_process_transaction::Column::CreatedAt),
        Some(Order::Asc),
    )
    .await;
    match page_result {
        Ok(body) => Ok(Json(body)),
        Err(e) => Err(Custom(
            Status::InternalServerError,
            format!("Database error: {}", e),
        )),
    }
}
/// `GET /v1/in_process_transaction/<foreign_id>/retry` — re-runs extraction
/// for one *failed* staging row and returns the resulting transaction with
/// its company. The whole retry runs in a database transaction.
///
/// Fix: `format!` on a plain string literal replaced with `.to_string()`
/// (clippy `useless_format`).
///
/// NOTE(review): `process` opens its own inner transaction on `txn` via
/// `begin_with_config` — confirm nested transactions behave as intended.
#[get("/in_process_transaction/<foreign_id>/retry")]
pub async fn retry_failed_transaction(
    conn: Connection<'_, Db>,
    foreign_id: String,
) -> Result<Json<(transaction::Model, Option<company::Model>)>, Custom<String>> {
    let db = conn.into_inner();
    let txn = db.begin().await.map_err(|e| {
        Custom(
            Status::InternalServerError,
            format!("Database error: {}", e),
        )
    })?;
    // Only rows flagged as failed (failed = 1) are eligible for a retry.
    let mut tr = in_process_transaction::Entity::find()
        .filter(in_process_transaction::Column::Failed.eq(1))
        .filter(in_process_transaction::Column::ForeignId.eq(foreign_id.to_owned()))
        .one(&txn)
        .await
        .map_err(|e| {
            Custom(
                Status::InternalServerError,
                format!("Database error: {}", e),
            )
        })?
        .ok_or(Custom(
            Status::NotFound,
            format!("Failed transaction {} doesn't exist", foreign_id),
        ))?;
    tr.process(&txn).await.map_err(|e| {
        Custom(
            Status::InternalServerError,
            format!("Error retrying failed transaction {}", e),
        )
    })?;
    // Fetch the transaction `process` just created, with its company.
    let res = transaction::Entity::find()
        .filter(transaction::Column::ForeignId.eq(foreign_id.to_owned()))
        .find_also_related(company::Entity)
        .one(&txn)
        .await
        .map_err(|e| {
            Custom(
                Status::InternalServerError,
                format!("Database error: {}", e),
            )
        })?
        .ok_or(Custom(
            Status::NotFound,
            "Failed to fetch just created transaction".to_string(),
        ))?;
    txn.commit().await.map_err(|e| {
        Custom(
            Status::InternalServerError,
            format!("Database error: {}", e),
        )
    })?;
    Ok(Json(res))
}
#[get("/in_process_transaction/retry_all")]
pub async fn retry_all(conn: Connection<'_, Db>) -> Result<(), Custom<String>> {
let db = conn.into_inner();
let list = in_process_transaction::Entity::find()
.all(db)
.await
.map_err(|e| {
Custom(
Status::InternalServerError,
format!("Database error: {}", e),
)
})?;
let mut res_list = vec![];
for mut tr in list {
let res = tr.process(db).await;
match res {
Ok(val) => res_list.push(val),
Err(_) => (),
}
}
Ok(())
}

@ -0,0 +1,3 @@
pub mod company;
pub mod in_process_transaction;
pub mod transaction;

@ -0,0 +1,140 @@
use chrono::NaiveDate;
use rocket::http::Status;
use rocket::response::status::Custom;
use sea_orm::{ColumnTrait, Order};
use sea_orm_rocket::rocket::serde::json::Json;
use sea_orm_rocket::Connection;
use serde::{Deserialize, Serialize};
use crate::db::paginate::{paginate_also_related, PaginatedResponse};
use crate::{db::Db, model};
/// `GET /v1/transaction?page=&size=` — paginated transactions joined with
/// their companies, newest publication first.
#[get("/transaction?<page>&<size>")]
pub async fn get_all(
    conn: Connection<'_, Db>,
    page: Option<u64>,
    size: Option<u64>,
) -> Result<Json<PaginatedResponse<TransactionCompany>>, Custom<String>> {
    let page_res = paginate_also_related::<
        model::transaction::Entity,
        model::company::Entity,
        model::transaction::Model,
        model::company::Model,
        model::transaction::Column,
    >(
        conn,
        page,
        size,
        Some(model::transaction::Column::DatePublished),
        Some(Order::Desc),
        None,
    )
    .await
    .map_err(|e| {
        Custom(
            Status::InternalServerError,
            format!("Database error: {}", e),
        )
    })?;
    // Flatten each (transaction, company) pair into the response shape.
    let list: Vec<TransactionCompany> = page_res
        .list
        .iter()
        .map(|(tr, comp)| TransactionCompany {
            id: tr.id,
            foreign_id: tr.foreign_id.to_owned(),
            date_published: tr.date_published,
            date_executed: tr.date_executed,
            person: tr.person.to_owned(),
            exchange: tr.exchange.to_owned(),
            nature: tr.nature.to_owned(),
            isin: tr.isin.clone(),
            instrument: tr.instrument.to_owned(),
            volume: tr.volume,
            unit_price: tr.unit_price,
            company: comp.to_owned(),
        })
        .collect();
    Ok(Json(PaginatedResponse {
        count: page_res.count,
        num_pages: page_res.num_pages,
        list,
    }))
}
/// `GET /v1/transaction/<company_slug>?page=&size=` — paginated transactions
/// for one company (matched by slug), newest publication first.
#[get("/transaction/<company_slug>?<page>&<size>")]
pub async fn get_by_company_id(
    conn: Connection<'_, Db>,
    company_slug: String,
    page: Option<u64>,
    size: Option<u64>,
) -> Result<Json<PaginatedResponse<TransactionCompany>>, Custom<String>> {
    // Restrict the related-company side of the join to the requested slug.
    let filter = model::company::Column::Slug.eq(company_slug);
    let page_res = paginate_also_related::<
        model::transaction::Entity,
        model::company::Entity,
        model::transaction::Model,
        model::company::Model,
        model::transaction::Column,
    >(
        conn,
        page,
        size,
        Some(model::transaction::Column::DatePublished),
        Some(Order::Desc),
        Some(filter),
    )
    .await
    .map_err(|e| {
        Custom(
            Status::InternalServerError,
            format!("Database error: {}", e),
        )
    })?;
    // Flatten each (transaction, company) pair into the response shape.
    let list: Vec<TransactionCompany> = page_res
        .list
        .iter()
        .map(|(tr, comp)| TransactionCompany {
            id: tr.id,
            foreign_id: tr.foreign_id.to_owned(),
            date_published: tr.date_published,
            date_executed: tr.date_executed,
            person: tr.person.to_owned(),
            exchange: tr.exchange.to_owned(),
            nature: tr.nature.to_owned(),
            isin: tr.isin.clone(),
            instrument: tr.instrument.to_owned(),
            volume: tr.volume,
            unit_price: tr.unit_price,
            company: comp.to_owned(),
        })
        .collect();
    Ok(Json(PaginatedResponse {
        count: page_res.count,
        num_pages: page_res.num_pages,
        list,
    }))
}
/// Transaction joined with its (optional) company, as returned by the
/// `/transaction` endpoints.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TransactionCompany {
    pub id: i32,
    pub foreign_id: String,
    pub date_published: NaiveDate,
    pub date_executed: NaiveDate,
    pub person: String,
    pub exchange: String,
    pub nature: String,
    pub isin: Option<String>,
    pub instrument: String,
    pub volume: i32,
    pub unit_price: f32,
    // `None` when the related company row is missing from the join.
    pub company: Option<model::company::Model>,
}

@ -0,0 +1,138 @@
use futures::StreamExt;
use sea_orm::{
ColumnTrait, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel, PaginatorTrait,
QueryFilter,
};
use thiserror::Error;
use crate::{
amf::service::{
information_req::{AMFRequest, AMFRequestType},
pdf::AMFPdfError,
},
model::{in_process_transaction, transaction},
repo::in_process_transaction::{InProcessTransactionError, NewInProcessTransaction},
};
/// Background task that downloads AMF insider-transaction hits and stages
/// them for PDF extraction.
pub struct GetAMFTransactions {
    // Number of hits requested per AMF API call.
    max_req_size: u32,
}
/// Errors produced by the AMF download task.
#[derive(Debug, Error)]
pub enum GetAMFTransactionsError {
    #[error("The transaction {0} contains no pdf document")]
    NoDocumentError(String),
    #[error("Database error: {0}")]
    DatabaseError(DbErr),
    // NOTE(review): not constructed in this file — confirm it is still needed.
    #[error("Error extracting information from pdf at {0} : {1}")]
    InformationExtractionError(String, AMFPdfError),
}
impl Default for GetAMFTransactions {
fn default() -> Self {
GetAMFTransactions { max_req_size: 100 }
}
}
impl GetAMFTransactions {
pub fn new(max_req_size: u32) -> Self {
GetAMFTransactions { max_req_size }
}
/// This task will extract the information from all AMF transactions until one is already found
/// in the database. Another function should be used to download individual records.
pub async fn run(&self, db: &DatabaseConnection) -> Result<(), GetAMFTransactionsError> {
info!("Starting AMF transaction download task");
let mut from = 0;
let mut req = AMFRequest::new(AMFRequestType::DD, from, self.max_req_size);
let mut tr_to_process = Vec::new();
'outer: while let Some(resp) = req.get_list().await.ok() {
info!(
"Downloading hit list from {} to {}",
from,
self.max_req_size + from
);
let list = resp.get_hits();
for hit in list.iter() {
let number = &hit.source.numero;
if transaction::Entity::find()
.filter(transaction::Column::ForeignId.eq(number.to_owned()))
.one(db)
.await
.map_err(|e| GetAMFTransactionsError::DatabaseError(e))?
.is_some()
{
// We've saved this transaction before, so we stop here.
break 'outer;
}
if in_process_transaction::Entity::find()
.filter(in_process_transaction::Column::ForeignId.eq(number.to_owned()))
.one(db)
.await
.map_err(|e| GetAMFTransactionsError::DatabaseError(e))?
.is_some()
{
// We've registered this transaction before, so we stop here.
break 'outer;
}
tr_to_process.push(
NewInProcessTransaction::new(
&hit.get_foreign_id(),
&hit.get_documents()[0].path,
false,
None,
)
.into_active_model(),
);
}
from += self.max_req_size;
req = AMFRequest::new(AMFRequestType::DD, from, self.max_req_size);
}
if !tr_to_process.is_empty() {
in_process_transaction::Entity::insert_many(tr_to_process)
.exec(db)
.await
.map_err(|e| GetAMFTransactionsError::DatabaseError(e))?;
}
let mut to_run = in_process_transaction::Entity::find()
.filter(in_process_transaction::Column::Failed.eq(0))
.paginate(db, 100);
let n_not_failed = to_run
.num_items()
.await
.map_err(|e| GetAMFTransactionsError::DatabaseError(e))?;
if n_not_failed == 0 {
info!("No new transactions to process since last run");
return Ok(());
}
info!("{} transactions will be processed", n_not_failed);
while let Some(page) = to_run
.fetch_and_next()
.await
.map_err(|e| GetAMFTransactionsError::DatabaseError(e))?
{
let mut futures = Vec::new();
for tr in page.iter() {
futures.push(async move { tr.clone().process(db).await });
}
let stream = futures::stream::iter(futures).buffer_unordered(10);
let _results: Vec<Result<(), InProcessTransactionError>> = stream.collect().await;
}
info!("AMF transactions download task finished execution");
Ok(())
}
}

@ -0,0 +1,41 @@
use std::time::Duration;
use sea_orm::ConnectOptions;
use crate::{task::get_amf_transactions::GetAMFTransactions, CONFIG};
pub mod get_amf_transactions;
/// Common shape for background tasks.
/// NOTE(review): not implemented by `GetAMFTransactions` in the visible code —
/// confirm this trait is still needed.
trait TaskTrait {
    type Err;
    fn init(&self) -> Result<(), Self::Err>;
    fn run(&self) -> Result<(), Self::Err>;
}
pub async fn run_tasks() -> Result<(), sea_orm::DbErr> {
let config = &CONFIG;
info!("Creating task db pool");
let mut options: ConnectOptions = ConnectOptions::new(config.database_url.to_owned());
options
.max_connections(config.max_connections)
.min_connections(config.min_connections)
.connect_timeout(Duration::from_secs(config.connect_timeout))
.acquire_timeout(Duration::from_secs(config.acquire_timeout))
.idle_timeout(Duration::from_secs(config.idle_timeout))
.max_lifetime(Duration::from_secs(config.max_lifetime))
.sqlx_logging(config.sqlx_logging);
let tasks_pool = sea_orm::Database::connect(options).await?;
let mut inter = tokio::time::interval(Duration::from_secs(config.get_amf_transaction_interval));
loop {
inter.tick().await;
info!("Running task: getamftransactions");
match GetAMFTransactions::new(1000).run(&tasks_pool).await {
Ok(_) => (),
Err(e) => error!("Task failed: {}", e),
};
}
}

@ -0,0 +1,6 @@
use server;
/// Workspace binary entry point: everything lives in the `server` crate, so
/// just forward its result.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    server::main()
}

@ -0,0 +1,2 @@
/// Placeholder entry point for the client crate.
/// NOTE(review): presumably the real app is driven by Perseus tooling — confirm.
// Fix: dropped the redundant `-> ()` return type (clippy `unused_unit`).
fn main() {}
Loading…
Cancel
Save