获取 R 中异步期货的子进程的 PID

cha*_*has 5 asynchronous r shiny-server

server <- function(input, output, session) {
  out1_rows <- reactiveVal()

   observeEvent(input$run1, {
   prog <- Progress$new(session)
   prog$set(message = "Analysis in progress",
         detail = "This may take a while...",
         value = NULL)

  fut1 = future({
  system(paste("Command1" , input$file ">", "out1.txt"))

  system(paste("Command2" , out1.txt ">", "out2.txt"))
  head_rows <- read.delim("out2.txt")
    return(head_rows)
    }) %...>%
     out1_rows() %>%
  finally( ~ prog$close())
NULL
})


 observeEvent(req(out1_rows()), {
 output$out_table <-
  DT::renderDataTable(DT::datatable(
    out1_rows(),
    )
  ))

observeEvent(input$cancel, {
    async_pid <- fut1$job$pid  ##this is empty
    #async_pid <- Sys.getpid()  ##this return PID for main process and kills "/opt/shiny-server/R/SockJSAdapter.R"  but not for subprocesses inside future()
    system(paste("kill -15", async_pid))
  })
}
Run Code Online (Sandbox Code Playgroud)

在这里,我需要终止在 future() 中运行命令的进程。我尝试以上述方式获取运行 future() 进程的 PID 并在input$cancel触发时终止。但是,fut1$job$pid不返回任何 PID 值,因此终止操作不成功。

来自未来小插曲的此链接显示了如何为 future() 作业获取 PID。但是,就我而言,我无法Sys.getpid()在 future() 内部使用,因为我不确定如何存储 PID 值,因为进程已经从我的系统命令返回了一些输出。

此页面未来的 GIT显示了使用语法的 External Kill 的替代方法fut1$job$pid。但这无法获取PID。

在尝试了不同的方法或对语法视而不见后,我无法弄清楚这一点。有人可以暗示这样做的方法。

ism*_*gal 3

您能为我们提供一个完整的可重现示例吗?

您可能想看看库(future.callr):

使用plan(callr)你可以获取 pid 并杀死进程,如下所示:

library(future)
library(promises)
library(future.callr)

plan(callr)

myFuture <- future({
  Sys.sleep(5)
  return(runif(1))
})

myFuture$process$get_pid()
myFuture$process$is_alive()

# myFuture$process$kill()
# myFuture$process$is_alive()

then(myFuture, onFulfilled = function(value){
print(value)
}, onRejected = NULL)
Run Code Online (Sandbox Code Playgroud)

编辑- 根据您的代码改编:

library(shiny)
library(DT)
library(future)
library(promises)
library(future.callr)
library(shinyjs)
library(V8)

plan(callr)

ui <- fluidPage(
  useShinyjs(),
  titlePanel("Trigger & kill future"),
  sidebarLayout(
    sidebarPanel(
      actionButton(inputId="run1", label="run future"),
      actionButton(inputId="cancel", label="kill future")
    ),
    mainPanel(
      dataTableOutput('out_table')
    )
  )
)

server <- function(input, output, session) {

  disable("cancel") 
  out1 <- reactiveValues(rows=NULL)

  observeEvent(input$run1, {

    disable("run1")
    enable("cancel")
    out1$rows <- NULL

    prog <- Progress$new(session)
    prog$set(message = "Analysis in progress",
             detail = "This may take a while...",
             value = NULL)

    fut1 <<- future({
      # system(paste("Command1" , input$file, ">", "out1.txt"))
      # system(paste("Command2" , out1.txt, ">", "out2.txt"))
      # head_rows <- read.delim("out2.txt")
      head_rows <- data.frame(replicate(5, sample(runif(20, 0, 1), 20, rep=TRUE)))
      Sys.sleep(5)
      return(head_rows)
    })

    print(paste("Running async process with PID:", fut1$process$get_pid()))

    then(fut1, onFulfilled = function(value){
      out1$rows <<- value
    }, onRejected = function(error){NULL})

    finally(fut1, function(){
      prog$close()
      disable("cancel")
      enable("run1")
    })

    return(NULL)
  }, ignoreInit = TRUE)


  observeEvent(req(out1$rows), {
    output$out_table <- DT::renderDataTable(DT::datatable(out1$rows))
  })

  observeEvent(input$cancel, {
    async_pid <- fut1$process$get_pid()
    print(paste("Killing PID:", async_pid))
    # system(paste("kill -9", async_pid)) # Linux - kill
    # system(paste("taskkill /f /pid", async_pid)) # Windows - kill
    fut1$process$kill() # library(future.callr) - kill
    out1$rows <- NULL
    disable("cancel")
    enable("run1")
  }, ignoreInit = TRUE)

}

shinyApp(ui = ui, server = server)
Run Code Online (Sandbox Code Playgroud)

偶尔出现的错误:

Unhandled promise error: callr failed, could not start R, exited with non-zero status, has crashed or was killed 
Warning: Error in : callr failed, could not start R, exited with non-zero status, has crashed or was killed 
  95: <Anonymous>
Run Code Online (Sandbox Code Playgroud)

也许@HenrikB 的这句话告诉我们我们遇到了什么:

但是,为了使其正常工作,您可能还需要使用 withCallingHandlers() 等使您的未来表达式/未来代码中断感知。我也不知道如果您连续发出太多中断信号会发生什么 - 可能是您设法中断工作线程的主 R 循环,这将导致 R 工作线程终止。这将导致 R 工作人员丢失,并且您会遇到开头提到的问题。

这里也提到了该错误,但目前在 future.callr-context 中我不知道如何解决它。

第二次编辑: 现在我从 Henrik Bengtsson 那里得到了一些进一步的反馈。他再次提到,核心 future API 目前不支持 futures 的终止。因此,最终无论我们使用什么后端,都可能会遇到问题。

忽略此信息,我会再看一下小library(ipc) 插图,它提供了有关“终​​止长时间运行的进程”主题的两个示例。但我已经在这里指出了这一点- 这可能导致了这个问题

最后,对于您的场景,所有这些可能都是无用的,因为您正在使用system()创建自己的子进程的调用(并相应地拥有自己的 pid)。因此,为什么不在系统命令中使用(正如我在此处的wait = FALSE评论中提到的那样)来获取异步行为并使用类似的东西捕获它们的 pid (请参阅)。这样你就不需要任何期货了。myCommand & echo $!