小编Phi*_*sky的帖子

10
推荐指数
2
解决办法
4570
查看次数

使用python Spark将大型CSV发送到Kafka

我正在尝试向kafka发送一个大的CSV.基本结构是读取CSV的一行并用标题压缩它.

a = dict(zip(header, line.split(",")
Run Code Online (Sandbox Code Playgroud)

然后将其转换为json:

message = json.dumps(a)
Run Code Online (Sandbox Code Playgroud)

然后我使用kafka-python库发送消息

from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages("topic", message)
Run Code Online (Sandbox Code Playgroud)

使用PYSPARK我很容易从CSV文件中创建RDD消息

sc = SparkContext()
text = sc.textFile("file.csv")
header = text.first().split(',')
def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr
noHeader = text.mapPartitionsWithIndex(remove_header)

messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(","))
Run Code Online (Sandbox Code Playgroud)

现在我想发送这些消息:我定义了一个函数

def sendkafka(message):
  kafka = KafkaClient("localhost:9092")
  producer = SimpleProducer(kafka)
  return producer.send_messages('topic',message)
Run Code Online (Sandbox Code Playgroud)

然后我创建一个新的RDD来发送消息

sentRDD = messageRDD.map(lambda x: kafkasend(x))
Run Code Online (Sandbox Code Playgroud)

然后我调用sentRDD.count()

哪个开始搅拌和发送消息

不幸的是,这很慢.它每秒发送1000条消息.这是一个10节点的集群,每个集群有4个cpus和8GB的内存.

相比之下,在1000万行csv上创建消息大约需要7秒.〜约2gb

我认为问题是我在函数内实例化一个kafka生成器.但是,如果我不这样做就会引发抱怨,即使我尝试在全球范围内定义它,生产者也不存在.

也许有人可以阐明如何解决这个问题.

谢谢,

python apache-kafka apache-spark pyspark kafka-python

6
推荐指数
1
解决办法
5778
查看次数

在工作节点上安装SPARK模块

我在cloudera环境中以独立模式运行SPARK 1.3.我可以从ipython笔记本运行pyspark,但是只要我添加第二个工作节点,我的代码就会停止运行并返回错误.我很确定这是因为我的主设备上的模块对于工作节点是不可见的.我尝试导入numpy但是它没有用,即使我通过anaconda安装了我的工人numpy.我以同样的方式在master和worker上安装了anaconda.

但是,根据Josh Rosen的建议,我确保在工作节点上安装了库.

https://groups.google.com/forum/#!topic/spark-users/We_F8vlxvq0

但是,我似乎仍然遇到问题.包括我的工人不认识命令abs的事实.这是python 2.6中的标准

我正在运行的代码来自这篇文章:

https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python

def isprime(n):
    """
    check if integer n is a prime
    """
    # make sure n is a positive integer
    n = abs(int(n))
    # 0 and 1 are not primes
    if n < 2:
        return False
    # 2 is the only even prime number
    if n == 2:
        return True
    # all other even numbers are not primes
    if not n & 1:
        return False
    # range starts with 3 and only …
Run Code Online (Sandbox Code Playgroud)

python numpy apache-spark pyspark

4
推荐指数
1
解决办法
3804
查看次数