在Deployment Manager Jinja模板中,我尝试创建日志接收器:
- name: {{ ALOGSINK }}
type: gcp-types/logging-v2:projects.sinks
properties:
sink: {{ ALOGSINK }}
parent: projects/{{ PROJECT }}
uniqueWriterIdentity: true
outputVersionFormat: V2
destination: storage.googleapis.com/{{ LOGGINGBUCKET }}
filter: >-
resource.type="deployment" AND
resource.labels.name="{{ DEPLOYMENT }}"
Run Code Online (Sandbox Code Playgroud)
我希望将它们配置为在写入目标 GCS存储桶时使用“ 唯一作者身份 ” 。
这意味着将为每个日志接收器自动创建一个特定的服务帐户。
并且必须授予此服务帐户权限以写入指定的(并且已经存在)存储桶。
因此,在模板的授予权限的部分中,我可以使用来引用服务帐户标识(电子邮件地址)$(ref.logsink>.writerIdentity)
。
现在,对于有趣的部分-向存储桶的ACL添加绑定的唯一可靠方法是使用对象的insert
方法BucketAccessControls
:
- name: {{ LOGGINGBUCKET }}-{{ ALOGSINK }}-acl
action: gcp-types/storage-v1:storage.BucketAccessControls.insert
properties:
bucket: $(ref.bucket-name)
entity: user-$(ref.{{ ALOGSINK }}.writerIdentity}
role: WRITER
Run Code Online (Sandbox Code Playgroud)
问题是的writerIdentity
形式为serviceAccount:<email>
,但方法所entity
期望insert
的形式为user-<email> …
通过从HDFS读取.csv文件在Hive中创建表时遇到问题.查询如下:
CREATE EXTERNAL TABLE testmail (memberId String , email String, sentdate String,actiontype String, actiondate String, campaignid String,campaignname String)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/hadoop/cloudera/ameeth/ca_email.csv';
Run Code Online (Sandbox Code Playgroud)
得到错误.元数据错误:
MetaException(消息:hdfs:// PC:8020/user/hadoop/cloudera/ameeth/ca_email.csv不是目录或无法创建目录)
任何人都可以帮助我.实际上我想在.sql文件中运行这样的staments作为工作
当我使用像BigQueryHook这样的运算符/挂钩时,我看到一条消息,表明这些运算符已被弃用,并使用airflow.gcp ...运算符版本。但是,当我尝试在dag中使用它时,它失败并说没有名为airflow.gcp的模块。我拥有带beta功能的最新气流作曲家版本python3。是否可以通过某种方式安装这些运算符?
我正在尝试使用梁2.15在python 3中运行数据流作业。我已经尝试过virtualenv运算符,但这不起作用,因为它只允许使用python2.7。我怎样才能做到这一点?
python-3.x google-cloud-dataflow airflow google-cloud-composer airflow-operator
带命令的气流安装失败
sudo pip3 install apache-airflow[gcp_api]
Run Code Online (Sandbox Code Playgroud)
昨天一切都很好.今天我看到以下错误:
Could not find a version that satisfies the requirement apache-beam[gcp]==2.3.0 (from google-cloud-dataflow->apache-airflow[gcp_api]) (from versions: 0.6.0, 2.0.0, 2.1.0, 2.1.1, 2.2.0)
No matching distribution found for apache-beam[gcp]==2.3.0 (from google-cloud-dataflow->apache-airflow[gcp_api])
Run Code Online (Sandbox Code Playgroud)
有人可以帮我吗?
提前致谢
google-cloud-platform google-cloud-dataflow airflow apache-beam
我想查询表/数据集中的所有列及其描述。我正在寻找类似的元数据表,如__TABLES_SUMMARY__
和__TABLES__
。
目标是在 Data Studio 中为 BigQuery 表构建数据字典报告。
查询所需的内存"select id from table order by rand()"
将超出分配的内存,从而导致查询失败。如何从一个相当大的表中获得所有行的随机排列?表的大小超过 10 亿行。
我是Apache Beam的新手,我在其中尝试编写管道以从Google BigQuery提取数据,然后使用Python将数据以CSV格式写入GCS。
使用,beam.io.read(beam.io.BigQuerySource())
我能够从BigQuery读取数据,但不确定如何将其以CSV格式写入GCS。
是否有实现相同功能的自定义功能,能否请您帮我吗?
import logging
import apache_beam as beam
PROJECT='project_id'
BUCKET='project_bucket'
def run():
argv = [
'--project={0}'.format(PROJECT),
'--job_name=readwritebq',
'--save_main_session',
'--staging_location=gs://{0}/staging/'.format(BUCKET),
'--temp_location=gs://{0}/staging/'.format(BUCKET),
'--runner=DataflowRunner'
]
with beam.Pipeline(argv=argv) as p:
# Execute the SQL in big query and store the result data set into given Destination big query table.
BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
beam.io.BigQuerySource(query = 'Select * from `dataset.table`', use_standard_sql=True))
# Extract data from Bigquery to GCS in CSV format.
# This is where I need …
Run Code Online (Sandbox Code Playgroud) 我有一个从pubsub获得的Object的PCollection,可以这样说:
PCollection<Student> pStudent ;
Run Code Online (Sandbox Code Playgroud)
在学生属性中,有一个属性,比如说studentID;并且我想使用此学生ID从BigQuery读取属性(class_code),并将我从BQ获取的class_code设置为PCollcetion中的Student Object
有谁知道如何实现这一目标?我知道在Beam中有一个,BigQueryIO
但是如果我要在BQ中执行的查询字符串条件来自PCollection中的学生对象(studentID),那么我该怎么办?如何从BigQuery的结果中将值设置为PCollection ?
我有一个 python 脚本,它执行 gbq 作业以将 csv 文件 f 加载到 BigQuery 中的表中。我尝试以 csv 格式上传数据并收到以下错误:
400 Invalid schema update. Cannot add fields (field: string_field_8)
Run Code Online (Sandbox Code Playgroud)
这是我的 csv:
id,first_name,username,last_name,chat_username,chat_id,forward_date,message_text
231125223|Just|koso|swissborg_bounty|-1001368946079|1517903147|tes
481895079|Emerson|EmersonEmory|swissborg_bounty|-1001368946079|1517904387|pictu
316560356|Ken Sam|ICOnomix|swissborg_bounty|-1001368946079|1517904515|Today
Run Code Online (Sandbox Code Playgroud)
这是我的代码:
from google.cloud.bigquery import Client
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '***.json'
os.environ['GOOGLE_CLOUD_DISABLE_GRPC'] = 'True'
from google.cloud import bigquery
dataset_name = 'test_temporary_dataset'
table_name='table_telega'
bigquery_client = bigquery.Client()
dataset = bigquery_client.dataset(dataset_name)
table = dataset.table(table_name)
job_config = bigquery.LoadJobConfig()
job_config.source_format = 'text/csv'
job_config.skip_leading_rows = 1
job_config.autodetect = True
job_config.fieldDelimiter='|'
job_config.allow_jagged_rows=True
job_config.ignoreUnknownValues=True
job_config.allow_quoted_newlines=True
with open('**.csv', 'rb') as source_file: …
Run Code Online (Sandbox Code Playgroud) 从像 pub/sub 这样的无限源读取数据后,我正在应用窗口化。我需要将属于一个窗口的所有记录写入一个单独的文件。我在 Java 中找到了这个,但在 python 中找不到任何东西。