错误:消息:提供的来源过多:15285。限制为10000

Zen*_*non 2 google-bigquery google-cloud-dataflow apache-beam

我目前正在尝试运行数据流(Apache Beam,Python SDK)任务,以将大于100GB的Tweet文件导入BigQuery,但正在 Error: Message: Too many sources provided: 15285. Limit is 10000.

该任务将使用tweet(JSON),提取5个相关字段,并通过一些转换对它们进行一些转换/消毒,然后将这些值写入BigQuery,以用于进一步处理。

BigQueryCloud Dataflow-来源过多,但这似乎是由于输入文件过多而引起的,而我只有一个输入文件,因此似乎无关紧要。另外,这里提到的解决方案是很神秘的,我不确定是否/如何将它们应用于我的问题。

我的猜测是,BigQuery在持久存储之前为每一行或其他内容写入临时文件,这就是“太多源”的含义吗?

我怎样才能解决这个问题?

[编辑]

码:

import argparse
import json
import logging

import apache_beam as beam

class JsonCoder(object):
    """A JSON coder interpreting each line as a JSON string."""

    def encode(self, x):
        return json.dumps(x)

    def decode(self, x):
        return json.loads(x)

def filter_by_nonempty_county(record):
    if 'county_fips' in record and record['county_fips'] is not None:
        yield record

def run(argv=None):

    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        default='...',
                        help=('Input twitter json file specified as: '
                            'gs://path/to/tweets.json'))
    parser.add_argument(
        '--output',
        required=True,
        help=
        ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
        'or DATASET.TABLE.'))

    known_args, pipeline_args = parser.parse_known_args(argv)



    p = beam.Pipeline(argv=pipeline_args)

    # read text file

    #Read all tweets from given source file
    read_tweets = "Read Tweet File" >> beam.io.ReadFromText(known_args.input, coder=JsonCoder())

    #Extract the relevant fields of the source file
    extract_fields =  "Project relevant fields" >> beam.Map(lambda row: {'text': row['text'],
                                                                  'user_id': row['user']['id'],
                                                                  'location': row['user']['location'] if 'location' in row['user'] else None,
                                                                  'geo':row['geo'] if 'geo' in row else None,
                                                                  'tweet_id': row['id'],
                                                                  'time': row['created_at']})


    #check what type of geo-location the user has
    has_geo_location_or_not = "partition by has geo or not" >> beam.Partition(lambda element, partitions: 0 if element['geo'] is None else 1, 2)


    check_county_not_empty = lambda element, partitions: 1 if 'county_fips' in element and element['county_fips'] is not None else 0

    #tweet has coordinates partition or not
    coordinate_partition = (p
             | read_tweets
             | extract_fields
             | beam.ParDo(TimeConversion())
             | has_geo_location_or_not)


    #lookup by coordinates
    geo_lookup = (coordinate_partition[1] | "geo coordinates mapping" >> beam.ParDo(BeamGeoLocator())
                           | "filter successful geo coords" >> beam.Partition(check_county_not_empty, 2))

    #lookup by profile
    profile_lookup = ((coordinate_partition[0], geo_lookup[0])
                      | "join streams" >> beam.Flatten()
                      | "Lookup from profile location" >> beam.ParDo(ComputeLocationFromProfile())
                      )


    bigquery_output = "write output to BigQuery" >>  beam.io.Write(
       beam.io.BigQuerySink(known_args.output,
                   schema='text:STRING, user_id:INTEGER, county_fips:STRING, tweet_id:INTEGER, time:TIMESTAMP, county_source:STRING',
                  create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                 write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

    #file_output = "write output" >> beam.io.WriteToText(known_args.output, coder=JsonCoder())


    output = ((profile_lookup, geo_lookup[1]) | "merge streams" >> beam.Flatten()
              | "Filter entries without location" >> beam.FlatMap(filter_by_nonempty_county)
              | "project relevant fields" >> beam.Map(lambda row: {'text': row['text'],
                                                                   'user_id': row['user_id'],
                                                                   'county_fips': row['county_fips'],
                                                                   'tweet_id': row['tweet_id'],
                                                                   'time': row['time'],
                                                                   'county_source': row['county_source']})
              | bigquery_output)

    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.DEBUG)
    run()
Run Code Online (Sandbox Code Playgroud)

这有点复杂,因此直接在bigquery中进行此操作可能会花费太多时间。该代码读取tweets json,通过是否对其进行地理标记来拆分PCollection,如果不是,则尝试通过配置文件位置查找它,映射到与我们的GIS分析相关的位置,然后将其写入BigQuery。

Ben*_*ers 5

文件数与处理元素的分片数相对应。

减少这种情况的一种技巧是生成一些随机密钥,并在将其写出之前根据这些元素对它们进行分组。

例如,你可以使用下面的DoFnPTransform在您的管道:

class _RoundRobinKeyFn(beam.DoFn):
  def __init__(self, count):
    self.count = count

  def start_bundle(self):
    self.counter = random.randint(0, self.count - 1)

  def process(self, element):
    self.counter += 1
    if self.counter >= self.count:
      self.counter -= self.count
    yield self.counter, element

class LimitBundles(beam.PTransform):
  def __init__(self, count):
    self.count = count

  def expand(self, input):
    return input
        | beam.ParDo(_RoundRobinKeyFn(self.count))
        | beam.GroupByKey()
        | beam.FlatMap(lambda kv: kv[1])
Run Code Online (Sandbox Code Playgroud)

您只需要在bigquery_output之前使用它:

output = (# ...
         | LimitBundles(10000)
         | bigquery_output)
Run Code Online (Sandbox Code Playgroud)

(请注意,我只是在未经测试的情况下输入了此内容,因此可能存在一些Python错字。)