小编Haf*_*did的帖子

如何跳过Spark中CSV文件的标题?

假设我给三个文件路径指向要读取的Spark上下文,并且每个文件在第一行中都有一个模式.我们如何从头文件中跳过架构线?

val rdd=sc.textFile("file1,file2,file3")
Run Code Online (Sandbox Code Playgroud)

现在,我们如何跳过此rdd的标题行?

csv scala apache-spark

67
推荐指数
7
解决办法
10万
查看次数

由于heredoc,Cloudwatch 警报创建失败

我正在尝试使用 terraform 创建复合 cloudwatch 警报。但不幸的是我的 terraform 代码因以下错误而中断:

错误:创建 CloudWatch 复合警报时出错 (node-count-office-time-composite-alarm-DP-1474-desert):ValidationError:AlarmRule 不得包含前导或尾随空格或者为 null 状态代码:400,请求 ID:272b14ae- e6bd-4e65-8bb8-25372d9a5f7c

以下是我的地形代码:

resource "aws_cloudwatch_composite_alarm" "node_count_office_time_alarm" {
  depends_on = [aws_cloudwatch_metric_alarm.node_count, aws_cloudwatch_metric_alarm.office_time]
  alarm_description = "Composite alarm for node count & office time"
  alarm_name        = "node-count-office-time-composite-alarm-${local.postfix}"
  alarm_actions = [var.sns_topic_arn]
  ok_actions    = [var.sns_topic_arn]
alarm_rule =<<-EOF
ALARM(${aws_cloudwatch_metric_alarm.node_count.alarm_name}) AND
ALARM(${aws_cloudwatch_metric_alarm.office_time.alarm_name})
EOF
}
Run Code Online (Sandbox Code Playgroud)

我检查了很多次,我的alarm_rule中没有前导或尾随空格。AND 运算符后仅换行。我正在使用 terraform 0.15.3 版本。有人遇到类似的问题吗?我该如何解决这个问题?谢谢

heredoc amazon-cloudwatch terraform cloudwatch-alarms

5
推荐指数
1
解决办法
1296
查看次数

在读取第一批数据后停止火花流动

我正在使用spark streaming来消费kafka消息.我想从kafka获取一些消息作为样本,而不是阅读所有消息.所以我想阅读一批消息,将它们返回给调用者并停止火花流.目前我在awaitTermination方法的spark流上下文方法中传递batchInterval时间.我现在不知道如何将处理过的数据从spark流返回给调用者.这是我目前正在使用的代码

def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
    if (params.contains("zookeeperQourum"))
      zkQuorum = params.get("zookeeperQourum").get
    if (params.contains("userGroup"))
      group = params.get("userGroup").get
    if (params.contains("topics"))
      topics = params.get("topics").get
    if (params.contains("numberOfThreads"))
      numThreads = params.get("numberOfThreads").get
    if (params.contains("sink"))
      sink = params.get("sink").get
    if (params.contains("batchInterval"))
      interval = params.get("batchInterval").get.toInt
    val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
    val ssc = new StreamingContext(sparkConf, Seconds(interval))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    var consumerConfig = scala.collection.immutable.Map.empty[String, String]
    consumerConfig += ("auto.offset.reset" -> "smallest")
    consumerConfig += ("zookeeper.connect" -> zkQuorum)
    consumerConfig += ("group.id" -> group)
    var data = KafkaUtils.createStream[Array[Byte], Array[Byte], …
Run Code Online (Sandbox Code Playgroud)

spark-skinning apache-kafka

4
推荐指数
1
解决办法
2598
查看次数

使用spark从hbase读取特定的列数据

我在HBase中有一个名为"orders"的表,它有列族'o',列为{id,fname,lname,email},行键为id.我试图只使用spark从hbase获取fname和email的值.目前,我正在做的是下面给出的

   override def put(params: scala.collection.Map[String, Any]): Boolean = {
    var sparkConfig = new SparkConf().setAppName("Connector")
    var sc: SparkContext = new SparkContext(sparkConfig)
    var hbaseConfig = HBaseConfiguration.create()
    hbaseConfig.set("hbase.zookeeper.quorum", ZookeeperQourum)
    hbaseConfig.set("hbase.zookeeper.property.clientPort", zookeeperPort)
    hbaseConfig.set(TableInputFormat.INPUT_TABLE, schemdto.tableName);
    hbaseConfig.set(TableInputFormat.SCAN_COLUMNS, "o:fname,o:email");
    var hBaseRDD = sc.newAPIHadoopRDD(hbaseConfig, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    try {
      hBaseRDD.map(tuple => tuple._2).map(result => result.raw())
        .map(f => KeyValueToString(f)).saveAsTextFile(sink)

      return true;
    } catch {
      case ex: Exception => {
        println(ex.getMessage())
        return false
      }
    }
}


def KeyValueToString(keyValues: Array[KeyValue]): String = {
    var it = keyValues.iterator
    var res = new StringBuilder …
Run Code Online (Sandbox Code Playgroud)

hbase scala apache-spark

2
推荐指数
1
解决办法
4600
查看次数