1 use std::{ 2 collections::BTreeMap, 3 env::Vars, 4 path::PathBuf, 5 process::{Command, Stdio}, 6 sync::{Arc, RwLock}, 7 }; 8 9 use log::{debug, error, info, warn}; 10 11 use crate::{ 12 console::{clean::CleanLevel, Action}, 13 context::DadkUserExecuteContext, 14 executor::cache::CacheDir, 15 parser::{ 16 task::{CodeSource, PrebuiltSource, TaskEnv, TaskType}, 17 task_log::{BuildStatus, InstallStatus, TaskLog}, 18 }, 19 scheduler::{SchedEntities, SchedEntity}, 20 utils::file::FileUtils, 21 }; 22 23 use self::cache::{CacheDirType, TaskDataDir}; 24 25 pub mod cache; 26 pub mod source; 27 pub mod target; 28 #[cfg(test)] 29 mod tests; 30 31 lazy_static! { 32 // 全局环境变量的列表 33 pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new()); 34 } 35 36 #[derive(Debug, Clone)] 37 pub struct Executor { 38 entity: Arc<SchedEntity>, 39 action: Action, 40 local_envs: EnvMap, 41 /// 任务构建结果输出到的目录 42 build_dir: CacheDir, 43 /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径) 44 source_dir: Option<CacheDir>, 45 /// 任务数据目录 46 task_data_dir: TaskDataDir, 47 /// DragonOS sysroot的路径 48 dragonos_sysroot: PathBuf, 49 } 50 51 impl Executor { 52 /// # 创建执行器 53 /// 54 /// 用于执行一个任务 55 /// 56 /// ## 参数 57 /// 58 /// * `entity` - 任务调度实体 59 /// 60 /// ## 返回值 61 /// 62 /// * `Ok(Executor)` - 创建成功 63 /// * `Err(ExecutorError)` - 创建失败 64 pub fn new( 65 entity: Arc<SchedEntity>, 66 action: Action, 67 dragonos_sysroot: PathBuf, 68 ) -> Result<Self, ExecutorError> { 69 let local_envs = EnvMap::new(); 70 let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?; 71 let task_data_dir = TaskDataDir::new(entity.clone())?; 72 73 let source_dir = if CacheDir::need_source_cache(&entity) { 74 Some(CacheDir::new(entity.clone(), CacheDirType::Source)?) 75 } else { 76 None 77 }; 78 79 let result: Executor = Self { 80 action, 81 entity, 82 local_envs, 83 build_dir, 84 source_dir, 85 task_data_dir, 86 dragonos_sysroot, 87 }; 88 89 return Ok(result); 90 } 91 92 /// # 执行任务 93 /// 94 /// 创建执行器后,调用此方法执行任务。 95 /// 该方法会执行以下步骤: 96 /// 97 /// 1. 创建工作线程 98 /// 2. 准备环境变量 99 /// 3. 拉取数据(可选) 100 /// 4. 执行构建 101 pub fn execute(&mut self) -> Result<(), ExecutorError> { 102 info!("Execute task: {}", self.entity.task().name_version()); 103 104 let r = self.do_execute(); 105 self.save_task_data(r.clone()); 106 info!("Task {} finished", self.entity.task().name_version()); 107 return r; 108 } 109 110 /// # 保存任务数据 111 fn save_task_data(&self, r: Result<(), ExecutorError>) { 112 let mut task_log = self.task_data_dir.task_log(); 113 match self.action { 114 Action::Build => { 115 if r.is_ok() { 116 task_log.set_build_status(BuildStatus::Success); 117 } else { 118 task_log.set_build_status(BuildStatus::Failed); 119 } 120 121 task_log.set_build_time_now(); 122 } 123 124 Action::Install => { 125 if r.is_ok() { 126 task_log.set_install_status(InstallStatus::Success); 127 } else { 128 task_log.set_install_status(InstallStatus::Failed); 129 } 130 } 131 132 Action::Clean(_) => { 133 task_log.clean_build_status(); 134 task_log.clean_install_status(); 135 } 136 137 _ => {} 138 } 139 140 self.task_data_dir 141 .save_task_log(&task_log) 142 .expect("Failed to save task log"); 143 } 144 145 fn do_execute(&mut self) -> Result<(), ExecutorError> { 146 // 准备本地环境变量 147 self.prepare_local_env()?; 148 149 match self.action { 150 Action::Build => { 151 // 构建任务 152 self.build()?; 153 } 154 Action::Install => { 155 // 把构建结果安装到DragonOS 156 self.install()?; 157 } 158 Action::Clean(_) => { 159 // 清理构建结果 160 let r = self.clean(); 161 if let Err(e) = r { 162 error!( 163 "Failed to clean task {}: {:?}", 164 self.entity.task().name_version(), 165 e 166 ); 167 } 168 } 169 _ => { 170 error!("Unsupported action: {:?}", self.action); 171 } 172 } 173 174 return Ok(()); 175 } 176 177 /// # 执行build操作 178 fn build(&mut self) -> Result<(), ExecutorError> { 179 if let Some(status) = self.task_log().build_status() { 180 if *status == BuildStatus::Success && self.entity.task().build_once { 181 info!( 182 "Task {} has been built successfully, skip build.", 183 self.entity.task().name_version() 184 ); 185 return Ok(()); 186 } 187 } 188 189 self.mv_target_to_tmp()?; 190 191 // 确认源文件就绪 192 self.prepare_input()?; 193 194 let command: Option<Command> = self.create_command()?; 195 if let Some(cmd) = command { 196 self.run_command(cmd)?; 197 } 198 199 // 检查构建结果,如果为空,则抛出警告 200 if self.build_dir.is_empty()? { 201 warn!( 202 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?", 203 self.entity.task().name_version(), 204 ); 205 } 206 return Ok(()); 207 } 208 209 /// # 执行安装操作,把构建结果安装到DragonOS 210 fn install(&self) -> Result<(), ExecutorError> { 211 if let Some(status) = self.task_log().install_status() { 212 if *status == InstallStatus::Success && self.entity.task().install_once { 213 info!( 214 "Task {} has been installed successfully, skip install.", 215 self.entity.task().name_version() 216 ); 217 return Ok(()); 218 } 219 } 220 221 let binding = self.entity.task(); 222 let in_dragonos_path = binding.install.in_dragonos_path.as_ref(); 223 // 如果没有指定安装路径,则不执行安装 224 if in_dragonos_path.is_none() { 225 return Ok(()); 226 } 227 info!("Installing task: {}", self.entity.task().name_version()); 228 let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string(); 229 230 debug!("in_dragonos_path: {}", in_dragonos_path); 231 // 去除开头的斜杠 232 { 233 let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count(); 234 in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string(); 235 } 236 // 拼接最终的安装路径 237 let install_path = self.dragonos_sysroot.join(in_dragonos_path); 238 debug!("install_path: {:?}", install_path); 239 // 创建安装路径 240 std::fs::create_dir_all(&install_path).map_err(|e| { 241 ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string())) 242 })?; 243 244 // 拷贝构建结果到安装路径 245 let build_dir: PathBuf = self.build_dir.path.clone(); 246 FileUtils::copy_dir_all(&build_dir, &install_path) 247 .map_err(|e| ExecutorError::InstallError(e))?; 248 info!("Task {} installed.", self.entity.task().name_version()); 249 250 // 安装完后,删除临时target文件 251 if let Some(target) = self.entity.target() { 252 target.clean_tmpdadk()?; 253 } 254 255 return Ok(()); 256 } 257 258 fn clean(&self) -> Result<(), ExecutorError> { 259 let level = if let Action::Clean(l) = self.action { 260 l.level 261 } else { 262 panic!( 263 "BUG: clean() called with non-clean action. executor details: {:?}", 264 self 265 ); 266 }; 267 info!( 268 "Cleaning task: {}, level={level}", 269 self.entity.task().name_version() 270 ); 271 272 let r: Result<(), ExecutorError> = match level { 273 CleanLevel::All => self.clean_all(), 274 CleanLevel::Src => self.clean_src(), 275 CleanLevel::Target => self.clean_target(), 276 CleanLevel::Cache => self.clean_cache(), 277 }; 278 279 if let Err(e) = r { 280 error!( 281 "Failed to clean task: {}, error message: {:?}", 282 self.entity.task().name_version(), 283 e 284 ); 285 return Err(e); 286 } 287 288 return Ok(()); 289 } 290 291 fn clean_all(&self) -> Result<(), ExecutorError> { 292 // 在源文件目录执行清理 293 self.clean_src()?; 294 // 清理构建结果 295 self.clean_target()?; 296 // 清理缓存 297 self.clean_cache()?; 298 return Ok(()); 299 } 300 301 /// 在源文件目录执行清理 302 fn clean_src(&self) -> Result<(), ExecutorError> { 303 let cmd: Option<Command> = self.create_command()?; 304 if cmd.is_none() { 305 // 如果这里没有命令,则认为用户不需要在源文件目录执行清理 306 return Ok(()); 307 } 308 info!( 309 "{}: Cleaning in source directory: {:?}", 310 self.entity.task().name_version(), 311 self.src_work_dir() 312 ); 313 314 let cmd = cmd.unwrap(); 315 self.run_command(cmd)?; 316 return Ok(()); 317 } 318 319 /// 清理构建输出目录 320 fn clean_target(&self) -> Result<(), ExecutorError> { 321 info!( 322 "{}: Cleaning build target directory: {:?}", 323 self.entity.task().name_version(), 324 self.build_dir.path 325 ); 326 327 return self.build_dir.remove_self_recursive(); 328 } 329 330 /// 清理下载缓存 331 fn clean_cache(&self) -> Result<(), ExecutorError> { 332 let cache_dir = self.source_dir.as_ref(); 333 if cache_dir.is_none() { 334 // 如果没有缓存目录,则认为用户不需要清理缓存 335 return Ok(()); 336 } 337 info!( 338 "{}: Cleaning cache directory: {}", 339 self.entity.task().name_version(), 340 self.src_work_dir().display() 341 ); 342 return cache_dir.unwrap().remove_self_recursive(); 343 } 344 345 /// 获取源文件的工作目录 346 fn src_work_dir(&self) -> PathBuf { 347 if let Some(local_path) = self.entity.task().source_path() { 348 return local_path; 349 } 350 return self.source_dir.as_ref().unwrap().path.clone(); 351 } 352 353 fn task_log(&self) -> TaskLog { 354 return self.task_data_dir.task_log(); 355 } 356 357 /// 为任务创建命令 358 fn create_command(&self) -> Result<Option<Command>, ExecutorError> { 359 // 获取命令 360 let raw_cmd = match self.entity.task().task_type { 361 TaskType::BuildFromSource(_) => match self.action { 362 Action::Build => self.entity.task().build.build_command.clone(), 363 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 364 _ => unimplemented!( 365 "create_command: Action {:?} not supported yet.", 366 self.action 367 ), 368 }, 369 370 TaskType::InstallFromPrebuilt(_) => match self.action { 371 Action::Build => self.entity.task().build.build_command.clone(), 372 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 373 _ => unimplemented!( 374 "create_command: Action {:?} not supported yet.", 375 self.action 376 ), 377 }, 378 }; 379 380 if raw_cmd.is_none() { 381 return Ok(None); 382 } 383 384 let raw_cmd = raw_cmd.unwrap(); 385 386 let mut command = Command::new("bash"); 387 command.current_dir(self.src_work_dir()); 388 389 // 设置参数 390 command.arg("-c"); 391 command.arg(raw_cmd); 392 393 // 设置环境变量 394 let env_list = ENV_LIST.read().unwrap(); 395 for (key, value) in env_list.envs.iter() { 396 // if key.starts_with("DADK") { 397 // debug!("DADK env found: {}={}", key, value.value); 398 // } 399 command.env(key, value.value.clone()); 400 } 401 drop(env_list); 402 for (key, value) in self.local_envs.envs.iter() { 403 debug!("Local env found: {}={}", key, value.value); 404 command.env(key, value.value.clone()); 405 } 406 407 return Ok(Some(command)); 408 } 409 410 /// # 准备工作线程本地环境变量 411 fn prepare_local_env(&mut self) -> Result<(), ExecutorError> { 412 // 设置本地环境变量 413 self.prepare_target_env()?; 414 415 let binding = self.entity.task(); 416 let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref(); 417 418 if let Some(task_envs) = task_envs { 419 for tv in task_envs.iter() { 420 self.local_envs 421 .add(EnvVar::new(tv.key().to_string(), tv.value().to_string())); 422 } 423 } 424 425 // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里 426 self.local_envs.add(EnvVar::new( 427 "DADK_CURRENT_BUILD_DIR".to_string(), 428 self.build_dir.path.to_str().unwrap().to_string(), 429 )); 430 431 return Ok(()); 432 } 433 434 fn prepare_input(&self) -> Result<(), ExecutorError> { 435 // 拉取源文件 436 let task = self.entity.task(); 437 match &task.task_type { 438 TaskType::BuildFromSource(cs) => { 439 if self.source_dir.is_none() { 440 return Ok(()); 441 } 442 let source_dir = self.source_dir.as_ref().unwrap(); 443 match cs { 444 CodeSource::Git(git) => { 445 git.prepare(source_dir) 446 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 447 } 448 // 本地源文件,不需要拉取 449 CodeSource::Local(_) => return Ok(()), 450 // 在线压缩包,需要下载 451 CodeSource::Archive(archive) => { 452 archive 453 .download_unzip(source_dir) 454 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 455 } 456 } 457 } 458 TaskType::InstallFromPrebuilt(pb) => { 459 match pb { 460 // 本地源文件,不需要拉取 461 PrebuiltSource::Local(local_source) => { 462 let local_path = local_source.path(); 463 let target_path = &self.build_dir.path; 464 FileUtils::copy_dir_all(&local_path, &target_path) 465 .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string(); 466 return Ok(()); 467 } 468 // 在线压缩包,需要下载 469 PrebuiltSource::Archive(archive) => { 470 archive 471 .download_unzip(&self.build_dir) 472 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 473 } 474 } 475 } 476 } 477 478 return Ok(()); 479 } 480 481 fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> { 482 let mut child = command 483 .stdin(Stdio::inherit()) 484 .spawn() 485 .map_err(|e| ExecutorError::IoError(e.to_string()))?; 486 487 // 等待子进程结束 488 let r = child 489 .wait() 490 .map_err(|e| ExecutorError::IoError(e.to_string())); 491 debug!("Command finished: {:?}", r); 492 if r.is_ok() { 493 let r = r.unwrap(); 494 if r.success() { 495 return Ok(()); 496 } else { 497 // 执行失败,获取最后100行stderr输出 498 let errmsg = format!( 499 "Task {} failed, exit code = {}", 500 self.entity.task().name_version(), 501 r.code().unwrap() 502 ); 503 error!("{errmsg}"); 504 let command_opt = command.output(); 505 if command_opt.is_err() { 506 return Err(ExecutorError::TaskFailed( 507 "Failed to get command output".to_string(), 508 )); 509 } 510 let command_opt = command_opt.unwrap(); 511 let command_output = String::from_utf8_lossy(&command_opt.stderr); 512 let mut last_100_outputs = command_output 513 .lines() 514 .rev() 515 .take(100) 516 .collect::<Vec<&str>>(); 517 last_100_outputs.reverse(); 518 error!("Last 100 lines msg of stderr:"); 519 for line in last_100_outputs { 520 error!("{}", line); 521 } 522 return Err(ExecutorError::TaskFailed(errmsg)); 523 } 524 } else { 525 let errmsg = format!( 526 "Task {} failed, msg = {:?}", 527 self.entity.task().name_version(), 528 r.err().unwrap() 529 ); 530 error!("{errmsg}"); 531 return Err(ExecutorError::TaskFailed(errmsg)); 532 } 533 } 534 535 pub fn mv_target_to_tmp(&mut self) -> Result<(), ExecutorError> { 536 if let Some(rust_target) = self.entity.task().rust_target.clone() { 537 // 将target文件拷贝至 /tmp 下对应的dadk文件的临时target文件中 538 self.entity 539 .target() 540 .as_ref() 541 .unwrap() 542 .cp_to_tmp(&rust_target)?; 543 } 544 return Ok(()); 545 } 546 547 pub fn prepare_target_env(&mut self) -> Result<(), ExecutorError> { 548 if self.entity.task().rust_target.is_some() { 549 // 如果有dadk任务有rust_target字段,需要设置DADK_RUST_TARGET_FILE环境变量,值为临时target文件路径 550 self.entity 551 .target() 552 .as_ref() 553 .unwrap() 554 .prepare_env(&mut self.local_envs); 555 } 556 return Ok(()); 557 } 558 } 559 560 #[derive(Debug, Clone)] 561 pub struct EnvMap { 562 pub envs: BTreeMap<String, EnvVar>, 563 } 564 565 impl EnvMap { 566 pub fn new() -> Self { 567 Self { 568 envs: BTreeMap::new(), 569 } 570 } 571 572 pub fn add(&mut self, env: EnvVar) { 573 self.envs.insert(env.key.clone(), env); 574 } 575 576 #[allow(dead_code)] 577 pub fn get(&self, key: &str) -> Option<&EnvVar> { 578 self.envs.get(key) 579 } 580 581 pub fn add_vars(&mut self, vars: Vars) { 582 for (key, value) in vars { 583 self.add(EnvVar::new(key, value)); 584 } 585 } 586 } 587 588 /// # 环境变量 589 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)] 590 pub struct EnvVar { 591 pub key: String, 592 pub value: String, 593 } 594 595 impl EnvVar { 596 pub fn new(key: String, value: String) -> Self { 597 Self { key, value } 598 } 599 } 600 601 /// # 任务执行器错误枚举 602 #[allow(dead_code)] 603 #[derive(Debug, Clone)] 604 pub enum ExecutorError { 605 /// 准备执行环境错误 606 PrepareEnvError(String), 607 IoError(String), 608 /// 构建执行错误 609 TaskFailed(String), 610 /// 安装错误 611 InstallError(String), 612 /// 清理错误 613 CleanError(String), 614 } 615 616 /// # 准备全局环境变量 617 pub fn prepare_env( 618 sched_entities: &SchedEntities, 619 execute_ctx: &Arc<DadkUserExecuteContext>, 620 ) -> Result<(), ExecutorError> { 621 info!("Preparing environment variables..."); 622 let env_list = create_global_env_list(sched_entities, execute_ctx)?; 623 // 写入全局环境变量列表 624 let mut global_env_list = ENV_LIST.write().unwrap(); 625 *global_env_list = env_list; 626 return Ok(()); 627 } 628 629 /// # 创建全局环境变量列表 630 fn create_global_env_list( 631 sched_entities: &SchedEntities, 632 execute_ctx: &Arc<DadkUserExecuteContext>, 633 ) -> Result<EnvMap, ExecutorError> { 634 let mut env_list = EnvMap::new(); 635 let envs: Vars = std::env::vars(); 636 env_list.add_vars(envs); 637 638 // 为每个任务创建特定的环境变量 639 for entity in sched_entities.entities().iter() { 640 // 导出任务的构建目录环境变量 641 let build_dir = CacheDir::build_dir(entity.clone())?; 642 643 let build_dir_key = CacheDir::build_dir_env_key(&entity)?; 644 env_list.add(EnvVar::new( 645 build_dir_key, 646 build_dir.to_str().unwrap().to_string(), 647 )); 648 649 // 如果需要源码缓存目录,则导出 650 if CacheDir::need_source_cache(entity) { 651 let source_dir = CacheDir::source_dir(entity.clone())?; 652 let source_dir_key = CacheDir::source_dir_env_key(&entity)?; 653 env_list.add(EnvVar::new( 654 source_dir_key, 655 source_dir.to_str().unwrap().to_string(), 656 )); 657 } 658 } 659 660 // 创建ARCH环境变量 661 let target_arch = execute_ctx.target_arch(); 662 env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into())); 663 664 return Ok(env_list); 665 } 666