I want to implement a topology similar to Option 3 of the RabbitMQ topologies described here, with a few differences:
The new topology should handle a few thousand messages per day, and it should have two exchanges: one serving the main queues (about 30 of them) and the other serving the retry and error queues (about 60 of them). I have been following this tutorial, the regular RMQ tutorials, and many SO posts. The RMQ server is started in a Docker container.
The problem I am facing is that not all messages are picked up by the consumer, and the order in which messages arrive is unexpected. I also see the same message being rejected twice. Here is my code:
exchanges.py
def callback(self, channel, method, properties, body):
    print("delivery_tag: {0}".format(method.delivery_tag))
    data = json.loads(body)
    routingKey = data.get('routing-key')
    routingKey_dl_error = queues_dict[routingKey]['error']
    print(" [X] Got {0}".format(body))
    print(" [X] Received {0} (try: {1})".format(data.get('keyword'), int(properties.priority)+1))
    # redirect faulty messages to *.error queues
    if data.get('keyword') == 'FAIL':
        channel.basic_publish(exchange='exchange.retry',
                              routing_key=routingKey_dl_error,
                              body=json.dumps(data),
                              properties=pika.BasicProperties(
                                  delivery_mode=2,
                                  priority=int(properties.priority),
                                  timestamp=int(time.time()),
                                  headers=properties.headers))
        print(" [*] Sent to error queue: {0}".format(routingKey_dl_error))
        time.sleep(5)
        channel.basic_ack(delivery_tag=method.delivery_tag) # leaving this in creates 1000s of iterations(?!)
    # check number of sent counts
    else:
        # …

I am trying to output data from a Celery task into a separate window. I am a novice at JavaScript and AJAX, and that is exactly where my current problem lies. After the view executes, the Celery task is started and the next HTML page (success.html) is rendered:
success.html
{% block content %}
<body>
  {% if task_id %}
    <h1>task_id has been called: {{ task_id }}</h1>
    <script src="{% static 'MyAPP/bootstrap/js/task_output_retrieval.js' %}"></script>
    <script type='text/javascript'> task_state("{{ task_id }}"); </script>
    <script src="{% static 'MyAPP/bootstrap/js/update-hello-user.js' %}"></script>
    <script type='text/javascript'> second(); </script>
    <h1> END </h1>
  {% endif %}
</body>
{% endblock content %}
I know the JavaScript is being called, because the window at least opens. Here is the .js:
task_output_retrieval.js
function task_state (task_id) {
    var taskID = task_id;
    var newWin = window.open('', 'new window', 'width=200, height=100');
    $.ajax({
        url: '{% …

mymodule.py
def write_df_to_csv(self, df, modified_fn):
    new_csv = self.path + "/" + modified_fn
    df.to_csv(new_csv, sep=";", encoding='utf-8', index=False)
test_mymodule.py
class TestMyModule(unittest.TestCase):
    def setUp(self):
        args = parse_args(["-f", "test1"])
        self.mm = MyModule(args)
        self.mm.path = "Random/path"
        self.test_df = pd.DataFrame(
            [
                ["bob", "a"],
                ["sue", "b"],
                ["sue", "c"],
                ["joe", "c"],
                ["bill", "d"],
                ["max", "b"],
            ],
            columns=["A", "B"],
        )

    def test_write_df_to_csv(self):
        to_csv_mock = mock.MagicMock()
        with mock.patch("project.mymodule.to_csv", to_csv_mock, create=True):
            self.mm.write_df_to_csv(self.test_df, "Stuff.csv")
        to_csv_mock.assert_called_with(self.mm.path + "/" + "Stuff.csv")
When I run this test, I get:
FileNotFoundError: [Errno 2] No such file or directory: 'Random/path/Stuff.csv'
I am trying to mock the `to_csv` call used in my method. My other tests run as expected, but I am not sure what is going wrong with this one. I …
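For what it's worth, a likely cause is the patch target: `to_csv` is a method of `DataFrame`, not a name inside `project.mymodule`, so patching with `create=True` creates an unused module attribute while the real method still runs and hits the filesystem. A minimal sketch of patching it on the class instead (the path and arguments mirror the question, but this is an illustration, not the exact fix for the project layout):

```python
import unittest.mock as mock
import pandas as pd

# patch to_csv where it actually lives: on the DataFrame class
df = pd.DataFrame({"A": ["bob", "sue"], "B": ["a", "b"]})
with mock.patch.object(pd.DataFrame, "to_csv") as to_csv_mock:
    # nothing is written to disk; the call is only recorded by the mock
    df.to_csv("Random/path/Stuff.csv", sep=";", encoding="utf-8", index=False)

to_csv_mock.assert_called_with(
    "Random/path/Stuff.csv", sep=";", encoding="utf-8", index=False
)
print(to_csv_mock.call_count)  # -> 1
```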
I recently uninstalled pre-commit from my environment. I did the following in pipenv:
pipenv --rm
<deleted Pipfile and Pipfile.lock>
pipenv install -r requirements.txt
I made sure the pre-commit module is no longer in requirements.txt. When I do a git commit, I get:
~/my_project/.venv/bin/python: No module named pre_commit
This blocks me from committing, and I don't know where it is coming from, since pre-commit is not installed. In addition, the path in the traceback points to python rather than python3. What am I missing?
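In case it helps: `pre-commit install` writes a git hook into `.git/hooks/pre-commit` that outlives the package, so git keeps invoking the now-missing module. A sketch of the idea using a throwaway directory (the real fix would be deleting `.git/hooks/pre-commit` in the repository, or temporarily reinstalling the package and running `pre-commit uninstall`):

```python
import tempfile
from pathlib import Path

# simulate the leftover hook in a temporary "repo" rather than a real one
repo = Path(tempfile.mkdtemp())
hook = repo / ".git" / "hooks" / "pre-commit"
hook.parent.mkdir(parents=True)
hook.write_text("#!/usr/bin/env python\nimport pre_commit\n")  # the stale hook

hook.unlink()  # the equivalent of: rm .git/hooks/pre-commit
print(hook.exists())  # -> False
```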
A similar question has been asked before, but I still cannot find a solution. My code:
try:
    connection = cx_Oracle.connect(ORACLE_CONNECT)
    logger.info("Connection to Oracle success.")
    print("Oracle DB version: " + connection.version)
    print("Oracle client encoding: " + connection.encoding)
    print("Python version: " + platform.python_version())
except cx_Oracle.DatabaseError as e:
    error, = e.args
    if error.code == 1017:
        print("Username/password invalid.")
        logger.debug("Username/password invalid: %s", error.code)
    else:
        logger.debug("Database connection error: %s", e)
        print("Database connection error: {0}".format(e))
    raise

cursor = connection.cursor()
smsreport_text_new = tuple(smsreport_text)
find_command = self.identify_unique_msgid(smsreport_list)
cursor.execute(find_command)
def identify_unique_msgid(self, smsreport_list):
    msgid_i_to_be_crosschecked = smsreport_list.get('msgid_i')
    msgid_ii_to_be_crosschecked = smsreport_list.get('msgid_ii')
    find_command = …

I am confused about how and where to implement the concurrent-editing feature, and therefore cannot enforce a mutex on concurrent edits. My code:
models.py
class Order(models.Model):
    edit_version = models.IntegerField(default=0, editable=True) # For concurrency editing

    ### Added for concurrency with 2 or more users wanting to edit the same form ###
    locked = models.BooleanField(default=False)

    def lock_edit(self):
        self.locked = True
        print("locked_1: {0}".format(self.locked)) # Test purposes only
        super().save() # what's this doing exactly??

    def save_edit(self):
        self.locked = False
        print("locked_2: {0}".format(self.locked)) # Test purposes only
        super().save()
view.py
@permission_required('myapp.edit_order', fn=objectgetter(Order, 'id'))
def edit_order(request, id=None):
    """
    """
    order = Order.objects.get(id=id)
    print("order: {0}".format(order))
    print("EDIT_VERSION: …
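One common pattern for this kind of mutex-style check is optimistic locking: bump `edit_version` with a conditional UPDATE and treat zero affected rows as a concurrent edit. A minimal, framework-free sketch using sqlite3 (in Django the same idea is usually expressed as `Order.objects.filter(id=..., edit_version=...).update(...)`; the table and function names here are illustrative):

```python
import sqlite3

# optimistic locking: the UPDATE only succeeds if edit_version is unchanged
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, edit_version INTEGER)")
conn.execute("INSERT INTO orders (id, edit_version) VALUES (1, 0)")

def save_edit(conn, order_id, expected_version):
    cur = conn.execute(
        "UPDATE orders SET edit_version = edit_version + 1 "
        "WHERE id = ? AND edit_version = ?",
        (order_id, expected_version),
    )
    return cur.rowcount == 1  # False -> someone else saved first

print(save_edit(conn, 1, 0))  # -> True  (first save wins)
print(save_edit(conn, 1, 0))  # -> False (stale edit_version, rejected)
```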
This question is more of an architecture question about how best to structure an ETL pipeline. Currently, I have an AWS Lambda that is triggered via SQS. But processing the data can take a bit more than 15 minutes (AWS's runtime limit), and deploying with `sam build` produces a .zip larger than 250 MB, which raises an error. So an alternative to AWS Lambda is needed. The alternatives I have seen so far are:
SQS -> ECS (Fargate)
SQS -> Lambda -> ECS (Fargate)
I have not found any hints on the pros and cons of these two options, or on which one is generally preferred. Any advice on how to approach this?
I have two subplots and want shared x and y axis labels for both of them. My code is as follows:
fig, ax = plt.subplots()
ax = fig.add_subplot(111)
ax1 = fig.add_subplot(211)
ax2 = fig.add_subplot(212)
ax.set_ylabel("array2 stuff")

plt.subplot(2, 1, 1)
plt.plot(array1, array2, 'o-', label='stuff')
plt.title("my stuff")
plt.legend(loc="lower left")
plt.grid()

plt.subplot(2, 1, 2)
plt.plot(array1, array2, 'o-', label='stuff')
plt.xlabel("Date")
ax.set_ylabel("array2 stuff")
plt.legend(loc="lower left")
plt.ylim(-constant, constant)
plt.grid()

plt.show()
The x-axis label seems to work, but the y label is not centered between the two plots; instead, it is centered on the y-axis of the lower plot.
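One way to get a single centered y-label (assuming the two subplots really should share it) is to drop the extra full-figure axes and place the label on the figure itself, e.g. with `fig.text`; the coordinates below are figure fractions and may need tweaking, and the data arrays are stand-ins for the question's:

```python
import matplotlib
matplotlib.use("Agg")  # headless backend so the sketch runs anywhere
import matplotlib.pyplot as plt

array1 = [1, 2, 3]  # illustrative data in place of the question's arrays
array2 = [1, 4, 9]

fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True)
ax1.plot(array1, array2, 'o-', label='stuff')
ax1.set_title("my stuff")
ax1.legend(loc="lower left")
ax1.grid()
ax2.plot(array1, array2, 'o-', label='stuff')
ax2.set_xlabel("Date")
ax2.legend(loc="lower left")
ax2.grid()

# one y-label centered over the full figure height, not a single subplot
fig.text(0.04, 0.5, "array2 stuff", va="center", rotation="vertical")
```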
sql = ("INSERT INTO {0} "
"(id, timestamp, status, priority, client, group, capacity, level, mail_id) "
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)".format(TABLE_NAME_MAIL))
values = ('NULL', report['timestamp'], 'succeeded', report['priority'], c.strip(), report['group'], 'NULL', 'NULL', ref_mailid)
cursor.execute(sql, values)
#cursor.execute('INSERT INTO %s VALUES (NULL,"%s","%s","%s","%s","%s",NULL,NULL,"%s") ' % (TABLE_NAME_REPORT, report['timestamp'], 'succeeded', report['priority'], c.strip(), report['group'], ref_mailid))
The commented-out cursor.execute works; the uncommented one throws the error:
_mysql_exceptions.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'group, capacity, level, mail_id) VALUES ('NULL', '2014-12-05 23:46:56', 'succeeded'' at line 1")
The column "id" has AUTO_INCREMENT.
Why am I getting this error?
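For context on errors like this one: GROUP is a reserved word in MySQL, so a column named `group` must be quoted (with backticks in MySQL), and passing the string 'NULL' inserts the four-letter text rather than a SQL NULL (pass None instead). A sketch of both points using sqlite3, which also treats GROUP as reserved (in MySQL the quoting would be backticks and the placeholder %s; the table here is illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE mail (id INTEGER PRIMARY KEY, "group" TEXT, capacity TEXT)'
)

# quote the reserved column name, and pass None for a real SQL NULL
sql = 'INSERT INTO mail (id, "group", capacity) VALUES (?, ?, ?)'
conn.execute(sql, (None, "reports", None))  # id is auto-assigned

row = conn.execute('SELECT id, "group", capacity FROM mail').fetchone()
print(row)  # -> (1, 'reports', None)
```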
A pandas dataframe looks like this:
Col1 Col2
A 1
A 1
A 1
B 0
B 0
B 1
B 1
B 1
C 1
C 1
C 1
C 1
I want to group everything by Col1 and then check whether, for that group (e.g. A), all values of Col2 are 1. In this example, the desired output is:
[A, C]
Run Code Online (Sandbox Code Playgroud)
(because only A and C have all their values set to 1). How do I do this?
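A sketch of one way to do this, assuming Col2 holds 0/1 values: group by Col1 and use all(), which treats 1 as truthy and 0 as falsy:

```python
import pandas as pd

# the dataframe from the question
df = pd.DataFrame({
    "Col1": list("AAABBBBBCCCC"),
    "Col2": [1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1],
})

# True for each group where every Col2 value is 1
all_ones = df.groupby("Col1")["Col2"].all()
result = all_ones[all_ones].index.tolist()
print(result)  # -> ['A', 'C']
```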