我正在尝试将数据流管道中的结构数组字段写入大查询,生成的表的架构是正确的,但字段中没有填充任何数据。
我的 DoFn 功能:
public class ProcessIpBlocks {
public static class IpBlocksToIp extends DoFn<TableRow, TableRow> {
private static final long serialVersionUID = 1L;
@Override
public void processElement(ProcessContext c) throws JSONException {
TableRow row = c.element();
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Calendar cal = Calendar.getInstance();
long startIp = 0L, endIp = 0L;
if(row.get("start_ip") != null)
startIp = Long.parseLong((String)row.get("start_ip"));
if(row.get("end_ip") != null)
endIp = Long.parseLong((String)row.get("end_ip"));
for(long i= startIp; i<=endIp; i++)
{
TableRow outputRow = new TableRow();
outputRow.set("start_ip", startIp);
outputRow.set("ip", i);
if(row.get("postal_code") …Run Code Online (Sandbox Code Playgroud) 我正在尝试通过 Go 应用程序从现有模板启动数据流作业。
到目前为止,我已经引入google.golang.org/api/dataflow/v1b3并创建了一个CreateJobFromTemplateRequest包含工作信息的文件。
现在,我如何使用 Compute Engine 中的内置服务帐号凭据执行该请求?
有没有办法检查 PCollection 是否为空?
我在 Dataflow 和 Apache Beam 的文档中没有找到任何相关内容。
这与以下 2 个线程相关: Google 云函数 - 无法读取属性“getApplicationDefault” 从云函数触发云数据流管道 - 函数超时
我创建了一个数据流模板,它将数据从 GCS 复制到 BigQuery,作为这两个示例。
作为初始化过程的一部分,我运行
npm init
npm install --save googleapis
Run Code Online (Sandbox Code Playgroud)
这是我的index.js
var {google} = require('googleapis');
exports.goWithTheDataFlow = (event, callback) => {
const file = event.data;
const context = event.context;
console.log(`Event ${context.eventId}`);
console.log(` Event Type: ${context.eventType}`);
console.log(` Bucket: ${file.bucket}`);
console.log(` File: ${file.name}`);
console.log(` Metageneration: ${file.metageneration}`);
console.log(` Created: ${file.timeCreated}`);
console.log(` Updated: ${file.updated}`);
google.auth.getApplicationDefault(function (err, authClient, projectId) {
if (err) {
throw err;
}
console.log(projectId);
const dataflow = google.dataflow({ version: 'v1b3', auth: …Run Code Online (Sandbox Code Playgroud) google-cloud-platform google-cloud-dataflow google-cloud-functions
我有一个简单的任务。我有一堆文件( ~100GB in total ),每一行代表一个实体。我必须将此实体发送到 JanusGraph 服务器。
2018-07-07_05_10_46-8497016571919684639 <- job id
Run Code Online (Sandbox Code Playgroud)
一段时间后,我遇到 OOM,日志显示 Java 被杀死。
从数据流视图中,我可以看到以下日志:
Workflow failed. Causes: S01:TextIO.Read/Read+ParDo(Anonymous)+ParDo(JanusVertexConsumer) failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on:
从堆栈驱动程序视图中,我可以看到: https: //www.dropbox.com/s/zvny7qwhl7hbwyw/Screenshot%202018-07-08%2010.05.33.png ?dl=0
日志显示:
E Out of memory: Kill process 1180 (java) score 1100 or sacrifice child
E Killed process 1180 (java) total-vm:4838044kB, anon-rss:383132kB, file-rss:0kB
更多信息请参见:https ://pastebin.com/raw/MftBwUxs
我怎样才能调试发生了什么?
java google-cloud-dataflow tinkerpop3 apache-beam janusgraph
我提交了一个 GCP 数据流管道,以从 GCP Pub/Sub 接收数据、解析并存储到 GCP 数据存储区。看起来工作很完美。
经过 21 天,我发现成本为 144.54 美元,工作时间为 2,094.72 小时。这意味着我提交后,即使没有从 Pub/Sub 接收(处理)任何数据,也会每秒收费。
这种行为正常吗?或者我设置了错误的参数?我以为CPU使用时间只有在接收到数据时才计算。
有什么方法可以降低相同工作模型的成本(从 Pub/Sub 接收并存储到数据存储)?
我有一个程序,它在 pubSub 中创建一个主题,并向该主题发布消息。我还有一个自动数据流作业(使用模板),它将这些消息保存到我的 BigQuery 表中。现在我打算用 python 管道替换基于模板的作业,其中我的要求是从 PubSub 读取数据,应用转换并将数据保存到 BigQuery/发布到另一个 PubSub 主题。我开始用 python 编写脚本,并进行了大量的试验和错误来实现它,但令我沮丧的是,我无法实现它。代码如下所示:
import apache_beam as beam
from apache_beam.io import WriteToText
TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"
def run():
o = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=o)
print("I reached here")
# # Read from PubSub into a PCollection.
data = (
p
| "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
)
data | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
print("Lines: ", data)
run()
Run Code Online (Sandbox Code Playgroud)
如果我能尽早获得一些帮助,我将非常感激。注意:我在谷歌云上设置了我的项目,并且我的脚本在本地运行。
google-bigquery google-cloud-platform google-cloud-pubsub google-cloud-dataflow apache-beam
我正在 Google Dataflow 上使用 Apache Beam,并通过 lambda 函数调用函数情感,但收到错误消息:函数名称未定义。
output_tweets = (lines
| 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
| 'assign window key' >> beam.WindowInto(window.FixedWindows(10))
| 'batch into n batches' >> BatchElements(min_batch_size=49, max_batch_size=50)
| 'sentiment analysis' >> beam.FlatMap(lambda x: sentiment(x))
)
Run Code Online (Sandbox Code Playgroud)
这是我的 Apache Beam 调用,在最后一行中提到了函数情绪,这给我带来了问题。
函数代码如下(我认为这不重要):
def sentiment(messages):
if not isinstance(messages, list):
messages = [messages]
instances = list(map(lambda message: json.loads(message), messages))
lservice = discovery.build('language', 'v1beta1', developerKey = APIKEY)
for instance in instances['text']:
response = lservice.documents().analyzeSentiment(
body ={
'document': {
'type': 'PLAIN_TEXT',
'content': …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用该apache_beam.io.fileio模块来读取文件lines.txt并将其合并到我的管道中。
lines.txt有以下内容:
line1
line2
line3
Run Code Online (Sandbox Code Playgroud)
当我运行以下管道代码时:
with beam.Pipeline(options=pipeline_options) as p:
lines = (
p
| beam.io.fileio.MatchFiles(file_pattern="lines.txt")
| beam.io.fileio.ReadMatches()
)
# print file contents to screen
lines | 'print to screen' >> beam.Map(print)
Run Code Online (Sandbox Code Playgroud)
我得到以下输出:
<apache_beam.io.fileio.ReadableFile object at 0x000001A8C6C55F08>
Run Code Online (Sandbox Code Playgroud)
我期望
line1
line2
line3
Run Code Online (Sandbox Code Playgroud)
我怎样才能达到我预期的结果?
python google-cloud-platform google-cloud-dataflow apache-beam
我正在尝试在 Google Big Query 上创建数据流 SQL,但收到此错误
Unsupported type for column centroid.centroid: GEOGRAPHY
Run Code Online (Sandbox Code Playgroud)
我找不到任何证据表明 Dataflow SQL 实际上不支持地理数据,并且文档中根本没有提及地理数据。是这样吗,为什么会这样,有什么解决办法吗?