小编Bhu*_*esh的帖子

使用 AWS MSK NOT_ENOUGH_REPLICAS 的 Debezium

我在 AWS 中有一个正在运行的 debezium 集群,没有问题。我想尝试一下 AWS MSK。所以我启动了一个集群。然后我启动了一个 EC2 来运行我的连接器。

然后安装 confluent-kafka

sudo apt-get update && sudo apt-get install confluent-platform-2.12
Run Code Online (Sandbox Code Playgroud)

默认情况下,AWS MSK 没有架构注册表,所以我从连接器 EC2 架构注册表 conf 文件配置了它:

kafkastore.connection.url=z-1.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-3.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-2.bhuvi-XXXXXXXXX.amazonaws.com:2181


kafkastore.bootstrap.servers=PLAINTEXT://b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-1.bhuvi-XXXXXXXXX.amazonaws.com:9092
Run Code Online (Sandbox Code Playgroud)

然后/etc/kafka/connect-distributed.properties归档

bootstrap.servers=b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-3.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092

plugin.path=/usr/share/java,/usr/share/confluent-hub-components
Run Code Online (Sandbox Code Playgroud)

安装连接器:

confluent-hub install debezium/debezium-connector-mysql:latest
Run Code Online (Sandbox Code Playgroud)

启动服务

systemctl start confluent-schema-registry
systemctl start confluent-connect-distributed
Run Code Online (Sandbox Code Playgroud)

现在一切都开始了。然后我创建了一个 mysql.json 文件。

{
    "name": "mysql-connector-db01",
    "config": {
        "name": "mysql-connector-db01",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.server.id": "1",
        "tasks.max": "3",
        "database.history.kafka.bootstrap.servers": "172.31.47.152:9092,172.31.38.158:9092,172.31.46.207:9092",
        "database.history.kafka.topic": "schema-changes.mysql",
        "database.server.name": "mysql-db01",
        "database.hostname": "172.31.84.129",
        "database.port": "3306",
        "database.user": "bhuvi",
        "database.password": "my_stong_password",
        "database.whitelist": "proddb,test",
        "internal.key.converter.schemas.enable": "false",
        "key.converter.schemas.enable": "false", …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services apache-kafka debezium amazon-msk

7
推荐指数
1
解决办法
3592
查看次数

Python - 使用 PyGithub 将文件直接上传到 github

我有一个公共存储库,并想使用 python (PyGithub 库)将文件上传到该存储库。

我从SO引用了以下代码:

import base64
from github import Github
from github import InputGitTreeElement

user = "GithubUsername"
password = "*********"
g = Github(user,password)
repo = g.get_user().get_repo('git-test')
file_list = [
    'C:\\Users\jesse\Dropbox\Swell-Forecast\git-test\index.html',
    'C:\\Users\jesse\Dropbox\Swell-Forecast\git-test\margin_table.html'
]

file_names = [
    'index.html',
    'margin_table.html'
]
commit_message = 'python update 2'
master_ref = repo.get_git_ref('heads/master')
master_sha = master_ref.object.sha
base_tree = repo.get_git_tree(master_sha)
element_list = list()
for i, entry in enumerate(file_list):
    with open(entry) as input_file:
        data = input_file.read()
    if entry.endswith('.png'):
        data = base64.b64encode(data)
    element = InputGitTreeElement(file_names[i], '100644', 'blob', data) …
Run Code Online (Sandbox Code Playgroud)

python git github

6
推荐指数
1
解决办法
2万
查看次数

Redshift - 分隔值缺少结束引号

我正在尝试将 CSV 文件加载到 redshift。

分隔符“|”

CSV 的第一列:

1 |Bhuvi|"This is ok"|xyz@domain.com
Run Code Online (Sandbox Code Playgroud)

我用这个命令来加载。

copy tbl from 's3://datawarehouse/source.csv'   
iam_role  'arn:aws:iam:::role/xxx'cas-pulse-redshift' 
delimiter '|' 
removequotes 
ACCEPTINVCHARS ; 
Run Code Online (Sandbox Code Playgroud)

错误:

raw_field_value | This is ok" |xyz@domain.com
err_code        | 1214
err_reason      | Delimited value missing end quote
Run Code Online (Sandbox Code Playgroud)

然后我也尝试了这个。

copy tbl from 's3://datawarehouse/source.csv'   
iam_role  'arn:aws:iam:::role/xxx' 
CSV QUOTE '\"' 
DELIMITER '|'   
ACCEPTINVCHARS ; 
Run Code Online (Sandbox Code Playgroud)

csv amazon-s3 amazon-web-services amazon-redshift

5
推荐指数
1
解决办法
9457
查看次数

Boto3 - 在 S3 中递归地将文件从一个文件夹复制到另一个文件夹

我是我的 S3 存储桶,有很多文件采用不同的文件格式。所以我想从所有具有.JSON扩展名的子文件夹复制到另一个文件夹。

当前结构:

S3://mybucket/f1/file.JPG

S3://mybucket/f1/newfile.JSON

S3://mybucket/f2/Oldfile.JSON
Run Code Online (Sandbox Code Playgroud)

它(JSON FILES)应该被复制到文件夹中:

S3://mybucket/arrange/newfile.JSON
S3://mybucket/arrange/Oldfile.JSON
Run Code Online (Sandbox Code Playgroud)

我试过这个(但没有 JSON 过滤器)来自 stackoverflow

import os
import boto3
old_bucket_name = 'SRC'
old_prefix = 'A/B/C/'
new_bucket_name = 'TGT'
new_prefix = 'L/M/N/'
s3 = boto3.resource('s3')
old_bucket = s3.Bucket(old_bucket_name )
new_bucket = s3.Bucket(new_bucket_name )

for obj in old_bucket.objects.filter(Prefix=old_prefix):
    old_source = { 'Bucket': old_bucket_name,
                   'Key': obj.key}
    # replace the prefix
    new_key = obj.key.replace(old_prefix, new_prefix)
    new_obj = new_bucket.Object(new_key)
    new_obj.copy(old_source)
Run Code Online (Sandbox Code Playgroud)

python amazon-web-services boto3

5
推荐指数
1
解决办法
3488
查看次数

如何列出 AWS RedShift 中的所有存储过程

我正在检查这个,但没有找到合适的。所以我准备了一个并在这里分享这个查询。

plsql amazon-web-services amazon-redshift

4
推荐指数
1
解决办法
4107
查看次数

气流 - 在函数内调用运算符

我正在尝试使用另一个 python 运算符调用函数内部的 python 运算符。似乎我错过了一些东西,有人可以帮我找出我错过的东西。

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago


dd = datetime(2018, 1, 1)
args = {
    'owner': 'airflow',
    'start_date': dd,
    'retries': 0

}

def postgres_to_gcs():
    t1 = BashOperator(
    task_id='count_lines',
    bash_command='echo "task1"',
    xcom_push=True,
    dag=dag)
    return t1



with DAG('python_dag', description='Python DAG', schedule_interval='*/15 * * * *', start_date=dd, catchup=False) as dag:
    python_task = PythonOperator(task_id='python_task', python_callable=postgres_to_gcs)
 
    python_task

Run Code Online (Sandbox Code Playgroud)

错误:

[2020-10-10 09:34:10,700] {baseoperator.py:351} WARNING …
Run Code Online (Sandbox Code Playgroud)

python airflow

4
推荐指数
1
解决办法
2557
查看次数

带有 CSV 扩展名的 Redshift Unload 命令

我正在使用以下卸载命令 -

unload ('select * from '')to  's3://**summary.csv**'
CREDENTIALS 'aws_access_key_id='';aws_secret_access_key=''' parallel off allowoverwrite CSV HEADER;
Run Code Online (Sandbox Code Playgroud)

在S3中创建的文件是summary.csv000

如果我从命令中更改并删除文件扩展名,如下所示

unload ('select * from '')to  's3://**summary**'
CREDENTIALS 'aws_access_key_id='';aws_secret_access_key=''' parallel off allowoverwrite CSV HEADER;
Run Code Online (Sandbox Code Playgroud)

在S3中创建的文件是summary000

有没有办法获取summary.csv,这样我就不必在将其导入excel之前更改文件扩展名?

谢谢。

amazon-s3 amazon-redshift

4
推荐指数
1
解决办法
6793
查看次数

Python - AWS Lambda 从 JSON 输入中提取密钥

我正在尝试实现一个功能,该功能将从 cloudwatch 获取事件并打印结果。我能够获得该事件,但我想从该 JSON 中提取一个特定的键。

这是我的功能:

import json
    def lambda_handler(event, context):
        print("Received event: " + json.dumps(event, indent=2)) 
        message = event['Records'][0]['Sns']['Message']
        print(message)
Run Code Online (Sandbox Code Playgroud)

事件来自 Cloudwatch:

"Records": [
{
"EventVersion": "1.0", 
"EventSubscriptionArn": "arn:aws:sns:us-east-1:xxxxxxxxxxxxx:bhuvi:XXXXXXXXXXXXXXXXXXXXXXXXXX", 
"EventSource": "aws:sns", 
"Sns": {
"SignatureVersion": "1", 
"Timestamp": "2018-01-13T19:18:44.369Z", 
"Signature": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", 
"SigningCertUrl": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX.pem", 
"MessageId": "4b76b0ea-5e0f-502f-81ec-e23e03dbaf01", 
"Message": "{\"AlarmName\":\"test\",\"AlarmDescription\":\"test\",\"AWSAccountId\":\"xxxxxxxxxxxxx\",\"NewStateValue\":\"ALARM\",\"NewStateReason\":\"Threshold Crossed: 1 out of the last 1 datapoints [2.6260535333900545 (13/01/18 19:13:00)] was greater than or equal to the threshold (1.0) (minimum 1 datapoint for OK -> ALARM transition).\",\"StateChangeTime\":\"2018-01-13T19:18:44.312+0000\",\"Region\":\"US East (N. Virginia)\",\"OldStateValue\":\"OK\",\"Trigger\":{\"MetricName\":\"CPUUtilization\",\"Namespace\":\"AWS/RDS\",\"StatisticType\":\"Statistic\",\"Statistic\":\"AVERAGE\",\"Unit\":null,\"Dimensions\":[{\"name\":\"DBInstanceIdentifier\",\"value\":\"myrds\"}],\"Period\":300,\"EvaluationPeriods\":1,\"ComparisonOperator\":\"GreaterThanOrEqualToThreshold\",\"Threshold\":1.0,\"TreatMissingData\":\"\",\"EvaluateLowSampleCountPercentile\":\"\"}}", 
"MessageAttributes":
{}
, …
Run Code Online (Sandbox Code Playgroud)

python lambda json amazon-web-services

2
推荐指数
1
解决办法
9438
查看次数

cassandra - 将快照拍摄到不同的位置

我想每隔 1 小时进行一次 Cassandra 备份并将其移动到共享位置。

Cassandra 在默认位置拍摄快照,如何在/opt/backup位置拍摄快照?

linux backup cassandra

1
推荐指数
1
解决办法
2885
查看次数

Bash-从文件读取行时打印第一和第二列

我有一个如下的测试文件。(但实际文件有1000多行和许多列)

apple,2
mango,5
coconut,10
Run Code Online (Sandbox Code Playgroud)

我要按以下方式打印此文件。

I have apple and the count is 2
I have mango and the count is 5
I have coconut and the count is 10
Run Code Online (Sandbox Code Playgroud)

我尝试while read line使用awk -F ',' '{print $1}',但即时通讯未获得实际输出。有人可以帮我吗?

linux bash shell do-while

0
推荐指数
1
解决办法
1327
查看次数

BigQuery-参数类型:INT64,STRING的operator =没有匹配的签名

我从BQ收到了一个奇怪的错误(也许是我第一次收到此错误)。

No matching signature for operator = for argument types: INT64, STRING. Supported signatures: ANY = ANY at [27:1]
Run Code Online (Sandbox Code Playgroud)

查询:

No matching signature for operator = for argument types: INT64, STRING. Supported signatures: ANY = ANY at [27:1]
Run Code Online (Sandbox Code Playgroud)

错误行JOIN t2.id = t3.id t2.id显示此错误。

它是整数列。

sql google-bigquery google-cloud-platform

0
推荐指数
2
解决办法
1844
查看次数