小编WIT*_*WIT的帖子

没有名为airfow.gcp的模块-如何运行使用python3 / beam 2.15的数据流作业?

当我使用像BigQueryHook这样的运算符/挂钩时,我看到一条消息,表明这些运算符已被弃用,并使用airflow.gcp ...运算符版本。但是,当我尝试在dag中使用它时,它失败并说没有名为airflow.gcp的模块。我拥有带beta功能的最新气流作曲家版本python3。是否可以通过某种方式安装这些运算符?

我正在尝试使用梁2.15在python 3中运行数据流作业。我已经尝试过virtualenv运算符,但这不起作用,因为它只允许使用python2.7。我怎样才能做到这一点?

python-3.x google-cloud-dataflow airflow google-cloud-composer airflow-operator

7
推荐指数
1
解决办法
159
查看次数

Dataflow/apache beam - 传入模式时如何访问当前文件名?

在堆栈溢出之前我已经看过这个问题的答案(/sf/ask/2098853501/),但不是因为apache beam为python添加了可拆分的dofn功能.在将文件模式传递给gcs存储桶时,如何访问当前正在处理的文件的文件名?

我想将文件名传递给我的转换函数:

with beam.Pipeline(options=pipeline_options) as p:                              
    lines = p | ReadFromText('gs://url to file')                                        


    data = (                                                                    
        lines                                                                   
        | 'Jsonify' >> beam.Map(jsonify)                                        
        | 'Unnest' >> beam.FlatMap(unnest)                                      
        | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(                  
            'project_id:dataset_id.table_name', schema=schema,                     
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,    
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)       
        )                                                   
Run Code Online (Sandbox Code Playgroud)

最后,我想要做的是在转换json的每一行时将文件名传递给我的转换函数(请参阅此内容,然后使用文件名在不同的BQ表中进行查找以获取值).我想一旦我设法知道如何获取文件名,我将能够找出侧输入部分,以便在bq表中进行查找并获得唯一值.

python google-bigquery google-cloud-platform google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
1627
查看次数

Fargate 用例问题 — 监听 Websocket(客户端)

我正在研究 Fargate 的一个用例,其中我有一些外部触发器(可能是 CloudWatch 事件),它会触发一个 Fargate 任务,该任务打开一个 WebSocket 连接,侦听几个小时并将数据写入 S3,然后最终在经过一段时间后关闭 WebSocket几个小时。

websocket 可以在一定的小时数(通用计时器)后通过某些外部触发器关闭,或者当它从套接字接收到特定消息时关闭。

这是 Fargate 的正确用例吗?外部触发器将传入 websocket 连接字符串,并且用于身份验证的秘密身份验证令牌可以使用 AWS KMS 存储。

外部触发器 -> 打开 ws:// 连接(通过 Fargate) -> 写入 S3

如果这是正确的用例,关于如何解决这个问题有什么建议吗?如果这不是正确的用例,是否还有其他服务更适合仅几个小时的持久连接?

amazon-web-services amazon-ecs websocket aws-fargate amazon-eks

4
推荐指数
1
解决办法
4360
查看次数

在 Pyspark 中有效计算加权滚动平均值,但有一些注意事项

我正在尝试计算(partition by id1, id2 ORDER BY unixTime)Pyspark窗口上的滚动加权平均值,想知道是否有人对如何执行此操作有想法。

滚动平均将采用当前行的列值、该列的前 9 个行值和该列的 9 个后续行值,并根据每个值在该行中的方式加权。因此,当前行的权重为 10 倍,滞后 1/领先 1 值的权重为 9 倍。

如果没有一个值为空,那么加权平均值的分母将为 100。 一个警告是,如果有空值,我们仍然要计算移动平均值(除非有超过 1/2 的值是空值)。

因此,例如,如果当前 val 之前的 9 个值为空,则分母将为 55。如果超过 1/2 的值为空,那么我们将为加权平均值输出 NULL。我们也可以使用我们说如果分母小于 40 或其他什么的逻辑,输出 null。

我附上了一个屏幕截图来解释我在说什么,以防它令人困惑,希望这可以解决问题: 在此处输入图片说明

我知道我可以在 sql 中执行此操作(并且我可以将数据框保存为临时视图),但是因为我必须为多列执行此滚动平均(完全相同的逻辑),理想情况下,如果我可以在 Pyspark 中执行此操作,我会能够编写一个 for 循环,然后为每一列执行此操作。另外,我很想有效地做到这一点。我已经阅读了许多关于滚动平均值的主题,但我认为这种情况略有不同。

对不起,如果我过于复杂,希望它是有道理的。如果这不容易有效地完成,我知道如何通过在窗口上列出 lag(val, 10)... lag(val, 9) over window... 等来计算它,并且可以使用那。

python apache-spark-sql pyspark

4
推荐指数
1
解决办法
319
查看次数

是否可以在 python 客户端中调用 BigQuery 过程?

BigQuery 的脚本/过程刚刚推出测试版 - 是否可以使用 BigQuery python 客户端调用过程?

我试过:

query = """CALL `myproject.dataset.procedure`()...."""
job = client.query(query, location="US",)
print(job.results())
print(job.ddl_operation_performed)

print(job._properties) but that didn't give me the result set from the procedure. Is it possible to get the results?
Run Code Online (Sandbox Code Playgroud)

谢谢你!

编辑 - 我正在调用的存储过程

CREATE OR REPLACE PROCEDURE `Project.Dataset.Table`(IN country STRING, IN accessDate DATE, IN accessId, OUT saleExists INT64)
BEGIN
  IF EXISTS (SELECT 1 FROM dataset.table where purchaseCountry = country and purchaseDate=accessDate and customerId = accessId)
  THEN
  SET saleExists = (SELECT 1);
ELSE …
Run Code Online (Sandbox Code Playgroud)

google-bigquery google-api-python-client

3
推荐指数
1
解决办法
9439
查看次数

Spark - 如何从 S3 读取具有文件名的多个 Json 文件

我在 S3 中有很多行分隔的 json 文件,想要读取 Spark 中的所有这些文件,然后读取 json 中的每一行,并为该行输出一个 Dict/Row,并将文件名作为一列。我将如何在 python 中以有效的方式做到这一点?每个 json 大约为 200 MB。

下面是一个文件示例(像这样有 200,000 行),将此文件命名为 class_scores_0219:

{"name": "Maria C", "class":"Math", "score":"80", "student_identification":22}
{"name": "Maria F", "class":"Physics", "score":"90", "student_identification":12}
{"name": "Fink", "class":"English", "score":"75", "student_identification":7}
Run Code Online (Sandbox Code Playgroud)

输出 DataFrame 为(为简单起见,仅显示一行):

+-------------------+---------+-------+-------+------------------------+
|     file_name     |  name   | class | score | student_identification |
+-------------------+---------+-------+-------+------------------------+
| class_scores_0219 | Maria C | Math  |    80 |                     22 |
+-------------------+---------+-------+-------+------------------------+
Run Code Online (Sandbox Code Playgroud)

我已经使用以下方法设置了 s3 密钥/访问密钥:(sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", SECRET_KEY) 访问密钥也是如此),但可以根据需要以不同的方式进行连接。

我愿意接受任何最有效的选项,我可以提供文件列表并将其输入,或者我可以连接到 boto3 并提供前缀。我是 Spark 新手,因此感谢所有帮助。

python apache-spark apache-spark-sql pyspark databricks

3
推荐指数
1
解决办法
9240
查看次数

将反验证码与硒集成

我正在使用 Selenium/Python 尝试填写表格,然后填写 recaptcha。我找到了 python-anticaptcha 并购买了 10 美元的积分,一切正常,验证码出现,但没有任何反应。我试图寻找几个小时的答案/咨询他们的 api 和示例,但找不到任何东西。最终,验证码应该可以工作,然后网站会生成一个表格,我正在尝试对其进行网络抓取

这就是它最终的样子, 但没有任何反应,大约一分钟后它通常会退出,这是代码

from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from python_anticaptcha import AnticaptchaClient, NoCaptchaTaskProxylessTask
import re
import pandas as pd
import os
import time
import requests

url = "https://claimittexas.org/app/claim-search"
driver = webdriver.Safari()
driver.implicitly_wait(30)
driver.get(url)

wait = WebDriverWait(driver, 30)
result = driver.find_element_by_xpath('//*[@id="lastName"]')
driver.execute_script("arguments[0].value='Al';",result)
time.sleep(2)
result.submit()

api_key = #MYAPIKEY
site_key = '6LeQLyEUAAAAAKTwLC-xVC0wGDFIqPg1q3Ofam5M'  # …
Run Code Online (Sandbox Code Playgroud)

python selenium recaptcha web-scraping

2
推荐指数
1
解决办法
6672
查看次数

我可以(/它有意义)创建一个pandas数据帧来保存自定义类实例吗?

我正在处理跟踪本地比赛中网球运动员的数据.数据以json文件的形式提供给我,每100毫秒(每秒10次),它知道球场两侧球员的位置以及球的位置,并提供其他比赛数据.使用这个跟踪数据,我创建了一堆自定义python类和子类来创建一个"匹配"对象,我将所有跟踪数据加载到这个"匹配对象"中.我可以创建一个pandas df,保存自定义类的实例/这有意义吗?


背景故事/思考过程(如果我的问题没有意义,那很可能......)

我第一次实现这个,我基本上创建了一个"匹配"对象,我使用python OOP并将比赛分解为游戏,集合,点数,玩家等.玩家部分有点混乱,因为我的计算结果我们每100毫秒创建一个玩家对象和一个玩家的新实例(由于一个玩家在整个游戏中是相同的,但在那个确切的时刻将其视为该玩家,因此很难将其包裹起来).我不确定将这些"播放器"对象更改为大熊猫数据框中的行是否更有意义(它们是大量的,考虑3小时的匹配)或者如果我可以创建一只大熊猫df并让玩家成为一个专栏.玩家组成点,然后点组成帧,所以如果我确实将玩家对象更改为pandas df那将是困难的因为我会在数据帧中有一堆行构成一个点,然后是一堆点制作游戏..以及诸如此类的东西

因为有太多的跟踪数据,效率考虑因素对我来说很重要(虽然我更喜欢做一些有些慢但不是很大的事情,但它有助于我确保/检查所有数据)

python oop class dataframe pandas

1
推荐指数
1
解决办法
420
查看次数