xref: /DADK/dadk-user/src/scheduler/mod.rs (revision 70352fd6b1ba6ff2dca344d5c30e0e9b76b5e6b4)
173779f3dSLoGin use std::{
273779f3dSLoGin     collections::{BTreeMap, HashMap},
373779f3dSLoGin     fmt::Debug,
473779f3dSLoGin     path::PathBuf,
573779f3dSLoGin     process::exit,
673779f3dSLoGin     sync::{
773779f3dSLoGin         atomic::{AtomicI32, Ordering},
873779f3dSLoGin         Arc, Mutex, RwLock,
973779f3dSLoGin     },
1073779f3dSLoGin     thread::ThreadId,
1173779f3dSLoGin };
1273779f3dSLoGin 
1373779f3dSLoGin use log::{error, info};
1473779f3dSLoGin 
1573779f3dSLoGin use crate::{
1673779f3dSLoGin     console::Action,
1773779f3dSLoGin     context::DadkUserExecuteContext,
1873779f3dSLoGin     executor::{target::Target, Executor},
1973779f3dSLoGin     parser::task::DADKTask,
2073779f3dSLoGin };
2173779f3dSLoGin 
2273779f3dSLoGin use self::task_deque::TASK_DEQUE;
2373779f3dSLoGin 
2473779f3dSLoGin pub mod task_deque;
2573779f3dSLoGin #[cfg(test)]
2673779f3dSLoGin mod tests;
2773779f3dSLoGin 
2873779f3dSLoGin lazy_static! {
2973779f3dSLoGin     // 线程id与任务实体id映射表
3073779f3dSLoGin     pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new());
3173779f3dSLoGin }
3273779f3dSLoGin 
3373779f3dSLoGin /// # 调度实体内部结构
3473779f3dSLoGin #[derive(Debug, Clone)]
3573779f3dSLoGin pub struct InnerEntity {
3673779f3dSLoGin     /// 任务ID
3773779f3dSLoGin     id: i32,
3873779f3dSLoGin     file_path: PathBuf,
3973779f3dSLoGin     /// 任务
4073779f3dSLoGin     task: DADKTask,
4173779f3dSLoGin     /// 入度
4273779f3dSLoGin     indegree: usize,
4373779f3dSLoGin     /// 子节点
4473779f3dSLoGin     children: Vec<Arc<SchedEntity>>,
4573779f3dSLoGin     /// target管理
4673779f3dSLoGin     target: Option<Target>,
4773779f3dSLoGin }
4873779f3dSLoGin 
4973779f3dSLoGin /// # 调度实体
5073779f3dSLoGin #[derive(Debug)]
5173779f3dSLoGin pub struct SchedEntity {
5273779f3dSLoGin     inner: Mutex<InnerEntity>,
5373779f3dSLoGin }
5473779f3dSLoGin 
5573779f3dSLoGin impl PartialEq for SchedEntity {
5673779f3dSLoGin     fn eq(&self, other: &Self) -> bool {
5773779f3dSLoGin         self.inner.lock().unwrap().id == other.inner.lock().unwrap().id
5873779f3dSLoGin     }
5973779f3dSLoGin }
6073779f3dSLoGin 
6173779f3dSLoGin impl SchedEntity {
6273779f3dSLoGin     #[allow(dead_code)]
6373779f3dSLoGin     pub fn id(&self) -> i32 {
6473779f3dSLoGin         self.inner.lock().unwrap().id
6573779f3dSLoGin     }
6673779f3dSLoGin 
6773779f3dSLoGin     #[allow(dead_code)]
6873779f3dSLoGin     pub fn file_path(&self) -> PathBuf {
6973779f3dSLoGin         self.inner.lock().unwrap().file_path.clone()
7073779f3dSLoGin     }
7173779f3dSLoGin 
7273779f3dSLoGin     #[allow(dead_code)]
7373779f3dSLoGin     pub fn task(&self) -> DADKTask {
7473779f3dSLoGin         self.inner.lock().unwrap().task.clone()
7573779f3dSLoGin     }
7673779f3dSLoGin 
7773779f3dSLoGin     /// 入度加1
7873779f3dSLoGin     pub fn add_indegree(&self) {
7973779f3dSLoGin         self.inner.lock().unwrap().indegree += 1;
8073779f3dSLoGin     }
8173779f3dSLoGin 
8273779f3dSLoGin     /// 入度减1
8373779f3dSLoGin     pub fn sub_indegree(&self) -> usize {
8473779f3dSLoGin         self.inner.lock().unwrap().indegree -= 1;
8573779f3dSLoGin         return self.inner.lock().unwrap().indegree;
8673779f3dSLoGin     }
8773779f3dSLoGin 
8873779f3dSLoGin     /// 增加子节点
8973779f3dSLoGin     pub fn add_child(&self, entity: Arc<SchedEntity>) {
9073779f3dSLoGin         self.inner.lock().unwrap().children.push(entity);
9173779f3dSLoGin     }
9273779f3dSLoGin 
9373779f3dSLoGin     /// 获取入度
9473779f3dSLoGin     pub fn indegree(&self) -> usize {
9573779f3dSLoGin         self.inner.lock().unwrap().indegree
9673779f3dSLoGin     }
9773779f3dSLoGin 
9873779f3dSLoGin     /// 获取target
9973779f3dSLoGin     pub fn target(&self) -> Option<Target> {
10073779f3dSLoGin         self.inner.lock().unwrap().target.clone()
10173779f3dSLoGin     }
10273779f3dSLoGin 
10373779f3dSLoGin     /// 当前任务完成后,所有子节点入度减1
10473779f3dSLoGin     ///
10573779f3dSLoGin     /// ## 参数
10673779f3dSLoGin     ///
10773779f3dSLoGin     /// 无
10873779f3dSLoGin     ///
10973779f3dSLoGin     /// ## 返回值
11073779f3dSLoGin     ///
11173779f3dSLoGin     /// 所有入度为0的子节点集合
11273779f3dSLoGin     pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> {
11373779f3dSLoGin         let mut zero_child = Vec::new();
11473779f3dSLoGin         let children = &self.inner.lock().unwrap().children;
11573779f3dSLoGin         for child in children.iter() {
11673779f3dSLoGin             if child.sub_indegree() == 0 {
11773779f3dSLoGin                 zero_child.push(child.clone());
11873779f3dSLoGin             }
11973779f3dSLoGin         }
12073779f3dSLoGin         return zero_child;
12173779f3dSLoGin     }
12273779f3dSLoGin }
12373779f3dSLoGin 
12473779f3dSLoGin /// # 调度实体列表
12573779f3dSLoGin ///
12673779f3dSLoGin /// 用于存储所有的调度实体
12773779f3dSLoGin #[derive(Debug)]
12873779f3dSLoGin pub struct SchedEntities {
12973779f3dSLoGin     /// 任务ID到调度实体的映射
13073779f3dSLoGin     id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>,
13173779f3dSLoGin }
13273779f3dSLoGin 
13373779f3dSLoGin impl SchedEntities {
13473779f3dSLoGin     pub fn new() -> Self {
13573779f3dSLoGin         Self {
13673779f3dSLoGin             id2entity: RwLock::new(BTreeMap::new()),
13773779f3dSLoGin         }
13873779f3dSLoGin     }
13973779f3dSLoGin 
14073779f3dSLoGin     pub fn add(&mut self, entity: Arc<SchedEntity>) {
14173779f3dSLoGin         self.id2entity
14273779f3dSLoGin             .write()
14373779f3dSLoGin             .unwrap()
14473779f3dSLoGin             .insert(entity.id(), entity.clone());
14573779f3dSLoGin     }
14673779f3dSLoGin 
14773779f3dSLoGin     #[allow(dead_code)]
14873779f3dSLoGin     pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> {
14973779f3dSLoGin         self.id2entity.read().unwrap().get(&id).cloned()
15073779f3dSLoGin     }
15173779f3dSLoGin 
15273779f3dSLoGin     pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> {
15373779f3dSLoGin         for e in self.id2entity.read().unwrap().iter() {
15473779f3dSLoGin             if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) {
15573779f3dSLoGin                 return Some(e.1.clone());
15673779f3dSLoGin             }
15773779f3dSLoGin         }
15873779f3dSLoGin         return None;
15973779f3dSLoGin     }
16073779f3dSLoGin 
16173779f3dSLoGin     pub fn entities(&self) -> Vec<Arc<SchedEntity>> {
16273779f3dSLoGin         let mut v = Vec::new();
16373779f3dSLoGin         for e in self.id2entity.read().unwrap().iter() {
16473779f3dSLoGin             v.push(e.1.clone());
16573779f3dSLoGin         }
16673779f3dSLoGin         return v;
16773779f3dSLoGin     }
16873779f3dSLoGin 
16973779f3dSLoGin     pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> {
17073779f3dSLoGin         self.id2entity.read().unwrap().clone()
17173779f3dSLoGin     }
17273779f3dSLoGin 
17373779f3dSLoGin     #[allow(dead_code)]
17473779f3dSLoGin     pub fn len(&self) -> usize {
17573779f3dSLoGin         self.id2entity.read().unwrap().len()
17673779f3dSLoGin     }
17773779f3dSLoGin 
17873779f3dSLoGin     #[allow(dead_code)]
17973779f3dSLoGin     pub fn is_empty(&self) -> bool {
18073779f3dSLoGin         self.id2entity.read().unwrap().is_empty()
18173779f3dSLoGin     }
18273779f3dSLoGin 
18373779f3dSLoGin     #[allow(dead_code)]
18473779f3dSLoGin     pub fn clear(&mut self) {
18573779f3dSLoGin         self.id2entity.write().unwrap().clear();
18673779f3dSLoGin     }
18773779f3dSLoGin 
18873779f3dSLoGin     pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> {
18973779f3dSLoGin         let mut result = Vec::new();
19073779f3dSLoGin         let mut visited = BTreeMap::new();
19173779f3dSLoGin         let btree = self.id2entity.write().unwrap().clone();
19273779f3dSLoGin         for entity in btree.iter() {
19373779f3dSLoGin             if !visited.contains_key(entity.0) {
19473779f3dSLoGin                 let r = self.dfs(entity.1, &mut visited, &mut result);
19573779f3dSLoGin                 if r.is_err() {
19673779f3dSLoGin                     let err = r.unwrap_err();
19773779f3dSLoGin                     error!("{}", err.display());
19873779f3dSLoGin                     println!("Please fix the errors above and try again.");
19973779f3dSLoGin                     std::process::exit(1);
20073779f3dSLoGin                 }
20173779f3dSLoGin             }
20273779f3dSLoGin         }
20373779f3dSLoGin         return result;
20473779f3dSLoGin     }
20573779f3dSLoGin 
20673779f3dSLoGin     fn dfs(
20773779f3dSLoGin         &self,
20873779f3dSLoGin         entity: &Arc<SchedEntity>,
20973779f3dSLoGin         visited: &mut BTreeMap<i32, bool>,
21073779f3dSLoGin         result: &mut Vec<Arc<SchedEntity>>,
21173779f3dSLoGin     ) -> Result<(), DependencyCycleError> {
21273779f3dSLoGin         visited.insert(entity.id(), false);
21373779f3dSLoGin         for dep in entity.task().depends.iter() {
21473779f3dSLoGin             if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) {
21573779f3dSLoGin                 let guard = self.id2entity.write().unwrap();
21673779f3dSLoGin                 let e = guard.get(&entity.id()).unwrap();
21773779f3dSLoGin                 let d = guard.get(&dep_entity.id()).unwrap();
21873779f3dSLoGin                 e.add_indegree();
21973779f3dSLoGin                 d.add_child(e.clone());
22073779f3dSLoGin                 if let Some(&false) = visited.get(&dep_entity.id()) {
22173779f3dSLoGin                     // 输出完整环形依赖
22273779f3dSLoGin                     let mut err = DependencyCycleError::new(dep_entity.clone());
22373779f3dSLoGin 
22473779f3dSLoGin                     err.add(entity.clone(), dep_entity);
22573779f3dSLoGin                     return Err(err);
22673779f3dSLoGin                 }
22773779f3dSLoGin                 if !visited.contains_key(&dep_entity.id()) {
22873779f3dSLoGin                     drop(guard);
22973779f3dSLoGin                     let r = self.dfs(&dep_entity, visited, result);
23073779f3dSLoGin                     if r.is_err() {
23173779f3dSLoGin                         let mut err: DependencyCycleError = r.unwrap_err();
23273779f3dSLoGin                         // 如果错误已经停止传播,则直接返回
23373779f3dSLoGin                         if err.stop_propagation {
23473779f3dSLoGin                             return Err(err);
23573779f3dSLoGin                         }
23673779f3dSLoGin                         // 如果当前实体是错误的起始实体,则停止传播
23773779f3dSLoGin                         if entity == &err.head_entity {
23873779f3dSLoGin                             err.stop_propagation();
23973779f3dSLoGin                         }
24073779f3dSLoGin                         err.add(entity.clone(), dep_entity);
24173779f3dSLoGin                         return Err(err);
24273779f3dSLoGin                     }
24373779f3dSLoGin                 }
24473779f3dSLoGin             } else {
24573779f3dSLoGin                 error!(
24673779f3dSLoGin                     "Dependency not found: {} -> {}",
24773779f3dSLoGin                     entity.task().name_version(),
24873779f3dSLoGin                     dep.name_version()
24973779f3dSLoGin                 );
25073779f3dSLoGin                 std::process::exit(1);
25173779f3dSLoGin             }
25273779f3dSLoGin         }
25373779f3dSLoGin         visited.insert(entity.id(), true);
25473779f3dSLoGin         result.push(entity.clone());
25573779f3dSLoGin         return Ok(());
25673779f3dSLoGin     }
25773779f3dSLoGin }
25873779f3dSLoGin 
25973779f3dSLoGin /// # 任务调度器
26073779f3dSLoGin #[derive(Debug)]
26173779f3dSLoGin pub struct Scheduler {
26273779f3dSLoGin     /// DragonOS sysroot在主机上的路径
263*70352fd6SLoGin     sysroot_dir: PathBuf,
26473779f3dSLoGin     /// 要执行的操作
26573779f3dSLoGin     action: Action,
26673779f3dSLoGin     /// 调度实体列表
26773779f3dSLoGin     target: SchedEntities,
26873779f3dSLoGin     /// dadk执行的上下文
26973779f3dSLoGin     context: Arc<DadkUserExecuteContext>,
27073779f3dSLoGin }
27173779f3dSLoGin 
27273779f3dSLoGin pub enum SchedulerError {
27373779f3dSLoGin     TaskError(String),
27473779f3dSLoGin     /// 不是当前正在编译的目标架构
27573779f3dSLoGin     InvalidTargetArch(String),
27673779f3dSLoGin     DependencyNotFound(Arc<SchedEntity>, String),
27773779f3dSLoGin     RunError(String),
27873779f3dSLoGin }
27973779f3dSLoGin 
28073779f3dSLoGin impl Debug for SchedulerError {
28173779f3dSLoGin     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28273779f3dSLoGin         match self {
28373779f3dSLoGin             Self::TaskError(arg0) => {
28473779f3dSLoGin                 write!(f, "TaskError: {}", arg0)
28573779f3dSLoGin             }
28673779f3dSLoGin             SchedulerError::DependencyNotFound(current, msg) => {
28773779f3dSLoGin                 write!(
28873779f3dSLoGin                     f,
28973779f3dSLoGin                     "For task {}, dependency not found: {}. Please check file: {}",
29073779f3dSLoGin                     current.task().name_version(),
29173779f3dSLoGin                     msg,
29273779f3dSLoGin                     current.file_path().display()
29373779f3dSLoGin                 )
29473779f3dSLoGin             }
29573779f3dSLoGin             SchedulerError::RunError(msg) => {
29673779f3dSLoGin                 write!(f, "RunError: {}", msg)
29773779f3dSLoGin             }
29873779f3dSLoGin             SchedulerError::InvalidTargetArch(msg) => {
29973779f3dSLoGin                 write!(f, "InvalidTargetArch: {}", msg)
30073779f3dSLoGin             }
30173779f3dSLoGin         }
30273779f3dSLoGin     }
30373779f3dSLoGin }
30473779f3dSLoGin 
30573779f3dSLoGin impl Scheduler {
30673779f3dSLoGin     pub fn new(
30773779f3dSLoGin         context: Arc<DadkUserExecuteContext>,
30873779f3dSLoGin         dragonos_dir: PathBuf,
30973779f3dSLoGin         action: Action,
31073779f3dSLoGin         tasks: Vec<(PathBuf, DADKTask)>,
31173779f3dSLoGin     ) -> Result<Self, SchedulerError> {
31273779f3dSLoGin         let entities = SchedEntities::new();
31373779f3dSLoGin 
31473779f3dSLoGin         let mut scheduler = Scheduler {
315*70352fd6SLoGin             sysroot_dir: dragonos_dir,
31673779f3dSLoGin             action,
31773779f3dSLoGin             target: entities,
31873779f3dSLoGin             context,
31973779f3dSLoGin         };
32073779f3dSLoGin 
32173779f3dSLoGin         let r = scheduler.add_tasks(tasks);
32273779f3dSLoGin         if r.is_err() {
32373779f3dSLoGin             error!("Error while adding tasks: {:?}", r);
32473779f3dSLoGin             return Err(r.err().unwrap());
32573779f3dSLoGin         }
32673779f3dSLoGin 
32773779f3dSLoGin         return Ok(scheduler);
32873779f3dSLoGin     }
32973779f3dSLoGin 
33073779f3dSLoGin     /// # 添加多个任务
33173779f3dSLoGin     ///
33273779f3dSLoGin     /// 添加任务到调度器中,如果任务已经存在,则返回错误
33373779f3dSLoGin     pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> {
33473779f3dSLoGin         for task in tasks {
33573779f3dSLoGin             let e = self.add_task(task.0, task.1);
33673779f3dSLoGin             if e.is_err() {
33773779f3dSLoGin                 if let Err(SchedulerError::InvalidTargetArch(_)) = &e {
33873779f3dSLoGin                     continue;
33973779f3dSLoGin                 }
34073779f3dSLoGin                 e?;
34173779f3dSLoGin             }
34273779f3dSLoGin         }
34373779f3dSLoGin 
34473779f3dSLoGin         return Ok(());
34573779f3dSLoGin     }
34673779f3dSLoGin 
34773779f3dSLoGin     /// # 任务是否匹配当前目标架构
34873779f3dSLoGin     pub fn task_arch_matched(&self, task: &DADKTask) -> bool {
34973779f3dSLoGin         task.target_arch.contains(self.context.target_arch())
35073779f3dSLoGin     }
35173779f3dSLoGin 
35273779f3dSLoGin     /// # 添加一个任务
35373779f3dSLoGin     ///
35473779f3dSLoGin     /// 添加任务到调度器中,如果任务已经存在,则返回错误
35573779f3dSLoGin     pub fn add_task(
35673779f3dSLoGin         &mut self,
35773779f3dSLoGin         path: PathBuf,
35873779f3dSLoGin         task: DADKTask,
35973779f3dSLoGin     ) -> Result<Arc<SchedEntity>, SchedulerError> {
36073779f3dSLoGin         if !self.task_arch_matched(&task) {
36173779f3dSLoGin             return Err(SchedulerError::InvalidTargetArch(format!(
36273779f3dSLoGin                 "Task {} is not for target arch: {:?}",
36373779f3dSLoGin                 task.name_version(),
36473779f3dSLoGin                 self.context.target_arch()
36573779f3dSLoGin             )));
36673779f3dSLoGin         }
36773779f3dSLoGin 
36873779f3dSLoGin         let id: i32 = self.generate_task_id();
36973779f3dSLoGin         let indegree: usize = 0;
37073779f3dSLoGin         let children = Vec::new();
37173779f3dSLoGin         let target = self.generate_task_target(&path, &task.rust_target)?;
37273779f3dSLoGin         let entity = Arc::new(SchedEntity {
37373779f3dSLoGin             inner: Mutex::new(InnerEntity {
37473779f3dSLoGin                 id,
37573779f3dSLoGin                 task,
37673779f3dSLoGin                 file_path: path.clone(),
37773779f3dSLoGin                 indegree,
37873779f3dSLoGin                 children,
37973779f3dSLoGin                 target,
38073779f3dSLoGin             }),
38173779f3dSLoGin         });
38273779f3dSLoGin         let name_version = (entity.task().name.clone(), entity.task().version.clone());
38373779f3dSLoGin 
38473779f3dSLoGin         if self
38573779f3dSLoGin             .target
38673779f3dSLoGin             .get_by_name_version(&name_version.0, &name_version.1)
38773779f3dSLoGin             .is_some()
38873779f3dSLoGin         {
38973779f3dSLoGin             return Err(SchedulerError::TaskError(format!(
39073779f3dSLoGin                 "Task with name [{}] and version [{}] already exists. Config file: {}",
39173779f3dSLoGin                 name_version.0,
39273779f3dSLoGin                 name_version.1,
39373779f3dSLoGin                 path.display()
39473779f3dSLoGin             )));
39573779f3dSLoGin         }
39673779f3dSLoGin 
39773779f3dSLoGin         self.target.add(entity.clone());
39873779f3dSLoGin 
39973779f3dSLoGin         info!("Task added: {}", entity.task().name_version());
40073779f3dSLoGin         return Ok(entity);
40173779f3dSLoGin     }
40273779f3dSLoGin 
40373779f3dSLoGin     fn generate_task_id(&self) -> i32 {
40473779f3dSLoGin         static TASK_ID: AtomicI32 = AtomicI32::new(0);
40573779f3dSLoGin         return TASK_ID.fetch_add(1, Ordering::SeqCst);
40673779f3dSLoGin     }
40773779f3dSLoGin 
40873779f3dSLoGin     fn generate_task_target(
40973779f3dSLoGin         &self,
41073779f3dSLoGin         path: &PathBuf,
41173779f3dSLoGin         rust_target: &Option<String>,
41273779f3dSLoGin     ) -> Result<Option<Target>, SchedulerError> {
41373779f3dSLoGin         if let Some(rust_target) = rust_target {
41473779f3dSLoGin             // 如果rust_target字段不为none,说明需要target管理
41573779f3dSLoGin             // 获取dadk任务路径,用于生成临时dadk文件名
41673779f3dSLoGin             let file_str = path.as_path().to_str().unwrap();
41773779f3dSLoGin             let tmp_dadk_path = Target::tmp_dadk(file_str);
41873779f3dSLoGin             let tmp_dadk_str = tmp_dadk_path.as_path().to_str().unwrap();
41973779f3dSLoGin 
42073779f3dSLoGin             if Target::is_user_target(rust_target) {
42173779f3dSLoGin                 // 如果target文件是用户自己的
42273779f3dSLoGin                 if let Ok(target_path) = Target::user_target_path(rust_target) {
42373779f3dSLoGin                     let target_path_str = target_path.as_path().to_str().unwrap();
42473779f3dSLoGin                     let index = target_path_str.rfind('/').unwrap();
42573779f3dSLoGin                     let target_name = target_path_str[index + 1..].to_string();
42673779f3dSLoGin                     let tmp_target = PathBuf::from(format!("{}{}", tmp_dadk_str, target_name));
42773779f3dSLoGin                     return Ok(Some(Target::new(tmp_target)));
42873779f3dSLoGin                 } else {
42973779f3dSLoGin                     return Err(SchedulerError::TaskError(
43073779f3dSLoGin                         "The path of target file is invalid.".to_string(),
43173779f3dSLoGin                     ));
43273779f3dSLoGin                 }
43373779f3dSLoGin             } else {
43473779f3dSLoGin                 // 如果target文件是内置的
43573779f3dSLoGin                 let tmp_target = PathBuf::from(format!("{}{}.json", tmp_dadk_str, rust_target));
43673779f3dSLoGin                 return Ok(Some(Target::new(tmp_target)));
43773779f3dSLoGin             }
43873779f3dSLoGin         }
43973779f3dSLoGin         return Ok(None);
44073779f3dSLoGin     }
44173779f3dSLoGin 
44273779f3dSLoGin     /// # 执行调度器中的所有任务
44373779f3dSLoGin     pub fn run(&self) -> Result<(), SchedulerError> {
44473779f3dSLoGin         // 准备全局环境变量
44573779f3dSLoGin         crate::executor::prepare_env(&self.target, &self.context)
44673779f3dSLoGin             .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?;
44773779f3dSLoGin 
44873779f3dSLoGin         match self.action {
44973779f3dSLoGin             Action::Build | Action::Install => {
45073779f3dSLoGin                 self.run_with_topo_sort()?;
45173779f3dSLoGin             }
45273779f3dSLoGin             Action::Clean(_) => self.run_without_topo_sort()?,
45373779f3dSLoGin             _ => unimplemented!(),
45473779f3dSLoGin         }
45573779f3dSLoGin 
45673779f3dSLoGin         return Ok(());
45773779f3dSLoGin     }
45873779f3dSLoGin 
45973779f3dSLoGin     /// Action需要按照拓扑序执行
46073779f3dSLoGin     ///
46173779f3dSLoGin     /// Action::Build | Action::Install
46273779f3dSLoGin     fn run_with_topo_sort(&self) -> Result<(), SchedulerError> {
46373779f3dSLoGin         // 检查是否有不存在的依赖
46473779f3dSLoGin         let r = self.check_not_exists_dependency();
46573779f3dSLoGin         if r.is_err() {
46673779f3dSLoGin             error!("Error while checking tasks: {:?}", r);
46773779f3dSLoGin             return r;
46873779f3dSLoGin         }
46973779f3dSLoGin 
47073779f3dSLoGin         // 对调度实体进行拓扑排序
47173779f3dSLoGin         let r: Vec<Arc<SchedEntity>> = self.target.topo_sort();
47273779f3dSLoGin 
47373779f3dSLoGin         let action = self.action.clone();
474*70352fd6SLoGin         let dragonos_dir = self.sysroot_dir.clone();
47573779f3dSLoGin         let id2entity = self.target.id2entity();
47673779f3dSLoGin         let count = r.len();
47773779f3dSLoGin 
47873779f3dSLoGin         // 启动守护线程
47973779f3dSLoGin         let handler = std::thread::spawn(move || {
48073779f3dSLoGin             Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r)
48173779f3dSLoGin         });
48273779f3dSLoGin 
48373779f3dSLoGin         handler.join().expect("Could not join deamon");
48473779f3dSLoGin 
48573779f3dSLoGin         return Ok(());
48673779f3dSLoGin     }
48773779f3dSLoGin 
48873779f3dSLoGin     /// Action不需要按照拓扑序执行
48973779f3dSLoGin     fn run_without_topo_sort(&self) -> Result<(), SchedulerError> {
49073779f3dSLoGin         // 启动守护线程
49173779f3dSLoGin         let action = self.action.clone();
492*70352fd6SLoGin         let dragonos_dir = self.sysroot_dir.clone();
49373779f3dSLoGin         let mut r = self.target.entities();
49473779f3dSLoGin         let handler = std::thread::spawn(move || {
49573779f3dSLoGin             Self::clean_daemon(action, dragonos_dir, &mut r);
49673779f3dSLoGin         });
49773779f3dSLoGin 
49873779f3dSLoGin         handler.join().expect("Could not join deamon");
49973779f3dSLoGin         return Ok(());
50073779f3dSLoGin     }
50173779f3dSLoGin 
50273779f3dSLoGin     pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) {
50373779f3dSLoGin         let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone())
50473779f3dSLoGin             .map_err(|e| {
50573779f3dSLoGin                 error!(
50673779f3dSLoGin                     "Error while creating executor for task {} : {:?}",
50773779f3dSLoGin                     entity.task().name_version(),
50873779f3dSLoGin                     e
50973779f3dSLoGin                 );
51073779f3dSLoGin                 exit(-1);
51173779f3dSLoGin             })
51273779f3dSLoGin             .unwrap();
51373779f3dSLoGin 
51473779f3dSLoGin         executor
51573779f3dSLoGin             .execute()
51673779f3dSLoGin             .map_err(|e| {
51773779f3dSLoGin                 error!(
51873779f3dSLoGin                     "Error while executing task {} : {:?}",
51973779f3dSLoGin                     entity.task().name_version(),
52073779f3dSLoGin                     e
52173779f3dSLoGin                 );
52273779f3dSLoGin                 exit(-1);
52373779f3dSLoGin             })
52473779f3dSLoGin             .unwrap();
52573779f3dSLoGin     }
52673779f3dSLoGin 
52773779f3dSLoGin     /// 构建和安装DADK任务的守护线程
52873779f3dSLoGin     ///
52973779f3dSLoGin     /// ## 参数
53073779f3dSLoGin     ///
53173779f3dSLoGin     /// - `action` : 要执行的操作
53273779f3dSLoGin     /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
53373779f3dSLoGin     /// - `id2entity` : DADK任务id与实体映射表
53473779f3dSLoGin     /// - `count` : 当前剩余任务数
53573779f3dSLoGin     /// - `r` : 总任务实体表
53673779f3dSLoGin     ///
53773779f3dSLoGin     /// ## 返回值
53873779f3dSLoGin     ///
53973779f3dSLoGin     /// 无
54073779f3dSLoGin     pub fn build_install_daemon(
54173779f3dSLoGin         action: Action,
54273779f3dSLoGin         dragonos_dir: PathBuf,
54373779f3dSLoGin         id2entity: BTreeMap<i32, Arc<SchedEntity>>,
54473779f3dSLoGin         mut count: usize,
54573779f3dSLoGin         r: &Vec<Arc<SchedEntity>>,
54673779f3dSLoGin     ) {
54773779f3dSLoGin         let mut guard = TASK_DEQUE.lock().unwrap();
54873779f3dSLoGin         // 初始化0入度的任务实体
54973779f3dSLoGin         let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new();
55073779f3dSLoGin         for e in r.iter() {
55173779f3dSLoGin             if e.indegree() == 0 {
55273779f3dSLoGin                 zero_entity.push(e.clone());
55373779f3dSLoGin             }
55473779f3dSLoGin         }
55573779f3dSLoGin 
55673779f3dSLoGin         while count > 0 {
55773779f3dSLoGin             // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了
55873779f3dSLoGin             while !zero_entity.is_empty()
55973779f3dSLoGin                 && guard.build_install_task(
56073779f3dSLoGin                     action.clone(),
56173779f3dSLoGin                     dragonos_dir.clone(),
56273779f3dSLoGin                     zero_entity.last().unwrap().clone(),
56373779f3dSLoGin                 )
56473779f3dSLoGin             {
56573779f3dSLoGin                 zero_entity.pop();
56673779f3dSLoGin             }
56773779f3dSLoGin 
56873779f3dSLoGin             let queue = guard.queue_mut();
56973779f3dSLoGin             // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中
57073779f3dSLoGin             queue.retain(|x| {
57173779f3dSLoGin                 if x.is_finished() {
57273779f3dSLoGin                     count -= 1;
57373779f3dSLoGin                     let tid = x.thread().id();
57473779f3dSLoGin                     let eid = *TID_EID.lock().unwrap().get(&tid).unwrap();
57573779f3dSLoGin                     let entity = id2entity.get(&eid).unwrap();
57673779f3dSLoGin                     let zero = entity.sub_children_indegree();
57773779f3dSLoGin                     for e in zero.iter() {
57873779f3dSLoGin                         zero_entity.push(e.clone());
57973779f3dSLoGin                     }
58073779f3dSLoGin                     return false;
58173779f3dSLoGin                 }
58273779f3dSLoGin                 return true;
58373779f3dSLoGin             })
58473779f3dSLoGin         }
58573779f3dSLoGin     }
58673779f3dSLoGin 
58773779f3dSLoGin     /// 清理DADK任务的守护线程
58873779f3dSLoGin     ///
58973779f3dSLoGin     /// ## 参数
59073779f3dSLoGin     ///
59173779f3dSLoGin     /// - `action` : 要执行的操作
59273779f3dSLoGin     /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
59373779f3dSLoGin     /// - `r` : 总任务实体表
59473779f3dSLoGin     ///
59573779f3dSLoGin     /// ## 返回值
59673779f3dSLoGin     ///
59773779f3dSLoGin     /// 无
59873779f3dSLoGin     pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) {
59973779f3dSLoGin         let mut guard = TASK_DEQUE.lock().unwrap();
60073779f3dSLoGin         while !guard.queue().is_empty() && !r.is_empty() {
60173779f3dSLoGin             guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone());
60273779f3dSLoGin         }
60373779f3dSLoGin     }
60473779f3dSLoGin 
60573779f3dSLoGin     /// # 检查是否有不存在的依赖
60673779f3dSLoGin     ///
60773779f3dSLoGin     /// 如果某个任务的dependency中的任务不存在,则返回错误
60873779f3dSLoGin     fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> {
60973779f3dSLoGin         for entity in self.target.entities().iter() {
61073779f3dSLoGin             for dependency in entity.task().depends.iter() {
61173779f3dSLoGin                 let name_version = (dependency.name.clone(), dependency.version.clone());
61273779f3dSLoGin                 if !self
61373779f3dSLoGin                     .target
61473779f3dSLoGin                     .get_by_name_version(&name_version.0, &name_version.1)
61573779f3dSLoGin                     .is_some()
61673779f3dSLoGin                 {
61773779f3dSLoGin                     return Err(SchedulerError::DependencyNotFound(
61873779f3dSLoGin                         entity.clone(),
61973779f3dSLoGin                         format!("name:{}, version:{}", name_version.0, name_version.1,),
62073779f3dSLoGin                     ));
62173779f3dSLoGin                 }
62273779f3dSLoGin             }
62373779f3dSLoGin         }
62473779f3dSLoGin 
62573779f3dSLoGin         return Ok(());
62673779f3dSLoGin     }
62773779f3dSLoGin }
62873779f3dSLoGin 
62973779f3dSLoGin /// # 环形依赖错误路径
63073779f3dSLoGin ///
63173779f3dSLoGin /// 本结构体用于在回溯过程中记录环形依赖的路径。
63273779f3dSLoGin ///
63373779f3dSLoGin /// 例如,假设有如下依赖关系:
63473779f3dSLoGin ///
63573779f3dSLoGin /// ```text
63673779f3dSLoGin /// A -> B -> C -> D -> A
63773779f3dSLoGin /// ```
63873779f3dSLoGin ///
63973779f3dSLoGin /// 则在DFS回溯过程中,会依次记录如下路径:
64073779f3dSLoGin ///
64173779f3dSLoGin /// ```text
64273779f3dSLoGin /// D -> A
64373779f3dSLoGin /// C -> D
64473779f3dSLoGin /// B -> C
64573779f3dSLoGin /// A -> B
64673779f3dSLoGin pub struct DependencyCycleError {
64773779f3dSLoGin     /// # 起始实体
64873779f3dSLoGin     /// 本错误的起始实体,即环形依赖的起点
64973779f3dSLoGin     head_entity: Arc<SchedEntity>,
65073779f3dSLoGin     /// 是否停止传播
65173779f3dSLoGin     stop_propagation: bool,
65273779f3dSLoGin     /// 依赖关系
65373779f3dSLoGin     dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>,
65473779f3dSLoGin }
65573779f3dSLoGin 
65673779f3dSLoGin impl DependencyCycleError {
65773779f3dSLoGin     pub fn new(head_entity: Arc<SchedEntity>) -> Self {
65873779f3dSLoGin         Self {
65973779f3dSLoGin             head_entity,
66073779f3dSLoGin             stop_propagation: false,
66173779f3dSLoGin             dependencies: Vec::new(),
66273779f3dSLoGin         }
66373779f3dSLoGin     }
66473779f3dSLoGin 
66573779f3dSLoGin     pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) {
66673779f3dSLoGin         self.dependencies.push((current, dependency));
66773779f3dSLoGin     }
66873779f3dSLoGin 
66973779f3dSLoGin     pub fn stop_propagation(&mut self) {
67073779f3dSLoGin         self.stop_propagation = true;
67173779f3dSLoGin     }
67273779f3dSLoGin 
67373779f3dSLoGin     #[allow(dead_code)]
67473779f3dSLoGin     pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> {
67573779f3dSLoGin         &self.dependencies
67673779f3dSLoGin     }
67773779f3dSLoGin 
67873779f3dSLoGin     pub fn display(&self) -> String {
67973779f3dSLoGin         let mut tmp = self.dependencies.clone();
68073779f3dSLoGin         tmp.reverse();
68173779f3dSLoGin 
68273779f3dSLoGin         let mut ret = format!("Dependency cycle detected: \nStart ->\n");
68373779f3dSLoGin         for (current, dep) in tmp.iter() {
68473779f3dSLoGin             ret.push_str(&format!(
68573779f3dSLoGin                 "->\t{} ({})\t--depends-->\t{} ({})\n",
68673779f3dSLoGin                 current.task().name_version(),
68773779f3dSLoGin                 current.file_path().display(),
68873779f3dSLoGin                 dep.task().name_version(),
68973779f3dSLoGin                 dep.file_path().display()
69073779f3dSLoGin             ));
69173779f3dSLoGin         }
69273779f3dSLoGin         ret.push_str("-> End");
69373779f3dSLoGin         return ret;
69473779f3dSLoGin     }
69573779f3dSLoGin }
696