我正在使用 ProcessPoolExecutor 上下文管理器并行运行多个 Kafka 使用者。我需要存储子进程的进程 ID,以便稍后我可以干净地终止这些进程。我有这样的代码:
Class MultiProcessConsumer:
...
def run_in_parallel(self):
parallelism_factor = 5
with ProcessPoolExecutor() as executor:
processes = [executor.submit(self.consume) for _ in range(parallelism_factor)]
# It would be nice If I could write [process.pid for process in processes] to a file here.
def consume(self):
while True:
for message in self.kafka_consumer:
do_stuff(message)
Run Code Online (Sandbox Code Playgroud)
os.get_pid()我知道我可以在 Consumer 方法中使用来获取 PID。但是,正确处理它们(在消费者不断关闭或启动的情况下)需要一些额外的工作。
您建议我如何在这样的上下文中获取并存储子进程的 PID?
在让消费者保持活力方面,有些事情让我感到困惑。假设我有一个不断写入数据的主题。但是,在一天中的一个小时内,没有新消息。如果我为我的消费者设置了超时,当没有新消息时,消费者将被关闭。
现在,新消息到达。但是,没有消费者活着消费它们。
我应该如何处理这些场景?我的消费者可能会消费所有消息并被关闭。让他们活下去的最好方法是什么?有没有办法在新消息到达时自动调用它们?此类场景的最佳实践是什么?
我有一个想要打包的应用程序,它依赖于protobuf. protobuf 必须这样安装:
pip install protobuf --no-binary=protobuf protobuf
Run Code Online (Sandbox Code Playgroud)
我怎样才能将其包含在我的中setup.py install_requires?
注意:我的要求要求此过程自动通过 setup.py。
因此,我尝试在 install_requires 或requirements.txt 中包含这些内容,但没有成功:
#requirements.txt
protobuf
--no-binary=protobuf
Run Code Online (Sandbox Code Playgroud)
当我打包应用程序时,如何为安装提供--no-binary=protobuf protobuf(我知道是标志)?pipprotobuf
更新: 我想要的是按照我在安装软件包时提到的方式安装 protobuf 软件包:
`pip install my_package.whl`
Run Code Online (Sandbox Code Playgroud) 我是Django的新手,试图练习一个用户注册的简单项目(使用django registration-redux),上传一些文件然后他/她提供了她的文件列表,可以下载.这是我的models.py,forms.py和views:
models.py
class UserProfile(models.Model):
user = models.ForeignKey(User, related_name='uploaded_by')
names = models.CharField(max_length=40)
lastname = models.CharField(max_length=50)
email = models.EmailField()
uploads = models.FileField(upload_to= 'blablabla')
def __str__(self):
return self.email
Run Code Online (Sandbox Code Playgroud)
forms.py
class UserProfileForm(forms.ModelForm):
class Meta:
model = UserProfile
fields = ['names', 'uploads']
Run Code Online (Sandbox Code Playgroud)
view.py
from .forms import UserProfileForm
from .models import UserProfile
@login_required()
def new(requst):
form = UserProfileForm(requst.POST or None, requst.FILES or None)
if form.is_valid():
form.save()
context = {'title': 'welcome', 'form': form}
return render(requst, 'upform.html', context)
Run Code Online (Sandbox Code Playgroud)
但是当我向用户登录并尝试上传文件时,我收到错误:/ new NOT NULL约束中的IntegrityError失败:_userprofile.user_id
在挖掘了很多之后我注意到有人提出错误的原因是因为在发布表单的过程中无论如何都没有包含用户,所以我尝试了任何想到的内容和我在表单中添加用户字段的情况.py工作:
forms.py
class UserProfileForm(forms.ModelForm):
class …Run Code Online (Sandbox Code Playgroud) 我有一个远程存储项目,当用户请求他的文件时,django 服务器在本地检索和存储文件(用于某些处理)作为临时文件,然后使用 mod x-sendfile 将其提供给用户。我当然希望在向用户提供临时文件后将其删除。
文档指出,NamedTemporaryFile如果设置为 delete 参数,则False在所有引用都消失后会导致删除文件。但是当为用户提供服务时tempfile,它不会被删除。如果我在下载时设置 delete=True,我会收到“/ServeSegment/Test.jpg/在此服务器上找不到请求的 URL ”。
这是列出用户文件的视图:
def file_profile(request):
obj = MainFile.objects.filter(owner=request.user)
context = {'title': 'welcome',
'obj': obj
}
return render(request, 'ServeSegments.html', context=context)
Run Code Online (Sandbox Code Playgroud)
这是检索、临时存储和提供请求文件的视图:
def ServeSegment(request, segmentID):
if request.method == 'GET':
url = 'http://192.168.43.7:8000/foo/'+str(segmentID)
r = requests.get(url, stream=True)
if r.status_code == 200:
with tempfile.NamedTemporaryFile(dir=
'/tmp/Files', mode='w+b') as f:
for chunk in r.iter_content(1024):
f.write(chunk)
response = HttpResponse()
response['Content-Disposition'] = 'attachment; segmentID={0}'.format(f.name)
response['X-Sendfile'] = "{0}".format(f.name)
return response
else:
return …Run Code Online (Sandbox Code Playgroud) 我正在尝试通过以下代码从 IMDB 中抓取一些电影评论:
import requests
from time import sleep
url='https://www.imdb.com/title/tt0068646/reviews?ref_=tt_urv'
response= requests.get(url)
Run Code Online (Sandbox Code Playgroud)
并收到此错误:
SSLError: HTTPSConnectionPool(host='www.imdb.com', port=443): Max retries exceeded with url: /title/tt0068646/reviews?ref_=tt_urv (Caused by SSLError(SSLError("bad handshake: Error([('SSL routines', 'tls_process_server_certificate', 'certificate verify failed')])")))
Run Code Online (Sandbox Code Playgroud)
有什么想法吗?
有没有什么办法,通过任何 pythonic Elasticsearch 库,使用字段检查给定索引中是否存在文档_id?
假设我有一个随机 doc _id 73H316Dhgh,我想检查它是否存在于给定索引上?我将如何使用 python Elasticsearch 库来解决这个问题?
当使用 Django 默认单元测试时,修补设置属性很简单(@override_settings例如使用装饰器)。
我想覆盖测试方法设置的几个属性。当我使用 pytest-django 时,如何才能实现这一目标?
我是django的新手并拥有以下型号:
class MainFile(models.Model):
owner = models.ForeignKey(User)
# docfile = models.FileField(verbose_name= 'Enter you file', null=True)
file_id = models.UUIDField(max_length=38, primary_key=True, default=1)
file_name = models.CharField(max_length=70)
file_suffix = models.CharField(max_length=5, blank=True, null=True)
file_date = models.DateTimeField(auto_now_add=True, null=True)
Run Code Online (Sandbox Code Playgroud)
所以我想使用file_id过滤表的一行,并拥有该行,访问file_suffix.目前我发现了以下方法,但它很棘手:
obj = MainFile.objects.filter(file_id = 'e3e978a3-0375-4bb9-85a2-5175e2ec097d')
suffix = [i.file_suffix for i in obj]
Run Code Online (Sandbox Code Playgroud)
我试过很多方法但除了上面没有运气但我知道这不是正确的方法.如何使用file_id作为过滤器创建obj后获取file_suffix?
我知道可以将模型方法或属性添加到序列化器中,如下所示:
class Order(models.Model):
...
def tax_status(self, check_item_bought=True):
...
Run Code Online (Sandbox Code Playgroud)
因此,要添加total_tax到OrderSerializer,就这么简单:
class OrderSerializer(serializers.ModelSerializer):
tax_status = serializers.CharField(required=False)
class Meta:
model = Order
fields = ["tax_status", ...]
Run Code Online (Sandbox Code Playgroud)
上面的效果很好。但是,我需要tax_status_all向序列化器添加另一个字段,该字段指向相同的方法,但将 arg 设置check_item_bought为 False。我怎样才能做到这一点?任何建议都会有所帮助。