python 依赖项可以加载到 Google Cloud Dataflow 管道中吗?
我想使用gensim 的短语建模器,它逐行读取数据以自动检测常用短语/二元组(经常出现在彼此旁边的两个词)。
因此,管道的第一次运行会将每个句子传递给这个短语建模器。
然后第二次通过管道将采用相同的短语建模器并将这个短语建模器应用于每个句子,以确定应该一起建模的短语。例子:
machine和learning频繁地出现在语料库中,它们将被转换为单个单词machine_learning。 这可以在 Dataflow 中完成吗?
可以pip install gensim在工作机器上强制传递构建/需求文件吗?
我有一个将结果写入 BigQuery 表的 Apache Beam/Dataflow 管道。然后我想查询此表以获取管道的单独部分。但是,我似乎无法弄清楚如何正确设置此管道依赖项。我编写(然后想要查询)的新表与用于某些过滤逻辑的单独表相连,这就是为什么我实际上需要编写表然后运行查询。逻辑如下:
with beam.Pipeline(options=pipeline_options) as p:
table_data = p | 'CreatTable' >> # ... logic to generate table ...
# Write Table to BQ
table_written = table_data | 'WriteTempTrainDataBQ' >> beam.io.WriteToBigQuery(...)
query_results = table_written | 'QueryNewTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_new_table))
Run Code Online (Sandbox Code Playgroud)
ifquery_new_table实际上是对已经存在的 BQ 表的查询,我更改为query_results = p |而不是table_written这正常工作。但是,如果我尝试查询我在管道中间写入的表,那么在实际生成该表之前,我无法让管道步骤“等待”。有没有办法做到这一点,我忽略了?
当我尝试按顺序执行此步骤时,我收到一个断言错误assert isinstance(pbegin, pvalue.PBegin) AssertionError,我正在阅读这意味着这table_written是问题,因为它不是有效的 PCollection 实例。
有谁知道我可以用什么来代替 table_written 来让它按需要按顺序运行?
我正在尝试修改此处显示的解决方案:用Python发送100,000个HTTP请求的最快方法是什么?除了不是检查标头状态而是创建一个返回字典的API请求外,我希望所有这些API请求的最终结果都是所有字典的列表。
这是我的代码-考虑到api_calls是一个列表,每个列表都为json请求打开...
from threading import Thread
from Queue import Queue
concurrent = 200
def doWork():
while True:
url = q.get()
result = makeRequest(url[0])
doSomethingWithResult(result, url)
q.task_done()
def makeRequest(ourl):
try:
api_call = urlopen(ourl).read()
result = json.loads(api_call)
return result, ourl
except:
return "error", ourl
def doSomethingWithResult(result, url):
print(url,result)
q = Queue(concurrent * 2)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
try:
for url in api_calls:
q.put(url)
q.join()
except KeyboardInterrupt:
sys.exit(1)
Run Code Online (Sandbox Code Playgroud)
像链接的示例一样,当前可以成功在每行上打印url和结果。我想做的是将(url,result)添加到每个线程的列表中,然后最后将它们加入一个主列表中。我不知道如何拥有此主列表并在最后加入结果。有人可以帮我在doSomethingWithResult中进行修改吗?如果我正在做一个大循环,我将只有一个空列表,并将每个API请求后的结果附加到列表中,但是由于我正在使用线程,因此我不知道如何模仿这一点。
我希望一个常见的响应是使用https://en.wikipedia.org/wiki/Asynchronous_I/O,如果这是建议,那么我将不胜感激有人实际上提供了一个示例,该示例可以完成我所拥有的代码上面链接。
我正在试验Cloud ML Engine上的分布式培训选项,并观察了一些奇特的结果.我基本上改变了人口普查自定义估算器示例,以包含一个略有不同的模型,并将我的损失函数更改为AdamOptimizer作为唯一真正的更改.基于这个其他线程,我的理解是任何分布式培训都应该是数据并行异步培训,这表明"如果在10个工作节点中分配10,000个批次,则每个节点大约可以处理1000个批次".在我的实验中,我有大约650k的训练样例,我正在运行以下实验,一个批次大小为128的1个纪元.鉴于650k训练样例和128个批量大小,我希望在一个大约5.1k步骤时代.这是我为不同--scale-tier的人所看到的表现
没有分发
分散式
STANDARD_1:14.5步/秒 - 26k步(26k*128 = ~3.3M,这比实际数据中的训练样本多),29分钟的停留时间
CUSTOM - 5个complex_model_m工作人员,2个large_model参数服务器:27步/秒,31k步(128*31k = ~3.9M,这比实际数据中的650k训练样本多),挂机时间20分钟
我的期望是基于文章的数据平行是分布式培训会将批次分成所有工人,所以如果我有5个批次的5个工人,那么每个工人将执行~1,000批次.然而,我观察到的实际行为似乎更接近于自己执行1个时代的5名工人中的每一个.在分布式设置中进行训练时,在一个时代中采取的步数是训练样例的6倍 - 我知道步骤的真正定义是每次更新渐变时,但我对数据并行训练的理解是这只会拆分批次,所以应该有相同数量的渐变更新 - 有什么理由为什么会出现这种行为?在数据并行异步训练分布式环境中需要更多的训练步骤是否有意义?任何人都可以解释我观察到的行为吗?