Suppose I pass three file paths to a Spark context to read, and each file has a schema line as its first row. How can we skip those schema lines in the headers?
val rdd = sc.textFile("file1,file2,file3")
Now, how can we skip the header lines from this rdd?
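One way that might work (a self-contained sketch of my own; the SkipHeaders object name is mine, and it assumes the schema line is identical in all three files and never appears as a data line) is to read the header once and filter out every occurrence of it:

import org.apache.spark.{SparkConf, SparkContext}

object SkipHeaders {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SkipHeaders"))

    // Read all three files into a single RDD of lines.
    val rdd = sc.textFile("file1,file2,file3")

    // Take the header from the first file and drop every line equal to it.
    // Assumption: each file starts with the same schema line.
    val header = rdd.first()
    val data = rdd.filter(line => line != header)

    data.take(5).foreach(println)
    sc.stop()
  }
}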
I am trying to create a composite CloudWatch alarm with Terraform, but unfortunately my Terraform code breaks with the following error:
Error: error creating CloudWatch Composite Alarm (node-count-office-time-composite-alarm-DP-1474-desert): ValidationError: AlarmRule must not contain leading or trailing whitespace or be null. Status code: 400, request id: 272b14ae-e6bd-4e65-8bb8-25372d9a5f7c
Here is my Terraform code:
resource "aws_cloudwatch_composite_alarm" "node_count_office_time_alarm" {
depends_on = [aws_cloudwatch_metric_alarm.node_count, aws_cloudwatch_metric_alarm.office_time]
alarm_description = "Composite alarm for node count & office time"
alarm_name = "node-count-office-time-composite-alarm-${local.postfix}"
alarm_actions = [var.sns_topic_arn]
ok_actions = [var.sns_topic_arn]
alarm_rule =<<-EOF
ALARM(${aws_cloudwatch_metric_alarm.node_count.alarm_name}) AND
ALARM(${aws_cloudwatch_metric_alarm.office_time.alarm_name})
EOF
}
I have checked many times and there is no leading or trailing whitespace in my alarm_rule, only a newline after the AND operator. I am using Terraform version 0.15.3. Has anyone faced a similar issue? How can I solve it? Thanks.
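One possible cause I can suggest (my own guess, not a confirmed fix): a <<-EOF heredoc renders a string that ends with a newline, and the CloudWatch API appears to count that as trailing whitespace. Keeping the rule on a single line, or wrapping the heredoc in Terraform's trimspace() function, removes it:

  # Sketch only: same interpolations as above, rendered without a trailing newline.
  # trimspace(<<-EOF ... EOF) around the original heredoc would also strip it.
  alarm_rule = "ALARM(${aws_cloudwatch_metric_alarm.node_count.alarm_name}) AND ALARM(${aws_cloudwatch_metric_alarm.office_time.alarm_name})"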
I am using Spark Streaming to consume Kafka messages. I want to get a few messages from Kafka as a sample rather than reading all of them, so I want to read one batch of messages, return them to the caller, and stop Spark Streaming. Currently I pass the batchInterval time to the awaitTermination method of the Spark streaming context, but I do not know how to return the processed data from Spark Streaming to the caller. Here is the code I am currently using:
def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
  if (params.contains("zookeeperQourum"))
    zkQuorum = params.get("zookeeperQourum").get
  if (params.contains("userGroup"))
    group = params.get("userGroup").get
  if (params.contains("topics"))
    topics = params.get("topics").get
  if (params.contains("numberOfThreads"))
    numThreads = params.get("numberOfThreads").get
  if (params.contains("sink"))
    sink = params.get("sink").get
  if (params.contains("batchInterval"))
    interval = params.get("batchInterval").get.toInt

  val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
  val ssc = new StreamingContext(sparkConf, Seconds(interval))
  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

  var consumerConfig = scala.collection.immutable.Map.empty[String, String]
  consumerConfig += ("auto.offset.reset" -> "smallest")
  consumerConfig += ("zookeeper.connect" -> zkQuorum)
  consumerConfig += ("group.id" -> group)
  var data = KafkaUtils.createStream[Array[Byte], Array[Byte], …
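To actually hand a sample back to the caller, here is a rough sketch of my own (the helper name sampleFromKafka and the maxMessages cap are mine, not from the original getsample): copy a bounded number of message payloads from the first batch into a driver-side buffer, stop the StreamingContext after roughly one batch interval, and return the buffer.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper: read roughly one batch, keep up to maxMessages of it,
// stop streaming, and return the sample to the caller.
def sampleFromKafka(zkQuorum: String, group: String, topics: String,
                    numThreads: Int, intervalSec: Int,
                    maxMessages: Int = 100): Seq[String] = {
  val conf = new SparkConf().setAppName("KafkaSampler")
  val ssc = new StreamingContext(conf, Seconds(intervalSec))
  val topicMap = topics.split(",").map((_, numThreads)).toMap
  val samples = ArrayBuffer.empty[String]

  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    .map(_._2)                                  // keep only the message payload
    .foreachRDD { rdd =>
      // foreachRDD runs on the driver, so appending to a local buffer is safe.
      if (samples.size < maxMessages)
        samples ++= rdd.take(maxMessages - samples.size)
    }

  ssc.start()
  // Let about two batch intervals pass, then stop streaming and the SparkContext.
  ssc.awaitTerminationOrTimeout(intervalSec * 2000L)
  ssc.stop(stopSparkContext = true, stopGracefully = false)
  samples.toSeq
}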
I have a table named "orders" in HBase with a column family 'o' and columns {id, fname, lname, email}, where the row key is id. I am trying to fetch only the values of fname and email from HBase using Spark. Currently, what I am doing is given below:
override def put(params: scala.collection.Map[String, Any]): Boolean = {
  var sparkConfig = new SparkConf().setAppName("Connector")
  var sc: SparkContext = new SparkContext(sparkConfig)
  var hbaseConfig = HBaseConfiguration.create()
  hbaseConfig.set("hbase.zookeeper.quorum", ZookeeperQourum)
  hbaseConfig.set("hbase.zookeeper.property.clientPort", zookeeperPort)
  hbaseConfig.set(TableInputFormat.INPUT_TABLE, schemdto.tableName);
  hbaseConfig.set(TableInputFormat.SCAN_COLUMNS, "o:fname,o:email");
  var hBaseRDD = sc.newAPIHadoopRDD(hbaseConfig, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
  try {
    hBaseRDD.map(tuple => tuple._2).map(result => result.raw())
      .map(f => KeyValueToString(f)).saveAsTextFile(sink)
    return true;
  } catch {
    case ex: Exception => {
      println(ex.getMessage())
      return false
    }
  }
}

def KeyValueToString(keyValues: Array[KeyValue]): String = {
  var it = keyValues.iterator
  var res = new StringBuilder …
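For pulling only o:fname and o:email, here is a sketch under my own assumptions (the table name and ZooKeeper quorum below are placeholders, and this is not a confirmed fix from the poster): TableInputFormat.SCAN_COLUMNS takes a space-delimited column list rather than a comma-separated one, and individual qualifiers can then be read from each Result with getValue.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

// Sketch with placeholder connection values: scan only the two wanted columns
// and turn each Result into a "fname,email" string.
val sc = new SparkContext(new SparkConf().setAppName("Connector"))
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "zk-host")
hbaseConf.set(TableInputFormat.INPUT_TABLE, "orders")
hbaseConf.set(TableInputFormat.SCAN_COLUMNS, "o:fname o:email") // space-separated

val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

val fnameAndEmail = hBaseRDD.map { case (_, result) =>
  val fname = Bytes.toString(result.getValue(Bytes.toBytes("o"), Bytes.toBytes("fname")))
  val email = Bytes.toString(result.getValue(Bytes.toBytes("o"), Bytes.toBytes("email")))
  s"$fname,$email"
}
fnameAndEmail.take(10).foreach(println)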