我正在尝试使用来自 Kafka 主题的消息。我在confluent_kafka消费者周围使用包装器。在开始使用消息之前,我需要检查是否建立了连接。
我读到消费者很懒惰,所以我需要执行一些操作才能建立连接。但是我想在不执行consumeorpoll操作的情况下检查连接建立。
此外,我尝试给出一些错误的配置,以查看民意调查的反应是什么。我得到的回应是:
b'Broker: No more messages'
Run Code Online (Sandbox Code Playgroud)
那么,如何判断是连接参数错误、连接中断还是主题中实际上没有消息?
我在 Snowflake 有一张桌子。表中的一列称为obj_key(对象键)。表大小非常大(以 TB 为单位),因此性能要求很高。
现在,每次完成对象更新时都会向表中添加一个新条目。新插入的行在列中具有相同obj_key但不同的条目time_modified。假设我想obj_key在某些条件下获取与表不同的数据。
我有以下三种方法:
方法一:
SELECT obj_key
FROM my_table
WHERE some_condition
GROUP BY obj_key;
Run Code Online (Sandbox Code Playgroud)
方法二:
SELECT distinct(obj_key)
FROM my_table
WHERE some_condition;
Run Code Online (Sandbox Code Playgroud)
方法三:
SELECT obj_key
FROM my_table
WHERE some_condition
QUALIFY ROW_NUMBER() OVER (PARTITION BY obj_key ORDER BY obj_key) = 1;
Run Code Online (Sandbox Code Playgroud)
所以基本上我的问题可以归结为这些:
我读过,distinct在多个列上是由 执行的group_by(col1, col2, ..., col n)。那么两者的性能有何不同(如果有的话)?
既然PARTITION BY还需要一个ORDER BY,那么它不会大大降低性能吗?
如果有人能够提供这些查询如何在 SnowFlake 上运行的细节,我会很高兴。
database group-by distinct partition-by snowflake-cloud-data-platform
我正在使用 boto3 连接到 s3,下载对象并进行一些处理。我正在使用多处理池来执行上述操作。
以下是我正在使用的代码的概要:
session = None
def set_global_session():
global session
if not session:
session = boto3.Session(region_name='us-east-1')
def function_to_be_sent_to_mp_pool():
s3 = session.client('s3', region_name='us-east-1')
list_of_b_n_o = list_of_buckets_and_objects
for bucket, object in list_of_b_n_o:
content = s3.get_object(Bucket=bucket, Key=key)
data = json.loads(content['Body'].read().decode('utf-8'))
write_processed_data_to_a_location()
def main():
pool = mp.Pool(initializer=set_global_session, processes=40)
pool.starmap(function_to_be_sent_to_mp_pool, list_of_b_n_o_i)
Run Code Online (Sandbox Code Playgroud)
现在,当 时processes=40,一切都运转良好。什么时候processes = 64,还是不错的。
但是,当我增加到 时processes=128,出现以下错误:
botocore.exceptions.NoCredentialsError: Unable to locate credentials
Run Code Online (Sandbox Code Playgroud)
我们的机器具有访问 S3 所需的 IAM 角色。此外,发生的奇怪的事情是,对于某些进程,它工作正常,而对于其他一些进程,它会抛出凭据错误。为什么会发生这种情况,以及如何解决这个问题?
发生的另一个奇怪的事情是我能够在 2 个单独的终端选项卡中触发两个作业(每个选项卡都有一个单独的 ssh 登录 shell 到机器)。每个作业都会生成 64 个进程,而且效果也很好,这意味着有 128 …
class aSDAE_module():
def get_middle_layer(self,aSDAE,train_user):
middle=self.model.predict({'user_rating':aSDAE,'user_sideinformation':train_user},batch_size=self.batch_size)[2]
return middle
alpha = asdae_module.get_middle_layer(R.toarray(),aSDAE.toarray())
Run Code Online (Sandbox Code Playgroud)
这是我卡住的一段代码,我不知道如何解决以下错误:
Traceback (most recent call last):
File "./run.py", line 142, in <module>
train_user=train_user, train_item=train_item, valid_user=valid_user, test_user=test_user, R=R)
File "/home/hira/Desktop/PHD/PHDMF-master/asdae_models.py", line 52, in PHDMF
alpha = asdae_module.get_middle_layer(R.toarray(),aSDAE.toarray())
AttributeError: 'list' object has no attribute 'toarray'
Run Code Online (Sandbox Code Playgroud) python ×2
python-3.x ×2
apache-kafka ×1
boto3 ×1
database ×1
distinct ×1
group-by ×1
kafka-python ×1
partition-by ×1