假设我有一个如下所示的 bash 脚本:
array=( 1 2 3 4 5 6 )
for each in "${array[@]}"
do
echo "$each"
command --arg1 $each
done
Run Code Online (Sandbox Code Playgroud)
如果我想并行运行循环中的所有内容,我可以更改command --arg1 $each为command --arg1 $each &.
但现在假设我想获取结果command --arg1 $each并用这些结果做一些事情,如下所示:
array=( 1 2 3 4 5 6 )
for each in "${array[@]}"
do
echo "$each"
lags=($(command --arg1 $each)
lngth_lags=${#lags[*]}
for (( i=1; i<=$(( $lngth_lags -1 )); i++))
do
result=${lags[$i]}
echo -e "$timestamp\t$result" >> $log_file
echo "result piped"
done
done
Run Code Online (Sandbox Code Playgroud)
如果我只是&在 的末尾添加 a command --arg1 $each,则后面的所有内容command --arg1 $each都会运行而不会command --arg1 $each首先完成。我该如何防止这种情况发生?另外,如何限制循环可以占用的线程数量?
本质上,这个块应该并行运行1,2,3,4,5,6
echo "$each"
lags=($(command --arg1 $each)
lngth_lags=${#lags[*]}
for (( i=1; i<=$(( $lngth_lags -1 )); i++))
do
result=${lags[$i]}
echo -e "$timestamp\t$result" >> $log_file
echo "result piped"
done
Run Code Online (Sandbox Code Playgroud)
- - -编辑 - - - -
这是原始代码:
#!/bin/bash
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka.client.jaas.conf"
IFS=$'\n'
array=($(kafka-consumer-groups --bootstrap-server kafka1:9092 --list --command-config /etc/kafka/client.properties --new-consumer))
lngth=${#array[*]}
echo "array length: " $lngth
timestamp=$(($(date +%s%N)/1000000))
log_time=`date +%Y-%m-%d:%H`
echo "log time: " $log_time
log_file="/home/ec2-user/laglogs/laglog.$log_time.log"
echo "log file: " $log_file
echo "timestamp: " $timestamp
get_lags () {
echo "$1"
lags=($(kafka-consumer-groups --bootstrap-server kafka1:9092 --describe --group $1 --command-config /etc/kafka/client.properties --new-consumer))
lngth_lags=${#lags[*]}
for (( i=1; i<=$(( $lngth_lags -1 )); i++))
do
result=${lags[$i]}
echo -e "$timestamp\t$result" >> $log_file
echo "result piped"
done
}
for each in "${array[@]}"
do
get_lags $each &
done
Run Code Online (Sandbox Code Playgroud)
------编辑2---------
尝试以下答案:
#!/bin/bash
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka.client.jaas.conf"
IFS=$'\n'
array=($(kafka-consumer-groups --bootstrap-server kafka1:9092 --list --command-config /etc/kafka/client.properties --new-consumer))
lngth=${#array[*]}
echo "array length: " $lngth
timestamp=$(($(date +%s%N)/1000000))
log_time=`date +%Y-%m-%d:%H`
echo "log time: " $log_time
log_file="/home/ec2-user/laglogs/laglog.$log_time.log"
echo "log file: " $log_file
echo "timestamp: " $timestamp
max_proc_count=8
run_for_each() {
local each=$1
echo "Processing: $each" >&2
IFS=$'\n' read -r -d '' -a lags < <(kafka-consumer-groups --bootstrap-server kafka1:9092 --describe --command-config /etc/kafka/client.properties --new-consumer --group "$each" && printf '\0')
for result in "${lags[@]}"; do
printf '%(%Y-%m-%dT%H:%M:%S)T\t%s\t%s\n' -1 "$each" "$result"
done >>"$log_file"
}
export -f run_for_each
export log_file # make log_file visible to subprocesses
printf '%s\0' "${array[@]}" |
xargs -P "$max_proc_count" -n 1 -0 bash -c 'run_for_each "$@"'
Run Code Online (Sandbox Code Playgroud)
方便的做法是将后台代码推送到单独的脚本或导出的函数中。这样xargs可以创建一个新的 shell,并从其父级访问该函数。(确保export子级中也需要可用的任何其他变量)。
array=( 1 2 3 4 5 6 )
max_proc_count=8
log_file=out.txt
run_for_each() {
local each=$1
echo "Processing: $each" >&2
IFS=$' \t\n' read -r -d '' -a lags < <(yourcommand --arg1 "$each" && printf '\0')
for result in "${lags[@]}"; do
printf '%(%Y-%m-%dT%H:%M:%S)T\t%s\t%s\n' -1 "$each" "$result"
done >>"$log_file"
}
export -f run_for_each
export log_file # make log_file visible to subprocesses
printf '%s\0' "${array[@]}" |
xargs -P "$max_proc_count" -n 1 -0 bash -c 'run_for_each "$@"'
Run Code Online (Sandbox Code Playgroud)
一些注意事项:
echo -e是不好的形式。请参阅POSIX 规范echo中的 APPLICATION USAGE 和 RATIONALE 部分,明确建议使用printf代替(并且不定义-e选项,并且明确定义 notecho不得接受除 之外的任何选项-n)。each值包含在日志文件中,以便稍后可以从那里提取它。yourcommand空格分隔、制表符分隔、行分隔或其他方式分隔。因此,我暂时接受这一切;IFS修改传递给readtaste的值。printf '%(...)T'无需外部工具(例如date需要 bash 4.2 或更高版本)即可获取时间戳。如果您认为合适,请替换为您自己的代码。read -r -a arrayname < <(...)比 稳健得多arrayname=( $(...) )。特别是,它避免将发出的值视为全局变量 - 将*s 替换为当前目录中的文件列表,或者Foo[Bar]使用FooB该名称的任何文件是否存在(或者,如果设置了failglob或nullglob选项,则触发失败或不发出任何值)在这种情况下根本没有)。log_file一次比每次要运行printf一次时都重定向它更有效。请注意,让多个进程同时写入同一个文件只有在所有进程都打开该文件时才安全O_APPEND(这>>会做),并且它们以足够小的块写入以单独完成单个系统调用(这可能是除非个体lags值非常大,否则会发生)。