我有一个包含约 30k 个独特文档的数据集,这些文档被标记,因为它们中有某个关键字。数据集中的一些关键字段是文档标题、文件大小、关键字和摘录(关键字周围 50 个字)。这些约 30k 个唯一文档中的每一个都有多个关键字,并且每个文档在数据集中每个关键字都有一行(因此,每个文档都有多行)。以下是原始数据集中关键字段的示例:
我的目标是建立一个模型来标记某些事件(孩子们抱怨家庭作业等)的文档,因此我需要对关键字和摘录字段进行矢量化,然后将它们压缩,这样每个唯一文档就有一行。
仅使用关键字作为我要执行的操作的示例 - 我应用了 Tokenizer、StopWordsRemover 和 CountVectorizer,然后它们将输出一个带有计数矢量化结果的稀疏矩阵。一个稀疏向量可能类似于: sparseVector(158, {7: 1.0, 65: 1.0, 78: 2.0, 110: 1.0, 155: 3.0})
我想做两件事之一:
为了让您了解我的意思 - 下图左侧是 CountVectorizer 输出的所需密集向量表示,左侧是我想要的最终数据集。
我正在使用Zeppelin笔记本/ Apache Spark,我经常收到以下错误:
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)中的org.apache.thrift.transport.TTransportException位于org.apache.thrift.transport.TTransport.readAll(TTransport.java:86).位于org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)的apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol. java:219)org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)atg.apache.zeppelin.interpreter.thrift.RemoteInterpreterService $ Client.recv_interpret(RemoteInterpreterService.java:249)org.apache.zeppelin .interpreter.thrift.RemoteInterpreterService $ Client.interpret(RemoteInterpreterService.java:233)atg.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:269)org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret (LazyOpenInterpreter.java:94)org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:279)org.apache.zeppelin.scheduler.Job.run(Job.java:176)at org.apache.zeppelin.scheduler.RemoteScheduler $ JobRunner.run(RemoteScheduler.java:328)at java .util.concurrent.Executors $ RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.access $ 201(ScheduledThreadPoolExecutor. java:180)at java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor $ Worker .run(ThreadPoolExecutor.java:617)在java.lang.Thread.run(Thread.java:745)
如果我尝试再次运行相同的代码(只是忽略错误),我得到这个(只是顶线):
java.net.SocketException:管道损坏(写入失败)
然后,如果我尝试第三次(或任何时间后)运行它,我会收到此错误:
java.net.ConnectException:连接被拒绝(连接被拒绝)
如果我在Zeppelin Notebooks中重新启动解释器,那么它(最初)可以工作但最终我最终再次收到此错误.
在我的过程中的各个步骤(数据清理,矢量化等)中发生了这个错误,但是它最常出现的时间(到目前为止)是我正在拟合模型的时候.为了让您更好地了解我实际在做什么以及通常何时发生,我将引导您完成我的过程:
我正在使用Apache Spark ML并完成了一些标准的矢量化,加权等(CountVectorizer,IDF),然后在该数据上构建模型.
我使用VectorAssember创建我的特征向量,将其转换为密集向量,并将其转换为数据帧:
assembler = VectorAssembler(inputCols = ["fileSize", "hour", "day", "month", "punct_title", "cap_title", "punct_excerpt", "title_tfidf", "ct_tfidf", "excerpt_tfidf", "regex_tfidf"], outputCol="features")
vector_train = assembler.transform(train_raw).select("Target", "features")
vector_test = assembler.transform(test_raw).select("Target", "features")
train_final = vector_train.rdd.map(lambda x: Row(label=x[0],features=DenseVector(x[1].toArray())))
test_final = vector_test.rdd.map(lambda x: Row(label=x[0],features=DenseVector(x[1].toArray())))
train_final_df …Run Code Online (Sandbox Code Playgroud) python apache-spark pyspark apache-zeppelin apache-spark-mllib