我是 Javascript 的新手,想使用 Cheerio 库来做一些网页抓取。在图书馆介绍中看到了这段文字。我不确定选择器、上下文和根之间有什么区别。
Cheerio 的选择器实现与 jQuery 几乎相同,因此 API 非常相似。
$( 选择器, [上下文], [根] )
选择器在上下文范围内搜索,在根范围内搜索。选择器和上下文可以是字符串表达式、DOM 元素、DOM 元素数组或cheerio 对象。root 通常是 HTML 文档字符串。
这个选择器方法是遍历和操作文档的起点。与 jQuery 一样,它是在文档中选择元素的主要方法,但与 jQuery 不同的是,它构建在 CSSSelect 库之上,该库实现了大多数 Sizzle 选择器。
示例 API:
<ul id="fruits">
<li class="apple">Apple</li>
<li class="orange">Orange</li>
<li class="pear">Pear</li>
</ul>
Run Code Online (Sandbox Code Playgroud)
$('.apple', '#fruits').text() //=> 苹果
$('ul .pear').attr('class') //=> 梨
$('li[class=orange]').html() //=> 橙色
在第一个示例中,.apple 是选择器,#fruits 是上下文。那讲得通。在第二个例子中,ul 是选择器,.pear 是上下文吗?如果选择器是为了在上下文中搜索,那么鉴于 .pear 嵌套在 ul 中,这很奇怪?
我在 ruby 中有一个 webscraper,它使用这个来分析证书:
http = Net::HTTP.new(http_endpoint.host, http_endpoint.port)
http.use_ssl = true
http.verify_mode = OpenSSL::SSL::VERIFY_NONE
http.start do |h|
@cert = h.peer_cert
end
Run Code Online (Sandbox Code Playgroud)
是否可以使用 webmock 模拟此请求并在测试期间提供假证书?
我想将 django 与 aiohttp/asyncio 集成以进行异步编程和 websockets 处理。我知道 django 有 celery 和 django-channels 来分别执行异步任务和 websocket 服务器,但是 aiohttp 预先内置了异步和 websocket 服务器,我发现该框架在创建功能时比 celery/django 通道更具可扩展性和简单性到 webscraping(我不知道是否可以在 celery I 中进行 webscraping,还没有尝试过)。
并且它也完美支持异步和等待。
但我的问题是:我们如何在一个项目中同时实现 django 和 aiohttp?我们可以使用 aiohttp 服务器来代替使用 django 的开发服务器来为站点提供服务。
我们是否能够将 django 与 aiohttp 功能集成(例如让我们举个例子:如果我想将用户提交的输入的网站抓取到我的数据库中。我可以在获取网站并将以下网站发布到我的函数中时使用 await 调用吗?我的 django 数据库?或者将函数结果发布到另一个 django 函数?)
我想知道集成的缺点,如果有的话?
在发布您的答案时,请您发布一个示例实际集成示例,而不是通过 github 向我建议这些库。
我已经在使用puppeteer来抓取我的页面,但是,我还需要原始 html(基本上是页面源)。
我知道我可以获取原始 html,但也许 puppeteer 已将其保存在某个地方。
puppeteer 在 goto() 之后是否保存页面源代码?
我正在 GCP 上定义一个云函数,用于在 Python 中抓取网站。
我从定义一个简单地打开 webdriver 的函数开始:
from selenium import webdriver
def launch_search(request):
# Starting a webdriver
driver = webdriver.Chrome()
return 'Success'
Run Code Online (Sandbox Code Playgroud)
这个功能不起作用(Error: could not handle the request当我触发它时),可能是因为我的远程机器上没有安装 Chrome 驱动程序。所以:
python selenium automation web-scraping google-cloud-platform
我正在编写一个 Prometheus 导出器,它必须读取不同的 CSV 文件。每个文件都包含过去一整天的数据(目标是让导出器每天读取一个新的 CSV 文件。每天都会将一个 CSV 文件上传到服务器,其中包含前一天的数据。
在 CSV 文件中,我每 5 分钟就有相同的指标。例如:
Date;Time;data
23.03.20;23:55:00;1
23.03.20;23:50:00;50
23.03.20;23:45:00;3
Run Code Online (Sandbox Code Playgroud)
我很难在普罗米修斯中正确添加这些数据。
class CSVCollector(object):
def collect(self):
# We list all the min files in the current directory
list_min = glob.glob("min*.csv")
metric = GaugeMetricFamily(
'day_tests_seconds',
'kw', labels=["jobname"])
for min in list_min :
with open(min) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=';')
line_count = 0
for row in csv_reader:
if line_count == 1:
correct_date_format = row[0][:6] + "20" + row[0][6:]
datetime_row = correct_date_format + ';' + …Run Code Online (Sandbox Code Playgroud) 这篇文章与这篇文章非常相似:使用 selenium 和 python 在鼠标悬停后弹出时提取数据
但我无法找到我想要的答案。
我正在尝试抓取与此非常相似的传单地图:https://leafletjs.com/examples/choropleth/,理想情况下,我想下载将鼠标移到多边形上后出现的所有信息:
原始帖子循环遍历每个圆元素,我想对每个多边形执行相同的操作。
代码试验:
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
driver = webdriver.Chrome
driver.get("https://leafletjs.com/examples/choropleth/")
timeout = 1000
explicit_wait30 = WebDriverWait(driver, 30)
try:
# Wait for all circles to load
poli = explicit_wait30.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.leaflet-interactive')))
except TimeoutException:
driver.refresh()
data = []
i=1
for circle in poli:
i+=1
# Execute mouseover on the element
driver.execute_script("const mouseoverEvent = new Event('mouseover');arguments[0].dispatchEvent(mouseoverEvent)", poli)
# Wait for the data to appear …Run Code Online (Sandbox Code Playgroud) 我正在为一个网站实现一个网络抓取脚本,该脚本将收集一些有用的信息。
脚本是使用 Puppeteer 库实现的,基本上是一组指令,例如:
我正在寻找某种方法来测试此功能。理想情况下,我想要做的是将真实的 HTML 响应“预保存”在测试文件夹中,然后模拟外部网站响应(确保它们始终相同)。然后断言收集到的信息是正确的。
我熟悉几种能够模拟fetch浏览器中功能端点的工具。我正在寻找类似的东西,但对于Puppeteer.
到目前为止,我正在考虑的唯一解决方案是使用browser实例作为我的脚本的依赖项。然后模拟newPage浏览器的方法返回带有自定义拦截器的页面。但这看起来工作量很大。
对此还有其他解决方案吗?
我正在构建一个网络抓取 API,大部分抓取都是通过 AsyncIO 协程完成的,如下所示:
async def parse_event(self):
do scraping
# call the func
asyncio.run(b.parse_event())
Run Code Online (Sandbox Code Playgroud)
这工作得很好,但是当我同时抓取concurrent.futures.ThreadPoolExecutor多个网站时,我首先使用多个线程来抓取。但由于我已经实现了协程逻辑,所以我现在无法asyncio.run直接在线程中使用该方法。
之前(没有协程):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
w1_future = executor.submit(self.w1.parse_event)
w2_future = executor.submit(self.w2.parse_event)
w3_future = executor.submit(self.w3.parse_event)
Run Code Online (Sandbox Code Playgroud)
之后,我预计会出现如下所示的情况
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
w1_future = executor.submit(asyncio.run(self.w1.parse_event))
w2_future = executor.submit(asyncio.run(self.w2.parse_event))
w3_future = executor.submit(asyncio.run(self.w3.parse_event))
Run Code Online (Sandbox Code Playgroud)
不幸的是它不起作用。
我正在逐步将来自网络抓取的数据行附加到 DataFrame 中。虽然,有时我正在抓取的数据已经存在于 DataFrame 中,所以我不想再次附加它。检查 DataFrame 是否已有数据的最有效方法是什么?在末尾删除重复项不是一个选项,因为我想提取特定数量的记录,并且在末尾删除重复项将使最终 DataFrame 的记录少于指定数量。
\nres = pd.DataFrame([], columns=GD_SCHEMA)\n\nreviews = self.browser.find_elements_by_class_name('empReview')\nidx = 0\nfor review in reviews:\n data = extract_review(review) # This is a dict with the same keys as \xc2\xb4res\xc2\xb4\n \n # Most efficient way to check if \xc2\xb4data\xc2\xb4 already exists in \xc2\xb4res\xc2\xb4 before appending?\n res.loc[idx] = data\n idx += 1\nRun Code Online (Sandbox Code Playgroud)\n python ×6
javascript ×2
puppeteer ×2
selenium ×2
aiohttp ×1
asynchronous ×1
automation ×1
cheerio ×1
coroutine ×1
dart ×1
django ×1
exporter ×1
jestjs ×1
jquery ×1
leaflet ×1
pandas ×1
prometheus ×1
ruby ×1
unit-testing ×1
web-scraping ×1
webmock ×1