我正在使用带有Python SDK的Google Cloud Dataflow.
我想要 :
我怎样才能获得该列表?在下面的combine变换之后,我创建了一个ListPCollectionView对象但是我无法迭代该对象:
class ToUniqueList(beam.CombineFn):
def create_accumulator(self):
return []
def add_input(self, accumulator, element):
if element not in accumulator:
accumulator.append(element)
return accumulator
def merge_accumulators(self, accumulators):
return list(set(accumulators))
def extract_output(self, accumulator):
return accumulator
def get_list_of_dates(pcoll):
return (pcoll
| 'get the list of dates' >> beam.CombineGlobally(ToUniqueList()))
Run Code Online (Sandbox Code Playgroud)
我做错了吗?最好的方法是什么?
谢谢.
Lock.acquire/release在以下声明中是否有一个隐藏的内容:
my_thread_local = threading.local()
my_thread_local.some_value = 1
Run Code Online (Sandbox Code Playgroud)
这个怎么样:
local_variable = my_thread_local.some_value
Run Code Online (Sandbox Code Playgroud) 我试图拥有我创建的 AutoValue 定义对象的 PCollection,并且我添加了适当的注释以通过DefaultSchema(AutoValueSchema.class). 就像这样:
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class MyAutoClas {
public abstract String getMyStr();
public abstract Integer getMyInt();
@CreateSchema
public static MyAutoClass create(String myStr, Integer myInt) {
return new AutoValue_MyAutoClass(myStr, myInt);
}
}
Run Code Online (Sandbox Code Playgroud)
我有一个小测试用例,如下所示:
PCollection<KV<String, MyAutoClass>> result = pipeline
.apply(Create.of(MyAutoClass.create("abc", 1)))
.apply(WithKeys.of(in -> in.getMyStr()));
PAssert.that(result).containsInAnyOrder(KV.of("abc", MyAutoClass.create("abc", 1)));
pipeline.run().waitUntilFinish();
Run Code Online (Sandbox Code Playgroud)
当我尝试运行此程序时,我看到以下错误:
[ERROR] testMyAutoValueClass(.....) Time elapsed: 1.891 s <<< ERROR!
java.lang.RuntimeException: Creator parameter arg0 Doesn't correspond to a schema field
at org.apache.beam.sdk.schemas.utils.ByteBuddyUtils$InvokeUserCreateInstruction.<init>(ByteBuddyUtils.java:717)
at org.apache.beam.sdk.schemas.utils.ByteBuddyUtils$StaticFactoryMethodInstruction.<init>(ByteBuddyUtils.java:660)
at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.createStaticCreator(JavaBeanUtils.java:284)
at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getStaticCreator$4(JavaBeanUtils.java:273)
at …Run Code Online (Sandbox Code Playgroud) 我有一个C代码库,都位于同一目录中.我想找到所有具有相同名称的代码文件的头文件.
现在,我有以下命令:
ls *.h | sed s/.h/.c/
Run Code Online (Sandbox Code Playgroud)
这将返回我要搜索的文件名"列表".如何将此列表传递给另一个命令,以便我可以看到哪些头文件具有共享相同名称的代码文件?
我用来argparse解析控制台输入。对于我的输入选项之一,我想添加一个依赖于其他选项的默认选项:
parser.add_argument('--job_name')
parser.add_argument('--user_name',
default='guest')
Run Code Online (Sandbox Code Playgroud)
我希望用户能够定义 a job_name,但如果用户没有定义,我希望是job_name,user_name + datetime.date.today()(例如'guest2016-10-23')。
可以在 之内做到这一点吗argparse?也许通过给予lambda...?
由于文档仅适用于JAVA,我无法理解其含义.
它声明 - "虽然ParDo总是产生一个主输出PCollection(作为应用的返回值),你也可以让你的ParDo产生任意数量的额外输出PCollections.如果你选择有多个输出,你的ParDo将返回所有的输出PCollections(包括主输出)捆绑在一起.例如,在Java中,输出PCollections捆绑在一个类型安全的PCollectionTuple中."
我理解捆绑在一起意味着什么,但如果我在我的DoFn中产生一个标签,它是否会产生一个包含所有其他输出的空包,并在代码中遇到它们时产生其他输出?或者它等待所有产量准备好输入并将它们全部输出到一起?
文档中没有太多清晰度.虽然我认为它不会等待,只是遇到收益,但我仍然需要了解发生了什么.
我想在带有 python 的数据流模板中使用 FireStore。
我做了这样的事情:
with beam.Pipeline(options=options) as p:
(p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(sub).with_output_types(bytes)
| 'String to dictionary' >> beam.Map(firestore_update_multiple)
)
Run Code Online (Sandbox Code Playgroud)
这是使用它的适当方式吗?
额外的信息
def firestore_update_multiple(row):
from google.cloud import firestore
db = firestore.Client()
doc_ref = db.collection(u'data').document(u'one')
doc_ref.update({
u'arrayExample': u'DataflowRunner',
u'booleanExample': True
})
Run Code Online (Sandbox Code Playgroud) python google-cloud-platform google-cloud-dataflow google-cloud-firestore
现在我只能使用 ParDo 获取类中的 RunTime 值,还有其他方法可以像在我的函数中一样使用运行时参数吗?
这是我现在得到的代码:
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--firestore_document',default='')
def run(argv=None):
parser = argparse.ArgumentParser()
pipeline_options = PipelineOptions()
user_options = pipeline_options.view_as(UserOptions)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
rows = (p
| 'Create inputs' >> beam.Create([''])
| 'Call Firestore' >> beam.ParDo(
CallFirestore(user_options.firestore_document))
| 'Read DB2' >> beam.Map(ReadDB2))
Run Code Online (Sandbox Code Playgroud)
我希望 user_options.firestore_document 无需执行 ParDo 即可在其他功能中使用
python parameters google-cloud-platform google-cloud-dataflow apache-beam
我有一个用例,我读入存储在谷歌云存储中的换行 json 元素并开始处理每个 json。在处理每个 json 时,我必须调用外部 API 来执行重复数据删除,无论该 json 元素之前是否被发现。我在做一个ParDo与DoFn每个JSON。
我还没有看到任何在线教程说明如何从 apache beam DoFnDataflow调用外部 API 端点。
我正在使用JAVABeam 的 SDK。我学习的一些教程解释了使用startBundle和FinishBundle但我不清楚如何使用它
我试图从这篇文章中获得提示 - 使用$ scope.$ emit和$ scope.$ on 但是当控制器彼此无关时,似乎没有任何工作.
那是 -
<div ng-controller="CtrlA">
</div>
<div ng-controller="CtrlB">
</div>
Run Code Online (Sandbox Code Playgroud)
在CtrlB中我会做这样的事情:
$rootScope.$broadcast('userHasLoggedIn', {})
Run Code Online (Sandbox Code Playgroud)
在CtrlA中,我会这样听:
$rootScope.$on('userHasLoggedIn, function(event, data){});
Run Code Online (Sandbox Code Playgroud)
并且没有 - 除非我在CtrlA div中嵌入CtrlB div,否则CtrlA永远不会收到广播事件
任何的想法?
我有一个Java 8应用程序,我在一个线程上运行一个任务:
Thread t = new Thread(() -> { runTask();
finalizeTask(); });
t.start()
saveThread(t);
Run Code Online (Sandbox Code Playgroud)
在某些时候,从不同的线程,这个任务可能会被中断:
Thread t = getThread(); // Obtains thread t
t.interrupt();
Run Code Online (Sandbox Code Playgroud)
有了这个,我希望runTask();线路被中断,但我还是喜欢finalizeTask();运行.
如何捕获线程中断并处理它?
apache-beam ×5
python ×5
java ×3
angularjs ×1
auto-value ×1
java-8 ×1
linux ×1
ls ×1
parameters ×1
shell ×1
unix ×1