How do I read CSV files from a Google Storage bucket in Dataflow, combine them, apply some transformations to the dataframes, and then dump the result into BigQuery?

Imr*_*ali 5 python google-cloud-platform google-cloud-dataflow apache-beam

I have to write a Dataflow job in Python that reads two different .csv files from GCS, performs a join, applies transformations to the joined dataframe, and finally writes the result to a BigQuery table.

I am new to this, and after a lot of research I understand that apache_beam can handle all of the pipeline operations. I finally found a template, but I still have a lot of confusion about the points below.

import argparse
import logging
import os

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions


os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='auth_file.json'


class DataTransformation:
    """A helper class that translates a CSV into a format BigQuery will accept."""

    def __init__(self):
        dir_path = os.path.dirname(os.path.realpath(__file__))
        # Here we read the output schema from a json file.  This is used to specify the types
        # of data we are writing to BigQuery.
        self.schema = os.path.join(dir_path, 'resources',
                                   'gs://wahtch_dog_dataflow/schema.json')


# Parse the input csv and convert it into a BigQuery-savable dictionary.
class read_all_from_url(beam.DoFn):
    def process(self, url):
        with FileSystems.open(url) as f:
            yield f.read()
    

def run(argv=None):
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
        'a file in a Google Storage Bucket.',
        default = 'gs://wahtch_dog_dataflow/demo.csv')

    parser.add_argument('--output',
                        dest='output',
                        required=False,
                        help='Output BQ table to write results to.',
                        default='watchdog_output.transformed')

    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # DataIngestion is a class we built in this script to hold the logic for
    # transforming the file into a BigQuery table.
    data_ingestion = DataTransformation()
    url = "gs://smart-ivr-dl-pushed-data"
    # Initiate the pipeline using the pipeline arguments passed in from the
    # command line. This includes information such as the project ID and
    # where Dataflow should store temp files.
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (
     p | beam.Create([url])
       | 'Reading latest file' >> beam.ParDo(read_all_from_url())
    
     # This stage of the pipeline translates from a CSV file single row
     # input as a string, to a dictionary object consumable by BigQuery.
     # It refers to a function we have written. This function will
     # be run in parallel on different workers using input from the
     # previous stage of the pipeline.
     | 'String To BigQuery Row' >>
         beam.Map(lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             # The table name is a required argument for the BigQuery sink.
             # In this case we use the value passed in from the command line.
             known_args.output,
             # Here we use the simplest way of defining a schema:
             # fieldName:fieldType

             ###### schema of the ivr
             schema=schema,

             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
    p.run().wait_until_finish()

        
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Questions I have:

  1. read_all_from_url: how do I read multiple files here, and what exactly is url, the bucket name or the full storage path?

  2. How do I read the schema from the bucket? (I found the snippet above somewhere suggesting it can be read like that, but I doubt it actually works that way; see the sketch after this list.)

  3. How do I perform the transformations? For example, I want to do a groupby, etc.
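
For question 2, would something like the following work for reading the schema out of the bucket? This is only a rough sketch; I am assuming schema.json holds a BigQuery-style {"fields": [...]} definition.

import json

from apache_beam.io.filesystems import FileSystems


def load_schema(schema_url):
    # schema_url would be something like 'gs://wahtch_dog_dataflow/schema.json'
    # containing {"fields": [{"name": "...", "type": "..."}, ...]}.
    with FileSystems.open(schema_url) as f:
        return json.loads(f.read().decode('utf-8'))


schema = load_schema('gs://wahtch_dog_dataflow/schema.json')
# The resulting dict could then be passed as the schema argument of the
# BigQuery sink (or flattened into a 'name:TYPE,name:TYPE' string).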

Update:

I was able to read the two files from the directory, but each file ends up as its own PCollection. Let me lay out my logic: 1) read the two files from the local directory, 2) join the two dataframes with a join operation (this is where I am stuck), and 3) perform some transformations on the joined dataframe.

import pandas as pd


class ReadOrc(beam.DoFn):
    def process(self, element):
        df = pd.read_csv(element)
        yield df


csv_lines = (p
             | beam.Create(urls)
             | 'Reading latest file' >> beam.ParDo(ReadOrc())
             | 'transform' >> beam.ParDo(transform()))

The code above reads the two files from the directory and ends up with values like (df1, df2) in the PCollection.

Now, inside the transform, I want to join the dataframes and perform the preprocessing steps.
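
Roughly, the logic I want to run on the joined data looks like this in pandas (the column names 'id' and 'value' are just placeholders for whatever my CSVs contain); what I don't know is how to express it when df1 and df2 sit in separate PCollections:

import pandas as pd

# Placeholder dataframes standing in for the two CSVs.
df1 = pd.read_csv('first.csv')
df2 = pd.read_csv('second.csv')

# Join the two dataframes on a shared key column (here assumed to be 'id').
joined = df1.merge(df2, on='id', how='inner')

# Example preprocessing: drop incomplete rows and aggregate by the key.
joined = joined.dropna()
summary = joined.groupby('id', as_index=False)['value'].sum()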

Pab*_*blo 1

I've stripped out all the extra setup and configuration, and I'm sharing a small pipeline that should do more or less what you need.

But consider that BigQuery is able to import a single CSV file by itself, without a Dataflow job. Would that be more helpful?
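
For example, something along these lines with the google-cloud-bigquery client would load the CSV without any Beam pipeline at all; the project, dataset and URI below are placeholders:

from google.cloud import bigquery

client = bigquery.Client()

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,      # skip the header row
    autodetect=True,          # let BigQuery infer the schema
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# Placeholder URI and table; replace with your own bucket/project/dataset.
load_job = client.load_table_from_uri(
    'gs://my-bucket/demo.csv',
    'my_project.watchdog_output.transformed',
    job_config=job_config,
)
load_job.result()  # wait for the load job to finish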


If you still want to use Dataflow to import into BQ, this pipeline should more or less do the trick:

Based on your comments, I suggest you try the following:

import csv
import io
import logging

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions

p = beam.Pipeline(options=PipelineOptions(pipeline_args))

(
 p
 | beam.Create(urls)
 | 'Finding latest file' >> fileio.MatchAll()
 | 'Get file handlers' >> fileio.ReadMatches()
 | 'Read each file handler' >> beam.FlatMap(
       lambda rf: csv.reader(io.TextIOWrapper(rf.open())))
 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
         known_args.output,
         schema=schema,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

p.run().wait_until_finish()

If you are reading the dataframe from the CSV, you can do yield from df.iterrows() in the DoFn instead, which breaks the dataframe up into individual rows, and then you can join them.
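
For instance, a rough sketch (with a made-up join column 'id' and placeholder file names): the reading DoFn can emit keyed rows, and the two resulting PCollections can then be joined with CoGroupByKey:

import apache_beam as beam
import pandas as pd


class ReadCsvRows(beam.DoFn):
    """Read a CSV with pandas and emit one (join_key, row_dict) pair per row."""
    def process(self, element):
        df = pd.read_csv(element)
        for _, row in df.iterrows():
            # 'id' is a placeholder for whatever column you join on.
            yield row['id'], row.to_dict()


with beam.Pipeline() as p:
    left = (p | 'first file' >> beam.Create(['first.csv'])
              | 'read first' >> beam.ParDo(ReadCsvRows()))
    right = (p | 'second file' >> beam.Create(['second.csv'])
               | 'read second' >> beam.ParDo(ReadCsvRows()))

    joined = (
        {'left': left, 'right': right}
        | 'join on id' >> beam.CoGroupByKey()
        # Each element is (id, {'left': [rows], 'right': [rows]});
        # pair up the matching rows to get one merged dict per match.
        | 'merge rows' >> beam.FlatMap(
            lambda kv: [{**l, **r}
                        for l in kv[1]['left']
                        for r in kv[1]['right']]))

CoGroupByKey groups the rows from both sides that share the same key, so the final FlatMap effectively performs an inner join; any further preprocessing (filtering, grouping, aggregating) can be added as extra transforms after it.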