有人告诉我,日志记录不能用于多处理.在多处理混淆日志的情况下,您必须执行并发控制.
但我做了一些测试,似乎在使用多处理登录时没有问题
import time
import logging
from multiprocessing import Process, current_process, pool
# setup log
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S',
filename='/tmp/test.log',
filemode='w')
def func(the_time, logger):
proc = current_process()
while True:
if time.time() >= the_time:
logger.info('proc name %s id %s' % (proc.name, proc.pid))
return
if __name__ == '__main__':
the_time = time.time() + 5
for x in xrange(1, 10):
proc = Process(target=func, name=x, args=(the_time, logger))
proc.start()
Run Code Online (Sandbox Code Playgroud)
正如您从代码中看到的那样.
我故意让子进程在同一时刻(开始后5s)写日志以增加冲突的可能性.但是根本没有冲突.
所以我的问题是我们可以在多处理中使用日志记录吗?为什么这么多帖子说我们不能?
我有一些使用多进程的代码,如下所示:
import multiprocessing
from multiprocessing import Pool
pool = Pool(processes=100)
result = []
for job in job_list:
result.append(
pool.apply_async(
handle_job, (job)
)
)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
该程序正在对非常大的数据集进行大量计算.因此,我们需要多进程来同时处理工作以提高性能.
我被告知,对于托管系统,一个docker容器只是一个进程.所以我想知道如何在Docker中处理我的多进程?
以下是我的担忧:
由于容器只是一个过程,我的多进程代码在这个过程中会变成多线程吗?
性能会下降吗?因为我使用多进程的原因是同时完成工作以获得更好的性能.
在 Django 休息框架中。当您查询数据库模型并且它不存在时,您将遇到如下异常
ModleName.DoesNotExist
Run Code Online (Sandbox Code Playgroud)
此异常将根据模型名称而变化。例如:
查询模型 Car 会提出
Car.DoesNotExist
Run Code Online (Sandbox Code Playgroud)
查询模型平面会升起
Plane.DoesNotExist
Run Code Online (Sandbox Code Playgroud)
这会导致您无法在一个常见的地方捕获异常的麻烦。因为你不知道 Exception 的父类。每次查询模型时都必须捕获异常,例如:
try:
return Car.objects.get(pk=1)
except Car.DoesNotExist:
raise Http404
Run Code Online (Sandbox Code Playgroud)
为什么 Django 设计这样的异常?是否可以通过其共同祖先捕获异常?
我将我的代码提交给Spark独立集群.提交命令如下:
nohup ./bin/spark-submit \
--master spark://ES01:7077 \
--executor-memory 4G \
--num-executors 1 \
--total-executor-cores 1 \
--conf "spark.storage.memoryFraction=0.2" \
./myCode.py 1>a.log 2>b.log &
Run Code Online (Sandbox Code Playgroud)
我在上面的命令中指定执行程序使用4G内存.但是使用top命令监视执行程序进程,我注意到内存使用量不断增长.现在top命令输出如下:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
12578 root 20 0 20.223g 5.790g 23856 S 61.5 37.3 20:49.36 java
Run Code Online (Sandbox Code Playgroud)
我的总内存为16G,因此37.3%已经超过我指定的4GB.它仍在增长.
使用ps命令,可以知道它是执行程序进程.
[root@ES01 ~]# ps -awx | grep spark | grep java
10409 ? Sl 1:43 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip ES01 --port 7077 --webui-port 8080
10603 ? Sl …
Run Code Online (Sandbox Code Playgroud) WAL(Write-Ahead Log)技术已经在很多系统中使用。
WAL 的机制是,当客户端写入数据时,系统会做两件事:
有两个好处:
为什么不直接将数据写入磁盘?您将每次写入都直接写入磁盘。成功时,您告诉客户端成功,如果写入失败,则返回失败的响应或超时。
这样,您仍然拥有这两个好处。
那么使用 WAL 有什么好处呢?
我有一些数据流需要计算。我正在考虑使用火花流来完成这项工作。但是有一件事我不确定并且感到担心。
我的要求是这样的:
数据每 5 分钟以 CSV 文件的形式出现。我需要最近 5 分钟、1 小时和 1 天的数据报告。所以如果我设置一个火花流来做这个计算。我需要间隔为 5 分钟。我还需要设置两个窗口 1 小时和 1 天。
每5分钟就会有1GB的数据进来。所以一小时的窗口会计算出12GB(60/5)的数据,一天的窗口会计算出288GB(24*60/5)的数据。
我对火花没有太多经验。所以这让我很担心。
火花能处理这么大的窗户吗?
计算这些 288 GB 数据需要多少 RAM?超过 288 GB 的内存?(我知道这可能取决于我的磁盘 I/O、CPU 和计算模式。但我只是想要一些基于经验的估计答案)
如果对一天/一小时数据的计算在流中过于昂贵。你有什么更好的建议吗?
像tcp和udp这样的协议都用数字表示.
import socket
socket.getprotocobyname('tcp')
Run Code Online (Sandbox Code Playgroud)
上面的代码将返回6.
如果我知道协议号,我怎么能得到协议名称?
我正在使用python客户端(Confulent kafka)从kafka使用.有时消费者会遇到如下错误:
ERROR KafkaError{code=_TRANSPORT,val=-195,str="GroupCoordinator response error: Local: Broker transport failure"}
Run Code Online (Sandbox Code Playgroud)
有人可以帮忙解释错误是什么意思吗?"运输失败"似乎意味着消费者与经纪人有网络问题,是吗?发生此错误时该怎么办?
我在下面有一些非常简单的代码.#!/ usr/bin/python来自多处理导入池导入时间
def worker(job):
if job in range(25,30):
time.sleep(10)
print "job:%s" %job
return (job)
pool = Pool(processes=10)
result = []
for job in range(1, 1000):
result.append(pool.apply_async(worker(job)))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,我有一个工作人员使用多处理来处理1000个作业.如果工作是25-30,那么工人将睡10秒.这是尝试模拟时间/资源成本工作.
当我运行上面的代码时,输出如下所示.从作业25.整个过程就像一个顺序过程一样运行.因为每10秒就有一个作业后的输出24.直到作业30完成.
但为什么?不应该同时运行多处理过程吗?
[root@localhost tmp]# ./a.py
job:1
job:2
job:3
job:4
job:5
job:6
job:7
job:8
job:9
job:10
job:11
job:12
job:13
job:14
job:15
job:16
job:17
job:18
job:19
job:20
job:21
job:22
job:23
job:24
job:25
job:26
...
Run Code Online (Sandbox Code Playgroud) pyspark有一个apiqueueStream,用于从一系列rdd构造dstream。
\n\nqueueStream(rdds, oneAtATime=True, default=None)\nCreate an input stream from an queue of RDDs or list. In each batch, it will process either one or all of the RDDs returned by the queue.\n\nNOTE: changes to the queue after the stream is created will not be recognized.\n\nParameters: \nrdds \xe2\x80\x93 Queue of RDDs\noneAtATime \xe2\x80\x93 pick one rdd each time or pick all of them once.\ndefault \xe2\x80\x93 The default rdd if no more in rdds\n
Run Code Online (Sandbox Code Playgroud)\n\n问题一:
\n\n在分布式环境中,如果我定义一个队列对象q1。我执行像 q1.add(RDD) 这样的操作。q1对象会传输到所有工作节点吗?如果将该对象复制到其他节点,q1.add(RDD)操作会出现问题吗?
\n\n问题2:
\n\n …python ×4
apache-spark ×3
apache-kafka ×1
ceph ×1
database ×1
django ×1
docker ×1
memory ×1
memory-leaks ×1
networking ×1
rocksdb ×1
stream ×1