173779f3dSLoGin use std::{ 273779f3dSLoGin collections::{BTreeMap, HashMap}, 373779f3dSLoGin fmt::Debug, 473779f3dSLoGin path::PathBuf, 573779f3dSLoGin process::exit, 673779f3dSLoGin sync::{ 773779f3dSLoGin atomic::{AtomicI32, Ordering}, 873779f3dSLoGin Arc, Mutex, RwLock, 973779f3dSLoGin }, 1073779f3dSLoGin thread::ThreadId, 1173779f3dSLoGin }; 1273779f3dSLoGin 1373779f3dSLoGin use log::{error, info}; 1473779f3dSLoGin 1573779f3dSLoGin use crate::{ 1673779f3dSLoGin console::Action, 1773779f3dSLoGin context::DadkUserExecuteContext, 1873779f3dSLoGin executor::{target::Target, Executor}, 1973779f3dSLoGin parser::task::DADKTask, 2073779f3dSLoGin }; 2173779f3dSLoGin 2273779f3dSLoGin use self::task_deque::TASK_DEQUE; 2373779f3dSLoGin 2473779f3dSLoGin pub mod task_deque; 2573779f3dSLoGin #[cfg(test)] 2673779f3dSLoGin mod tests; 2773779f3dSLoGin 2873779f3dSLoGin lazy_static! { 2973779f3dSLoGin // 线程id与任务实体id映射表 3073779f3dSLoGin pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new()); 3173779f3dSLoGin } 3273779f3dSLoGin 3373779f3dSLoGin /// # 调度实体内部结构 3473779f3dSLoGin #[derive(Debug, Clone)] 3573779f3dSLoGin pub struct InnerEntity { 3673779f3dSLoGin /// 任务ID 3773779f3dSLoGin id: i32, 3873779f3dSLoGin file_path: PathBuf, 3973779f3dSLoGin /// 任务 4073779f3dSLoGin task: DADKTask, 4173779f3dSLoGin /// 入度 4273779f3dSLoGin indegree: usize, 4373779f3dSLoGin /// 子节点 4473779f3dSLoGin children: Vec<Arc<SchedEntity>>, 4573779f3dSLoGin /// target管理 4673779f3dSLoGin target: Option<Target>, 4773779f3dSLoGin } 4873779f3dSLoGin 4973779f3dSLoGin /// # 调度实体 5073779f3dSLoGin #[derive(Debug)] 5173779f3dSLoGin pub struct SchedEntity { 5273779f3dSLoGin inner: Mutex<InnerEntity>, 5373779f3dSLoGin } 5473779f3dSLoGin 5573779f3dSLoGin impl PartialEq for SchedEntity { 5673779f3dSLoGin fn eq(&self, other: &Self) -> bool { 5773779f3dSLoGin self.inner.lock().unwrap().id == other.inner.lock().unwrap().id 5873779f3dSLoGin } 5973779f3dSLoGin } 6073779f3dSLoGin 6173779f3dSLoGin impl SchedEntity { 6273779f3dSLoGin #[allow(dead_code)] 6373779f3dSLoGin pub fn id(&self) -> i32 { 6473779f3dSLoGin self.inner.lock().unwrap().id 6573779f3dSLoGin } 6673779f3dSLoGin 6773779f3dSLoGin #[allow(dead_code)] 6873779f3dSLoGin pub fn file_path(&self) -> PathBuf { 6973779f3dSLoGin self.inner.lock().unwrap().file_path.clone() 7073779f3dSLoGin } 7173779f3dSLoGin 7273779f3dSLoGin #[allow(dead_code)] 7373779f3dSLoGin pub fn task(&self) -> DADKTask { 7473779f3dSLoGin self.inner.lock().unwrap().task.clone() 7573779f3dSLoGin } 7673779f3dSLoGin 7773779f3dSLoGin /// 入度加1 7873779f3dSLoGin pub fn add_indegree(&self) { 7973779f3dSLoGin self.inner.lock().unwrap().indegree += 1; 8073779f3dSLoGin } 8173779f3dSLoGin 8273779f3dSLoGin /// 入度减1 8373779f3dSLoGin pub fn sub_indegree(&self) -> usize { 8473779f3dSLoGin self.inner.lock().unwrap().indegree -= 1; 8573779f3dSLoGin return self.inner.lock().unwrap().indegree; 8673779f3dSLoGin } 8773779f3dSLoGin 8873779f3dSLoGin /// 增加子节点 8973779f3dSLoGin pub fn add_child(&self, entity: Arc<SchedEntity>) { 9073779f3dSLoGin self.inner.lock().unwrap().children.push(entity); 9173779f3dSLoGin } 9273779f3dSLoGin 9373779f3dSLoGin /// 获取入度 9473779f3dSLoGin pub fn indegree(&self) -> usize { 9573779f3dSLoGin self.inner.lock().unwrap().indegree 9673779f3dSLoGin } 9773779f3dSLoGin 9873779f3dSLoGin /// 获取target 9973779f3dSLoGin pub fn target(&self) -> Option<Target> { 10073779f3dSLoGin self.inner.lock().unwrap().target.clone() 10173779f3dSLoGin } 10273779f3dSLoGin 10373779f3dSLoGin /// 当前任务完成后,所有子节点入度减1 10473779f3dSLoGin /// 10573779f3dSLoGin /// ## 参数 10673779f3dSLoGin /// 10773779f3dSLoGin /// 无 10873779f3dSLoGin /// 10973779f3dSLoGin /// ## 返回值 11073779f3dSLoGin /// 11173779f3dSLoGin /// 所有入度为0的子节点集合 11273779f3dSLoGin pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> { 11373779f3dSLoGin let mut zero_child = Vec::new(); 11473779f3dSLoGin let children = &self.inner.lock().unwrap().children; 11573779f3dSLoGin for child in children.iter() { 11673779f3dSLoGin if child.sub_indegree() == 0 { 11773779f3dSLoGin zero_child.push(child.clone()); 11873779f3dSLoGin } 11973779f3dSLoGin } 12073779f3dSLoGin return zero_child; 12173779f3dSLoGin } 12273779f3dSLoGin } 12373779f3dSLoGin 12473779f3dSLoGin /// # 调度实体列表 12573779f3dSLoGin /// 12673779f3dSLoGin /// 用于存储所有的调度实体 12773779f3dSLoGin #[derive(Debug)] 12873779f3dSLoGin pub struct SchedEntities { 12973779f3dSLoGin /// 任务ID到调度实体的映射 13073779f3dSLoGin id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>, 13173779f3dSLoGin } 13273779f3dSLoGin 13373779f3dSLoGin impl SchedEntities { 13473779f3dSLoGin pub fn new() -> Self { 13573779f3dSLoGin Self { 13673779f3dSLoGin id2entity: RwLock::new(BTreeMap::new()), 13773779f3dSLoGin } 13873779f3dSLoGin } 13973779f3dSLoGin 14073779f3dSLoGin pub fn add(&mut self, entity: Arc<SchedEntity>) { 14173779f3dSLoGin self.id2entity 14273779f3dSLoGin .write() 14373779f3dSLoGin .unwrap() 14473779f3dSLoGin .insert(entity.id(), entity.clone()); 14573779f3dSLoGin } 14673779f3dSLoGin 14773779f3dSLoGin #[allow(dead_code)] 14873779f3dSLoGin pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> { 14973779f3dSLoGin self.id2entity.read().unwrap().get(&id).cloned() 15073779f3dSLoGin } 15173779f3dSLoGin 15273779f3dSLoGin pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> { 15373779f3dSLoGin for e in self.id2entity.read().unwrap().iter() { 15473779f3dSLoGin if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) { 15573779f3dSLoGin return Some(e.1.clone()); 15673779f3dSLoGin } 15773779f3dSLoGin } 15873779f3dSLoGin return None; 15973779f3dSLoGin } 16073779f3dSLoGin 16173779f3dSLoGin pub fn entities(&self) -> Vec<Arc<SchedEntity>> { 16273779f3dSLoGin let mut v = Vec::new(); 16373779f3dSLoGin for e in self.id2entity.read().unwrap().iter() { 16473779f3dSLoGin v.push(e.1.clone()); 16573779f3dSLoGin } 16673779f3dSLoGin return v; 16773779f3dSLoGin } 16873779f3dSLoGin 16973779f3dSLoGin pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> { 17073779f3dSLoGin self.id2entity.read().unwrap().clone() 17173779f3dSLoGin } 17273779f3dSLoGin 17373779f3dSLoGin #[allow(dead_code)] 17473779f3dSLoGin pub fn len(&self) -> usize { 17573779f3dSLoGin self.id2entity.read().unwrap().len() 17673779f3dSLoGin } 17773779f3dSLoGin 17873779f3dSLoGin #[allow(dead_code)] 17973779f3dSLoGin pub fn is_empty(&self) -> bool { 18073779f3dSLoGin self.id2entity.read().unwrap().is_empty() 18173779f3dSLoGin } 18273779f3dSLoGin 18373779f3dSLoGin #[allow(dead_code)] 18473779f3dSLoGin pub fn clear(&mut self) { 18573779f3dSLoGin self.id2entity.write().unwrap().clear(); 18673779f3dSLoGin } 18773779f3dSLoGin 18873779f3dSLoGin pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> { 18973779f3dSLoGin let mut result = Vec::new(); 19073779f3dSLoGin let mut visited = BTreeMap::new(); 19173779f3dSLoGin let btree = self.id2entity.write().unwrap().clone(); 19273779f3dSLoGin for entity in btree.iter() { 19373779f3dSLoGin if !visited.contains_key(entity.0) { 19473779f3dSLoGin let r = self.dfs(entity.1, &mut visited, &mut result); 19573779f3dSLoGin if r.is_err() { 19673779f3dSLoGin let err = r.unwrap_err(); 19773779f3dSLoGin error!("{}", err.display()); 19873779f3dSLoGin println!("Please fix the errors above and try again."); 19973779f3dSLoGin std::process::exit(1); 20073779f3dSLoGin } 20173779f3dSLoGin } 20273779f3dSLoGin } 20373779f3dSLoGin return result; 20473779f3dSLoGin } 20573779f3dSLoGin 20673779f3dSLoGin fn dfs( 20773779f3dSLoGin &self, 20873779f3dSLoGin entity: &Arc<SchedEntity>, 20973779f3dSLoGin visited: &mut BTreeMap<i32, bool>, 21073779f3dSLoGin result: &mut Vec<Arc<SchedEntity>>, 21173779f3dSLoGin ) -> Result<(), DependencyCycleError> { 21273779f3dSLoGin visited.insert(entity.id(), false); 21373779f3dSLoGin for dep in entity.task().depends.iter() { 21473779f3dSLoGin if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) { 21573779f3dSLoGin let guard = self.id2entity.write().unwrap(); 21673779f3dSLoGin let e = guard.get(&entity.id()).unwrap(); 21773779f3dSLoGin let d = guard.get(&dep_entity.id()).unwrap(); 21873779f3dSLoGin e.add_indegree(); 21973779f3dSLoGin d.add_child(e.clone()); 22073779f3dSLoGin if let Some(&false) = visited.get(&dep_entity.id()) { 22173779f3dSLoGin // 输出完整环形依赖 22273779f3dSLoGin let mut err = DependencyCycleError::new(dep_entity.clone()); 22373779f3dSLoGin 22473779f3dSLoGin err.add(entity.clone(), dep_entity); 22573779f3dSLoGin return Err(err); 22673779f3dSLoGin } 22773779f3dSLoGin if !visited.contains_key(&dep_entity.id()) { 22873779f3dSLoGin drop(guard); 22973779f3dSLoGin let r = self.dfs(&dep_entity, visited, result); 23073779f3dSLoGin if r.is_err() { 23173779f3dSLoGin let mut err: DependencyCycleError = r.unwrap_err(); 23273779f3dSLoGin // 如果错误已经停止传播,则直接返回 23373779f3dSLoGin if err.stop_propagation { 23473779f3dSLoGin return Err(err); 23573779f3dSLoGin } 23673779f3dSLoGin // 如果当前实体是错误的起始实体,则停止传播 23773779f3dSLoGin if entity == &err.head_entity { 23873779f3dSLoGin err.stop_propagation(); 23973779f3dSLoGin } 24073779f3dSLoGin err.add(entity.clone(), dep_entity); 24173779f3dSLoGin return Err(err); 24273779f3dSLoGin } 24373779f3dSLoGin } 24473779f3dSLoGin } else { 24573779f3dSLoGin error!( 24673779f3dSLoGin "Dependency not found: {} -> {}", 24773779f3dSLoGin entity.task().name_version(), 24873779f3dSLoGin dep.name_version() 24973779f3dSLoGin ); 25073779f3dSLoGin std::process::exit(1); 25173779f3dSLoGin } 25273779f3dSLoGin } 25373779f3dSLoGin visited.insert(entity.id(), true); 25473779f3dSLoGin result.push(entity.clone()); 25573779f3dSLoGin return Ok(()); 25673779f3dSLoGin } 25773779f3dSLoGin } 25873779f3dSLoGin 25973779f3dSLoGin /// # 任务调度器 26073779f3dSLoGin #[derive(Debug)] 26173779f3dSLoGin pub struct Scheduler { 26273779f3dSLoGin /// DragonOS sysroot在主机上的路径 263*70352fd6SLoGin sysroot_dir: PathBuf, 26473779f3dSLoGin /// 要执行的操作 26573779f3dSLoGin action: Action, 26673779f3dSLoGin /// 调度实体列表 26773779f3dSLoGin target: SchedEntities, 26873779f3dSLoGin /// dadk执行的上下文 26973779f3dSLoGin context: Arc<DadkUserExecuteContext>, 27073779f3dSLoGin } 27173779f3dSLoGin 27273779f3dSLoGin pub enum SchedulerError { 27373779f3dSLoGin TaskError(String), 27473779f3dSLoGin /// 不是当前正在编译的目标架构 27573779f3dSLoGin InvalidTargetArch(String), 27673779f3dSLoGin DependencyNotFound(Arc<SchedEntity>, String), 27773779f3dSLoGin RunError(String), 27873779f3dSLoGin } 27973779f3dSLoGin 28073779f3dSLoGin impl Debug for SchedulerError { 28173779f3dSLoGin fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 28273779f3dSLoGin match self { 28373779f3dSLoGin Self::TaskError(arg0) => { 28473779f3dSLoGin write!(f, "TaskError: {}", arg0) 28573779f3dSLoGin } 28673779f3dSLoGin SchedulerError::DependencyNotFound(current, msg) => { 28773779f3dSLoGin write!( 28873779f3dSLoGin f, 28973779f3dSLoGin "For task {}, dependency not found: {}. Please check file: {}", 29073779f3dSLoGin current.task().name_version(), 29173779f3dSLoGin msg, 29273779f3dSLoGin current.file_path().display() 29373779f3dSLoGin ) 29473779f3dSLoGin } 29573779f3dSLoGin SchedulerError::RunError(msg) => { 29673779f3dSLoGin write!(f, "RunError: {}", msg) 29773779f3dSLoGin } 29873779f3dSLoGin SchedulerError::InvalidTargetArch(msg) => { 29973779f3dSLoGin write!(f, "InvalidTargetArch: {}", msg) 30073779f3dSLoGin } 30173779f3dSLoGin } 30273779f3dSLoGin } 30373779f3dSLoGin } 30473779f3dSLoGin 30573779f3dSLoGin impl Scheduler { 30673779f3dSLoGin pub fn new( 30773779f3dSLoGin context: Arc<DadkUserExecuteContext>, 30873779f3dSLoGin dragonos_dir: PathBuf, 30973779f3dSLoGin action: Action, 31073779f3dSLoGin tasks: Vec<(PathBuf, DADKTask)>, 31173779f3dSLoGin ) -> Result<Self, SchedulerError> { 31273779f3dSLoGin let entities = SchedEntities::new(); 31373779f3dSLoGin 31473779f3dSLoGin let mut scheduler = Scheduler { 315*70352fd6SLoGin sysroot_dir: dragonos_dir, 31673779f3dSLoGin action, 31773779f3dSLoGin target: entities, 31873779f3dSLoGin context, 31973779f3dSLoGin }; 32073779f3dSLoGin 32173779f3dSLoGin let r = scheduler.add_tasks(tasks); 32273779f3dSLoGin if r.is_err() { 32373779f3dSLoGin error!("Error while adding tasks: {:?}", r); 32473779f3dSLoGin return Err(r.err().unwrap()); 32573779f3dSLoGin } 32673779f3dSLoGin 32773779f3dSLoGin return Ok(scheduler); 32873779f3dSLoGin } 32973779f3dSLoGin 33073779f3dSLoGin /// # 添加多个任务 33173779f3dSLoGin /// 33273779f3dSLoGin /// 添加任务到调度器中,如果任务已经存在,则返回错误 33373779f3dSLoGin pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> { 33473779f3dSLoGin for task in tasks { 33573779f3dSLoGin let e = self.add_task(task.0, task.1); 33673779f3dSLoGin if e.is_err() { 33773779f3dSLoGin if let Err(SchedulerError::InvalidTargetArch(_)) = &e { 33873779f3dSLoGin continue; 33973779f3dSLoGin } 34073779f3dSLoGin e?; 34173779f3dSLoGin } 34273779f3dSLoGin } 34373779f3dSLoGin 34473779f3dSLoGin return Ok(()); 34573779f3dSLoGin } 34673779f3dSLoGin 34773779f3dSLoGin /// # 任务是否匹配当前目标架构 34873779f3dSLoGin pub fn task_arch_matched(&self, task: &DADKTask) -> bool { 34973779f3dSLoGin task.target_arch.contains(self.context.target_arch()) 35073779f3dSLoGin } 35173779f3dSLoGin 35273779f3dSLoGin /// # 添加一个任务 35373779f3dSLoGin /// 35473779f3dSLoGin /// 添加任务到调度器中,如果任务已经存在,则返回错误 35573779f3dSLoGin pub fn add_task( 35673779f3dSLoGin &mut self, 35773779f3dSLoGin path: PathBuf, 35873779f3dSLoGin task: DADKTask, 35973779f3dSLoGin ) -> Result<Arc<SchedEntity>, SchedulerError> { 36073779f3dSLoGin if !self.task_arch_matched(&task) { 36173779f3dSLoGin return Err(SchedulerError::InvalidTargetArch(format!( 36273779f3dSLoGin "Task {} is not for target arch: {:?}", 36373779f3dSLoGin task.name_version(), 36473779f3dSLoGin self.context.target_arch() 36573779f3dSLoGin ))); 36673779f3dSLoGin } 36773779f3dSLoGin 36873779f3dSLoGin let id: i32 = self.generate_task_id(); 36973779f3dSLoGin let indegree: usize = 0; 37073779f3dSLoGin let children = Vec::new(); 37173779f3dSLoGin let target = self.generate_task_target(&path, &task.rust_target)?; 37273779f3dSLoGin let entity = Arc::new(SchedEntity { 37373779f3dSLoGin inner: Mutex::new(InnerEntity { 37473779f3dSLoGin id, 37573779f3dSLoGin task, 37673779f3dSLoGin file_path: path.clone(), 37773779f3dSLoGin indegree, 37873779f3dSLoGin children, 37973779f3dSLoGin target, 38073779f3dSLoGin }), 38173779f3dSLoGin }); 38273779f3dSLoGin let name_version = (entity.task().name.clone(), entity.task().version.clone()); 38373779f3dSLoGin 38473779f3dSLoGin if self 38573779f3dSLoGin .target 38673779f3dSLoGin .get_by_name_version(&name_version.0, &name_version.1) 38773779f3dSLoGin .is_some() 38873779f3dSLoGin { 38973779f3dSLoGin return Err(SchedulerError::TaskError(format!( 39073779f3dSLoGin "Task with name [{}] and version [{}] already exists. Config file: {}", 39173779f3dSLoGin name_version.0, 39273779f3dSLoGin name_version.1, 39373779f3dSLoGin path.display() 39473779f3dSLoGin ))); 39573779f3dSLoGin } 39673779f3dSLoGin 39773779f3dSLoGin self.target.add(entity.clone()); 39873779f3dSLoGin 39973779f3dSLoGin info!("Task added: {}", entity.task().name_version()); 40073779f3dSLoGin return Ok(entity); 40173779f3dSLoGin } 40273779f3dSLoGin 40373779f3dSLoGin fn generate_task_id(&self) -> i32 { 40473779f3dSLoGin static TASK_ID: AtomicI32 = AtomicI32::new(0); 40573779f3dSLoGin return TASK_ID.fetch_add(1, Ordering::SeqCst); 40673779f3dSLoGin } 40773779f3dSLoGin 40873779f3dSLoGin fn generate_task_target( 40973779f3dSLoGin &self, 41073779f3dSLoGin path: &PathBuf, 41173779f3dSLoGin rust_target: &Option<String>, 41273779f3dSLoGin ) -> Result<Option<Target>, SchedulerError> { 41373779f3dSLoGin if let Some(rust_target) = rust_target { 41473779f3dSLoGin // 如果rust_target字段不为none,说明需要target管理 41573779f3dSLoGin // 获取dadk任务路径,用于生成临时dadk文件名 41673779f3dSLoGin let file_str = path.as_path().to_str().unwrap(); 41773779f3dSLoGin let tmp_dadk_path = Target::tmp_dadk(file_str); 41873779f3dSLoGin let tmp_dadk_str = tmp_dadk_path.as_path().to_str().unwrap(); 41973779f3dSLoGin 42073779f3dSLoGin if Target::is_user_target(rust_target) { 42173779f3dSLoGin // 如果target文件是用户自己的 42273779f3dSLoGin if let Ok(target_path) = Target::user_target_path(rust_target) { 42373779f3dSLoGin let target_path_str = target_path.as_path().to_str().unwrap(); 42473779f3dSLoGin let index = target_path_str.rfind('/').unwrap(); 42573779f3dSLoGin let target_name = target_path_str[index + 1..].to_string(); 42673779f3dSLoGin let tmp_target = PathBuf::from(format!("{}{}", tmp_dadk_str, target_name)); 42773779f3dSLoGin return Ok(Some(Target::new(tmp_target))); 42873779f3dSLoGin } else { 42973779f3dSLoGin return Err(SchedulerError::TaskError( 43073779f3dSLoGin "The path of target file is invalid.".to_string(), 43173779f3dSLoGin )); 43273779f3dSLoGin } 43373779f3dSLoGin } else { 43473779f3dSLoGin // 如果target文件是内置的 43573779f3dSLoGin let tmp_target = PathBuf::from(format!("{}{}.json", tmp_dadk_str, rust_target)); 43673779f3dSLoGin return Ok(Some(Target::new(tmp_target))); 43773779f3dSLoGin } 43873779f3dSLoGin } 43973779f3dSLoGin return Ok(None); 44073779f3dSLoGin } 44173779f3dSLoGin 44273779f3dSLoGin /// # 执行调度器中的所有任务 44373779f3dSLoGin pub fn run(&self) -> Result<(), SchedulerError> { 44473779f3dSLoGin // 准备全局环境变量 44573779f3dSLoGin crate::executor::prepare_env(&self.target, &self.context) 44673779f3dSLoGin .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?; 44773779f3dSLoGin 44873779f3dSLoGin match self.action { 44973779f3dSLoGin Action::Build | Action::Install => { 45073779f3dSLoGin self.run_with_topo_sort()?; 45173779f3dSLoGin } 45273779f3dSLoGin Action::Clean(_) => self.run_without_topo_sort()?, 45373779f3dSLoGin _ => unimplemented!(), 45473779f3dSLoGin } 45573779f3dSLoGin 45673779f3dSLoGin return Ok(()); 45773779f3dSLoGin } 45873779f3dSLoGin 45973779f3dSLoGin /// Action需要按照拓扑序执行 46073779f3dSLoGin /// 46173779f3dSLoGin /// Action::Build | Action::Install 46273779f3dSLoGin fn run_with_topo_sort(&self) -> Result<(), SchedulerError> { 46373779f3dSLoGin // 检查是否有不存在的依赖 46473779f3dSLoGin let r = self.check_not_exists_dependency(); 46573779f3dSLoGin if r.is_err() { 46673779f3dSLoGin error!("Error while checking tasks: {:?}", r); 46773779f3dSLoGin return r; 46873779f3dSLoGin } 46973779f3dSLoGin 47073779f3dSLoGin // 对调度实体进行拓扑排序 47173779f3dSLoGin let r: Vec<Arc<SchedEntity>> = self.target.topo_sort(); 47273779f3dSLoGin 47373779f3dSLoGin let action = self.action.clone(); 474*70352fd6SLoGin let dragonos_dir = self.sysroot_dir.clone(); 47573779f3dSLoGin let id2entity = self.target.id2entity(); 47673779f3dSLoGin let count = r.len(); 47773779f3dSLoGin 47873779f3dSLoGin // 启动守护线程 47973779f3dSLoGin let handler = std::thread::spawn(move || { 48073779f3dSLoGin Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r) 48173779f3dSLoGin }); 48273779f3dSLoGin 48373779f3dSLoGin handler.join().expect("Could not join deamon"); 48473779f3dSLoGin 48573779f3dSLoGin return Ok(()); 48673779f3dSLoGin } 48773779f3dSLoGin 48873779f3dSLoGin /// Action不需要按照拓扑序执行 48973779f3dSLoGin fn run_without_topo_sort(&self) -> Result<(), SchedulerError> { 49073779f3dSLoGin // 启动守护线程 49173779f3dSLoGin let action = self.action.clone(); 492*70352fd6SLoGin let dragonos_dir = self.sysroot_dir.clone(); 49373779f3dSLoGin let mut r = self.target.entities(); 49473779f3dSLoGin let handler = std::thread::spawn(move || { 49573779f3dSLoGin Self::clean_daemon(action, dragonos_dir, &mut r); 49673779f3dSLoGin }); 49773779f3dSLoGin 49873779f3dSLoGin handler.join().expect("Could not join deamon"); 49973779f3dSLoGin return Ok(()); 50073779f3dSLoGin } 50173779f3dSLoGin 50273779f3dSLoGin pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) { 50373779f3dSLoGin let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone()) 50473779f3dSLoGin .map_err(|e| { 50573779f3dSLoGin error!( 50673779f3dSLoGin "Error while creating executor for task {} : {:?}", 50773779f3dSLoGin entity.task().name_version(), 50873779f3dSLoGin e 50973779f3dSLoGin ); 51073779f3dSLoGin exit(-1); 51173779f3dSLoGin }) 51273779f3dSLoGin .unwrap(); 51373779f3dSLoGin 51473779f3dSLoGin executor 51573779f3dSLoGin .execute() 51673779f3dSLoGin .map_err(|e| { 51773779f3dSLoGin error!( 51873779f3dSLoGin "Error while executing task {} : {:?}", 51973779f3dSLoGin entity.task().name_version(), 52073779f3dSLoGin e 52173779f3dSLoGin ); 52273779f3dSLoGin exit(-1); 52373779f3dSLoGin }) 52473779f3dSLoGin .unwrap(); 52573779f3dSLoGin } 52673779f3dSLoGin 52773779f3dSLoGin /// 构建和安装DADK任务的守护线程 52873779f3dSLoGin /// 52973779f3dSLoGin /// ## 参数 53073779f3dSLoGin /// 53173779f3dSLoGin /// - `action` : 要执行的操作 53273779f3dSLoGin /// - `dragonos_dir` : DragonOS sysroot在主机上的路径 53373779f3dSLoGin /// - `id2entity` : DADK任务id与实体映射表 53473779f3dSLoGin /// - `count` : 当前剩余任务数 53573779f3dSLoGin /// - `r` : 总任务实体表 53673779f3dSLoGin /// 53773779f3dSLoGin /// ## 返回值 53873779f3dSLoGin /// 53973779f3dSLoGin /// 无 54073779f3dSLoGin pub fn build_install_daemon( 54173779f3dSLoGin action: Action, 54273779f3dSLoGin dragonos_dir: PathBuf, 54373779f3dSLoGin id2entity: BTreeMap<i32, Arc<SchedEntity>>, 54473779f3dSLoGin mut count: usize, 54573779f3dSLoGin r: &Vec<Arc<SchedEntity>>, 54673779f3dSLoGin ) { 54773779f3dSLoGin let mut guard = TASK_DEQUE.lock().unwrap(); 54873779f3dSLoGin // 初始化0入度的任务实体 54973779f3dSLoGin let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new(); 55073779f3dSLoGin for e in r.iter() { 55173779f3dSLoGin if e.indegree() == 0 { 55273779f3dSLoGin zero_entity.push(e.clone()); 55373779f3dSLoGin } 55473779f3dSLoGin } 55573779f3dSLoGin 55673779f3dSLoGin while count > 0 { 55773779f3dSLoGin // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了 55873779f3dSLoGin while !zero_entity.is_empty() 55973779f3dSLoGin && guard.build_install_task( 56073779f3dSLoGin action.clone(), 56173779f3dSLoGin dragonos_dir.clone(), 56273779f3dSLoGin zero_entity.last().unwrap().clone(), 56373779f3dSLoGin ) 56473779f3dSLoGin { 56573779f3dSLoGin zero_entity.pop(); 56673779f3dSLoGin } 56773779f3dSLoGin 56873779f3dSLoGin let queue = guard.queue_mut(); 56973779f3dSLoGin // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中 57073779f3dSLoGin queue.retain(|x| { 57173779f3dSLoGin if x.is_finished() { 57273779f3dSLoGin count -= 1; 57373779f3dSLoGin let tid = x.thread().id(); 57473779f3dSLoGin let eid = *TID_EID.lock().unwrap().get(&tid).unwrap(); 57573779f3dSLoGin let entity = id2entity.get(&eid).unwrap(); 57673779f3dSLoGin let zero = entity.sub_children_indegree(); 57773779f3dSLoGin for e in zero.iter() { 57873779f3dSLoGin zero_entity.push(e.clone()); 57973779f3dSLoGin } 58073779f3dSLoGin return false; 58173779f3dSLoGin } 58273779f3dSLoGin return true; 58373779f3dSLoGin }) 58473779f3dSLoGin } 58573779f3dSLoGin } 58673779f3dSLoGin 58773779f3dSLoGin /// 清理DADK任务的守护线程 58873779f3dSLoGin /// 58973779f3dSLoGin /// ## 参数 59073779f3dSLoGin /// 59173779f3dSLoGin /// - `action` : 要执行的操作 59273779f3dSLoGin /// - `dragonos_dir` : DragonOS sysroot在主机上的路径 59373779f3dSLoGin /// - `r` : 总任务实体表 59473779f3dSLoGin /// 59573779f3dSLoGin /// ## 返回值 59673779f3dSLoGin /// 59773779f3dSLoGin /// 无 59873779f3dSLoGin pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) { 59973779f3dSLoGin let mut guard = TASK_DEQUE.lock().unwrap(); 60073779f3dSLoGin while !guard.queue().is_empty() && !r.is_empty() { 60173779f3dSLoGin guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone()); 60273779f3dSLoGin } 60373779f3dSLoGin } 60473779f3dSLoGin 60573779f3dSLoGin /// # 检查是否有不存在的依赖 60673779f3dSLoGin /// 60773779f3dSLoGin /// 如果某个任务的dependency中的任务不存在,则返回错误 60873779f3dSLoGin fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> { 60973779f3dSLoGin for entity in self.target.entities().iter() { 61073779f3dSLoGin for dependency in entity.task().depends.iter() { 61173779f3dSLoGin let name_version = (dependency.name.clone(), dependency.version.clone()); 61273779f3dSLoGin if !self 61373779f3dSLoGin .target 61473779f3dSLoGin .get_by_name_version(&name_version.0, &name_version.1) 61573779f3dSLoGin .is_some() 61673779f3dSLoGin { 61773779f3dSLoGin return Err(SchedulerError::DependencyNotFound( 61873779f3dSLoGin entity.clone(), 61973779f3dSLoGin format!("name:{}, version:{}", name_version.0, name_version.1,), 62073779f3dSLoGin )); 62173779f3dSLoGin } 62273779f3dSLoGin } 62373779f3dSLoGin } 62473779f3dSLoGin 62573779f3dSLoGin return Ok(()); 62673779f3dSLoGin } 62773779f3dSLoGin } 62873779f3dSLoGin 62973779f3dSLoGin /// # 环形依赖错误路径 63073779f3dSLoGin /// 63173779f3dSLoGin /// 本结构体用于在回溯过程中记录环形依赖的路径。 63273779f3dSLoGin /// 63373779f3dSLoGin /// 例如,假设有如下依赖关系: 63473779f3dSLoGin /// 63573779f3dSLoGin /// ```text 63673779f3dSLoGin /// A -> B -> C -> D -> A 63773779f3dSLoGin /// ``` 63873779f3dSLoGin /// 63973779f3dSLoGin /// 则在DFS回溯过程中,会依次记录如下路径: 64073779f3dSLoGin /// 64173779f3dSLoGin /// ```text 64273779f3dSLoGin /// D -> A 64373779f3dSLoGin /// C -> D 64473779f3dSLoGin /// B -> C 64573779f3dSLoGin /// A -> B 64673779f3dSLoGin pub struct DependencyCycleError { 64773779f3dSLoGin /// # 起始实体 64873779f3dSLoGin /// 本错误的起始实体,即环形依赖的起点 64973779f3dSLoGin head_entity: Arc<SchedEntity>, 65073779f3dSLoGin /// 是否停止传播 65173779f3dSLoGin stop_propagation: bool, 65273779f3dSLoGin /// 依赖关系 65373779f3dSLoGin dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>, 65473779f3dSLoGin } 65573779f3dSLoGin 65673779f3dSLoGin impl DependencyCycleError { 65773779f3dSLoGin pub fn new(head_entity: Arc<SchedEntity>) -> Self { 65873779f3dSLoGin Self { 65973779f3dSLoGin head_entity, 66073779f3dSLoGin stop_propagation: false, 66173779f3dSLoGin dependencies: Vec::new(), 66273779f3dSLoGin } 66373779f3dSLoGin } 66473779f3dSLoGin 66573779f3dSLoGin pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) { 66673779f3dSLoGin self.dependencies.push((current, dependency)); 66773779f3dSLoGin } 66873779f3dSLoGin 66973779f3dSLoGin pub fn stop_propagation(&mut self) { 67073779f3dSLoGin self.stop_propagation = true; 67173779f3dSLoGin } 67273779f3dSLoGin 67373779f3dSLoGin #[allow(dead_code)] 67473779f3dSLoGin pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> { 67573779f3dSLoGin &self.dependencies 67673779f3dSLoGin } 67773779f3dSLoGin 67873779f3dSLoGin pub fn display(&self) -> String { 67973779f3dSLoGin let mut tmp = self.dependencies.clone(); 68073779f3dSLoGin tmp.reverse(); 68173779f3dSLoGin 68273779f3dSLoGin let mut ret = format!("Dependency cycle detected: \nStart ->\n"); 68373779f3dSLoGin for (current, dep) in tmp.iter() { 68473779f3dSLoGin ret.push_str(&format!( 68573779f3dSLoGin "->\t{} ({})\t--depends-->\t{} ({})\n", 68673779f3dSLoGin current.task().name_version(), 68773779f3dSLoGin current.file_path().display(), 68873779f3dSLoGin dep.task().name_version(), 68973779f3dSLoGin dep.file_path().display() 69073779f3dSLoGin )); 69173779f3dSLoGin } 69273779f3dSLoGin ret.push_str("-> End"); 69373779f3dSLoGin return ret; 69473779f3dSLoGin } 69573779f3dSLoGin } 696