谷歌数据流似乎下降了第1000个记录

use*_*244 0 python-2.7 google-cloud-platform google-cloud-dataflow apache-beam gcp

我使用Google Dataflow(apache-beam)设置了一个小测试.该实验的用例是获取(csv)文件并将选定列写入(txt)文件.

实验代码如下:

from __future__ import absolute_import

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class EmitColDoFn(beam.DoFn):
    first = True
    header = ""
    def __init__(self, i):
        super(EmitColDoFn, self).__init__()
        self.line_count =  Metrics.counter(self.__class__, 'lines')
        self.i = i

    def process(self, element):
        if self.first:
            self.header = element
            self.first = False
        else:
            self.line_count.inc()
            cols = re.split(',', element)
            return (cols[self.i],)

def run(argv=None):
    """Main entry point; defines and runs the wordcount pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='/users/sms/python_beam/data/MOCK_DATA (4).csv',
#                        default='gs://dataflow-samples/shakespeare/kinglear.txt',
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        default="/users/sms/python_beam/data/",
#                        required=True,
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    # Read the text file[pattern] into a PCollection.
    lines = p | 'read' >> ReadFromText(known_args.input)

    column = (lines
            | 'email col' >> (beam.ParDo(EmitColDoFn(3)))
            | "col file" >> WriteToText(known_args.output, ".txt", shard_name_template="SS_Col"))

    result = p.run()
    result.wait_until_finish()

    if (not hasattr(result, 'has_job')  # direct runner
        or result.has_job):  # not just a template creation
        lines_filter = MetricsFilter().with_name('lines')
        query_result = result.metrics().query(lines_filter)
        if query_result['counters']:
            lines_counter = query_result['counters'][0]

        print "Lines committed", lines_counter.committed
run()
Run Code Online (Sandbox Code Playgroud)

以下样本1的最后几行:

990,Corabel,Feldbau,cfeldbaurh@deliciousdays.com,Female,84.102.162.190,DJ
991,Kiley,Rottcher,krottcherri@stanford.edu,Male,91.97.155.28,CA
992,Glenda,Clist,gclistrj@state.gov,Female,24.98.253.127,UA
993,Ingunna,Maher,imaherrk@army.mil,Female,159.31.127.19,PL
994,Megan,Giacopetti,mgiacopettirl@instagram.com,Female,115.6.63.52,RU
995,Briny,Dutnall,bdutnallrm@xrea.com,Female,102.81.33.24,SE
996,Jan,Caddan,jcaddanrn@jalbum.net,Female,115.142.222.106,PL
Run Code Online (Sandbox Code Playgroud)

运行它会产生预期的输出:

/usr/local/bin/python2.7
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"
Lines committed 996

Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)

现在为了奇怪的结果.在下一次运行中,行数增加到1000.

994,Megan,Giacopetti,mgiacopettirl@instagram.com,Female,115.6.63.52,RU
995,Briny,Dutnall,bdutnallrm@xrea.com,Female,102.81.33.24,SE
996,Jan,Caddan,jcaddanrn@jalbum.net,Female,115.142.222.106,PL
997,Shannen,Gaisford,sgaisfordr7@rediff.com,Female,167.255.222.92,RU
998,Lorianna,Slyne,lslyner8@cbc.ca,Female,54.169.60.13,CN
999,Franklin,Yaakov,fyaakovr9@latimes.com,Male,122.1.92.236,CN
1000,Wilhelmine,Cariss,wcarissra@creativecommons.org,Female,237.48.113.255,PL
Run Code Online (Sandbox Code Playgroud)

但这次出局是

/usr/local/bin/python2.7
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"
Lines committed 999

Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)

检查输出文件显示最后一行未处理.

bdutnallrm@xrea.com
jcaddanrn@jalbum.net
sgaisfordr7@rediff.com
lslyner8@cbc.ca
fyaakovr9@latimes.com
Run Code Online (Sandbox Code Playgroud)

有什么想法在这里发生了什么?

Rag*_*adi 5

'EditColDoFn'跳过第一行,假设每个文件都有一个实例.当你有1000多行时,DirectRunner会创建两个包:第一个包含1000行,第二个包含1行.在Beam应用程序中,输入可能会分成多个包,以便并行处理.与文件数和捆绑数无关.相同的应用程序可以处理遍布许多文件的terra字节数据.

ReadFromText 有一个选项'skip_header_lines',您可以设置为1以跳过每个输入文件中的标题行.