小编gho*_*ost的帖子

如何以编程方式检查 Kafka Broker 是否已启动并在 Python 中运行

我正在尝试使用来自 Kafka 主题的消息。我在confluent_kafka消费者周围使用包装器。在开始使用消息之前,我需要检查是否建立了连接。

我读到消费者很懒惰,所以我需要执行一些操作才能建立连接。但是我想在不执行consumeorpoll操作的情况下检查连接建立。

此外,我尝试给出一些错误的配置,以查看民意调查的反应是什么。我得到的回应是:

b'Broker: No more messages'
Run Code Online (Sandbox Code Playgroud)

那么,如何判断是连接参数错误、连接中断还是主题中实际上没有消息?

python apache-kafka kafka-consumer-api kafka-python

5
推荐指数
1
解决办法
4521
查看次数

SnowFlake在group by、partition on、distinct上的性能

我在 Snowflake 有一张桌子。表中的一列称为obj_key(对象键)。表大小非常大(以 TB 为单位),因此性能要求很高。

现在,每次完成对象更新时都会向表中添加一个新条目。新插入的行在列中具有相同obj_key但不同的条目time_modified。假设我想obj_key在某些条件下获取与表不同的数据。

我有以下三种方法:

方法一:

SELECT obj_key 
FROM my_table
WHERE some_condition
GROUP BY obj_key;
Run Code Online (Sandbox Code Playgroud)

方法二:

SELECT distinct(obj_key) 
FROM my_table
WHERE some_condition;
Run Code Online (Sandbox Code Playgroud)

方法三:

SELECT obj_key
FROM my_table
WHERE some_condition
QUALIFY ROW_NUMBER() OVER (PARTITION BY obj_key ORDER BY obj_key) = 1;
Run Code Online (Sandbox Code Playgroud)

所以基本上我的问题可以归结为这些:

我读过,distinct在多个列上是由 执行的group_by(col1, col2, ..., col n)。那么两者的性能有何不同(如果有的话)?

既然PARTITION BY还需要一个ORDER BY,那么它不会大大降低性能吗?

如果有人能够提供这些查询如何在 SnowFlake 上运行的细节,我会很高兴。

database group-by distinct partition-by snowflake-cloud-data-platform

5
推荐指数
1
解决办法
6284
查看次数

多处理池中的 Boto3 客户端失败,并显示“botocore.exceptions.NoCredentialsError:无法找到凭据”

我正在使用 boto3 连接到 s3,下载对象并进行一些处理。我正在使用多处理池来执行上述操作。

以下是我正在使用的代码的概要:

session = None

def set_global_session():
    global session
    if not session:
        session = boto3.Session(region_name='us-east-1')

def function_to_be_sent_to_mp_pool():
    s3 = session.client('s3', region_name='us-east-1')
    list_of_b_n_o = list_of_buckets_and_objects
    for bucket, object in list_of_b_n_o:
        content = s3.get_object(Bucket=bucket, Key=key)
        data = json.loads(content['Body'].read().decode('utf-8'))
        write_processed_data_to_a_location()

def main():
    pool = mp.Pool(initializer=set_global_session, processes=40)
    pool.starmap(function_to_be_sent_to_mp_pool, list_of_b_n_o_i)
Run Code Online (Sandbox Code Playgroud)

现在,当 时processes=40,一切都运转良好。什么时候processes = 64,还是不错的。

但是,当我增加到 时processes=128,出现以下错误:

botocore.exceptions.NoCredentialsError: Unable to locate credentials
Run Code Online (Sandbox Code Playgroud)

我们的机器具有访问 S3 所需的 IAM 角色。此外,发生的奇怪的事情是,对于某些进程,它工作正常,而对于其他一些进程,它会抛出凭据错误。为什么会发生这种情况,以及如何解决这个问题?

发生的另一个奇怪的事情是我能够在 2 个单独的终端选项卡中触发两个作业(每个选项卡都有一个单独的 ssh 登录 shell 到机器)。每个作业都会生成 64 个进程,而且效果也很好,这意味着有 128 …

multiprocessing python-3.x boto3

5
推荐指数
1
解决办法
2658
查看次数

AttributeError: 'list' 对象没有属性 'toarray'

class aSDAE_module():        
    def get_middle_layer(self,aSDAE,train_user):
        middle=self.model.predict({'user_rating':aSDAE,'user_sideinformation':train_user},batch_size=self.batch_size)[2]
        return middle
   
alpha = asdae_module.get_middle_layer(R.toarray(),aSDAE.toarray()) 
Run Code Online (Sandbox Code Playgroud)

这是我卡住的一段代码,我不知道如何解决以下错误:

    Traceback (most recent call last):
    File "./run.py", line 142, in <module>
    train_user=train_user, train_item=train_item, valid_user=valid_user, test_user=test_user, R=R)
    File "/home/hira/Desktop/PHD/PHDMF-master/asdae_models.py", line 52, in PHDMF
    alpha = asdae_module.get_middle_layer(R.toarray(),aSDAE.toarray())  
    AttributeError: 'list' object has no attribute 'toarray'
Run Code Online (Sandbox Code Playgroud)

python python-3.x

0
推荐指数
1
解决办法
5362
查看次数