我正在尝试实现具有以下功能的多线程管道数据流框架:
流水线可以被描述为一个无环的有向图。每个节点执行一些处理并具有任意数量的任意类型的输入和一个任意类型的输出。
对于每个给定的输入数据实例,每个节点不应执行多次,之后应缓存结果。尽管此缓存不应该在内存中持续所需的时间更长,并且应该在任何其他节点不再需要时将其删除。
每个节点都应该支持惰性求值,即应该只在其他节点需要它的输出时才执行。
是否可以通过使用 C++11 多线程特性来实现这一点,尤其是std::future,std::promise和std::async? 任何人都可以提供线索吗?
我是 Pipeline 世界和 Google API DataFlow 的新手。
我想使用 sqlQuery 从 BigQuery 读取数据。当我读取所有数据库时,它工作正常。
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<TableRow> qData = p.apply(
BigQueryIO.Read
.named("Read")
.from("test:DataSetTest.data"));
Run Code Online (Sandbox Code Playgroud)
但是当我使用 fromQuery 时出现错误。
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<TableRow> qData = p.apply(
BigQueryIO.Read
.named("Read")
.fromQuery("SELECT * FROM DataSetTest.data"));
Run Code Online (Sandbox Code Playgroud)
错误:
线程“main”中出现异常 java.lang.IllegalArgumentException:查询“SELECT * FROM DataSetTest.data”的验证失败。如果查询依赖于管道的早期阶段,则可以使用#withoutValidation 禁用此验证。
在 com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.dryRunQuery(BigQueryIO.java:449)
在 com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.validate(BigQueryIO.java:432)
在com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:357)
在com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
在 com.google.cloud.dataflow.sdk.values.PBegin.apply(PBegin.java:47)
在com.google.cloud.dataflow.sdk.Pipeline.apply(Pipeline.java:151)
在 Test.java.packageid.StarterPipeline.main(StarterPipeline.java:72)
引起原因:java.lang.NullPointerException:必须指定必需参数projectId。
在 com.google.api.client.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:229)
在 com.google.api.client.util.Preconditions.checkNotNull(Preconditions.java:140)
在 com.google.api.services.bigquery.Bigquery$Jobs$Query。(Bigquery.java:1751)
在 com.google.api.services.bigquery.Bigquery$Jobs.query(Bigquery.java:1724)
在 com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.dryRunQuery(BigQueryIO.java:445)
... 6 更多 …
java google-app-engine dataflow google-bigquery google-cloud-dataflow
从:
https://github.com/spotify/scio/wiki/Scio-data-guideline
“比 groupByKey 更喜欢组合/聚合/减少转换。请记住,减少操作必须是关联的和可交换的。”
为什么特别喜欢聚合而不是 groupByKey?
我正在使用 Python SDK 2.15.0 构建数据流管道。在此管道中,我需要在管道的多个阶段将附加数据加入到每个元素。
所有这些附加数据都是从 Google Cloud Storage 上的 avro 文件中读取的(Dataflow 和 GCS 存储桶使用的同一区域),使用 map 函数将其组织为键值元组,然后使用 pvalue.AsDict( 作为侧输入传递到 DoFn )。侧面输入数据在管道执行期间不会改变。
第一次连接(侧面输入大小 ~ 1 MB)进行得非常顺利。然而,第二次连接确实表现不佳。它的 sideinput 大小约为 50 MB。
数据流执行图清楚地显示了导致性能不佳的原因:我的 ParDo 步骤消耗的大约 90% 的时间都花在了读取侧面输入上。即使我只使用四个工作节点,从 sideinput 读取的数据量也超出了其实际大小几个数量级。
我能做些什么来防止这种情况发生吗?我是否需要以某种方式配置工作缓存大小?在我的 DoFn 的设置方法中准备附加数据而不是将其作为 sideinput 传递会更好吗?
以下是我准备侧面输入的方法:
sideinput_1 = pvalue.AsDict(p | "Read side input data 1" >> beam.io.ReadFromAvro("gs:/bucket/small_file.avro",0,False,True) \
| "Prepare sideinput 1" >> beam.Map(lambda x: (x["KEY"],x["VALUE"])))
# Preparing data for later join
sideinput_2 = pvalue.AsDict(p | "Read side input data 2" >> beam.io.ReadFromAvro("gs://bucket/bigger_file.avro",0,False,True) \ …Run Code Online (Sandbox Code Playgroud) 我正在 Google Dataflow 上使用 Apache Beam,并通过 lambda 函数调用函数情感,但收到错误消息:函数名称未定义。
output_tweets = (lines
| 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
| 'assign window key' >> beam.WindowInto(window.FixedWindows(10))
| 'batch into n batches' >> BatchElements(min_batch_size=49, max_batch_size=50)
| 'sentiment analysis' >> beam.FlatMap(lambda x: sentiment(x))
)
Run Code Online (Sandbox Code Playgroud)
这是我的 Apache Beam 调用,在最后一行中提到了函数情绪,这给我带来了问题。
函数代码如下(我认为这不重要):
def sentiment(messages):
if not isinstance(messages, list):
messages = [messages]
instances = list(map(lambda message: json.loads(message), messages))
lservice = discovery.build('language', 'v1beta1', developerKey = APIKEY)
for instance in instances['text']:
response = lservice.documents().analyzeSentiment(
body ={
'document': {
'type': 'PLAIN_TEXT',
'content': …Run Code Online (Sandbox Code Playgroud) 我是 Apache NIFI 新手。我已在本地 Windows 计算机上安装了 Apache NIFI。现在,它要求用户名和密码登录。您知道我在哪里可以找到或设置密码吗?我在安装过程中没有设置任何用户名和密码。我非常感谢您尽快回复。请看图片附件-> Nifi登录需要用户名和密码
我正在尝试使用SSIS平面文件目标,但是无法解决使输出文件具有命名为同一事物的两列的问题。
我要求输出文件具有列标题:
first1, last1, email, shortname, email
Run Code Online (Sandbox Code Playgroud)
每当我尝试映射源数据时,都会收到错误消息,提示诸如“此列名称已存在”和“有多个数据源列,名称为”电子邮件”之类的信息。
最好的解决方法是什么?
谢谢
在Dataflow/Apache Beam程序中说,我正在尝试读取具有指数级增长数据的表.我想提高读取的性能.
BigQueryIO.Read.from("projectid:dataset.tablename")
Run Code Online (Sandbox Code Playgroud)
要么
BigQueryIO.Read.fromQuery("SELECT A, B FROM [projectid:dataset.tablename]")
Run Code Online (Sandbox Code Playgroud)
如果我只选择表中所需的列,而不是上面的整个表,那么我的读取性能会提高吗?
我知道选择几列会降低成本.但是想知道上面的读取性能.
我应该使用哪个图来描述这样的链:
Input data->preprocessing->preprocessed data->
algorithm 1->if a good result, next step, if not - do algorithm 1 again...
Run Code Online (Sandbox Code Playgroud) 典型情况:生产者快,消费者慢,需要让生产者慢下来。
无法按我预期工作的示例代码(解释如下):
// I assumed this block will behave like BlockingCollection, but it doesn't
var bb = new BufferBlock<int>(new DataflowBlockOptions {
BoundedCapacity = 3, // looks like this does nothing
});
// fast producer
int dataSource = -1;
var producer = Task.Run(() => {
while (dataSource < 10) {
var message = ++dataSource;
bb.Post(message);
Console.WriteLine($"Posted: {message}");
}
Console.WriteLine("Calling .Complete() on buffer block");
bb.Complete();
});
// slow consumer
var ab = new ActionBlock<int>(i => {
Thread.Sleep(500);
Console.WriteLine($"Received: {i}");
}, …Run Code Online (Sandbox Code Playgroud) 我正在构建一个从 MYSQL 数据库读取数据并在 BigQuery 中创建副本的变更数据捕获管道。我将在 Pub/Sub 中推送更改并使用 Dataflow 将它们传输到 Google Cloud Storage。我已经能够弄清楚如何流式传输更改,但是我需要对数据库中的几个表运行批处理。
在从 Pub/Sub 等无限源读取时,能否使用 Dataflow 运行批处理作业?我可以运行此批处理作业以将数据从 Pub/Sub 传输到 Cloud Storage,然后将此数据加载到 BigQuery 吗?我想要一个批处理作业,因为流作业成本更高。
dataflow batch-processing google-cloud-storage google-bigquery google-cloud-pubsub
我想知道应该在 GCP pubsub 中创建什么类型的订阅,以便处理来自 pubsub 主题的高频数据。我将以每秒 100 多条消息的速度在数据流中摄取数据。拉或推订阅真的很重要,以及它将如何影响速度等等。
dataflow google-cloud-platform google-cloud-pubsub apache-beam
dataflow ×12
apache-beam ×4
python ×2
apache-nifi ×1
architecture ×1
asynchronous ×1
c# ×1
c++ ×1
diagram ×1
etl ×1
flat-file ×1
java ×1
modeling ×1
pipeline ×1
scala ×1
spotify-scio ×1
ssis ×1
tpl-dataflow ×1
uml ×1