I have a Debezium cluster running in AWS with no problems. I wanted to try AWS MSK, so I launched an MSK cluster and then an EC2 instance to run my connectors.
Then I installed the Confluent platform:
sudo apt-get update && sudo apt-get install confluent-platform-2.12
AWS MSK does not come with a Schema Registry by default, so I configured one on the connector EC2 instance in the Schema Registry conf file:
kafkastore.connection.url=z-1.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-3.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-2.bhuvi-XXXXXXXXX.amazonaws.com:2181
kafkastore.bootstrap.servers=PLAINTEXT://b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-1.bhuvi-XXXXXXXXX.amazonaws.com:9092
Then, in the /etc/kafka/connect-distributed.properties file:
bootstrap.servers=b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-3.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092
plugin.path=/usr/share/java,/usr/share/confluent-hub-components
confluent-hub install debezium/debezium-connector-mysql:latest
systemctl start confluent-schema-registry
systemctl start confluent-connect-distributed
Everything started fine. Then I created a mysql.json file:
{
  "name": "mysql-connector-db01",
  "config": {
    "name": "mysql-connector-db01",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.server.id": "1",
    "tasks.max": "3",
    "database.history.kafka.bootstrap.servers": "172.31.47.152:9092,172.31.38.158:9092,172.31.46.207:9092",
    "database.history.kafka.topic": "schema-changes.mysql",
    "database.server.name": "mysql-db01",
    "database.hostname": "172.31.84.129",
    "database.port": "3306",
    "database.user": "bhuvi",
    "database.password": "my_stong_password",
    "database.whitelist": "proddb,test",
    "internal.key.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false", …
I have a public repository and want to upload files to it using Python (the PyGithub library).
I took the following code from SO:
import base64
from github import Github
from github import InputGitTreeElement
user = "GithubUsername"
password = "*********"
g = Github(user,password)
repo = g.get_user().get_repo('git-test')
file_list = [
    r'C:\Users\jesse\Dropbox\Swell-Forecast\git-test\index.html',
    r'C:\Users\jesse\Dropbox\Swell-Forecast\git-test\margin_table.html'
]
file_names = [
    'index.html',
    'margin_table.html'
]
commit_message = 'python update 2'
master_ref = repo.get_git_ref('heads/master')
master_sha = master_ref.object.sha
base_tree = repo.get_git_tree(master_sha)
element_list = list()
for i, entry in enumerate(file_list):
    with open(entry) as input_file:
        data = input_file.read()
    if entry.endswith('.png'):
        data = base64.b64encode(data)
    element = InputGitTreeElement(file_names[i], '100644', 'blob', data) …
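The snippet is cut off above. For context, a hedged sketch of how this flow usually continues with PyGithub's git-data methods (this is my completion under that assumption, not the original code):

    element_list.append(element)  # still inside the for loop

# after the loop: build a tree, commit it, and move master forward
tree = repo.create_git_tree(element_list, base_tree)
parent = repo.get_git_commit(master_sha)
commit = repo.create_git_commit(commit_message, tree, [parent])
master_ref.edit(commit.sha)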
I am trying to load a CSV file into Redshift.
The delimiter is '|'. A sample row from the CSV:
1 |Bhuvi|"This is ok"|xyz@domain.com
I used this command to load it:
copy tbl from 's3://datawarehouse/source.csv'
iam_role 'arn:aws:iam:::role/cas-pulse-redshift'
delimiter '|'
removequotes
ACCEPTINVCHARS ;
The error:
raw_field_value | This is ok" |xyz@domain.com
err_code | 1214
err_reason | Delimited value missing end quote
Then I also tried this:
copy tbl from 's3://datawarehouse/source.csv'
iam_role 'arn:aws:iam:::role/xxx'
CSV QUOTE '\"'
DELIMITER '|'
ACCEPTINVCHARS ;
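As a side check (mine, not part of the original post), the sample row does parse cleanly with delimiter '|' and quote character '"', which is what the second COPY is trying to express:

import csv
import io

# the sample row from the question, parsed with delimiter '|' and quote '"'
raw = '1 |Bhuvi|"This is ok"|xyz@domain.com\n'
row = next(csv.reader(io.StringIO(raw), delimiter='|', quotechar='"'))
print(row)  # ['1 ', 'Bhuvi', 'This is ok', 'xyz@domain.com']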
In my S3 bucket I have a lot of files in different formats. I want to copy all files with the .JSON extension, from every subfolder, into another folder.
Current structure:
S3://mybucket/f1/file.JPG
S3://mybucket/f1/newfile.JSON
S3://mybucket/f2/Oldfile.JSON
The JSON files should be copied into this folder:
S3://mybucket/arrange/newfile.JSON
S3://mybucket/arrange/Oldfile.JSON
I tried this code from Stack Overflow, but it has no JSON filter:
import os
import boto3

old_bucket_name = 'SRC'
old_prefix = 'A/B/C/'
new_bucket_name = 'TGT'
new_prefix = 'L/M/N/'

s3 = boto3.resource('s3')
old_bucket = s3.Bucket(old_bucket_name)
new_bucket = s3.Bucket(new_bucket_name)

for obj in old_bucket.objects.filter(Prefix=old_prefix):
    old_source = {'Bucket': old_bucket_name,
                  'Key': obj.key}
    # replace the prefix
    new_key = obj.key.replace(old_prefix, new_prefix)
    new_obj = new_bucket.Object(new_key)
    new_obj.copy(old_source)
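A hedged sketch of the same loop with the missing pieces from the question filled in: filter on the .JSON suffix and flatten each key into the arrange/ folder (bucket and folder names taken from the question):

import os
import boto3

s3 = boto3.resource('s3')
bucket = s3.Bucket('mybucket')

for obj in bucket.objects.all():
    if not obj.key.endswith('.JSON') or obj.key.startswith('arrange/'):
        continue  # copy only .JSON files that are not already arranged
    # drop the subfolder part so everything lands directly under arrange/
    new_key = 'arrange/' + os.path.basename(obj.key)
    bucket.Object(new_key).copy({'Bucket': bucket.name, 'Key': obj.key})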
I searched for an example of this but could not find a proper one, so I put one together and am sharing the question here. I am trying to call a Python operator inside a function that is itself called from another Python operator. It seems I am missing something; can someone help me find what I missed?
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago

dd = datetime(2018, 1, 1)
args = {
    'owner': 'airflow',
    'start_date': dd,
    'retries': 0
}

def postgres_to_gcs():
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo "task1"',
        xcom_push=True,
        dag=dag)
    return t1

with DAG('python_dag', description='Python DAG', schedule_interval='*/15 * * * *', start_date=dd, catchup=False) as dag:
    python_task = PythonOperator(task_id='python_task', python_callable=postgres_to_gcs)
    python_task
[2020-10-10 09:34:10,700] {baseoperator.py:351} WARNING …
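For contrast, a minimal sketch (my assumption of the intent, not the original code) of the layout Airflow 1.x normally expects: both tasks instantiated at DAG level, with the callable reading the BashOperator's XCom instead of creating the operator itself:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def read_count(**context):
    # pull the value that count_lines pushed to XCom
    return context['ti'].xcom_pull(task_ids='count_lines')

with DAG('python_dag', schedule_interval='*/15 * * * *',
         start_date=datetime(2018, 1, 1), catchup=False) as dag:
    count_lines = BashOperator(task_id='count_lines',
                               bash_command='echo "task1"',
                               xcom_push=True)
    python_task = PythonOperator(task_id='python_task',
                                 python_callable=read_count,
                                 provide_context=True)
    count_lines >> python_task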
I am using the following UNLOAD command:
unload ('select * from '')to 's3://summary.csv'
CREDENTIALS 'aws_access_key_id='';aws_secret_access_key=''' parallel off allowoverwrite CSV HEADER;
The file created in S3 is summary.csv000.
If I change the command and drop the file extension, like this:
unload ('select * from '')to 's3://summary'
CREDENTIALS 'aws_access_key_id='';aws_secret_access_key=''' parallel off allowoverwrite CSV HEADER;
the file created in S3 is summary000.
Is there a way to get summary.csv, so that I don't have to change the file extension before importing it into Excel?
Thanks.
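UNLOAD always appends a slice number to the object name, so one common workaround is renaming the object afterwards. A rough sketch with boto3 (the bucket name is assumed):

import boto3

s3 = boto3.client('s3')
bucket = 'my-unload-bucket'  # hypothetical bucket name

# S3 has no rename: copy to the wanted key, then delete the original
s3.copy_object(Bucket=bucket,
               CopySource={'Bucket': bucket, 'Key': 'summary000'},
               Key='summary.csv')
s3.delete_object(Bucket=bucket, Key='summary000')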
I am trying to implement a function that fetches events from CloudWatch and prints the result. I am able to get the event, but I want to extract a specific key from that JSON.
import json

def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))
    message = event['Records'][0]['Sns']['Message']
    print(message)
"Records": [
{
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:us-east-1:xxxxxxxxxxxxx:bhuvi:XXXXXXXXXXXXXXXXXXXXXXXXXX",
"EventSource": "aws:sns",
"Sns": {
"SignatureVersion": "1",
"Timestamp": "2018-01-13T19:18:44.369Z",
"Signature": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"SigningCertUrl": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX.pem",
"MessageId": "4b76b0ea-5e0f-502f-81ec-e23e03dbaf01",
"Message": "{\"AlarmName\":\"test\",\"AlarmDescription\":\"test\",\"AWSAccountId\":\"xxxxxxxxxxxxx\",\"NewStateValue\":\"ALARM\",\"NewStateReason\":\"Threshold Crossed: 1 out of the last 1 datapoints [2.6260535333900545 (13/01/18 19:13:00)] was greater than or equal to the threshold (1.0) (minimum 1 datapoint for OK -> ALARM transition).\",\"StateChangeTime\":\"2018-01-13T19:18:44.312+0000\",\"Region\":\"US East (N. Virginia)\",\"OldStateValue\":\"OK\",\"Trigger\":{\"MetricName\":\"CPUUtilization\",\"Namespace\":\"AWS/RDS\",\"StatisticType\":\"Statistic\",\"Statistic\":\"AVERAGE\",\"Unit\":null,\"Dimensions\":[{\"name\":\"DBInstanceIdentifier\",\"value\":\"myrds\"}],\"Period\":300,\"EvaluationPeriods\":1,\"ComparisonOperator\":\"GreaterThanOrEqualToThreshold\",\"Threshold\":1.0,\"TreatMissingData\":\"\",\"EvaluateLowSampleCountPercentile\":\"\"}}",
"MessageAttributes":
{}
, …
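Since the Sns Message field in the event above is itself a JSON-encoded string, it needs a second json.loads before a key such as AlarmName can be read. A small sketch based on the sample event:

import json

def lambda_handler(event, context):
    # Message is a JSON string, so decode it a second time
    message = json.loads(event['Records'][0]['Sns']['Message'])
    print(message['AlarmName'])      # "test" in the sample event
    print(message['NewStateValue'])  # "ALARM"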
I want to take a Cassandra backup every 1 hour and move it to a shared location.
Cassandra takes its snapshots in the default location; how can I take a snapshot into /opt/backup?
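nodetool writes snapshots under each table's data directory and the target is not configurable, so the usual approach is to snapshot and then copy the snapshot directories out. A rough hourly sketch (paths assumed, default data directory), meant to be run from cron:

import glob
import shutil
import subprocess
import time

tag = time.strftime('snap-%Y%m%d%H%M')
# take a snapshot on this node, tagged with the timestamp
subprocess.run(['nodetool', 'snapshot', '-t', tag], check=True)

data_dir = '/var/lib/cassandra/data'  # default data_file_directories
for snap in glob.glob(f'{data_dir}/*/*/snapshots/{tag}'):
    # mirror the keyspace/table layout under /opt/backup
    shutil.copytree(snap, snap.replace(data_dir, '/opt/backup'))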
I have a test file like the one below (but the actual file has more than 1000 lines and many columns):
apple,2
mango,5
coconut,10
I want to print this file in the following way:
I have apple and the count is 2
I have mango and the count is 5
I have coconut and the count is 10
I tried while read line and awk -F ',' '{print $1}', but I'm not getting the expected output. Can someone help me?
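For comparison, the same transformation in Python (the input file name is assumed); in awk terms the fix amounts to printing both $1 and $2 rather than just $1:

# read each comma-separated line and rephrase it
with open('fruits.txt') as f:
    for line in f:
        name, count = line.strip().split(',')[:2]
        print(f'I have {name} and the count is {count}')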
I got a strange error from BQ (maybe the first time I've ever received this one):
No matching signature for operator = for argument types: INT64, STRING. Supported signatures: ANY = ANY at [27:1]
The error points at this line of the query:

JOIN t2.id = t3.id

t2.id is the column being flagged, and it is an integer column.
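For illustration only (the original query is not shown): this error means one side of the join predicate is INT64 and the other STRING, and casting one side resolves the signature mismatch. A hedged sketch with the google-cloud-bigquery client, with table and column names assumed:

from google.cloud import bigquery

client = bigquery.Client()

# hypothetical reconstruction: t3.id arrives as STRING while t2.id is INT64,
# so cast the STRING side before comparing
sql = """
SELECT t2.*
FROM `project.dataset.t2` AS t2
JOIN `project.dataset.t3` AS t3
  ON t2.id = SAFE_CAST(t3.id AS INT64)
"""
rows = client.query(sql).result()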