在为HDInsight编程时,我遇到了类似的行
$storageAccountKey = Get-AzureRmStorageAccountKey
-ResourceGroupName $resourceGroupName
-Name $storageAccountName
| %{ $_.Key1 }
Run Code Online (Sandbox Code Playgroud)
我理解$_是指Get-AzureRmStorageAccountKey命令的结果.但究竟是什么意思%{}?
我需要从提供REST接口的Web服务中读取一些JSON数据,以便从我的SPARK SQL代码中查询数据以进行分析.我能够读取存储在blob存储中的JSON并使用它.
我想知道什么是从REST服务读取数据的最佳方式,并像其他任何方式一样使用它DataFrame.
BTW我正在使用,SPARK 1.6 of Linux cluster on HD insight如果这有帮助.如果有人可以共享任何代码片段,我也会很感激,因为我对SPARK环境仍然很新.
我正在浏览Microsoft文档:
https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-overview
我是Azure Data lake和HDInsight的新手.URL中有一条声明告诉我们
"Azure Data Lake Store can be accessed from Hadoop (available with HDInsight cluster) using the WebHDFS-compatible REST APIs."
Run Code Online (Sandbox Code Playgroud)
根据我最初的理解,Data lake store是一个可以存储任何类型数据的商店.我认为,HDInsight也有同样的事情.
我的问题是Azure Data湖和Azure HDInsight有什么区别?如果HDInsight可以用于文件存储或任何类型的存储,那么为什么要使用Data Lake?如果有人可以详细说明这一点,那就太棒了.谢谢.
我正在使用HDInsight,并且在完成运行查询后需要删除我的集群.但是,我需要我收集的数据才能存活一天.我正在处理将从table1创建计算列并将它们插入table2的查询.首先,我想要一个简单的测试来复制行.你可以从select语句创建一个外部表吗?
drop table if exists table2;
create external table table2 as
select *
from table1
STORED AS TEXTFILE LOCATION 'wasb://{container name}@{storage name}.blob.core.windows.net/';
Run Code Online (Sandbox Code Playgroud) 我想在表中添加一个新列,但前提是该列尚不存在.
如果列不存在,则此方法有效:
ALTER TABLE MyTable ADD COLUMNS (mycolumn string);
Run Code Online (Sandbox Code Playgroud)
但是当我第二次执行它时,我收到一个错误.
Column 'mycolumn' exists
Run Code Online (Sandbox Code Playgroud)
当我尝试使用CREATE TABLE和ADD PARTITION支持的"IF NOT EXISTS"语法时,出现语法错误:
ALTER TABLE MyTable ADD IF NOT EXISTS COLUMNS (mycolumn string);
FAILED: ParseException line 3:42 required (...)+ loop did not match anything at input 'COLUMNS' in add partition statement
Run Code Online (Sandbox Code Playgroud)
我需要的是可以迭代执行的东西,所以我可以运行我的查询是否存在此列.
情况:我已经开始了一项新工作,并被分配了如何处理传感器数据表的任务.它有13亿行传感器数据.数据非常简单:基本上只是传感器ID,日期和该时间点的传感器值(双倍).
目前,数据存储在MSSQL Server数据库的表中.
到今年年底,我预计行数将增加到2-3亿.
我正在寻找一种更好的方式来存储和查询这些数据(按日期),因为我们有很多"大数据"产品,而且我没有管理这些大数据集的真实经验,我在这里问对于任何指针.
它不是一家大公司,我们的资源不是无限的;)
关于我们的用例的更多细节:
到目前为止,我的研究使我考虑了以下解决方案:
将数据保留在SQL Server中
但是对表进行分区(它现在没有分区).这将需要企业版的SQL Server,其成本很高.
将数据移动到Azure SQL Server.
在那里我们将获得更少的资金,但是一旦我们的DB增长到250GB以上,它的成本会更高(并且超过500gb).
使用多个数据库
我们每个客户可以使用1个DB.几个较小的数据库将比一个巨大的数据库便宜,但我们有很多客户和计划更多,所以我真的不想考虑管理所有这些数据库.
Azure存储表
到目前为止,这是我最喜欢的选项.我们可以按公司/传感器/年/月对数据进行分区,使用行键日期并存储传感器值.
我还没来得及测试查询性能,但从我看来它应该是好的.但是有一个主要的缺点,那就是每个HTTP请求返回1000个项目的限制.如果我们需要获取一周的所有传感器数据,我们需要进行大量的HTTP请求.我现在不确定这对我们的用例有多大问题.
Azure HDInsight(Azure中的Hadoop)
如上所述,我没有大数据的经验,目前我还没有充分了解Hadoop是否适合我们的情况(在给定的时间跨度内通过API公开传感器数据).我应该更深入地学习,还是我的时间更好地花在追求另一种选择上?
有没有人有类似案例的经验.什么对你有用?请记住,价格很重要,而"简单"的解决方案可能比非常复杂的解决方案更受欢迎,即使复杂的解决方案可以更好地执行几秒钟.
更新1: 回答以下评论中的一些问题.
更新2: 今天我体验了azure表存储和HDInsight(HDI).我们在查询"灵活性"方面并不需要太多,因此我认为Azure表存储看起来很有前景.由于我提到的每个请求1000项限制,因此抽出数据有点慢,但在我的测试中,我认为它对我们的用例来说足够快.
我也偶然发现了OpenTSDB,这是我首先尝试HDI的原因.按照Azure教程(https://azure.microsoft.com/en-us/documentation/articles/hdinsight-hbase-tutorial-get-started/),我能够快速存储一百万条记录并测试一些查询.查询比Azure表存储快得多.我甚至可以在一个http请求中删除300 000条记录(虽然耗时30秒).
但它的成本比Azure表存储要多得多,而且我认为我可以优化我的代码以提高Azure表存储的查询性能(更细粒度的分区键和并行运行请求).因此,由于简单,价格和"足够好"的性能,我现在倾向于Azure Table Storage.
我很快就会向外部顾问介绍我的发现,所以我很高兴能够了解他对事物的看法.
我在csv文件中有一个时间戳字段,我使用spark csv库加载到数据帧.同一段代码在我的本地机器上使用Spark 2.0版本,但在Azure Hortonworks HDP 3.5和3.6上引发错误.
我已经检查过,Azure HDInsight 3.5也使用相同的Spark版本,所以我不认为它是Spark版本的问题.
import org.apache.spark.sql.types._
val sourceFile = "C:\\2017\\datetest"
val sourceSchemaStruct = new StructType()
.add("EventDate",DataTypes.TimestampType)
.add("Name",DataTypes.StringType)
val df = spark.read
.format("com.databricks.spark.csv")
.option("header","true")
.option("delimiter","|")
.option("mode","FAILFAST")
.option("inferSchema","false")
.option("dateFormat","yyyy/MM/dd HH:mm:ss.SSS")
.schema(sourceSchemaStruct)
.load(sourceFile)
Run Code Online (Sandbox Code Playgroud)
整个例外情况如下:
Caused by: java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
at java.sql.Timestamp.valueOf(Timestamp.java:237)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:179)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply$mcJ$sp(UnivocityParser.scala:142)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
at scala.util.Try.getOrElse(Try.scala:79)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:139)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:135)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$nullSafeDatum(UnivocityParser.scala:179)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:135)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:134)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:215)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:187)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:61)
... 27 more …Run Code Online (Sandbox Code Playgroud) hdinsight hortonworks-data-platform apache-spark apache-spark-sql
我正在尝试在Azure HDInsight按需群集上运行基于Spark的应用程序,并且看到许多SparkExceptions(由ConcurrentModificationExceptions引起)被记录.启动本地Spark实例时,应用程序运行时没有这些错误.
我在使用累加器时看到了类似错误的报告,而我的代码确实使用了CollectionAccumulator,但是我已经在我使用它的地方放置了同步块,并且没有任何区别.与累加器相关的代码如下所示:
class MySparkClass(sc : SparkContext) {
val myAccumulator = sc.collectionAccumulator[MyRecord]
override def add(record: MyRecord) = {
synchronized {
myAccumulator.add(record)
}
}
override def endOfBatch() = {
synchronized {
myAccumulator.value.asScala.foreach((record: MyRecord) => {
processIt(record)
})
}
}
}
Run Code Online (Sandbox Code Playgroud)
异常不会导致应用程序失败,但是当endOfBatch调用并且代码尝试从累加器中读取值时,它是空的并且processIt永远不会被调用.
我们使用HDInsight版本3.6和Spark版本2.3.0
18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814) …Run Code Online (Sandbox Code Playgroud) 我将数据保存为Azure blob存储中的镶木地板文件.数据按年,月,日和小时分区,如:
cont/data/year=2017/month=02/day=01/
我想使用以下create语句在Hive中创建外部表,我使用此引用编写.
CREATE EXTERNAL TABLE table_name (uid string, title string, value string)
PARTITIONED BY (year int, month int, day int) STORED AS PARQUET
LOCATION 'wasb://cont@storage_name.blob.core.windows.net/data';
Run Code Online (Sandbox Code Playgroud)
这会创建表,但在查询时没有行.我尝试了相同的创建语句没有PARTITIONED BY子句,这似乎工作.所以看起来问题就是分区.
我错过了什么?
hdinsight ×10
azure ×4
hive ×3
apache-spark ×2
hadoop ×2
bigdata ×1
parquet ×1
powershell ×1
scala ×1
sql-server ×1