小编Pyt*_*ice的帖子

如何将字典列表保存到文件中?

我有一个词典列表.有时,我想更改并保存其中一个词典,以便在重新启动脚本时使用新消息.现在,我通过修改脚本并重新运行来进行更改.我想把它从脚本中拉出来并将字典列表放到某种配置文件中.

我已经找到了如何将列表写入文件的答案,但这假设它是一个平面列表.我怎么能用词典列表呢?

我的列表看起来像这样:

logic_steps = [
    {
        'pattern': "asdfghjkl",
        'message': "This is not possible"
    },
    {
        'pattern': "anotherpatterntomatch",
        'message': "The parameter provided application is invalid"
    },
    {
        'pattern': "athirdpatterntomatch",
        'message': "Expected value for debugging"
    },
]
Run Code Online (Sandbox Code Playgroud)

python dictionary file list

16
推荐指数
3
解决办法
3万
查看次数

使用boto3时S3连接超时

我正在使用boto3来操作S3.如果我的应用程序由于网络问题而无法访问S3,则连接将挂起,直到最终超时.我想设置较低的连接超时.我遇到了这个允许设置超时的botocore PR:

$ sudo iptables -A OUTPUT -p tcp --dport 443 -j DROP

from botocore.client import Config
import boto3

config = Config(connect_timeout=5, read_timeout=5)

s3 = boto3.client('s3', config=config)

s3.head_bucket(Bucket='my-s3-bucket') 
Run Code Online (Sandbox Code Playgroud)

这会引发ConnectTimeout,但错误输出仍然需要很长时间:

ConnectTimeout: HTTPSConnectionPool(host='my-s3-bucket.s3.amazonaws.com', port=443): Max retries exceeded with url: / (Caused by ConnectTimeoutError(<botocore.awsrequest.AWSHTTPSConnection object at 0x2ad5dd0>, 'Connection to my-s3-bucket.s3.amazonaws.com timed out. (connect timeout=5)'))
Run Code Online (Sandbox Code Playgroud)

调整连接和读取超时不会影响连接响应的速度.

python amazon-s3 amazon-web-services boto3

15
推荐指数
2
解决办法
2万
查看次数

如何将数据发送到正在运行的python线程?

我有一个在我的应用程序中的不同线程中运行的类.我可以一次运行多个线程,线程是守护进程.一段时间后,其中一些线程需要接收和处理消息.我该怎么做呢?

我的代码示例如下所示:

import threading
import time 

class MyThread(threading.Thread):
    def __init__(self, args=(), kwargs=None):
        threading.Thread.__init__(self, args=(), kwargs=None)
        self.daemon = True
        self.receive_messages = args[0]

    def run(self):
        print threading.currentThread().getName(), self.receive_messages

    def do_thing_with_message(self, message):
        if self.receive_messages:
            print threading.currentThread().getName(), "Received %s".format(message)

if __name__ == '__main__':
    threads = []
    for t in range(10):
        threads.append( MyThread(args=(t % 2 == 0,)))
        threads[t].start()
        time.sleep(0.1)

    for t in threads:
        t.do_thing_with_message("Print this!")
Run Code Online (Sandbox Code Playgroud)

这输出:

Thread-1 True
Thread-2 False
Thread-3 True
Thread-4 False
Thread-5 True
Thread-6 False
Thread-7 True
Thread-8 False
Thread-9 …
Run Code Online (Sandbox Code Playgroud)

python multithreading

14
推荐指数
1
解决办法
1万
查看次数

如何覆盖 Gunicorn 的日志配置以使用自定义格式化程序

我希望 gunicorn.error 使用以下基于键值的日志格式,而不是在gunicorn/glogging.py 中定义的默认值:

'format': 'timestamp=%(asctime)s pid=%(process)d loglevel=%(levelname)s msg=%(message)s'`
Run Code Online (Sandbox Code Playgroud)

在我的 gunicorn 配置文件中:

import logging.config

workers = 2
bind = "127.0.0.1:8000"
loglevel = 'INFO'

LOGGING = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'key_value': {
            'format': 'timestamp=%(asctime)s pid=%(process)d loglevel=%(levelname)s msg=%(message)s'
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'key_value',
            'stream': 'ext://sys.stdout'
        }
    },
    'loggers': {
        'gunicorn.error': {
            'handlers': ['console'],
            'level': 'INFO',
            'propagate': False,
        },
        'flask.app': {
            'handlers': ['console'],
            'level': 'INFO',
            'propagate': False,
        }
    },
}

logging.config.dictConfig(LOGGING) …
Run Code Online (Sandbox Code Playgroud)

python logging flask gunicorn

12
推荐指数
4
解决办法
1万
查看次数

如何使用在gunicorn的服务器挂钩中创建的变量?

我正在使用 Gunicorn 来运行我的 Flask 应用程序。我想注册服务器挂钩以在应用程序启动时和关闭之前执行某些操作,但我对如何将变量传递给这些函数以及如何提取在它们中创建的变量感到困惑。

在gunicorn.conf.py中:

bind = "0.0.0.0:8000"
workers = 2
loglevel = "info"
preload = True

def on_starting(server):
    # register some variables here
    print "Starting Flask application"

def on_exit(server):
    # perform some clean up tasks here using variables from the application
    print "Shutting down Flask application"
Run Code Online (Sandbox Code Playgroud)

在 app.py 中,示例 Flask 应用程序:

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/hello', methods=['POST'])
def hello_world():
      return jsonify(message='Hello World')

if __name__ == '__main__':
   app.run(host='0.0.0.0', port=9000, debug=False)
Run Code Online (Sandbox Code Playgroud)

像这样运行gunicorn:$ gunicorn -c …

python flask gunicorn

8
推荐指数
2
解决办法
3748
查看次数

Python多处理 - 捕获信号以重新启动子进程或关闭父进程

我正在使用多处理库来生成两个子进程.我想确保只要父进程处于活动状态,如果子进程死掉(接收SIGKILL或SIGTERM),它们就会自动重启.另一方面,如果父进程收到SIGTERM/SIGINT,我希望它终止所有子进程然后退出.

这就是我解决问题的方法:

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN
from functools import partial
import multiprocessing
import setproctitle

class HelloWorld(multiprocessing.Process):
    def __init__(self):
        super(HelloWorld, self).__init__()

        # ignore, let parent handle it
        signal(SIGTERM, SIG_IGN)

    def run(self):

        setproctitle.setproctitle("helloProcess")

        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()

        self.counter = 1

        # ignore, let parent handle it
        signal(SIGTERM, SIG_IGN)

    def run(self):

        setproctitle.setproctitle("counterProcess")

        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1


def signal_handler(helloProcess, counterProcess, signum, frame):

    print …
Run Code Online (Sandbox Code Playgroud)

python linux signals python-multiprocessing

6
推荐指数
2
解决办法
3108
查看次数

使用HTTPS和gitpython克隆私人仓库

我正在使用gitpython通过HTTPS克隆git存储库.如果项目是私人仓库,它将提示输入用户名和密码.如何通过pythonically与提示进行交互以传递用户名和密码变量?

from git import Repo

HTTPS_REMOTE_URL = 'https://github.com/username/private-project'
DEST_NAME = 'https-cloned-private-project'
cloned_repo = Repo.clone_from(HTTPS_REMOTE_URL, DEST_NAME)
Run Code Online (Sandbox Code Playgroud)

运行此代码的输出:

$ python example.py
Username for 'https://github.com': example
Password for 'https://example@github.com': 
Run Code Online (Sandbox Code Playgroud)

我知道可以在URL中包含用户名和密码:

HTTPS_REMOTE_URL = 'https://username:password@github.com/username/private-project'
Run Code Online (Sandbox Code Playgroud)

但是,如果这是一个私人回购,我无法提前知道.

python git gitpython

5
推荐指数
1
解决办法
4034
查看次数

使用带有 gunicorn 的自定义信号处理程序

我有一个带有自定义信号处理程序的 Flask 应用程序,用于在退出之前处理清理任务。使用gunicorn 运行应用程序时,gunicorn 会在应用程序完成所有清理任务之前将其杀死。

python signals flask gunicorn

5
推荐指数
1
解决办法
3177
查看次数

Flask应用程序退出时如何停止Gunicorn

我正在使用gunicorn运行我的Flask应用程序,但是当Flask应用程序由于错误而退出时,gunicorn将创建一个新的worker而不会退出。

Flask示例应用程序:

$ vim app.py

# main file
import sys
import os
import logging
from flask import Flask

from views import views

def create_app():

    app = Flask(__name__)

    app_name = os.environ.get('FLASK_APP_NAME', None)
    if app_name is None:
        logging.error("Failed to load configuration")
        sys.exit(2)

    app.config['APP_NAME'] = app_name

    console = logging.StreamHandler(sys.stdout)
    logging.getLogger().addHandler(console)
    logging.getLogger().setLevel(logging.INFO)

    logging.info("Starting Flask application")

    app.register_blueprint(views)

    return app

app = create_app()

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000, debug=False, threaded=True)


$ vim views.py

# views
from flask import request, jsonify, Blueprint
from flask import current_app …
Run Code Online (Sandbox Code Playgroud)

python linux flask gunicorn

4
推荐指数
3
解决办法
2763
查看次数