我的阅读文本文件的示例代码是
val text = sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.defaultMinPartitions)
var rddwithPath = text.asInstanceOf[HadoopRDD[LongWritable, Text]].mapPartitionsWithInputSplit { (inputSplit, iterator) ?
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map { tpl ? (file.getPath.toString, tpl._2.toString) }
}.reduceByKey((a,b) => a)
Run Code Online (Sandbox Code Playgroud)
这样我怎么能使用PDF和Xml文件
什么是 Spark SQL 数据类型等效DecimalType(2,9)于 SQL?
例如:print(column.dataType==X)=> 应该给我 True。SQL 中 Column 的数据类型是 DecimalType(2,9)
试过:X= DataTypes.createDecimalType(2,9),工作正常
我正在寻找一个通用DecimalType类来过滤DecimalType数据框中的所有列,而不管精度和比例如何。
public static void main(String[] args) throws IOException, URISyntaxException
Run Code Online (Sandbox Code Playgroud)
{Configuration config = new Configuration();
config.set("fs.default.name","hdfs://127.0.0.1:50070/dfshealth.jsp");
FileSystem dfs = FileSystem.get(config);
String dirName = "TestDirectory";
Path src = new Path(dfs.getWorkingDirectory()+"/"+dirName);
dfs.mkdirs(src);
Run Code Online (Sandbox Code Playgroud)
}}
他们是一个例外
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/commons/configuration/Configuration
at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<init>(DefaultMetricsSystem.java:37)
at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<clinit>(DefaultMetricsSystem.java:34)
at org.apache.hadoop.security.UgiInstrumentation.create(UgiInstrumentation.java:51)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:217)
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:185)
at org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:237)
at org.apache.hadoop.security.KerberosName.<clinit>(KerberosName.java:79)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:210)
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:185)
at org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:237)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:482)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:468)
at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:1519)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1420)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:123)
at com.TestConnection.main(TestConnection.java:21)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.configuration.Configuration
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native …Run Code Online (Sandbox Code Playgroud) 如何枚举 HDFS 目录中的文件?这是用于使用 Scala 枚举 Apache Spark 集群中的文件。我看到有 sc.textfile() 选项,但它也会读取内容。我只想读取文件名。
我实际上尝试了 listStatus。但是没有用。得到以下错误。我正在使用 Azure HDInsight Spark,并且 blob 存储文件夹“testContainer@testhdi.blob.core.windows.net/example/”包含 .json 文件。
val fs = FileSystem.get(new Configuration())
val status = fs.listStatus(new Path("wasb://testContainer@testhdi.blob.core.windows.net/example/"))
status.foreach(x=> println(x.getPath)
=========
Error:
========
java.io.FileNotFoundException: Filewasb://testContainer@testhdi.blob.core.windows.net/example does not exist.
at org.apache.hadoop.fs.azure.NativeAzureFileSystem.listStatus(NativeAzureFileSystem.java:2076)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:23)
at $iwC$$iwC$$iwC.<init>(<console>:28)
at $iwC$$iwC.<init>(<console>:30)
at $iwC.<init>(<console>:32)
at <init>(<console>:34)
at .<init>(<console>:38)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) …Run Code Online (Sandbox Code Playgroud) 我有以下数据框:
A B
Tenor
1 15.1726 0.138628
2 15.1726 0.147002
3 15.1726 0.155376
4 15.1726 0.163749
5 15.1726 0.172123
Run Code Online (Sandbox Code Playgroud)
我希望能够通过连接前面的列(包括索引)来创建另一个具有字符串的列。例如,这个新列的第一行将是:XXXX1XXXX15.1726XXXX0.138628
我怎样才能在熊猫中做到这一点?如果我尝试在字符串公式中使用df[ColumnName]Pandas 总是会带上索引,这会弄乱我的字符串。
我有两个数据帧df1和ip2Country.
df1包含IP地址,我试图将IP地址映射到经度和纬度等地理位置信息中.ip2Country
我将它作为Spark提交作业运行,但操作需要很长时间,即使df1只有少于2500行.
我的代码:
val agg =df1.join(ip2Country, ip2Country("network_start_int")=df1("sint")
, "inner")
.select($"src_ip"
,$"country_name".alias("scountry")
,$"iso_3".alias("scode")
,$"longitude".alias("slong")
,$"latitude".alias("slat")
,$"dst_ip",$"dint",$"count")
.filter($"slong".isNotNull)
val agg1 =agg.join(ip2Country, ip2Country("network_start_int")=agg("dint")
, "inner")
.select($"src_ip",$"scountry"
,$"scode",$"slong"
,$"slat",$"dst_ip"
,$"country_name".alias("dcountry")
,$"iso_3".alias("dcode")
,$"longitude".alias("dlong")
,$"latitude".alias("dlat"),$"count")
.filter($"dlong".isNotNull)
有没有其他方式加入这两个表?或者我做错了吗?
I have 2 large datasets.
First dataset contains about 130 million entries.
The second dataset contains about 40000 entries.
The data is fetched from MySQL tables.
I need to do a cross-join but I am getting
java.sql.SQLException: GC overhead limit exceeded
Run Code Online (Sandbox Code Playgroud)
What is the best optimum technique to do this in Scala?
Following is a snippet of my code:
val df1 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table1,"id",100,100000,40, MySqlConnection.getConnectionProperties))
val df2 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table2, MySqlConnection.getConnectionProperties))
val df2Cache = df2.repartition(40).cache()
val crossProduct = df1.join(df2Cache)
Run Code Online (Sandbox Code Playgroud)
df1 is the …
对于模式演化 Mergeschema 可以在 Spark 中用于 Parquet 文件格式,我对此有以下说明
这是否仅支持 Parquet 文件格式或任何其他文件格式,如 csv、txt 文件。
如果在中间添加新的附加列,我知道 Mergeschema 会将列移到最后。
如果列顺序受到干扰,那么 Mergeschema 是否会在创建时将列对齐到正确的顺序,还是我们需要通过选择所有列来手动执行此操作。
从评论更新:
例如,如果我有一个如下的架构并创建如下表 -spark.sql("CREATE TABLE emp USING DELTA LOCATION '****'") empid,empname,salary====> 001,ABC,10000如果我得到以下格式,第二天empid,empage,empdept,empname,salary====> 001,30,XYZ,ABC,10000。
是否有新列 - 之后empage, empdept会添加empid,empname,salary columns?
我正在处理从Habse中的表中检索到的Get对象.我想动态检索与该get相关的所有列值,因为我不知道列族的确切名称
val result1 = hTable.get(g)
if (!result1.isEmpty) {
//binaryEpisodes = result1.getValue(Bytes.toBytes("episodes"),Bytes.toBytes("episodes"))
//instead of above retrieve all values dynamically
}
Run Code Online (Sandbox Code Playgroud) 我想转换DataFrame为快译通使用collectAsMap()函数RDD。
代码:
dict = df.rdd.collectAsMap()
Run Code Online (Sandbox Code Playgroud)
错误日志:
ValueError: dictionary update sequence element #0 has length 8; 2 is required
Run Code Online (Sandbox Code Playgroud)
更新:
DF 有 8 个字段,是否意味着collectAsMap()只能使用具有两个字段的 DF?
apache-spark ×7
scala ×6
hadoop ×3
hdfs ×2
java ×2
python ×2
apache ×1
azure ×1
databricks ×1
dictionary ×1
hbase ×1
pandas ×1
pyspark-sql ×1
rdd ×1