小编Mic*_*ber的帖子

pyspark:从现有列创建MapType列

我需要根据现有列爬行新的Spark DF MapType列,其中列名是键,值是值.

作为例子 - 我有这个DF:

rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6),
                      ('d23d', 1.5, 2.0, 2.2), 
                      ('as3d', 2.2, 4.3, 9.0)
                          ])
schema = StructType([StructField('key', StringType(), True),
                     StructField('metric1', FloatType(), True),
                     StructField('metric2', FloatType(), True),
                     StructField('metric3', FloatType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

+----+-------+-------+-------+
| key|metric1|metric2|metric3|
+----+-------+-------+-------+
|123k|    1.3|    6.3|    7.6|
|d23d|    1.5|    2.0|    2.2|
|as3d|    2.2|    4.3|    9.0|
+----+-------+-------+-------+
Run Code Online (Sandbox Code Playgroud)

我已经到目前为止,我可以从这里创建一个structType:

nameCol = struct([name for name in df.columns if ("metric" in name)]).alias("metric")
df2 = df.select("key", nameCol)

+----+-------------+
| key|       metric|
+----+-------------+
|123k|[1.3,6.3,7.6]|
|d23d|[1.5,2.0,2.2]|
|as3d|[2.2,4.3,9.0]| …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

10
推荐指数
1
解决办法
9545
查看次数

每个解析的页面阻止爬行的大量请求

我抓了一个爬网,其中每个子页面包含300多个我需要遵循的链接.一分钟后,抓取速度会减慢/有时会以0页/分钟的速度爬行.

如果我以每页10-50页链接进行爬行,则不会显示相同的问题.

我已经配置了10个concurrent_requests和10个已处理的项目以及反应堆theradpool和400.这意味着每10个处理项目最多可达3.000个产量...

记录显示解析功能每页需要70多秒.记录显示此处所需的时间来自产量(每次产量最多需要2个秒).

似乎scrapy一直等到发动机?或类似的东西做了一个任务,并准备处理新的收益率请求?将请求添加到调度程序并不需要很长时间,所以在我看来,收益正在等待其他东西.

有什么提示可以调整或出错吗?

是否可以批量生成请求而不是单独产生每个请求?是否可以将它们添加到调度程序而不会产生它们?

一些额外的信息: - 如果我使用scrapy-redis或只是基于磁盘的调度程序没有区别. - 由于渲染javascript,下载爬网页面最多可能需要10秒钟. - Autothrottle被禁用 - 如果我提供更多的cpu资源,它不会加快速度.

telnet-> est()

time()-engine.start_time                        : 676.0599975585938
engine.has_capacity()                           : False
len(engine.downloader.active)                   : 7
engine.scraper.is_idle()                        : False
engine.spider.name                              : onetwothree
engine.spider_is_idle(engine.spider)            : False
engine.slot.closing                             : False
len(engine.slot.inprogress)                     : 28
len(engine.slot.scheduler.dqs or [])            : AttributeError (exception)
len(engine.slot.scheduler.mqs)                  : AttributeError (exception)
len(engine.scraper.slot.queue)                  : 0
len(engine.scraper.slot.active)                 : 21
engine.scraper.slot.active_size                 : 3878605
engine.scraper.slot.itemproc_size               : 0
engine.scraper.slot.needs_backout()             : False

AttributeError seems to come from scrapy-redis plugin, 
without scrapy counts up the pages …
Run Code Online (Sandbox Code Playgroud)

python scrapy

5
推荐指数
1
解决办法
304
查看次数

标签 统计

python ×2

apache-spark ×1

pyspark ×1

scrapy ×1