1 use std::{ 2 collections::{BTreeMap, HashMap}, 3 fmt::Debug, 4 path::PathBuf, 5 process::exit, 6 sync::{ 7 atomic::{AtomicI32, Ordering}, 8 Arc, Mutex, RwLock, 9 }, 10 thread::ThreadId, 11 }; 12 13 use log::{error, info}; 14 15 use crate::{ 16 context::{Action, DadkUserExecuteContext}, 17 executor::{target::Target, Executor}, 18 parser::task::DADKTask, 19 }; 20 21 use self::task_deque::TASK_DEQUE; 22 23 pub mod task_deque; 24 #[cfg(test)] 25 mod tests; 26 27 lazy_static! { 28 // 线程id与任务实体id映射表 29 pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new()); 30 } 31 32 /// # 调度实体内部结构 33 #[derive(Debug, Clone)] 34 pub struct InnerEntity { 35 /// 任务ID 36 id: i32, 37 file_path: PathBuf, 38 /// 任务 39 task: DADKTask, 40 /// 入度 41 indegree: usize, 42 /// 子节点 43 children: Vec<Arc<SchedEntity>>, 44 /// target管理 45 target: Option<Target>, 46 } 47 48 /// # 调度实体 49 #[derive(Debug)] 50 pub struct SchedEntity { 51 inner: Mutex<InnerEntity>, 52 } 53 54 impl PartialEq for SchedEntity { 55 fn eq(&self, other: &Self) -> bool { 56 self.inner.lock().unwrap().id == other.inner.lock().unwrap().id 57 } 58 } 59 60 impl SchedEntity { 61 #[allow(dead_code)] 62 pub fn id(&self) -> i32 { 63 self.inner.lock().unwrap().id 64 } 65 66 #[allow(dead_code)] 67 pub fn file_path(&self) -> PathBuf { 68 self.inner.lock().unwrap().file_path.clone() 69 } 70 71 #[allow(dead_code)] 72 pub fn task(&self) -> DADKTask { 73 self.inner.lock().unwrap().task.clone() 74 } 75 76 /// 入度加1 77 pub fn add_indegree(&self) { 78 self.inner.lock().unwrap().indegree += 1; 79 } 80 81 /// 入度减1 82 pub fn sub_indegree(&self) -> usize { 83 self.inner.lock().unwrap().indegree -= 1; 84 return self.inner.lock().unwrap().indegree; 85 } 86 87 /// 增加子节点 88 pub fn add_child(&self, entity: Arc<SchedEntity>) { 89 self.inner.lock().unwrap().children.push(entity); 90 } 91 92 /// 获取入度 93 pub fn indegree(&self) -> usize { 94 self.inner.lock().unwrap().indegree 95 } 96 97 /// 获取target 98 pub fn target(&self) -> Option<Target> { 99 self.inner.lock().unwrap().target.clone() 100 } 101 102 /// 当前任务完成后,所有子节点入度减1 103 /// 104 /// ## 参数 105 /// 106 /// 无 107 /// 108 /// ## 返回值 109 /// 110 /// 所有入度为0的子节点集合 111 pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> { 112 let mut zero_child = Vec::new(); 113 let children = &self.inner.lock().unwrap().children; 114 for child in children.iter() { 115 if child.sub_indegree() == 0 { 116 zero_child.push(child.clone()); 117 } 118 } 119 return zero_child; 120 } 121 } 122 123 /// # 调度实体列表 124 /// 125 /// 用于存储所有的调度实体 126 #[derive(Debug)] 127 pub struct SchedEntities { 128 /// 任务ID到调度实体的映射 129 id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>, 130 } 131 132 impl SchedEntities { 133 pub fn new() -> Self { 134 Self { 135 id2entity: RwLock::new(BTreeMap::new()), 136 } 137 } 138 139 pub fn add(&mut self, entity: Arc<SchedEntity>) { 140 self.id2entity 141 .write() 142 .unwrap() 143 .insert(entity.id(), entity.clone()); 144 } 145 146 #[allow(dead_code)] 147 pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> { 148 self.id2entity.read().unwrap().get(&id).cloned() 149 } 150 151 pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> { 152 for e in self.id2entity.read().unwrap().iter() { 153 if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) { 154 return Some(e.1.clone()); 155 } 156 } 157 return None; 158 } 159 160 pub fn entities(&self) -> Vec<Arc<SchedEntity>> { 161 let mut v = Vec::new(); 162 for e in self.id2entity.read().unwrap().iter() { 163 v.push(e.1.clone()); 164 } 165 return v; 166 } 167 168 pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> { 169 self.id2entity.read().unwrap().clone() 170 } 171 172 #[allow(dead_code)] 173 pub fn len(&self) -> usize { 174 self.id2entity.read().unwrap().len() 175 } 176 177 #[allow(dead_code)] 178 pub fn is_empty(&self) -> bool { 179 self.id2entity.read().unwrap().is_empty() 180 } 181 182 #[allow(dead_code)] 183 pub fn clear(&mut self) { 184 self.id2entity.write().unwrap().clear(); 185 } 186 187 pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> { 188 let mut result = Vec::new(); 189 let mut visited = BTreeMap::new(); 190 let btree = self.id2entity.write().unwrap().clone(); 191 for entity in btree.iter() { 192 if !visited.contains_key(entity.0) { 193 let r = self.dfs(entity.1, &mut visited, &mut result); 194 if r.is_err() { 195 let err = r.unwrap_err(); 196 error!("{}", err.display()); 197 println!("Please fix the errors above and try again."); 198 std::process::exit(1); 199 } 200 } 201 } 202 return result; 203 } 204 205 fn dfs( 206 &self, 207 entity: &Arc<SchedEntity>, 208 visited: &mut BTreeMap<i32, bool>, 209 result: &mut Vec<Arc<SchedEntity>>, 210 ) -> Result<(), DependencyCycleError> { 211 visited.insert(entity.id(), false); 212 for dep in entity.task().depends.iter() { 213 if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) { 214 let guard = self.id2entity.write().unwrap(); 215 let e = guard.get(&entity.id()).unwrap(); 216 let d = guard.get(&dep_entity.id()).unwrap(); 217 e.add_indegree(); 218 d.add_child(e.clone()); 219 if let Some(&false) = visited.get(&dep_entity.id()) { 220 // 输出完整环形依赖 221 let mut err = DependencyCycleError::new(dep_entity.clone()); 222 223 err.add(entity.clone(), dep_entity); 224 return Err(err); 225 } 226 if !visited.contains_key(&dep_entity.id()) { 227 drop(guard); 228 let r = self.dfs(&dep_entity, visited, result); 229 if r.is_err() { 230 let mut err: DependencyCycleError = r.unwrap_err(); 231 // 如果错误已经停止传播,则直接返回 232 if err.stop_propagation { 233 return Err(err); 234 } 235 // 如果当前实体是错误的起始实体,则停止传播 236 if entity == &err.head_entity { 237 err.stop_propagation(); 238 } 239 err.add(entity.clone(), dep_entity); 240 return Err(err); 241 } 242 } 243 } else { 244 error!( 245 "Dependency not found: {} -> {}", 246 entity.task().name_version(), 247 dep.name_version() 248 ); 249 std::process::exit(1); 250 } 251 } 252 visited.insert(entity.id(), true); 253 result.push(entity.clone()); 254 return Ok(()); 255 } 256 } 257 258 /// # 任务调度器 259 #[derive(Debug)] 260 pub struct Scheduler { 261 /// DragonOS sysroot在主机上的路径 262 sysroot_dir: PathBuf, 263 /// 要执行的操作 264 action: Action, 265 /// 调度实体列表 266 target: SchedEntities, 267 /// dadk执行的上下文 268 context: Arc<DadkUserExecuteContext>, 269 } 270 271 pub enum SchedulerError { 272 TaskError(String), 273 /// 不是当前正在编译的目标架构 274 InvalidTargetArch(String), 275 DependencyNotFound(Arc<SchedEntity>, String), 276 RunError(String), 277 } 278 279 impl Debug for SchedulerError { 280 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 281 match self { 282 Self::TaskError(arg0) => { 283 write!(f, "TaskError: {}", arg0) 284 } 285 SchedulerError::DependencyNotFound(current, msg) => { 286 write!( 287 f, 288 "For task {}, dependency not found: {}. Please check file: {}", 289 current.task().name_version(), 290 msg, 291 current.file_path().display() 292 ) 293 } 294 SchedulerError::RunError(msg) => { 295 write!(f, "RunError: {}", msg) 296 } 297 SchedulerError::InvalidTargetArch(msg) => { 298 write!(f, "InvalidTargetArch: {}", msg) 299 } 300 } 301 } 302 } 303 304 impl Scheduler { 305 pub fn new( 306 context: Arc<DadkUserExecuteContext>, 307 dragonos_dir: PathBuf, 308 action: Action, 309 tasks: Vec<(PathBuf, DADKTask)>, 310 ) -> Result<Self, SchedulerError> { 311 let entities = SchedEntities::new(); 312 313 let mut scheduler = Scheduler { 314 sysroot_dir: dragonos_dir, 315 action, 316 target: entities, 317 context, 318 }; 319 320 let r = scheduler.add_tasks(tasks); 321 if r.is_err() { 322 error!("Error while adding tasks: {:?}", r); 323 return Err(r.err().unwrap()); 324 } 325 326 return Ok(scheduler); 327 } 328 329 /// # 添加多个任务 330 /// 331 /// 添加任务到调度器中,如果任务已经存在,则返回错误 332 pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> { 333 for task in tasks { 334 let e = self.add_task(task.0, task.1); 335 if e.is_err() { 336 if let Err(SchedulerError::InvalidTargetArch(_)) = &e { 337 continue; 338 } 339 e?; 340 } 341 } 342 343 return Ok(()); 344 } 345 346 /// # 任务是否匹配当前目标架构 347 pub fn task_arch_matched(&self, task: &DADKTask) -> bool { 348 task.target_arch.contains(self.context.target_arch()) 349 } 350 351 /// # 添加一个任务 352 /// 353 /// 添加任务到调度器中,如果任务已经存在,则返回错误 354 pub fn add_task( 355 &mut self, 356 path: PathBuf, 357 task: DADKTask, 358 ) -> Result<Arc<SchedEntity>, SchedulerError> { 359 if !self.task_arch_matched(&task) { 360 return Err(SchedulerError::InvalidTargetArch(format!( 361 "Task {} is not for target arch: {:?}", 362 task.name_version(), 363 self.context.target_arch() 364 ))); 365 } 366 367 let id: i32 = self.generate_task_id(); 368 let indegree: usize = 0; 369 let children = Vec::new(); 370 let target = self.generate_task_target(&path, &task.rust_target)?; 371 let entity = Arc::new(SchedEntity { 372 inner: Mutex::new(InnerEntity { 373 id, 374 task, 375 file_path: path.clone(), 376 indegree, 377 children, 378 target, 379 }), 380 }); 381 let name_version = (entity.task().name.clone(), entity.task().version.clone()); 382 383 if self 384 .target 385 .get_by_name_version(&name_version.0, &name_version.1) 386 .is_some() 387 { 388 return Err(SchedulerError::TaskError(format!( 389 "Task with name [{}] and version [{}] already exists. Config file: {}", 390 name_version.0, 391 name_version.1, 392 path.display() 393 ))); 394 } 395 396 self.target.add(entity.clone()); 397 398 info!("Task added: {}", entity.task().name_version()); 399 return Ok(entity); 400 } 401 402 fn generate_task_id(&self) -> i32 { 403 static TASK_ID: AtomicI32 = AtomicI32::new(0); 404 return TASK_ID.fetch_add(1, Ordering::SeqCst); 405 } 406 407 fn generate_task_target( 408 &self, 409 path: &PathBuf, 410 rust_target: &Option<String>, 411 ) -> Result<Option<Target>, SchedulerError> { 412 if let Some(rust_target) = rust_target { 413 // 如果rust_target字段不为none,说明需要target管理 414 // 获取dadk任务路径,用于生成临时dadk文件名 415 let file_str = path.as_path().to_str().unwrap(); 416 let tmp_dadk_path = Target::tmp_dadk(file_str); 417 let tmp_dadk_str = tmp_dadk_path.as_path().to_str().unwrap(); 418 419 if Target::is_user_target(rust_target) { 420 // 如果target文件是用户自己的 421 if let Ok(target_path) = Target::user_target_path(rust_target) { 422 let target_path_str = target_path.as_path().to_str().unwrap(); 423 let index = target_path_str.rfind('/').unwrap(); 424 let target_name = target_path_str[index + 1..].to_string(); 425 let tmp_target = PathBuf::from(format!("{}{}", tmp_dadk_str, target_name)); 426 return Ok(Some(Target::new(tmp_target))); 427 } else { 428 return Err(SchedulerError::TaskError( 429 "The path of target file is invalid.".to_string(), 430 )); 431 } 432 } else { 433 // 如果target文件是内置的 434 let tmp_target = PathBuf::from(format!("{}{}.json", tmp_dadk_str, rust_target)); 435 return Ok(Some(Target::new(tmp_target))); 436 } 437 } 438 return Ok(None); 439 } 440 441 /// # 执行调度器中的所有任务 442 pub fn run(&self) -> Result<(), SchedulerError> { 443 // 准备全局环境变量 444 crate::executor::prepare_env(&self.target, &self.context) 445 .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?; 446 447 match self.action { 448 Action::Build | Action::Install => { 449 self.run_with_topo_sort()?; 450 } 451 Action::Clean(_) => self.run_without_topo_sort()?, 452 } 453 454 return Ok(()); 455 } 456 457 /// Action需要按照拓扑序执行 458 /// 459 /// Action::Build | Action::Install 460 fn run_with_topo_sort(&self) -> Result<(), SchedulerError> { 461 // 检查是否有不存在的依赖 462 let r = self.check_not_exists_dependency(); 463 if r.is_err() { 464 error!("Error while checking tasks: {:?}", r); 465 return r; 466 } 467 468 // 对调度实体进行拓扑排序 469 let r: Vec<Arc<SchedEntity>> = self.target.topo_sort(); 470 471 let action = self.action.clone(); 472 let dragonos_dir = self.sysroot_dir.clone(); 473 let id2entity = self.target.id2entity(); 474 let count = r.len(); 475 476 // 启动守护线程 477 let handler = std::thread::spawn(move || { 478 Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r) 479 }); 480 481 handler.join().expect("Could not join deamon"); 482 483 return Ok(()); 484 } 485 486 /// Action不需要按照拓扑序执行 487 fn run_without_topo_sort(&self) -> Result<(), SchedulerError> { 488 // 启动守护线程 489 let action = self.action.clone(); 490 let dragonos_dir = self.sysroot_dir.clone(); 491 let mut r = self.target.entities(); 492 let handler = std::thread::spawn(move || { 493 Self::clean_daemon(action, dragonos_dir, &mut r); 494 }); 495 496 handler.join().expect("Could not join deamon"); 497 return Ok(()); 498 } 499 500 pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) { 501 let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone()) 502 .map_err(|e| { 503 error!( 504 "Error while creating executor for task {} : {:?}", 505 entity.task().name_version(), 506 e 507 ); 508 exit(-1); 509 }) 510 .unwrap(); 511 512 executor 513 .execute() 514 .map_err(|e| { 515 error!( 516 "Error while executing task {} : {:?}", 517 entity.task().name_version(), 518 e 519 ); 520 exit(-1); 521 }) 522 .unwrap(); 523 } 524 525 /// 构建和安装DADK任务的守护线程 526 /// 527 /// ## 参数 528 /// 529 /// - `action` : 要执行的操作 530 /// - `dragonos_dir` : DragonOS sysroot在主机上的路径 531 /// - `id2entity` : DADK任务id与实体映射表 532 /// - `count` : 当前剩余任务数 533 /// - `r` : 总任务实体表 534 /// 535 /// ## 返回值 536 /// 537 /// 无 538 pub fn build_install_daemon( 539 action: Action, 540 dragonos_dir: PathBuf, 541 id2entity: BTreeMap<i32, Arc<SchedEntity>>, 542 mut count: usize, 543 r: &Vec<Arc<SchedEntity>>, 544 ) { 545 let mut guard = TASK_DEQUE.lock().unwrap(); 546 // 初始化0入度的任务实体 547 let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new(); 548 for e in r.iter() { 549 if e.indegree() == 0 { 550 zero_entity.push(e.clone()); 551 } 552 } 553 554 while count > 0 { 555 // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了 556 while !zero_entity.is_empty() 557 && guard.build_install_task( 558 action.clone(), 559 dragonos_dir.clone(), 560 zero_entity.last().unwrap().clone(), 561 ) 562 { 563 zero_entity.pop(); 564 } 565 566 let queue = guard.queue_mut(); 567 // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中 568 queue.retain(|x| { 569 if x.is_finished() { 570 count -= 1; 571 let tid = x.thread().id(); 572 let eid = *TID_EID.lock().unwrap().get(&tid).unwrap(); 573 let entity = id2entity.get(&eid).unwrap(); 574 let zero = entity.sub_children_indegree(); 575 for e in zero.iter() { 576 zero_entity.push(e.clone()); 577 } 578 return false; 579 } 580 return true; 581 }) 582 } 583 } 584 585 /// 清理DADK任务的守护线程 586 /// 587 /// ## 参数 588 /// 589 /// - `action` : 要执行的操作 590 /// - `dragonos_dir` : DragonOS sysroot在主机上的路径 591 /// - `r` : 总任务实体表 592 /// 593 /// ## 返回值 594 /// 595 /// 无 596 pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) { 597 let mut guard = TASK_DEQUE.lock().unwrap(); 598 while !guard.queue().is_empty() && !r.is_empty() { 599 guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone()); 600 } 601 } 602 603 /// # 检查是否有不存在的依赖 604 /// 605 /// 如果某个任务的dependency中的任务不存在,则返回错误 606 fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> { 607 for entity in self.target.entities().iter() { 608 for dependency in entity.task().depends.iter() { 609 let name_version = (dependency.name.clone(), dependency.version.clone()); 610 if !self 611 .target 612 .get_by_name_version(&name_version.0, &name_version.1) 613 .is_some() 614 { 615 return Err(SchedulerError::DependencyNotFound( 616 entity.clone(), 617 format!("name:{}, version:{}", name_version.0, name_version.1,), 618 )); 619 } 620 } 621 } 622 623 return Ok(()); 624 } 625 } 626 627 /// # 环形依赖错误路径 628 /// 629 /// 本结构体用于在回溯过程中记录环形依赖的路径。 630 /// 631 /// 例如,假设有如下依赖关系: 632 /// 633 /// ```text 634 /// A -> B -> C -> D -> A 635 /// ``` 636 /// 637 /// 则在DFS回溯过程中,会依次记录如下路径: 638 /// 639 /// ```text 640 /// D -> A 641 /// C -> D 642 /// B -> C 643 /// A -> B 644 pub struct DependencyCycleError { 645 /// # 起始实体 646 /// 本错误的起始实体,即环形依赖的起点 647 head_entity: Arc<SchedEntity>, 648 /// 是否停止传播 649 stop_propagation: bool, 650 /// 依赖关系 651 dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>, 652 } 653 654 impl DependencyCycleError { 655 pub fn new(head_entity: Arc<SchedEntity>) -> Self { 656 Self { 657 head_entity, 658 stop_propagation: false, 659 dependencies: Vec::new(), 660 } 661 } 662 663 pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) { 664 self.dependencies.push((current, dependency)); 665 } 666 667 pub fn stop_propagation(&mut self) { 668 self.stop_propagation = true; 669 } 670 671 #[allow(dead_code)] 672 pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> { 673 &self.dependencies 674 } 675 676 pub fn display(&self) -> String { 677 let mut tmp = self.dependencies.clone(); 678 tmp.reverse(); 679 680 let mut ret = format!("Dependency cycle detected: \nStart ->\n"); 681 for (current, dep) in tmp.iter() { 682 ret.push_str(&format!( 683 "->\t{} ({})\t--depends-->\t{} ({})\n", 684 current.task().name_version(), 685 current.file_path().display(), 686 dep.task().name_version(), 687 dep.file_path().display() 688 )); 689 } 690 ret.push_str("-> End"); 691 return ret; 692 } 693 } 694