标签: apache-beam

Apache Beam相对于Spark/Flink进行批处理有什么好处?

Apache Beam支持多个运行后端,包括Apache Spark和Flink.我熟悉Spark/Flink,我试图看到Beam的批处理优缺点.

看一下Beam字数统计示例,它感觉它与原生的Spark/Flink等价物非常相似,可能有一个稍微冗长的语法.

我目前没有看到为这样的任务选择Beam over Spark/Flink的一大好处.到目前为止我能做的唯一观察:

  • Pro:不同执行后端的抽象.
  • Con:这种抽象的代价是对Spark/Flink中执行的内容的控制较少.

是否有更好的例子突出了梁模型的其他优点/缺点?是否有关于失控如何影响性能的信息?

请注意,我并不是要求在流方面存在差异,这些问题本文中已部分涵盖并在本文中进行了总结(由于Spark 1.X而过时).

apache-spark apache-flink apache-beam

63
推荐指数
2
解决办法
2万
查看次数

什么是Apache Beam?

我正在浏览Apache帖子,发现了一个名为Beam的新术语.任何人都可以解释Apache Beam究竟是什么?我试图谷歌,但无法得到一个明确的答案.

apache-beam

50
推荐指数
2
解决办法
2万
查看次数

解释Apache Beam python语法

我已阅读了Beam文档,并查看了Python文档,但未找到大多数示例Apache Beam代码中使用的语法的良好解释.

谁能解释什么_,|以及>>在下面的代码在做什么?引号中的文字即"ReadTrainingData"是否有意义,还是可以与任何其他标签交换?换句话说,该标签是如何使用的?

train_data = pipeline | 'ReadTrainingData' >> _ReadData(training_data)
evaluate_data = pipeline | 'ReadEvalData' >> _ReadData(eval_data)

input_metadata = dataset_metadata.DatasetMetadata(schema=input_schema)

_ = (input_metadata
| 'WriteInputMetadata' >> tft_beam_io.WriteMetadata(
       os.path.join(output_dir, path_constants.RAW_METADATA_DIR),
       pipeline=pipeline))

preprocessing_fn = reddit.make_preprocessing_fn(frequency_threshold)
(train_dataset, train_metadata), transform_fn = (
  (train_data, input_metadata)
  | 'AnalyzeAndTransform' >> tft.AnalyzeAndTransformDataset(
      preprocessing_fn))
Run Code Online (Sandbox Code Playgroud)

python apache-beam

31
推荐指数
1
解决办法
2972
查看次数

Apache Beam:FlatMap vs Map?

我想了解在哪种情况下我应该使用FlatMap或Map.我的文档似乎并不清楚.

我仍然不明白在哪种情况下我应该使用FlatMap或Map的转换.

有人能给我一个例子,这样我才能理解他们的不同之处吗?

我理解Spark中FlatMap与Map的区别,但不确定是否有相似之处?

google-cloud-dataflow apache-beam

25
推荐指数
2
解决办法
6289
查看次数

Apache Airflow或Apache Beam用于数据处理和作业调度

我正在尝试提供有用的信息,但我远非数据工程师.

我目前正在使用python库pandas对我的数据执行一系列转换,这些数据有很多输入(目前是CSV和excel文件).输出是几个excel文件.我希望能够通过并行计算执行计划的受监视批处理作业(我的意思是不像我正在做的那样使用pandas),每月一次.

我真的不知道Beam或Airflow,我很快就通读了文档,似乎两者都可以实现.我应该使用哪一个?

pandas airflow apache-beam

23
推荐指数
3
解决办法
9560
查看次数

谷歌数据流作业成本优化

我已经为 100 GB 大小的 522 个 gzip 文件运行了以下代码,解压缩后,它将是大约 320 GB 数据和 protobuf 格式的数据,并将输出写入 GCS。我已经使用 n1 标准机器和区域进行输入,输出都得到了照顾,工作花费了我大约 17 美元,这是半小时的数据,所以我真的很需要在这里做一些成本优化。

我从下面的查询中得到的成本

SELECT l.value AS JobID,  ROUND(SUM(cost),3) AS JobCost 
FROM `PROJECT.gcp_billing_data.gcp_billing_export_v1_{}` bill, 
UNNEST(bill.labels) l
WHERE service.description = 'Cloud Dataflow' and l.key = 'goog-dataflow-job-id' and 
extract(date from _PARTITIONTIME) > "2020-12-31"
GROUP BY 1
Run Code Online (Sandbox Code Playgroud)

完整代码

  import time
  import sys
  import argparse
  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.pipeline_options import SetupOptions
  import csv
  import base64
  from google.protobuf import timestamp_pb2
  from google.protobuf.json_format import MessageToDict …
Run Code Online (Sandbox Code Playgroud)

python protocol-buffers avro google-cloud-dataflow apache-beam

22
推荐指数
1
解决办法
574
查看次数

你如何表达Apache Beam中长时间延伸的非规范化连接

对于上下文,我从未使用过Beam.我试图了解如何将Beam模型应用于常见用例.

考虑到你有一个无限的Producer集合和一个无限的产品集合,这样每个产品都有一个Producer(一对多,Producer到Product).并且您拥有产品生产者在其产品之前(或之后不久)出现的附加属性.但制作人可能会在其产品出现之前几年出现.

如果你想生产一个无限制的产品系列,他们的生产商与他们联系起来,那么表达这种产品的恰当方式是什么?拥有延长多年的窗口连接似乎打败了窗口的点.但是,将生产者作为侧面输入并不能解决生产者在产品何时出现时可能非常接近的问题.

有没有合适的方法来混合这两个概念?

apache-beam

20
推荐指数
1
解决办法
418
查看次数

使用Google pub/sub更新单例HashMap

我有一个用例,我初始化一个包含一组查找数据的HashMap(有关物联网设备的物理位置等的信息).该查找数据用作第二数据集的参考数据,该第二数据集是PCollection.此PCollection是一个数据流,提供IoT设备记录的数据.来自物联网设备的数据流使用Apache Beam管道,该管道作为Google Dataflow使用Google Cloud pub/sub运行.

当我处理PCollection(设备数据)时,我将Google Cloud发布/订阅数据链接到HashMap中的相关查找条目.

我需要更新HashMap,基于第二个将更改推送到其数据的pub/sub.这是我到目前为止获得PCollection并使用HashMap进行查找的方法:

HashMap - >包含预加载的查找数据(有关IoT设备的信息)

PCollection - >包含来自管道数据流的数据(物联网设备记录的数据)

我正在为IoT设备查找数据生成一个HashMap作为单例:

public class MyData {

    private static final MyData instance = new MyData ();

    private MyData () {     
            HashMap myDataMap = new HashMap<String, String>();          
               ... logic to populate the map

            this.referenceData = myDataMap;

    }

    public HashMap<Integer, DeviceReference> referenceData;

    public static DeviceData getInstance(){
        return instance;
    }
}
Run Code Online (Sandbox Code Playgroud)

然后我在不同的类中使用HashMap,我订阅了数据的更新(这些是例如给我新的数据的消息,这些数据与已经存储在HashMap中的实体有关).我正在使用带有Apache beam的Google pub/sub订阅更改:

HashMap<String, String> referenceData = MyData.getInstance().referenceData;

Pipeline pipeLine = Pipeline.create(options);           

// subscribe to changes in data

org.apache.beam.sdk.values.PCollection myDataUpdates; …
Run Code Online (Sandbox Code Playgroud)

java publish-subscribe bigdata google-cloud-platform apache-beam

16
推荐指数
1
解决办法
581
查看次数

窗口化后,Google数据流流媒体管道不会将工作负载分配给多个工作人员

我正在尝试在python中设置数据流流管道.我对批处理管道有很多经验.我们的基本架构如下所示: 在此输入图像描述

第一步是进行一些基本处理,每条消息大约需要2秒才能进入窗口.我们使用3秒和3秒间隔的滑动窗口(可能会稍后更改,因此我们有重叠的窗口).作为最后一步,我们有SOG预测需要大约15秒的时间来处理,这显然是我们的瓶颈变换.

因此,我们似乎面临的问题是,在窗口化之前工作负载完全分布在我们的工作者上,但最重要的转换根本不是分布式的.所有的窗户一次一个地处理,看似1个工人,而我们有50个可用.

日志告诉我们,sog预测步骤每15秒输出一次,如果窗口将被更多的工作人员处理,则不应该是这种情况,因此这会在一段时间内产生巨大的延迟,这是我们不想要的.使用1分钟的消息,最后一个窗口的延迟为5分钟.当分配工作时,这应该只有大约15秒(SOG预测时间).所以在这一点上我们是无能为力的..

在此输入图像描述

有没有人看到我们的代码是否有问题或如何防止/规避这个?似乎这是谷歌云数据流内部发生的事情.这是否也出现在Java流媒体管道中?

在批处理模式下,一切正常.在那里,人们可以尝试进行重新洗牌以确保不会发生融合等.但是在流媒体窗口化之后,这是不可能的.

args = parse_arguments(sys.argv if argv is None else argv)
pipeline_options = get_pipeline_options(project=args.project_id,
                                        job_name='XX',
                                        num_workers=args.workers,
                                        max_num_workers=MAX_NUM_WORKERS,
                                        disk_size_gb=DISK_SIZE_GB,
                                        local=args.local,
                                        streaming=args.streaming)

pipeline = beam.Pipeline(options=pipeline_options)

# Build pipeline
# pylint: disable=C0330
if args.streaming:
    frames = (pipeline | 'ReadFromPubsub' >> beam.io.ReadFromPubSub(
        subscription=SUBSCRIPTION_PATH,
        with_attributes=True,
        timestamp_attribute='timestamp'
    ))

    frame_tpl = frames | 'CreateFrameTuples' >> beam.Map(
        create_frame_tuples_fn)

crops = frame_tpl | 'MakeCrops' >> beam.Map(make_crops_fn, NR_CROPS)
bboxs = crops | 'bounding boxes tfserv' >> beam.Map(
    pred_bbox_tfserv_fn, SERVER_URL)

sliding_windows = bboxs | 'Window' >> beam.WindowInto( …
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-beam

14
推荐指数
1
解决办法
363
查看次数

使用TensorFlow变换有效地将标记转换为单词向量

我想在训练,验证和推理阶段使用TensorFlow Transform将标记转换为单词向量.

我按照这个StackOverflow帖子实现了从标记到向量的初始转换.转换按预期工作,我获取EMB_DIM每个令牌的向量.

import numpy as np
import tensorflow as tf

tf.reset_default_graph()
EMB_DIM = 10

def load_pretrained_glove():
    tokens = ["a", "cat", "plays", "piano"]
    return tokens, np.random.rand(len(tokens), EMB_DIM)

# sample string 
string_tensor = tf.constant(["plays", "piano", "unknown_token", "another_unknown_token"])


pretrained_vocab, pretrained_embs = load_pretrained_glove()

vocab_lookup = tf.contrib.lookup.index_table_from_tensor(
    mapping = tf.constant(pretrained_vocab),
    default_value = len(pretrained_vocab))
string_tensor = vocab_lookup.lookup(string_tensor)

# define the word embedding
pretrained_embs = tf.get_variable(
    name="embs_pretrained",
    initializer=tf.constant_initializer(np.asarray(pretrained_embs), dtype=tf.float32),
    shape=pretrained_embs.shape,
    trainable=False)

unk_embedding = tf.get_variable(
    name="unk_embedding",
    shape=[1, EMB_DIM],
    initializer=tf.random_uniform_initializer(-0.04, 0.04),
    trainable=False)

embeddings = …
Run Code Online (Sandbox Code Playgroud)

word2vec tensorflow apache-beam tensorflow-transform glove

13
推荐指数
1
解决办法
569
查看次数