当我使用像BigQueryHook这样的运算符/挂钩时,我看到一条消息,表明这些运算符已被弃用,并使用airflow.gcp ...运算符版本。但是,当我尝试在dag中使用它时,它失败并说没有名为airflow.gcp的模块。我拥有带beta功能的最新气流作曲家版本python3。是否可以通过某种方式安装这些运算符?
我正在尝试使用梁2.15在python 3中运行数据流作业。我已经尝试过virtualenv运算符,但这不起作用,因为它只允许使用python2.7。我怎样才能做到这一点?
python-3.x google-cloud-dataflow airflow google-cloud-composer airflow-operator
在堆栈溢出之前我已经看过这个问题的答案(/sf/ask/2098853501/),但不是因为apache beam为python添加了可拆分的dofn功能.在将文件模式传递给gcs存储桶时,如何访问当前正在处理的文件的文件名?
我想将文件名传递给我的转换函数:
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText('gs://url to file')
data = (
lines
| 'Jsonify' >> beam.Map(jsonify)
| 'Unnest' >> beam.FlatMap(unnest)
| 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
'project_id:dataset_id.table_name', schema=schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
Run Code Online (Sandbox Code Playgroud)
最后,我想要做的是在转换json的每一行时将文件名传递给我的转换函数(请参阅此内容,然后使用文件名在不同的BQ表中进行查找以获取值).我想一旦我设法知道如何获取文件名,我将能够找出侧输入部分,以便在bq表中进行查找并获得唯一值.
python google-bigquery google-cloud-platform google-cloud-dataflow apache-beam
我正在研究 Fargate 的一个用例,其中我有一些外部触发器(可能是 CloudWatch 事件),它会触发一个 Fargate 任务,该任务打开一个 WebSocket 连接,侦听几个小时并将数据写入 S3,然后最终在经过一段时间后关闭 WebSocket几个小时。
websocket 可以在一定的小时数(通用计时器)后通过某些外部触发器关闭,或者当它从套接字接收到特定消息时关闭。
这是 Fargate 的正确用例吗?外部触发器将传入 websocket 连接字符串,并且用于身份验证的秘密身份验证令牌可以使用 AWS KMS 存储。
外部触发器 -> 打开 ws:// 连接(通过 Fargate) -> 写入 S3
如果这是正确的用例,关于如何解决这个问题有什么建议吗?如果这不是正确的用例,是否还有其他服务更适合仅几个小时的持久连接?
amazon-web-services amazon-ecs websocket aws-fargate amazon-eks
我正在尝试计算(partition by id1, id2 ORDER BY unixTime)Pyspark窗口上的滚动加权平均值,想知道是否有人对如何执行此操作有想法。
滚动平均将采用当前行的列值、该列的前 9 个行值和该列的 9 个后续行值,并根据每个值在该行中的方式加权。因此,当前行的权重为 10 倍,滞后 1/领先 1 值的权重为 9 倍。
如果没有一个值为空,那么加权平均值的分母将为 100。 一个警告是,如果有空值,我们仍然要计算移动平均值(除非有超过 1/2 的值是空值)。
因此,例如,如果当前 val 之前的 9 个值为空,则分母将为 55。如果超过 1/2 的值为空,那么我们将为加权平均值输出 NULL。我们也可以使用我们说如果分母小于 40 或其他什么的逻辑,输出 null。
我附上了一个屏幕截图来解释我在说什么,以防它令人困惑,希望这可以解决问题:

我知道我可以在 sql 中执行此操作(并且我可以将数据框保存为临时视图),但是因为我必须为多列执行此滚动平均(完全相同的逻辑),理想情况下,如果我可以在 Pyspark 中执行此操作,我会能够编写一个 for 循环,然后为每一列执行此操作。另外,我很想有效地做到这一点。我已经阅读了许多关于滚动平均值的主题,但我认为这种情况略有不同。
对不起,如果我过于复杂,希望它是有道理的。如果这不容易有效地完成,我知道如何通过在窗口上列出 lag(val, 10)... lag(val, 9) over window... 等来计算它,并且可以使用那。
BigQuery 的脚本/过程刚刚推出测试版 - 是否可以使用 BigQuery python 客户端调用过程?
我试过:
query = """CALL `myproject.dataset.procedure`()...."""
job = client.query(query, location="US",)
print(job.results())
print(job.ddl_operation_performed)
print(job._properties) but that didn't give me the result set from the procedure. Is it possible to get the results?
Run Code Online (Sandbox Code Playgroud)
谢谢你!
编辑 - 我正在调用的存储过程
CREATE OR REPLACE PROCEDURE `Project.Dataset.Table`(IN country STRING, IN accessDate DATE, IN accessId, OUT saleExists INT64)
BEGIN
IF EXISTS (SELECT 1 FROM dataset.table where purchaseCountry = country and purchaseDate=accessDate and customerId = accessId)
THEN
SET saleExists = (SELECT 1);
ELSE …Run Code Online (Sandbox Code Playgroud) 我在 S3 中有很多行分隔的 json 文件,想要读取 Spark 中的所有这些文件,然后读取 json 中的每一行,并为该行输出一个 Dict/Row,并将文件名作为一列。我将如何在 python 中以有效的方式做到这一点?每个 json 大约为 200 MB。
下面是一个文件示例(像这样有 200,000 行),将此文件命名为 class_scores_0219:
{"name": "Maria C", "class":"Math", "score":"80", "student_identification":22}
{"name": "Maria F", "class":"Physics", "score":"90", "student_identification":12}
{"name": "Fink", "class":"English", "score":"75", "student_identification":7}
Run Code Online (Sandbox Code Playgroud)
输出 DataFrame 为(为简单起见,仅显示一行):
+-------------------+---------+-------+-------+------------------------+
| file_name | name | class | score | student_identification |
+-------------------+---------+-------+-------+------------------------+
| class_scores_0219 | Maria C | Math | 80 | 22 |
+-------------------+---------+-------+-------+------------------------+
Run Code Online (Sandbox Code Playgroud)
我已经使用以下方法设置了 s3 密钥/访问密钥:(sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", SECRET_KEY)
访问密钥也是如此),但可以根据需要以不同的方式进行连接。
我愿意接受任何最有效的选项,我可以提供文件列表并将其输入,或者我可以连接到 boto3 并提供前缀。我是 Spark 新手,因此感谢所有帮助。
我正在使用 Selenium/Python 尝试填写表格,然后填写 recaptcha。我找到了 python-anticaptcha 并购买了 10 美元的积分,一切正常,验证码出现,但没有任何反应。我试图寻找几个小时的答案/咨询他们的 api 和示例,但找不到任何东西。最终,验证码应该可以工作,然后网站会生成一个表格,我正在尝试对其进行网络抓取
这就是它最终的样子, 但没有任何反应,大约一分钟后它通常会退出,这是代码
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from python_anticaptcha import AnticaptchaClient, NoCaptchaTaskProxylessTask
import re
import pandas as pd
import os
import time
import requests
url = "https://claimittexas.org/app/claim-search"
driver = webdriver.Safari()
driver.implicitly_wait(30)
driver.get(url)
wait = WebDriverWait(driver, 30)
result = driver.find_element_by_xpath('//*[@id="lastName"]')
driver.execute_script("arguments[0].value='Al';",result)
time.sleep(2)
result.submit()
api_key = #MYAPIKEY
site_key = '6LeQLyEUAAAAAKTwLC-xVC0wGDFIqPg1q3Ofam5M' # …Run Code Online (Sandbox Code Playgroud) 我正在处理跟踪本地比赛中网球运动员的数据.数据以json文件的形式提供给我,每100毫秒(每秒10次),它知道球场两侧球员的位置以及球的位置,并提供其他比赛数据.使用这个跟踪数据,我创建了一堆自定义python类和子类来创建一个"匹配"对象,我将所有跟踪数据加载到这个"匹配对象"中.我可以创建一个pandas df,保存自定义类的实例/这有意义吗?
背景故事/思考过程(如果我的问题没有意义,那很可能......)
我第一次实现这个,我基本上创建了一个"匹配"对象,我使用python OOP并将比赛分解为游戏,集合,点数,玩家等.玩家部分有点混乱,因为我的计算结果我们每100毫秒创建一个玩家对象和一个玩家的新实例(由于一个玩家在整个游戏中是相同的,但在那个确切的时刻将其视为该玩家,因此很难将其包裹起来).我不确定将这些"播放器"对象更改为大熊猫数据框中的行是否更有意义(它们是大量的,考虑3小时的匹配)或者如果我可以创建一只大熊猫df并让玩家成为一个专栏.玩家组成点,然后点组成帧,所以如果我确实将玩家对象更改为pandas df那将是困难的因为我会在数据帧中有一堆行构成一个点,然后是一堆点制作游戏..以及诸如此类的东西
因为有太多的跟踪数据,效率考虑因素对我来说很重要(虽然我更喜欢做一些有些慢但不是很大的事情,但它有助于我确保/检查所有数据)
python ×5
pyspark ×2
airflow ×1
amazon-ecs ×1
amazon-eks ×1
apache-beam ×1
apache-spark ×1
aws-fargate ×1
class ×1
databricks ×1
dataframe ×1
oop ×1
pandas ×1
python-3.x ×1
recaptcha ×1
selenium ×1
web-scraping ×1
websocket ×1