xref: /DADK/dadk-user/src/scheduler/mod.rs (revision d2ade6ef037f43d0342021730a0d8b3af7bc36c1)
173779f3dSLoGin use std::{
273779f3dSLoGin     collections::{BTreeMap, HashMap},
373779f3dSLoGin     fmt::Debug,
473779f3dSLoGin     path::PathBuf,
573779f3dSLoGin     process::exit,
673779f3dSLoGin     sync::{
773779f3dSLoGin         atomic::{AtomicI32, Ordering},
873779f3dSLoGin         Arc, Mutex, RwLock,
973779f3dSLoGin     },
1073779f3dSLoGin     thread::ThreadId,
1173779f3dSLoGin };
1273779f3dSLoGin 
1373779f3dSLoGin use log::{error, info};
1473779f3dSLoGin 
1573779f3dSLoGin use crate::{
16eaa67f3cSLoGin     context::{Action, DadkUserExecuteContext},
17*d2ade6efSJomo     executor::Executor,
1873779f3dSLoGin     parser::task::DADKTask,
1973779f3dSLoGin };
2073779f3dSLoGin 
2173779f3dSLoGin use self::task_deque::TASK_DEQUE;
2273779f3dSLoGin 
2373779f3dSLoGin pub mod task_deque;
2473779f3dSLoGin #[cfg(test)]
2573779f3dSLoGin mod tests;
2673779f3dSLoGin 
2773779f3dSLoGin lazy_static! {
2873779f3dSLoGin     // 线程id与任务实体id映射表
2973779f3dSLoGin     pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new());
3073779f3dSLoGin }
3173779f3dSLoGin 
3273779f3dSLoGin /// # 调度实体内部结构
3373779f3dSLoGin #[derive(Debug, Clone)]
3473779f3dSLoGin pub struct InnerEntity {
3573779f3dSLoGin     /// 任务ID
3673779f3dSLoGin     id: i32,
3773779f3dSLoGin     file_path: PathBuf,
3873779f3dSLoGin     /// 任务
3973779f3dSLoGin     task: DADKTask,
4073779f3dSLoGin     /// 入度
4173779f3dSLoGin     indegree: usize,
4273779f3dSLoGin     /// 子节点
4373779f3dSLoGin     children: Vec<Arc<SchedEntity>>,
4473779f3dSLoGin }
4573779f3dSLoGin 
4673779f3dSLoGin /// # 调度实体
4773779f3dSLoGin #[derive(Debug)]
4873779f3dSLoGin pub struct SchedEntity {
4973779f3dSLoGin     inner: Mutex<InnerEntity>,
5073779f3dSLoGin }
5173779f3dSLoGin 
5273779f3dSLoGin impl PartialEq for SchedEntity {
5373779f3dSLoGin     fn eq(&self, other: &Self) -> bool {
5473779f3dSLoGin         self.inner.lock().unwrap().id == other.inner.lock().unwrap().id
5573779f3dSLoGin     }
eq(&self, other: &Self) -> bool5673779f3dSLoGin }
5773779f3dSLoGin 
5873779f3dSLoGin impl SchedEntity {
5973779f3dSLoGin     #[allow(dead_code)]
6073779f3dSLoGin     pub fn id(&self) -> i32 {
6173779f3dSLoGin         self.inner.lock().unwrap().id
6273779f3dSLoGin     }
6373779f3dSLoGin 
6473779f3dSLoGin     #[allow(dead_code)]
6573779f3dSLoGin     pub fn file_path(&self) -> PathBuf {
6673779f3dSLoGin         self.inner.lock().unwrap().file_path.clone()
6773779f3dSLoGin     }
6873779f3dSLoGin 
6973779f3dSLoGin     #[allow(dead_code)]
7073779f3dSLoGin     pub fn task(&self) -> DADKTask {
7173779f3dSLoGin         self.inner.lock().unwrap().task.clone()
7273779f3dSLoGin     }
7373779f3dSLoGin 
7473779f3dSLoGin     /// 入度加1
7573779f3dSLoGin     pub fn add_indegree(&self) {
7673779f3dSLoGin         self.inner.lock().unwrap().indegree += 1;
7773779f3dSLoGin     }
7873779f3dSLoGin 
7973779f3dSLoGin     /// 入度减1
8073779f3dSLoGin     pub fn sub_indegree(&self) -> usize {
8173779f3dSLoGin         self.inner.lock().unwrap().indegree -= 1;
8273779f3dSLoGin         return self.inner.lock().unwrap().indegree;
8373779f3dSLoGin     }
8473779f3dSLoGin 
8573779f3dSLoGin     /// 增加子节点
8673779f3dSLoGin     pub fn add_child(&self, entity: Arc<SchedEntity>) {
8773779f3dSLoGin         self.inner.lock().unwrap().children.push(entity);
8873779f3dSLoGin     }
8973779f3dSLoGin 
9073779f3dSLoGin     /// 获取入度
9173779f3dSLoGin     pub fn indegree(&self) -> usize {
9273779f3dSLoGin         self.inner.lock().unwrap().indegree
9373779f3dSLoGin     }
9473779f3dSLoGin 
9573779f3dSLoGin     /// 当前任务完成后,所有子节点入度减1
9673779f3dSLoGin     ///
9773779f3dSLoGin     /// ## 参数
9873779f3dSLoGin     ///
9973779f3dSLoGin     /// 无
10073779f3dSLoGin     ///
10173779f3dSLoGin     /// ## 返回值
10273779f3dSLoGin     ///
10373779f3dSLoGin     /// 所有入度为0的子节点集合
10473779f3dSLoGin     pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> {
10573779f3dSLoGin         let mut zero_child = Vec::new();
10673779f3dSLoGin         let children = &self.inner.lock().unwrap().children;
10773779f3dSLoGin         for child in children.iter() {
10873779f3dSLoGin             if child.sub_indegree() == 0 {
10973779f3dSLoGin                 zero_child.push(child.clone());
11073779f3dSLoGin             }
11173779f3dSLoGin         }
11273779f3dSLoGin         return zero_child;
11373779f3dSLoGin     }
11473779f3dSLoGin }
11573779f3dSLoGin 
11673779f3dSLoGin /// # 调度实体列表
11773779f3dSLoGin ///
11873779f3dSLoGin /// 用于存储所有的调度实体
11973779f3dSLoGin #[derive(Debug)]
12073779f3dSLoGin pub struct SchedEntities {
12173779f3dSLoGin     /// 任务ID到调度实体的映射
12273779f3dSLoGin     id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>,
12373779f3dSLoGin }
12473779f3dSLoGin 
12573779f3dSLoGin impl SchedEntities {
12673779f3dSLoGin     pub fn new() -> Self {
12773779f3dSLoGin         Self {
12873779f3dSLoGin             id2entity: RwLock::new(BTreeMap::new()),
12973779f3dSLoGin         }
13073779f3dSLoGin     }
13173779f3dSLoGin 
13273779f3dSLoGin     pub fn add(&mut self, entity: Arc<SchedEntity>) {
13373779f3dSLoGin         self.id2entity
new() -> Self13473779f3dSLoGin             .write()
13573779f3dSLoGin             .unwrap()
13673779f3dSLoGin             .insert(entity.id(), entity.clone());
13773779f3dSLoGin     }
13873779f3dSLoGin 
13973779f3dSLoGin     #[allow(dead_code)]
add(&mut self, entity: Arc<SchedEntity>)14073779f3dSLoGin     pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> {
14173779f3dSLoGin         self.id2entity.read().unwrap().get(&id).cloned()
14273779f3dSLoGin     }
14373779f3dSLoGin 
14473779f3dSLoGin     pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> {
14573779f3dSLoGin         for e in self.id2entity.read().unwrap().iter() {
14673779f3dSLoGin             if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) {
14773779f3dSLoGin                 return Some(e.1.clone());
get(&self, id: i32) -> Option<Arc<SchedEntity>>14873779f3dSLoGin             }
14973779f3dSLoGin         }
15073779f3dSLoGin         return None;
15173779f3dSLoGin     }
get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>>15273779f3dSLoGin 
15373779f3dSLoGin     pub fn entities(&self) -> Vec<Arc<SchedEntity>> {
15473779f3dSLoGin         let mut v = Vec::new();
15573779f3dSLoGin         for e in self.id2entity.read().unwrap().iter() {
15673779f3dSLoGin             v.push(e.1.clone());
15773779f3dSLoGin         }
15873779f3dSLoGin         return v;
15973779f3dSLoGin     }
16073779f3dSLoGin 
entities(&self) -> Vec<Arc<SchedEntity>>16173779f3dSLoGin     pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> {
16273779f3dSLoGin         self.id2entity.read().unwrap().clone()
16373779f3dSLoGin     }
16473779f3dSLoGin 
16573779f3dSLoGin     #[allow(dead_code)]
16673779f3dSLoGin     pub fn len(&self) -> usize {
16773779f3dSLoGin         self.id2entity.read().unwrap().len()
16873779f3dSLoGin     }
id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>>16973779f3dSLoGin 
17073779f3dSLoGin     #[allow(dead_code)]
17173779f3dSLoGin     pub fn is_empty(&self) -> bool {
17273779f3dSLoGin         self.id2entity.read().unwrap().is_empty()
17373779f3dSLoGin     }
len(&self) -> usize17473779f3dSLoGin 
17573779f3dSLoGin     #[allow(dead_code)]
17673779f3dSLoGin     pub fn clear(&mut self) {
17773779f3dSLoGin         self.id2entity.write().unwrap().clear();
17873779f3dSLoGin     }
is_empty(&self) -> bool17973779f3dSLoGin 
18073779f3dSLoGin     pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> {
18173779f3dSLoGin         let mut result = Vec::new();
18273779f3dSLoGin         let mut visited = BTreeMap::new();
18373779f3dSLoGin         let btree = self.id2entity.write().unwrap().clone();
18473779f3dSLoGin         for entity in btree.iter() {
18573779f3dSLoGin             if !visited.contains_key(entity.0) {
18673779f3dSLoGin                 let r = self.dfs(entity.1, &mut visited, &mut result);
18773779f3dSLoGin                 if r.is_err() {
18873779f3dSLoGin                     let err = r.unwrap_err();
18973779f3dSLoGin                     error!("{}", err.display());
19073779f3dSLoGin                     println!("Please fix the errors above and try again.");
19173779f3dSLoGin                     std::process::exit(1);
19273779f3dSLoGin                 }
19373779f3dSLoGin             }
19473779f3dSLoGin         }
19573779f3dSLoGin         return result;
19673779f3dSLoGin     }
19773779f3dSLoGin 
19873779f3dSLoGin     fn dfs(
19973779f3dSLoGin         &self,
20073779f3dSLoGin         entity: &Arc<SchedEntity>,
20173779f3dSLoGin         visited: &mut BTreeMap<i32, bool>,
20273779f3dSLoGin         result: &mut Vec<Arc<SchedEntity>>,
20373779f3dSLoGin     ) -> Result<(), DependencyCycleError> {
20473779f3dSLoGin         visited.insert(entity.id(), false);
20573779f3dSLoGin         for dep in entity.task().depends.iter() {
dfs( &self, entity: &Arc<SchedEntity>, visited: &mut BTreeMap<i32, bool>, result: &mut Vec<Arc<SchedEntity>>, ) -> Result<(), DependencyCycleError>20673779f3dSLoGin             if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) {
20773779f3dSLoGin                 let guard = self.id2entity.write().unwrap();
20873779f3dSLoGin                 let e = guard.get(&entity.id()).unwrap();
20973779f3dSLoGin                 let d = guard.get(&dep_entity.id()).unwrap();
21073779f3dSLoGin                 e.add_indegree();
21173779f3dSLoGin                 d.add_child(e.clone());
21273779f3dSLoGin                 if let Some(&false) = visited.get(&dep_entity.id()) {
21373779f3dSLoGin                     // 输出完整环形依赖
21473779f3dSLoGin                     let mut err = DependencyCycleError::new(dep_entity.clone());
21573779f3dSLoGin 
21673779f3dSLoGin                     err.add(entity.clone(), dep_entity);
21773779f3dSLoGin                     return Err(err);
21873779f3dSLoGin                 }
21973779f3dSLoGin                 if !visited.contains_key(&dep_entity.id()) {
22073779f3dSLoGin                     drop(guard);
22173779f3dSLoGin                     let r = self.dfs(&dep_entity, visited, result);
22273779f3dSLoGin                     if r.is_err() {
22373779f3dSLoGin                         let mut err: DependencyCycleError = r.unwrap_err();
22473779f3dSLoGin                         // 如果错误已经停止传播,则直接返回
22573779f3dSLoGin                         if err.stop_propagation {
22673779f3dSLoGin                             return Err(err);
22773779f3dSLoGin                         }
22873779f3dSLoGin                         // 如果当前实体是错误的起始实体,则停止传播
22973779f3dSLoGin                         if entity == &err.head_entity {
23073779f3dSLoGin                             err.stop_propagation();
23173779f3dSLoGin                         }
23273779f3dSLoGin                         err.add(entity.clone(), dep_entity);
23373779f3dSLoGin                         return Err(err);
23473779f3dSLoGin                     }
23573779f3dSLoGin                 }
23673779f3dSLoGin             } else {
23773779f3dSLoGin                 error!(
23873779f3dSLoGin                     "Dependency not found: {} -> {}",
23973779f3dSLoGin                     entity.task().name_version(),
24073779f3dSLoGin                     dep.name_version()
24173779f3dSLoGin                 );
24273779f3dSLoGin                 std::process::exit(1);
24373779f3dSLoGin             }
24473779f3dSLoGin         }
24573779f3dSLoGin         visited.insert(entity.id(), true);
24673779f3dSLoGin         result.push(entity.clone());
24773779f3dSLoGin         return Ok(());
24873779f3dSLoGin     }
24973779f3dSLoGin }
25073779f3dSLoGin 
25173779f3dSLoGin /// # 任务调度器
25273779f3dSLoGin #[derive(Debug)]
25373779f3dSLoGin pub struct Scheduler {
25473779f3dSLoGin     /// DragonOS sysroot在主机上的路径
25570352fd6SLoGin     sysroot_dir: PathBuf,
25673779f3dSLoGin     /// 要执行的操作
25773779f3dSLoGin     action: Action,
25873779f3dSLoGin     /// 调度实体列表
25973779f3dSLoGin     target: SchedEntities,
26073779f3dSLoGin     /// dadk执行的上下文
26173779f3dSLoGin     context: Arc<DadkUserExecuteContext>,
26273779f3dSLoGin }
26373779f3dSLoGin 
26473779f3dSLoGin pub enum SchedulerError {
26573779f3dSLoGin     TaskError(String),
26673779f3dSLoGin     /// 不是当前正在编译的目标架构
26773779f3dSLoGin     InvalidTargetArch(String),
26873779f3dSLoGin     DependencyNotFound(Arc<SchedEntity>, String),
26973779f3dSLoGin     RunError(String),
27073779f3dSLoGin }
27173779f3dSLoGin 
27273779f3dSLoGin impl Debug for SchedulerError {
27373779f3dSLoGin     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27473779f3dSLoGin         match self {
27573779f3dSLoGin             Self::TaskError(arg0) => {
27673779f3dSLoGin                 write!(f, "TaskError: {}", arg0)
27773779f3dSLoGin             }
27873779f3dSLoGin             SchedulerError::DependencyNotFound(current, msg) => {
27973779f3dSLoGin                 write!(
28073779f3dSLoGin                     f,
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result28173779f3dSLoGin                     "For task {}, dependency not found: {}. Please check file: {}",
28273779f3dSLoGin                     current.task().name_version(),
28373779f3dSLoGin                     msg,
28473779f3dSLoGin                     current.file_path().display()
28573779f3dSLoGin                 )
28673779f3dSLoGin             }
28773779f3dSLoGin             SchedulerError::RunError(msg) => {
28873779f3dSLoGin                 write!(f, "RunError: {}", msg)
28973779f3dSLoGin             }
29073779f3dSLoGin             SchedulerError::InvalidTargetArch(msg) => {
29173779f3dSLoGin                 write!(f, "InvalidTargetArch: {}", msg)
29273779f3dSLoGin             }
29373779f3dSLoGin         }
29473779f3dSLoGin     }
29573779f3dSLoGin }
29673779f3dSLoGin 
29773779f3dSLoGin impl Scheduler {
29873779f3dSLoGin     pub fn new(
29973779f3dSLoGin         context: Arc<DadkUserExecuteContext>,
30073779f3dSLoGin         dragonos_dir: PathBuf,
30173779f3dSLoGin         action: Action,
30273779f3dSLoGin         tasks: Vec<(PathBuf, DADKTask)>,
30373779f3dSLoGin     ) -> Result<Self, SchedulerError> {
30473779f3dSLoGin         let entities = SchedEntities::new();
30573779f3dSLoGin 
new( context: Arc<DadkUserExecuteContext>, dragonos_dir: PathBuf, action: Action, tasks: Vec<(PathBuf, DADKTask)>, ) -> Result<Self, SchedulerError>30673779f3dSLoGin         let mut scheduler = Scheduler {
30770352fd6SLoGin             sysroot_dir: dragonos_dir,
30873779f3dSLoGin             action,
30973779f3dSLoGin             target: entities,
31073779f3dSLoGin             context,
31173779f3dSLoGin         };
31273779f3dSLoGin 
31373779f3dSLoGin         let r = scheduler.add_tasks(tasks);
31473779f3dSLoGin         if r.is_err() {
31573779f3dSLoGin             error!("Error while adding tasks: {:?}", r);
31673779f3dSLoGin             return Err(r.err().unwrap());
31773779f3dSLoGin         }
31873779f3dSLoGin 
31973779f3dSLoGin         return Ok(scheduler);
32073779f3dSLoGin     }
32173779f3dSLoGin 
32273779f3dSLoGin     /// # 添加多个任务
32373779f3dSLoGin     ///
32473779f3dSLoGin     /// 添加任务到调度器中,如果任务已经存在,则返回错误
32573779f3dSLoGin     pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> {
32673779f3dSLoGin         for task in tasks {
32773779f3dSLoGin             let e = self.add_task(task.0, task.1);
32873779f3dSLoGin             if e.is_err() {
32973779f3dSLoGin                 if let Err(SchedulerError::InvalidTargetArch(_)) = &e {
33073779f3dSLoGin                     continue;
33173779f3dSLoGin                 }
33273779f3dSLoGin                 e?;
add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError>33373779f3dSLoGin             }
33473779f3dSLoGin         }
33573779f3dSLoGin 
33673779f3dSLoGin         return Ok(());
33773779f3dSLoGin     }
33873779f3dSLoGin 
33973779f3dSLoGin     /// # 任务是否匹配当前目标架构
34073779f3dSLoGin     pub fn task_arch_matched(&self, task: &DADKTask) -> bool {
34173779f3dSLoGin         task.target_arch.contains(self.context.target_arch())
34273779f3dSLoGin     }
34373779f3dSLoGin 
34473779f3dSLoGin     /// # 添加一个任务
34573779f3dSLoGin     ///
34673779f3dSLoGin     /// 添加任务到调度器中,如果任务已经存在,则返回错误
34773779f3dSLoGin     pub fn add_task(
task_arch_matched(&self, task: &DADKTask) -> bool34873779f3dSLoGin         &mut self,
34973779f3dSLoGin         path: PathBuf,
35073779f3dSLoGin         task: DADKTask,
35173779f3dSLoGin     ) -> Result<Arc<SchedEntity>, SchedulerError> {
35273779f3dSLoGin         if !self.task_arch_matched(&task) {
35373779f3dSLoGin             return Err(SchedulerError::InvalidTargetArch(format!(
35473779f3dSLoGin                 "Task {} is not for target arch: {:?}",
35573779f3dSLoGin                 task.name_version(),
35673779f3dSLoGin                 self.context.target_arch()
35773779f3dSLoGin             )));
35873779f3dSLoGin         }
35973779f3dSLoGin 
36073779f3dSLoGin         let id: i32 = self.generate_task_id();
36173779f3dSLoGin         let indegree: usize = 0;
36273779f3dSLoGin         let children = Vec::new();
36373779f3dSLoGin         let entity = Arc::new(SchedEntity {
36473779f3dSLoGin             inner: Mutex::new(InnerEntity {
36573779f3dSLoGin                 id,
36673779f3dSLoGin                 task,
36773779f3dSLoGin                 file_path: path.clone(),
36873779f3dSLoGin                 indegree,
36973779f3dSLoGin                 children,
37073779f3dSLoGin             }),
37173779f3dSLoGin         });
37273779f3dSLoGin         let name_version = (entity.task().name.clone(), entity.task().version.clone());
37373779f3dSLoGin 
37473779f3dSLoGin         if self
37573779f3dSLoGin             .target
37673779f3dSLoGin             .get_by_name_version(&name_version.0, &name_version.1)
37773779f3dSLoGin             .is_some()
37873779f3dSLoGin         {
37973779f3dSLoGin             return Err(SchedulerError::TaskError(format!(
38073779f3dSLoGin                 "Task with name [{}] and version [{}] already exists. Config file: {}",
38173779f3dSLoGin                 name_version.0,
38273779f3dSLoGin                 name_version.1,
38373779f3dSLoGin                 path.display()
38473779f3dSLoGin             )));
38573779f3dSLoGin         }
38673779f3dSLoGin 
38773779f3dSLoGin         self.target.add(entity.clone());
38873779f3dSLoGin 
38973779f3dSLoGin         info!("Task added: {}", entity.task().name_version());
39073779f3dSLoGin         return Ok(entity);
39173779f3dSLoGin     }
39273779f3dSLoGin 
39373779f3dSLoGin     fn generate_task_id(&self) -> i32 {
39473779f3dSLoGin         static TASK_ID: AtomicI32 = AtomicI32::new(0);
39573779f3dSLoGin         return TASK_ID.fetch_add(1, Ordering::SeqCst);
39673779f3dSLoGin     }
39773779f3dSLoGin 
39873779f3dSLoGin     /// # 执行调度器中的所有任务
39973779f3dSLoGin     pub fn run(&self) -> Result<(), SchedulerError> {
40073779f3dSLoGin         // 准备全局环境变量
40173779f3dSLoGin         crate::executor::prepare_env(&self.target, &self.context)
40273779f3dSLoGin             .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?;
generate_task_id(&self) -> i3240373779f3dSLoGin 
40473779f3dSLoGin         match self.action {
40573779f3dSLoGin             Action::Build | Action::Install => {
40673779f3dSLoGin                 self.run_with_topo_sort()?;
40773779f3dSLoGin             }
40873779f3dSLoGin             Action::Clean(_) => self.run_without_topo_sort()?,
40973779f3dSLoGin         }
41073779f3dSLoGin 
41173779f3dSLoGin         return Ok(());
41273779f3dSLoGin     }
41373779f3dSLoGin 
41473779f3dSLoGin     /// Action需要按照拓扑序执行
41573779f3dSLoGin     ///
41673779f3dSLoGin     /// Action::Build | Action::Install
41773779f3dSLoGin     fn run_with_topo_sort(&self) -> Result<(), SchedulerError> {
41873779f3dSLoGin         // 检查是否有不存在的依赖
41973779f3dSLoGin         let r = self.check_not_exists_dependency();
42073779f3dSLoGin         if r.is_err() {
42173779f3dSLoGin             error!("Error while checking tasks: {:?}", r);
42273779f3dSLoGin             return r;
42373779f3dSLoGin         }
42473779f3dSLoGin 
42573779f3dSLoGin         // 对调度实体进行拓扑排序
42673779f3dSLoGin         let r: Vec<Arc<SchedEntity>> = self.target.topo_sort();
42773779f3dSLoGin 
42873779f3dSLoGin         let action = self.action.clone();
42970352fd6SLoGin         let dragonos_dir = self.sysroot_dir.clone();
43073779f3dSLoGin         let id2entity = self.target.id2entity();
43173779f3dSLoGin         let count = r.len();
43273779f3dSLoGin 
43373779f3dSLoGin         // 启动守护线程
43473779f3dSLoGin         let handler = std::thread::spawn(move || {
43573779f3dSLoGin             Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r)
43673779f3dSLoGin         });
43773779f3dSLoGin 
43873779f3dSLoGin         handler.join().expect("Could not join deamon");
43973779f3dSLoGin 
44073779f3dSLoGin         return Ok(());
44173779f3dSLoGin     }
44273779f3dSLoGin 
run(&self) -> Result<(), SchedulerError>44373779f3dSLoGin     /// Action不需要按照拓扑序执行
44473779f3dSLoGin     fn run_without_topo_sort(&self) -> Result<(), SchedulerError> {
44573779f3dSLoGin         // 启动守护线程
44673779f3dSLoGin         let action = self.action.clone();
44770352fd6SLoGin         let dragonos_dir = self.sysroot_dir.clone();
44873779f3dSLoGin         let mut r = self.target.entities();
44973779f3dSLoGin         let handler = std::thread::spawn(move || {
45073779f3dSLoGin             Self::clean_daemon(action, dragonos_dir, &mut r);
45173779f3dSLoGin         });
45273779f3dSLoGin 
45373779f3dSLoGin         handler.join().expect("Could not join deamon");
45473779f3dSLoGin         return Ok(());
45573779f3dSLoGin     }
45673779f3dSLoGin 
45773779f3dSLoGin     pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) {
45873779f3dSLoGin         let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone())
45973779f3dSLoGin             .map_err(|e| {
46073779f3dSLoGin                 error!(
46173779f3dSLoGin                     "Error while creating executor for task {} : {:?}",
run_with_topo_sort(&self) -> Result<(), SchedulerError>46273779f3dSLoGin                     entity.task().name_version(),
46373779f3dSLoGin                     e
46473779f3dSLoGin                 );
46573779f3dSLoGin                 exit(-1);
46673779f3dSLoGin             })
46773779f3dSLoGin             .unwrap();
46873779f3dSLoGin 
46973779f3dSLoGin         executor
47073779f3dSLoGin             .execute()
47173779f3dSLoGin             .map_err(|e| {
47273779f3dSLoGin                 error!(
47373779f3dSLoGin                     "Error while executing task {} : {:?}",
47473779f3dSLoGin                     entity.task().name_version(),
47573779f3dSLoGin                     e
47673779f3dSLoGin                 );
47773779f3dSLoGin                 exit(-1);
47873779f3dSLoGin             })
47973779f3dSLoGin             .unwrap();
48073779f3dSLoGin     }
48173779f3dSLoGin 
48273779f3dSLoGin     /// 构建和安装DADK任务的守护线程
48373779f3dSLoGin     ///
48473779f3dSLoGin     /// ## 参数
48573779f3dSLoGin     ///
48673779f3dSLoGin     /// - `action` : 要执行的操作
48773779f3dSLoGin     /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
48873779f3dSLoGin     /// - `id2entity` : DADK任务id与实体映射表
run_without_topo_sort(&self) -> Result<(), SchedulerError>48973779f3dSLoGin     /// - `count` : 当前剩余任务数
49073779f3dSLoGin     /// - `r` : 总任务实体表
49173779f3dSLoGin     ///
49273779f3dSLoGin     /// ## 返回值
49373779f3dSLoGin     ///
49473779f3dSLoGin     /// 无
49573779f3dSLoGin     pub fn build_install_daemon(
49673779f3dSLoGin         action: Action,
49773779f3dSLoGin         dragonos_dir: PathBuf,
49873779f3dSLoGin         id2entity: BTreeMap<i32, Arc<SchedEntity>>,
49973779f3dSLoGin         mut count: usize,
50073779f3dSLoGin         r: &Vec<Arc<SchedEntity>>,
50173779f3dSLoGin     ) {
50273779f3dSLoGin         let mut guard = TASK_DEQUE.lock().unwrap();
50373779f3dSLoGin         // 初始化0入度的任务实体
50473779f3dSLoGin         let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new();
50573779f3dSLoGin         for e in r.iter() {
50673779f3dSLoGin             if e.indegree() == 0 {
50773779f3dSLoGin                 zero_entity.push(e.clone());
50873779f3dSLoGin             }
50973779f3dSLoGin         }
51073779f3dSLoGin 
51173779f3dSLoGin         while count > 0 {
51273779f3dSLoGin             // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了
51373779f3dSLoGin             while !zero_entity.is_empty()
51473779f3dSLoGin                 && guard.build_install_task(
51573779f3dSLoGin                     action.clone(),
51673779f3dSLoGin                     dragonos_dir.clone(),
51773779f3dSLoGin                     zero_entity.last().unwrap().clone(),
51873779f3dSLoGin                 )
51973779f3dSLoGin             {
52073779f3dSLoGin                 zero_entity.pop();
52173779f3dSLoGin             }
52273779f3dSLoGin 
52373779f3dSLoGin             let queue = guard.queue_mut();
52473779f3dSLoGin             // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中
52573779f3dSLoGin             queue.retain(|x| {
52673779f3dSLoGin                 if x.is_finished() {
52773779f3dSLoGin                     count -= 1;
52873779f3dSLoGin                     let tid = x.thread().id();
52973779f3dSLoGin                     let eid = *TID_EID.lock().unwrap().get(&tid).unwrap();
53073779f3dSLoGin                     let entity = id2entity.get(&eid).unwrap();
53173779f3dSLoGin                     let zero = entity.sub_children_indegree();
53273779f3dSLoGin                     for e in zero.iter() {
53373779f3dSLoGin                         zero_entity.push(e.clone());
53473779f3dSLoGin                     }
53573779f3dSLoGin                     return false;
53673779f3dSLoGin                 }
53773779f3dSLoGin                 return true;
53873779f3dSLoGin             })
53973779f3dSLoGin         }
54073779f3dSLoGin     }
54173779f3dSLoGin 
54273779f3dSLoGin     /// 清理DADK任务的守护线程
54373779f3dSLoGin     ///
54473779f3dSLoGin     /// ## 参数
54573779f3dSLoGin     ///
54673779f3dSLoGin     /// - `action` : 要执行的操作
54773779f3dSLoGin     /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
54873779f3dSLoGin     /// - `r` : 总任务实体表
54973779f3dSLoGin     ///
55073779f3dSLoGin     /// ## 返回值
55173779f3dSLoGin     ///
55273779f3dSLoGin     /// 无
55373779f3dSLoGin     pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) {
55473779f3dSLoGin         let mut guard = TASK_DEQUE.lock().unwrap();
55573779f3dSLoGin         while !guard.queue().is_empty() && !r.is_empty() {
55673779f3dSLoGin             guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone());
55773779f3dSLoGin         }
55873779f3dSLoGin     }
55973779f3dSLoGin 
56073779f3dSLoGin     /// # 检查是否有不存在的依赖
56173779f3dSLoGin     ///
56273779f3dSLoGin     /// 如果某个任务的dependency中的任务不存在,则返回错误
56373779f3dSLoGin     fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> {
56473779f3dSLoGin         for entity in self.target.entities().iter() {
56573779f3dSLoGin             for dependency in entity.task().depends.iter() {
56673779f3dSLoGin                 let name_version = (dependency.name.clone(), dependency.version.clone());
56773779f3dSLoGin                 if !self
56873779f3dSLoGin                     .target
56973779f3dSLoGin                     .get_by_name_version(&name_version.0, &name_version.1)
57073779f3dSLoGin                     .is_some()
57173779f3dSLoGin                 {
57273779f3dSLoGin                     return Err(SchedulerError::DependencyNotFound(
57373779f3dSLoGin                         entity.clone(),
57473779f3dSLoGin                         format!("name:{}, version:{}", name_version.0, name_version.1,),
57573779f3dSLoGin                     ));
57673779f3dSLoGin                 }
57773779f3dSLoGin             }
57873779f3dSLoGin         }
57973779f3dSLoGin 
58073779f3dSLoGin         return Ok(());
58173779f3dSLoGin     }
58273779f3dSLoGin }
58373779f3dSLoGin 
58473779f3dSLoGin /// # 环形依赖错误路径
58573779f3dSLoGin ///
58673779f3dSLoGin /// 本结构体用于在回溯过程中记录环形依赖的路径。
58773779f3dSLoGin ///
58873779f3dSLoGin /// 例如,假设有如下依赖关系:
58973779f3dSLoGin ///
59073779f3dSLoGin /// ```text
59173779f3dSLoGin /// A -> B -> C -> D -> A
59273779f3dSLoGin /// ```
59373779f3dSLoGin ///
59473779f3dSLoGin /// 则在DFS回溯过程中,会依次记录如下路径:
59573779f3dSLoGin ///
59673779f3dSLoGin /// ```text
59773779f3dSLoGin /// D -> A
clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>)59873779f3dSLoGin /// C -> D
59973779f3dSLoGin /// B -> C
60073779f3dSLoGin /// A -> B
60173779f3dSLoGin pub struct DependencyCycleError {
60273779f3dSLoGin     /// # 起始实体
60373779f3dSLoGin     /// 本错误的起始实体,即环形依赖的起点
60473779f3dSLoGin     head_entity: Arc<SchedEntity>,
60573779f3dSLoGin     /// 是否停止传播
60673779f3dSLoGin     stop_propagation: bool,
60773779f3dSLoGin     /// 依赖关系
60873779f3dSLoGin     dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>,
60973779f3dSLoGin }
61073779f3dSLoGin 
61173779f3dSLoGin impl DependencyCycleError {
61273779f3dSLoGin     pub fn new(head_entity: Arc<SchedEntity>) -> Self {
61373779f3dSLoGin         Self {
61473779f3dSLoGin             head_entity,
61573779f3dSLoGin             stop_propagation: false,
61673779f3dSLoGin             dependencies: Vec::new(),
61773779f3dSLoGin         }
61873779f3dSLoGin     }
61973779f3dSLoGin 
62073779f3dSLoGin     pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) {
62173779f3dSLoGin         self.dependencies.push((current, dependency));
62273779f3dSLoGin     }
62373779f3dSLoGin 
62473779f3dSLoGin     pub fn stop_propagation(&mut self) {
62573779f3dSLoGin         self.stop_propagation = true;
62673779f3dSLoGin     }
62773779f3dSLoGin 
62873779f3dSLoGin     #[allow(dead_code)]
62973779f3dSLoGin     pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> {
63073779f3dSLoGin         &self.dependencies
63173779f3dSLoGin     }
63273779f3dSLoGin 
63373779f3dSLoGin     pub fn display(&self) -> String {
63473779f3dSLoGin         let mut tmp = self.dependencies.clone();
63573779f3dSLoGin         tmp.reverse();
63673779f3dSLoGin 
63773779f3dSLoGin         let mut ret = format!("Dependency cycle detected: \nStart ->\n");
63873779f3dSLoGin         for (current, dep) in tmp.iter() {
63973779f3dSLoGin             ret.push_str(&format!(
64073779f3dSLoGin                 "->\t{} ({})\t--depends-->\t{} ({})\n",
64173779f3dSLoGin                 current.task().name_version(),
64273779f3dSLoGin                 current.file_path().display(),
64373779f3dSLoGin                 dep.task().name_version(),
64473779f3dSLoGin                 dep.file_path().display()
64573779f3dSLoGin             ));
64673779f3dSLoGin         }
64773779f3dSLoGin         ret.push_str("-> End");
64873779f3dSLoGin         return ret;
64973779f3dSLoGin     }
65073779f3dSLoGin }
651