xref: /DADK/dadk-user/src/scheduler/mod.rs (revision cfb7b78ff5dff2a09cba93336ba222be89cbd3e1)
1 use std::{
2     collections::{BTreeMap, HashMap},
3     fmt::Debug,
4     path::PathBuf,
5     process::exit,
6     sync::{
7         atomic::{AtomicI32, Ordering},
8         Arc, Mutex, RwLock,
9     },
10     thread::ThreadId,
11 };
12 
13 use log::{error, info};
14 
15 use crate::{
16     context::{Action, DadkUserExecuteContext},
17     executor::Executor,
18     parser::task::DADKTask,
19 };
20 
21 use self::task_deque::TASK_DEQUE;
22 
23 pub mod task_deque;
24 #[cfg(test)]
25 mod tests;
26 
27 lazy_static! {
28     // 线程id与任务实体id映射表
29     pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new());
30 }
31 
32 /// # 调度实体内部结构
33 #[derive(Debug, Clone)]
34 pub struct InnerEntity {
35     /// 任务ID
36     id: i32,
37     file_path: PathBuf,
38     /// 任务
39     task: DADKTask,
40     /// 入度
41     indegree: usize,
42     /// 子节点
43     children: Vec<Arc<SchedEntity>>,
44 }
45 
46 /// # 调度实体
47 #[derive(Debug)]
48 pub struct SchedEntity {
49     inner: Mutex<InnerEntity>,
50 }
51 
52 impl PartialEq for SchedEntity {
53     fn eq(&self, other: &Self) -> bool {
54         self.inner.lock().unwrap().id == other.inner.lock().unwrap().id
55     }
56 }
57 
58 impl SchedEntity {
59     #[allow(dead_code)]
60     pub fn id(&self) -> i32 {
61         self.inner.lock().unwrap().id
62     }
63 
64     #[allow(dead_code)]
65     pub fn file_path(&self) -> PathBuf {
66         self.inner.lock().unwrap().file_path.clone()
67     }
68 
69     #[allow(dead_code)]
70     pub fn task(&self) -> DADKTask {
71         self.inner.lock().unwrap().task.clone()
72     }
73 
74     /// 入度加1
75     pub fn add_indegree(&self) {
76         self.inner.lock().unwrap().indegree += 1;
77     }
78 
79     /// 入度减1
80     pub fn sub_indegree(&self) -> usize {
81         self.inner.lock().unwrap().indegree -= 1;
82         return self.inner.lock().unwrap().indegree;
83     }
84 
85     /// 增加子节点
86     pub fn add_child(&self, entity: Arc<SchedEntity>) {
87         self.inner.lock().unwrap().children.push(entity);
88     }
89 
90     /// 获取入度
91     pub fn indegree(&self) -> usize {
92         self.inner.lock().unwrap().indegree
93     }
94 
95     /// 当前任务完成后,所有子节点入度减1
96     ///
97     /// ## 参数
98     ///
99     /// 无
100     ///
101     /// ## 返回值
102     ///
103     /// 所有入度为0的子节点集合
104     pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> {
105         let mut zero_child = Vec::new();
106         let children = &self.inner.lock().unwrap().children;
107         for child in children.iter() {
108             if child.sub_indegree() == 0 {
109                 zero_child.push(child.clone());
110             }
111         }
112         return zero_child;
113     }
114 }
115 
116 /// # 调度实体列表
117 ///
118 /// 用于存储所有的调度实体
119 #[derive(Debug)]
120 pub struct SchedEntities {
121     /// 任务ID到调度实体的映射
122     id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>,
123 }
124 
125 impl SchedEntities {
126     pub fn new() -> Self {
127         Self {
128             id2entity: RwLock::new(BTreeMap::new()),
129         }
130     }
131 
132     pub fn add(&mut self, entity: Arc<SchedEntity>) {
133         self.id2entity
134             .write()
135             .unwrap()
136             .insert(entity.id(), entity.clone());
137     }
138 
139     #[allow(dead_code)]
140     pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> {
141         self.id2entity.read().unwrap().get(&id).cloned()
142     }
143 
144     pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> {
145         for e in self.id2entity.read().unwrap().iter() {
146             if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) {
147                 return Some(e.1.clone());
148             }
149         }
150         return None;
151     }
152 
153     pub fn entities(&self) -> Vec<Arc<SchedEntity>> {
154         let mut v = Vec::new();
155         for e in self.id2entity.read().unwrap().iter() {
156             v.push(e.1.clone());
157         }
158         return v;
159     }
160 
161     pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> {
162         self.id2entity.read().unwrap().clone()
163     }
164 
165     #[allow(dead_code)]
166     pub fn len(&self) -> usize {
167         self.id2entity.read().unwrap().len()
168     }
169 
170     #[allow(dead_code)]
171     pub fn is_empty(&self) -> bool {
172         self.id2entity.read().unwrap().is_empty()
173     }
174 
175     #[allow(dead_code)]
176     pub fn clear(&mut self) {
177         self.id2entity.write().unwrap().clear();
178     }
179 
180     pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> {
181         let mut result = Vec::new();
182         let mut visited = BTreeMap::new();
183         let btree = self.id2entity.write().unwrap().clone();
184         for entity in btree.iter() {
185             if !visited.contains_key(entity.0) {
186                 let r = self.dfs(entity.1, &mut visited, &mut result);
187                 if r.is_err() {
188                     let err = r.unwrap_err();
189                     error!("{}", err.display());
190                     println!("Please fix the errors above and try again.");
191                     std::process::exit(1);
192                 }
193             }
194         }
195         return result;
196     }
197 
198     fn dfs(
199         &self,
200         entity: &Arc<SchedEntity>,
201         visited: &mut BTreeMap<i32, bool>,
202         result: &mut Vec<Arc<SchedEntity>>,
203     ) -> Result<(), DependencyCycleError> {
204         visited.insert(entity.id(), false);
205         for dep in entity.task().depends.iter() {
206             if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) {
207                 let guard = self.id2entity.write().unwrap();
208                 let e = guard.get(&entity.id()).unwrap();
209                 let d = guard.get(&dep_entity.id()).unwrap();
210                 e.add_indegree();
211                 d.add_child(e.clone());
212                 if let Some(&false) = visited.get(&dep_entity.id()) {
213                     // 输出完整环形依赖
214                     let mut err = DependencyCycleError::new(dep_entity.clone());
215 
216                     err.add(entity.clone(), dep_entity);
217                     return Err(err);
218                 }
219                 if !visited.contains_key(&dep_entity.id()) {
220                     drop(guard);
221                     let r = self.dfs(&dep_entity, visited, result);
222                     if r.is_err() {
223                         let mut err: DependencyCycleError = r.unwrap_err();
224                         // 如果错误已经停止传播,则直接返回
225                         if err.stop_propagation {
226                             return Err(err);
227                         }
228                         // 如果当前实体是错误的起始实体,则停止传播
229                         if entity == &err.head_entity {
230                             err.stop_propagation();
231                         }
232                         err.add(entity.clone(), dep_entity);
233                         return Err(err);
234                     }
235                 }
236             } else {
237                 error!(
238                     "Dependency not found: {} -> {}",
239                     entity.task().name_version(),
240                     dep.name_version()
241                 );
242                 std::process::exit(1);
243             }
244         }
245         visited.insert(entity.id(), true);
246         result.push(entity.clone());
247         return Ok(());
248     }
249 }
250 
251 /// # 任务调度器
252 #[derive(Debug)]
253 pub struct Scheduler {
254     /// DragonOS sysroot在主机上的路径
255     sysroot_dir: PathBuf,
256     /// 要执行的操作
257     action: Action,
258     /// 调度实体列表
259     target: SchedEntities,
260     /// dadk执行的上下文
261     context: Arc<DadkUserExecuteContext>,
262 }
263 
264 pub enum SchedulerError {
265     TaskError(String),
266     /// 不是当前正在编译的目标架构
267     InvalidTargetArch(String),
268     DependencyNotFound(Arc<SchedEntity>, String),
269     RunError(String),
270 }
271 
272 impl Debug for SchedulerError {
273     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274         match self {
275             Self::TaskError(arg0) => {
276                 write!(f, "TaskError: {}", arg0)
277             }
278             SchedulerError::DependencyNotFound(current, msg) => {
279                 write!(
280                     f,
281                     "For task {}, dependency not found: {}. Please check file: {}",
282                     current.task().name_version(),
283                     msg,
284                     current.file_path().display()
285                 )
286             }
287             SchedulerError::RunError(msg) => {
288                 write!(f, "RunError: {}", msg)
289             }
290             SchedulerError::InvalidTargetArch(msg) => {
291                 write!(f, "InvalidTargetArch: {}", msg)
292             }
293         }
294     }
295 }
296 
297 impl Scheduler {
298     pub fn new(
299         context: Arc<DadkUserExecuteContext>,
300         dragonos_dir: PathBuf,
301         action: Action,
302         tasks: Vec<(PathBuf, DADKTask)>,
303     ) -> Result<Self, SchedulerError> {
304         let entities = SchedEntities::new();
305 
306         let mut scheduler = Scheduler {
307             sysroot_dir: dragonos_dir,
308             action,
309             target: entities,
310             context,
311         };
312 
313         let r = scheduler.add_tasks(tasks);
314         if r.is_err() {
315             error!("Error while adding tasks: {:?}", r);
316             return Err(r.err().unwrap());
317         }
318 
319         return Ok(scheduler);
320     }
321 
322     /// # 添加多个任务
323     ///
324     /// 添加任务到调度器中,如果任务已经存在,则返回错误
325     pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> {
326         for task in tasks {
327             let e = self.add_task(task.0, task.1);
328             if e.is_err() {
329                 if let Err(SchedulerError::InvalidTargetArch(_)) = &e {
330                     continue;
331                 }
332                 e?;
333             }
334         }
335 
336         return Ok(());
337     }
338 
339     /// # 任务是否匹配当前目标架构
340     pub fn task_arch_matched(&self, task: &DADKTask) -> bool {
341         task.target_arch.contains(self.context.target_arch())
342     }
343 
344     /// # 添加一个任务
345     ///
346     /// 添加任务到调度器中,如果任务已经存在,则返回错误
347     pub fn add_task(
348         &mut self,
349         path: PathBuf,
350         task: DADKTask,
351     ) -> Result<Arc<SchedEntity>, SchedulerError> {
352         if !self.task_arch_matched(&task) {
353             return Err(SchedulerError::InvalidTargetArch(format!(
354                 "Task {} is not for target arch: {:?}",
355                 task.name_version(),
356                 self.context.target_arch()
357             )));
358         }
359 
360         let id: i32 = self.generate_task_id();
361         let indegree: usize = 0;
362         let children = Vec::new();
363         let entity = Arc::new(SchedEntity {
364             inner: Mutex::new(InnerEntity {
365                 id,
366                 task,
367                 file_path: path.clone(),
368                 indegree,
369                 children,
370             }),
371         });
372         let name_version = (entity.task().name.clone(), entity.task().version.clone());
373 
374         if self
375             .target
376             .get_by_name_version(&name_version.0, &name_version.1)
377             .is_some()
378         {
379             return Err(SchedulerError::TaskError(format!(
380                 "Task with name [{}] and version [{}] already exists. Config file: {}",
381                 name_version.0,
382                 name_version.1,
383                 path.display()
384             )));
385         }
386 
387         self.target.add(entity.clone());
388 
389         info!("Task added: {}", entity.task().name_version());
390         return Ok(entity);
391     }
392 
393     fn generate_task_id(&self) -> i32 {
394         static TASK_ID: AtomicI32 = AtomicI32::new(0);
395         return TASK_ID.fetch_add(1, Ordering::SeqCst);
396     }
397 
398     /// # 执行调度器中的所有任务
399     pub fn run(&self) -> Result<(), SchedulerError> {
400         // 准备全局环境变量
401         crate::executor::prepare_env(&self.target, &self.context)
402             .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?;
403 
404         match self.action {
405             Action::Build | Action::Install => {
406                 self.run_with_topo_sort()?;
407             }
408             Action::Clean(_) => self.run_without_topo_sort()?,
409         }
410 
411         return Ok(());
412     }
413 
414     /// Action需要按照拓扑序执行
415     ///
416     /// Action::Build | Action::Install
417     fn run_with_topo_sort(&self) -> Result<(), SchedulerError> {
418         // 检查是否有不存在的依赖
419         let r = self.check_not_exists_dependency();
420         if r.is_err() {
421             error!("Error while checking tasks: {:?}", r);
422             return r;
423         }
424 
425         // 对调度实体进行拓扑排序
426         let r: Vec<Arc<SchedEntity>> = self.target.topo_sort();
427 
428         let action = self.action.clone();
429         let dragonos_dir = self.sysroot_dir.clone();
430         let id2entity = self.target.id2entity();
431         let count = r.len();
432 
433         // 启动守护线程
434         let handler = std::thread::spawn(move || {
435             Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r)
436         });
437 
438         handler.join().expect("Could not join deamon");
439 
440         return Ok(());
441     }
442 
443     /// Action不需要按照拓扑序执行
444     fn run_without_topo_sort(&self) -> Result<(), SchedulerError> {
445         // 启动守护线程
446         let action = self.action.clone();
447         let dragonos_dir = self.sysroot_dir.clone();
448         let mut r = self.target.entities();
449         let handler = std::thread::spawn(move || {
450             Self::clean_daemon(action, dragonos_dir, &mut r);
451         });
452 
453         handler.join().expect("Could not join deamon");
454         return Ok(());
455     }
456 
457     pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) {
458         let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone())
459             .map_err(|e| {
460                 error!(
461                     "Error while creating executor for task {} : {:?}",
462                     entity.task().name_version(),
463                     e
464                 );
465                 exit(-1);
466             })
467             .unwrap();
468 
469         executor
470             .execute()
471             .map_err(|e| {
472                 error!(
473                     "Error while executing task {} : {:?}",
474                     entity.task().name_version(),
475                     e
476                 );
477                 exit(-1);
478             })
479             .unwrap();
480     }
481 
482     /// 构建和安装DADK任务的守护线程
483     ///
484     /// ## 参数
485     ///
486     /// - `action` : 要执行的操作
487     /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
488     /// - `id2entity` : DADK任务id与实体映射表
489     /// - `count` : 当前剩余任务数
490     /// - `r` : 总任务实体表
491     ///
492     /// ## 返回值
493     ///
494     /// 无
495     pub fn build_install_daemon(
496         action: Action,
497         dragonos_dir: PathBuf,
498         id2entity: BTreeMap<i32, Arc<SchedEntity>>,
499         mut count: usize,
500         r: &Vec<Arc<SchedEntity>>,
501     ) {
502         let mut guard = TASK_DEQUE.lock().unwrap();
503         // 初始化0入度的任务实体
504         let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new();
505         for e in r.iter() {
506             if e.indegree() == 0 {
507                 zero_entity.push(e.clone());
508             }
509         }
510 
511         while count > 0 {
512             // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了
513             while !zero_entity.is_empty()
514                 && guard.build_install_task(
515                     action.clone(),
516                     dragonos_dir.clone(),
517                     zero_entity.last().unwrap().clone(),
518                 )
519             {
520                 zero_entity.pop();
521             }
522 
523             let queue = guard.queue_mut();
524             // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中
525             queue.retain(|x| {
526                 if x.is_finished() {
527                     count -= 1;
528                     let tid = x.thread().id();
529                     let eid = *TID_EID.lock().unwrap().get(&tid).unwrap();
530                     let entity = id2entity.get(&eid).unwrap();
531                     let zero = entity.sub_children_indegree();
532                     for e in zero.iter() {
533                         zero_entity.push(e.clone());
534                     }
535                     return false;
536                 }
537                 return true;
538             })
539         }
540     }
541 
542     /// 清理DADK任务的守护线程
543     ///
544     /// ## 参数
545     ///
546     /// - `action` : 要执行的操作
547     /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
548     /// - `r` : 总任务实体表
549     ///
550     /// ## 返回值
551     ///
552     /// 无
553     pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) {
554         let mut guard = TASK_DEQUE.lock().unwrap();
555         while !guard.queue().is_empty() && !r.is_empty() {
556             guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone());
557         }
558     }
559 
560     /// # 检查是否有不存在的依赖
561     ///
562     /// 如果某个任务的dependency中的任务不存在,则返回错误
563     fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> {
564         for entity in self.target.entities().iter() {
565             for dependency in entity.task().depends.iter() {
566                 let name_version = (dependency.name.clone(), dependency.version.clone());
567                 if !self
568                     .target
569                     .get_by_name_version(&name_version.0, &name_version.1)
570                     .is_some()
571                 {
572                     return Err(SchedulerError::DependencyNotFound(
573                         entity.clone(),
574                         format!("name:{}, version:{}", name_version.0, name_version.1,),
575                     ));
576                 }
577             }
578         }
579 
580         return Ok(());
581     }
582 }
583 
584 /// # 环形依赖错误路径
585 ///
586 /// 本结构体用于在回溯过程中记录环形依赖的路径。
587 ///
588 /// 例如,假设有如下依赖关系:
589 ///
590 /// ```text
591 /// A -> B -> C -> D -> A
592 /// ```
593 ///
594 /// 则在DFS回溯过程中,会依次记录如下路径:
595 ///
596 /// ```text
597 /// D -> A
598 /// C -> D
599 /// B -> C
600 /// A -> B
601 pub struct DependencyCycleError {
602     /// # 起始实体
603     /// 本错误的起始实体,即环形依赖的起点
604     head_entity: Arc<SchedEntity>,
605     /// 是否停止传播
606     stop_propagation: bool,
607     /// 依赖关系
608     dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>,
609 }
610 
611 impl DependencyCycleError {
612     pub fn new(head_entity: Arc<SchedEntity>) -> Self {
613         Self {
614             head_entity,
615             stop_propagation: false,
616             dependencies: Vec::new(),
617         }
618     }
619 
620     pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) {
621         self.dependencies.push((current, dependency));
622     }
623 
624     pub fn stop_propagation(&mut self) {
625         self.stop_propagation = true;
626     }
627 
628     #[allow(dead_code)]
629     pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> {
630         &self.dependencies
631     }
632 
633     pub fn display(&self) -> String {
634         let mut tmp = self.dependencies.clone();
635         tmp.reverse();
636 
637         let mut ret = format!("Dependency cycle detected: \nStart ->\n");
638         for (current, dep) in tmp.iter() {
639             ret.push_str(&format!(
640                 "->\t{} ({})\t--depends-->\t{} ({})\n",
641                 current.task().name_version(),
642                 current.file_path().display(),
643                 dep.task().name_version(),
644                 dep.file_path().display()
645             ));
646         }
647         ret.push_str("-> End");
648         return ret;
649     }
650 }
651