小编fas*_*uto的帖子

从嵌套字典中删除字段的优雅方法

我不得不从字典中删除一些字段,这些字段的键位于列表中.所以我写了这个函数:

def delete_keys_from_dict(dict_del, lst_keys):
    """
    Delete the keys present in lst_keys from the dictionary.
    Loops recursively over nested dictionaries.
    """
    dict_foo = dict_del.copy()  #Used as iterator to avoid the 'DictionaryHasChanged' error
    for field in dict_foo.keys():
        if field in lst_keys:
            del dict_del[field]
        if type(dict_foo[field]) == dict:
            delete_keys_from_dict(dict_del[field], lst_keys)
    return dict_del
Run Code Online (Sandbox Code Playgroud)

这段代码有效,但它不是很优雅,我确信有更好的解决方案.

python dictionary

20
推荐指数
4
解决办法
2万
查看次数

什么是在Python中生成API KEY和SECRET的最简单,最安全的方法

我需要生成一个API密钥和Secret,它将存储在Redis服务器中.生成密钥和秘密的最佳方法是什么?

我正在开发一个基于Django-tastypie框架的应用程序.

python django tastypie

18
推荐指数
2
解决办法
1万
查看次数

卡桑德拉表现不佳?

我必须为具有大量插入(1M /天)的项目选择Cassandra或MongoDB(或另一个nosql数据库,我接受建议).所以我创建了一个小测试来测量写入性能.这是在Cassandra中插入的代码:

import time
import os
import random
import string
import pycassa

def get_random_string(string_length):
    return ''.join(random.choice(string.letters) for i in xrange(string_length))

def connect():
    """Connect to a test database"""
    connection = pycassa.connect('test_keyspace', ['localhost:9160'])
    db = pycassa.ColumnFamily(connection,'foo')
    return db

def random_insert(db):
    """Insert a record into the database. The record has the following format
    ID timestamp
    4 random strings
    3 random integers"""
    record = {}
    record['id'] = str(time.time())
    record['str1'] = get_random_string(64)
    record['str2'] = get_random_string(64)
    record['str3'] = get_random_string(64)
    record['str4'] = get_random_string(64)
    record['num1'] = str(random.randint(0, 100)) …
Run Code Online (Sandbox Code Playgroud)

python mongodb cassandra nosql

8
推荐指数
1
解决办法
3732
查看次数

运行脚本时出现Python属性错误:类型对象'BaseCommand'没有属性'option_list'

我看过这篇关于如何从django运行python脚本的帖子:http: //www.djangotutsme.com/how-to-run-python-script-from-django/
我试过这个例子,但在运行时遇到以下错误python manage.py runscript myscript.我安装了Python 2.7,Django 1.10和django扩展1.6.1.

Traceback (most recent call last):
  File "manage.py", line 10, in <module>
    execute_from_command_line(sys.argv)
  File "/usr/lib/python2.7/site-packages/Django-1.10.dev20151201151517-py2.7.egg/django/core/management/__init__.py", line 349, in execute_from_command_line
    utility.execute()
  File "/usr/lib/python2.7/site-packages/Django-1.10.dev20151201151517-py2.7.egg/django/core/management/__init__.py", line 341, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/usr/lib/python2.7/site-packages/Django-1.10.dev20151201151517-py2.7.egg/django/core/management/__init__.py", line 193, in fetch_command
    klass = load_command_class(app_name, subcommand)
  File "/usr/lib/python2.7/site-packages/Django-1.10.dev20151201151517-py2.7.egg/django/core/management/__init__.py", line 40, in load_command_class
    module = import_module('%s.management.commands.%s' % (app_name, name))
  File "/usr/lib64/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
  File "/usr/lib/python2.7/site-packages/django_extensions-1.6.1-py2.7.egg/django_extensions/management/commands/runscript.py", line 6, in <module>
    from django_extensions.management.email_notifications import \
  File "/usr/lib/python2.7/site-packages/django_extensions-1.6.1-py2.7.egg/django_extensions/management/email_notifications.py", line …
Run Code Online (Sandbox Code Playgroud)

python django attributeerror

8
推荐指数
1
解决办法
6286
查看次数

即使在任务开始后,AsyncResult(task_id)也会返回"PENDING"状态

在项目中,我尝试轮询一个长时间运行的任务的task.state并更新其运行状态.它在开发中起作用,但是当我在生产服务器上移动项目时它不起作用.即使我可以看到任务开始在花上,我仍然不停地'待命'.但是,当任务完成时,我仍然可以更新结果,当task.state =='SUCCESS'时.我在生产中使用python 2.6,Django 1.6和Celery 3.1,结果后端AMQP.

@csrf_exempt
def poll_state(request):
    data = 'Fail'

    if request.is_ajax():
            if 'task_id' in request.POST.keys() and request.POST['task_id']:
                    task_id = request.POST['task_id']
                    email = request.POST['email']
                    task = AsyncResult(task_id)
                    print "task.state=", task.state
                    if task.state == 'STARTED':
                            task_state = 'Running'
                            data = 'Running'
                            #data = 'Running'
                    elif task.state == 'PENDING' or task.state == 'RETRY':
                            task_state = 'Waiting'
                            data = 'Pending'
                    elif task.state == 'SUCCESS':
                            task_state = 'Finished'
                            if task.result:
                                    data = task.result
                            else:
                                    data = 'None'

                    else:
                            task_state = task.state
                            data = …
Run Code Online (Sandbox Code Playgroud)

python django celery celery-task django-celery

6
推荐指数
2
解决办法
2092
查看次数

带有过滤器的SQLAlchemy func.count

我正在使用一个像这样分页的框架:

def get_count_query(self):
    return self.session.query(func.count('*')).select_from(self.model)

def paginate(self):
    ... <irrelevant>...
    count = self.get_count_query.scalar()
    ...
Run Code Online (Sandbox Code Playgroud)

我想覆盖get_count_query方法以使用我自己的查询,因为我正在过滤一些结果而get_count_query只返回表中的所有元素.查询是动态创建的,例如,一个查询可以是:

Asset.query.join(StatusLabel).filter(StatusLabel.status == 'Deployable', or_(
                                     Asset.assigned_to.isnot(None)),
                                     Asset.deleted_at.is_(None))
Run Code Online (Sandbox Code Playgroud)

我可以使用query.count()以下方法轻松计算此查询中的元素:

def get_count_query(self):
    q = Asset.query.join(StatusLabel).filter(StatusLabel.status == 'Deployable', or_(
                                             Asset.assigned_to.isnot(None)),
                                             Asset.deleted_at.is_(None))
    return q.count()
Run Code Online (Sandbox Code Playgroud)

但是一旦它到达.scalar()方法就会失败(我无法删除此方法).所以问题是:我如何申请func.count('*')现有的查询?

我可以从查询中检索过滤器并将其应用于func.count('*')查询吗?

python sqlalchemy flask-sqlalchemy

6
推荐指数
1
解决办法
3585
查看次数

Django prefetch_related与m2m通过关系

我有以下型号

class Film(models.Model):
    crew = models.ManyToManyField('Person', through='Role', blank=True)

class Role(models.Model):
    person = models.ForeignKey('Person')
    film = models.ForeignKey('Film')
    person_role = models.ForeignKey(RoleType)
    credit = models.CharField(max_length=200)
    credited_as = models.CharField(max_length=100)

class RoleType(models.Model):
    """Actor, director, makeup artist..."""
    name = models.CharField(max_length=50)

class Person(models.Model):
    slug = models.SlugField(max_length=30, unique=True, null=True)
    full_name = models.CharField(max_length=255)
Run Code Online (Sandbox Code Playgroud)

A Film("星球大战:克隆人战争")有几个Person("克里斯托弗李"),每个人都可以拥有一个或多个Role("杜库伯爵之声"),每个人Role都有一个RoleType("配音演员").

我正在使用DetailView来显示 Film

class FilmDetail(DetailView):
    model = Film
Run Code Online (Sandbox Code Playgroud)

在我的模板中,我显示所有人,所以每次我显示一个电影609查询正在执行.为了减少这种情况,我想使用,prefetch_related所以我将视图更改为:

class FilmDetail(DetailView):
    model = Film

    def get_queryset(self):
        return super(FilmDetail, self).get_queryset().prefetch_related('crew')
Run Code Online (Sandbox Code Playgroud)

但是这并没有减少查询次数(610),我尝试了以下参数来预取相关的内容并且它不起作用:

def get_queryset(self):
        return super(FilmDetail, …
Run Code Online (Sandbox Code Playgroud)

django django-models django-queryset

5
推荐指数
1
解决办法
3063
查看次数

从SVN安装pip [错误2]

我正在尝试通过SVN信息库中的pip安装我的软件包。Subversion服务器的版本为1.6.11。

我正在使用以下命令:

pip install -e svn+http://svn.mysite.com/svn/project/project#egg=project


Error [Error 2] Can not find the file while executing command svn checkout -q http://svn.mysite.com/svn/project/project/
Can not find command 'svn'
Run Code Online (Sandbox Code Playgroud)

我怎么解决这个问题?

python svn pip

4
推荐指数
1
解决办法
1494
查看次数

将数据从Oracle移动到Cassandra和/或MongoDB

在工作中我们正在考虑从Oracle迁移到NoSQL数据库,因此我必须对Cassandra和MongoDB进行一些测试.我必须将大量表移到NoSQL数据库,我们的想法是在这两个平台之间同步数据.

所以我创建了一个简单的过程,使选择进入Oracle DB并插入到mongo中.我的一些同事指出,也许有一种更简单(也更专业)的方法.

以前有人有这个问题吗?你怎么解决的?

oracle mongodb cassandra

3
推荐指数
1
解决办法
6061
查看次数

python-attrs 中带有额外参数的自定义验证器

我有几个使用 attrs 定义的类,如下所示:

from attr import attrs, attrib, validators

@attrs
class MyClass:
    name = attrib(])
    @name.validator
    def check_length(self, attribute, value):
        if not (3 <= len(value) <= 30):
            raise ValueError("Name must be between 3 and 30 characters")

    description = attrib()
    @description.validator
    def check_length(self, attribute, value):
        if not (10 <= len(value) <= 400):
            raise ValueError("Description must be between 10 and 400 characters")
Run Code Online (Sandbox Code Playgroud)

对于几个属性,我需要创建一个验证器来检查数据是否在某个范围内。我想避免重复,所以我可以创建一个自定义验证器,在其中传递一些额外的最小值和最大值参数,如下所示:

def range_validator(instance, attribute, value, min_value, max_value):
    if  min_value >= len(value) >= max_value:
        raise ValueError("Must be between {} and {}".format(min_value, …
Run Code Online (Sandbox Code Playgroud)

python python-3.x python-attrs

3
推荐指数
1
解决办法
2160
查看次数