小编Kni*_*t71的帖子

如何使用Spark计算累积和

我有一个(String,Int)的rdd,按键排序

val data = Array(("c1",6), ("c2",3),("c3",4))
val rdd = sc.parallelize(data).sortByKey
Run Code Online (Sandbox Code Playgroud)

现在我想要启动第一个键的值为零,后续键为前一个键的总和.

例如:c1 = 0,c2 = c1的值,c3 =(c1值+ c2值),c4 =(c1 + .. + c3值)预期输出:

(c1,0), (c2,6), (c3,9)...
Run Code Online (Sandbox Code Playgroud)

是否有可能实现这一目标?我用地图尝试了它,但总和没有保留在地图内.

var sum = 0 ;
val t = keycount.map{ x => { val temp = sum; sum = sum + x._2 ; (x._1,temp); }}
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

14
推荐指数
1
解决办法
9829
查看次数

Spark Streaming检查站到亚马逊s3

我试图检查rdd到非hdfs系统.从DSE文档看来,似乎无法使用cassandra文件系统.所以我打算用亚马逊s3.但我无法找到任何使用AWS的好例子.

问题

  • 如何将Amazon S3用作检查点目录?是否足以调用ssc.checkpoint(amazons3url)?
  • 是否可以为检查点提供除hadoop文件系统之外的任何其他可靠的数据存储?

spark-streaming

11
推荐指数
1
解决办法
4035
查看次数

如何找到cassandra键空间占用的总空间?

我试图找到cassandra键空间占用的总物理大小.

我有一个msg生成器,它将很多消息转储到cassandra.我想找出cassandra表中消息的总物理大小.

当我做du -h /mnt/data/keyspacelinux时只说12kb.我确信数据大小远大于此.数据的其余部分必须要么是memtables 还是应该在压实.

如何找到该密钥空间中cassandra占用的总空间?

我试过了

     nodetool cfstats <keyspace>
Run Code Online (Sandbox Code Playgroud)

但它只给我那个特定的节点.并且字节存在于memtable中.我实际上想要在群集中的所有节点上实际写入磁盘的键空间的总大小.有没有命令找到这个?

谢谢您的帮助.

cassandra

10
推荐指数
2
解决办法
1万
查看次数

spark驱动程序如何序列化发送给执行程序的任务?

RDD通过对象中的用户定义函数/方法进行一系列转换.并且这些函数以任务的形式传递给执行程序.这些任务是spark-core中定义的Scala类的实例.

我假设用户定义的函数/方法包装在一个任务对象中并传递给执行程序.

  1. 执行者如何知道包含在任务类中的需要执行的方法是什么?

  2. 序列化到底有何帮助?

  3. spark上下文如何读取用户代码并将其转换为任务?

apache-spark

8
推荐指数
3
解决办法
3803
查看次数

如何配置 tox 来获取日志

我正在尝试在我的项目中使用毒性自动化测试。但我无法弄清楚使用 tox 时 python 文件中 test_methods 的日志或打印去了哪里。我还对整个 tox 目录进行了 grep 查找日志,但找不到它。

问题

1)如何在tox中配置日志目录?

2)日志的默认配置是什么?

有指向文档和示例的指针吗?

我的 tox.ini 文件

 [tox]
 minversion = 1.6
 skipsdist = True

 [testenv]
 skip_install = True
 setenv =
     VIRTUAL_ENV={envdir}
 deps = -r{toxinidir}/test-requirements.txt

 passenv = LANG

 [testenv:test]
 commands = ./test.sh --slowest --testr-args='{posargs}'
Run Code Online (Sandbox Code Playgroud)

python tox

6
推荐指数
1
解决办法
6086
查看次数

对于函数式编程中的vs map

我正在学习使用scala进行函数式编程.一般来说,我注意到for循环在功能程序中没有太多使用,而是使用map.

问题

  1. 在性能,可读性等方面使用map for for循环有什么好处?

  2. 使用循环可以实现映射功能的目的是什么?

程序1:使用For循环

val num = 1 to 1000
val another = 1000 to 2000
for ( i <- num )
{
  for ( j <- another) 
  {
    println(i,j)
  }
}
Run Code Online (Sandbox Code Playgroud)

计划2:使用地图

val num = 1 to 1000
val another = 1000 to 2000
val mapper = num.map(x => another.map(y => (x,y))).flatten
mapper.map(x=>println(x))
Run Code Online (Sandbox Code Playgroud)

程序1和程序2都做同样的事情.

functional-programming scala

5
推荐指数
1
解决办法
2087
查看次数

如何在阅读spark中的cassandra分区时获得良好的性能?

我正在使用cassandra-connector将数据从cassandra分区读取到spark.我尝试了以下解决方案来读取partitions.I尝试通过尽可能多地创建rdds来并行化任务,但解决方案ONE和解决方案TWO都具有相同的性能.

在解决方案ONE中,我可以立即看到spark UI中的各个阶段.我试图在解决方案TWO中避免一个for循环.

在解决方案TWO中,阶段在相当长的时间之后出现.随着用户ID的数量增加,在阶段出现在解决方案TWO的火花UI中之前,时间显着增加.

Version
 spark - 1.1
 Dse - 4.6
 cassandra-connector -1.1

Setup
 3 - Nodes with spark cassandra
 Each node has 1 core dedicated to this task.
 512MB ram for the executor memory.
Run Code Online (Sandbox Code Playgroud)

我的cassandra表架构,

 CREATE TABLE   test (
   user text,
   userid bigint,
   period timestamp,
   ip text,
   data blob,
   PRIMARY KEY((user,userid,period),ip)
   );
Run Code Online (Sandbox Code Playgroud)

第一解决方案

 val users = List("u1","u2","u3")
 val period = List("2000-05-01","2000-05-01")
 val partitions = users.flatMap(x => period.map(y => (x,y))))
 val userids = 1 to 10
 for (userid <- userids){ …
Run Code Online (Sandbox Code Playgroud)

scala cassandra datastax-enterprise apache-spark

5
推荐指数
1
解决办法
2956
查看次数

如何在sbt多项目构建中获取子项目路径

我试图在sbt中的多项目构建中获得子项目的位置.但我只能获得根项目目录.

lazy val copyToResources = taskKey[Unit]("copies the assembly jar.")
private val rootLocation: File = file(".").getAbsoluteFile
private val subProjectLocation: File =  file("sub_project").getAbsoluteFile.getParentFile
lazy val settings = Seq(copyToResources := {
val absPath = subProjectLocation.getAbsolutePath
println(s"rootLocation:$subProjectLocation $absPath, sub-proj-location: ${rootLocation.getAbsolutePath}")
 })
Run Code Online (Sandbox Code Playgroud)

输出:

 rootLocation:/home/user/projects/workarea/repo /home/vdinakaran/projects/workarea/repo, sub-proj-location: /home/vdinakaran/projects/workarea/repo
 rootLocation:/home/user/projects/workarea/repo /home/vdinakaran/projects/workarea/repo, sub-proj-location: /home/vdinakaran/projects/workarea/repo
Run Code Online (Sandbox Code Playgroud)

目录结构:

repo
   |-- sub_project
Run Code Online (Sandbox Code Playgroud)

作为解决方法,我使用rootLocation添加了sub_project文件夹.但为什么文件("sub_project")没有返回路径?

scala sbt

5
推荐指数
1
解决办法
4084
查看次数

Shell脚本以表格格式打印

我试图使用shell脚本以表格格式打印一组值.该表有n行和4列.我尝试了下面这段代码.

btfield=`grep -i -r ^"diag.* <string>" *.txt |awk '{print $5}'|cut -d+ -f 1 |sort -u`
ipaddr=''
i=0
format="%10s\t%10s\t%10s   \n"
echo "| Function  " "     |         IP         |"  "    occurences     |"  
for a in $btfield
do
  b=`grep -i -r ^"diag.* <string>" *.txt |grep  -i $a |cut -d: -f 1|cut -d_ -f 2|cut -d't' -f 1`
  noOcc=`grep -i -r ^"diag.* backtrace" *.txt |grep  -i $a|wc -l`
  #echo $b
  ipaddr[i]=${b}
  printf "$format"  $a  ${ipaddr[i]} $noOcc
  i=$((i+1))
  #echo $i
done
Run Code Online (Sandbox Code Playgroud)

上面的代码根据格式说明符从各种文件和打印中查找不同的字段.

但我所看到的是输出的错位形式.有没有办法以表格格式打印值?列宽是固定的,如果单元格中的值超过宽度,则必须自行换行.

Sample output: …
Run Code Online (Sandbox Code Playgroud)

shell

4
推荐指数
2
解决办法
3万
查看次数

如何让依赖的jar在集群模式下激发提交

我使用集群模式运行spark进行部署.以下是命令

JARS=$JARS_HOME/amqp-client-3.5.3.jar,$JARS_HOME/nscala-time_2.10-2.0.0.jar,\
$JARS_HOME/rabbitmq-0.1.0-RELEASE.jar,\
$JARS_HOME/kafka_2.10-0.8.2.1.jar,$JARS_HOME/kafka-clients-0.8.2.1.jar,\
$JARS_HOME/spark-streaming-kafka_2.10-1.4.1.jar,\
$JARS_HOME/zkclient-0.3.jar,$JARS_HOME/protobuf-java-2.4.0a.jar

dse spark-submit -v --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
 --executor-memory 512M \
 --total-executor-cores 3 \
 --deploy-mode "cluster" \
 --master spark://$MASTER:7077 \
 --jars=$JARS \
 --supervise \
 --class "com.testclass" $APP_JAR  input.json \
 --files "/home/test/input.json"
Run Code Online (Sandbox Code Playgroud)

上述命令在客户端模式下正常工作.但是当我在集群模式下使用它时,我发现了类未找到异常

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
Run Code Online (Sandbox Code Playgroud)

在客户端模式下,依赖jar将被复制到/ var/lib/spark/work目录,而在集群模式下则不会.请帮助我解决这个问题.

编辑:

我正在使用nfs,并且我已经在同名的所有spark节点上安装了相同的目录.我仍然得到错误.如何能够选择同一目录下但不依赖于依赖的jar的应用程序jar?

apache-spark spark-streaming

4
推荐指数
1
解决办法
1万
查看次数

如何在cassandra中执行同步丢弃并创建键空间?

我不想在键空间的任何表中有任何数据.所以我决定删除键空间(如果存在)并立即创建它.我使用下面的代码来实现相同的目标.

 CassandraConnector(conf).withSessionDo { session =>
  session.execute(s"DROP KEYSPACE if EXISTS $keyspace")
  session.execute("""CREATE KEYSPACE if NOT EXISTS %s
  WITH replication = {'class':'SimpleStrategy','replication_factor':'1'};""".format(keyspace)
Run Code Online (Sandbox Code Playgroud)

)}

但它未能创建密钥空间.从日志我只能看到一个警告说明

Received a DROPPED notification for table test.table_tracker, but this keyspace is unknown in our metadata.
Run Code Online (Sandbox Code Playgroud)

我也尝试过使用python cassandra驱动程序.但结果是一样的.我相信有一些竞争条件,并且丢弃键空间发生异步(如果我错了,请纠正我).

如何同步删除和创建键空间?

cassandra spark-cassandra-connector

2
推荐指数
1
解决办法
924
查看次数

从 shell 脚本返回一个字符串/值

我已经阅读了有关 Stack Overflow 中函数的返回输出的 qns。所有帖子都说要使用回声

#!/bin/bash

 function myown()
   {
       echo "i dont need this in retval"

       echo "Need this alone in retVal"
   }

  retVal=$(myown)

  echo $retVal
Run Code Online (Sandbox Code Playgroud)

o/p:我在 retval 中不需要这个 在 retVal 中只需要这个

预期:在 retVal 中只需要这个

有没有办法在 echo.h 中刷新以前的输出。或者我需要解析所有的回显输出以获得我的返回值?有没有简单的方法来做到这一点?因为我可能有对调试有用的回声和回声来返回一个值。

bash shell

1
推荐指数
1
解决办法
2820
查看次数

sizeof的工作原理如何?可变长度字符串数组的内存映射

const char *pointerStr[]=
{
   "BEST123,      ",     // 0x00
   "Best2233,     ",     // 0x01
   "ABCDEFGH,     ",     // 0x02
   "123456,     ",     // 0x03
   "helloworld,     "     // 0x04
};
typedef struct
{
   char value;
   char name[40]; 
}StrInfo;

typedef struct
{
   int regMax;
   StrInfo info[60];   
} structNew;

void main()
{
  int i;
  structNew  pret;
for ( i=0;i<5;i++)
{     
  printf("PointerStr size of %dth %d \n",i,sizeof(pointerStr[i]));
  printf("pret size of %dth %d \n",i,sizeof(pret.info[i].name));
}
}
Run Code Online (Sandbox Code Playgroud)

以上程序产生的结果

PointerStr size of 0th 4 
pret size of 0th 40 
PointerStr size …
Run Code Online (Sandbox Code Playgroud)

c sizeof

-1
推荐指数
1
解决办法
1155
查看次数