我正在用 Python 编写一些 ETL 流程,对于该流程的一部分,使用 Hive。根据文档,Cloudera 的 impyla 客户端可与 Impala 和 Hive 一起使用。
根据我的经验,客户端为 Impala 工作,但是当我尝试连接到 Hive 时挂起:
from impala.dbapi import connect
conn = connect(host='host_running_hs2_service', port=10000, user='awoolford', password='Bzzzzz')
cursor = conn.cursor() <- hangs here
cursor.execute('show tables')
results = cursor.fetchall()
print results
Run Code Online (Sandbox Code Playgroud)
如果我单步执行代码,它会在尝试打开会话时挂起(hiveserver2.py 的第 873 行)。
起初,我怀疑可能是防火墙端口阻止了连接,因此我尝试使用 Java 进行连接。令我惊讶的是,这有效:
public class Main {
private static String driverName = "org.apache.hive.jdbc.HiveDriver";
public static void main(String[] args) throws SQLException {
try {
Class.forName(driverName);
} catch (ClassNotFoundException e) {
e.printStackTrace();
System.exit(1); …Run Code Online (Sandbox Code Playgroud) 我有一个Uber jar执行一些级联ETL任务.jar的执行方式如下:
hadoop jar munge-data.jar
Run Code Online (Sandbox Code Playgroud)
我想在工作启动时将参数传递给jar,例如
hadoop jar munge-data.jar -Denv=prod
Run Code Online (Sandbox Code Playgroud)
将根据环境从属性文件中读取不同的凭据,主机名等.
如果作业被执行java jar munge-data.jar -Denv=prod,这将起作用,因为env可以访问该属性:
System.getProperty("env")
Run Code Online (Sandbox Code Playgroud)
但是,执行jar时这不起作用hadoop jar ....
我看到了一个类似的线程,其中回答者声明可以使用类似于org.apache.hadoop.conf.Configuration类的方法访问属性.从答案来看,我不清楚如何conf创建对象.我尝试了以下内容并返回null:
Configuration configuration = new Configuration();
System.out.println(configuration.get("env"));
Run Code Online (Sandbox Code Playgroud)
据推测,需要读取/设置配置属性.
你能告诉我如何将属性传递hadoop jar [...] -DsomeProperty=someValue给我的ETL工作吗?
我被问到如何从作为Oozie的Java动作运行的Spring Boot应用程序捕获日志输出.
我最初的想法是,可以编辑一些log4j属性来捕获YARN或Oozie中的应用程序日志.然后我发现,对于在各种群集节点上运行的特定应用程序,Kafka将是一种更容易捕获和聚合日志消息的方法.通过订阅主题而不是通过日志文件捕获来监视分布式系统要容易得多.
我注意到Kafka有一个log4j appender,所以我尝试创建一个最小的可重现的例子(发布在github上:https://github.com/alexwoolford/spring-boot-log-to-kafka-example).以下是一段摘录pom.xml:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.4.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>net.logstash.log4j</groupId>
<artifactId>jsonevent-layout</artifactId>
<version>1.7</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
</dependencies>
Run Code Online (Sandbox Code Playgroud)
我的log4j.properties文件如下所示:
log4j.rootLogger=INFO
log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.KAFKA.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.KAFKA.topic=logs
log4j.appender.KAFKA.brokerList=hdp-single-node:6667
log4j.appender.KAFKA.syncSend=true
log4j.appender.KAFKA.producer.type=async
log4j.logger.io.woolford=INFO, KAFKA
Run Code Online (Sandbox Code Playgroud)
这有效,除了它生成一个警告:
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See …Run Code Online (Sandbox Code Playgroud) 此链接中的PDF(http://www.lenovo.com/psref/pdf/psref450.pdf)包含许多这样的表:

我想以编程方式从这些表中提取数据和结构.
我尝试过的事情:使用PDF将PDF转换为HTML
我打算将PDF转换为HTML,然后使用BeautifulSoup解析它.
输出可以是JSON(例如,每个表一个对象),XML,或几乎任何维护结构的格式.
我创建了一个Vagrant/Ansible手册来构建单节点Kafka VM.
我们的想法是在原型设计时提供一些灵活性:如果我们想要一个快速而肮脏的Kafka消息队列,我们可以简单地git clone [my 'kafka in a box' repo],cd ..和vagrant up.
这是我到目前为止所做的:
Vagrantfile:
VAGRANTFILE_API_VERSION = "2"
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.vm.box = "hashicorp/precise64"
config.vm.network "forwarded_port", guest:9092, host: 9092
config.vm.provider "virtualbox" do |vb|
vb.customize ["modifyvm", :id, "--memory", "2048"]
end
config.vm.provision "ansible" do |ansible|
ansible.playbook = "kafkaPlaybook.yml"
end
end
Run Code Online (Sandbox Code Playgroud)
...和Ansible kafkaPlaybook.yml文件:
---
- hosts: all
user: vagrant
sudo: True
tasks:
- name: install linux packages
action: apt update_cache=yes pkg={{item}} state=installed
with_items:
- vim
- openjdk-7-jdk …Run Code Online (Sandbox Code Playgroud) 我有一个Hive表,用于跟踪在一个进程的各个阶段中移动的对象的状态.该表如下所示:
hive> desc journeys;
object_id string
journey_statuses array<string>
Run Code Online (Sandbox Code Playgroud)
以下是记录的典型示例:
12345678 ["A","A","A","B","B","B","C","C","C","C","D"]
Run Code Online (Sandbox Code Playgroud)
表中的记录是使用Hive 0.13生成的collect_list,并且状态有一个订单(如果订单不重要,我会使用collect_set).对于每个object_id,我想缩短旅程以按照它们出现的顺序返回旅程状态.
我写了一个从stdin读取的快速Python脚本:
#!/usr/bin/env python
import sys
import itertools
for line in sys.stdin:
inputList = eval(line.strip())
readahead = iter(inputList)
next(readahead)
result = []
for id, (a, b) in enumerate(itertools.izip(inputList, readahead)):
if id == 0:
result.append(a)
if a != b:
result.append(b)
print result
Run Code Online (Sandbox Code Playgroud)
我打算在Hive transform电话中使用它.它似乎在本地运行时工作:
$ echo '["A","A","A","B","B","B","C","C","C","C","D"]' | python abbreviate_list.py
['A', 'B', 'C', 'D']
Run Code Online (Sandbox Code Playgroud)
但是,当我添加文件并尝试在Hive中执行时,会返回错误:
hive> add file abbreviateList.py;
Added resource: abbreviateList.py
hive> select …Run Code Online (Sandbox Code Playgroud) 我正在尝试在Ubuntu 14.04上设置一个三节点Aerospike集群.除了IP地址/名称,每台机器都是相同的.我根据文档在每台机器上安装了Aerospike和管理控制台.
然后我编辑了网络/服务和网络/心跳部分/etc/aerospike/aerospike.conf:
network {
service {
address any
port 3000
access-address 10.0.1.11 # 10.0.1.12 and 10.0.1.13 on the other two nodes
}
heartbeat {
mode mesh
port 3002
mesh-seed-address-port 10.0.1.11 3002
mesh-seed-address-port 10.0.1.12 3002
mesh-seed-address-port 10.0.1.13 3002
interval 150
timeout 10
}
[...]
}
Run Code Online (Sandbox Code Playgroud)
当我sudo service aerospike start在每个节点上时,服务运行但它没有聚集.如果我尝试在管理控制台中添加另一个节点,它会通知我:"此处无法监视节点10.0.1.12:3000,因为它属于不同的集群."
你能看出我做错了什么吗?我应该aerospike.conf在每个节点上进行哪些更改,以便设置Aerospike集群而不是三个隔离的实例?
我想使用PySpark(Spark 1.6.2)对Hive表中存在的数值数据执行主成分分析(PCA).我能够将Hive表导入Spark数据帧:
>>> from pyspark.sql import HiveContext
>>> hiveContext = HiveContext(sc)
>>> dataframe = hiveContext.sql("SELECT * FROM my_table")
>>> type(dataframe)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> dataframe.columns
['par001', 'par002', 'par003', etc...]
>>> dataframe.collect()
[Row(par001=1.1, par002=5.5, par003=8.2, etc...), Row(par001=0.0, par002=5.7, par003=4.2, etc...), etc...]
Run Code Online (Sandbox Code Playgroud)
有一个很棒的StackOverflow帖子,展示了如何在PySpark中执行PCA:https://stackoverflow.com/a/33481471/2626491
在帖子的"测试"部分,@ assellnaut创建了一个只有一列的数据框(称为"要素"):
>>> from pyspark.ml.feature import *
>>> from pyspark.mllib.linalg import Vectors
>>> data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
... (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
... (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
>>> df = sqlContext.createDataFrame(data,["features"])
>>> type(df) …Run Code Online (Sandbox Code Playgroud) 我在HDFS中有一些事件日志数据,其原始格式如下所示:
2015-11-05 19:36:25.764 INFO [...etc...]
Run Code Online (Sandbox Code Playgroud)
外部表指向此HDFS位置:
CREATE EXTERNAL TABLE `log_stage`(
`event_time` timestamp,
[...])
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
Run Code Online (Sandbox Code Playgroud)
为了提高性能,我们想在Impala中进行查询.log_stage通过执行Hive查询将数据插入到Hive/Impala Parquet支持的表中:INSERT INTO TABLE log SELECT * FROM log_stage.这是Parquet表的DDL:
CREATE TABLE `log`(
`event_time` timestamp,
[...])
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
Run Code Online (Sandbox Code Playgroud)
问题:在Impala中查询时,时间戳提前7个小时:
Hive time: 2015-11-05 19:36:25.764
Impala time: 2015-11-06 02:36:25.764
> as.POSIXct("2015-11-06 02:36:25") - as.POSIXct("2015-11-05 19:36:25")
Time difference of 7 hours
Run Code Online (Sandbox Code Playgroud)
注意:服务器的时区(from …
我想跟踪 YARN 中的一些相关应用程序。它们是通过命令行提交的,例如
yarn jar hadoop-mapreduce-examples.jar pi 10 100
Run Code Online (Sandbox Code Playgroud)
Python 有一个非常易于使用的 YARN 客户端,它返回以下内容:
finalStatus = SUCCEEDED
id = application_1458083392566_0929
state = FINISHED
name = QuasiMonteCarlo
applicationType = MAPREDUCE
user = awoolford
applicationTags =
[...etc...]
Run Code Online (Sandbox Code Playgroud)
我注意到有一个applicationTags房产。这将是跟踪相关应用程序组的理想方法。我尝试通过设置它HADOOP_CLIENT_OPTS,例如
HADOOP_CLIENT_OPTS="-DapplicationTags=batch123,chunk62" hadoop jar [...etc...]
Run Code Online (Sandbox Code Playgroud)
...但是applicationTags当我尝试通过 Python 客户端检索字符串时,该字符串没有显示在 YARN 中。
问)如何提交 YARN 作业并applicationTags从命令行填充属性?
python ×4
hadoop ×3
hive ×3
apache-kafka ×2
aerospike ×1
apache-spark ×1
apache-tika ×1
hadoop-yarn ×1
impala ×1
jar ×1
java ×1
log4j ×1
logging ×1
parsing ×1
pca ×1
pdf ×1
pdfbox ×1
pyspark ×1
spring-boot ×1
timezone ×1
vagrant ×1