我正在尝试使用Spark构建一个推荐程序,但内存不足:
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
Run Code Online (Sandbox Code Playgroud)
我想通过spark.executor.memory在运行时修改PySpark中的属性来增加Spark可用的内存.
那可能吗?如果是这样,怎么样?
更新
受@ zero323注释中链接的启发,我试图在PySpark中删除并重新创建上下文:
del sc
from pyspark import SparkConf, SparkContext
conf = (SparkConf().setMaster("http://hadoop01.woolford.io:7077").setAppName("recommender").set("spark.executor.memory", "2g"))
sc = SparkContext(conf = conf)
Run Code Online (Sandbox Code Playgroud)
回:
ValueError: Cannot run multiple SparkContexts at once;
Run Code Online (Sandbox Code Playgroud)
这很奇怪,因为:
>>> sc
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
NameError: name 'sc' is not defined
Run Code Online (Sandbox Code Playgroud) 以下是当前流程的步骤:
insert into permanent_table select * from staging_table).refresh permanent_table在Impala中执行而可用.我看看我构建的过程并且"闻起来"很糟糕:有太多的中间步骤会影响数据流.
大约20个月前,我看到了一个演示,其中数据从Amazon Kinesis管道流式传输,并且可以近乎实时地被Impala查询.我不认为他们做了一件非常丑陋/错综复杂的事情.有没有更有效的方法将数据从Kafka传输到Impala(可能是可以序列化为Parquet的Kafka消费者)?
我认为"将数据流式传输到低延迟SQL"必定是一个相当常见的用例,所以我很想知道其他人是如何解决这个问题的.
我有一个PySpark作业,用于更新HBase中的一些对象(Spark v1.6.0; happybase v0.9).
如果我打开/关闭每行的HBase连接,它会有效:
def process_row(row):
conn = happybase.Connection(host=[hbase_master])
# update HBase record with data from row
conn.close()
my_dataframe.foreach(process_row)
Run Code Online (Sandbox Code Playgroud)
几千次upserts后,我们开始看到这样的错误:
Run Code Online (Sandbox Code Playgroud)TTransportException: Could not connect to [hbase_master]:9090
显然,为每个upsert打开/关闭连接效率很低.这个函数实际上只是一个适当解决方案的占位符.
然后我尝试创建一个process_row使用连接池的函数版本:
pool = happybase.ConnectionPool(size=20, host=[hbase_master])
def process_row(row):
with pool.connection() as conn:
# update HBase record with data from row
Run Code Online (Sandbox Code Playgroud)
由于某种原因,此函数的连接池版本返回错误(请参阅完整的错误消息):
Run Code Online (Sandbox Code Playgroud)TypeError: can't pickle thread.lock objects
你能看出我做错了什么吗?
我看到这篇文章并怀疑我遇到了同样的问题:Spark尝试序列化pool对象并将其分发给每个执行程序,但是这个连接池对象不能在多个执行程序之间共享.
听起来我需要将数据集拆分为分区,并且每个分区使用一个连接(请参阅使用foreachrdd的设计模式).我根据文档中的示例尝试了这个:
def persist_to_hbase(dataframe_partition):
hbase_connection = happybase.Connection(host=[hbase_master])
for row in dataframe_partition:
# persist data
hbase_connection.close() …Run Code Online (Sandbox Code Playgroud) 我想使用PySpark将Spark(v 1.3.0)数据框中的数据保存到Hive表中.
该文件规定:
"spark.sql.hive.convertMetastoreParquet:当设置为false时,Spark SQL将使用Hive SerDe作为镶木桌而不是内置支持."
看看Spark教程,似乎可以设置这个属性:
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.sql("SET spark.sql.hive.convertMetastoreParquet=false")
# code to create dataframe
my_dataframe.saveAsTable("my_dataframe")
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试查询Hive中保存的表时,它返回:
hive> select * from my_dataframe;
OK
Failed with exception java.io.IOException:java.io.IOException:
hdfs://hadoop01.woolford.io:8020/user/hive/warehouse/my_dataframe/part-r-00001.parquet
not a SequenceFile
Run Code Online (Sandbox Code Playgroud)
如何保存表格,使其在Hive中立即可读?
我正在尝试与AppNexus报告API进行集成,遇到问题,并想知道StackOverflow社区中是否有人有分享的洞察力.
AppNexus API有一个步行使用curl和它的排序工作,除了不返回组/维度.这是我做的:
有一个名为auth包含我们凭据的文件:
# JSON file containing our credentials
$ cat auth
{
"auth": {
"username" : "ourAppNexusApiUsername",
"password" : "ourSecretApiUserPassword"
}
}
Run Code Online (Sandbox Code Playgroud)
还有一个包含查询的JSON文件.请注意"列"列表中的维度:
# The query itself, in JSON format.
$ cat query.json
{
"report": {
"format": "csv",
"report_interval": "yesterday",
"groups": [
"publisher_id",
"imp_type",
"geo_country",
"placement_id"
],
"columns": [
"imps_total",
"imps_kept",
"imps_resold",
"publisher_filled_revenue",
"total_convs"
],
"report_type": "publisher_analytics"
}
}
Run Code Online (Sandbox Code Playgroud)
我可以验证:
$ curl -b cookies -c cookies -X POST -d @auth 'https://api.appnexus.com/auth'
{"response":{"status":"OK","token":"hbapi:133820:5571c87753c27:nym2","dbg_info":{"instance":"56.bm-hbapi.prod.lax1","slave_hit":false,"db":"master","parent_dbg_info":{"instance":"63.bm-hbapi.prod.nym2","slave_hit":false,"db":"master","parent_dbg_info":{"instance":"38.bm-api.prod.nym2","slave_hit":false,"db":"master","time":482.32913017273,"version":"1.15.279","warnings":[],"slave_lag":0,"start_microtime":1433520246.311},"awesomesauce_cache_used":false,"count_cache_used":false,"warnings":[],"time":1078.0298709869,"start_microtime":1433520246.2796,"version":"1.15.527","slave_lag":0,"output_term":"not_found"},"awesomesauce_cache_used":false,"count_cache_used":false,"warnings":[],"time":1360.9290122986,"start_microtime":1433520246.1491,"version":"1.15.527","slave_lag":1,"output_term":"not_found","master_instance":"63.bm-hbapi.prod.nym2","proxy":true,"master_time":1078.0298709869}}} …Run Code Online (Sandbox Code Playgroud) 我用Java写了一个Spark工作.该作业打包为阴影jar并执行:
spark-submit my-jar.jar
Run Code Online (Sandbox Code Playgroud)
在代码中,有一些文件(Freemarker模板)驻留在src/main/resources/templates.在本地运行时,我可以访问文件:
File[] files = new File("src/main/resources/templates/").listFiles();
Run Code Online (Sandbox Code Playgroud)
在集群上运行作业时,执行上一行时将返回空指针异常.
如果我运行,jar tf my-jar.jar我可以看到文件打包在一个templates/文件夹中:
[...]
templates/
templates/my_template.ftl
[...]
Run Code Online (Sandbox Code Playgroud)
我只是无法阅读它们; 我怀疑.listFiles()尝试访问群集节点上的本地文件系统,并且文件不存在.
我很想知道如何打包要在自包含的Spark作业中使用的文件.我不想在工作之外将它们复制到HDFS,因为维护变得很麻烦.
我想使用Ansible来管理我们的Hadoop集群(运行Red Hat)的配置.
我有sudo访问权限,可以手动ssh进入节点执行命令.但是,当我尝试运行Ansible模块来执行相同的任务时,我遇到了问题.虽然我有权sudo访问,但我不能成为root用户.当我尝试执行需要提升权限的Ansible脚本时,我收到如下错误:
对不起,用户awoolford不允许在[some_hadoop_node]上执行'/ bin/bash -c echo BECOME-SUCCESS- [...]/usr/bin/python /tmp/ansible-tmp-1446662360.01-231435525506280/copy'作为awoolford.
浏览文档,我认为该become_allow_same_user属性可能会解决此问题,因此我将以下内容添加到ansible.cfg:
[privilege_escalation]
become_allow_same_user=yes
Run Code Online (Sandbox Code Playgroud)
不幸的是,它没有用.
这篇文章表明我需要sudo /bin/sh(或其他一些shell)的权限.不幸的是,出于安全原因,这是不可能的.以下是一段摘录/etc/sudoers:
root ALL=(ALL) ALL
awoolford ALL=(ALL) ALL, !SU, !SHELLS, !RESTRICT
Run Code Online (Sandbox Code Playgroud)
Ansible可以在这样的环境中工作吗?如果是这样,我做错了什么?
我有一些我想用Hive查询的Web服务器日志.HDFS中的目录结构如下所示:
/data/access/web1/2014/09
/data/access/web1/2014/09/access-20140901.log
[... etc ...]
/data/access/web1/2014/10
/data/access/web1/2014/10/access-20141001.log
[... etc ...]
/data/access/web2/2014/09
/data/access/web2/2014/09/access-20140901.log
[... etc ...]
/data/access/web2/2014/10
/data/access/web2/2014/10/access-20141001.log
[... etc ...]
Run Code Online (Sandbox Code Playgroud)
我能够创建一个外部表:
CREATE EXTERNAL TABLE access(
host STRING,
identity STRING,
user STRING,
time STRING,
request STRING,
status STRING,
size STRING,
referer STRING,
agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?",
"output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s")
LOCATION …Run Code Online (Sandbox Code Playgroud) 我使用两阶段爬行将每日数据与Scrapy聚合在一起.第一阶段从索引页面生成URL列表,第二阶段将列表中每个URL的HTML写入Kafka主题.

尽管抓取的两个组件是相关的,但我希望它们是独立的:url_generator它将作为计划任务每天page_requester运行一次,并且会持续运行,在可用时处理URL.为了"礼貌",我将进行调整,DOWNLOAD_DELAY以便爬虫在24小时内完成,但对网站施加最小负荷.
我创建了一个CrawlerRunner类,它具有生成URL和检索HTML的功能:
from twisted.internet import reactor
from scrapy.crawler import Crawler
from scrapy import log, signals
from scrapy_somesite.spiders.create_urls_spider import CreateSomeSiteUrlList
from scrapy_somesite.spiders.crawl_urls_spider import SomeSiteRetrievePages
from scrapy.utils.project import get_project_settings
import os
import sys
class CrawlerRunner:
def __init__(self):
sys.path.append(os.path.join(os.path.curdir, "crawl/somesite"))
os.environ['SCRAPY_SETTINGS_MODULE'] = 'scrapy_somesite.settings'
self.settings = get_project_settings()
log.start()
def create_urls(self):
spider = CreateSomeSiteUrlList()
crawler_create_urls = Crawler(self.settings)
crawler_create_urls.signals.connect(reactor.stop, signal=signals.spider_closed)
crawler_create_urls.configure()
crawler_create_urls.crawl(spider)
crawler_create_urls.start()
reactor.run()
def crawl_urls(self):
spider = SomeSiteRetrievePages()
crawler_crawl_urls = Crawler(self.settings)
crawler_crawl_urls.signals.connect(reactor.stop, signal=signals.spider_closed)
crawler_crawl_urls.configure()
crawler_crawl_urls.crawl(spider)
crawler_crawl_urls.start() …Run Code Online (Sandbox Code Playgroud) 我正在用 Python 编写一些 ETL 流程,对于该流程的一部分,使用 Hive。根据文档,Cloudera 的 impyla 客户端可与 Impala 和 Hive 一起使用。
根据我的经验,客户端为 Impala 工作,但是当我尝试连接到 Hive 时挂起:
from impala.dbapi import connect
conn = connect(host='host_running_hs2_service', port=10000, user='awoolford', password='Bzzzzz')
cursor = conn.cursor() <- hangs here
cursor.execute('show tables')
results = cursor.fetchall()
print results
Run Code Online (Sandbox Code Playgroud)
如果我单步执行代码,它会在尝试打开会话时挂起(hiveserver2.py 的第 873 行)。
起初,我怀疑可能是防火墙端口阻止了连接,因此我尝试使用 Java 进行连接。令我惊讶的是,这有效:
public class Main {
private static String driverName = "org.apache.hive.jdbc.HiveDriver";
public static void main(String[] args) throws SQLException {
try {
Class.forName(driverName);
} catch (ClassNotFoundException e) {
e.printStackTrace();
System.exit(1); …Run Code Online (Sandbox Code Playgroud)