导入产品配置文件允许您按名称,sku过滤要导出的产品...我希望在管理产品页面中具有相同的功能.管理员将过滤网格中的产品,然后单击"导出"按钮以获取过滤的产品.
如何添加"导出"按钮?我需要覆盖哪些模板/块?当管理员单击按钮时,如何获取过滤后的集合?如何将过滤后的集合导出到csv文件?我可以使用数据流吗?
谢谢
我有一个TransformManyBlock<Tin, Tout>和在运行时添加消费者(ActionBlocks)通过LinkTo(...).
TransformManyBlock是正确的数据流块,消耗元素,转换它们,然后输出(输入相同的数字元素作为输入)给几个消费者(每个链接到消费者消费相同的元素,如广播)?我故意不选择BroadCastBlock,因为它似乎无法像BufferBlock一样转换元素.
我想知道在运行时如何取消消费者链接(ActionBlocks)?就我所见,LinkTo()似乎没有提供这样的功能.
我正在运行 Apache Beam 管道,从 Google Cloud Storage 读取文本文件,对这些文件执行一些解析并将解析后的数据写入 Bigquery。
为了保持简短,这里忽略解析和 google_cloud_options,我的代码如下:(apache-beam 2.5.0 with GCP add-ons and Dataflow as runner)
p = Pipeline(options=options)
lines = p | 'read from file' >>
beam.io.ReadFromText('some_gcs_bucket_path*') | \
'parse xml to dict' >> beam.ParDo(
beam.io.WriteToBigQuery(
'my_table',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
p.run()
Run Code Online (Sandbox Code Playgroud)
这运行良好并成功地将相关数据附加到我的 Bigquery 表中以获取少量输入文件。但是,当我将输入文件的数量增加到 +- 800k 时,出现错误:
“BoundedSource.split() 操作返回的 BoundedSource 对象的总大小大于允许的限制。”
我发现故障排除 apache 光束管道导入错误 [BoundedSource 对象大于允许的限制]建议使用 ReadAllFromText 而不是 ReadFromText。
但是,当我换出时,出现以下错误:
p = Pipeline(options=options)
lines = p | 'read from file' >>
beam.io.ReadFromText('some_gcs_bucket_path*') | \
'parse …Run Code Online (Sandbox Code Playgroud) 我想在 GCP 上的一个数据流作业中运行两个并行管道。我已经创建了一个管道,它工作得很好,但我想要另一个管道而不创建另一份工作。
我已经搜索了很多答案,但找不到任何代码示例:(
如果我这样运行它就不起作用:
pipe1.run();
pipe2.run();
Run Code Online (Sandbox Code Playgroud)
它给我“已经有一个活动的作业名称...如果您想提交第二个作业,请再次尝试使用--jobName”设置不同的名称
java dataflow google-cloud-platform gcloud google-cloud-dataflow
我正在运行Visual Studio 2008,SSIS教程描述于:http: //msdn.microsoft.com/en-us/library/ms167106.aspx
我完成了所有任务但是遇到了以下错误:
错误1验证错误.提取样本货币数据:提取样本货币数据:输入列"CurrencyAlternateKey"(123)具有先前未在数据流任务中使用的谱系ID 55.第1课.dtsx 0 0
错误2验证错误.提取样本货币数据SSIS.Pipeline:输入列"CurrencyAlternateKey"(123)具有以前未在数据流任务中使用的谱系ID 55.第1课.dtsx 0 0
你能说出我需要做些什么才能使这个版本没有错误吗?
Angular 2支持单向数据流,如果有人可以解释或提供一些资源的参考资料,可以通过图解详细解释单向数据流.
我只是想更多关于以下声明.当我试图理解HDFS如何写入数据节点时.我得到了关于HDFS写入的以下解释.
为什么hdfs客户端将4kb发送到数据节点而不是将整个块64MB发送到数据节点?有人可以详细解释一下吗?
为了获得更好的性能,数据节点维护用于数据传输的管道.数据节点1在其开始传输到流中的数据节点2之前不需要等待完整的块到达.实际上,对于给定块,从客户端到数据节点1的数据传输发生在4KB的较小块中.当数据节点1从客户端接收到第一个4KB块时,它将该块存储在其本地存储库中,并立即开始将其传输到流中的数据节点2.同样,当数据节点2从数据节点1接收到第一个4KB块时,它将该块存储在其本地存储库中并立即开始将其传输到数据节点3.这样,流中除最后一个之外的所有数据节点都从前一个并将其传输到流中的下一个数据节点,以通过避免每个阶段的等待时间来提高写入性能.
当您需要从数据流作业中的bigquery中的一个或多个表中读取所有数据时,我会说有两种方法.第一种方法是使用BigQueryIOwith from,它读取有问题的表,第二种方法是使用fromQuery指定读取同一个表中所有数据的查询的位置.所以我的问题是:
我没有在文档中找到任何关于此的内容,但我真的很想知道.我想可能read更快,因为您不需要运行扫描数据的查询,这意味着它更类似于您在BigQueryUI中的预览功能.如果这是真的,它也可能便宜得多,但如果它们的成本相同则有意义.
简而言之,有什么区别:
BigQueryIO.read(...).from(tableName)
Run Code Online (Sandbox Code Playgroud)
和
BigQueryIO.read(...).fromQuery("SELECT * FROM " + tableName)
Run Code Online (Sandbox Code Playgroud) 我目前的理解是NiFi处理器属性特定于该处理器。因此,向处理器添加新属性将仅在该处理器中可见,而不会传递给以后的处理器块?
这就是为什么UpdateAttribute有必要添加在流文件中遍历数据流时保留在其中的元数据的原因:
那么,允许用户在处理器中添加自定义属性(超出该处理器执行所定义和要求的属性)的价值是什么?它类似于创建可以在其他属性中使用的变量吗?
我正在使用数据流 kafka 到 bigquery 模板。启动数据流作业后,它会在队列中停留一段时间,然后失败并显示以下错误:
Error occurred in the launcher container: Template launch failed. See console logs.
Run Code Online (Sandbox Code Playgroud)
查看日志时,我看到以下堆栈跟踪:
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:192)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
at com.google.cloud.teleport.v2.templates.KafkaToBigQuery.run(KafkaToBigQuery.java:343)
at com.google.cloud.teleport.v2.templates.KafkaToBigQuery.main(KafkaToBigQuery.java:222)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata –
Run Code Online (Sandbox Code Playgroud)
在启动工作时,我提供了以下参数:
我的 kafka 主题只包含消息:你好
kafka 安装在 gcp 实例中,该实例与数据流工作者位于同一区域和子网中。
dataflow apache-kafka google-cloud-platform google-cloud-dataflow
dataflow ×10
angular ×1
apache-beam ×1
apache-kafka ×1
apache-nifi ×1
c# ×1
concurrency ×1
etl ×1
gcloud ×1
grid ×1
hadoop ×1
hdfs ×1
java ×1
magento ×1
ssis ×1
tpl-dataflow ×1