我使用AWS EC2指南安装了Spark,我可以使用bin/pyspark脚本启动程序以获得spark提示,也可以成功执行Quick Start quide.
但是,我不能为我的生活弄清楚如何INFO在每个命令后停止所有详细的日志记录.
我已经在我的log4j.properties文件中的几乎所有可能的场景中尝试了我的conf文件,在我从中启动应用程序的文件夹以及每个节点上,没有做任何事情.INFO执行每个语句后,我仍然会打印日志语句.
我对这应该如何工作非常困惑.
#Set everything to be logged to the console log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
Run Code Online (Sandbox Code Playgroud)
这是我使用时的完整类路径SPARK_PRINT_LAUNCH_COMMAND:
Spark命令:/Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/bin/java -cp:/root/spark-1.0.1-bin-hadoop2/conf:/root/spark-1.0.1 -bin-hadoop2/CONF:/root/spark-1.0.1-bin-hadoop2/lib/spark-assembly-1.0.1-hadoop2.2.0.jar:/root/spark-1.0.1-bin-hadoop2/lib /datanucleus-api-jdo-3.2.1.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/root/spark-1.0.1-bin-hadoop2 /lib/datanucleus-rdbms-3.2.1.jar -XX:MaxPermSize = 128m -Djava.library.path = -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark. repl.Main
内容spark-env.sh:
#!/usr/bin/env bash
# This file is sourced when …Run Code Online (Sandbox Code Playgroud) 我有多个模式,如下所示,具有不同的列名和数据类型.我想使用DataFrameScala为每个模式生成测试/模拟数据,并将其保存到镶木地板文件中.
下面是示例模式(来自示例json),以动态生成数据,其中包含虚拟值.
val schema1 = StructType(
List(
StructField("a", DoubleType, true),
StructField("aa", StringType, true)
StructField("p", LongType, true),
StructField("pp", StringType, true)
)
)
Run Code Online (Sandbox Code Playgroud)
我需要像这样的rdd/dataframe,每行1000个,基于上面模式中的列数.
val data = Seq(
Row(1d, "happy", 1L, "Iam"),
Row(2d, "sad", 2L, "Iam"),
Row(3d, "glad", 3L, "Iam")
)
Run Code Online (Sandbox Code Playgroud)
基本上......就像我需要动态生成数据的200个数据集一样,为每个方案编写单独的程序对我来说是不可能的.
PLS.帮助我你的想法或impl.因为我是新手.
是否可以根据不同类型的模式生成动态数据?
我有带行键的HBase表,它由文本ID和时间戳组成,如下所示:
...
string_id1.1470913344067
string_id1.1470913345067
string_id2.1470913344067
string_id2.1470913345067
...
Run Code Online (Sandbox Code Playgroud)
如何过滤HBase扫描(在Scala或Java中)以获得某些字符串ID和时间戳超过某个值的结果?
谢谢
尝试在 databricks 社区版平台上使用 spark 从 url 读取数据我尝试使用 spark.read.csv 并使用 SparkFiles 但仍然缺少一些简单的点
url = "https://raw.githubusercontent.com/thomaspernet/data_csv_r/master/data/adult.csv"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)
# sc.addFile(url)
# sqlContext = SQLContext(sc)
# df = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= True)
df = spark.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= True)
Run Code Online (Sandbox Code Playgroud)
得到路径相关的错误:
Path does not exist: dbfs:/local_disk0/spark-9f23ed57-133e-41d5-91b2-12555d641961/userFiles-d252b3ba-499c-42c9-be48-96358357fb75/adult.csv;'
我也尝试过其他方式
val content = scala.io.Source.fromURL("https://raw.githubusercontent.com/thomaspernet/data_csv_r/master/data/adult.csv").mkString
# val list = content.split("\n").filter(_ != "")
val rdd = sc.parallelize(content)
val df = rdd.toDF
SyntaxError: invalid syntax
File "<command-332010883169993>", line 16
val content = scala.io.Source.fromURL("https://raw.githubusercontent.com/thomaspernet/data_csv_r/master/data/adult.csv").mkString
^
SyntaxError: invalid syntax
Run Code Online (Sandbox Code Playgroud)
数据应该直接加载到 …
我有一个 Spark 应用程序,它以 Avro 格式写入输出文件。现在我希望这些数据在 Hive 中可用,因为使用该数据的应用程序只能通过 Hive 表来实现。
这里描述了可以通过CREATE EXTERNAL TABLE在 Hive 中使用来做到这一点。现在我的问题是,该方法的效率如何CREATE EXTERNAL TABLE。它将所有 Avro 数据复制到 HDFS 上的其他位置来工作,还是只是创建一些metainfo可用于查询 Avro 数据的 ?
另外,如果我想继续向该表添加新的 Avro 数据怎么办?我可以创建一次这样的外部表,然后继续向其中添加新的 Avro 数据吗?另外,如果有人在更新数据时查询数据怎么办?它允许原子事务吗?
我想将1GB(1000万条记录)CSV文件加载到Hbase中.我为它写了Map-Reduce程序.我的代码工作正常但需要1小时才能完成.最后一个减速机需要超过半小时的时间.有人可以帮帮我吗?
我的守则如下:
Driver.Java
package com.cloudera.examples.hbase.bulkimport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* HBase bulk import example
* Data preparation MapReduce job driver
*
* - args[0]: HDFS input path
*
- args[1]: HDFS output path
*
- args[2]: HBase table name
*
*/
public class Driver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
/*
* NBA Final 2010 game … 我正在使用Java作为查询Hbase的客户端.
我的Hbase表设置如下:
ROWKEY | HOST | EVENT
-----------|--------------|----------
21_1465435 | host.hst.com | clicked
22_1463456 | hlo.wrld.com | dragged
. . .
. . .
. . .
Run Code Online (Sandbox Code Playgroud)
我需要做的第一件事是让所有的清单ROWKEYs已host.hst.com与它相关联.
我可以在Column上创建一个扫描器,host对于每个行值,column value = host.hst.com我将添加相应ROWKEY的列表.看起来效率很高.O(n)获取所有行.
现在是困难的部分.对于ROWKEY列表中的每一个,我需要得到相应的EVENT.
如果我使用普通GET命令来获取单元格(ROWKEY, EVENT),我相信会创建一个扫描程序EVENT,需要O(n)时间才能找到正确的单元格并返回值.这对每个人来说都是非常糟糕的时间复杂性ROWKEY.结合这两者给了我们O(n^2).
有没有更有效的方法来解决这个问题?
非常感谢您提前帮助!
我遇到了这个:_*表示许多spark-scala答案,但找不到任何文档.实际上是什么意思?这种用法的一个例子就是这个问题的答案
如何在Spark Java中使用isin的DataFrame过滤器?
线:
df.filter(col("something").isin(list: _*)
Run Code Online (Sandbox Code Playgroud) 我使用 PostGre 作为数据库。我想为每个批次捕获一个表数据并将其转换为 parquet 文件并存储到 s3 中。我尝试使用 Spark 和 readStream 的 JDBC 选项进行连接,如下所示......
val jdbcDF = spark.readStream
.format("jdbc")
.option("url", "jdbc:postgresql://myserver:5432/mydatabase")
.option("dbtable", "database.schema.table")
.option("user", "xxxxx")
.option("password", "xxxxx")
.load()
Run Code Online (Sandbox Code Playgroud)
但它抛出了不受支持的异常
Exception in thread "main" java.lang.UnsupportedOperationException: Data source jdbc does not support streamed reading
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at examples.SparkJDBCStreaming$.delayedEndpoint$examples$SparkJDBCStreaming$1(SparkJDBCStreaming.scala:16)
at examples.SparkJDBCStreaming$delayedInit$body.apply(SparkJDBCStreaming.scala:5)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
Run Code Online (Sandbox Code Playgroud)
我走在正确的轨道上吗?Spark Streaming 真的不支持数据库作为数据源吗?
据我所知,执行此操作的另一种方法是编写一个 kafka 生产者将数据发布到 kafka 主题,然后使用 Spark Streaming...
注意 …
我像这样实现 Apache Spark Scheduling Within (Scala 代码):
\n\n// group into list of 10 items...\nval maxSimultaneousSubmitAndMonitorThreadsInDriver = 10\n\n// ... in order to throttle the number of threads submitting and monitoring apps at a time\nval lists = myList grouped maxSimultaneousThreadsInDriver \n\nfor (aList <- lists) {\n\n // pick a list, then convert it to Scala Parallel list\n aList.par.foreach { // so 10 threads MAX at a time, that can handle job submission and monitoring\n case (file_name) => {\n\n // in each driver …Run Code Online (Sandbox Code Playgroud) scala apache-spark spark-streaming apache-spark-sql databricks
scala ×6
apache-spark ×5
hadoop ×4
hbase ×3
java ×3
mapreduce ×3
databricks ×2
hadoop2 ×2
pyspark ×2
apache-kafka ×1
atomic ×1
avro ×1
hive ×1
postgresql ×1
python ×1