最近有一个问题在接受我采访时被问到.
问题:有一个类用于分析代码的执行时间.这堂课就像:
Class StopWatch {
long startTime;
long stopTime;
void start() {// set startTime}
void stop() { // set stopTime}
long getTime() {// return difference}
}
Run Code Online (Sandbox Code Playgroud)
期望客户端创建StopWatch的实例并相应地调用方法.用户代码可能会搞乱使用导致意外结果的方法.Ex,start(),stop()和getTime()调用应该是有序的.
必须"重新配置"该类,以便可以防止用户弄乱序列.
如果在start()之前调用stop(),或者做一些if/else检查,我建议使用自定义异常,但是面试官不满意.
是否有设计模式来处理这种情况?
编辑:可以修改类成员和方法实现.
我正在尝试使用术语过滤器过滤文档.我不知道如何在过滤器中引入通配符.我试过这样的事情:
"filter":{
"bool":{
"must":{
"terms":{
"wildcard" : {
"aircraft":[
"a380*"
]
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
但我得到了SearchParseException.有没有办法在过滤器框架中使用通配符?
对于我的代码段如下:
val levelsFile = sc.textFile(levelsFilePath)
val levelsSplitedFile = levelsFile.map(line => line.split(fileDelimiter, -1))
val levelPairRddtemp = levelsSplitedFile
.filter(linearr => ( linearr(pogIndex).length!=0))
.map(linearr => (linearr(pogIndex).toLong, levelsIndexes.map(x => linearr(x))
.filter(value => (!value.equalsIgnoreCase("") && !value.equalsIgnoreCase(" ") && !value.equalsIgnoreCase("null")))))
.mapValues(value => value.mkString(","))
.partitionBy(new HashPartitioner(24))
.persist(StorageLevel.MEMORY_ONLY_SER)
levelPairRddtemp.count // just to trigger rdd creation
Run Code Online (Sandbox Code Playgroud)
信息
executors
(每个5G)和12个核心.Spark
版本:1.5.2问题
当我看到它SparkUI
时Storage tab
,我看到的是:
在里面RDD
看来,24个partitions
中只有2个被缓存.
对此行为的任何解释,以及如何解决此问题.
编辑1:我刚尝试使用60个分区HashPartitioner
作为:
..
.partitionBy(new HashPartitioner(60))
..
Run Code Online (Sandbox Code Playgroud)
它工作了.现在我得到了整个RDD
缓存.有什么猜测这里可能发生了什么?数据偏差是否会导致此行为? …
memory-management scala distributed-computing apache-spark rdd
我正在寻找一种在android文件系统中临时保存位图文件的方法.只有在将文件用作服务器的POST请求的一部分之后才需要该文件,之后我希望它不再存在.我正在寻找更快的方法.
...
File file = new File(Environment.getExternalStorageDirectory().getPath().toString()+"/ImageDB/" + fileName+".png");
FileOutputStream filecon = new FileOutputStream(file);
sampleResized.compress(Bitmap.CompressFormat.JPEG, 90, filecon);
...
Run Code Online (Sandbox Code Playgroud)
我目前正在使用这种方法.
编辑:我从Android中创建临时文件获得了我的解决方案
我试图重写该方法(w3schools 教程的一部分).
问题是使变量字符串成为正则表达式的一部分.
教程示例代码:
function myFunction() {
var str = "The rain in SPAIN stays mainly in the plain";
var res = str.match(/ain/gi);
console.log(res)
}
Run Code Online (Sandbox Code Playgroud)
我试过了:
function myFunction() {
var str = "The rain in SPAIN stays mainly in the plain";
var test = "ain";
var re = "/"+test+"/gi";
var res = str.match(re);
console.log(res);
}
Run Code Online (Sandbox Code Playgroud)
我尝试的方式不起作用.
实验
我尝试了下面的代码片段Spark 1.6.1
.
val soDF = sqlContext.read.parquet("/batchPoC/saleOrder") # This has 45 files
soDF.registerTempTable("so")
sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").write.parquet("/out/")
Run Code Online (Sandbox Code Playgroud)
该Physical Plan
方法是:
== Physical Plan ==
Sort [cnt#59L ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(cnt#59L ASC,200), None
+- ConvertToSafe
+- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Final,isDistinct=false)], output=[dpHour#38,cnt#59L])
+- TungstenExchange hashpartitioning(dpHour#38,200), None
+- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Partial,isDistinct=false)], output=[dpHour#38,count#63L])
+- Scan ParquetRelation[dpHour#38] InputPaths: hdfs://hdfsNode:8020/batchPoC/saleOrder
Run Code Online (Sandbox Code Playgroud)
因为Job 9
,DAG
是:
因为 …
我试图从Kafka队列中消耗序列化的avro事件.使用简单的java生成器填充kafka队列.为清楚起见,我分享了三个组成部分:
Avro架构文件
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Run Code Online (Sandbox Code Playgroud)
Java Producer代码片段(User.class使用avro-tools生成)
User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
user1.setFavoriteColor("blue");
String topic = "MemoryTest";
// Properties set in 'props'
KafkaProducer<Message, byte[]> producer = new KafkaProducer<Message, byte[]>(props);
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<User> writer = new SpecificDatumWriter<User>(User.class);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(user1, encoder);
encoder.flush();
out.close();
byte[] serializedBytes = out.toByteArray();
producer.send(new ProducerRecord<Message, byte[]>(topic, serializedBytes)); …
Run Code Online (Sandbox Code Playgroud) 我有两个RDD[K,V]
,在哪里K=Long
和V=Object
.让我们叫rdd1
和rdd2
.我有一个共同的自定义分区程序.我试图找到一种方法,union
或join
通过避免或最小化数据移动.
val kafkaRdd1 = /* from kafka sources */
val kafkaRdd2 = /* from kafka sources */
val rdd1 = kafkaRdd1.partitionBy(new MyCustomPartitioner(24))
val rdd2 = kafkaRdd2.partitionBy(new MyCustomPartitioner(24))
val rdd3 = rdd1.union(rdd2) // Without shuffle
val rdd3 = rdd1.leftOuterjoin(rdd2) // Without shuffle
Run Code Online (Sandbox Code Playgroud)
假设(或强制执行)nth-Partition
两者rdd1
和rdd2
同一slave
节点是否安全?
我有一个tuple
像这样的数组:
val a = Array((1,2,3), (2,3,4))
Run Code Online (Sandbox Code Playgroud)
我想为下面的方法编写泛型方法:
def sum2nd(aa: Array[(Int, Int, Int)]) = {
aa.map { a => a._2 }.sum
}
Run Code Online (Sandbox Code Playgroud)
所以我正在寻找一种方法,如:
def sumNth(aa: Array[(Int, Int, Int)], n: Int)
Run Code Online (Sandbox Code Playgroud) 我有一个Kafka
基于Spark Streaming
每5分钟运行一次的应用程序.查看运行5天后的统计数据,有一些观察结果:
问题:
Processing Time
即使事件数量或多或少相同(在最后一个低谷期间)已大幅增加?GC logs
在每个处理周期结束时,我差不多有70个.这是正常的? processing time
以可接受的延迟保持?garbage-collection performance-testing apache-spark spark-streaming
我遇到了一个代码片段,但无法理解它。片段是:
implicit val dummyVisit = Visit("", 1L, 1, 1, 1, 1L)
implicit val dummyOrder = Order("", 1L, 1, 1, 1, 1L)
def process[T](events : Array[T])(implicit t: T):Unit = {
println(t)
if(!events.isEmpty)
t match {
case r: Order => processOrder(events.asInstanceOf[Array[Order]])
case r: Visit => processVisit(events.asInstanceOf[Array[Visit]]);
}
}
def processOrder(arr: Array[Order]): Unit = { println(arr.size) }
def processVisit(arr: Array[Visit]): Unit = { println(arr.size) }
Run Code Online (Sandbox Code Playgroud)
变量, 要求&存在implicit
。t
dummyVisit
dummyOrder
问题:
这是正确的使用方法吗implicit parameter
?
有没有更好的方法来获取 的类类型T
,而不使用隐式参数?
我有一个这样的代码片段:
case class Purchase(cid: Int, pid: String, num: String)
val x = sc.parallelize(Array(
Purchase(123, "234", "1"),
Purchase(123, "247", "2"),
Purchase(189, "254", "3"),
Purchase(187, "299", "4")
))
// I have a dataframe structure: [cid: int, pid: string, num: string]
val df = sqlContext.createDataFrame(x)
// Defining a column name which I need to transform. Its value can change, like pid
val colName = "num"
// Defining a UDF. The definition of the UDF can change
val toIntUdf = udf((myString: String) => myString.toInt …
Run Code Online (Sandbox Code Playgroud) 我有两个类A
和B
,就像这样:
class A {
public Integer fetchMax() {
// Make a network call & return result
}
}
class B {
public Double fetchPercentile(Integer input) {
// Make a network call & return result
}
}
Run Code Online (Sandbox Code Playgroud)
现在,我需要提供retry
这两种方法的机理fetchMax()
及fetchPercentile(Integer)
.我想使用一个helper
类来提供这种行为,其中retry
可以采用instance
(A或B)的方法,method-name
和method-arguments
.重试基本上会对提供的对象方法进行重新尝试.
像这样的东西:
class Retry {
public static R retry(T obj, Function<T, R> method, Object... arguments) {
// Retry logic
while(/* retry condition */)
{ …
Run Code Online (Sandbox Code Playgroud) apache-spark ×5
scala ×4
java ×3
android ×1
apache-kafka ×1
avro ×1
bitmap ×1
booleanquery ×1
collections ×1
file ×1
generics ×1
hash ×1
implicit ×1
java-8 ×1
javascript ×1
lambda ×1
logstash ×1
oop ×1
parquet ×1
partitioning ×1
querydsl ×1
rdd ×1
shuffle ×1
string ×1
unsafe ×1
wildcard ×1