我正在尝试构建一个Dockerfile,通过pip将需求安装到它构建的容器中.我认为可能无法在谷歌的SDK /云控制台中设置某些内容.我收到此错误:
me@cloud-q2smart:~/github/jokepkg/test$ sudo pip install -r test-ggl-install/requirements.txt
Obtaining funniest from git+https://source.developers.google.com/p/cloud-q2smart/r/jokepkg#egg=funniest-0.1 (from -r test-ggl-install/requirements.txt (line 1))
Updating ./src/funniest clone
git: 'credential-gcloud.sh' is not a git command. See 'git --help'.
Username for 'https://source.developers.google.com':
Run Code Online (Sandbox Code Playgroud)
该消息看起来像是试图将'credential-gcloud.sh'传递给git,而git正在响应它不知道它应该对它意味着什么.看起来谷歌设置用于身份验证的一些内部手段已经破裂.
我真的很想从私人回购中正确安装它.我不希望开发团队因为身份验证机制被破坏而不得不将其流程更改为更麻烦的东西.
...但它很棘手......因为pip在幕后做了一些事情来调用git,看起来谷歌正试图在幕后做一些事情来告诉git如何进行身份验证.
我创建了一个项目,试图在谷歌云源中的私人仓库中安装一个简单的python包.运行它的说明如下:
( Start up google cloud shell )
$ git clone https://github.com/jnorment-q2/jokepkg.git
# pull down my sample package
$ cd jokepkg
# ( create repo in cloud )
$ git remote add google https://source.developers.google.com/p/cloud-q2smart/r/jokepkg
$ git push google master
# Change to test directory …Run Code Online (Sandbox Code Playgroud) git pip docker google-cloud-platform google-cloud-source-repos
我正在尝试实施Reshuffle转换以防止过度融合,但我不知道如何更改版本<KV<String,String>>以处理简单的 PCollections。(这里PCollection <KV<String,String>>描述了如何重新洗牌 。)
在我的管道中添加更多步骤之前,我将如何扩展官方 Avro I/O示例代码以重新洗牌?
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
PCollection<GenericRecord> records =
p.apply(AvroIO.Read.named("ReadFromAvro")
.from("gs://my_bucket/path/records-*.avro")
.withSchema(schema));
Run Code Online (Sandbox Code Playgroud) 可以通过以下方式在Data Storage上读取未存储的JSON文件:
p.apply("read logfiles", TextIO.Read.from("gs://bucket/*").withCoder(TableRowJsonCoder.of()));
Run Code Online (Sandbox Code Playgroud)
如果我只想用BigQuery编写那些带有最小过滤的日志,我可以通过使用像这样的DoFn来实现:
private static class Formatter extends DoFn<TableRow,TableRow> {
@Override
public void processElement(ProcessContext c) throws Exception {
// .clone() since input is immutable
TableRow output = c.element().clone();
// remove misleading timestamp field
output.remove("@timestamp");
// set timestamp field by using the element's timestamp
output.set("timestamp", c.timestamp().toString());
c.output(output);
}
}
}
Run Code Online (Sandbox Code Playgroud)
但是,我不知道如何以这种方式访问JSON文件中的嵌套字段.
RECORDnamed r,是否可以访问其键/值而无需进一步序列化/反序列化?Jackson库,它让使用标准更有意义Coder的TextIO.Read替代TableRowJsonCoder,从而获得一些我失去这样的演出回来的?编辑
文件是换行符的新行,看起来像这样:
{"@timestamp":"2015-x", "message":"bla", "r":{"analyzed":"blub", "query": {"where":"9999"}}}
{"@timestamp":"2015-x", "message":"blub", "r":{"analyzed":"bla", "query": {"where":"1111"}}}
Run Code Online (Sandbox Code Playgroud) 在我发现的官方最佳实践中:
按日期对表进行分区并查询相关分区;例如 WHERE _PARTITIONDATE="2017-01-01" 只扫描 2017 年 1 月 1 日的分区
我开始使用_PARTITIONDATE伪列了很多,因为我觉得它更容易编写查询这样,在对比的_PARTITIONTIME列和应用TIMESTAMP()功能到我的约会,像显示在这个例子:
WHERE _PARTITIONTIME = TIMESTAMP('2016-03-28')
我想知道我是否应该继续使用_PARTITIONDATE- 因为我找不到关于它的更多文档。此外,与此相反,_PARTITIONTIME它不会在基于网络的 BigQuery SQL 编辑器中突出显示语法。
是有利于官方的方式_PARTITIONTIME过_PARTTIONDATE?
我正在使用Apache Beam 2.6从单个Kafka主题中读取并将输出写入Google Cloud Storage(GCS).现在我想改变管道,以便它正在读取多个主题并将其写出来gs://bucket/topic/...
在阅读我TextIO在管道的最后一步中使用的单个主题时:
TextIO.write()
.to(
new DateNamedFiles(
String.format("gs://bucket/data%s/", suffix), currentMillisString))
.withWindowedWrites()
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
.withNumShards(1));
Run Code Online (Sandbox Code Playgroud)
这是一个类似的问题,我试图改编的代码.
FileIO.<EventType, Event>writeDynamic()
.by(
new SerializableFunction<Event, EventType>() {
@Override
public EventType apply(Event input) {
return EventType.TRANSFER; // should return real type here, just a dummy
}
})
.via(
Contextful.fn(
new SerializableFunction<Event, String>() {
@Override
public String apply(Event input) {
return "Dummy"; // should return the Event converted to a String
}
}),
TextIO.sink())
.to(DynamicFileDestinations.constant(new DateNamedFiles("gs://bucket/tmp%s/%s/",
currentMillisString), …Run Code Online (Sandbox Code Playgroud)