xref: /DADK/dadk-user/src/scheduler/mod.rs (revision eaa67f3cf8881c221a744937c6318444b068a801)
1 use std::{
2     collections::{BTreeMap, HashMap},
3     fmt::Debug,
4     path::PathBuf,
5     process::exit,
6     sync::{
7         atomic::{AtomicI32, Ordering},
8         Arc, Mutex, RwLock,
9     },
10     thread::ThreadId,
11 };
12 
13 use log::{error, info};
14 
15 use crate::{
16     context::{Action, DadkUserExecuteContext},
17     executor::{target::Target, Executor},
18     parser::task::DADKTask,
19 };
20 
21 use self::task_deque::TASK_DEQUE;
22 
23 pub mod task_deque;
24 #[cfg(test)]
25 mod tests;
26 
27 lazy_static! {
28     // 线程id与任务实体id映射表
29     pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new());
30 }
31 
32 /// # 调度实体内部结构
33 #[derive(Debug, Clone)]
34 pub struct InnerEntity {
35     /// 任务ID
36     id: i32,
37     file_path: PathBuf,
38     /// 任务
39     task: DADKTask,
40     /// 入度
41     indegree: usize,
42     /// 子节点
43     children: Vec<Arc<SchedEntity>>,
44     /// target管理
45     target: Option<Target>,
46 }
47 
48 /// # 调度实体
49 #[derive(Debug)]
50 pub struct SchedEntity {
51     inner: Mutex<InnerEntity>,
52 }
53 
54 impl PartialEq for SchedEntity {
55     fn eq(&self, other: &Self) -> bool {
56         self.inner.lock().unwrap().id == other.inner.lock().unwrap().id
57     }
58 }
59 
60 impl SchedEntity {
61     #[allow(dead_code)]
62     pub fn id(&self) -> i32 {
63         self.inner.lock().unwrap().id
64     }
65 
66     #[allow(dead_code)]
67     pub fn file_path(&self) -> PathBuf {
68         self.inner.lock().unwrap().file_path.clone()
69     }
70 
71     #[allow(dead_code)]
72     pub fn task(&self) -> DADKTask {
73         self.inner.lock().unwrap().task.clone()
74     }
75 
76     /// 入度加1
77     pub fn add_indegree(&self) {
78         self.inner.lock().unwrap().indegree += 1;
79     }
80 
81     /// 入度减1
82     pub fn sub_indegree(&self) -> usize {
83         self.inner.lock().unwrap().indegree -= 1;
84         return self.inner.lock().unwrap().indegree;
85     }
86 
87     /// 增加子节点
88     pub fn add_child(&self, entity: Arc<SchedEntity>) {
89         self.inner.lock().unwrap().children.push(entity);
90     }
91 
92     /// 获取入度
93     pub fn indegree(&self) -> usize {
94         self.inner.lock().unwrap().indegree
95     }
96 
97     /// 获取target
98     pub fn target(&self) -> Option<Target> {
99         self.inner.lock().unwrap().target.clone()
100     }
101 
102     /// 当前任务完成后,所有子节点入度减1
103     ///
104     /// ## 参数
105     ///
106     /// 无
107     ///
108     /// ## 返回值
109     ///
110     /// 所有入度为0的子节点集合
111     pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> {
112         let mut zero_child = Vec::new();
113         let children = &self.inner.lock().unwrap().children;
114         for child in children.iter() {
115             if child.sub_indegree() == 0 {
116                 zero_child.push(child.clone());
117             }
118         }
119         return zero_child;
120     }
121 }
122 
123 /// # 调度实体列表
124 ///
125 /// 用于存储所有的调度实体
126 #[derive(Debug)]
127 pub struct SchedEntities {
128     /// 任务ID到调度实体的映射
129     id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>,
130 }
131 
132 impl SchedEntities {
133     pub fn new() -> Self {
134         Self {
135             id2entity: RwLock::new(BTreeMap::new()),
136         }
137     }
138 
139     pub fn add(&mut self, entity: Arc<SchedEntity>) {
140         self.id2entity
141             .write()
142             .unwrap()
143             .insert(entity.id(), entity.clone());
144     }
145 
146     #[allow(dead_code)]
147     pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> {
148         self.id2entity.read().unwrap().get(&id).cloned()
149     }
150 
151     pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> {
152         for e in self.id2entity.read().unwrap().iter() {
153             if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) {
154                 return Some(e.1.clone());
155             }
156         }
157         return None;
158     }
159 
160     pub fn entities(&self) -> Vec<Arc<SchedEntity>> {
161         let mut v = Vec::new();
162         for e in self.id2entity.read().unwrap().iter() {
163             v.push(e.1.clone());
164         }
165         return v;
166     }
167 
168     pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> {
169         self.id2entity.read().unwrap().clone()
170     }
171 
172     #[allow(dead_code)]
173     pub fn len(&self) -> usize {
174         self.id2entity.read().unwrap().len()
175     }
176 
177     #[allow(dead_code)]
178     pub fn is_empty(&self) -> bool {
179         self.id2entity.read().unwrap().is_empty()
180     }
181 
182     #[allow(dead_code)]
183     pub fn clear(&mut self) {
184         self.id2entity.write().unwrap().clear();
185     }
186 
187     pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> {
188         let mut result = Vec::new();
189         let mut visited = BTreeMap::new();
190         let btree = self.id2entity.write().unwrap().clone();
191         for entity in btree.iter() {
192             if !visited.contains_key(entity.0) {
193                 let r = self.dfs(entity.1, &mut visited, &mut result);
194                 if r.is_err() {
195                     let err = r.unwrap_err();
196                     error!("{}", err.display());
197                     println!("Please fix the errors above and try again.");
198                     std::process::exit(1);
199                 }
200             }
201         }
202         return result;
203     }
204 
205     fn dfs(
206         &self,
207         entity: &Arc<SchedEntity>,
208         visited: &mut BTreeMap<i32, bool>,
209         result: &mut Vec<Arc<SchedEntity>>,
210     ) -> Result<(), DependencyCycleError> {
211         visited.insert(entity.id(), false);
212         for dep in entity.task().depends.iter() {
213             if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) {
214                 let guard = self.id2entity.write().unwrap();
215                 let e = guard.get(&entity.id()).unwrap();
216                 let d = guard.get(&dep_entity.id()).unwrap();
217                 e.add_indegree();
218                 d.add_child(e.clone());
219                 if let Some(&false) = visited.get(&dep_entity.id()) {
220                     // 输出完整环形依赖
221                     let mut err = DependencyCycleError::new(dep_entity.clone());
222 
223                     err.add(entity.clone(), dep_entity);
224                     return Err(err);
225                 }
226                 if !visited.contains_key(&dep_entity.id()) {
227                     drop(guard);
228                     let r = self.dfs(&dep_entity, visited, result);
229                     if r.is_err() {
230                         let mut err: DependencyCycleError = r.unwrap_err();
231                         // 如果错误已经停止传播,则直接返回
232                         if err.stop_propagation {
233                             return Err(err);
234                         }
235                         // 如果当前实体是错误的起始实体,则停止传播
236                         if entity == &err.head_entity {
237                             err.stop_propagation();
238                         }
239                         err.add(entity.clone(), dep_entity);
240                         return Err(err);
241                     }
242                 }
243             } else {
244                 error!(
245                     "Dependency not found: {} -> {}",
246                     entity.task().name_version(),
247                     dep.name_version()
248                 );
249                 std::process::exit(1);
250             }
251         }
252         visited.insert(entity.id(), true);
253         result.push(entity.clone());
254         return Ok(());
255     }
256 }
257 
258 /// # 任务调度器
259 #[derive(Debug)]
260 pub struct Scheduler {
261     /// DragonOS sysroot在主机上的路径
262     sysroot_dir: PathBuf,
263     /// 要执行的操作
264     action: Action,
265     /// 调度实体列表
266     target: SchedEntities,
267     /// dadk执行的上下文
268     context: Arc<DadkUserExecuteContext>,
269 }
270 
271 pub enum SchedulerError {
272     TaskError(String),
273     /// 不是当前正在编译的目标架构
274     InvalidTargetArch(String),
275     DependencyNotFound(Arc<SchedEntity>, String),
276     RunError(String),
277 }
278 
279 impl Debug for SchedulerError {
280     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
281         match self {
282             Self::TaskError(arg0) => {
283                 write!(f, "TaskError: {}", arg0)
284             }
285             SchedulerError::DependencyNotFound(current, msg) => {
286                 write!(
287                     f,
288                     "For task {}, dependency not found: {}. Please check file: {}",
289                     current.task().name_version(),
290                     msg,
291                     current.file_path().display()
292                 )
293             }
294             SchedulerError::RunError(msg) => {
295                 write!(f, "RunError: {}", msg)
296             }
297             SchedulerError::InvalidTargetArch(msg) => {
298                 write!(f, "InvalidTargetArch: {}", msg)
299             }
300         }
301     }
302 }
303 
304 impl Scheduler {
305     pub fn new(
306         context: Arc<DadkUserExecuteContext>,
307         dragonos_dir: PathBuf,
308         action: Action,
309         tasks: Vec<(PathBuf, DADKTask)>,
310     ) -> Result<Self, SchedulerError> {
311         let entities = SchedEntities::new();
312 
313         let mut scheduler = Scheduler {
314             sysroot_dir: dragonos_dir,
315             action,
316             target: entities,
317             context,
318         };
319 
320         let r = scheduler.add_tasks(tasks);
321         if r.is_err() {
322             error!("Error while adding tasks: {:?}", r);
323             return Err(r.err().unwrap());
324         }
325 
326         return Ok(scheduler);
327     }
328 
329     /// # 添加多个任务
330     ///
331     /// 添加任务到调度器中,如果任务已经存在,则返回错误
332     pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> {
333         for task in tasks {
334             let e = self.add_task(task.0, task.1);
335             if e.is_err() {
336                 if let Err(SchedulerError::InvalidTargetArch(_)) = &e {
337                     continue;
338                 }
339                 e?;
340             }
341         }
342 
343         return Ok(());
344     }
345 
346     /// # 任务是否匹配当前目标架构
347     pub fn task_arch_matched(&self, task: &DADKTask) -> bool {
348         task.target_arch.contains(self.context.target_arch())
349     }
350 
351     /// # 添加一个任务
352     ///
353     /// 添加任务到调度器中,如果任务已经存在,则返回错误
354     pub fn add_task(
355         &mut self,
356         path: PathBuf,
357         task: DADKTask,
358     ) -> Result<Arc<SchedEntity>, SchedulerError> {
359         if !self.task_arch_matched(&task) {
360             return Err(SchedulerError::InvalidTargetArch(format!(
361                 "Task {} is not for target arch: {:?}",
362                 task.name_version(),
363                 self.context.target_arch()
364             )));
365         }
366 
367         let id: i32 = self.generate_task_id();
368         let indegree: usize = 0;
369         let children = Vec::new();
370         let target = self.generate_task_target(&path, &task.rust_target)?;
371         let entity = Arc::new(SchedEntity {
372             inner: Mutex::new(InnerEntity {
373                 id,
374                 task,
375                 file_path: path.clone(),
376                 indegree,
377                 children,
378                 target,
379             }),
380         });
381         let name_version = (entity.task().name.clone(), entity.task().version.clone());
382 
383         if self
384             .target
385             .get_by_name_version(&name_version.0, &name_version.1)
386             .is_some()
387         {
388             return Err(SchedulerError::TaskError(format!(
389                 "Task with name [{}] and version [{}] already exists. Config file: {}",
390                 name_version.0,
391                 name_version.1,
392                 path.display()
393             )));
394         }
395 
396         self.target.add(entity.clone());
397 
398         info!("Task added: {}", entity.task().name_version());
399         return Ok(entity);
400     }
401 
402     fn generate_task_id(&self) -> i32 {
403         static TASK_ID: AtomicI32 = AtomicI32::new(0);
404         return TASK_ID.fetch_add(1, Ordering::SeqCst);
405     }
406 
407     fn generate_task_target(
408         &self,
409         path: &PathBuf,
410         rust_target: &Option<String>,
411     ) -> Result<Option<Target>, SchedulerError> {
412         if let Some(rust_target) = rust_target {
413             // 如果rust_target字段不为none,说明需要target管理
414             // 获取dadk任务路径,用于生成临时dadk文件名
415             let file_str = path.as_path().to_str().unwrap();
416             let tmp_dadk_path = Target::tmp_dadk(file_str);
417             let tmp_dadk_str = tmp_dadk_path.as_path().to_str().unwrap();
418 
419             if Target::is_user_target(rust_target) {
420                 // 如果target文件是用户自己的
421                 if let Ok(target_path) = Target::user_target_path(rust_target) {
422                     let target_path_str = target_path.as_path().to_str().unwrap();
423                     let index = target_path_str.rfind('/').unwrap();
424                     let target_name = target_path_str[index + 1..].to_string();
425                     let tmp_target = PathBuf::from(format!("{}{}", tmp_dadk_str, target_name));
426                     return Ok(Some(Target::new(tmp_target)));
427                 } else {
428                     return Err(SchedulerError::TaskError(
429                         "The path of target file is invalid.".to_string(),
430                     ));
431                 }
432             } else {
433                 // 如果target文件是内置的
434                 let tmp_target = PathBuf::from(format!("{}{}.json", tmp_dadk_str, rust_target));
435                 return Ok(Some(Target::new(tmp_target)));
436             }
437         }
438         return Ok(None);
439     }
440 
441     /// # 执行调度器中的所有任务
442     pub fn run(&self) -> Result<(), SchedulerError> {
443         // 准备全局环境变量
444         crate::executor::prepare_env(&self.target, &self.context)
445             .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?;
446 
447         match self.action {
448             Action::Build | Action::Install => {
449                 self.run_with_topo_sort()?;
450             }
451             Action::Clean(_) => self.run_without_topo_sort()?,
452         }
453 
454         return Ok(());
455     }
456 
457     /// Action需要按照拓扑序执行
458     ///
459     /// Action::Build | Action::Install
460     fn run_with_topo_sort(&self) -> Result<(), SchedulerError> {
461         // 检查是否有不存在的依赖
462         let r = self.check_not_exists_dependency();
463         if r.is_err() {
464             error!("Error while checking tasks: {:?}", r);
465             return r;
466         }
467 
468         // 对调度实体进行拓扑排序
469         let r: Vec<Arc<SchedEntity>> = self.target.topo_sort();
470 
471         let action = self.action.clone();
472         let dragonos_dir = self.sysroot_dir.clone();
473         let id2entity = self.target.id2entity();
474         let count = r.len();
475 
476         // 启动守护线程
477         let handler = std::thread::spawn(move || {
478             Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r)
479         });
480 
481         handler.join().expect("Could not join deamon");
482 
483         return Ok(());
484     }
485 
486     /// Action不需要按照拓扑序执行
487     fn run_without_topo_sort(&self) -> Result<(), SchedulerError> {
488         // 启动守护线程
489         let action = self.action.clone();
490         let dragonos_dir = self.sysroot_dir.clone();
491         let mut r = self.target.entities();
492         let handler = std::thread::spawn(move || {
493             Self::clean_daemon(action, dragonos_dir, &mut r);
494         });
495 
496         handler.join().expect("Could not join deamon");
497         return Ok(());
498     }
499 
500     pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) {
501         let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone())
502             .map_err(|e| {
503                 error!(
504                     "Error while creating executor for task {} : {:?}",
505                     entity.task().name_version(),
506                     e
507                 );
508                 exit(-1);
509             })
510             .unwrap();
511 
512         executor
513             .execute()
514             .map_err(|e| {
515                 error!(
516                     "Error while executing task {} : {:?}",
517                     entity.task().name_version(),
518                     e
519                 );
520                 exit(-1);
521             })
522             .unwrap();
523     }
524 
525     /// 构建和安装DADK任务的守护线程
526     ///
527     /// ## 参数
528     ///
529     /// - `action` : 要执行的操作
530     /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
531     /// - `id2entity` : DADK任务id与实体映射表
532     /// - `count` : 当前剩余任务数
533     /// - `r` : 总任务实体表
534     ///
535     /// ## 返回值
536     ///
537     /// 无
538     pub fn build_install_daemon(
539         action: Action,
540         dragonos_dir: PathBuf,
541         id2entity: BTreeMap<i32, Arc<SchedEntity>>,
542         mut count: usize,
543         r: &Vec<Arc<SchedEntity>>,
544     ) {
545         let mut guard = TASK_DEQUE.lock().unwrap();
546         // 初始化0入度的任务实体
547         let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new();
548         for e in r.iter() {
549             if e.indegree() == 0 {
550                 zero_entity.push(e.clone());
551             }
552         }
553 
554         while count > 0 {
555             // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了
556             while !zero_entity.is_empty()
557                 && guard.build_install_task(
558                     action.clone(),
559                     dragonos_dir.clone(),
560                     zero_entity.last().unwrap().clone(),
561                 )
562             {
563                 zero_entity.pop();
564             }
565 
566             let queue = guard.queue_mut();
567             // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中
568             queue.retain(|x| {
569                 if x.is_finished() {
570                     count -= 1;
571                     let tid = x.thread().id();
572                     let eid = *TID_EID.lock().unwrap().get(&tid).unwrap();
573                     let entity = id2entity.get(&eid).unwrap();
574                     let zero = entity.sub_children_indegree();
575                     for e in zero.iter() {
576                         zero_entity.push(e.clone());
577                     }
578                     return false;
579                 }
580                 return true;
581             })
582         }
583     }
584 
585     /// 清理DADK任务的守护线程
586     ///
587     /// ## 参数
588     ///
589     /// - `action` : 要执行的操作
590     /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
591     /// - `r` : 总任务实体表
592     ///
593     /// ## 返回值
594     ///
595     /// 无
596     pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) {
597         let mut guard = TASK_DEQUE.lock().unwrap();
598         while !guard.queue().is_empty() && !r.is_empty() {
599             guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone());
600         }
601     }
602 
603     /// # 检查是否有不存在的依赖
604     ///
605     /// 如果某个任务的dependency中的任务不存在,则返回错误
606     fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> {
607         for entity in self.target.entities().iter() {
608             for dependency in entity.task().depends.iter() {
609                 let name_version = (dependency.name.clone(), dependency.version.clone());
610                 if !self
611                     .target
612                     .get_by_name_version(&name_version.0, &name_version.1)
613                     .is_some()
614                 {
615                     return Err(SchedulerError::DependencyNotFound(
616                         entity.clone(),
617                         format!("name:{}, version:{}", name_version.0, name_version.1,),
618                     ));
619                 }
620             }
621         }
622 
623         return Ok(());
624     }
625 }
626 
627 /// # 环形依赖错误路径
628 ///
629 /// 本结构体用于在回溯过程中记录环形依赖的路径。
630 ///
631 /// 例如,假设有如下依赖关系:
632 ///
633 /// ```text
634 /// A -> B -> C -> D -> A
635 /// ```
636 ///
637 /// 则在DFS回溯过程中,会依次记录如下路径:
638 ///
639 /// ```text
640 /// D -> A
641 /// C -> D
642 /// B -> C
643 /// A -> B
644 pub struct DependencyCycleError {
645     /// # 起始实体
646     /// 本错误的起始实体,即环形依赖的起点
647     head_entity: Arc<SchedEntity>,
648     /// 是否停止传播
649     stop_propagation: bool,
650     /// 依赖关系
651     dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>,
652 }
653 
654 impl DependencyCycleError {
655     pub fn new(head_entity: Arc<SchedEntity>) -> Self {
656         Self {
657             head_entity,
658             stop_propagation: false,
659             dependencies: Vec::new(),
660         }
661     }
662 
663     pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) {
664         self.dependencies.push((current, dependency));
665     }
666 
667     pub fn stop_propagation(&mut self) {
668         self.stop_propagation = true;
669     }
670 
671     #[allow(dead_code)]
672     pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> {
673         &self.dependencies
674     }
675 
676     pub fn display(&self) -> String {
677         let mut tmp = self.dependencies.clone();
678         tmp.reverse();
679 
680         let mut ret = format!("Dependency cycle detected: \nStart ->\n");
681         for (current, dep) in tmp.iter() {
682             ret.push_str(&format!(
683                 "->\t{} ({})\t--depends-->\t{} ({})\n",
684                 current.task().name_version(),
685                 current.file_path().display(),
686                 dep.task().name_version(),
687                 dep.file_path().display()
688             ));
689         }
690         ret.push_str("-> End");
691         return ret;
692     }
693 }
694