1 use std::{ 2 collections::{BTreeMap, VecDeque}, 3 env::Vars, 4 path::PathBuf, 5 process::{Command, Stdio}, 6 sync::{Arc, RwLock}, 7 time::SystemTime, 8 }; 9 10 use chrono::{DateTime, Utc}; 11 use dadk_config::user::UserCleanLevel; 12 use log::{debug, error, info, warn}; 13 14 use crate::{ 15 context::{Action, DadkUserExecuteContext}, 16 executor::cache::CacheDir, 17 parser::{ 18 task::{CodeSource, PrebuiltSource, TaskType}, 19 task_log::{BuildStatus, InstallStatus, TaskLog}, 20 }, 21 scheduler::{SchedEntities, SchedEntity}, 22 utils::{file::FileUtils, path::abs_path}, 23 }; 24 25 use dadk_config::common::task::TaskEnv; 26 27 use self::cache::{CacheDirType, TaskDataDir}; 28 29 pub mod cache; 30 pub mod source; 31 #[cfg(test)] 32 mod tests; 33 34 lazy_static! { 35 // 全局环境变量的列表 36 pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new()); 37 } 38 39 #[derive(Debug, Clone)] 40 pub struct Executor { 41 entity: Arc<SchedEntity>, 42 action: Action, 43 local_envs: EnvMap, 44 /// 任务构建结果输出到的目录 45 build_dir: CacheDir, 46 /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径) 47 source_dir: Option<CacheDir>, 48 /// 任务数据目录 49 task_data_dir: TaskDataDir, 50 /// DragonOS sysroot的路径 51 dragonos_sysroot: PathBuf, 52 } 53 54 impl Executor { 55 /// # 创建执行器 56 /// 57 /// 用于执行一个任务 58 /// 59 /// ## 参数 60 /// 61 /// * `entity` - 任务调度实体 62 /// 63 /// ## 返回值 64 /// 65 /// * `Ok(Executor)` - 创建成功 66 /// * `Err(ExecutorError)` - 创建失败 67 pub fn new( 68 entity: Arc<SchedEntity>, 69 action: Action, 70 dragonos_sysroot: PathBuf, 71 ) -> Result<Self, ExecutorError> { 72 let local_envs = EnvMap::new(); 73 let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?; 74 let task_data_dir = TaskDataDir::new(entity.clone())?; 75 76 let source_dir = if CacheDir::need_source_cache(&entity) { 77 Some(CacheDir::new(entity.clone(), CacheDirType::Source)?) 78 } else { 79 None 80 }; 81 82 let result: Executor = Self { 83 action, 84 entity, 85 local_envs, 86 build_dir, 87 source_dir, 88 task_data_dir, 89 dragonos_sysroot, 90 }; 91 92 return Ok(result); 93 } 94 95 /// # 执行任务 96 /// 97 /// 创建执行器后,调用此方法执行任务。 98 /// 该方法会执行以下步骤: 99 /// 100 /// 1. 创建工作线程 101 /// 2. 准备环境变量 102 /// 3. 拉取数据(可选) 103 /// 4. 执行构建 104 pub fn execute(&mut self) -> Result<(), ExecutorError> { 105 info!("Execute task: {}", self.entity.task().name_version()); 106 107 let r = self.do_execute(); 108 self.save_task_data(r.clone()); 109 info!("Task {} finished", self.entity.task().name_version()); 110 return r; 111 } 112 113 /// # 保存任务数据 114 fn save_task_data(&self, r: Result<(), ExecutorError>) { 115 let mut task_log = self.task_data_dir.task_log(); 116 match self.action { 117 Action::Build => { 118 if r.is_ok() { 119 task_log.set_build_status(BuildStatus::Success); 120 } else { 121 task_log.set_build_status(BuildStatus::Failed); 122 } 123 124 task_log.set_build_time_now(); 125 } 126 127 Action::Install => { 128 if r.is_ok() { 129 task_log.set_install_status(InstallStatus::Success); 130 } else { 131 task_log.set_install_status(InstallStatus::Failed); 132 } 133 task_log.set_install_time_now(); 134 } 135 136 Action::Clean(_) => { 137 task_log.clean_build_status(); 138 task_log.clean_install_status(); 139 } 140 } 141 142 self.task_data_dir 143 .save_task_log(&task_log) 144 .expect("Failed to save task log"); 145 } 146 147 fn do_execute(&mut self) -> Result<(), ExecutorError> { 148 // 准备本地环境变量 149 self.prepare_local_env()?; 150 151 match self.action { 152 Action::Build => { 153 // 构建任务 154 self.build()?; 155 } 156 Action::Install => { 157 // 把构建结果安装到DragonOS 158 self.install()?; 159 } 160 Action::Clean(_) => { 161 // 清理构建结果 162 let r = self.clean(); 163 if let Err(e) = r { 164 error!( 165 "Failed to clean task {}: {:?}", 166 self.entity.task().name_version(), 167 e 168 ); 169 } 170 } 171 } 172 173 return Ok(()); 174 } 175 176 fn build(&mut self) -> Result<(), ExecutorError> { 177 if let Some(status) = self.task_log().build_status() { 178 if let Some(build_time) = self.task_log().build_time() { 179 let mut last_modified = last_modified_time(&self.entity.file_path(), build_time)?; 180 last_modified = core::cmp::max( 181 last_modified, 182 last_modified_time(&self.src_work_dir(), build_time)?, 183 ); 184 185 if *status == BuildStatus::Success 186 && (self.entity.task().build_once || last_modified < *build_time) 187 { 188 info!( 189 "Task {} has been built successfully, skip build.", 190 self.entity.task().name_version() 191 ); 192 return Ok(()); 193 } 194 } 195 } 196 197 return self.do_build(); 198 } 199 200 /// # 执行build操作 201 fn do_build(&mut self) -> Result<(), ExecutorError> { 202 // 确认源文件就绪 203 self.prepare_input()?; 204 205 let command: Option<Command> = self.create_command()?; 206 if let Some(cmd) = command { 207 self.run_command(cmd)?; 208 } 209 210 // 检查构建结果,如果为空,则抛出警告 211 if self.build_dir.is_empty()? { 212 warn!( 213 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?", 214 self.entity.task().name_version(), 215 ); 216 } 217 return Ok(()); 218 } 219 220 fn install(&self) -> Result<(), ExecutorError> { 221 log::trace!("dadk-user: install {}", self.entity.task().name_version()); 222 if let Some(status) = self.task_log().install_status() { 223 if let Some(install_time) = self.task_log().install_time() { 224 let last_modified = last_modified_time(&self.build_dir.path, install_time)?; 225 let last_modified = core::cmp::max( 226 last_modified, 227 last_modified_time(&self.entity.file_path(), install_time)?, 228 ); 229 230 if *status == InstallStatus::Success 231 && (self.entity.task().install_once || last_modified < *install_time) 232 { 233 info!( 234 "install: Task {} not changed.", 235 self.entity.task().name_version() 236 ); 237 return Ok(()); 238 } 239 } 240 } 241 log::trace!("dadk-user: to do install {}", self.entity.task().name_version()); 242 return self.do_install(); 243 } 244 245 /// # 执行安装操作,把构建结果安装到DragonOS 246 fn do_install(&self) -> Result<(), ExecutorError> { 247 let binding = self.entity.task(); 248 let in_dragonos_path = binding.install.in_dragonos_path.as_ref(); 249 // 如果没有指定安装路径,则不执行安装 250 if in_dragonos_path.is_none() { 251 return Ok(()); 252 } 253 info!("Installing task: {}", self.entity.task().name_version()); 254 let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string(); 255 256 debug!("in_dragonos_path: {}", in_dragonos_path); 257 // 去除开头的斜杠 258 { 259 let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count(); 260 in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string(); 261 } 262 // 拼接最终的安装路径 263 let install_path = abs_path(&self.dragonos_sysroot.join(in_dragonos_path)); 264 debug!("install_path: {:?}", install_path); 265 // 创建安装路径 266 std::fs::create_dir_all(&install_path).map_err(|e| { 267 ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string())) 268 })?; 269 270 // 拷贝构建结果到安装路径 271 let build_dir: PathBuf = self.build_dir.path.clone(); 272 FileUtils::copy_dir_all(&build_dir, &install_path) 273 .map_err(|e| ExecutorError::InstallError(e))?; 274 info!("Task {} installed.", self.entity.task().name_version()); 275 276 return Ok(()); 277 } 278 279 fn clean(&self) -> Result<(), ExecutorError> { 280 let level = if let Action::Clean(l) = self.action { 281 l 282 } else { 283 panic!( 284 "BUG: clean() called with non-clean action. executor details: {:?}", 285 self 286 ); 287 }; 288 info!( 289 "Cleaning task: {}, level={level:?}", 290 self.entity.task().name_version() 291 ); 292 293 let r: Result<(), ExecutorError> = match level { 294 UserCleanLevel::All => self.clean_all(), 295 UserCleanLevel::InSrc => self.clean_src(), 296 UserCleanLevel::Output => { 297 self.clean_target()?; 298 self.clean_cache() 299 } 300 }; 301 302 if let Err(e) = r { 303 error!( 304 "Failed to clean task: {}, error message: {:?}", 305 self.entity.task().name_version(), 306 e 307 ); 308 return Err(e); 309 } 310 311 return Ok(()); 312 } 313 314 fn clean_all(&self) -> Result<(), ExecutorError> { 315 // 在源文件目录执行清理 316 self.clean_src()?; 317 // 清理构建结果 318 self.clean_target()?; 319 // 清理缓存 320 self.clean_cache()?; 321 return Ok(()); 322 } 323 324 /// 在源文件目录执行清理 325 fn clean_src(&self) -> Result<(), ExecutorError> { 326 let cmd: Option<Command> = self.create_command()?; 327 if cmd.is_none() { 328 // 如果这里没有命令,则认为用户不需要在源文件目录执行清理 329 return Ok(()); 330 } 331 info!( 332 "{}: Cleaning in source directory: {:?}", 333 self.entity.task().name_version(), 334 self.src_work_dir() 335 ); 336 337 let cmd = cmd.unwrap(); 338 self.run_command(cmd)?; 339 return Ok(()); 340 } 341 342 /// 清理构建输出目录 343 fn clean_target(&self) -> Result<(), ExecutorError> { 344 info!( 345 "{}: Cleaning build target directory: {:?}", 346 self.entity.task().name_version(), 347 self.build_dir.path 348 ); 349 350 return self.build_dir.remove_self_recursive(); 351 } 352 353 /// 清理下载缓存 354 fn clean_cache(&self) -> Result<(), ExecutorError> { 355 let cache_dir = self.source_dir.as_ref(); 356 if cache_dir.is_none() { 357 // 如果没有缓存目录,则认为用户不需要清理缓存 358 return Ok(()); 359 } 360 info!( 361 "{}: Cleaning cache directory: {}", 362 self.entity.task().name_version(), 363 self.src_work_dir().display() 364 ); 365 return cache_dir.unwrap().remove_self_recursive(); 366 } 367 368 /// 获取源文件的工作目录 369 fn src_work_dir(&self) -> PathBuf { 370 if let Some(local_path) = self.entity.task().source_path() { 371 return local_path; 372 } 373 return self.source_dir.as_ref().unwrap().path.clone(); 374 } 375 376 fn task_log(&self) -> TaskLog { 377 return self.task_data_dir.task_log(); 378 } 379 380 /// 为任务创建命令 381 fn create_command(&self) -> Result<Option<Command>, ExecutorError> { 382 // 获取命令 383 let raw_cmd = match self.entity.task().task_type { 384 TaskType::BuildFromSource(_) => match self.action { 385 Action::Build => self.entity.task().build.build_command.clone(), 386 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 387 _ => unimplemented!( 388 "create_command: Action {:?} not supported yet.", 389 self.action 390 ), 391 }, 392 393 TaskType::InstallFromPrebuilt(_) => match self.action { 394 Action::Build => self.entity.task().build.build_command.clone(), 395 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 396 _ => unimplemented!( 397 "create_command: Action {:?} not supported yet.", 398 self.action 399 ), 400 }, 401 }; 402 403 if raw_cmd.is_none() { 404 return Ok(None); 405 } 406 407 let raw_cmd = raw_cmd.unwrap(); 408 409 let mut command = Command::new("bash"); 410 command.current_dir(self.src_work_dir()); 411 412 // 设置参数 413 command.arg("-c"); 414 command.arg(raw_cmd); 415 416 // 设置环境变量 417 let env_list = ENV_LIST.read().unwrap(); 418 for (key, value) in env_list.envs.iter() { 419 // if key.starts_with("DADK") { 420 // debug!("DADK env found: {}={}", key, value.value); 421 // } 422 command.env(key, value.value.clone()); 423 } 424 drop(env_list); 425 for (key, value) in self.local_envs.envs.iter() { 426 debug!("Local env found: {}={}", key, value.value); 427 command.env(key, value.value.clone()); 428 } 429 430 return Ok(Some(command)); 431 } 432 433 /// # 准备工作线程本地环境变量 434 fn prepare_local_env(&mut self) -> Result<(), ExecutorError> { 435 let binding = self.entity.task(); 436 let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref(); 437 438 if let Some(task_envs) = task_envs { 439 for tv in task_envs.iter() { 440 self.local_envs 441 .add(EnvVar::new(tv.key().to_string(), tv.value().to_string())); 442 } 443 } 444 445 // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里 446 self.local_envs.add(EnvVar::new( 447 "DADK_CURRENT_BUILD_DIR".to_string(), 448 self.build_dir.path.to_str().unwrap().to_string(), 449 )); 450 451 return Ok(()); 452 } 453 454 fn prepare_input(&self) -> Result<(), ExecutorError> { 455 // 拉取源文件 456 let task = self.entity.task(); 457 match &task.task_type { 458 TaskType::BuildFromSource(cs) => { 459 if self.source_dir.is_none() { 460 return Ok(()); 461 } 462 let source_dir = self.source_dir.as_ref().unwrap(); 463 match cs { 464 CodeSource::Git(git) => { 465 git.prepare(source_dir) 466 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 467 } 468 // 本地源文件,不需要拉取 469 CodeSource::Local(_) => return Ok(()), 470 // 在线压缩包,需要下载 471 CodeSource::Archive(archive) => { 472 archive 473 .download_unzip(source_dir) 474 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 475 } 476 } 477 } 478 TaskType::InstallFromPrebuilt(pb) => { 479 match pb { 480 // 本地源文件,不需要拉取 481 PrebuiltSource::Local(local_source) => { 482 let local_path = local_source.path(); 483 let target_path = &self.build_dir.path; 484 FileUtils::copy_dir_all(&local_path, &target_path) 485 .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string(); 486 return Ok(()); 487 } 488 // 在线压缩包,需要下载 489 PrebuiltSource::Archive(archive) => { 490 archive 491 .download_unzip(&self.build_dir) 492 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 493 } 494 } 495 } 496 } 497 498 return Ok(()); 499 } 500 501 fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> { 502 let mut child = command 503 .stdin(Stdio::inherit()) 504 .spawn() 505 .map_err(|e| ExecutorError::IoError(e.to_string()))?; 506 507 // 等待子进程结束 508 let r = child 509 .wait() 510 .map_err(|e| ExecutorError::IoError(e.to_string())); 511 debug!("Command finished: {:?}", r); 512 if r.is_ok() { 513 let r = r.unwrap(); 514 if r.success() { 515 return Ok(()); 516 } else { 517 // 执行失败,获取最后100行stderr输出 518 let errmsg = format!( 519 "Task {} failed, exit code = {}", 520 self.entity.task().name_version(), 521 r.code().unwrap() 522 ); 523 error!("{errmsg}"); 524 let command_opt = command.output(); 525 if command_opt.is_err() { 526 return Err(ExecutorError::TaskFailed( 527 "Failed to get command output".to_string(), 528 )); 529 } 530 let command_opt = command_opt.unwrap(); 531 let command_output = String::from_utf8_lossy(&command_opt.stderr); 532 let mut last_100_outputs = command_output 533 .lines() 534 .rev() 535 .take(100) 536 .collect::<Vec<&str>>(); 537 last_100_outputs.reverse(); 538 error!("Last 100 lines msg of stderr:"); 539 for line in last_100_outputs { 540 error!("{}", line); 541 } 542 return Err(ExecutorError::TaskFailed(errmsg)); 543 } 544 } else { 545 let errmsg = format!( 546 "Task {} failed, msg = {:?}", 547 self.entity.task().name_version(), 548 r.err().unwrap() 549 ); 550 error!("{errmsg}"); 551 return Err(ExecutorError::TaskFailed(errmsg)); 552 } 553 } 554 } 555 556 #[derive(Debug, Clone)] 557 pub struct EnvMap { 558 pub envs: BTreeMap<String, EnvVar>, 559 } 560 561 impl EnvMap { 562 pub fn new() -> Self { 563 Self { 564 envs: BTreeMap::new(), 565 } 566 } 567 568 pub fn add(&mut self, env: EnvVar) { 569 self.envs.insert(env.key.clone(), env); 570 } 571 572 #[allow(dead_code)] 573 pub fn get(&self, key: &str) -> Option<&EnvVar> { 574 self.envs.get(key) 575 } 576 577 pub fn add_vars(&mut self, vars: Vars) { 578 for (key, value) in vars { 579 self.add(EnvVar::new(key, value)); 580 } 581 } 582 } 583 584 /// # 环境变量 585 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)] 586 pub struct EnvVar { 587 pub key: String, 588 pub value: String, 589 } 590 591 impl EnvVar { 592 pub fn new(key: String, value: String) -> Self { 593 Self { key, value } 594 } 595 } 596 597 /// # 任务执行器错误枚举 598 #[allow(dead_code)] 599 #[derive(Debug, Clone)] 600 pub enum ExecutorError { 601 /// 准备执行环境错误 602 PrepareEnvError(String), 603 IoError(String), 604 /// 构建执行错误 605 TaskFailed(String), 606 /// 安装错误 607 InstallError(String), 608 /// 清理错误 609 CleanError(String), 610 } 611 612 /// # 准备全局环境变量 613 pub fn prepare_env( 614 sched_entities: &SchedEntities, 615 execute_ctx: &Arc<DadkUserExecuteContext>, 616 ) -> Result<(), ExecutorError> { 617 info!("Preparing environment variables..."); 618 let env_list = create_global_env_list(sched_entities, execute_ctx)?; 619 // 写入全局环境变量列表 620 let mut global_env_list = ENV_LIST.write().unwrap(); 621 *global_env_list = env_list; 622 return Ok(()); 623 } 624 625 /// # 创建全局环境变量列表 626 fn create_global_env_list( 627 sched_entities: &SchedEntities, 628 execute_ctx: &Arc<DadkUserExecuteContext>, 629 ) -> Result<EnvMap, ExecutorError> { 630 let mut env_list = EnvMap::new(); 631 let envs: Vars = std::env::vars(); 632 env_list.add_vars(envs); 633 634 // 为每个任务创建特定的环境变量 635 for entity in sched_entities.entities().iter() { 636 // 导出任务的构建目录环境变量 637 let build_dir = CacheDir::build_dir(entity.clone())?; 638 639 let build_dir_key = CacheDir::build_dir_env_key(&entity)?; 640 env_list.add(EnvVar::new( 641 build_dir_key, 642 build_dir.to_str().unwrap().to_string(), 643 )); 644 645 // 如果需要源码缓存目录,则导出 646 if CacheDir::need_source_cache(entity) { 647 let source_dir = CacheDir::source_dir(entity.clone())?; 648 let source_dir_key = CacheDir::source_dir_env_key(&entity)?; 649 env_list.add(EnvVar::new( 650 source_dir_key, 651 source_dir.to_str().unwrap().to_string(), 652 )); 653 } 654 } 655 656 // 创建ARCH环境变量 657 let target_arch = execute_ctx.target_arch(); 658 env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into())); 659 660 return Ok(env_list); 661 } 662 663 /// # 获取文件最后的更新时间 664 /// 665 /// ## 参数 666 /// * `path` - 文件路径 667 /// * `last_modified` - 最后的更新时间 668 /// * `build_time` - 构建时间 669 fn last_modified_time( 670 path: &PathBuf, 671 build_time: &DateTime<Utc>, 672 ) -> Result<DateTime<Utc>, ExecutorError> { 673 let mut queue = VecDeque::new(); 674 queue.push_back(path.clone()); 675 676 let mut last_modified = DateTime::<Utc>::from(SystemTime::UNIX_EPOCH); 677 678 while let Some(current_path) = queue.pop_front() { 679 let metadata = current_path 680 .metadata() 681 .map_err(|e| ExecutorError::InstallError(e.to_string()))?; 682 683 if metadata.is_dir() { 684 for r in std::fs::read_dir(¤t_path).unwrap() { 685 if let Ok(entry) = r { 686 // 忽略编译产物目录 687 if entry.file_name() == "target" { 688 continue; 689 } 690 691 let entry_path = entry.path(); 692 let entry_metadata = entry.metadata().unwrap(); 693 // 比较文件的修改时间和last_modified,取最大值 694 let file_modified = DateTime::<Utc>::from(entry_metadata.modified().unwrap()); 695 last_modified = std::cmp::max(last_modified, file_modified); 696 697 // 如果其中某一个文件的修改时间在build_time之后,则直接返回,不用继续搜索 698 if last_modified > *build_time { 699 return Ok(last_modified); 700 } 701 702 if entry_metadata.is_dir() { 703 // 如果是子目录,则将其加入队列 704 queue.push_back(entry_path); 705 } 706 } 707 } 708 } else { 709 // 如果是文件,直接比较修改时间 710 let file_modified = DateTime::<Utc>::from(metadata.modified().unwrap()); 711 last_modified = std::cmp::max(last_modified, file_modified); 712 713 // 如果其中某一个文件的修改时间在build_time之后,则直接返回,不用继续递归 714 if last_modified > *build_time { 715 return Ok(last_modified); 716 } 717 } 718 } 719 720 if last_modified == DateTime::<Utc>::from(SystemTime::UNIX_EPOCH) { 721 return Err(ExecutorError::InstallError(format!( 722 "Failed to get last modified time for path: {}", 723 path.display() 724 ))); 725 } 726 Ok(last_modified) 727 } 728