173779f3dSLoGin use std::{
273779f3dSLoGin collections::{BTreeMap, HashMap},
373779f3dSLoGin fmt::Debug,
473779f3dSLoGin path::PathBuf,
573779f3dSLoGin process::exit,
673779f3dSLoGin sync::{
773779f3dSLoGin atomic::{AtomicI32, Ordering},
873779f3dSLoGin Arc, Mutex, RwLock,
973779f3dSLoGin },
1073779f3dSLoGin thread::ThreadId,
1173779f3dSLoGin };
1273779f3dSLoGin
1373779f3dSLoGin use log::{error, info};
1473779f3dSLoGin
1573779f3dSLoGin use crate::{
16eaa67f3cSLoGin context::{Action, DadkUserExecuteContext},
17*d2ade6efSJomo executor::Executor,
1873779f3dSLoGin parser::task::DADKTask,
1973779f3dSLoGin };
2073779f3dSLoGin
2173779f3dSLoGin use self::task_deque::TASK_DEQUE;
2273779f3dSLoGin
2373779f3dSLoGin pub mod task_deque;
2473779f3dSLoGin #[cfg(test)]
2573779f3dSLoGin mod tests;
2673779f3dSLoGin
2773779f3dSLoGin lazy_static! {
2873779f3dSLoGin // 线程id与任务实体id映射表
2973779f3dSLoGin pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new());
3073779f3dSLoGin }
3173779f3dSLoGin
3273779f3dSLoGin /// # 调度实体内部结构
3373779f3dSLoGin #[derive(Debug, Clone)]
3473779f3dSLoGin pub struct InnerEntity {
3573779f3dSLoGin /// 任务ID
3673779f3dSLoGin id: i32,
3773779f3dSLoGin file_path: PathBuf,
3873779f3dSLoGin /// 任务
3973779f3dSLoGin task: DADKTask,
4073779f3dSLoGin /// 入度
4173779f3dSLoGin indegree: usize,
4273779f3dSLoGin /// 子节点
4373779f3dSLoGin children: Vec<Arc<SchedEntity>>,
4473779f3dSLoGin }
4573779f3dSLoGin
4673779f3dSLoGin /// # 调度实体
4773779f3dSLoGin #[derive(Debug)]
4873779f3dSLoGin pub struct SchedEntity {
4973779f3dSLoGin inner: Mutex<InnerEntity>,
5073779f3dSLoGin }
5173779f3dSLoGin
5273779f3dSLoGin impl PartialEq for SchedEntity {
5373779f3dSLoGin fn eq(&self, other: &Self) -> bool {
5473779f3dSLoGin self.inner.lock().unwrap().id == other.inner.lock().unwrap().id
5573779f3dSLoGin }
eq(&self, other: &Self) -> bool5673779f3dSLoGin }
5773779f3dSLoGin
5873779f3dSLoGin impl SchedEntity {
5973779f3dSLoGin #[allow(dead_code)]
6073779f3dSLoGin pub fn id(&self) -> i32 {
6173779f3dSLoGin self.inner.lock().unwrap().id
6273779f3dSLoGin }
6373779f3dSLoGin
6473779f3dSLoGin #[allow(dead_code)]
6573779f3dSLoGin pub fn file_path(&self) -> PathBuf {
6673779f3dSLoGin self.inner.lock().unwrap().file_path.clone()
6773779f3dSLoGin }
6873779f3dSLoGin
6973779f3dSLoGin #[allow(dead_code)]
7073779f3dSLoGin pub fn task(&self) -> DADKTask {
7173779f3dSLoGin self.inner.lock().unwrap().task.clone()
7273779f3dSLoGin }
7373779f3dSLoGin
7473779f3dSLoGin /// 入度加1
7573779f3dSLoGin pub fn add_indegree(&self) {
7673779f3dSLoGin self.inner.lock().unwrap().indegree += 1;
7773779f3dSLoGin }
7873779f3dSLoGin
7973779f3dSLoGin /// 入度减1
8073779f3dSLoGin pub fn sub_indegree(&self) -> usize {
8173779f3dSLoGin self.inner.lock().unwrap().indegree -= 1;
8273779f3dSLoGin return self.inner.lock().unwrap().indegree;
8373779f3dSLoGin }
8473779f3dSLoGin
8573779f3dSLoGin /// 增加子节点
8673779f3dSLoGin pub fn add_child(&self, entity: Arc<SchedEntity>) {
8773779f3dSLoGin self.inner.lock().unwrap().children.push(entity);
8873779f3dSLoGin }
8973779f3dSLoGin
9073779f3dSLoGin /// 获取入度
9173779f3dSLoGin pub fn indegree(&self) -> usize {
9273779f3dSLoGin self.inner.lock().unwrap().indegree
9373779f3dSLoGin }
9473779f3dSLoGin
9573779f3dSLoGin /// 当前任务完成后,所有子节点入度减1
9673779f3dSLoGin ///
9773779f3dSLoGin /// ## 参数
9873779f3dSLoGin ///
9973779f3dSLoGin /// 无
10073779f3dSLoGin ///
10173779f3dSLoGin /// ## 返回值
10273779f3dSLoGin ///
10373779f3dSLoGin /// 所有入度为0的子节点集合
10473779f3dSLoGin pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> {
10573779f3dSLoGin let mut zero_child = Vec::new();
10673779f3dSLoGin let children = &self.inner.lock().unwrap().children;
10773779f3dSLoGin for child in children.iter() {
10873779f3dSLoGin if child.sub_indegree() == 0 {
10973779f3dSLoGin zero_child.push(child.clone());
11073779f3dSLoGin }
11173779f3dSLoGin }
11273779f3dSLoGin return zero_child;
11373779f3dSLoGin }
11473779f3dSLoGin }
11573779f3dSLoGin
11673779f3dSLoGin /// # 调度实体列表
11773779f3dSLoGin ///
11873779f3dSLoGin /// 用于存储所有的调度实体
11973779f3dSLoGin #[derive(Debug)]
12073779f3dSLoGin pub struct SchedEntities {
12173779f3dSLoGin /// 任务ID到调度实体的映射
12273779f3dSLoGin id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>,
12373779f3dSLoGin }
12473779f3dSLoGin
12573779f3dSLoGin impl SchedEntities {
12673779f3dSLoGin pub fn new() -> Self {
12773779f3dSLoGin Self {
12873779f3dSLoGin id2entity: RwLock::new(BTreeMap::new()),
12973779f3dSLoGin }
13073779f3dSLoGin }
13173779f3dSLoGin
13273779f3dSLoGin pub fn add(&mut self, entity: Arc<SchedEntity>) {
13373779f3dSLoGin self.id2entity
new() -> Self13473779f3dSLoGin .write()
13573779f3dSLoGin .unwrap()
13673779f3dSLoGin .insert(entity.id(), entity.clone());
13773779f3dSLoGin }
13873779f3dSLoGin
13973779f3dSLoGin #[allow(dead_code)]
add(&mut self, entity: Arc<SchedEntity>)14073779f3dSLoGin pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> {
14173779f3dSLoGin self.id2entity.read().unwrap().get(&id).cloned()
14273779f3dSLoGin }
14373779f3dSLoGin
14473779f3dSLoGin pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> {
14573779f3dSLoGin for e in self.id2entity.read().unwrap().iter() {
14673779f3dSLoGin if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) {
14773779f3dSLoGin return Some(e.1.clone());
get(&self, id: i32) -> Option<Arc<SchedEntity>>14873779f3dSLoGin }
14973779f3dSLoGin }
15073779f3dSLoGin return None;
15173779f3dSLoGin }
get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>>15273779f3dSLoGin
15373779f3dSLoGin pub fn entities(&self) -> Vec<Arc<SchedEntity>> {
15473779f3dSLoGin let mut v = Vec::new();
15573779f3dSLoGin for e in self.id2entity.read().unwrap().iter() {
15673779f3dSLoGin v.push(e.1.clone());
15773779f3dSLoGin }
15873779f3dSLoGin return v;
15973779f3dSLoGin }
16073779f3dSLoGin
entities(&self) -> Vec<Arc<SchedEntity>>16173779f3dSLoGin pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> {
16273779f3dSLoGin self.id2entity.read().unwrap().clone()
16373779f3dSLoGin }
16473779f3dSLoGin
16573779f3dSLoGin #[allow(dead_code)]
16673779f3dSLoGin pub fn len(&self) -> usize {
16773779f3dSLoGin self.id2entity.read().unwrap().len()
16873779f3dSLoGin }
id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>>16973779f3dSLoGin
17073779f3dSLoGin #[allow(dead_code)]
17173779f3dSLoGin pub fn is_empty(&self) -> bool {
17273779f3dSLoGin self.id2entity.read().unwrap().is_empty()
17373779f3dSLoGin }
len(&self) -> usize17473779f3dSLoGin
17573779f3dSLoGin #[allow(dead_code)]
17673779f3dSLoGin pub fn clear(&mut self) {
17773779f3dSLoGin self.id2entity.write().unwrap().clear();
17873779f3dSLoGin }
is_empty(&self) -> bool17973779f3dSLoGin
18073779f3dSLoGin pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> {
18173779f3dSLoGin let mut result = Vec::new();
18273779f3dSLoGin let mut visited = BTreeMap::new();
18373779f3dSLoGin let btree = self.id2entity.write().unwrap().clone();
18473779f3dSLoGin for entity in btree.iter() {
18573779f3dSLoGin if !visited.contains_key(entity.0) {
18673779f3dSLoGin let r = self.dfs(entity.1, &mut visited, &mut result);
18773779f3dSLoGin if r.is_err() {
18873779f3dSLoGin let err = r.unwrap_err();
18973779f3dSLoGin error!("{}", err.display());
19073779f3dSLoGin println!("Please fix the errors above and try again.");
19173779f3dSLoGin std::process::exit(1);
19273779f3dSLoGin }
19373779f3dSLoGin }
19473779f3dSLoGin }
19573779f3dSLoGin return result;
19673779f3dSLoGin }
19773779f3dSLoGin
19873779f3dSLoGin fn dfs(
19973779f3dSLoGin &self,
20073779f3dSLoGin entity: &Arc<SchedEntity>,
20173779f3dSLoGin visited: &mut BTreeMap<i32, bool>,
20273779f3dSLoGin result: &mut Vec<Arc<SchedEntity>>,
20373779f3dSLoGin ) -> Result<(), DependencyCycleError> {
20473779f3dSLoGin visited.insert(entity.id(), false);
20573779f3dSLoGin for dep in entity.task().depends.iter() {
dfs( &self, entity: &Arc<SchedEntity>, visited: &mut BTreeMap<i32, bool>, result: &mut Vec<Arc<SchedEntity>>, ) -> Result<(), DependencyCycleError>20673779f3dSLoGin if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) {
20773779f3dSLoGin let guard = self.id2entity.write().unwrap();
20873779f3dSLoGin let e = guard.get(&entity.id()).unwrap();
20973779f3dSLoGin let d = guard.get(&dep_entity.id()).unwrap();
21073779f3dSLoGin e.add_indegree();
21173779f3dSLoGin d.add_child(e.clone());
21273779f3dSLoGin if let Some(&false) = visited.get(&dep_entity.id()) {
21373779f3dSLoGin // 输出完整环形依赖
21473779f3dSLoGin let mut err = DependencyCycleError::new(dep_entity.clone());
21573779f3dSLoGin
21673779f3dSLoGin err.add(entity.clone(), dep_entity);
21773779f3dSLoGin return Err(err);
21873779f3dSLoGin }
21973779f3dSLoGin if !visited.contains_key(&dep_entity.id()) {
22073779f3dSLoGin drop(guard);
22173779f3dSLoGin let r = self.dfs(&dep_entity, visited, result);
22273779f3dSLoGin if r.is_err() {
22373779f3dSLoGin let mut err: DependencyCycleError = r.unwrap_err();
22473779f3dSLoGin // 如果错误已经停止传播,则直接返回
22573779f3dSLoGin if err.stop_propagation {
22673779f3dSLoGin return Err(err);
22773779f3dSLoGin }
22873779f3dSLoGin // 如果当前实体是错误的起始实体,则停止传播
22973779f3dSLoGin if entity == &err.head_entity {
23073779f3dSLoGin err.stop_propagation();
23173779f3dSLoGin }
23273779f3dSLoGin err.add(entity.clone(), dep_entity);
23373779f3dSLoGin return Err(err);
23473779f3dSLoGin }
23573779f3dSLoGin }
23673779f3dSLoGin } else {
23773779f3dSLoGin error!(
23873779f3dSLoGin "Dependency not found: {} -> {}",
23973779f3dSLoGin entity.task().name_version(),
24073779f3dSLoGin dep.name_version()
24173779f3dSLoGin );
24273779f3dSLoGin std::process::exit(1);
24373779f3dSLoGin }
24473779f3dSLoGin }
24573779f3dSLoGin visited.insert(entity.id(), true);
24673779f3dSLoGin result.push(entity.clone());
24773779f3dSLoGin return Ok(());
24873779f3dSLoGin }
24973779f3dSLoGin }
25073779f3dSLoGin
25173779f3dSLoGin /// # 任务调度器
25273779f3dSLoGin #[derive(Debug)]
25373779f3dSLoGin pub struct Scheduler {
25473779f3dSLoGin /// DragonOS sysroot在主机上的路径
25570352fd6SLoGin sysroot_dir: PathBuf,
25673779f3dSLoGin /// 要执行的操作
25773779f3dSLoGin action: Action,
25873779f3dSLoGin /// 调度实体列表
25973779f3dSLoGin target: SchedEntities,
26073779f3dSLoGin /// dadk执行的上下文
26173779f3dSLoGin context: Arc<DadkUserExecuteContext>,
26273779f3dSLoGin }
26373779f3dSLoGin
26473779f3dSLoGin pub enum SchedulerError {
26573779f3dSLoGin TaskError(String),
26673779f3dSLoGin /// 不是当前正在编译的目标架构
26773779f3dSLoGin InvalidTargetArch(String),
26873779f3dSLoGin DependencyNotFound(Arc<SchedEntity>, String),
26973779f3dSLoGin RunError(String),
27073779f3dSLoGin }
27173779f3dSLoGin
27273779f3dSLoGin impl Debug for SchedulerError {
27373779f3dSLoGin fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27473779f3dSLoGin match self {
27573779f3dSLoGin Self::TaskError(arg0) => {
27673779f3dSLoGin write!(f, "TaskError: {}", arg0)
27773779f3dSLoGin }
27873779f3dSLoGin SchedulerError::DependencyNotFound(current, msg) => {
27973779f3dSLoGin write!(
28073779f3dSLoGin f,
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result28173779f3dSLoGin "For task {}, dependency not found: {}. Please check file: {}",
28273779f3dSLoGin current.task().name_version(),
28373779f3dSLoGin msg,
28473779f3dSLoGin current.file_path().display()
28573779f3dSLoGin )
28673779f3dSLoGin }
28773779f3dSLoGin SchedulerError::RunError(msg) => {
28873779f3dSLoGin write!(f, "RunError: {}", msg)
28973779f3dSLoGin }
29073779f3dSLoGin SchedulerError::InvalidTargetArch(msg) => {
29173779f3dSLoGin write!(f, "InvalidTargetArch: {}", msg)
29273779f3dSLoGin }
29373779f3dSLoGin }
29473779f3dSLoGin }
29573779f3dSLoGin }
29673779f3dSLoGin
29773779f3dSLoGin impl Scheduler {
29873779f3dSLoGin pub fn new(
29973779f3dSLoGin context: Arc<DadkUserExecuteContext>,
30073779f3dSLoGin dragonos_dir: PathBuf,
30173779f3dSLoGin action: Action,
30273779f3dSLoGin tasks: Vec<(PathBuf, DADKTask)>,
30373779f3dSLoGin ) -> Result<Self, SchedulerError> {
30473779f3dSLoGin let entities = SchedEntities::new();
30573779f3dSLoGin
new( context: Arc<DadkUserExecuteContext>, dragonos_dir: PathBuf, action: Action, tasks: Vec<(PathBuf, DADKTask)>, ) -> Result<Self, SchedulerError>30673779f3dSLoGin let mut scheduler = Scheduler {
30770352fd6SLoGin sysroot_dir: dragonos_dir,
30873779f3dSLoGin action,
30973779f3dSLoGin target: entities,
31073779f3dSLoGin context,
31173779f3dSLoGin };
31273779f3dSLoGin
31373779f3dSLoGin let r = scheduler.add_tasks(tasks);
31473779f3dSLoGin if r.is_err() {
31573779f3dSLoGin error!("Error while adding tasks: {:?}", r);
31673779f3dSLoGin return Err(r.err().unwrap());
31773779f3dSLoGin }
31873779f3dSLoGin
31973779f3dSLoGin return Ok(scheduler);
32073779f3dSLoGin }
32173779f3dSLoGin
32273779f3dSLoGin /// # 添加多个任务
32373779f3dSLoGin ///
32473779f3dSLoGin /// 添加任务到调度器中,如果任务已经存在,则返回错误
32573779f3dSLoGin pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> {
32673779f3dSLoGin for task in tasks {
32773779f3dSLoGin let e = self.add_task(task.0, task.1);
32873779f3dSLoGin if e.is_err() {
32973779f3dSLoGin if let Err(SchedulerError::InvalidTargetArch(_)) = &e {
33073779f3dSLoGin continue;
33173779f3dSLoGin }
33273779f3dSLoGin e?;
add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError>33373779f3dSLoGin }
33473779f3dSLoGin }
33573779f3dSLoGin
33673779f3dSLoGin return Ok(());
33773779f3dSLoGin }
33873779f3dSLoGin
33973779f3dSLoGin /// # 任务是否匹配当前目标架构
34073779f3dSLoGin pub fn task_arch_matched(&self, task: &DADKTask) -> bool {
34173779f3dSLoGin task.target_arch.contains(self.context.target_arch())
34273779f3dSLoGin }
34373779f3dSLoGin
34473779f3dSLoGin /// # 添加一个任务
34573779f3dSLoGin ///
34673779f3dSLoGin /// 添加任务到调度器中,如果任务已经存在,则返回错误
34773779f3dSLoGin pub fn add_task(
task_arch_matched(&self, task: &DADKTask) -> bool34873779f3dSLoGin &mut self,
34973779f3dSLoGin path: PathBuf,
35073779f3dSLoGin task: DADKTask,
35173779f3dSLoGin ) -> Result<Arc<SchedEntity>, SchedulerError> {
35273779f3dSLoGin if !self.task_arch_matched(&task) {
35373779f3dSLoGin return Err(SchedulerError::InvalidTargetArch(format!(
35473779f3dSLoGin "Task {} is not for target arch: {:?}",
35573779f3dSLoGin task.name_version(),
35673779f3dSLoGin self.context.target_arch()
35773779f3dSLoGin )));
35873779f3dSLoGin }
35973779f3dSLoGin
36073779f3dSLoGin let id: i32 = self.generate_task_id();
36173779f3dSLoGin let indegree: usize = 0;
36273779f3dSLoGin let children = Vec::new();
36373779f3dSLoGin let entity = Arc::new(SchedEntity {
36473779f3dSLoGin inner: Mutex::new(InnerEntity {
36573779f3dSLoGin id,
36673779f3dSLoGin task,
36773779f3dSLoGin file_path: path.clone(),
36873779f3dSLoGin indegree,
36973779f3dSLoGin children,
37073779f3dSLoGin }),
37173779f3dSLoGin });
37273779f3dSLoGin let name_version = (entity.task().name.clone(), entity.task().version.clone());
37373779f3dSLoGin
37473779f3dSLoGin if self
37573779f3dSLoGin .target
37673779f3dSLoGin .get_by_name_version(&name_version.0, &name_version.1)
37773779f3dSLoGin .is_some()
37873779f3dSLoGin {
37973779f3dSLoGin return Err(SchedulerError::TaskError(format!(
38073779f3dSLoGin "Task with name [{}] and version [{}] already exists. Config file: {}",
38173779f3dSLoGin name_version.0,
38273779f3dSLoGin name_version.1,
38373779f3dSLoGin path.display()
38473779f3dSLoGin )));
38573779f3dSLoGin }
38673779f3dSLoGin
38773779f3dSLoGin self.target.add(entity.clone());
38873779f3dSLoGin
38973779f3dSLoGin info!("Task added: {}", entity.task().name_version());
39073779f3dSLoGin return Ok(entity);
39173779f3dSLoGin }
39273779f3dSLoGin
39373779f3dSLoGin fn generate_task_id(&self) -> i32 {
39473779f3dSLoGin static TASK_ID: AtomicI32 = AtomicI32::new(0);
39573779f3dSLoGin return TASK_ID.fetch_add(1, Ordering::SeqCst);
39673779f3dSLoGin }
39773779f3dSLoGin
39873779f3dSLoGin /// # 执行调度器中的所有任务
39973779f3dSLoGin pub fn run(&self) -> Result<(), SchedulerError> {
40073779f3dSLoGin // 准备全局环境变量
40173779f3dSLoGin crate::executor::prepare_env(&self.target, &self.context)
40273779f3dSLoGin .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?;
generate_task_id(&self) -> i3240373779f3dSLoGin
40473779f3dSLoGin match self.action {
40573779f3dSLoGin Action::Build | Action::Install => {
40673779f3dSLoGin self.run_with_topo_sort()?;
40773779f3dSLoGin }
40873779f3dSLoGin Action::Clean(_) => self.run_without_topo_sort()?,
40973779f3dSLoGin }
41073779f3dSLoGin
41173779f3dSLoGin return Ok(());
41273779f3dSLoGin }
41373779f3dSLoGin
41473779f3dSLoGin /// Action需要按照拓扑序执行
41573779f3dSLoGin ///
41673779f3dSLoGin /// Action::Build | Action::Install
41773779f3dSLoGin fn run_with_topo_sort(&self) -> Result<(), SchedulerError> {
41873779f3dSLoGin // 检查是否有不存在的依赖
41973779f3dSLoGin let r = self.check_not_exists_dependency();
42073779f3dSLoGin if r.is_err() {
42173779f3dSLoGin error!("Error while checking tasks: {:?}", r);
42273779f3dSLoGin return r;
42373779f3dSLoGin }
42473779f3dSLoGin
42573779f3dSLoGin // 对调度实体进行拓扑排序
42673779f3dSLoGin let r: Vec<Arc<SchedEntity>> = self.target.topo_sort();
42773779f3dSLoGin
42873779f3dSLoGin let action = self.action.clone();
42970352fd6SLoGin let dragonos_dir = self.sysroot_dir.clone();
43073779f3dSLoGin let id2entity = self.target.id2entity();
43173779f3dSLoGin let count = r.len();
43273779f3dSLoGin
43373779f3dSLoGin // 启动守护线程
43473779f3dSLoGin let handler = std::thread::spawn(move || {
43573779f3dSLoGin Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r)
43673779f3dSLoGin });
43773779f3dSLoGin
43873779f3dSLoGin handler.join().expect("Could not join deamon");
43973779f3dSLoGin
44073779f3dSLoGin return Ok(());
44173779f3dSLoGin }
44273779f3dSLoGin
run(&self) -> Result<(), SchedulerError>44373779f3dSLoGin /// Action不需要按照拓扑序执行
44473779f3dSLoGin fn run_without_topo_sort(&self) -> Result<(), SchedulerError> {
44573779f3dSLoGin // 启动守护线程
44673779f3dSLoGin let action = self.action.clone();
44770352fd6SLoGin let dragonos_dir = self.sysroot_dir.clone();
44873779f3dSLoGin let mut r = self.target.entities();
44973779f3dSLoGin let handler = std::thread::spawn(move || {
45073779f3dSLoGin Self::clean_daemon(action, dragonos_dir, &mut r);
45173779f3dSLoGin });
45273779f3dSLoGin
45373779f3dSLoGin handler.join().expect("Could not join deamon");
45473779f3dSLoGin return Ok(());
45573779f3dSLoGin }
45673779f3dSLoGin
45773779f3dSLoGin pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) {
45873779f3dSLoGin let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone())
45973779f3dSLoGin .map_err(|e| {
46073779f3dSLoGin error!(
46173779f3dSLoGin "Error while creating executor for task {} : {:?}",
run_with_topo_sort(&self) -> Result<(), SchedulerError>46273779f3dSLoGin entity.task().name_version(),
46373779f3dSLoGin e
46473779f3dSLoGin );
46573779f3dSLoGin exit(-1);
46673779f3dSLoGin })
46773779f3dSLoGin .unwrap();
46873779f3dSLoGin
46973779f3dSLoGin executor
47073779f3dSLoGin .execute()
47173779f3dSLoGin .map_err(|e| {
47273779f3dSLoGin error!(
47373779f3dSLoGin "Error while executing task {} : {:?}",
47473779f3dSLoGin entity.task().name_version(),
47573779f3dSLoGin e
47673779f3dSLoGin );
47773779f3dSLoGin exit(-1);
47873779f3dSLoGin })
47973779f3dSLoGin .unwrap();
48073779f3dSLoGin }
48173779f3dSLoGin
48273779f3dSLoGin /// 构建和安装DADK任务的守护线程
48373779f3dSLoGin ///
48473779f3dSLoGin /// ## 参数
48573779f3dSLoGin ///
48673779f3dSLoGin /// - `action` : 要执行的操作
48773779f3dSLoGin /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
48873779f3dSLoGin /// - `id2entity` : DADK任务id与实体映射表
run_without_topo_sort(&self) -> Result<(), SchedulerError>48973779f3dSLoGin /// - `count` : 当前剩余任务数
49073779f3dSLoGin /// - `r` : 总任务实体表
49173779f3dSLoGin ///
49273779f3dSLoGin /// ## 返回值
49373779f3dSLoGin ///
49473779f3dSLoGin /// 无
49573779f3dSLoGin pub fn build_install_daemon(
49673779f3dSLoGin action: Action,
49773779f3dSLoGin dragonos_dir: PathBuf,
49873779f3dSLoGin id2entity: BTreeMap<i32, Arc<SchedEntity>>,
49973779f3dSLoGin mut count: usize,
50073779f3dSLoGin r: &Vec<Arc<SchedEntity>>,
50173779f3dSLoGin ) {
50273779f3dSLoGin let mut guard = TASK_DEQUE.lock().unwrap();
50373779f3dSLoGin // 初始化0入度的任务实体
50473779f3dSLoGin let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new();
50573779f3dSLoGin for e in r.iter() {
50673779f3dSLoGin if e.indegree() == 0 {
50773779f3dSLoGin zero_entity.push(e.clone());
50873779f3dSLoGin }
50973779f3dSLoGin }
51073779f3dSLoGin
51173779f3dSLoGin while count > 0 {
51273779f3dSLoGin // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了
51373779f3dSLoGin while !zero_entity.is_empty()
51473779f3dSLoGin && guard.build_install_task(
51573779f3dSLoGin action.clone(),
51673779f3dSLoGin dragonos_dir.clone(),
51773779f3dSLoGin zero_entity.last().unwrap().clone(),
51873779f3dSLoGin )
51973779f3dSLoGin {
52073779f3dSLoGin zero_entity.pop();
52173779f3dSLoGin }
52273779f3dSLoGin
52373779f3dSLoGin let queue = guard.queue_mut();
52473779f3dSLoGin // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中
52573779f3dSLoGin queue.retain(|x| {
52673779f3dSLoGin if x.is_finished() {
52773779f3dSLoGin count -= 1;
52873779f3dSLoGin let tid = x.thread().id();
52973779f3dSLoGin let eid = *TID_EID.lock().unwrap().get(&tid).unwrap();
53073779f3dSLoGin let entity = id2entity.get(&eid).unwrap();
53173779f3dSLoGin let zero = entity.sub_children_indegree();
53273779f3dSLoGin for e in zero.iter() {
53373779f3dSLoGin zero_entity.push(e.clone());
53473779f3dSLoGin }
53573779f3dSLoGin return false;
53673779f3dSLoGin }
53773779f3dSLoGin return true;
53873779f3dSLoGin })
53973779f3dSLoGin }
54073779f3dSLoGin }
54173779f3dSLoGin
54273779f3dSLoGin /// 清理DADK任务的守护线程
54373779f3dSLoGin ///
54473779f3dSLoGin /// ## 参数
54573779f3dSLoGin ///
54673779f3dSLoGin /// - `action` : 要执行的操作
54773779f3dSLoGin /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
54873779f3dSLoGin /// - `r` : 总任务实体表
54973779f3dSLoGin ///
55073779f3dSLoGin /// ## 返回值
55173779f3dSLoGin ///
55273779f3dSLoGin /// 无
55373779f3dSLoGin pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) {
55473779f3dSLoGin let mut guard = TASK_DEQUE.lock().unwrap();
55573779f3dSLoGin while !guard.queue().is_empty() && !r.is_empty() {
55673779f3dSLoGin guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone());
55773779f3dSLoGin }
55873779f3dSLoGin }
55973779f3dSLoGin
56073779f3dSLoGin /// # 检查是否有不存在的依赖
56173779f3dSLoGin ///
56273779f3dSLoGin /// 如果某个任务的dependency中的任务不存在,则返回错误
56373779f3dSLoGin fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> {
56473779f3dSLoGin for entity in self.target.entities().iter() {
56573779f3dSLoGin for dependency in entity.task().depends.iter() {
56673779f3dSLoGin let name_version = (dependency.name.clone(), dependency.version.clone());
56773779f3dSLoGin if !self
56873779f3dSLoGin .target
56973779f3dSLoGin .get_by_name_version(&name_version.0, &name_version.1)
57073779f3dSLoGin .is_some()
57173779f3dSLoGin {
57273779f3dSLoGin return Err(SchedulerError::DependencyNotFound(
57373779f3dSLoGin entity.clone(),
57473779f3dSLoGin format!("name:{}, version:{}", name_version.0, name_version.1,),
57573779f3dSLoGin ));
57673779f3dSLoGin }
57773779f3dSLoGin }
57873779f3dSLoGin }
57973779f3dSLoGin
58073779f3dSLoGin return Ok(());
58173779f3dSLoGin }
58273779f3dSLoGin }
58373779f3dSLoGin
58473779f3dSLoGin /// # 环形依赖错误路径
58573779f3dSLoGin ///
58673779f3dSLoGin /// 本结构体用于在回溯过程中记录环形依赖的路径。
58773779f3dSLoGin ///
58873779f3dSLoGin /// 例如,假设有如下依赖关系:
58973779f3dSLoGin ///
59073779f3dSLoGin /// ```text
59173779f3dSLoGin /// A -> B -> C -> D -> A
59273779f3dSLoGin /// ```
59373779f3dSLoGin ///
59473779f3dSLoGin /// 则在DFS回溯过程中,会依次记录如下路径:
59573779f3dSLoGin ///
59673779f3dSLoGin /// ```text
59773779f3dSLoGin /// D -> A
clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>)59873779f3dSLoGin /// C -> D
59973779f3dSLoGin /// B -> C
60073779f3dSLoGin /// A -> B
60173779f3dSLoGin pub struct DependencyCycleError {
60273779f3dSLoGin /// # 起始实体
60373779f3dSLoGin /// 本错误的起始实体,即环形依赖的起点
60473779f3dSLoGin head_entity: Arc<SchedEntity>,
60573779f3dSLoGin /// 是否停止传播
60673779f3dSLoGin stop_propagation: bool,
60773779f3dSLoGin /// 依赖关系
60873779f3dSLoGin dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>,
60973779f3dSLoGin }
61073779f3dSLoGin
61173779f3dSLoGin impl DependencyCycleError {
61273779f3dSLoGin pub fn new(head_entity: Arc<SchedEntity>) -> Self {
61373779f3dSLoGin Self {
61473779f3dSLoGin head_entity,
61573779f3dSLoGin stop_propagation: false,
61673779f3dSLoGin dependencies: Vec::new(),
61773779f3dSLoGin }
61873779f3dSLoGin }
61973779f3dSLoGin
62073779f3dSLoGin pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) {
62173779f3dSLoGin self.dependencies.push((current, dependency));
62273779f3dSLoGin }
62373779f3dSLoGin
62473779f3dSLoGin pub fn stop_propagation(&mut self) {
62573779f3dSLoGin self.stop_propagation = true;
62673779f3dSLoGin }
62773779f3dSLoGin
62873779f3dSLoGin #[allow(dead_code)]
62973779f3dSLoGin pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> {
63073779f3dSLoGin &self.dependencies
63173779f3dSLoGin }
63273779f3dSLoGin
63373779f3dSLoGin pub fn display(&self) -> String {
63473779f3dSLoGin let mut tmp = self.dependencies.clone();
63573779f3dSLoGin tmp.reverse();
63673779f3dSLoGin
63773779f3dSLoGin let mut ret = format!("Dependency cycle detected: \nStart ->\n");
63873779f3dSLoGin for (current, dep) in tmp.iter() {
63973779f3dSLoGin ret.push_str(&format!(
64073779f3dSLoGin "->\t{} ({})\t--depends-->\t{} ({})\n",
64173779f3dSLoGin current.task().name_version(),
64273779f3dSLoGin current.file_path().display(),
64373779f3dSLoGin dep.task().name_version(),
64473779f3dSLoGin dep.file_path().display()
64573779f3dSLoGin ));
64673779f3dSLoGin }
64773779f3dSLoGin ret.push_str("-> End");
64873779f3dSLoGin return ret;
64973779f3dSLoGin }
65073779f3dSLoGin }
651