我终于注册了,因为我对我的问题没有更多的想法。我的后端部分使用 asyncio 和 aiohttp,前端部分使用 javascript。但是我遇到了 405 错误。(我确切地说我是这些库的初学者)
我希望从发布请求中检索 json。这里的javascript函数:
function postJson (data){
$.ajax({
url : 'http://localhost:8080/postJson',
type : 'POST',
dataType : 'json',
contentType : 'application/json',
data : data, //data are ready at json format, I do not think I need to use JSON.stringify ? I does not change anything to the error anywhere
success : function(code_html, statut){
console.log("success POST");
},
error : function(resultat, statut, erreur){
console.log("error POST");
}
});
}
Run Code Online (Sandbox Code Playgroud)
和python代码:
async def postJson(request):
data = await request.post()
#some code …Run Code Online (Sandbox Code Playgroud) 我正在将 aiohttp 2 与 Python 3.6 一起使用,并希望记录进入应用程序的请求。
我做了:
# use ISO timestamps
from time import gmtime
logging.Formatter.converter = gmtime
# create a formatter
ch = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s - %(message)s', '%Y-%m-%dT%H:%M:%S')
ch.setFormatter(formatter)
# show all emssages (default is WARNING)
logging.getLogger('aiohttp.access').setLevel(logging.DEBUG)
# attach the handler
logging.getLogger('aiohttp.access').addHandler(ch)
Run Code Online (Sandbox Code Playgroud)
现在,当应用程序运行时,我会得到以下格式的日志:
2017-04-19T16:02:17 INFO aiohttp.access - 127.0.0.1 - - [19/Apr/2017:16:02:17 +0000] "GET /test HTTP/1.1" 404 547 "-" "curl/7.51.0"
Run Code Online (Sandbox Code Playgroud)
该message组件有一个冗余时间戳,我想自定义其格式。该文件说,它应该是可能的,但我不明白如何使它实际工作,也没有代码示例。
我只发现了这种用法,但有:
mylogger = logging.Logger('aiohttp.access')
mylogger.setLevel(logging.DEBUG)
mylogger.addHandler(ch)
handler = …Run Code Online (Sandbox Code Playgroud) 我想在 mybaiohttp.web应用程序中使用 JSON 序列化。但是我还没有在aiohttpUniverse 中找到用于该任务的任何库。
如果有使用serilizers任何问题django-rest-ramework与aiohttp?
django serialization python-3.x django-rest-framework aiohttp
首先是代码:
import random
import asyncio
from aiohttp import ClientSession
import csv
headers =[]
def extractsites(file):
sites = []
readfile = open(file, "r")
reader = csv.reader(readfile, delimiter=",")
raw = list(reader)
for a in raw:
sites.append((a[1]))
return sites
async def fetchheaders(url, session):
async with session.get(url) as response:
responseheader = await response.headers
print(responseheader)
return responseheader
async def bound_fetch(sem, url, session):
async with sem:
print("doing request for "+ url)
await fetchheaders(url, session)
async def run():
urls = extractsites("cisco-umbrella.csv")
tasks = []
# create instance …Run Code Online (Sandbox Code Playgroud) 蟒蛇 3.6
这是我的 firebase 代码,用于检查用户是否存在于 firebase 上:
import firebase_admin
from firebase_admin import credentials
from firebase_admin import auth
cred = credentials.Certificate('wow.json')
default_app = firebase_admin.initialize_app(cred)
def getIsUser(email=None,uid=None):
try:
user = auth.get_user(uid)
is_user = True
except:
is_user = False
return is_user
Run Code Online (Sandbox Code Playgroud)
这是我调用该函数的 aiohttp:
async def jwt_route(request):
data = await request.json()
uid = data['uid']
is_user = getIsUser(uid=uid)
app.router.add_post('/jwt', jwt_route)
Run Code Online (Sandbox Code Playgroud)
我的问题是 getIsUser(uid=uid) 是阻塞还是不阻塞。如果阻塞,那么我如何使它成为非阻塞的?
我在用aiohttp / asyncio做错了。当我尝试run_my_job()一次循环运行另一个文件时,如果我一次性运行它,则下面的代码可以正常工作:
main.py
========================================
count = 0
batch_count = math.ceil((abc.get_count()/100))
print("there are {} batches to complete.".format(batch_count))
while count < batch_count:
print("starting batch {}...".format(count))
abc.run_my_job()
print("batch {} completed...".format(count))
count += 1
abc.py
===============================
def run_my_job(self):
self.queue_manager(self.do_stuff(all_the_tasks))
def queue_manager(self, method):
print('starting event queue')
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(method)
loop.run_until_complete(future)
loop.close()
async def async_post(self, resource, session, data):
async with session.post(self.api_attr.api_endpoint + resource, headers=self.headers, data=data) as response:
resp = await response.read()
return resp
async def do_stuff(self, data):
print('queueing tasks')
tasks …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用aiohttp从浏览器获取cookie.从文档和谷歌搜索我只发现有关在aiohttp中设置cookie的文章.
在烧瓶中,我会简单地得到饼干
cookie = request.cookies.get('name_of_cookie')
# do something with cookie
Run Code Online (Sandbox Code Playgroud)
有没有一种简单的方法可以使用aiohttp从浏览器中获取cookie?
免责声明:我是aiohttp的初学者
我正在尝试使用aiohttp异步处理get请求但事实证明它比gevent的池版本慢得多.
GEVENT VERSION
import gevent
from gevent import monkey
monkey.patch_all()
from gevent.pool import Pool
import requests
import time
def pooling_task(url):
requests.get(url)
def pooling_main():
start = time.time()
pool = Pool(10)
urls = [
"http://google.com",
"http://yahoo.com",
"http://linkedin.com",
"http://shutterfly.com",
"http://mypublisher.com",
"http://facebook.com"
]
for url in urls:
pool.apply_async(pooling_task, args=(url,))
pool.join()
end = time.time()
print("POOL TIME {}".format(end-start))
if __name__ == '__main__':
print("POOLING VERSION")
pooling_main()
Run Code Online (Sandbox Code Playgroud)
输出 - 池时间6.299163818359375
以下是aiohttp版本
import aiohttp
import asyncio
import time
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
async def fetch(session, url):
async with session.get(url) …Run Code Online (Sandbox Code Playgroud) 我正在测试aiohttp和asyncio。我希望同一事件循环具有套接字,http服务器,http客户端。
我正在使用以下示例代码:
@routes.get('/')
async def hello(request):
return web.Response(text="Hello, world")
app = web.Application()
app.add_routes(routes)
web.run_app(app)
Run Code Online (Sandbox Code Playgroud)
问题run_app是阻塞。我想将http服务器添加到使用以下命令创建的现有事件循环中:
asyncio.get_event_loop()
Run Code Online (Sandbox Code Playgroud) 我正在尝试学习在Python中使用asyncio优化脚本。我的示例返回coroutine was never awaited警告,您可以帮助您理解并找到解决方法吗?
import time
import datetime
import random
import asyncio
import aiohttp
import requests
def requete_bloquante(num):
print(f'Get {num}')
uid = requests.get("https://httpbin.org/uuid").json()['uuid']
print(f"Res {num}: {uid}")
def faire_toutes_les_requetes():
for x in range(10):
requete_bloquante(x)
print("Bloquant : ")
start = datetime.datetime.now()
faire_toutes_les_requetes()
exec_time = (datetime.datetime.now() - start).seconds
print(f"Pour faire 10 requêtes, ça prend {exec_time}s\n")
async def requete_sans_bloquer(num, session):
print(f'Get {num}')
async with session.get("https://httpbin.org/uuid") as response:
uid = (await response.json()['uuid'])
print(f"Res {num}: {uid}")
async def faire_toutes_les_requetes_sans_bloquer():
loop = asyncio.get_event_loop()
with aiohttp.ClientSession() …Run Code Online (Sandbox Code Playgroud) aiohttp ×10
python ×8
python-3.x ×4
python-3.6 ×2
asynchronous ×1
cookies ×1
django ×1
gevent ×1
javascript ×1
json ×1
logging ×1