我们有一个脚本可以定期从各种来源下载文档。我将把它转移到 celery,但在这样做的同时,我想利用连接池,但我不知道如何去做。
我目前的想法是使用请求来做这样的事情:
import celery
import requests
s = requests.session()
@celery.task(retry=2)
def get_doc(url):
doc = s.get(url)
#do stuff with doc
Run Code Online (Sandbox Code Playgroud)
但我担心连接会无限期地保持打开状态。
只要我正在处理新文档,我真的只需要连接保持打开状态。
所以这样的事情可能:
import celery
import requests
def get_all_docs()
docs = Doc.objects.filter(some_filter=True)
s = requests.session()
for doc in docs: t=get_doc.delay(doc.url, s)
@celery.task(retry=2)
def get_doc(url):
doc = s.get(url)
#do stuff with doc
Run Code Online (Sandbox Code Playgroud)
但是,在这种情况下,我不确定连接会话是否会跨实例持续存在,或者一旦酸洗/取消酸洗完成,请求是否会创建新连接。
最后,我可以在类方法上尝试对任务装饰器的实验性支持,如下所示:
import celery
import requests
class GetDoc(object):
def __init__(self):
self.s = requests.session()
@celery.task(retry=2)
def get_doc(url):
doc = self.s.get(url)
#do stuff with doc
Run Code Online (Sandbox Code Playgroud)
最后一个似乎是最好的方法,我将对此进行测试;但是,我想知道这里是否有人已经做过类似的事情,或者如果没有,你们中的一个人可能有比上述方法更好的方法。
我使用下面的语句来获取 html 字符串:
import urllib3
url ='http://urllib3.readthedocs.org/'
http_pool = urllib3.connection_from_url(url)
r = http_pool.urlopen('GET',url)
print (r.data)
Run Code Online (Sandbox Code Playgroud)
但输出是:
b'<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "b'\n<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"\n "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">\n\n\n<html xmlns="http://www.w3.org/1999/xhtml">\n <head>\n <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\n \n\n .......................................\n</script>\n\n\n\n </body>\n</html>''
Run Code Online (Sandbox Code Playgroud)
我怎样才能得到一个原始的 html 字符串?
我正在使用以下代码从 REST API 获取数据:
\n\nimport requests\nimport json\n\nkey = "my service key"\n\napi = "http://api.data.go.kr/openapi/pblprfr-event-info-std?serviceKey=", key, "&s_page=1&s_list=100&type=json"\n\n\nr = requests.get(api)\n\ndata = json.loads(r.text)\n\nprint(data["\xed\x96\x89\xec\x82\xac\xeb\xaa\x85"]) \nRun Code Online (Sandbox Code Playgroud)\n\n此代码产生以下错误:
\n\n\n\n\n文件“sel2.py”,第 1 行,位于 <module> 中
\n\nRun Code Online (Sandbox Code Playgroud)\n\nimport requests\n文件“/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/requests/ init .py”,第 46 行,位于 <module> 中
\n\nRun Code Online (Sandbox Code Playgroud)\n\nfrom .exceptions import RequestsDependencyWarning\n文件“/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/requests/exceptions.py”,第 9 行,位于 <module> 中
\n\nRun Code Online (Sandbox Code Playgroud)\n\nfrom urllib3.exceptions import HTTPError as BaseHTTPError\nModuleNotFoundError:没有名为“urllib3.exceptions”的模块;\'urllib3\' 不是一个包
\n
有什么想法可能是什么问题吗?
\n我在一些代码片段和请求文档中看到过这样的事情:
import requests
sess = requests.Session()
adapter = requests.adapters.HTTPAdapter(max_retries=20)
sess.mount('https://', adapter)
Run Code Online (Sandbox Code Playgroud)
我试图更好地了解.mount()这里的作用。在这种情况下,是否只是增加对所有调用的允许重试次数sess.request()?它是否在模拟以下内容:
for _ in range(max_retries):
try:
return sess.request(...)
except:
pass
Run Code Online (Sandbox Code Playgroud)
或者还有更多事情发生吗?
我知道requests.Session实例是用具有的适配器初始化的max_retries=0,因此以上只是基于此的预感。
.mount()在这种情况下,了解如何具体改变会话对象的行为会很有帮助。
我找到了下一个例子
def prepare_retry_requester(retries: int = 5, forcelist: List = (503,)) -> requests.Session:
requester = requests.Session()
retry = urllib3.Retry(total=retries, backoff_factor=1, status_forcelist=forcelist)
for protocol in 'http://', 'https://':
requester.mount(protocol, requests.adapters.HTTPAdapter(max_retries=retry))
return requester
with prepare_retry_requester(forcelist=[502, 503, 413]) as requester:
response = requester.post(url, data=serialized)
Run Code Online (Sandbox Code Playgroud)
但如果我出现502一段时间错误(服务器重新启动 10 秒),它仍然会失败。
我已经构建了一个烧瓶应用程序。此应用程序对具有类似“ http://xyz:8080/graphql ”的 URL 的 graphql 服务器进行 API 调用。
该应用程序在 docker 容器中进行容器化,并在 docker compose 上运行。
这个外部 graphql 服务器可以从 chrome 浏览器和 api 工具访问。但是当容器调用服务器 url 时,它会给出这样的错误。
**HTTPConnectionPool(host='xyz', port=8080): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f21777
f0c18>: Failed to establish a new connection: [Errno -5] No address associated with hostname',))**
Run Code Online (Sandbox Code Playgroud)
url 与 DNS 绑定,IP 地址也在那里。我没有服务器的 IP 地址,只有 DNS,即“xyz”,有人遇到过这种问题吗?下面是为在应用程序中连接到 graphql 而编写的类。
import requests
class GraphQL:
def graphql(self, query, variables = {}, headers = None):
url = 'http://xyz:8080/graphql'
response = …Run Code Online (Sandbox Code Playgroud) raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPSConnectionPool(host='mycompanyurl.in',
port=443): Max retries exceeded with url: /api/v1/issues.json (Caused by
NewConnectionError('<requests.packages.urllib3.connection.VerifiedHTTPSConnection object
at 0x51047d0>: Failed to establish a new connection: [Errno 110] Connection timed out',))
Run Code Online (Sandbox Code Playgroud)
不过,mycompanyurl.in没关系,我也可以在浏览器中打开它。我正在使用Python 2.7.5。
给出以下示例用法:
adapter = HTTPAdapter(max_retries=Retry(
total=5,
backoff_factor=0.1,
status_forcelist=[429, 500, 502, 503, 504],
method_whitelist=["HEAD", "GET", "OPTIONS"]
))
session = requests.Session()
session.mount("http://", adapter)
session.mount("https://", adapter)
rsp = session.post(url, json=my_json, params=my_params)
Run Code Online (Sandbox Code Playgroud)
我偶尔会得到:
('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
Run Code Online (Sandbox Code Playgroud)
我也想围绕这个问题建立重试(相关问题),并且我希望能够通过从库的重试循环内部发出这样的错误来测试它。
我怎么做?
我们将跟踪信息注入到基于 urllib3 实现的 API 客户端库中所有 http 请求调用的请求标头中
def _init_jaeger_tracer():
'''Jaeger tracer initialization'''
config = Config(
config={
'sampler': {
'type': 'const',
'param': 1,
},
},
service_name="session"
)
return config.new_tracer()
class APIObject(rest.APIObject):
'''Class for injecting traces into urllib3 request headers'''
def __init__(self, configuration):
print("RESTClientObject child class called####")
self._tracer = None
super().__init__(configuration)
self._tracer = _init_jaeger_tracer()
# pylint: disable=W0221
def request(self, method, url, *args, **kwargs):
lower_method = method.lower()
with self._tracer.start_active_span('requests.{}'.format(lower_method)) as scope:
span = scope.span
span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)
span.set_tag(tags.COMPONENT, 'request')
span.set_tag(tags.HTTP_METHOD, lower_method)
span.set_tag(tags.HTTP_URL, url)
headers …Run Code Online (Sandbox Code Playgroud) 我使用 packagerequests一起urllib3.util.retry.Retry()发送数以万计的查询。我试图计算查询次数和必要尝试的次数,直到成功检索到所需的数据。我的目标是构建 API 可靠性的衡量标准。
为了解决这个问题,我们假设 的 Response 对象requests包含以下数据:
from requests import Session
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
def create_session():
session = Session()
retries = Retry(
total = 15,
backoff_factor = 0.5,
status_forcelist = [401, 408, 429, 500, 502, 504],
allowed_methods = frozenset(["GET"])
)
session.mount('http://', HTTPAdapter(max_retries=retries))
session.mount('https://', HTTPAdapter(max_retries=retries))
return session
urls = ['https://httpbin.org/status/500']
count_queries = len(urls)
count_attempts = 0
with create_session() as s:
for url in urls:
response = s.get(url)
count_attempts += …Run Code Online (Sandbox Code Playgroud)