所以我一直在使用sbt with assembly将我的所有依赖项打包成一个jar用于我的spark作业.我有几个工作,我c3p0用来设置连接池信息,广播出来,然后foreachPartition在RDD上使用然后获取连接,并将数据插入数据库.在我的sbt构建脚本中,我包含了
"mysql" % "mysql-connector-java" % "5.1.33"
Run Code Online (Sandbox Code Playgroud)
这可确保JDBC连接器与作业打包在一起.一切都很好.
所以最近我开始玩SparkSQL,并意识到简单地采用数据帧并将其保存到具有新功能的jdbc源更容易 1.3.0
我收到以下异常:
java.sql.SQLException:在java.sql中找不到合适的jdbc驱动程序:mysql://some.domain.com/myschema?user = user&password = password at java.sql.DriverManager.getConnection(DriverManager.java:596).的DriverManager.getConnection(DriverManager.java:233)
当我在本地运行时,我通过设置绕过它
SPARK_CLASSPATH=/path/where/mysql-connector-is.jar
Run Code Online (Sandbox Code Playgroud)
最终我想知道的是,为什么这个工作不应该找到驱动程序什么时候应该打包它呢?我的其他工作从未遇到过这个问题.从我可以告诉他们c3p0和数据帧代码都使用java.sql.DriverManager(它处理从我可以告诉你的一切导入所有)所以它应该工作得很好?如果有什么东西阻止汇编方法工作,我需要做些什么来使其工作?
我设置了一个简单的测试来从S3流式传输文本文件,并在我尝试类似的东西时使其工作
val input = ssc.textFileStream("s3n://mybucket/2015/04/03/")
Run Code Online (Sandbox Code Playgroud)
在桶中我会有日志文件进去,一切都会正常工作.
但如果他们是一个子文件夹,它将找不到任何放入子文件夹的文件(是的,我知道hdfs实际上并没有使用文件夹结构)
val input = ssc.textFileStream("s3n://mybucket/2015/04/")
Run Code Online (Sandbox Code Playgroud)
所以,我试着像我之前用标准的spark应用程序那样简单地做通配符
val input = ssc.textFileStream("s3n://mybucket/2015/04/*")
Run Code Online (Sandbox Code Playgroud)
但是当我尝试这个时它会抛出一个错误
java.io.FileNotFoundException: File s3n://mybucket/2015/04/* does not exist.
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1483)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1523)
at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:176)
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
.....
Run Code Online (Sandbox Code Playgroud)
我知道你在为标准的spark应用程序读取fileInput时可以使用通配符,但看起来在进行流式输入时,它不会这样做,也不会自动处理子文件夹中的文件.这里有什么我想念的吗?
最终我需要的是一个全天候运行的流媒体作业,它将监视一个按日期放置日志的S3存储桶
所以像
s3n://mybucket/<YEAR>/<MONTH>/<DAY>/<LogfileName>
Run Code Online (Sandbox Code Playgroud)
有没有办法把它交给最顶层的文件夹,它会自动读取显示在任何文件夹中的文件(显然每天都会增加日期)?
编辑
因此,在深入研究http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources上的文档时,它指出不支持嵌套目录.
谁能解释为什么会这样呢?
此外,由于我的文件将根据其日期嵌套,在流媒体应用程序中解决此问题的好方法是什么?这有点复杂,因为日志需要几分钟才能写入S3,所以当天写的最后一个文件可以写在前一天的文件夹中,即使我们进入新的一天几分钟.
我们已经有一个小火花集群运行了一个月,现在已成功执行作业或让我启动一个火花壳到集群.
如果我向集群提交作业或使用shell连接到它并不重要,错误总是相同的.
root@~]$ $SPARK_HOME/bin/spark-shell
Spark assembly has been built with Hive, including Datanucleus jars on classpath
14/11/10 20:43:01 INFO spark.SecurityManager: Changing view acls to: root,
14/11/10 20:43:01 INFO spark.SecurityManager: Changing modify acls to: root,
14/11/10 20:43:01 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, )
14/11/10 20:43:01 INFO spark.HttpServer: Starting HTTP Server
14/11/10 20:43:01 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/11/10 20:43:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:60223
14/11/10 20:43:01 INFO util.Utils: Successfully started …Run Code Online (Sandbox Code Playgroud) 很抱歉这个长标题,但我们在使用corebluetooth for ios时遇到了一个非常有趣的问题.我们正在CBCentralManager中发出检索外围设备的调用,并且能够找到之前配对的设备.
无论设备是打开还是关闭,都会发生这种情况.我在苹果的文档中找不到任何关于为什么它能够在关闭时找到它并且它没有出现在设置 - >蓝牙 - >设备中的设备.我怀疑Apple正在缓存此信息,但无法找到任何文档来确认这一点.此外,当设备关闭并且我们发出连接调用时,程序继续正常执行,但didFailToConnect的委托永远不会被调用.打开设备后,它会立即连接.
尝试连接设备时有没有办法传递超时参数?如果没有,那么处理重新连接到以前使用的应用程序设备的最佳解决方案是什么(我们将最后连接的设备存储在应用程序中).
我正在阅读Scala编程,第2版(精彩的书,比scala的网站更好,以非摇滚科学的方式解释事物),我注意到这......在回到不可变和可变的时候很奇怪集.
它将以下内容声明为不可变集
var jetSet=Set("Boeing", "Airbus")
jetSet+="Lear"
println(jetSet.contains("Cessna"))
Run Code Online (Sandbox Code Playgroud)
然后声明只有Mutable集定义了+ =方法.好吧,这很有道理.问题是这段代码有效.在REPL中测试时创建的集合类型实际上是不可变集合,但它上面定义了+ =方法,并且它的功能非常好.看哪
scala> var a = Set("Adam", "Bill")
a: scala.collection.immutable.Set[String] = Set(Adam, Bill)
scala> a += "Colleen"
scala> println(a)
Set(Adam, Bill, Colleen)
scala> a.getClass
res8: Class[_ <: scala.collection.immutable.Set[String]] = class scala.collection.immutable.Set$Set3
Run Code Online (Sandbox Code Playgroud)
但是如果我声明Set为val,则创建的Immutable Set 没有定义+ =方法
scala> val b = Set("Adam", "Bill")
b: scala.collection.immutable.Set[String] = Set(Adam, Bill)
scala> b += "Colleen"
<console>:9: error: value += is not a member of scala.collection.immutable.Set[String]
b += "Colleen"
Run Code Online (Sandbox Code Playgroud)
这里发生了什么?它们都被声明为一个不可变的Set,但声明var可以访问+ =方法并且可以使用它.
另外,当我继续在var Immutable Set上调用getClass方法时,我发现了一些奇怪的东西....
scala> a.getClass
res10: Class[_ …Run Code Online (Sandbox Code Playgroud) 我有一个数据库调用的大型结果集,我需要流回用户,因为它不能全部适合内存.
我可以通过设置选项来回传数据库中的结果
val statement = session.conn.prepareStatement(query,
java.sql.ResultSet.TYPE_FORWARD_ONLY,
java.sql.ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(Integer.MIN_VALUE)
....
....
val res = statement.executeQuery
Run Code Online (Sandbox Code Playgroud)
然后使用迭代器
val result = new Iterator[MyResultClass] {
def hasNext = res.next
def next = MyResultClass(someValue = res.getString("someColumn"), anotherValue = res.getInt("anotherValue"))
}
Run Code Online (Sandbox Code Playgroud)
在Scala中,Iterator扩展了TraversableOnce,它允许我根据https://www.playframework.com/documentation/2.3.x/上的文档将Iterator传递给用于Play框架中的Chunked Response的Enumerator类.ScalaStream
在查看Enumerator的源代码时,我发现它有一个重载的apply方法来使用TraversableOnce对象
我尝试使用以下代码
import play.api.libs.iteratee.Enumerator
val dataContent = Enumerator(result)
Ok.chunked(dataContent)
Run Code Online (Sandbox Code Playgroud)
但这不起作用,因为它抛出以下异常
Cannot write an instance of Iterator[MyResultClass] to HTTP response. Try to define a Writeable[Iterator[MyResultClass]]
Run Code Online (Sandbox Code Playgroud)
我在文档中找不到任何关于Writable是什么或者做什么的文档.我以为一旦Enumerator消耗了TraversableOnce对象,就会从那里拿走它,但我想不是吗?
apache-spark ×3
scala ×2
chunked ×1
hadoop ×1
hdfs ×1
immutability ×1
ios ×1
jdbc ×1
mapreduce ×1
mutable ×1
objective-c ×1
set ×1