小编Kra*_* Li的帖子

python日志记录是否支持多处理?

有人告诉我,日志记录不能用于多处理.在多处理混淆日志的情况下,您必须执行并发控制.

但我做了一些测试,似乎在使用多处理登录时没有问题

import time
import logging
from multiprocessing import Process, current_process, pool


# setup log
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%a, %d %b %Y %H:%M:%S',
                    filename='/tmp/test.log',
                    filemode='w')


def func(the_time, logger):
    proc = current_process()
    while True:
        if time.time() >= the_time:
            logger.info('proc name %s id %s' % (proc.name, proc.pid))
            return



if __name__ == '__main__':

    the_time = time.time() + 5

    for x in xrange(1, 10):
        proc = Process(target=func, name=x, args=(the_time, logger))
        proc.start()
Run Code Online (Sandbox Code Playgroud)

正如您从代码中看到的那样.

我故意让子进程在同一时刻(开始后5s)写日志以增加冲突的可能性.但是根本没有冲突.

所以我的问题是我们可以在多处理中使用日志记录吗?为什么这么多帖子说我们不能?

python python-multiprocessing

23
推荐指数
2
解决办法
5930
查看次数

我们可以在docker中运行多进程程序吗?

我有一些使用多进程的代码,如下所示:

import multiprocessing
from multiprocessing import Pool

pool = Pool(processes=100)
result = []

for job in job_list:        
    result.append( 
        pool.apply_async(
            handle_job, (job)
            )
        )
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

该程序正在对非常大的数据集进行大量计算.因此,我们需要多进程来同时处理工作以提高性能.

我被告知,对于托管系统,一个docker容器只是一个进程.所以我想知道如何在Docker中处理我的多进程?

以下是我的担忧:

  1. 由于容器只是一个过程,我的多进程代码在这个过程中会变成多线程吗?

  2. 性能会下降吗?因为我使用多进程的原因是同时完成工作以获得更好的性能.

multithreading multiprocessing docker

12
推荐指数
1
解决办法
1万
查看次数

如何在 Django rest 框架中捕获 Model.DoesNotExist 异常?

在 Django 休息框架中。当您查询数据库模型并且它不存在时,您将遇到如下异常

ModleName.DoesNotExist
Run Code Online (Sandbox Code Playgroud)

此异常将根据模型名称而变化。例如:

查询模型 Car 会提出

Car.DoesNotExist
Run Code Online (Sandbox Code Playgroud)

查询模型平面会升起

Plane.DoesNotExist
Run Code Online (Sandbox Code Playgroud)

这会导致您无法在一个常见的地方捕获异常的麻烦。因为你不知道 Exception 的父类。每次查询模型时都必须捕获异常,例如:

    try:
        return Car.objects.get(pk=1)
    except Car.DoesNotExist:
        raise Http404
Run Code Online (Sandbox Code Playgroud)

为什么 Django 设计这样的异常?是否可以通过其共同祖先捕获异常?

django django-models django-rest-framework

10
推荐指数
1
解决办法
4181
查看次数

这是火花流或内存泄漏的错误吗?

我将我的代码提交给Spark独立集群.提交命令如下:

nohup ./bin/spark-submit  \  
--master spark://ES01:7077 \
--executor-memory 4G \
--num-executors 1 \
--total-executor-cores 1 \
--conf "spark.storage.memoryFraction=0.2"  \
./myCode.py 1>a.log 2>b.log &
Run Code Online (Sandbox Code Playgroud)

我在上面的命令中指定执行程序使用4G内存.但是使用top命令监视执行程序进程,我注意到内存使用量不断增长.现在top命令输出如下:

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND                                                                                                                                                    
12578 root      20   0 20.223g 5.790g  23856 S  61.5 37.3  20:49.36 java       
Run Code Online (Sandbox Code Playgroud)

我的总内存为16G,因此37.3%已经超过我指定的4GB.它仍在增长.

使用ps命令,可以知道它是执行程序进程.

[root@ES01 ~]# ps -awx | grep spark | grep java
10409 ?        Sl     1:43 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip ES01 --port 7077 --webui-port 8080
10603 ?        Sl …
Run Code Online (Sandbox Code Playgroud)

memory memory-leaks apache-spark apache-spark-sql

8
推荐指数
1
解决办法
3698
查看次数

WAL怎么可能?提前写日志?有比直接写入磁盘更好的性能吗?

WAL(Write-Ahead Log)技术已经在很多系统中使用。

WAL 的机制是,当客户端写入数据时,系统会做两件事:

  1. 日志写入磁盘并返回给客户端
  2. 数据异步写入磁盘、缓存或内存

有两个好处:

  1. 如果出现异常(即断电),我们可以从日志中恢复数据。
  2. 性能好,因为我们异步写入数据,可以批量操作

为什么不直接将数据写入磁盘?您将每次写入都直接写入磁盘。成功时,您告诉客户端成功,如果写入失败,则返回失败的响应或超时。

这样,您仍然拥有这两个好处。

  1. 在断电的情况下,您不需要恢复任何东西。因为返回给客户端的每个成功响应都意味着数据确实在磁盘上。
  2. 性能应该是一样的。虽然我们经常接触磁盘,但是 WAL 也是一样的(WAL 每次写入成功都意味着它在磁盘上成功)

那么使用 WAL 有什么好处呢?

database ceph rocksdb

8
推荐指数
1
解决办法
1376
查看次数

火花流窗口可以有多大?

我有一些数据流需要计算。我正在考虑使用火花流来完成这项工作。但是有一件事我不确定并且感到担心。

我的要求是这样的:

数据每 5 分钟以 CSV 文件的形式出现。我需要最近 5 分钟、1 小时和 1 天的数据报告。所以如果我设置一个火花流来做这个计算。我需要间隔为 5 分钟。我还需要设置两个窗口 1 小时和 1 天。

每5分钟就会有1GB的数据进来。所以一小时的窗口会计算出12GB(60/5)的数据,一天的窗口会计算出288GB(24*60/5)的数据。

我对火花没有太多经验。所以这让我很担心。

  1. 火花能处理这么大的窗户吗?

  2. 计算这些 288 GB 数据需要多少 RAM?超过 288 GB 的内存?(我知道这可能取决于我的磁盘 I/O、CPU 和计算模式。但我只是想要一些基于经验的估计答案)

  3. 如果对一天/一小时数据的计算在流中过于昂贵。你有什么更好的建议吗?

stream apache-spark spark-streaming

6
推荐指数
0
解决办法
1237
查看次数

如何使用python将协议号转换为名称?

像tcp和udp这样的协议都用数字表示.

import socket
socket.getprotocobyname('tcp') 
Run Code Online (Sandbox Code Playgroud)

上面的代码将返回6.

如果我知道协议号,我怎么能得到协议名称?

python networking

6
推荐指数
1
解决办法
2185
查看次数

"经纪人运输失败"在卡夫卡意味着什么?

我正在使用python客户端(Confulent kafka)从kafka使用.有时消费者会遇到如下错误:

ERROR KafkaError{code=_TRANSPORT,val=-195,str="GroupCoordinator response error: Local: Broker transport failure"}
Run Code Online (Sandbox Code Playgroud)

有人可以帮忙解释错误是什么意思吗?"运输失败"似乎意味着消费者与经纪人有网络问题,是吗?发生此错误时该怎么办?

python apache-kafka

6
推荐指数
1
解决办法
1109
查看次数

为什么Python多处理按顺序运行?

我在下面有一些非常简单的代码.#!/ usr/bin/python来自多处理导入池导入时间

def worker(job):
    if job in range(25,30):
        time.sleep(10)
    print "job:%s" %job
    return (job)

pool = Pool(processes=10)
result = []

for job in range(1, 1000):
    result.append(pool.apply_async(worker(job)))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,我有一个工作人员使用多处理来处理1000个作业.如果工作是25-30,那么工人将睡10秒.这是尝试模拟时间/资源成本工作.

当我运行上面的代码时,输​​出如下所示.从作业25.整个过程就像一个顺序过程一样运行.因为每10秒就有一个作业后的输出24.直到作业30完成.

但为什么?不应该同时运行多处理过程吗?

[root@localhost tmp]# ./a.py 
job:1
job:2
job:3
job:4
job:5
job:6
job:7
job:8
job:9
job:10
job:11
job:12
job:13
job:14
job:15
job:16
job:17
job:18
job:19
job:20
job:21
job:22
job:23
job:24


job:25
job:26
...
Run Code Online (Sandbox Code Playgroud)

python multiprocessing

5
推荐指数
1
解决办法
102
查看次数

如何理解apache Spark中的queueStream API?

pyspark有一个apiqueueStream,用于从一系列rdd构造dstream。

\n\n
queueStream(rdds, oneAtATime=True, default=None)\nCreate an input stream from an queue of RDDs or list. In each batch, it will process either one or all of the RDDs returned by the queue.\n\nNOTE: changes to the queue after the stream is created will not be recognized.\n\nParameters: \nrdds \xe2\x80\x93 Queue of RDDs\noneAtATime \xe2\x80\x93 pick one rdd each time or pick all of them once.\ndefault \xe2\x80\x93 The default rdd if no more in rdds\n
Run Code Online (Sandbox Code Playgroud)\n\n

问题一:

\n\n

在分布式环境中,如果我定义一个队列对象q1。我执行像 q1.add(RDD) 这样的操作。q1对象会传输到所有工作节点吗?如果将该对象复制到其他节点,q1.add(RDD)操作会出现问题吗?

\n\n

问题2:

\n\n …

apache-spark

5
推荐指数
1
解决办法
3890
查看次数