Using a prefix filter, I obtained scan results for a given partial row key:
Example row keys: 123_abc, 456_def, 789_ghi
val prefix = Bytes.toBytes("123")
val scan = new Scan(prefix)
val prefixFilter = new PrefixFilter(prefix)
scan.setFilter(prefixFilter)
val resultScanner = table.getScanner(scan)
Now, my question is how to specify multiple prefix filters as input to a single scan operation. The Result objects should contain all rows whose row keys carry any of the given prefixes, say 123 or 456.
I tried the answers below that use the FilterList approach, but could not get the desired result:
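Roughly, the FilterList variant I have in mind looks like this (a sketch; my assumption is that FilterList.Operator.MUST_PASS_ONE ORs the filters together, so a row passes if it matches any one prefix):

import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.{FilterList, PrefixFilter}
import org.apache.hadoop.hbase.util.Bytes

// OR the prefix filters: a row is returned if it matches ANY of the prefixes
val filters = new FilterList(FilterList.Operator.MUST_PASS_ONE,
  new PrefixFilter(Bytes.toBytes("123")),
  new PrefixFilter(Bytes.toBytes("456")))
val scan = new Scan()
scan.setFilter(filters)
val resultScanner = table.getScanner(scan)

(With MUST_PASS_ALL the filters would be ANDed instead, which for two different prefixes matches nothing.)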
Any help with this (in Scala or Java) would be appreciated. Thank you.
I am reading data from Kafka (100,000 rows per second) using Spark Structured Streaming, and I am trying to insert all of it into HBase.
I am using Spark 2.3 on Cloudera Hadoop 2.6.
eventhubs.writeStream
.foreach(new MyHBaseWriter[Row])
.option("checkpointLocation", checkpointDir)
.start()
.awaitTermination()
MyHBaseWriter looks like this:
class AtomeHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
  override val tableName: String = "hbase-table-name"

  override def toPut(record: Row): Put = {
    // Parse the JSON payload of the record
    val data = JSON.parseFull(record.getString(0)).asInstanceOf[Some[Map[String, Object]]]
    val key = data.getOrElse(Map())("key") + ""
    val value = data.getOrElse(Map())("val") + "" // `val` is a reserved word in Scala, hence `value`
    val p = new Put(Bytes.toBytes(key))
    // Add columns ... …

I have an application that does some work and, at the end, needs to read some files from HDFS and store them in HBase. The application runs without problems in Apache Spark when using master local, but when I run it with spark-submit it no longer works and fails.
The command-line invocation is:
./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 pathjar
The error I get is:
java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:218)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
at org.apache.hadoop.hbase.client.HBaseAdmin.checkHBaseAvailable(HBaseAdmin.java:3207)
at utils.HBaseClient.getConnection(HBaseClient.java:63)
at utils.HBaseClient.exists(HBaseClient.java:224)
at utils.HBaseUtils.createTable(HBaseUtils.java:201)
at utils.HBaseUtils.execute(HBaseUtils.java:86)
at Query1.getResponse(Query1.java:43)
at Main.main(Main.java:138)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
... 21 more
Caused by: java.lang.RuntimeException: Could not create interface …

I am trying to get the folder size of certain S3 folders in Scala, from the command line on EMR.
I store JSON data in S3 as GZ files. I found that I can count the number of JSON records in the files with:
spark.read.json("s3://mybucket/subfolder/subsubfolder/").count
But now I need to know how many GB that data occupies.
I am looking for options to get the sizes of the individual files, rather than the size of the whole folder.
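One possibility I am considering is the Hadoop FileSystem API (a sketch; the bucket path is the one from above, and I am assuming the EMR Hadoop configuration can resolve the s3:// scheme):

import org.apache.hadoop.fs.Path

val path = new Path("s3://mybucket/subfolder/subsubfolder/")
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
val statuses = fs.listStatus(path)
// Per-file sizes in bytes (listStatus is not recursive)
statuses.foreach(s => println(s"${s.getPath.getName}: ${s.getLen} bytes"))
// Total size of the listed files in GB
val totalBytes = statuses.map(_.getLen).sum
println(f"total: ${totalBytes / math.pow(1024, 3)}%.2f GB")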
I am trying to fetch the rows from a lookup table (3 rows and 3 columns), iterate through them row by row, and pass the values in each row as parameters to Spark SQL.
DB | TBL | COL
----------------
db | txn | ID
db | sales | ID
db | fee | ID
I tried one row in the spark shell and it worked. But I am finding it hard to iterate over the rows.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val db_name:String = "db"
val tbl_name:String = "transaction"
val unique_col:String = "transaction_number"
val dupDf = sqlContext.sql(s"select count(*), transaction_number from $db_name.$tbl_name group by $unique_col having count(*)>1")
Please let me know how to iterate over the rows and pass the values as parameters.
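What I have in mind is roughly this sketch (it assumes the lookup table is accessible as db.lookup with the DB, TBL, COL columns shown above; collecting to the driver is fine here since there are only 3 rows):

val lookupDf = sqlContext.sql("select DB, TBL, COL from db.lookup")
// Collect the 3 rows to the driver and run one parameterized query per row
lookupDf.collect().foreach { row =>
  val dbName    = row.getString(0)
  val tblName   = row.getString(1)
  val uniqueCol = row.getString(2)
  val dupDf = sqlContext.sql(
    s"select count(*), $uniqueCol from $dbName.$tblName group by $uniqueCol having count(*) > 1")
  dupDf.show()
}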
I am getting the following error when loading null values into a database from Spark. The datatype of the target table is smallint.
Caused by: java.lang.IllegalArgumentException: Can't get JDBC type for null
Code:
val hivedata = spark.sql(s"""select 1 as column1, B as column2, NULL as column3 from table""")
hivedata.write.mode(SaveMode.Append).jdbc(url = con, table = targettable, connectionProperties = connectionProperties)
Can anyone help me?
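One workaround I am considering (a sketch: give the NULL literal an explicit type with cast, so Spark can map it to a JDBC type; column and table names are the ones from my query above):

val hivedata = spark.sql(
  s"""select 1 as column1, B as column2, cast(NULL as smallint) as column3 from table""")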
I have this DataFrame, and I am trying to convert this DataFrame operation to SQL using the pivot function:
val df = Seq(
(1, "a,b,c"),
(2, "b,c")
).toDF("id", "page_path")
df.createOrReplaceTempView("df")
df.show()
df
.withColumn("splitted", split($"page_path", ","))
.withColumn("exploded", explode($"splitted"))
.groupBy("id")
.pivot("exploded")
.count().show
The output produced:
+---+----+---+---+
| id| a| b| c|
+---+----+---+---+
| 1| 1| 1| 1|
| 2|null| 1| 1|
+---+----+---+---+
I saw this Databricks link about using the pivot function the SQL way and tried to apply it, but failed.
Does anyone know how to apply the pivot function in SQL?
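My reading of the Databricks post suggests something like the following sketch (it needs Spark 2.4+ for the SQL PIVOT clause; the helper column tag and the explicit value list 'a', 'b', 'c' are my assumptions, since the pivoted values must be listed up front):

spark.sql(
  """
    |SELECT * FROM (
    |  SELECT id, exploded, 1 AS tag
    |  FROM (SELECT id, explode(split(page_path, ',')) AS exploded FROM df)
    |)
    |PIVOT (
    |  count(tag) FOR exploded IN ('a', 'b', 'c')
    |)
  """.stripMargin).show()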
I just tried this approach:
spark.sql(
"""
(select * from (Select id, exploded from ( select id, explode(split( page_path ,',')) as exploded from df )
group by id, exploded …

Every time I run a Scala program in Spark with the Cassandra connector, I get this error:
Exception during preparation of SELECT count(*) FROM "eventtest"."simpletbl" WHERE token("a") > ? AND token("a") <= ?
ALLOW FILTERING: class org.joda.time.DateTime in JavaMirror with org.apache.spark.util.MutableURLClassLoader@23041911 of type class org.apache.spark.util.MutableURLClassLoader
with classpath [
file:/home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./spark-cassandra-connector_2.10-1.4.0-M1.jar,
file:/home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./cassandra-driver-core-2.1.5.jar,
file:/home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./cassandra-spark-job_2.10-1.0.jar,
file:/home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./guava-18.0.jar,
file:/home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./joda-convert-1.2.jar,
file:/home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./cassandra-clientutil-2.1.5.jar,
file:/home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./google-collections-1.0.jar
] and parent being sun.misc.Launcher$AppClassLoader@6132b73b of type class sun.misc.Launcher$AppClassLoader with classpath [
file:/home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/conf/,
file:/home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-hadoop2.4.0.jar,
file:/home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,
file:/home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,
file:/home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar
] and parent being sun.misc.Launcher$ExtClassLoader@489bb457 of type class sun.misc.Launcher$ExtClassLoader with classpath [
file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/dnsns.jar,
file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/sunpkcs11.jar,
file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/sunjce_provider.jar,
file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/zipfs.jar,
file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/libatk-wrapper.so,
file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/java-atk-wrapper.jar,
file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/localedata.jar,
file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/icedtea-sound.jar
] and parent being primordial classloader …

I use
sc.wholeTextFiles(",".join(fs), minPartitions=200)
to download 6k XML files (50 MB each) from S3 on a single Dataproc node with 96 CPUs. When I have minPartitions=200, AWS refuses my connections, but when I use minPartitions=50 everything works. Why?
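A workaround I am considering (a sketch in Scala, though my snippet above is PySpark; fs is my list of paths): read with fewer initial partitions so fewer simultaneous S3 connections are opened, then repartition afterwards for downstream parallelism:

// Fewer read partitions = fewer concurrent S3 connections; repartition later
val xml = sc.wholeTextFiles(fs.mkString(","), minPartitions = 50).repartition(200)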
Some logs from Spark:
(...)
19/05/22 14:11:17 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:17 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:26 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:26 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 …

So, I am trying to load a CSV file, save it as a Parquet file, and then load that into a Hive table. However, whenever I load it into the table, the values are out of place and all over the place. I am using PySpark/Hive.
Here is my code that converts the CSV to Parquet and writes it to my HDFS location:
#This creates the sparkSession
from pyspark.sql import SparkSession
#from pyspark.sql import SQLContext
spark = (SparkSession \
.builder \
.appName("S_POCC") \
.enableHiveSupport()\
.getOrCreate())
df = spark.read.load('/user/new_file.csv', format="csv", sep=",", inferSchema="true", header="false")
df.write.save('hdfs://my_path/table/test1.parquet')
This successfully converts it to Parquet at that path, but when I load it in Hive using the following statements, it gives a strange output.
Hive statements:
drop table sndbx_test.test99 purge ;
create external table if not exists test99 ( c0 string, c1 string, c2 string, c3 string, c4 string, c5 string, c6 string);
load data inpath …

I am reading a file that has a lot of whitespace and need to filter the whitespace out. After that, we need to convert it into a DataFrame. Sample input below:
2017123 ¦ ¦10¦running¦00000¦111¦-EXAMPLE
My solution is the following function, which parses out all the whitespace and trims the file:
def truncateRDD(fileName: String): RDD[String] = {
  val example = sc.textFile(fileName)
  example.map(lines => lines.replaceAll("""[\t\p{Zs}]+""", ""))
}
However, I am not sure how to get this into a DataFrame. sc.textFile returns an RDD[String]. I tried the case class approach, but the problem is that we have an 800-field schema, and a case class cannot have more than 22 fields.
I want to somehow convert the RDD[String] into an RDD[Row] so that I can use the createDataFrame function:
val DF = spark.createDataFrame(rowRDD, schema)
Any suggestions on how to do this?
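What I am imagining is roughly this sketch (assumptions: the schema is built programmatically since a case class is out, all 800 fields are strings, and the lines split on the ¦ delimiter from the sample above):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Build the 800-column schema programmatically instead of via a case class
val schema = StructType((0 until 800).map(i => StructField(s"c$i", StringType, nullable = true)))

// Split each cleaned line on the delimiter and wrap the fields in a Row
val rowRDD = truncateRDD("fileName").map(line => Row.fromSeq(line.split("¦", -1).toSeq))

val DF = spark.createDataFrame(rowRDD, schema)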
I want to print a series of processes with their pid and burst time. For this, I generate the pid using fork() and then fetch it using getpid(). However, since fork creates a child process that runs in isolation from the parent process, I am not getting the expected behavior. What the program should do is spawn the given number_of_process processes, then store the pid and a random burst time value inside the corresponding struct element. Here is my code:
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<signal.h>
struct process{
int pid;
int bt;
};
int main()
{
int no_of_process,i,new_process;
printf("Enter the number of process\n");
scanf("%d",&no_of_process);
struct process p[no_of_process];
for(i=0;i<no_of_process;i++){
new_process=fork();
p[i].pid = getpid();
p[i].bt = rand()%10;
//kill(getpid(),SIGKILL);
}
for(i=0;i<no_of_process;i++){
printf("process %d and bt %d\n",p[i].pid,p[i].bt);
}
return 0;
}
I tried to kill the child process, but that stops the whole program. Output for number of processes = 2:
process 6373 and bt 3
process 6373 and bt 6
process …