我有一个生产中的应用程序,每天必须处理几千兆字节的消息.我喜欢卡夫卡的建筑和表演; 它非常符合我的需求.
我想在某个时候用Kafka替换我的消息传递层.0.7.1版本在稳定性和性能一致性方面是否足以满足生产需求?
尝试将JDBC DataFrame加载到Spark SQL时,我遇到了非常奇怪的问题.
我在我的笔记本电脑上尝试了几个Spark集群 - YARN,独立集群和伪分布式模式.它在Spark 1.3.0和1.3.1上都是可重现的.spark-shell在执行代码时和使用时都会出现问题spark-submit.我试过MySQL和MS SQL JDBC驱动程序但没有成功.
考虑以下示例:
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://localhost:3306/test"
val t1 = {
sqlContext.load("jdbc", Map(
"url" -> url,
"driver" -> driver,
"dbtable" -> "t1",
"partitionColumn" -> "id",
"lowerBound" -> "0",
"upperBound" -> "100",
"numPartitions" -> "50"
))
}
Run Code Online (Sandbox Code Playgroud)
到目前为止,架构得到了正确解决:
t1: org.apache.spark.sql.DataFrame = [id: int, name: string]
Run Code Online (Sandbox Code Playgroud)
但是当我评估DataFrame时:
t1.take(1)
Run Code Online (Sandbox Code Playgroud)
发生以下异常:
15/04/29 01:56:44 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.42): java.sql.SQLException: No suitable driver found …Run Code Online (Sandbox Code Playgroud) 这是关于我从Apache Spark查询Cassandra时遇到的问题.
来自Spark的正常查询工作正常,没有任何问题,但是当我查询条件是关键时,我得到以下错误.最初我尝试查询复合键列族,它也给出了与下面相同的问题.
"引起:InvalidRequestException(为什么:如果包含Equal,则empid不能被多个关系限制)"
专栏系列:
CREATE TABLE emp (
empID int,
deptID int,
first_name varchar,
last_name varchar,
PRIMARY KEY (empID));
Run Code Online (Sandbox Code Playgroud)
列族内容:
empID, deptID, first_name, last_name
104, 15, 'jane', 'smith'
Run Code Online (Sandbox Code Playgroud)
示例SCALA代码:
val job=new Job()
job.setInputFormatClass(classOf[CqlPagingInputFormat])
val host: String = "localhost"
val port: String = "9160"
ConfigHelper.setInputInitialAddress(job.getConfiguration(), host)
ConfigHelper.setInputRpcPort(job.getConfiguration(), port)
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "demodb", "emp")
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
CqlConfigHelper.setInputColumns(job.getConfiguration(), "empid,deptid,first_name,last_name")
//CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), limit.toString)
CqlConfigHelper.setInputWhereClauses(job.getConfiguration(),"empid='104'")
// Make a new Hadoop RDD
val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
classOf[CqlPagingInputFormat],
classOf[Map[String, ByteBuffer]],
classOf[Map[String, ByteBuffer]])
Run Code Online (Sandbox Code Playgroud)
我恳请你告诉我,如果有任何解决这种情况,因为我在过去几天努力克服这个问题.
谢谢
我在Zookeeper上构建了一个服务发现层,用于在分布式环境中查找Thrift服务.我正在寻找在生产环境中运行这些服务的最佳方法.
目前,它是通过打包部署到Tomcat的战争来完成的.在servlet实例化期间,将创建Spring ApplicationContext,从而创建TThreadPoolServerTomcat内部.
我不喜欢这个,原因有两个:
在试图找到解决这个问题的最佳策略的过程中,我想出了几个选择:
有没有人建议他们以前如何处理托管分布式服务器.我最好只在Tomcat中使用HTTP吗?
在多次重新尝试并更改bat文件后,我终于成功地在Windows 7中运行了Kafka和Zookeeper.这是一台运行Java 7的32位机器.不幸的是,我无法创建主题.我尝试使用这里的教程:http://janschulte.wordpress.com/2013/10/13/apache-kafka-0-8-on-windows/
我执行了命令:
C:\ Cambria\kafka_2.8.0-0.8.0> bin\kafka-create-topic.bat --zookeeper localhost:2181 --replica 1 --partition 1 --topic topic
并最终出现以下错误:
Exception in thread "main" joptsimple.UnrecognizedOptionException: 'û' is not a recognized option
at joptsimple.OptionException.unrecognizedOption(OptionException.java:89)
at joptsimple.OptionParser.validateOptionCharacters(OptionParser.java:586)
at joptsimple.OptionParser.handleShortOptionCluster(OptionParser.java:511)
at joptsimple.OptionParser.handleShortOptionToken(OptionParser.java:506)
at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:59)
at joptsimple.OptionParser.parse(OptionParser.java:433)
at kafka.admin.CreateTopicCommand$.main(CreateTopicCommand.scala:56)
at kafka.admin.CreateTopicCommand.main(CreateTopicCommand.scala)
Run Code Online (Sandbox Code Playgroud)
所有类路径条目都是正确的,并且还存在必需的JAR文件.我不确定是什么问题.有人可以帮我吗?
PS:我尝试使用Cygwin并遇到Class Not Found错误,尽管类路径正确设置.
我有一个8个节点的cassandra集群,cassandra 1.0.8.
我正在尝试使用batch_mutate()在循环中执行大量小插入.经过一段时间(约200K插入)服务器重置连接时出现以下异常:
org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset
at org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
at org.apache.thrift.transport.TFramedTransport.flush(TFramedTransport.java:157)
at org.apache.cassandra.thrift.Cassandra$Client.send_batch_mutate(Cassandra.java:998)
at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:986)
at org.scale7.cassandra.pelops.Mutator$1.execute(Mutator.java:46)
at org.scale7.cassandra.pelops.Mutator$1.execute(Mutator.java:42)
at org.scale7.cassandra.pelops.Operand.tryOperation(Operand.java:56)
at org.scale7.cassandra.pelops.Mutator.execute(Mutator.java:51)
...
Caused by: java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:96)
at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
at org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
... 25 more
Run Code Online (Sandbox Code Playgroud)
除此之外,群集工作正常.服务器日志很干净.
可能是什么导致了这个问题?
谢谢!
验证scala集合的预期值时,assertResult方法很方便:
"The list" should "be generated correctly" in {
assertResult(List(10, 20)) {
//Some code that should output
List(10, 20)
}
}
Run Code Online (Sandbox Code Playgroud)
如果出现问题,会生成错误消息:
Expected List(10, 20), but got List(10, 30)
Run Code Online (Sandbox Code Playgroud)
不幸的是,它不适用于数组,因为==运算符检查标识,而不是相等(这种行为背后的原因已经讨论了很多,例如:为什么Array的= =函数不为数组返回true(1,2) ==数组(1,2)?).
因此,类似的数组检查会生成以下错误消息:
Expected Array(10, 20), but got Array(10, 20)
Run Code Online (Sandbox Code Playgroud)
原因是,可以使用should equal匹配器:
"The array" should "be generated correctly" in {
Array(10, 20) should equal {
//Some code that should output
Array(10, 20)
}
}
Run Code Online (Sandbox Code Playgroud)
但IMO不那么方便,因为它更像是一种期望验证的平等检查:
Array(10, 20) did not equal Array(10, 30) …Run Code Online (Sandbox Code Playgroud) 我的Nimbus主机和主管正常运行.当我向Nimbus主机提交wordcount拓扑时,它成功上传.上传拓扑主管后给出错误
kill: No such process
Run Code Online (Sandbox Code Playgroud)
当我检查worker-6001.log并6002.log发现以下错误时:
2014-02-09 17:20:05 b.s.m.TransportFactory [INFO] Storm peer transport plugin:backtype.storm.messaging.zmq
2014-02-09 17:20:05 b.s.d.worker [ERROR] Error on initialization of server mk-worker
java.lang.UnsatisfiedLinkError: no jzmq in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1886) ~[na:1.7.0_51]
at java.lang.Runtime.loadLibrary0(Runtime.java:849) ~[na:1.7.0_51]
at java.lang.System.loadLibrary(System.java:1088) ~[na:1.7.0_51]
at org.zeromq.ZMQ.<clinit>(ZMQ.java:34) ~[jzmq-2.1.0.jar:na]
at java.lang.Class.forName0(Native Method) ~[na:1.7.0_51]
at java.lang.Class.forName(Class.java:190) ~[na:1.7.0_51]
at backtype.storm.messaging.zmq$loading__4784__auto__.invoke(zmq.clj:1) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.messaging.zmq__init.load(Unknown Source) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.messaging.zmq__init.<clinit>(Unknown Source) ~[storm-core-0.9.0.1.jar:na]
at java.lang.Class.forName0(Native Method) ~[na:1.7.0_51]
at java.lang.Class.forName(Class.java:270) ~[na:1.7.0_51]
at clojure.lang.RT.loadClassForName(RT.java:2056) ~[clojure-1.4.0.jar:na]
at clojure.lang.RT.load(RT.java:419) ~[clojure-1.4.0.jar:na]
at clojure.lang.RT.load(RT.java:400) ~[clojure-1.4.0.jar:na]
at …Run Code Online (Sandbox Code Playgroud) scala ×3
apache-kafka ×2
apache-spark ×2
cassandra ×2
java ×2
apache-storm ×1
arrays ×1
hadoop ×1
jdbc ×1
jzmq ×1
scalatest ×1
thrift ×1
ubuntu-12.04 ×1
windows-7 ×1
zeromq ×1