Posts by Bal*_*ala

Converting a pandas DataFrame to a Spark DataFrame in Zeppelin

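I am new to Zeppelin. I have a use case in which I have a pandas DataFrame, and I need to visualize it using Zeppelin's built-in charts. I don't see a clear way to do this. My understanding is that Zeppelin can visualize data if it is in RDD format. So I want to convert the pandas DataFrame to a Spark DataFrame, run some queries on it (using SQL), and then visualize the result. As a first step, I tried to convert the pandas DataFrame to a Spark DataFrame, but it failed: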

%pyspark
import pandas as pd
from pyspark.sql import SQLContext
print sc
df = pd.DataFrame([("foo", 1), ("bar", 2)], columns=("k", "v"))
print type(df)
print df
sqlCtx = SQLContext(sc)
sqlCtx.createDataFrame(df).show()

I got the following error:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark.py", line 162, in <module>
    eval(compiledCode)
  File "<string>", line 8, in <module>
  File "/home/bala/Software/spark-1.5.0-bin-hadoop2.6/python/pyspark/sql/context.py", line 406, in createDataFrame
    rdd, schema = self._createFromLocal(data, schema)
  File "/home/bala/Software/spark-1.5.0-bin-hadoop2.6/python/pyspark/sql/context.py", line 322, in _createFromLocal
    struct = self._inferSchemaFromList(data)
  File "/home/bala/Software/spark-1.5.0-bin-hadoop2.6/python/pyspark/sql/context.py", line 211, in _inferSchemaFromList
    schema = _infer_schema(first)
  File "/home/bala/Software/spark-1.5.0-bin-hadoop2.6/python/pyspark/sql/types.py", line …
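
Independently of the Spark conversion, Zeppelin's display system renders paragraph output that starts with `%table` as a table with the built-in charts, using tab-separated cells and one row per line. The following is a minimal sketch of building such a string from plain rows; the helper name `to_zeppelin_table` is hypothetical, and the sample data simply mirrors the `k`/`v` rows from the question. With a pandas DataFrame, the columns and rows could presumably be taken from `df.columns` and `df.itertuples(index=False)`.

```python
def to_zeppelin_table(columns, rows):
    """Build a string in Zeppelin's %table display format.

    The header comes first, cells are separated by tabs,
    and each row is on its own line.
    """
    lines = ["\t".join(str(c) for c in columns)]
    for row in rows:
        lines.append("\t".join(str(cell) for cell in row))
    return "%table " + "\n".join(lines)


# Sample data mirroring the DataFrame in the question (an assumption for illustration).
columns = ("k", "v")
rows = [("foo", 1), ("bar", 2)]

table_text = to_zeppelin_table(columns, rows)
print(table_text)
```

Printing this string from a `%pyspark` paragraph is what would hand the data to Zeppelin's table and chart view; it does not address the schema inference error itself.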

dataframe pandas apache-spark apache-zeppelin

16 votes, 2 answers, 30k views

Hello World fails in Zeppelin

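I just installed Apache Zeppelin (built from the latest source in the git repo) and confirmed that it is up and running on port 10008. I created a new notebook with a single line of code: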

val a = "Hello World!"

I ran this paragraph and saw the following error:

java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.apache.thrift.transport.TSocket.open(TSocket.java:182)
    at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:51)
    at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:37)
    at org.apache.commons.pool2.BasePooledObjectFactory.makeObject(BasePooledObjectFactory.java:60)
    at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.getClient(RemoteInterpreterProcess.java:139)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.init(RemoteInterpreter.java:137)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getFormType(RemoteInterpreter.java:257)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.getFormType(LazyOpenInterpreter.java:104)
    at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:197)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
    at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:304)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Any clues?

My backend is Spark 1.5, and I verified through the interpreter web UI that Zeppelin points to the correct Spark version and the appropriate spark.home.
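
The trace shows the failure happening in `TSocket.open`, that is, while the Zeppelin server tries to open a socket to the remote interpreter process. A "Connection refused" at that point generally means nothing was listening on the target host and port. As a rough diagnostic sketch (not part of Zeppelin), a check like the one below can tell whether a given port accepts TCP connections. The helper name `port_is_open` is hypothetical, and the demo uses a throwaway local listener because the interpreter's actual port is not given in the question.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        conn = socket.create_connection((host, port), timeout=timeout)
    except OSError:
        # Covers "connection refused", timeouts, and unreachable hosts.
        return False
    conn.close()
    return True


# Demo with a temporary local listener on an OS-assigned port.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
demo_port = server.getsockname()[1]

open_while_listening = port_is_open("127.0.0.1", demo_port)
server.close()
open_after_close = port_is_open("127.0.0.1", demo_port)

print(open_while_listening, open_after_close)
```

Pointing the same check at the host and port the interpreter process is expected to use would show whether that process started at all.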

apache-spark apache-zeppelin

12 votes, 2 answers, 20k views

Flink job is not distributed across machines

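I have a small use case in Apache Flink, namely a batch processing system. I need to process a collection of files, and each file must be processed by a single machine. I have the code below. Only one task slot is ever occupied, and the files are processed one after another. I have 6 nodes (so 6 task managers) and configured 4 task slots on each node, so I expect 24 files to be processed at a time.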

class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] {
  override def mapPartition(
      myfiles: java.lang.Iterable[java.io.File],
      out:org.apache.flink.util.Collector[Int])
    : Unit  =  {
    var temp = myfiles.iterator()
    while(temp.hasNext()){
      val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
      val file = new File(temp.next().toURI)
      Process(
        "/bin/bash ./run.sh  " + argumentsList(3)+ "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv",
        new File(fp1.getAbsoluteFile.getParent))
        .lines
        .foreach{println}
      out.collect(1)
    }
  }
}

I started Flink with the ./bin/start-cluster.sh command, and the web UI shows 6 task managers and 24 task slots.

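The folder contains about 49 files. When I create a mapPartition on this collection, I expect 49 parallel processes to be spawned. However, in my infrastructure they are all processed one after another, which means that only one machine (one task manager) handles all 49 file names. What I want is that, with 2 tasks configured per slot, 24 files are processed at the same time.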
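
To make the expectation concrete, here is a small arithmetic sketch in plain Python (not Flink code) of how 49 files would be spread if they were distributed evenly over every available slot. The round-robin assignment and the placeholder file names are assumptions for illustration only; whether Flink actually spreads the elements this way depends on how the data set is created and partitioned, which the question does not show.

```python
def assign_round_robin(items, num_slots):
    """Distribute items over num_slots buckets in round-robin order."""
    buckets = [[] for _ in range(num_slots)]
    for index, item in enumerate(items):
        buckets[index % num_slots].append(item)
    return buckets


# Cluster layout described in the question.
task_managers = 6
slots_per_manager = 4
total_slots = task_managers * slots_per_manager

# Placeholder file names; the real names are not given.
files = ["file_%02d" % i for i in range(49)]

buckets = assign_round_robin(files, total_slots)
busiest = max(len(bucket) for bucket in buckets)

print("total slots:", total_slots)
print("files in the busiest slot:", busiest)
```

Under an even spread, no slot would handle more than three files, which is the behaviour being hoped for here, in contrast to a single slot handling all 49.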

Any pointers would certainly help. I have these parameters in my flink-conf.yaml file:

jobmanager.heap.mb: 2048
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 24

Thanks in advance. Can anyone tell me where I am going wrong?

scala batch-processing apache-flink

9 votes, 1 answer, 393 views