I have many thousands of small files in HDFS and need to process a somewhat smaller subset of them (still several thousand); fileList holds the list of HDFS file paths to be processed.
// fileList == list of filepaths in HDFS
var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD

for (i <- 0 until fileList.size()) {
  val filePath = fileList.get(i)
  val fileRDD = sparkContext.textFile(filePath)
  // keep only the marker lines, tagged with the path of the file they came from
  val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line))
  masterRDD = masterRDD.union(sampleRDD)
}
masterRDD.first()
// Once the loop finishes, performing any action on masterRDD fails with a stack overflow caused by the RDD's long lineage
Exception in thread "main" java.lang.StackOverflowError
at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) …
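A common way to keep the lineage shallow here is to build one RDD per file and union them all in a single SparkContext.union call, instead of repeatedly nesting unions inside the loop (checkpointing masterRDD every few iterations is another option). A minimal sketch of the single-union approach, assuming fileList is a java.util.List[String]:

import scala.collection.JavaConverters._
import org.apache.spark.rdd.RDD

// One small RDD per file, tagged with the path it came from.
val perFileRDDs: Seq[RDD[(String, String)]] = fileList.asScala.map { filePath =>
  sparkContext.textFile(filePath)
    .filter(line => line.startsWith("#####"))
    .map(line => (filePath, line))
}

// A single flat UnionRDD instead of thousands of nested ones,
// so actions no longer recurse through a deep lineage.
val masterRDD: RDD[(String, String)] = sparkContext.union(perFileRDDs)
masterRDD.first()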
Scenario:
Suppose there is a table in Hive, and it is queried from Apache Spark with the SparkSQL code below, where the table name is passed in as a parameter and concatenated into the query.
I have a basic understanding of SQL injection vulnerabilities in non-distributed systems, and in a JDBC context I understand the use of createStatement/preparedStatement for this kind of scenario.
But in the case of SparkSQL, is this code vulnerable to injection? Any insights?
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]) {
  val sconf = new SparkConf().setAppName("TestApp")
  val sparkContext = new SparkContext(sconf)
  val hiveSqlContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)

  val tableName = args(0) // passed as an argument

  val tableData = hiveSqlContext.sql("select IdNum, Name from hiveSchemaName." + tableName + " where IdNum <> '' ")
                                .map( x => (x.getString(0), x.getString(1)) ).collectAsMap()
  ................
  ...............
}
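Since whatever arrives in args(0) is spliced straight into the HiveQL text and parsed as part of the statement, one common safeguard is to validate the identifier against a whitelist of expected table names before building the query. A minimal sketch under that assumption (allowedTables and its contents are hypothetical):

// Hypothetical whitelist of table names this job is allowed to read.
val allowedTables = Set("table_a", "table_b")

val tableName = args(0)
require(allowedTables.contains(tableName), s"Unexpected table name: $tableName")

// tableName is now one of the whitelisted identifiers, so concatenating
// it into the query text no longer accepts arbitrary input.
val tableData = hiveSqlContext
  .sql(s"select IdNum, Name from hiveSchemaName.$tableName where IdNum <> ''")
  .map(x => (x.getString(0), x.getString(1)))
  .collectAsMap()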
On a CentOS machine, with Python v2.6.6 and Apache Spark v1.2.1.
I get the following error when trying to run ./pyspark.
It seems to be some issue with Python, but I can't figure it out.
15/06/18 08:11:16 INFO spark.SparkContext: Successfully stopped SparkContext
Traceback (most recent call last):
File "/usr/lib/spark_1.2.1/spark-1.2.1-bin-hadoop2.4/python/pyspark/shell.py", line 45, in <module>
sc = SparkContext(appName="PySparkShell", pyFiles=add_files)
File "/usr/lib/spark_1.2.1/spark-1.2.1-bin-hadoop2.4/python/pyspark/context.py", line 105, in __init__
conf, jsc)
File "/usr/lib/spark_1.2.1/spark-1.2.1-bin-hadoop2.4/python/pyspark/context.py", line 157, in _do_init
self._accumulatorServer = accumulators._start_update_server()
File "/usr/lib/spark_1.2.1/spark-1.2.1-bin-hadoop2.4/python/pyspark/accumulators.py", line 269, in _start_update_server
server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler)
File "/usr/lib64/python2.6/SocketServer.py", line 402, in __init__
self.server_bind()
File "/usr/lib64/python2.6/SocketServer.py", line 413, in server_bind
self.socket.bind(self.server_address)
File "<string>", line 1, in bind
socket.gaierror: [Errno -2] Name …