1 use std::{ 2 collections::{BTreeMap, HashMap}, 3 fmt::Debug, 4 path::PathBuf, 5 process::exit, 6 sync::{ 7 atomic::{AtomicI32, Ordering}, 8 Arc, Mutex, RwLock, 9 }, 10 thread::ThreadId, 11 }; 12 13 use log::{error, info}; 14 15 use crate::{ 16 context::{Action, DadkUserExecuteContext}, 17 executor::Executor, 18 parser::task::DADKTask, 19 }; 20 21 use self::task_deque::TASK_DEQUE; 22 23 pub mod task_deque; 24 #[cfg(test)] 25 mod tests; 26 27 lazy_static! { 28 // 线程id与任务实体id映射表 29 pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new()); 30 } 31 32 /// # 调度实体内部结构 33 #[derive(Debug, Clone)] 34 pub struct InnerEntity { 35 /// 任务ID 36 id: i32, 37 file_path: PathBuf, 38 /// 任务 39 task: DADKTask, 40 /// 入度 41 indegree: usize, 42 /// 子节点 43 children: Vec<Arc<SchedEntity>>, 44 } 45 46 /// # 调度实体 47 #[derive(Debug)] 48 pub struct SchedEntity { 49 inner: Mutex<InnerEntity>, 50 } 51 52 impl PartialEq for SchedEntity { 53 fn eq(&self, other: &Self) -> bool { 54 self.inner.lock().unwrap().id == other.inner.lock().unwrap().id 55 } 56 } 57 58 impl SchedEntity { 59 #[allow(dead_code)] 60 pub fn id(&self) -> i32 { 61 self.inner.lock().unwrap().id 62 } 63 64 #[allow(dead_code)] 65 pub fn file_path(&self) -> PathBuf { 66 self.inner.lock().unwrap().file_path.clone() 67 } 68 69 #[allow(dead_code)] 70 pub fn task(&self) -> DADKTask { 71 self.inner.lock().unwrap().task.clone() 72 } 73 74 /// 入度加1 75 pub fn add_indegree(&self) { 76 self.inner.lock().unwrap().indegree += 1; 77 } 78 79 /// 入度减1 80 pub fn sub_indegree(&self) -> usize { 81 self.inner.lock().unwrap().indegree -= 1; 82 return self.inner.lock().unwrap().indegree; 83 } 84 85 /// 增加子节点 86 pub fn add_child(&self, entity: Arc<SchedEntity>) { 87 self.inner.lock().unwrap().children.push(entity); 88 } 89 90 /// 获取入度 91 pub fn indegree(&self) -> usize { 92 self.inner.lock().unwrap().indegree 93 } 94 95 /// 当前任务完成后,所有子节点入度减1 96 /// 97 /// ## 参数 98 /// 99 /// 无 100 /// 101 /// ## 返回值 102 /// 103 /// 所有入度为0的子节点集合 104 pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> { 105 let mut zero_child = Vec::new(); 106 let children = &self.inner.lock().unwrap().children; 107 for child in children.iter() { 108 if child.sub_indegree() == 0 { 109 zero_child.push(child.clone()); 110 } 111 } 112 return zero_child; 113 } 114 } 115 116 /// # 调度实体列表 117 /// 118 /// 用于存储所有的调度实体 119 #[derive(Debug)] 120 pub struct SchedEntities { 121 /// 任务ID到调度实体的映射 122 id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>, 123 } 124 125 impl SchedEntities { 126 pub fn new() -> Self { 127 Self { 128 id2entity: RwLock::new(BTreeMap::new()), 129 } 130 } 131 132 pub fn add(&mut self, entity: Arc<SchedEntity>) { 133 self.id2entity 134 .write() 135 .unwrap() 136 .insert(entity.id(), entity.clone()); 137 } 138 139 #[allow(dead_code)] 140 pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> { 141 self.id2entity.read().unwrap().get(&id).cloned() 142 } 143 144 pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> { 145 for e in self.id2entity.read().unwrap().iter() { 146 if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) { 147 return Some(e.1.clone()); 148 } 149 } 150 return None; 151 } 152 153 pub fn entities(&self) -> Vec<Arc<SchedEntity>> { 154 let mut v = Vec::new(); 155 for e in self.id2entity.read().unwrap().iter() { 156 v.push(e.1.clone()); 157 } 158 return v; 159 } 160 161 pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> { 162 self.id2entity.read().unwrap().clone() 163 } 164 165 #[allow(dead_code)] 166 pub fn len(&self) -> usize { 167 self.id2entity.read().unwrap().len() 168 } 169 170 #[allow(dead_code)] 171 pub fn is_empty(&self) -> bool { 172 self.id2entity.read().unwrap().is_empty() 173 } 174 175 #[allow(dead_code)] 176 pub fn clear(&mut self) { 177 self.id2entity.write().unwrap().clear(); 178 } 179 180 pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> { 181 let mut result = Vec::new(); 182 let mut visited = BTreeMap::new(); 183 let btree = self.id2entity.write().unwrap().clone(); 184 for entity in btree.iter() { 185 if !visited.contains_key(entity.0) { 186 let r = self.dfs(entity.1, &mut visited, &mut result); 187 if r.is_err() { 188 let err = r.unwrap_err(); 189 error!("{}", err.display()); 190 println!("Please fix the errors above and try again."); 191 std::process::exit(1); 192 } 193 } 194 } 195 return result; 196 } 197 198 fn dfs( 199 &self, 200 entity: &Arc<SchedEntity>, 201 visited: &mut BTreeMap<i32, bool>, 202 result: &mut Vec<Arc<SchedEntity>>, 203 ) -> Result<(), DependencyCycleError> { 204 visited.insert(entity.id(), false); 205 for dep in entity.task().depends.iter() { 206 if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) { 207 let guard = self.id2entity.write().unwrap(); 208 let e = guard.get(&entity.id()).unwrap(); 209 let d = guard.get(&dep_entity.id()).unwrap(); 210 e.add_indegree(); 211 d.add_child(e.clone()); 212 if let Some(&false) = visited.get(&dep_entity.id()) { 213 // 输出完整环形依赖 214 let mut err = DependencyCycleError::new(dep_entity.clone()); 215 216 err.add(entity.clone(), dep_entity); 217 return Err(err); 218 } 219 if !visited.contains_key(&dep_entity.id()) { 220 drop(guard); 221 let r = self.dfs(&dep_entity, visited, result); 222 if r.is_err() { 223 let mut err: DependencyCycleError = r.unwrap_err(); 224 // 如果错误已经停止传播,则直接返回 225 if err.stop_propagation { 226 return Err(err); 227 } 228 // 如果当前实体是错误的起始实体,则停止传播 229 if entity == &err.head_entity { 230 err.stop_propagation(); 231 } 232 err.add(entity.clone(), dep_entity); 233 return Err(err); 234 } 235 } 236 } else { 237 error!( 238 "Dependency not found: {} -> {}", 239 entity.task().name_version(), 240 dep.name_version() 241 ); 242 std::process::exit(1); 243 } 244 } 245 visited.insert(entity.id(), true); 246 result.push(entity.clone()); 247 return Ok(()); 248 } 249 } 250 251 /// # 任务调度器 252 #[derive(Debug)] 253 pub struct Scheduler { 254 /// DragonOS sysroot在主机上的路径 255 sysroot_dir: PathBuf, 256 /// 要执行的操作 257 action: Action, 258 /// 调度实体列表 259 target: SchedEntities, 260 /// dadk执行的上下文 261 context: Arc<DadkUserExecuteContext>, 262 } 263 264 pub enum SchedulerError { 265 TaskError(String), 266 /// 不是当前正在编译的目标架构 267 InvalidTargetArch(String), 268 DependencyNotFound(Arc<SchedEntity>, String), 269 RunError(String), 270 } 271 272 impl Debug for SchedulerError { 273 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 274 match self { 275 Self::TaskError(arg0) => { 276 write!(f, "TaskError: {}", arg0) 277 } 278 SchedulerError::DependencyNotFound(current, msg) => { 279 write!( 280 f, 281 "For task {}, dependency not found: {}. Please check file: {}", 282 current.task().name_version(), 283 msg, 284 current.file_path().display() 285 ) 286 } 287 SchedulerError::RunError(msg) => { 288 write!(f, "RunError: {}", msg) 289 } 290 SchedulerError::InvalidTargetArch(msg) => { 291 write!(f, "InvalidTargetArch: {}", msg) 292 } 293 } 294 } 295 } 296 297 impl Scheduler { 298 pub fn new( 299 context: Arc<DadkUserExecuteContext>, 300 dragonos_dir: PathBuf, 301 action: Action, 302 tasks: Vec<(PathBuf, DADKTask)>, 303 ) -> Result<Self, SchedulerError> { 304 let entities = SchedEntities::new(); 305 306 let mut scheduler = Scheduler { 307 sysroot_dir: dragonos_dir, 308 action, 309 target: entities, 310 context, 311 }; 312 313 let r = scheduler.add_tasks(tasks); 314 if r.is_err() { 315 error!("Error while adding tasks: {:?}", r); 316 return Err(r.err().unwrap()); 317 } 318 319 return Ok(scheduler); 320 } 321 322 /// # 添加多个任务 323 /// 324 /// 添加任务到调度器中,如果任务已经存在,则返回错误 325 pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> { 326 for task in tasks { 327 let e = self.add_task(task.0, task.1); 328 if e.is_err() { 329 if let Err(SchedulerError::InvalidTargetArch(_)) = &e { 330 continue; 331 } 332 e?; 333 } 334 } 335 336 return Ok(()); 337 } 338 339 /// # 任务是否匹配当前目标架构 340 pub fn task_arch_matched(&self, task: &DADKTask) -> bool { 341 task.target_arch.contains(self.context.target_arch()) 342 } 343 344 /// # 添加一个任务 345 /// 346 /// 添加任务到调度器中,如果任务已经存在,则返回错误 347 pub fn add_task( 348 &mut self, 349 path: PathBuf, 350 task: DADKTask, 351 ) -> Result<Arc<SchedEntity>, SchedulerError> { 352 if !self.task_arch_matched(&task) { 353 return Err(SchedulerError::InvalidTargetArch(format!( 354 "Task {} is not for target arch: {:?}", 355 task.name_version(), 356 self.context.target_arch() 357 ))); 358 } 359 360 let id: i32 = self.generate_task_id(); 361 let indegree: usize = 0; 362 let children = Vec::new(); 363 let entity = Arc::new(SchedEntity { 364 inner: Mutex::new(InnerEntity { 365 id, 366 task, 367 file_path: path.clone(), 368 indegree, 369 children, 370 }), 371 }); 372 let name_version = (entity.task().name.clone(), entity.task().version.clone()); 373 374 if self 375 .target 376 .get_by_name_version(&name_version.0, &name_version.1) 377 .is_some() 378 { 379 return Err(SchedulerError::TaskError(format!( 380 "Task with name [{}] and version [{}] already exists. Config file: {}", 381 name_version.0, 382 name_version.1, 383 path.display() 384 ))); 385 } 386 387 self.target.add(entity.clone()); 388 389 info!("Task added: {}", entity.task().name_version()); 390 return Ok(entity); 391 } 392 393 fn generate_task_id(&self) -> i32 { 394 static TASK_ID: AtomicI32 = AtomicI32::new(0); 395 return TASK_ID.fetch_add(1, Ordering::SeqCst); 396 } 397 398 /// # 执行调度器中的所有任务 399 pub fn run(&self) -> Result<(), SchedulerError> { 400 // 准备全局环境变量 401 crate::executor::prepare_env(&self.target, &self.context) 402 .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?; 403 404 match self.action { 405 Action::Build | Action::Install => { 406 self.run_with_topo_sort()?; 407 } 408 Action::Clean(_) => self.run_without_topo_sort()?, 409 } 410 411 return Ok(()); 412 } 413 414 /// Action需要按照拓扑序执行 415 /// 416 /// Action::Build | Action::Install 417 fn run_with_topo_sort(&self) -> Result<(), SchedulerError> { 418 // 检查是否有不存在的依赖 419 let r = self.check_not_exists_dependency(); 420 if r.is_err() { 421 error!("Error while checking tasks: {:?}", r); 422 return r; 423 } 424 425 // 对调度实体进行拓扑排序 426 let r: Vec<Arc<SchedEntity>> = self.target.topo_sort(); 427 428 let action = self.action.clone(); 429 let dragonos_dir = self.sysroot_dir.clone(); 430 let id2entity = self.target.id2entity(); 431 let count = r.len(); 432 433 // 启动守护线程 434 let handler = std::thread::spawn(move || { 435 Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r) 436 }); 437 438 handler.join().expect("Could not join deamon"); 439 440 return Ok(()); 441 } 442 443 /// Action不需要按照拓扑序执行 444 fn run_without_topo_sort(&self) -> Result<(), SchedulerError> { 445 // 启动守护线程 446 let action = self.action.clone(); 447 let dragonos_dir = self.sysroot_dir.clone(); 448 let mut r = self.target.entities(); 449 let handler = std::thread::spawn(move || { 450 Self::clean_daemon(action, dragonos_dir, &mut r); 451 }); 452 453 handler.join().expect("Could not join deamon"); 454 return Ok(()); 455 } 456 457 pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) { 458 let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone()) 459 .map_err(|e| { 460 error!( 461 "Error while creating executor for task {} : {:?}", 462 entity.task().name_version(), 463 e 464 ); 465 exit(-1); 466 }) 467 .unwrap(); 468 469 executor 470 .execute() 471 .map_err(|e| { 472 error!( 473 "Error while executing task {} : {:?}", 474 entity.task().name_version(), 475 e 476 ); 477 exit(-1); 478 }) 479 .unwrap(); 480 } 481 482 /// 构建和安装DADK任务的守护线程 483 /// 484 /// ## 参数 485 /// 486 /// - `action` : 要执行的操作 487 /// - `dragonos_dir` : DragonOS sysroot在主机上的路径 488 /// - `id2entity` : DADK任务id与实体映射表 489 /// - `count` : 当前剩余任务数 490 /// - `r` : 总任务实体表 491 /// 492 /// ## 返回值 493 /// 494 /// 无 495 pub fn build_install_daemon( 496 action: Action, 497 dragonos_dir: PathBuf, 498 id2entity: BTreeMap<i32, Arc<SchedEntity>>, 499 mut count: usize, 500 r: &Vec<Arc<SchedEntity>>, 501 ) { 502 let mut guard = TASK_DEQUE.lock().unwrap(); 503 // 初始化0入度的任务实体 504 let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new(); 505 for e in r.iter() { 506 if e.indegree() == 0 { 507 zero_entity.push(e.clone()); 508 } 509 } 510 511 while count > 0 { 512 // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了 513 while !zero_entity.is_empty() 514 && guard.build_install_task( 515 action.clone(), 516 dragonos_dir.clone(), 517 zero_entity.last().unwrap().clone(), 518 ) 519 { 520 zero_entity.pop(); 521 } 522 523 let queue = guard.queue_mut(); 524 // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中 525 queue.retain(|x| { 526 if x.is_finished() { 527 count -= 1; 528 let tid = x.thread().id(); 529 let eid = *TID_EID.lock().unwrap().get(&tid).unwrap(); 530 let entity = id2entity.get(&eid).unwrap(); 531 let zero = entity.sub_children_indegree(); 532 for e in zero.iter() { 533 zero_entity.push(e.clone()); 534 } 535 return false; 536 } 537 return true; 538 }) 539 } 540 } 541 542 /// 清理DADK任务的守护线程 543 /// 544 /// ## 参数 545 /// 546 /// - `action` : 要执行的操作 547 /// - `dragonos_dir` : DragonOS sysroot在主机上的路径 548 /// - `r` : 总任务实体表 549 /// 550 /// ## 返回值 551 /// 552 /// 无 553 pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) { 554 let mut guard = TASK_DEQUE.lock().unwrap(); 555 while !guard.queue().is_empty() && !r.is_empty() { 556 guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone()); 557 } 558 } 559 560 /// # 检查是否有不存在的依赖 561 /// 562 /// 如果某个任务的dependency中的任务不存在,则返回错误 563 fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> { 564 for entity in self.target.entities().iter() { 565 for dependency in entity.task().depends.iter() { 566 let name_version = (dependency.name.clone(), dependency.version.clone()); 567 if !self 568 .target 569 .get_by_name_version(&name_version.0, &name_version.1) 570 .is_some() 571 { 572 return Err(SchedulerError::DependencyNotFound( 573 entity.clone(), 574 format!("name:{}, version:{}", name_version.0, name_version.1,), 575 )); 576 } 577 } 578 } 579 580 return Ok(()); 581 } 582 } 583 584 /// # 环形依赖错误路径 585 /// 586 /// 本结构体用于在回溯过程中记录环形依赖的路径。 587 /// 588 /// 例如,假设有如下依赖关系: 589 /// 590 /// ```text 591 /// A -> B -> C -> D -> A 592 /// ``` 593 /// 594 /// 则在DFS回溯过程中,会依次记录如下路径: 595 /// 596 /// ```text 597 /// D -> A 598 /// C -> D 599 /// B -> C 600 /// A -> B 601 pub struct DependencyCycleError { 602 /// # 起始实体 603 /// 本错误的起始实体,即环形依赖的起点 604 head_entity: Arc<SchedEntity>, 605 /// 是否停止传播 606 stop_propagation: bool, 607 /// 依赖关系 608 dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>, 609 } 610 611 impl DependencyCycleError { 612 pub fn new(head_entity: Arc<SchedEntity>) -> Self { 613 Self { 614 head_entity, 615 stop_propagation: false, 616 dependencies: Vec::new(), 617 } 618 } 619 620 pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) { 621 self.dependencies.push((current, dependency)); 622 } 623 624 pub fn stop_propagation(&mut self) { 625 self.stop_propagation = true; 626 } 627 628 #[allow(dead_code)] 629 pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> { 630 &self.dependencies 631 } 632 633 pub fn display(&self) -> String { 634 let mut tmp = self.dependencies.clone(); 635 tmp.reverse(); 636 637 let mut ret = format!("Dependency cycle detected: \nStart ->\n"); 638 for (current, dep) in tmp.iter() { 639 ret.push_str(&format!( 640 "->\t{} ({})\t--depends-->\t{} ({})\n", 641 current.task().name_version(), 642 current.file_path().display(), 643 dep.task().name_version(), 644 dep.file_path().display() 645 )); 646 } 647 ret.push_str("-> End"); 648 return ret; 649 } 650 } 651