使用不推荐使用的动态 API 构建具有动态输入的工作流

Rom*_*hik 4 bioinformatics snakemake

我想做一个工作流,从远程服务器下载一些 FASTQ 文件的列表,检查 md5 并运行一些后处理,例如对齐。

我了解如何使用两个工作流程来实现这一点:

  1. 首先下载fastq文件列表文件,例如md5文件。

  2. 读取md5文件内容并在all规则中为所需的结果文件创建相应的目标。

我想在一个工作流程中做到这一点。下面不正确的工作流程显示了我想要实现的想法。

  • all规则input:部分我不知道文件下载和解析{sample}之前的值md5

  • 我尝试使用动态、检查点和子分支流,但未能达到预期的结果。至于dynamic我只为动态(“fastq/{sample}.fq.gz.md5”)输出实现了这个工作流程。

  • 此外,我对不使用的解决方案感兴趣,dynamic因为它已被弃用。

rule all:
    input:
         "md5",
         "bams/{sample}.bam",

rule download_files_list:
    output: "md5"
    #shell: "wget {}".format(config["url_files_list"])
    run:
        # For testing instead of downloading:
        content = """
        bfe583337fd68b3  ID_001_1.fq.gz
        1636b6756daa65f  ID_001_2.fq.gz
        0428baf25307249  ID_002_1.fq.gz
        de33d81ba5bfa62  ID_002_2.fq.gz
        """.strip()
        with open(output[0], mode="w") as f:
            print(content, file=f)

rule fastq_md5_files:
    input: "md5"
    output: "fastq/{sample}.fq.gz.md5"
    shell: "mkdir -p fastq && awk '{{ print $0 > (\"fastq/\" $2 \".md5\") }}' {input}"

rule download_fastq_and_check_md5:
    input: "fastq/{sample}.fq.gz.md5"
    output: "fastq/{sample}.fq.gz"
    #shell: "wget {}/{{sample}} && md5sum --check {{input}}".format(config["url_file_prefix"])
    shell: "touch {output}" 

rule align_fastq:
    input: "fastq/{sample}.fq.gz"
    output: "bams/{sample}.bam"
    shell: "touch {output}" # aligning task
Run Code Online (Sandbox Code Playgroud)

cdu*_*001 7

我看到很多关于如何使用功能的困惑checkpoint。这是一个简化的说明性示例:

shell.prefix('set -vexu pipefail; ')

rule finish:
        input:
                "D/all.txt"

checkpoint A:
        output:
                mydir = directory("A")
        shell: """
                mkdir -p A
                N=$(( $RANDOM % 7 + 1))
                echo "N=$N"
                # Create a number of files. (
                for i in $(seq 1 $N); do
                        echo $i > "A/$i.txt"
                done
        """

rule B:
        output:
                txt = "B/{i}.txt",
        input:
                txt = "A/{i}.txt",
        shell: """
                mkdir -p B
                cp -f {input.txt} {output.txt}
        """

rule C:
        output:
                txt = "C/{i}.txt",
        input:
                txt = "B/{i}.txt",
        shell: """
                mkdir -p C
                cp -f {input.txt} {output.txt}
        """

def gatherForD_fromC_basedOnA(wildcards):
        checkpoint_output = checkpoints.A.get(**wildcards).output.mydir
        # That will raise if not ready.

        ivals = glob_wildcards(os.path.join(checkpoint_output,
                        "{i}.txt")).i
        print("ivals={}".format(ivals))
        return expand("C/{i}.txt", i=ivals)

rule D:
        output:
                combined = "D/all.txt",
        input:
                gathered = gatherForD_fromC_basedOnA,
        shell: """
                mkdir -p D
                cat {input.gathered} > {output.combined}
        """
Run Code Online (Sandbox Code Playgroud)

复制到snakefile并运行它

snakemake --verbose -p
Run Code Online (Sandbox Code Playgroud)
  • 检查点/规则 A输出随机数量的文件。(当然,您可以将它们基于“输入”部分。)
  • 规则BC使用标准的平行规则蛇形“通配符”的。
  • 规则 D基于输入生成函数接受未知数量的输入。
  • 函数gatherForD_fromC_basedOnA等待 checkpoint-rule 的输出A,但它命名rule 的输出,C最终被 rule 消耗D。因此,snakemake知道D会消耗什么(A完成后)。所以它知道什么C必须生产。所以它知道B必须生产什么。
  • 最后,规则finish等待特定的已知文件。