I am new to Zeppelin. I have a use case where I have a pandas DataFrame and need to visualize the collection using Zeppelin's built-in charts, and I don't see a clear way to do that. My understanding is that Zeppelin can visualize the data if it is in RDD format. So the plan is to convert the pandas DataFrame to a Spark DataFrame and then run some queries (using SQL) for visualization. To start with, I tried converting the pandas DataFrame to a Spark one, but I failed:
%pyspark
import pandas as pd
from pyspark.sql import SQLContext
print sc
df = pd.DataFrame([("foo", 1), ("bar", 2)], columns=("k", "v"))
print type(df)
print df
sqlCtx = SQLContext(sc)
sqlCtx.createDataFrame(df).show()
I got the following error:
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark.py", line 162, in <module>
    eval(compiledCode)
  File "<string>", line 8, in <module>
  File "/home/bala/Software/spark-1.5.0-bin-hadoop2.6/python/pyspark/sql/context.py", line 406, in createDataFrame
    rdd, schema = self._createFromLocal(data, schema)
  File "/home/bala/Software/spark-1.5.0-bin-hadoop2.6/python/pyspark/sql/context.py", line 322, in _createFromLocal
    struct = self._inferSchemaFromList(data)
  File "/home/bala/Software/spark-1.5.0-bin-hadoop2.6/python/pyspark/sql/context.py", line 211, in _inferSchemaFromList
    schema = _infer_schema(first)
  File "/home/bala/Software/spark-1.5.0-bin-hadoop2.6/python/pyspark/sql/types.py", line …
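For reference, a minimal sketch of the workflow I am aiming for, assuming the createDataFrame call itself goes through: register the converted DataFrame as a temp table so a %sql paragraph can chart it. The table name pandas_df and the z.show call are illustrative, not from the failing code above.
%pyspark
import pandas as pd
from pyspark.sql import SQLContext

sqlCtx = SQLContext(sc)
pdf = pd.DataFrame([("foo", 1), ("bar", 2)], columns=("k", "v"))
sdf = sqlCtx.createDataFrame(pdf)   # pandas -> Spark DataFrame
sdf.registerTempTable("pandas_df")  # expose it to %sql paragraphs
z.show(sdf)                         # Zeppelin's table/chart display
A following %sql paragraph such as SELECT k, v FROM pandas_df would then render with Zeppelin's built-in charts.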
I just installed Apache Zeppelin (built from the latest source in the git repo) and successfully saw it up and running on port 10008. I created a new notebook with a single line of code
val a = "Hello World!"
and when I ran this paragraph I saw the following error:
java.net.ConnectException: Connection refused
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at org.apache.thrift.transport.TSocket.open(TSocket.java:182)
	at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:51)
	at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:37)
	at org.apache.commons.pool2.BasePooledObjectFactory.makeObject(BasePooledObjectFactory.java:60)
	at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861)
	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.getClient(RemoteInterpreterProcess.java:139)
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.init(RemoteInterpreter.java:137)
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getFormType(RemoteInterpreter.java:257)
	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.getFormType(LazyOpenInterpreter.java:104)
	at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:197)
	at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
	at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:304)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Any clue?
My backend is Spark 1.5, and I verified through the interpreter web UI that Zeppelin points to the correct Spark version and the appropriate spark.home.
I have a small use case with Apache Flink, the batch processing system. I need to process a set of files, and each file must be processed by a single machine. I have the following code. Only one task slot is ever occupied, and the files are processed one after another. I have 6 nodes (so 6 task managers), with 4 task slots configured on each node. So I expect 24 files to be processed at a time.
class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] {
  override def mapPartition(
      myfiles: java.lang.Iterable[java.io.File],
      out: org.apache.flink.util.Collector[Int]): Unit = {
    val temp = myfiles.iterator()
    while (temp.hasNext()) {
      // fetch the script registered under "hadoopRun.sh" in Flink's distributed cache,
      // so it is available locally on whichever task manager runs this partition
      val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
      val file = new File(temp.next().toURI)
      // launch one external shell process per file, with the cache folder
      // as the working directory, and echo its output
      Process(
        "/bin/bash ./run.sh " + argumentsList(3) + "/" + file.getName + " " +
          argumentsList(7) + "/" + file.getName + ".csv",
        new File(fp1.getAbsoluteFile.getParent))
        .lines
        .foreach(println)
      out.collect(1)
    }
  }
}
I started Flink with the ./bin/start-cluster.sh command, and the web UI shows 6 task managers and 24 task slots.
The folder contains around 49 files. When I run mapPartition over this collection, I expected 49 parallel processes to be spawned. However, in my infrastructure they are all processed one after another, meaning only a single machine (one task manager) processes all 49 files. What I want is, with each slot taking roughly 2 of the tasks, to have 24 files processed at the same time.
Any pointers would certainly help. I have these parameters in the flink-conf.yaml file:
jobmanager.heap.mb: 2048
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 24
Thanks in advance. Can anyone let me know where I am going wrong?
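One direction that may be relevant, sketched under assumptions: Flink's collection-based sources (such as env.fromCollection) are non-parallel and run with parallelism 1, so without an explicit redistribution the elements can all land in a single downstream partition. A rebalance() before mapPartition spreads them round-robin across all subtasks. Here env and files are illustrative names, not taken from the code above:
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val files: Seq[java.io.File] = ???   // placeholder: the 49 files collected on the client

env.fromCollection(files)            // collection sources run with parallelism 1
  .rebalance()                       // redistribute round-robin across all subtasks
  .mapPartition(new MyMapPartitionFunction)
  .setParallelism(24)                // one subtask per configured slot (6 TMs x 4 slots)
  .print()
With the rebalance in place, each of the 24 subtasks should receive roughly 2 of the 49 files instead of one partition receiving them all.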