在 bash 脚本中并行运行数组中的每个元素

lig*_*ght 5 bash shell

假设我有一个如下所示的 bash 脚本:

array=( 1 2 3 4 5 6 )

for each in "${array[@]}"
do
  echo "$each"

  command --arg1 $each

done
Run Code Online (Sandbox Code Playgroud)

如果我想并行运行循环中的所有内容,我可以更改command --arg1 $eachcommand --arg1 $each &.

但现在假设我想获取结果command --arg1 $each并用这些结果做一些事情,如下所示:

array=( 1 2 3 4 5 6 )
for each in "${array[@]}"
do
  echo "$each"

  lags=($(command --arg1 $each)

  lngth_lags=${#lags[*]}

  for (( i=1; i<=$(( $lngth_lags -1 )); i++))
  do

    result=${lags[$i]}
    echo -e "$timestamp\t$result" >> $log_file
    echo "result piped"

  done

done
Run Code Online (Sandbox Code Playgroud)

如果我只是&在 的末尾添加 a command --arg1 $each,则后面的所有内容command --arg1 $each都会运行而不会command --arg1 $each首先完成。我该如何防止这种情况发生?另外,如何限制循环可以占用的线程数量?

本质上,这个块应该并行运行1,2,3,4,5,6

  echo "$each"

  lags=($(command --arg1 $each)

  lngth_lags=${#lags[*]}

  for (( i=1; i<=$(( $lngth_lags -1 )); i++))
  do

    result=${lags[$i]}
    echo -e "$timestamp\t$result" >> $log_file
    echo "result piped"

  done
Run Code Online (Sandbox Code Playgroud)

- - -编辑 - - - -

这是原始代码:

#!/bin/bash
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka.client.jaas.conf"
IFS=$'\n'
array=($(kafka-consumer-groups --bootstrap-server kafka1:9092 --list --command-config /etc/kafka/client.properties --new-consumer))

lngth=${#array[*]}

echo "array length: " $lngth

timestamp=$(($(date +%s%N)/1000000))

log_time=`date +%Y-%m-%d:%H`

echo "log time: " $log_time

log_file="/home/ec2-user/laglogs/laglog.$log_time.log"

echo "log file: " $log_file

echo "timestamp: " $timestamp

get_lags () {

  echo "$1"

  lags=($(kafka-consumer-groups --bootstrap-server kafka1:9092 --describe  --group $1 --command-config /etc/kafka/client.properties --new-consumer))

  lngth_lags=${#lags[*]}

  for (( i=1; i<=$(( $lngth_lags -1 )); i++))
  do

    result=${lags[$i]}
    echo -e "$timestamp\t$result" >> $log_file
    echo "result piped"

  done
}

for each in "${array[@]}"
do 

   get_lags $each &

done
Run Code Online (Sandbox Code Playgroud)

------编辑2---------

尝试以下答案:

#!/bin/bash
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka.client.jaas.conf"
IFS=$'\n'
array=($(kafka-consumer-groups --bootstrap-server kafka1:9092 --list --command-config /etc/kafka/client.properties --new-consumer))

lngth=${#array[*]}

echo "array length: " $lngth

timestamp=$(($(date +%s%N)/1000000))

log_time=`date +%Y-%m-%d:%H`

echo "log time: " $log_time

log_file="/home/ec2-user/laglogs/laglog.$log_time.log"

echo "log file: " $log_file

echo "timestamp: " $timestamp

max_proc_count=8

run_for_each() {
  local each=$1
  echo "Processing: $each" >&2
  IFS=$'\n' read -r -d '' -a lags < <(kafka-consumer-groups --bootstrap-server kafka1:9092 --describe --command-config /etc/kafka/client.properties --new-consumer --group "$each" && printf '\0')
  for result in "${lags[@]}"; do
    printf '%(%Y-%m-%dT%H:%M:%S)T\t%s\t%s\n' -1 "$each" "$result"
  done >>"$log_file"
}

export -f run_for_each
export log_file # make log_file visible to subprocesses

printf '%s\0' "${array[@]}" |
  xargs -P "$max_proc_count" -n 1 -0 bash -c 'run_for_each "$@"'
Run Code Online (Sandbox Code Playgroud)

Cha*_*ffy 4

方便的做法是将后台代码推送到单独的脚本或导出的函数中。这样xargs可以创建一个新的 shell,并从其父级访问该函数。(确保export子级中也需要可用的任何其他变量)。

array=( 1 2 3 4 5 6 )
max_proc_count=8
log_file=out.txt

run_for_each() {
  local each=$1
  echo "Processing: $each" >&2
  IFS=$' \t\n' read -r -d '' -a lags < <(yourcommand --arg1 "$each" && printf '\0')
  for result in "${lags[@]}"; do
    printf '%(%Y-%m-%dT%H:%M:%S)T\t%s\t%s\n' -1 "$each" "$result"
  done >>"$log_file"
}

export -f run_for_each
export log_file # make log_file visible to subprocesses

printf '%s\0' "${array[@]}" |
  xargs -P "$max_proc_count" -n 1 -0 bash -c 'run_for_each "$@"'
Run Code Online (Sandbox Code Playgroud)

一些注意事项:

  • 使用echo -e是不好的形式。请参阅POSIX 规范echo中的 APPLICATION USAGE 和 RATIONALE 部分,明确建议使用printf代替(并且定义-e选项,并且明确定义 notecho不得接受除 之外的任何选项-n)。
  • 我们将该each值包含在日志文件中,以便稍后可以从那里提取它。
  • 您尚未指定 的输出是否以yourcommand空格分隔、制表符分隔、行分隔或其他方式分隔。因此,我暂时接受这一切;IFS修改传递给readtaste的值。
  • printf '%(...)T'无需外部工具(例如date需要 bash 4.2 或更高版本)即可获取时间戳。如果您认为合适,请替换为您自己的代码。
  • read -r -a arrayname < <(...)比 稳健得多arrayname=( $(...) )。特别是,它避免将发出的值视为全局变量 - 将*s 替换为当前目录中的文件列表,或者Foo[Bar]使用FooB该名称的任何文件是否存在(或者,如果设置了failglobnullglob选项,则触发失败或不发出任何值)在这种情况下根本没有)。
  • 在整个循环中将 stdout 重定向到log_file一次比每次要运行printf一次时都重定向它更有效。请注意,让多个进程同时写入同一个文件只有在所有进程都打开该文件时才安全O_APPEND(这>>会做),并且它们以足够小的块写入以单独完成单个系统调用(这可能除非个体lags值非常大,否则会发生)。

  • 这是一个很好、简单的解决方案。必须注意的是,它适用于每个单独操作的结果可以单独处理的情况(即处理后没有交互)。此外,“xargs”一次只会启动“$max_proc_count”进程,但每个任务都会导致启动一个新进程,这在某些罕见的、性能关键的情况下可能不是最佳的,但更简单(因此在大多数情况下,与创建/控制您自己的工作进程相比,这是更好的选择。 (2认同)