Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/taurus/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod runtime_status;
74 changes: 74 additions & 0 deletions crates/taurus/src/client/runtime_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use std::time::{SystemTime, UNIX_EPOCH};

use code0_flow::flow_service::retry::create_channel_with_retry;
use tonic::transport::Channel;
use tucana::{
aquila::{
RuntimeStatusUpdateRequest, runtime_status_service_client::RuntimeStatusServiceClient,
runtime_status_update_request::Status,
},
shared::{ExecutionRuntimeStatus, RuntimeFeature},
};

pub struct TaurusRuntimeStatusService {
channel: Channel,
identifier: String,
features: Vec<RuntimeFeature>,
}

impl TaurusRuntimeStatusService {
pub async fn from_url(
aquila_url: String,
identifier: String,
features: Vec<RuntimeFeature>,
) -> Self {
let channel = create_channel_with_retry("Aquila", aquila_url).await;
Self::new(channel, identifier, features)
}

pub fn new(channel: Channel, identifier: String, features: Vec<RuntimeFeature>) -> Self {
TaurusRuntimeStatusService {
channel,
identifier,
features,
}
}

pub async fn update_runtime_status(
&self,
status: tucana::shared::execution_runtime_status::Status,
) {
log::info!("Updating the current Runtime Status!");
let mut client = RuntimeStatusServiceClient::new(self.channel.clone());

let now = SystemTime::now();
let timestamp = match now.duration_since(UNIX_EPOCH) {
Ok(time) => time.as_secs(),
Err(err) => {
log::error!("cannot get current system time: {:?}", err);
0
}
};

let request = RuntimeStatusUpdateRequest {
status: Some(Status::ExecutionRuntimeStatus(ExecutionRuntimeStatus {
status: status.into(),
timestamp: timestamp as i64,
identifier: self.identifier.clone(),
features: self.features.clone(),
})),
};

match client.update(request).await {
Ok(response) => {
log::info!(
"Was the update of the RuntimeStatus accepted by Sagittarius? {}",
response.into_inner().success
);
}
Err(err) => {
log::error!("Failed to update RuntimeStatus: {:?}", err);
}
}
}
}
32 changes: 31 additions & 1 deletion crates/taurus/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod client;
mod config;

use crate::client::runtime_status::TaurusRuntimeStatusService;
use crate::config::Config;
use code0_flow::flow_service::FlowUpdateService;

Expand All @@ -16,7 +18,7 @@ use taurus_core::context::signal::Signal;
use tokio::signal;
use tonic_health::pb::health_server::HealthServer;
use tucana::shared::value::Kind;
use tucana::shared::{ExecutionFlow, NodeFunction, Value};
use tucana::shared::{ExecutionFlow, NodeFunction, RuntimeFeature, Translation, Value};

fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Signal {
let mut context = Context::default();
Expand All @@ -40,6 +42,7 @@ async fn main() {

let config = Config::new();
let store = FunctionStore::default();
let mut runtime_status_service: Option<TaurusRuntimeStatusService> = None;

let client = match async_nats::connect(config.nats_url.clone()).await {
Ok(client) => {
Expand Down Expand Up @@ -88,6 +91,27 @@ async fn main() {
.await
.send()
.await;

let status_service = TaurusRuntimeStatusService::from_url(
config.aquila_url.clone(),
"taurus".into(),
vec![RuntimeFeature {
name: vec![Translation {
code: "en-US".to_string(),
content: "Runtime".to_string(),
}],
description: vec![Translation {
code: "en-US".to_string(),
content: "Will execute incoming flows.".to_string(),
}],
}],
)
.await;

status_service
.update_runtime_status(tucana::shared::execution_runtime_status::Status::Running)
.await;
runtime_status_service = Some(status_service);
}

let mut worker_task = tokio::spawn(async move {
Expand Down Expand Up @@ -212,5 +236,11 @@ async fn main() {
}
}

if let Some(status_service) = &runtime_status_service {
status_service
.update_runtime_status(tucana::shared::execution_runtime_status::Status::Stopped)
.await;
};

log::info!("Taurus shutdown complete");
}