我有一个代码,如下所示
object ErrorTest {
case class APIResults(status:String, col_1:Long, col_2:Double, ...)
def funcA(rows:ArrayBuffer[Row])(implicit defaultFormats:DefaultFormats):ArrayBuffer[APIResults] = {
//call some API ang get results and return APIResults
...
}
// MARK: load properties
val props = loadProperties()
private def loadProperties(): Properties = {
val configFile = new File("config.properties")
val reader = new FileReader(configFile)
val props = new Properties()
props.load(reader)
props
}
def main(args: Array[String]): Unit = {
val prop_a = props.getProperty("prop_a")
val session = Context.initialSparkSession();
import session.implicits._
val initialSet = ArrayBuffer.empty[Row]
val addToSet … scala> List(List(1), List(2), List(3), List(4))
res18: List[List[Int]] = List(List(1), List(2), List(3), List(4))
scala> res18.flatten
res19: List[Int] = List(1, 2, 3, 4)
scala> res18.flatMap(identity)
res20: List[Int] = List(1, 2, 3, 4)
Run Code Online (Sandbox Code Playgroud)
这两个功能有什么区别吗?何时使用一个而不是另一个?有任何权衡吗?
我创建了一个简单的Kafka Producer&Consumer.我使用的是kafka_2.11-0.9.0.0.这是我的制作人代码,
public class KafkaProducerTest {
public static String topicName = "test-topic-2";
public static void main(String[] args) {
// TODO Auto-generated method stub
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
StringSerializer.class.getName());
props.put("value.serializer",
StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer(props);
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
topicName, Integer.toString(i), Integer.toString(i));
System.out.println(producerRecord);
producer.send(producerRecord);
}
producer.close();
}
}
Run Code Online (Sandbox Code Playgroud)
在启动捆绑ia时遇到以下错误,
2016-05-20 09:44:57,792 | …Run Code Online (Sandbox Code Playgroud) apache-karaf apache-kafka apache-spark spark-streaming-kafka
我是MapReduce的新手,我无法弄清楚分区器和组合器的区别.我知道两者都在map和reduce任务之间的中间步骤中运行,并且都减少了reduce任务要处理的数据量.请用一个例子来解释差异.
我听说过Whole-Stage Code Generationsql来优化查询.通过p539-neumann.pdf和sparksql-sql-codegen-is-not-giving-any-improvemnt
但不幸的是,没有人回答上述问题.
很想知道使用Spark 2.0这个功能的场景是什么.但谷歌搜索后没有得到正确的用例.
每当我们使用sql时,我们可以使用此功能吗?如果是这样,任何正确的用例看到这个工作?
伙计们,
我正在尝试在每行一个单个密钥标识符的spark中加入2个大型数据帧(每个100GB +)时遇到此问题.
我在EMR上使用Spark 1.6,这就是我正在做的事情:
val df1 = sqlContext.read.json("hdfs:///df1/")
val df2 = sqlContext.read.json("hdfs:///df2/")
// clean up and filter steps later
df1.registerTempTable("df1")
df2.registerTempTable("df2")
val df3 = sql("select df1.*, df2.col1 from df1 left join df2 on df1.col3 = df2.col4")
df3.write.json("hdfs:///df3/")
Run Code Online (Sandbox Code Playgroud)
这基本上是我正在做的事情的要点,以及最终加入df1和df2之间的其他清理和过滤步骤.
我看到的错误是:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) …Run Code Online (Sandbox Code Playgroud) 我将所有类型的文件存储在亚马逊 S3 上。在亚马逊S3存储桶中,所有文件都存储在不同的文件夹中,我知道亚马逊S3中没有文件夹的概念。对象只能通过它们的键来识别。如果我存储任何带有“mydocs/personal/profile-pic.jpg”之类的密钥的文件,这意味着将在那里创建两个父文件夹(mydocs 文件夹内的个人文件夹)。
我想计算java中像“mydocs”这样的任何文件夹的大小。我使用下面给出的代码计算了存储桶总大小:
public long calculateBucketSize(String bucketName) {
long totalSize = 0;
int totalItems = 0;
ObjectListing objects = listObjects(bucketName);
do {
for (S3ObjectSummary objectSummary : objects.getObjectSummaries()) {
totalSize += objectSummary.getSize();
totalItems++;
}
objects = listNextBatchOfObjects(objects);
} while (objects.isTruncated());
System.out.println("Amazon S3 bucket: " + bucketName + " containing "
+ totalItems + " objects with a total size of " + totalSize
+ " bytes.");
return totalSize;
}
Run Code Online (Sandbox Code Playgroud)
此方法将返回存储桶的总大小。我想计算任何单个文件夹的大小。任何帮助将不胜感激。
我有一个类似图形的对象,我从服务器发送到包含nodes它的客户端adjacencyLists.
我有类似的东西:
Clearing c1 = new Clearing(1, 134, 151);
Clearing c6 = new Clearing(6, 250, 88);
c1.adjacentByPath.add(new Path(1, c6));
c6.adjacentByPath.add(new Path(1, c1));
Run Code Online (Sandbox Code Playgroud)
每次我发送包含这些清除的对象时,我收到以下错误:
Exception in thread "Server" java.lang.StackOverflowError
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:448)
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
......
Run Code Online (Sandbox Code Playgroud)
在Kryonet有解决方法吗?谢谢
我是Apache Spark的新手,我创建了几个RDD和DataFrames,缓存它们,现在我想通过使用下面的命令来解决它们中的一些问题
rddName.unpersist()
Run Code Online (Sandbox Code Playgroud)
但我不记得他们的名字.我使用sc.getPersistentRDDs但输出不包括名称.我还使用浏览器查看缓存的rdds,但同样没有名称信息.我错过了什么吗?
如何从arraytypeSpark Scala中的列中仅获取2个数据?我得到的数据就像val df = spark.sqlContext.sql("select col1, col2 from test_tbl").
我有以下数据:
col1 | col2
--- | ---
a | [test1,test2,test3,test4,.....]
b | [a1,a2,a3,a4,a5,.....]
Run Code Online (Sandbox Code Playgroud)
我想获得如下数据:
col1| col2
----|----
a | test1,test2
b | a1,a2
Run Code Online (Sandbox Code Playgroud)
当我这样做时df.withColumn("test", col("col2").take(5))它不起作用.它给出了这个错误:
value take不是org.apache.spark.sql.ColumnName的成员
如何按上述顺序获取数据?
scala dataframe apache-spark apache-spark-sql spark-dataframe
apache-spark ×7
scala ×5
java ×4
dataframe ×2
amazon-s3 ×1
apache-kafka ×1
apache-karaf ×1
aws-sdk ×1
flatmap ×1
hadoop ×1
kryo ×1
kryonet ×1
mapreduce ×1
partitioner ×1
rdd ×1