小编Ale*_*ord的帖子

在运行时增加PySpark可用的内存

我正在尝试使用Spark构建一个推荐程序,但内存不足:

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
Run Code Online (Sandbox Code Playgroud)

我想通过spark.executor.memory在运行时修改PySpark中的属性来增加Spark可用的内存.

那可能吗?如果是这样,怎么样?

更新

受@ zero323注释中链接的启发,我试图在PySpark中删除并重新创建上下文:

del sc
from pyspark import SparkConf, SparkContext
conf = (SparkConf().setMaster("http://hadoop01.woolford.io:7077").setAppName("recommender").set("spark.executor.memory", "2g"))
sc = SparkContext(conf = conf)
Run Code Online (Sandbox Code Playgroud)

回:

ValueError: Cannot run multiple SparkContexts at once;
Run Code Online (Sandbox Code Playgroud)

这很奇怪,因为:

>>> sc
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'sc' is not defined
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

27
推荐指数
2
解决办法
3万
查看次数

如何有效地将数据从Kafka移动到Impala表?

以下是当前流程的步骤:

  1. Flafka将日志写入HDFS上的"着陆区".
  2. 由Oozie安排的工作将完整文件从着陆区复制到临时区域.
  3. 临时数据由Hive表"架构化",该表使用暂存区域作为其位置.
  4. 来自登台表的记录被添加到永久Hive表(例如insert into permanent_table select * from staging_table).
  5. 来自Hive表的数据在Impala中通过refresh permanent_table在Impala中执行而可用.

现有数据流

我看看我构建的过程并且"闻起来"很糟糕:有太多的中间步骤会影响数据流.

大约20个月前,我看到了一个演示,其中数据从Amazon Kinesis管道流式传输,并且可以近乎实时地被Impala查询.我不认为他们做了一件非常丑陋/错综复杂的事情.有没有更有效的方法将数据从Kafka传输到Impala(可能是可以序列化为Parquet的Kafka消费者)?

我认为"将数据流式传输到低延迟SQL"必定是一个相当常见的用例,所以我很想知道其他人是如何解决这个问题的.

hadoop flume apache-kafka impala

13
推荐指数
1
解决办法
1602
查看次数

使用HappyBase连接池的PySpark dataframe.foreach()返回'TypeError:无法pickle thread.lock对象'

我有一个PySpark作业,用于更新HBase中的一些对象(Spark v1.6.0; happybase v0.9).

如果我打开/关闭每行的HBase连接,它会有效:

def process_row(row):
    conn = happybase.Connection(host=[hbase_master])
    # update HBase record with data from row
    conn.close()

my_dataframe.foreach(process_row)
Run Code Online (Sandbox Code Playgroud)

几千次upserts后,我们开始看到这样的错误:

TTransportException: Could not connect to [hbase_master]:9090
Run Code Online (Sandbox Code Playgroud)

显然,为每个upsert打开/关闭连接效率很低.这个函数实际上只是一个适当解决方案的占位符.

然后我尝试创建一个process_row使用连接池的函数版本:

pool = happybase.ConnectionPool(size=20, host=[hbase_master])

def process_row(row):
    with pool.connection() as conn:
        # update HBase record with data from row
Run Code Online (Sandbox Code Playgroud)

由于某种原因,此函数的连接池版本返回错误(请参阅完整的错误消息):

TypeError: can't pickle thread.lock objects
Run Code Online (Sandbox Code Playgroud)

你能看出我做错了什么吗?

更新

我看到这篇文章并怀疑我遇到了同样的问题:Spark尝试序列化pool对象并将其分发给每个执行程序,但是这个连接池对象不能在多个执行程序之间共享.

听起来我需要将数据集拆分为分区,并且每个分区使用一个连接(请参阅使用foreachrdd的设计模式).我根据文档中的示例尝试了这个:

def persist_to_hbase(dataframe_partition):
    hbase_connection = happybase.Connection(host=[hbase_master])
    for row in dataframe_partition:
        # persist data
    hbase_connection.close() …
Run Code Online (Sandbox Code Playgroud)

python happybase apache-spark pyspark

10
推荐指数
1
解决办法
2057
查看次数

将Spark数据框保存到Hive:table不可读,因为"镶木地板不是SequenceFile"

我想使用PySpark将Spark(v 1.3.0)数据框中的数据保存到Hive表中.

文件规定:

"spark.sql.hive.convertMetastoreParquet:当设置为false时,Spark SQL将使用Hive SerDe作为镶木桌而不是内置支持."

看看Spark教程,似乎可以设置这个属性:

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)
sqlContext.sql("SET spark.sql.hive.convertMetastoreParquet=false")

# code to create dataframe

my_dataframe.saveAsTable("my_dataframe")
Run Code Online (Sandbox Code Playgroud)

但是,当我尝试查询Hive中保存的表时,它返回:

hive> select * from my_dataframe;
OK
Failed with exception java.io.IOException:java.io.IOException: 
hdfs://hadoop01.woolford.io:8020/user/hive/warehouse/my_dataframe/part-r-00001.parquet
not a SequenceFile
Run Code Online (Sandbox Code Playgroud)

如何保存表格,使其在Hive中立即可读?

hive apache-spark apache-spark-sql pyspark

9
推荐指数
1
解决办法
2万
查看次数

AppNexus API未返回的维度

我正在尝试与AppNexus报告API进行集成,遇到问题,并想知道StackOverflow社区中是否有人有分享的洞察力.

AppNexus API有一个步行使用curl和它的排序工作,除了不返回组/维度.这是我做的:

有一个名为auth包含我们凭据的文件:

# JSON file containing our credentials
$ cat auth
{
    "auth": {
        "username" : "ourAppNexusApiUsername",
        "password" : "ourSecretApiUserPassword"
    }
}
Run Code Online (Sandbox Code Playgroud)

还有一个包含查询的JSON文件.请注意"列"列表中的维度:

# The query itself, in JSON format. 
$ cat query.json 
{
    "report": {
        "format": "csv",
        "report_interval": "yesterday",
        "groups": [
            "publisher_id",
            "imp_type",
            "geo_country",
            "placement_id"
        ],
        "columns": [
            "imps_total",
            "imps_kept",
            "imps_resold",
            "publisher_filled_revenue",
            "total_convs"
        ],
        "report_type": "publisher_analytics"
    }
}
Run Code Online (Sandbox Code Playgroud)

我可以验证:

$ curl -b cookies -c cookies -X POST -d @auth 'https://api.appnexus.com/auth'

{"response":{"status":"OK","token":"hbapi:133820:5571c87753c27:nym2","dbg_info":{"instance":"56.bm-hbapi.prod.lax1","slave_hit":false,"db":"master","parent_dbg_info":{"instance":"63.bm-hbapi.prod.nym2","slave_hit":false,"db":"master","parent_dbg_info":{"instance":"38.bm-api.prod.nym2","slave_hit":false,"db":"master","time":482.32913017273,"version":"1.15.279","warnings":[],"slave_lag":0,"start_microtime":1433520246.311},"awesomesauce_cache_used":false,"count_cache_used":false,"warnings":[],"time":1078.0298709869,"start_microtime":1433520246.2796,"version":"1.15.527","slave_lag":0,"output_term":"not_found"},"awesomesauce_cache_used":false,"count_cache_used":false,"warnings":[],"time":1360.9290122986,"start_microtime":1433520246.1491,"version":"1.15.527","slave_lag":1,"output_term":"not_found","master_instance":"63.bm-hbapi.prod.nym2","proxy":true,"master_time":1078.0298709869}}} …
Run Code Online (Sandbox Code Playgroud)

api rest

8
推荐指数
1
解决办法
1050
查看次数

Java中的Spark作业:如何在群集上运行时从"资源"访问文件

我用Java写了一个Spark工作.该作业打包为阴影jar并执行:

spark-submit my-jar.jar
Run Code Online (Sandbox Code Playgroud)

在代码中,有一些文件(Freemarker模板)驻留在src/main/resources/templates.在本地运行时,我可以访问文件:

File[] files = new File("src/main/resources/templates/").listFiles();
Run Code Online (Sandbox Code Playgroud)

在集群上运行作业时,执行上一行时将返回空指针异常.

如果我运行,jar tf my-jar.jar我可以看到文件打包在一个templates/文件夹中:

 [...]
 templates/
 templates/my_template.ftl
 [...]
Run Code Online (Sandbox Code Playgroud)

我只是无法阅读它们; 我怀疑.listFiles()尝试访问群集节点上的本地文件系统,并且文件不存在.

我很想知道如何打包要在自包含的Spark作业中使用的文件.我不想在工作之外将它们复制到HDFS,因为维护变得很麻烦.

java apache-spark

8
推荐指数
2
解决办法
7243
查看次数

Ansible:有sudo但没有root

我想使用Ansible来管理我们的Hadoop集群(运行Red Hat)的配置.

我有sudo访问权限,可以手动ssh进入节点执行命令.但是,当我尝试运行Ansible模块来执行相同的任务时,我遇到了问题.虽然我有权sudo访问,但我不能成为root用户.当我尝试执行需要提升权限的Ansible脚本时,我收到如下错误:

对不起,用户awoolford不允许在[some_hadoop_node]上执行'/ bin/bash -c echo BECOME-SUCCESS- [...]/usr/bin/python /tmp/ansible-tmp-1446662360.01-231435525506280/copy'作为awoolford.

浏览文档,我认为该become_allow_same_user属性可能会解决此问题,因此我将以下内容添加到ansible.cfg:

[privilege_escalation]
become_allow_same_user=yes
Run Code Online (Sandbox Code Playgroud)

不幸的是,它没有用.

这篇文章表明我需要sudo /bin/sh(或其他一些shell)的权限.不幸的是,出于安全原因,这是不可能的.以下是一段摘录/etc/sudoers:

root            ALL=(ALL)   ALL
awoolford       ALL=(ALL)   ALL, !SU, !SHELLS, !RESTRICT
Run Code Online (Sandbox Code Playgroud)

Ansible可以在这样的环境中工作吗?如果是这样,我做错了什么?

redhat ansible ansible-playbook

7
推荐指数
1
解决办法
1698
查看次数

Hive可以递归地下载到没有分区或编辑hive-site.xml的子目录中吗?

我有一些我想用Hive查询的Web服务器日志.HDFS中的目录结构如下所示:

/data/access/web1/2014/09
/data/access/web1/2014/09/access-20140901.log
[... etc ...]
/data/access/web1/2014/10
/data/access/web1/2014/10/access-20141001.log
[... etc ...]
/data/access/web2/2014/09
/data/access/web2/2014/09/access-20140901.log
[... etc ...]
/data/access/web2/2014/10
/data/access/web2/2014/10/access-20141001.log
[... etc ...]
Run Code Online (Sandbox Code Playgroud)

我能够创建一个外部表:

CREATE EXTERNAL TABLE access(
  host STRING,
  identity STRING,
  user STRING,
  time STRING,
  request STRING,
  status STRING,
  size STRING,
  referer STRING,
  agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?",
  "output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s")
LOCATION …
Run Code Online (Sandbox Code Playgroud)

hadoop hive mapreduce

6
推荐指数
1
解决办法
2万
查看次数

Scrapy`ReactorNotRestartable`:一个运行两个(或更多)蜘蛛的类

我使用两阶段爬行将每日数据与Scrapy聚合在一起.第一阶段从索引页面生成URL列表,第二阶段将列表中每个URL的HTML写入Kafka主题.

用于Scrapy爬虫的kafka集群

尽管抓取的两个组件是相关的,但我希望它们是独立的:url_generator它将作为计划任务每​​天page_requester运行一次,并且会持续运行,在可用时处理URL.为了"礼貌",我将进行调整,DOWNLOAD_DELAY以便爬虫在24小时内完成,但对网站施加最小负荷.

我创建了一个CrawlerRunner类,它具有生成URL和检索HTML的功能:

from twisted.internet import reactor
from scrapy.crawler import Crawler
from scrapy import log, signals
from scrapy_somesite.spiders.create_urls_spider import CreateSomeSiteUrlList
from scrapy_somesite.spiders.crawl_urls_spider import SomeSiteRetrievePages
from scrapy.utils.project import get_project_settings
import os
import sys

class CrawlerRunner:

    def __init__(self):
        sys.path.append(os.path.join(os.path.curdir, "crawl/somesite"))
        os.environ['SCRAPY_SETTINGS_MODULE'] = 'scrapy_somesite.settings'
        self.settings = get_project_settings()
        log.start()

    def create_urls(self):
        spider = CreateSomeSiteUrlList()
        crawler_create_urls = Crawler(self.settings)
        crawler_create_urls.signals.connect(reactor.stop, signal=signals.spider_closed)
        crawler_create_urls.configure()
        crawler_create_urls.crawl(spider)
        crawler_create_urls.start()
        reactor.run()

    def crawl_urls(self):
        spider = SomeSiteRetrievePages()
        crawler_crawl_urls = Crawler(self.settings)
        crawler_crawl_urls.signals.connect(reactor.stop, signal=signals.spider_closed)
        crawler_crawl_urls.configure()
        crawler_crawl_urls.crawl(spider)
        crawler_crawl_urls.start() …
Run Code Online (Sandbox Code Playgroud)

twisted scrapy scrapy-spider

6
推荐指数
1
解决办法
6189
查看次数

连接到 HiveServer2 时 impyla 挂起

我正在用 Python 编写一些 ETL 流程,对于该流程的一部分,使用 Hive。根据文档,Cloudera 的 impyla 客户端可与 Impala 和 Hive 一起使用。

根据我的经验,客户端为 Impala 工作,但是当我尝试连接到 Hive 时挂起:

from impala.dbapi import connect

conn = connect(host='host_running_hs2_service', port=10000, user='awoolford', password='Bzzzzz')
cursor = conn.cursor()          <- hangs here
cursor.execute('show tables')
results = cursor.fetchall()
print results
Run Code Online (Sandbox Code Playgroud)

如果我单步执行代码,它会在尝试打开会话时挂起(hiveserver2.py 的第 873 行)。

起初,我怀疑可能是防火墙端口阻止了连接,因此我尝试使用 Java 进行连接。令我惊讶的是,这有效:

public class Main {
    private static String driverName = "org.apache.hive.jdbc.HiveDriver";
    public static void main(String[] args) throws SQLException {
        try {
            Class.forName(driverName);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1); …
Run Code Online (Sandbox Code Playgroud)

python hive

6
推荐指数
1
解决办法
4446
查看次数