173779f3dSLoGin use std::{ 273779f3dSLoGin collections::{BTreeMap, HashMap}, 373779f3dSLoGin fmt::Debug, 473779f3dSLoGin path::PathBuf, 573779f3dSLoGin process::exit, 673779f3dSLoGin sync::{ 773779f3dSLoGin atomic::{AtomicI32, Ordering}, 873779f3dSLoGin Arc, Mutex, RwLock, 973779f3dSLoGin }, 1073779f3dSLoGin thread::ThreadId, 1173779f3dSLoGin }; 1273779f3dSLoGin 1373779f3dSLoGin use log::{error, info}; 1473779f3dSLoGin 1573779f3dSLoGin use crate::{ 16*eaa67f3cSLoGin context::{Action, DadkUserExecuteContext}, 1773779f3dSLoGin executor::{target::Target, Executor}, 1873779f3dSLoGin parser::task::DADKTask, 1973779f3dSLoGin }; 2073779f3dSLoGin 2173779f3dSLoGin use self::task_deque::TASK_DEQUE; 2273779f3dSLoGin 2373779f3dSLoGin pub mod task_deque; 2473779f3dSLoGin #[cfg(test)] 2573779f3dSLoGin mod tests; 2673779f3dSLoGin 2773779f3dSLoGin lazy_static! { 2873779f3dSLoGin // 线程id与任务实体id映射表 2973779f3dSLoGin pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new()); 3073779f3dSLoGin } 3173779f3dSLoGin 3273779f3dSLoGin /// # 调度实体内部结构 3373779f3dSLoGin #[derive(Debug, Clone)] 3473779f3dSLoGin pub struct InnerEntity { 3573779f3dSLoGin /// 任务ID 3673779f3dSLoGin id: i32, 3773779f3dSLoGin file_path: PathBuf, 3873779f3dSLoGin /// 任务 3973779f3dSLoGin task: DADKTask, 4073779f3dSLoGin /// 入度 4173779f3dSLoGin indegree: usize, 4273779f3dSLoGin /// 子节点 4373779f3dSLoGin children: Vec<Arc<SchedEntity>>, 4473779f3dSLoGin /// target管理 4573779f3dSLoGin target: Option<Target>, 4673779f3dSLoGin } 4773779f3dSLoGin 4873779f3dSLoGin /// # 调度实体 4973779f3dSLoGin #[derive(Debug)] 5073779f3dSLoGin pub struct SchedEntity { 5173779f3dSLoGin inner: Mutex<InnerEntity>, 5273779f3dSLoGin } 5373779f3dSLoGin 5473779f3dSLoGin impl PartialEq for SchedEntity { 5573779f3dSLoGin fn eq(&self, other: &Self) -> bool { 5673779f3dSLoGin self.inner.lock().unwrap().id == other.inner.lock().unwrap().id 5773779f3dSLoGin } 5873779f3dSLoGin } 5973779f3dSLoGin 6073779f3dSLoGin impl SchedEntity { 6173779f3dSLoGin #[allow(dead_code)] 6273779f3dSLoGin pub fn id(&self) -> i32 { 6373779f3dSLoGin self.inner.lock().unwrap().id 6473779f3dSLoGin } 6573779f3dSLoGin 6673779f3dSLoGin #[allow(dead_code)] 6773779f3dSLoGin pub fn file_path(&self) -> PathBuf { 6873779f3dSLoGin self.inner.lock().unwrap().file_path.clone() 6973779f3dSLoGin } 7073779f3dSLoGin 7173779f3dSLoGin #[allow(dead_code)] 7273779f3dSLoGin pub fn task(&self) -> DADKTask { 7373779f3dSLoGin self.inner.lock().unwrap().task.clone() 7473779f3dSLoGin } 7573779f3dSLoGin 7673779f3dSLoGin /// 入度加1 7773779f3dSLoGin pub fn add_indegree(&self) { 7873779f3dSLoGin self.inner.lock().unwrap().indegree += 1; 7973779f3dSLoGin } 8073779f3dSLoGin 8173779f3dSLoGin /// 入度减1 8273779f3dSLoGin pub fn sub_indegree(&self) -> usize { 8373779f3dSLoGin self.inner.lock().unwrap().indegree -= 1; 8473779f3dSLoGin return self.inner.lock().unwrap().indegree; 8573779f3dSLoGin } 8673779f3dSLoGin 8773779f3dSLoGin /// 增加子节点 8873779f3dSLoGin pub fn add_child(&self, entity: Arc<SchedEntity>) { 8973779f3dSLoGin self.inner.lock().unwrap().children.push(entity); 9073779f3dSLoGin } 9173779f3dSLoGin 9273779f3dSLoGin /// 获取入度 9373779f3dSLoGin pub fn indegree(&self) -> usize { 9473779f3dSLoGin self.inner.lock().unwrap().indegree 9573779f3dSLoGin } 9673779f3dSLoGin 9773779f3dSLoGin /// 获取target 9873779f3dSLoGin pub fn target(&self) -> Option<Target> { 9973779f3dSLoGin self.inner.lock().unwrap().target.clone() 10073779f3dSLoGin } 10173779f3dSLoGin 10273779f3dSLoGin /// 当前任务完成后,所有子节点入度减1 10373779f3dSLoGin /// 10473779f3dSLoGin /// ## 参数 10573779f3dSLoGin /// 10673779f3dSLoGin /// 无 10773779f3dSLoGin /// 10873779f3dSLoGin /// ## 返回值 10973779f3dSLoGin /// 11073779f3dSLoGin /// 所有入度为0的子节点集合 11173779f3dSLoGin pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> { 11273779f3dSLoGin let mut zero_child = Vec::new(); 11373779f3dSLoGin let children = &self.inner.lock().unwrap().children; 11473779f3dSLoGin for child in children.iter() { 11573779f3dSLoGin if child.sub_indegree() == 0 { 11673779f3dSLoGin zero_child.push(child.clone()); 11773779f3dSLoGin } 11873779f3dSLoGin } 11973779f3dSLoGin return zero_child; 12073779f3dSLoGin } 12173779f3dSLoGin } 12273779f3dSLoGin 12373779f3dSLoGin /// # 调度实体列表 12473779f3dSLoGin /// 12573779f3dSLoGin /// 用于存储所有的调度实体 12673779f3dSLoGin #[derive(Debug)] 12773779f3dSLoGin pub struct SchedEntities { 12873779f3dSLoGin /// 任务ID到调度实体的映射 12973779f3dSLoGin id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>, 13073779f3dSLoGin } 13173779f3dSLoGin 13273779f3dSLoGin impl SchedEntities { 13373779f3dSLoGin pub fn new() -> Self { 13473779f3dSLoGin Self { 13573779f3dSLoGin id2entity: RwLock::new(BTreeMap::new()), 13673779f3dSLoGin } 13773779f3dSLoGin } 13873779f3dSLoGin 13973779f3dSLoGin pub fn add(&mut self, entity: Arc<SchedEntity>) { 14073779f3dSLoGin self.id2entity 14173779f3dSLoGin .write() 14273779f3dSLoGin .unwrap() 14373779f3dSLoGin .insert(entity.id(), entity.clone()); 14473779f3dSLoGin } 14573779f3dSLoGin 14673779f3dSLoGin #[allow(dead_code)] 14773779f3dSLoGin pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> { 14873779f3dSLoGin self.id2entity.read().unwrap().get(&id).cloned() 14973779f3dSLoGin } 15073779f3dSLoGin 15173779f3dSLoGin pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> { 15273779f3dSLoGin for e in self.id2entity.read().unwrap().iter() { 15373779f3dSLoGin if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) { 15473779f3dSLoGin return Some(e.1.clone()); 15573779f3dSLoGin } 15673779f3dSLoGin } 15773779f3dSLoGin return None; 15873779f3dSLoGin } 15973779f3dSLoGin 16073779f3dSLoGin pub fn entities(&self) -> Vec<Arc<SchedEntity>> { 16173779f3dSLoGin let mut v = Vec::new(); 16273779f3dSLoGin for e in self.id2entity.read().unwrap().iter() { 16373779f3dSLoGin v.push(e.1.clone()); 16473779f3dSLoGin } 16573779f3dSLoGin return v; 16673779f3dSLoGin } 16773779f3dSLoGin 16873779f3dSLoGin pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> { 16973779f3dSLoGin self.id2entity.read().unwrap().clone() 17073779f3dSLoGin } 17173779f3dSLoGin 17273779f3dSLoGin #[allow(dead_code)] 17373779f3dSLoGin pub fn len(&self) -> usize { 17473779f3dSLoGin self.id2entity.read().unwrap().len() 17573779f3dSLoGin } 17673779f3dSLoGin 17773779f3dSLoGin #[allow(dead_code)] 17873779f3dSLoGin pub fn is_empty(&self) -> bool { 17973779f3dSLoGin self.id2entity.read().unwrap().is_empty() 18073779f3dSLoGin } 18173779f3dSLoGin 18273779f3dSLoGin #[allow(dead_code)] 18373779f3dSLoGin pub fn clear(&mut self) { 18473779f3dSLoGin self.id2entity.write().unwrap().clear(); 18573779f3dSLoGin } 18673779f3dSLoGin 18773779f3dSLoGin pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> { 18873779f3dSLoGin let mut result = Vec::new(); 18973779f3dSLoGin let mut visited = BTreeMap::new(); 19073779f3dSLoGin let btree = self.id2entity.write().unwrap().clone(); 19173779f3dSLoGin for entity in btree.iter() { 19273779f3dSLoGin if !visited.contains_key(entity.0) { 19373779f3dSLoGin let r = self.dfs(entity.1, &mut visited, &mut result); 19473779f3dSLoGin if r.is_err() { 19573779f3dSLoGin let err = r.unwrap_err(); 19673779f3dSLoGin error!("{}", err.display()); 19773779f3dSLoGin println!("Please fix the errors above and try again."); 19873779f3dSLoGin std::process::exit(1); 19973779f3dSLoGin } 20073779f3dSLoGin } 20173779f3dSLoGin } 20273779f3dSLoGin return result; 20373779f3dSLoGin } 20473779f3dSLoGin 20573779f3dSLoGin fn dfs( 20673779f3dSLoGin &self, 20773779f3dSLoGin entity: &Arc<SchedEntity>, 20873779f3dSLoGin visited: &mut BTreeMap<i32, bool>, 20973779f3dSLoGin result: &mut Vec<Arc<SchedEntity>>, 21073779f3dSLoGin ) -> Result<(), DependencyCycleError> { 21173779f3dSLoGin visited.insert(entity.id(), false); 21273779f3dSLoGin for dep in entity.task().depends.iter() { 21373779f3dSLoGin if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) { 21473779f3dSLoGin let guard = self.id2entity.write().unwrap(); 21573779f3dSLoGin let e = guard.get(&entity.id()).unwrap(); 21673779f3dSLoGin let d = guard.get(&dep_entity.id()).unwrap(); 21773779f3dSLoGin e.add_indegree(); 21873779f3dSLoGin d.add_child(e.clone()); 21973779f3dSLoGin if let Some(&false) = visited.get(&dep_entity.id()) { 22073779f3dSLoGin // 输出完整环形依赖 22173779f3dSLoGin let mut err = DependencyCycleError::new(dep_entity.clone()); 22273779f3dSLoGin 22373779f3dSLoGin err.add(entity.clone(), dep_entity); 22473779f3dSLoGin return Err(err); 22573779f3dSLoGin } 22673779f3dSLoGin if !visited.contains_key(&dep_entity.id()) { 22773779f3dSLoGin drop(guard); 22873779f3dSLoGin let r = self.dfs(&dep_entity, visited, result); 22973779f3dSLoGin if r.is_err() { 23073779f3dSLoGin let mut err: DependencyCycleError = r.unwrap_err(); 23173779f3dSLoGin // 如果错误已经停止传播,则直接返回 23273779f3dSLoGin if err.stop_propagation { 23373779f3dSLoGin return Err(err); 23473779f3dSLoGin } 23573779f3dSLoGin // 如果当前实体是错误的起始实体,则停止传播 23673779f3dSLoGin if entity == &err.head_entity { 23773779f3dSLoGin err.stop_propagation(); 23873779f3dSLoGin } 23973779f3dSLoGin err.add(entity.clone(), dep_entity); 24073779f3dSLoGin return Err(err); 24173779f3dSLoGin } 24273779f3dSLoGin } 24373779f3dSLoGin } else { 24473779f3dSLoGin error!( 24573779f3dSLoGin "Dependency not found: {} -> {}", 24673779f3dSLoGin entity.task().name_version(), 24773779f3dSLoGin dep.name_version() 24873779f3dSLoGin ); 24973779f3dSLoGin std::process::exit(1); 25073779f3dSLoGin } 25173779f3dSLoGin } 25273779f3dSLoGin visited.insert(entity.id(), true); 25373779f3dSLoGin result.push(entity.clone()); 25473779f3dSLoGin return Ok(()); 25573779f3dSLoGin } 25673779f3dSLoGin } 25773779f3dSLoGin 25873779f3dSLoGin /// # 任务调度器 25973779f3dSLoGin #[derive(Debug)] 26073779f3dSLoGin pub struct Scheduler { 26173779f3dSLoGin /// DragonOS sysroot在主机上的路径 26270352fd6SLoGin sysroot_dir: PathBuf, 26373779f3dSLoGin /// 要执行的操作 26473779f3dSLoGin action: Action, 26573779f3dSLoGin /// 调度实体列表 26673779f3dSLoGin target: SchedEntities, 26773779f3dSLoGin /// dadk执行的上下文 26873779f3dSLoGin context: Arc<DadkUserExecuteContext>, 26973779f3dSLoGin } 27073779f3dSLoGin 27173779f3dSLoGin pub enum SchedulerError { 27273779f3dSLoGin TaskError(String), 27373779f3dSLoGin /// 不是当前正在编译的目标架构 27473779f3dSLoGin InvalidTargetArch(String), 27573779f3dSLoGin DependencyNotFound(Arc<SchedEntity>, String), 27673779f3dSLoGin RunError(String), 27773779f3dSLoGin } 27873779f3dSLoGin 27973779f3dSLoGin impl Debug for SchedulerError { 28073779f3dSLoGin fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 28173779f3dSLoGin match self { 28273779f3dSLoGin Self::TaskError(arg0) => { 28373779f3dSLoGin write!(f, "TaskError: {}", arg0) 28473779f3dSLoGin } 28573779f3dSLoGin SchedulerError::DependencyNotFound(current, msg) => { 28673779f3dSLoGin write!( 28773779f3dSLoGin f, 28873779f3dSLoGin "For task {}, dependency not found: {}. Please check file: {}", 28973779f3dSLoGin current.task().name_version(), 29073779f3dSLoGin msg, 29173779f3dSLoGin current.file_path().display() 29273779f3dSLoGin ) 29373779f3dSLoGin } 29473779f3dSLoGin SchedulerError::RunError(msg) => { 29573779f3dSLoGin write!(f, "RunError: {}", msg) 29673779f3dSLoGin } 29773779f3dSLoGin SchedulerError::InvalidTargetArch(msg) => { 29873779f3dSLoGin write!(f, "InvalidTargetArch: {}", msg) 29973779f3dSLoGin } 30073779f3dSLoGin } 30173779f3dSLoGin } 30273779f3dSLoGin } 30373779f3dSLoGin 30473779f3dSLoGin impl Scheduler { 30573779f3dSLoGin pub fn new( 30673779f3dSLoGin context: Arc<DadkUserExecuteContext>, 30773779f3dSLoGin dragonos_dir: PathBuf, 30873779f3dSLoGin action: Action, 30973779f3dSLoGin tasks: Vec<(PathBuf, DADKTask)>, 31073779f3dSLoGin ) -> Result<Self, SchedulerError> { 31173779f3dSLoGin let entities = SchedEntities::new(); 31273779f3dSLoGin 31373779f3dSLoGin let mut scheduler = Scheduler { 31470352fd6SLoGin sysroot_dir: dragonos_dir, 31573779f3dSLoGin action, 31673779f3dSLoGin target: entities, 31773779f3dSLoGin context, 31873779f3dSLoGin }; 31973779f3dSLoGin 32073779f3dSLoGin let r = scheduler.add_tasks(tasks); 32173779f3dSLoGin if r.is_err() { 32273779f3dSLoGin error!("Error while adding tasks: {:?}", r); 32373779f3dSLoGin return Err(r.err().unwrap()); 32473779f3dSLoGin } 32573779f3dSLoGin 32673779f3dSLoGin return Ok(scheduler); 32773779f3dSLoGin } 32873779f3dSLoGin 32973779f3dSLoGin /// # 添加多个任务 33073779f3dSLoGin /// 33173779f3dSLoGin /// 添加任务到调度器中,如果任务已经存在,则返回错误 33273779f3dSLoGin pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> { 33373779f3dSLoGin for task in tasks { 33473779f3dSLoGin let e = self.add_task(task.0, task.1); 33573779f3dSLoGin if e.is_err() { 33673779f3dSLoGin if let Err(SchedulerError::InvalidTargetArch(_)) = &e { 33773779f3dSLoGin continue; 33873779f3dSLoGin } 33973779f3dSLoGin e?; 34073779f3dSLoGin } 34173779f3dSLoGin } 34273779f3dSLoGin 34373779f3dSLoGin return Ok(()); 34473779f3dSLoGin } 34573779f3dSLoGin 34673779f3dSLoGin /// # 任务是否匹配当前目标架构 34773779f3dSLoGin pub fn task_arch_matched(&self, task: &DADKTask) -> bool { 34873779f3dSLoGin task.target_arch.contains(self.context.target_arch()) 34973779f3dSLoGin } 35073779f3dSLoGin 35173779f3dSLoGin /// # 添加一个任务 35273779f3dSLoGin /// 35373779f3dSLoGin /// 添加任务到调度器中,如果任务已经存在,则返回错误 35473779f3dSLoGin pub fn add_task( 35573779f3dSLoGin &mut self, 35673779f3dSLoGin path: PathBuf, 35773779f3dSLoGin task: DADKTask, 35873779f3dSLoGin ) -> Result<Arc<SchedEntity>, SchedulerError> { 35973779f3dSLoGin if !self.task_arch_matched(&task) { 36073779f3dSLoGin return Err(SchedulerError::InvalidTargetArch(format!( 36173779f3dSLoGin "Task {} is not for target arch: {:?}", 36273779f3dSLoGin task.name_version(), 36373779f3dSLoGin self.context.target_arch() 36473779f3dSLoGin ))); 36573779f3dSLoGin } 36673779f3dSLoGin 36773779f3dSLoGin let id: i32 = self.generate_task_id(); 36873779f3dSLoGin let indegree: usize = 0; 36973779f3dSLoGin let children = Vec::new(); 37073779f3dSLoGin let target = self.generate_task_target(&path, &task.rust_target)?; 37173779f3dSLoGin let entity = Arc::new(SchedEntity { 37273779f3dSLoGin inner: Mutex::new(InnerEntity { 37373779f3dSLoGin id, 37473779f3dSLoGin task, 37573779f3dSLoGin file_path: path.clone(), 37673779f3dSLoGin indegree, 37773779f3dSLoGin children, 37873779f3dSLoGin target, 37973779f3dSLoGin }), 38073779f3dSLoGin }); 38173779f3dSLoGin let name_version = (entity.task().name.clone(), entity.task().version.clone()); 38273779f3dSLoGin 38373779f3dSLoGin if self 38473779f3dSLoGin .target 38573779f3dSLoGin .get_by_name_version(&name_version.0, &name_version.1) 38673779f3dSLoGin .is_some() 38773779f3dSLoGin { 38873779f3dSLoGin return Err(SchedulerError::TaskError(format!( 38973779f3dSLoGin "Task with name [{}] and version [{}] already exists. Config file: {}", 39073779f3dSLoGin name_version.0, 39173779f3dSLoGin name_version.1, 39273779f3dSLoGin path.display() 39373779f3dSLoGin ))); 39473779f3dSLoGin } 39573779f3dSLoGin 39673779f3dSLoGin self.target.add(entity.clone()); 39773779f3dSLoGin 39873779f3dSLoGin info!("Task added: {}", entity.task().name_version()); 39973779f3dSLoGin return Ok(entity); 40073779f3dSLoGin } 40173779f3dSLoGin 40273779f3dSLoGin fn generate_task_id(&self) -> i32 { 40373779f3dSLoGin static TASK_ID: AtomicI32 = AtomicI32::new(0); 40473779f3dSLoGin return TASK_ID.fetch_add(1, Ordering::SeqCst); 40573779f3dSLoGin } 40673779f3dSLoGin 40773779f3dSLoGin fn generate_task_target( 40873779f3dSLoGin &self, 40973779f3dSLoGin path: &PathBuf, 41073779f3dSLoGin rust_target: &Option<String>, 41173779f3dSLoGin ) -> Result<Option<Target>, SchedulerError> { 41273779f3dSLoGin if let Some(rust_target) = rust_target { 41373779f3dSLoGin // 如果rust_target字段不为none,说明需要target管理 41473779f3dSLoGin // 获取dadk任务路径,用于生成临时dadk文件名 41573779f3dSLoGin let file_str = path.as_path().to_str().unwrap(); 41673779f3dSLoGin let tmp_dadk_path = Target::tmp_dadk(file_str); 41773779f3dSLoGin let tmp_dadk_str = tmp_dadk_path.as_path().to_str().unwrap(); 41873779f3dSLoGin 41973779f3dSLoGin if Target::is_user_target(rust_target) { 42073779f3dSLoGin // 如果target文件是用户自己的 42173779f3dSLoGin if let Ok(target_path) = Target::user_target_path(rust_target) { 42273779f3dSLoGin let target_path_str = target_path.as_path().to_str().unwrap(); 42373779f3dSLoGin let index = target_path_str.rfind('/').unwrap(); 42473779f3dSLoGin let target_name = target_path_str[index + 1..].to_string(); 42573779f3dSLoGin let tmp_target = PathBuf::from(format!("{}{}", tmp_dadk_str, target_name)); 42673779f3dSLoGin return Ok(Some(Target::new(tmp_target))); 42773779f3dSLoGin } else { 42873779f3dSLoGin return Err(SchedulerError::TaskError( 42973779f3dSLoGin "The path of target file is invalid.".to_string(), 43073779f3dSLoGin )); 43173779f3dSLoGin } 43273779f3dSLoGin } else { 43373779f3dSLoGin // 如果target文件是内置的 43473779f3dSLoGin let tmp_target = PathBuf::from(format!("{}{}.json", tmp_dadk_str, rust_target)); 43573779f3dSLoGin return Ok(Some(Target::new(tmp_target))); 43673779f3dSLoGin } 43773779f3dSLoGin } 43873779f3dSLoGin return Ok(None); 43973779f3dSLoGin } 44073779f3dSLoGin 44173779f3dSLoGin /// # 执行调度器中的所有任务 44273779f3dSLoGin pub fn run(&self) -> Result<(), SchedulerError> { 44373779f3dSLoGin // 准备全局环境变量 44473779f3dSLoGin crate::executor::prepare_env(&self.target, &self.context) 44573779f3dSLoGin .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?; 44673779f3dSLoGin 44773779f3dSLoGin match self.action { 44873779f3dSLoGin Action::Build | Action::Install => { 44973779f3dSLoGin self.run_with_topo_sort()?; 45073779f3dSLoGin } 45173779f3dSLoGin Action::Clean(_) => self.run_without_topo_sort()?, 45273779f3dSLoGin } 45373779f3dSLoGin 45473779f3dSLoGin return Ok(()); 45573779f3dSLoGin } 45673779f3dSLoGin 45773779f3dSLoGin /// Action需要按照拓扑序执行 45873779f3dSLoGin /// 45973779f3dSLoGin /// Action::Build | Action::Install 46073779f3dSLoGin fn run_with_topo_sort(&self) -> Result<(), SchedulerError> { 46173779f3dSLoGin // 检查是否有不存在的依赖 46273779f3dSLoGin let r = self.check_not_exists_dependency(); 46373779f3dSLoGin if r.is_err() { 46473779f3dSLoGin error!("Error while checking tasks: {:?}", r); 46573779f3dSLoGin return r; 46673779f3dSLoGin } 46773779f3dSLoGin 46873779f3dSLoGin // 对调度实体进行拓扑排序 46973779f3dSLoGin let r: Vec<Arc<SchedEntity>> = self.target.topo_sort(); 47073779f3dSLoGin 47173779f3dSLoGin let action = self.action.clone(); 47270352fd6SLoGin let dragonos_dir = self.sysroot_dir.clone(); 47373779f3dSLoGin let id2entity = self.target.id2entity(); 47473779f3dSLoGin let count = r.len(); 47573779f3dSLoGin 47673779f3dSLoGin // 启动守护线程 47773779f3dSLoGin let handler = std::thread::spawn(move || { 47873779f3dSLoGin Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r) 47973779f3dSLoGin }); 48073779f3dSLoGin 48173779f3dSLoGin handler.join().expect("Could not join deamon"); 48273779f3dSLoGin 48373779f3dSLoGin return Ok(()); 48473779f3dSLoGin } 48573779f3dSLoGin 48673779f3dSLoGin /// Action不需要按照拓扑序执行 48773779f3dSLoGin fn run_without_topo_sort(&self) -> Result<(), SchedulerError> { 48873779f3dSLoGin // 启动守护线程 48973779f3dSLoGin let action = self.action.clone(); 49070352fd6SLoGin let dragonos_dir = self.sysroot_dir.clone(); 49173779f3dSLoGin let mut r = self.target.entities(); 49273779f3dSLoGin let handler = std::thread::spawn(move || { 49373779f3dSLoGin Self::clean_daemon(action, dragonos_dir, &mut r); 49473779f3dSLoGin }); 49573779f3dSLoGin 49673779f3dSLoGin handler.join().expect("Could not join deamon"); 49773779f3dSLoGin return Ok(()); 49873779f3dSLoGin } 49973779f3dSLoGin 50073779f3dSLoGin pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) { 50173779f3dSLoGin let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone()) 50273779f3dSLoGin .map_err(|e| { 50373779f3dSLoGin error!( 50473779f3dSLoGin "Error while creating executor for task {} : {:?}", 50573779f3dSLoGin entity.task().name_version(), 50673779f3dSLoGin e 50773779f3dSLoGin ); 50873779f3dSLoGin exit(-1); 50973779f3dSLoGin }) 51073779f3dSLoGin .unwrap(); 51173779f3dSLoGin 51273779f3dSLoGin executor 51373779f3dSLoGin .execute() 51473779f3dSLoGin .map_err(|e| { 51573779f3dSLoGin error!( 51673779f3dSLoGin "Error while executing task {} : {:?}", 51773779f3dSLoGin entity.task().name_version(), 51873779f3dSLoGin e 51973779f3dSLoGin ); 52073779f3dSLoGin exit(-1); 52173779f3dSLoGin }) 52273779f3dSLoGin .unwrap(); 52373779f3dSLoGin } 52473779f3dSLoGin 52573779f3dSLoGin /// 构建和安装DADK任务的守护线程 52673779f3dSLoGin /// 52773779f3dSLoGin /// ## 参数 52873779f3dSLoGin /// 52973779f3dSLoGin /// - `action` : 要执行的操作 53073779f3dSLoGin /// - `dragonos_dir` : DragonOS sysroot在主机上的路径 53173779f3dSLoGin /// - `id2entity` : DADK任务id与实体映射表 53273779f3dSLoGin /// - `count` : 当前剩余任务数 53373779f3dSLoGin /// - `r` : 总任务实体表 53473779f3dSLoGin /// 53573779f3dSLoGin /// ## 返回值 53673779f3dSLoGin /// 53773779f3dSLoGin /// 无 53873779f3dSLoGin pub fn build_install_daemon( 53973779f3dSLoGin action: Action, 54073779f3dSLoGin dragonos_dir: PathBuf, 54173779f3dSLoGin id2entity: BTreeMap<i32, Arc<SchedEntity>>, 54273779f3dSLoGin mut count: usize, 54373779f3dSLoGin r: &Vec<Arc<SchedEntity>>, 54473779f3dSLoGin ) { 54573779f3dSLoGin let mut guard = TASK_DEQUE.lock().unwrap(); 54673779f3dSLoGin // 初始化0入度的任务实体 54773779f3dSLoGin let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new(); 54873779f3dSLoGin for e in r.iter() { 54973779f3dSLoGin if e.indegree() == 0 { 55073779f3dSLoGin zero_entity.push(e.clone()); 55173779f3dSLoGin } 55273779f3dSLoGin } 55373779f3dSLoGin 55473779f3dSLoGin while count > 0 { 55573779f3dSLoGin // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了 55673779f3dSLoGin while !zero_entity.is_empty() 55773779f3dSLoGin && guard.build_install_task( 55873779f3dSLoGin action.clone(), 55973779f3dSLoGin dragonos_dir.clone(), 56073779f3dSLoGin zero_entity.last().unwrap().clone(), 56173779f3dSLoGin ) 56273779f3dSLoGin { 56373779f3dSLoGin zero_entity.pop(); 56473779f3dSLoGin } 56573779f3dSLoGin 56673779f3dSLoGin let queue = guard.queue_mut(); 56773779f3dSLoGin // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中 56873779f3dSLoGin queue.retain(|x| { 56973779f3dSLoGin if x.is_finished() { 57073779f3dSLoGin count -= 1; 57173779f3dSLoGin let tid = x.thread().id(); 57273779f3dSLoGin let eid = *TID_EID.lock().unwrap().get(&tid).unwrap(); 57373779f3dSLoGin let entity = id2entity.get(&eid).unwrap(); 57473779f3dSLoGin let zero = entity.sub_children_indegree(); 57573779f3dSLoGin for e in zero.iter() { 57673779f3dSLoGin zero_entity.push(e.clone()); 57773779f3dSLoGin } 57873779f3dSLoGin return false; 57973779f3dSLoGin } 58073779f3dSLoGin return true; 58173779f3dSLoGin }) 58273779f3dSLoGin } 58373779f3dSLoGin } 58473779f3dSLoGin 58573779f3dSLoGin /// 清理DADK任务的守护线程 58673779f3dSLoGin /// 58773779f3dSLoGin /// ## 参数 58873779f3dSLoGin /// 58973779f3dSLoGin /// - `action` : 要执行的操作 59073779f3dSLoGin /// - `dragonos_dir` : DragonOS sysroot在主机上的路径 59173779f3dSLoGin /// - `r` : 总任务实体表 59273779f3dSLoGin /// 59373779f3dSLoGin /// ## 返回值 59473779f3dSLoGin /// 59573779f3dSLoGin /// 无 59673779f3dSLoGin pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) { 59773779f3dSLoGin let mut guard = TASK_DEQUE.lock().unwrap(); 59873779f3dSLoGin while !guard.queue().is_empty() && !r.is_empty() { 59973779f3dSLoGin guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone()); 60073779f3dSLoGin } 60173779f3dSLoGin } 60273779f3dSLoGin 60373779f3dSLoGin /// # 检查是否有不存在的依赖 60473779f3dSLoGin /// 60573779f3dSLoGin /// 如果某个任务的dependency中的任务不存在,则返回错误 60673779f3dSLoGin fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> { 60773779f3dSLoGin for entity in self.target.entities().iter() { 60873779f3dSLoGin for dependency in entity.task().depends.iter() { 60973779f3dSLoGin let name_version = (dependency.name.clone(), dependency.version.clone()); 61073779f3dSLoGin if !self 61173779f3dSLoGin .target 61273779f3dSLoGin .get_by_name_version(&name_version.0, &name_version.1) 61373779f3dSLoGin .is_some() 61473779f3dSLoGin { 61573779f3dSLoGin return Err(SchedulerError::DependencyNotFound( 61673779f3dSLoGin entity.clone(), 61773779f3dSLoGin format!("name:{}, version:{}", name_version.0, name_version.1,), 61873779f3dSLoGin )); 61973779f3dSLoGin } 62073779f3dSLoGin } 62173779f3dSLoGin } 62273779f3dSLoGin 62373779f3dSLoGin return Ok(()); 62473779f3dSLoGin } 62573779f3dSLoGin } 62673779f3dSLoGin 62773779f3dSLoGin /// # 环形依赖错误路径 62873779f3dSLoGin /// 62973779f3dSLoGin /// 本结构体用于在回溯过程中记录环形依赖的路径。 63073779f3dSLoGin /// 63173779f3dSLoGin /// 例如,假设有如下依赖关系: 63273779f3dSLoGin /// 63373779f3dSLoGin /// ```text 63473779f3dSLoGin /// A -> B -> C -> D -> A 63573779f3dSLoGin /// ``` 63673779f3dSLoGin /// 63773779f3dSLoGin /// 则在DFS回溯过程中,会依次记录如下路径: 63873779f3dSLoGin /// 63973779f3dSLoGin /// ```text 64073779f3dSLoGin /// D -> A 64173779f3dSLoGin /// C -> D 64273779f3dSLoGin /// B -> C 64373779f3dSLoGin /// A -> B 64473779f3dSLoGin pub struct DependencyCycleError { 64573779f3dSLoGin /// # 起始实体 64673779f3dSLoGin /// 本错误的起始实体,即环形依赖的起点 64773779f3dSLoGin head_entity: Arc<SchedEntity>, 64873779f3dSLoGin /// 是否停止传播 64973779f3dSLoGin stop_propagation: bool, 65073779f3dSLoGin /// 依赖关系 65173779f3dSLoGin dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>, 65273779f3dSLoGin } 65373779f3dSLoGin 65473779f3dSLoGin impl DependencyCycleError { 65573779f3dSLoGin pub fn new(head_entity: Arc<SchedEntity>) -> Self { 65673779f3dSLoGin Self { 65773779f3dSLoGin head_entity, 65873779f3dSLoGin stop_propagation: false, 65973779f3dSLoGin dependencies: Vec::new(), 66073779f3dSLoGin } 66173779f3dSLoGin } 66273779f3dSLoGin 66373779f3dSLoGin pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) { 66473779f3dSLoGin self.dependencies.push((current, dependency)); 66573779f3dSLoGin } 66673779f3dSLoGin 66773779f3dSLoGin pub fn stop_propagation(&mut self) { 66873779f3dSLoGin self.stop_propagation = true; 66973779f3dSLoGin } 67073779f3dSLoGin 67173779f3dSLoGin #[allow(dead_code)] 67273779f3dSLoGin pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> { 67373779f3dSLoGin &self.dependencies 67473779f3dSLoGin } 67573779f3dSLoGin 67673779f3dSLoGin pub fn display(&self) -> String { 67773779f3dSLoGin let mut tmp = self.dependencies.clone(); 67873779f3dSLoGin tmp.reverse(); 67973779f3dSLoGin 68073779f3dSLoGin let mut ret = format!("Dependency cycle detected: \nStart ->\n"); 68173779f3dSLoGin for (current, dep) in tmp.iter() { 68273779f3dSLoGin ret.push_str(&format!( 68373779f3dSLoGin "->\t{} ({})\t--depends-->\t{} ({})\n", 68473779f3dSLoGin current.task().name_version(), 68573779f3dSLoGin current.file_path().display(), 68673779f3dSLoGin dep.task().name_version(), 68773779f3dSLoGin dep.file_path().display() 68873779f3dSLoGin )); 68973779f3dSLoGin } 69073779f3dSLoGin ret.push_str("-> End"); 69173779f3dSLoGin return ret; 69273779f3dSLoGin } 69373779f3dSLoGin } 694