小编use*_*018的帖子

HBase:如何在单个扫描操作中指定多个前缀过滤器

我使用前缀过滤器获得了给定部分行键的扫描结果:

行键示例:123_abc、456_def、789_ghi

var prefix=Bytes.toBytes("123")
var scan = new Scan(prefix)
var prefixFilter = new PrefixFilter(prefix)
scan.setFilter(prefixFilter)
var resultScanner = table.getScanner(scan)
Run Code Online (Sandbox Code Playgroud)

现在,我的问题是如何指定多个前缀过滤器作为扫描操作的输入。Result 对象应该包含所有具有给定前缀的行键值的行,比如 123 或 456。

我尝试了以下使用 FilterList 方法的答案,但无法获得所需的结果:

将多个前缀行过滤器设置为扫描器 hbase java

对此的任何帮助(在 Scala 或 Java 中)将不胜感激。谢谢你。

java hadoop hbase scala mapreduce

4
推荐指数
1
解决办法
4366
查看次数

使用结构化 Spark Streaming 在 HBase 中批量插入数据

我正在使用结构化 Spark Streaming 读取来自 Kafka(每秒 100.000 行)的数据,并且我正在尝试将所有数据插入 HBase。

我在 Cloudera Hadoop 2.6 中使用 Spark 2.3

我尝试了类似我在这里看到的东西

eventhubs.writeStream
 .foreach(new MyHBaseWriter[Row])
 .option("checkpointLocation", checkpointDir)
 .start()
 .awaitTermination()
Run Code Online (Sandbox Code Playgroud)

MyHBaseWriter 看起来像这样:

class AtomeHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
  override def toPut(record: Row): Put = {
    override val tableName: String = "hbase-table-name"

    override def toPut(record: Row): Put = {
        // Get Json
        val data = JSON.parseFull(record.getString(0)).asInstanceOf[Some[Map[String, Object]]]
        val key = data.getOrElse(Map())("key")+ ""
        val val = data.getOrElse(Map())("val")+ ""

        val p = new Put(Bytes.toBytes(key))
        //Add columns ... …
Run Code Online (Sandbox Code Playgroud)

hbase scala bulkinsert apache-spark spark-streaming

4
推荐指数
1
解决办法
1314
查看次数

Hbase在提交火花时效果不佳

我有一个可以做一些工作的应用程序,最后它需要从hdfs中读取一些文件并将其存储到hbase中。该应用程序在使用master local时运行,而在使用Apache Spark时没有问题,但是当我使用spark-submit运行它时,它不再起作用了,我失败了。

命令行代码为:

./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 pathjar
Run Code Online (Sandbox Code Playgroud)

我得到的错误是:

Java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:218)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
at org.apache.hadoop.hbase.client.HBaseAdmin.checkHBaseAvailable(HBaseAdmin.java:3207)
at utils.HBaseClient.getConnection(HBaseClient.java:63)
at utils.HBaseClient.exists(HBaseClient.java:224)
at utils.HBaseUtils.createTable(HBaseUtils.java:201)
at utils.HBaseUtils.execute(HBaseUtils.java:86)
at Query1.getResponse(Query1.java:43)
at Main.main(Main.java:138)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
... 21 more
Caused by: java.lang.RuntimeException: Could not create  interface …
Run Code Online (Sandbox Code Playgroud)

java hbase scala apache-spark spark-submit

3
推荐指数
1
解决办法
189
查看次数

在EMR中使用Spark Scala获取S3对象大小(文件夹,文件)

我正在尝试scala 从命令行EMR 获取某些S3文件夹的文件夹大小。

我在S3中将JSON数据存储为GZ文件。我发现我可以计算文件中JSON记录的数量:

spark.read.json("s3://mybucket/subfolder/subsubfolder/").count
Run Code Online (Sandbox Code Playgroud)

但是现在我需要知道数据占多少GB。

我正在寻找一些选项来获取不同文件的大小,而不是整个文件夹的大小。

scala amazon-s3 amazon-web-services apache-spark

3
推荐指数
1
解决办法
210
查看次数

迭代数据帧中的每一行,将其存储在 val 中并作为参数传递给 Spark SQL 查询

我试图从查找表(3 行和 3 列)中获取行并逐行迭代并将每行中的值作为参数传递给 SPARK SQL。

DB | TBL   | COL
----------------
db | txn   | ID

db | sales | ID

db | fee   | ID
Run Code Online (Sandbox Code Playgroud)

我在 spark shell 中尝试了一行,它奏效了。但我发现很难遍历行。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val db_name:String = "db"

val tbl_name:String = "transaction"

val unique_col:String = "transaction_number"

val dupDf = sqlContext.sql(s"select count(*), transaction_number from $db_name.$tbl_name group by $unique_col having count(*)>1") 
Run Code Online (Sandbox Code Playgroud)

请让我知道如何遍历行并作为参数传递?

scala dataframe apache-spark apache-spark-sql

3
推荐指数
1
解决办法
3236
查看次数

引起:java.lang.IllegalArgumentException: Can't get JDBC type for null

Null在 spark 中将值加载到数据库时出现以下错误。Datatype目标表是smallint

Caused by: java.lang.IllegalArgumentException: Can't get JDBC type for null
Run Code Online (Sandbox Code Playgroud)

代码 :

val hivedata = spark.sql(s"""select 1 as column1 , B a column2 , NULL as column3 from table""")

hivedata .write.mode(SaveMode.Append).jdbc(url = con, table = targettable, Pconnectionropertiess)
Run Code Online (Sandbox Code Playgroud)

谁能帮帮我吗

hadoop hive scala apache-spark apache-spark-sql

3
推荐指数
1
解决办法
2528
查看次数

如何在SQL中使用pivot(而不是作为DataFrame分组运算符)?

我有这个数据框,我试图将此数据框操作转换为 sql 使用pivot函数

  val df = Seq(
    (1, "a,b,c"),
    (2, "b,c")
  ).toDF("id", "page_path")
  df.createOrReplaceTempView("df")

  df.show()
  df
    .withColumn("splitted", split($"page_path", ","))
    .withColumn("exploded", explode($"splitted"))
    .groupBy("id")
    .pivot("exploded")
    .count().show
Run Code Online (Sandbox Code Playgroud)

产生的输出:

+---+----+---+---+
| id|   a|  b|  c|
+---+----+---+---+
|  1|   1|  1|  1|
|  2|null|  1|  1|
+---+----+---+---+

Run Code Online (Sandbox Code Playgroud)

我看到这个databricks 链接以 sql 方式使用pivot函数,我尝试应用但失败了。

有人知道以 sql 方式应用数据透视函数吗?

我刚刚尝试了这个方法

  spark.sql(
    """
      (select * from (Select id, exploded from ( select id, explode(split( page_path ,',')) as exploded from df )
      group by id, exploded …
Run Code Online (Sandbox Code Playgroud)

sql scala apache-spark apache-spark-sql

3
推荐指数
1
解决办法
1453
查看次数

火花工作cassandra错误

我每次运行带有cassandra连接器的spark中的scala程序时都会收到此错误

    Exception during preparation of SELECT count(*) FROM "eventtest"."simpletbl" WHERE token("a") > ? AND token("a") <= ?   
    ALLOW FILTERING: class org.joda.time.DateTime in JavaMirror with org.apache.spark.util.MutableURLClassLoader@23041911 of type class org.apache.spark.util.MutableURLClassLoader 
    with classpath 
    [file: 
    /home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./spark-cassandra-connector_2.10-1.4.0-M1.jar
    ,file: 
    /home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./cassandra-driver-core-2.1.5.jar,file: 
    /home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./cassandra-spark-job_2.10-1.0.jar,file: 
    /home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./guava-18.0.jar,file: 
    /home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./joda-convert-1.2.jar,file: 
    /home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./cassandra-clientutil-2.1.5.jar,file: 
    /home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/work/app-20150711142923-0023/0/./google-collections-1.0.jar] and parent being sun.misc.Launcher$AppClassLoader@6132b73b of type class sun.misc.Launcher$AppClassLoader with classpath [file: 
    /home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/conf/,file: 
    /home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-hadoop2.4.0.jar,file: 
    /home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,file: 
    /home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,file: 
    /home/sysadmin/ApacheSpark/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar] and parent being sun.misc.Launcher$ExtClassLoader@489bb457 of type class sun.misc.Launcher$ExtClassLoader with classpath [file: 
    /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/dnsns.jar,file: 
    /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/sunpkcs11.jar,file: 
    /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/sunjce_provider.jar,file: 
    /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/zipfs.jar,file: 
    /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/libatk-wrapper.so,file: 
    /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/java-atk-wrapper.jar,file: 
    /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/localedata.jar,file: 
    /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/icedtea-sound.jar] and parent being primordial classloader …
Run Code Online (Sandbox Code Playgroud)

scala cassandra apache-spark

2
推荐指数
1
解决办法
654
查看次数

当我将 WholeTextFiles() 与 pyspark 结合使用时,为什么 AWS 拒绝了我的连接?

我用

sc.wholeTextFiles(",".join(fs), minPartitions=200)
Run Code Online (Sandbox Code Playgroud)

在具有 96cpus 的单个 dataproc 节点上从 S3 下载 6k XMLs 文件(每个文件 50MB)。当我有 minPartitions=200 时,AWS 拒绝了我的连接,但是当我使用 minPartitions=50 时一切正常。为什么?

来自 Spark 的一些日志:

(...)
19/05/22 14:11:17 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:17 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:26 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:26 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 …
Run Code Online (Sandbox Code Playgroud)

python scala amazon-s3 apache-spark pyspark

2
推荐指数
1
解决办法
1467
查看次数

如何使用 Spark 将镶木地板文件加载到 Hive 表中?

所以,我试图加载一个 csv 文件,然后将它保存为一个镶木地板文件,然后将它加载到一个 Hive 表中。但是,无论何时将其加载到表中,值都不合适并且到处都是。我正在使用 Pyspark/Hive

这是我的 csv 文件中的内容: 在此处输入图片说明

这是我将 csv 转换为 parquet 并将其写入我的 HDFS 位置的代码:

#This creates the sparkSession 
from pyspark.sql import SparkSession
#from pyspark.sql import SQLContext
spark = (SparkSession \
        .builder \
        .appName("S_POCC") \
        .enableHiveSupport()\
        .getOrCreate())

df = spark.read.load('/user/new_file.csv', format="csv", sep=",", inferSchema="true", header="false")
df.write.save('hdfs://my_path/table/test1.parquet')
Run Code Online (Sandbox Code Playgroud)

这成功地将它转换为镶木地板和路径,但是当我在 Hive 中使用以下语句加载它时,它给出了一个奇怪的输出。

蜂巢声明:

drop table sndbx_test.test99 purge ;

create external table if not exists test99 ( c0 string, c1 string, c2 string, c3 string, c4 string, c5 string, c6 string);

load data inpath …
Run Code Online (Sandbox Code Playgroud)

python hive scala apache-spark pyspark

2
推荐指数
1
解决办法
2852
查看次数

将RDD [String]转换为RDD [Row]到Dataframe Spark Scala

我正在阅读一个有很多空格的文件,需要过滤掉空间.之后我们需要将其转换为数据帧.示例输入如下.

2017123 ¦     ¦10¦running¦00000¦111¦-EXAMPLE
Run Code Online (Sandbox Code Playgroud)

我的解决方案是以下函数解析所有空格并修剪文件.

def truncateRDD(fileName : String): RDD[String] = {
    val example = sc.textFile(fileName)
    example.map(lines => lines.replaceAll("""[\t\p{Zs}]+""", ""))
}
Run Code Online (Sandbox Code Playgroud)

但是,我不确定如何将其纳入数据帧.sc.textFile返回一个RDD[String].我试过了案例类方法,但问题是我们有800字段模式,案例类不能超过22.

我想以某种方式将RDD [String]转换为RDD [Row],以便我可以使用该createDataFrame函数.

val DF = spark.createDataFrame(rowRDD, schema)
Run Code Online (Sandbox Code Playgroud)

有关如何做到这一点的任何建议?

hadoop scala dataframe apache-spark spark-dataframe

1
推荐指数
1
解决办法
1万
查看次数

如何只终止子进程?

我想用他们的pid和打印一系列进程burst time.为此,我生成pid使用fork()然后pid使用它getpid().但是,由于fork创建了一个与父进程隔离运行的子进程,因此我没有得到预期的行为.程序应该做的是生成给定的进程number_of_process,然后在特定的结构元素内存储pid和随机burst time值.这是我的代码: -

#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<signal.h>

struct process{
    int pid;
    int bt;
};

int main()
{   
    int no_of_process,i,new_process;
    printf("Enter the number of process\n");
    scanf("%d",&no_of_process);
    struct process p[no_of_process];
    for(i=0;i<no_of_process;i++){
        new_process=fork();
        p[i].pid = getpid();
        p[i].bt = rand()%10;
        //kill(getpid(),SIGKILL);
    }
    for(i=0;i<no_of_process;i++){
        printf("process %d and bt %d\n",p[i].pid,p[i].bt);
    }
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

我试图杀死子进程,但这会停止整个程序.进程数的输出= 2

process 6373 and bt 3
process 6373 and bt 6
process …
Run Code Online (Sandbox Code Playgroud)

c fork

0
推荐指数
1
解决办法
524
查看次数