Hello, I would like to know how to transpose a RowMatrix in PySpark.
from pyspark.mllib.linalg import Vectors as MLLibVectors
from pyspark.mllib.linalg.distributed import RowMatrix

data = [(MLLibVectors.dense([1.0, 2.0]),), (MLLibVectors.dense([3.0, 4.0]),)]
df = sqlContext.createDataFrame(data, ["features"])
features = df.select("features").rdd.map(lambda row: row[0])
mat = RowMatrix(features)
print(mat.rows.first())
# [1.0,2.0]
mat = mat.Transpose()   # this is what I want; RowMatrix has no such method
print(mat.rows.first())
# [1.0,3.0]
Has anyone implemented this in Python? I have seen similar posts, but everything is in Scala. Thanks.
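A minimal sketch of one common workaround, assuming the pyspark.mllib.linalg.distributed API (RowMatrix itself exposes no transpose in PySpark): explode each row into coordinate entries, transpose at the CoordinateMatrix level, and convert back. Note that toRowMatrix() does not guarantee the original row ordering.

from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

def transpose_row_matrix(row_matrix):
    # Turn every row into (rowIndex, colIndex, value) entries,
    # transpose the CoordinateMatrix, then rebuild a RowMatrix.
    entries = row_matrix.rows.zipWithIndex().flatMap(
        lambda vec_idx: [MatrixEntry(vec_idx[1], j, v)
                         for j, v in enumerate(vec_idx[0])])
    return CoordinateMatrix(entries).transpose().toRowMatrix()

mat_t = transpose_row_matrix(mat)
print(mat_t.rows.first())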
I am trying the following Java example.
Here is my code:
public class App {

    public static void main(String[] args) {
        System.out.println("Hello World!");
        System.setProperty("hadoop.home.dir", "D:\\del");

        List<MyRecord> firstRow = new ArrayList<MyRecord>();
        firstRow.add(new App().new MyRecord("1", "Love is blind"));

        List<MyRecord> secondRow = new ArrayList<MyRecord>();
        secondRow.add(new App().new MyRecord("1", "Luv is blind"));

        SparkSession spark = SparkSession.builder().appName("LSHExample").config("spark.master", "local")
                .getOrCreate();

        Dataset firstDataFrame = spark.createDataFrame(firstRow, MyRecord.class);
        Dataset secondDataFrame = spark.createDataFrame(secondRow, MyRecord.class);

        firstDataFrame.show(20, false);
        secondDataFrame.show(20, false);

        RegexTokenizer regexTokenizer = new RegexTokenizer().setInputCol("text").setOutputCol("words")
                .setPattern("\\W");
        NGram ngramTransformer = new NGram().setN(3).setInputCol("words").setOutputCol("ngrams");
        HashingTF hashingTF = new HashingTF().setInputCol("ngrams").setOutputCol("vectors");
        MinHashLSH …

I want to sum up several different columns in a Spark DataFrame.
Code:
from pyspark.sql import functions as F

cols = ["A.p1", "B.p1"]
df = spark.createDataFrame([[1, 2], [4, 89], [12, 60]], schema=cols)

# 1. Works
df = df.withColumn('sum1', sum([df[col] for col in ["`A.p1`", "`B.p1`"]]))

# 2. Doesn't work
df = df.withColumn('sum1', F.sum([df[col] for col in ["`A.p1`", "`B.p1`"]]))

# 3. Doesn't work
df = df.withColumn('sum1', sum(df.select(["`A.p1`", "`B.p1`"])))
Why doesn't approach #2 work? I am on Spark 2.2.
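A brief note plus a sketch of a common variant, as I understand it: approach #1 works because Python's builtin sum simply folds the + operator over the Column objects (0 + `A.p1` + `B.p1`), which Spark evaluates row-wise; approach #2 fails because F.sum is an aggregate function that expects a single column (a name or a Column), not a Python list. An equivalent, slightly more explicit version of #1 uses functools.reduce:

from functools import reduce
from operator import add
from pyspark.sql import functions as F

cols = ["`A.p1`", "`B.p1`"]
# Row-wise sum across columns: fold + over the Column expressions.
df = df.withColumn("sum1", reduce(add, [F.col(c) for c in cols]))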
What I really want is to connect to Bolt from a Spark slave, but I figure that if I can get it working with cypher-shell, the rest will fall into place. So I can do this:
cypher-shell -a localhost
and it connects fine.
If I do this on the box using its actual IP:
cypher-shell -a 172.20.0.71:7687
the connection is refused. I thought all I had to do was set this in the conf:
dbms.connector.bolt.address=0.0.0.0:7687
That does not seem to work; is there something else I am missing? (Or at least the above does not seem to work; I did restart my neo4j instance, so it should have picked up the change to the conf.) Connecting with just the defaults in cypher-shell does give the following:
Connected to Neo4j 3.3.0 at bolt://localhost:7687 as user neo4j.
Type :help for a list of available commands or :exit to exit the shell.
Note that Cypher queries must end with a semicolon.
So maybe it did not fully restart and pick up the change to neo4j.conf? Or, more likely, I am missing something else to make it work the way I think it should. What do I need so that I can connect from another IP, or locally via the actual IP address?
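A guess at a fix, assuming Neo4j 3.x setting names: in that series the externally reachable Bolt listener is controlled by dbms.connector.bolt.listen_address (dbms.connector.bolt.address is the older spelling), and dbms.connectors.default_listen_address covers all connectors at once. Something like the following in neo4j.conf, followed by a full restart, is usually what opens Bolt to non-localhost clients:

# neo4j.conf (assumption: Neo4j 3.x setting names)
dbms.connectors.default_listen_address=0.0.0.0
dbms.connector.bolt.listen_address=0.0.0.0:7687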
I have the following DataFrame:
+----------+
| date|
+----------+
|2017-11-25|
|2017-12-21|
|2017-09-12|
+----------+
Here is the code that creates the DataFrame above:
import pyspark.sql.functions as f
rdd = sc.parallelize([("2017/11/25",), ("2017/12/21",), ("2017/09/12",)])
df = sqlContext.createDataFrame(rdd, ["date"]).withColumn("date", f.to_date(f.col("date"), "yyyy/MM/dd"))
df.show()
I want a new column with the first day of the month for each row, obtained by simply replacing the day with "01" in every date:
+----------+----------+
|      date|first_date|
+----------+----------+
|2017-11-25|2017-11-01|
|2017-12-21|2017-12-01|
|2017-09-12|2017-09-01|
+----------+----------+
There is a last_day function in pyspark.sql.functions, but there is no first_day function.
I tried doing this with date_sub, but it does not work: I get a "Column is not iterable" error, because the second argument of date_sub cannot be a column and must be an integer.
f.date_sub(f.col('date'), f.dayofmonth(f.col('date')) - 1 )
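A minimal sketch of two alternatives that are commonly used here, assuming the trunc function in pyspark.sql.functions and the SQL form of date_sub, which does accept a column expression:

import pyspark.sql.functions as f

# Option 1: trunc() snaps each date to the first day of its month.
df = df.withColumn("first_date", f.trunc("date", "month"))

# Option 2: the original date_sub idea, expressed in SQL, where the second
# argument may be a column expression rather than a Python int.
df = df.withColumn("first_date", f.expr("date_sub(date, dayofmonth(date) - 1)"))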
I am trying to generate an additional column in a dataframe, with values that auto-increment based on a global variable. However, every row is generated with the same value and the value never increments.
Here is the code:
def autoIncrement():
    global rec
    if rec == 0:
        rec = 1
    else:
        rec = rec + 1
    return int(rec)

rec = 14
The UDF:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

autoIncrementUDF = udf(autoIncrement, IntegerType())
df1 = hiveContext.sql("select id,name,location,state,datetime,zipcode from demo.target")
df1.withColumn("id2", autoIncrementUDF()).show()
Here is the resulting df:
+---+------+--------+----------+-------------------+-------+---+
| id| name|location| state| datetime|zipcode|id2|
+---+------+--------+----------+-------------------+-------+---+
| 20|pankaj| Chennai| TamilNadu|2018-03-26 11:00:00| NULL| 15|
| 10|geetha| Newyork|New Jersey|2018-03-27 10:00:00| NULL| 15|
| 25| pawan| Chennai| TamilNadu|2018-03-27 11:25:00| NULL| 15|
| 30|Manish| Gurgoan| Gujarat|2018-03-27 11:00:00| NULL| 15|
+---+------+--------+----------+-------------------+-------+---+
But I was expecting the result below:
+---+------+--------+----------+-------------------+-------+---+
| id| …
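For context, a sketch of why this happens and of the usual workarounds, under the assumption that the 'datetime' column can serve as an ordering: the global rec lives in the Python driver, but the UDF closure is pickled and shipped to each executor, so every worker increments its own private copy and nothing is synchronised back, which is why every row shows the same value. Typical alternatives avoid mutable global state altogether:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Option 1: unique (but not consecutive) ids, fully distributed.
df1 = df1.withColumn("id2", F.monotonically_increasing_id())

# Option 2: consecutive ids starting at 15, ordered by the (assumed)
# 'datetime' column; the window has no partitioning, so all rows are
# shuffled to one partition - only suitable for small data.
w = Window.orderBy("datetime")
df1 = df1.withColumn("id2", F.row_number().over(w) + 14)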
I commonly see Dataset.count across entire codebases in 3 scenarios:

1. log.info("this ds has ${dataset.count} rows")
2. if (dataset.count > 0) do x else do y
3. dataset.persist.count

Does it prevent the query optimizer from creating the most efficient DAG by forcing it to be prematurely eager in any of these scenarios?
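As a side note on scenario 2, a minimal PySpark sketch (the snippets above look Scala-flavoured, but the idea carries over): an emptiness check does not need a full count, since take(1) lets Spark stop after producing a single row, whereas count() materialises every partition.

# Cheaper emptiness check than `dataset.count > 0`.
non_empty = len(df.take(1)) > 0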
I am using Spark with Scala and want to pass the whole row to a udf and pick up each column name and column value inside the udf. How can I do that?
I am trying the following -
inputDataDF.withColumn("errorField", mapCategory(ruleForNullValidation) (col(_*)))
def mapCategory(categories: Map[String, Boolean]) = {
udf((input:Row) => //write a recursive function to check if each row is in categories if yes check for null if null then false, repeat this for all columns and then combine results)
})
I am trying to standardize (mean = 0, std = 1) one column ('age') in my data frame. Below is my code in Spark (Python):
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
# Make my 'age' column an assembler type:
age_assembler = VectorAssembler(inputCols= ['age'], outputCol = "age_feature")
# Create a scaler that takes 'age_feature' as an input column:
scaler = StandardScaler(inputCol="age_feature", outputCol="age_scaled",
withStd=True, withMean=True)
# Creating a mini-pipeline for those 2 steps:
age_pipeline = Pipeline(stages=[age_assembler, …

I have the following code, which is basically doing a feature engineering pipeline:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec

token_q1 = Tokenizer(inputCol='question1', outputCol='question1_tokens')
token_q2 = Tokenizer(inputCol='question2', outputCol='question2_tokens')
remover_q1 = StopWordsRemover(inputCol='question1_tokens', outputCol='question1_tokens_filtered')
remover_q2 = StopWordsRemover(inputCol='question2_tokens', outputCol='question2_tokens_filtered')
q1w2model = Word2Vec(inputCol='question1_tokens_filtered', outputCol='q1_vectors')
q1w2model.setSeed(1)
q2w2model = Word2Vec(inputCol='question2_tokens_filtered', outputCol='q2_vectors')
q2w2model.setSeed(1)

pipeline = Pipeline(stages=[token_q1, token_q2, remover_q1, remover_q2, q1w2model, q2w2model])
model = pipeline.fit(train)
result = model.transform(train)
result.show()
I want to add the following UDF to the above pipeline:
charcount_q1 = F.udf(lambda row : sum([len(char) for char in row]),IntegerType())
When I do that, I get a Java error. Can someone point me in the right direction?
However, I did add this column with the following code, which basically works:
charCountq1=train.withColumn("charcountq1", charcount_q1("question1"))
But I want to add it to the pipeline rather than doing it this way.
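One possible direction, sketched under the assumption that the goal is a character count of the raw question1 column: only Transformer/Estimator objects can be Pipeline stages, so a plain UDF cannot be appended directly, but the same logic can be expressed as a SQLTransformer stage (for a string column, summing the per-character lengths is just length()):

from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer

# SQLTransformer runs its statement against the DataFrame flowing
# through the pipeline; __THIS__ stands for that DataFrame.
charcount_stage = SQLTransformer(
    statement="SELECT *, length(question1) AS charcountq1 FROM __THIS__")

pipeline = Pipeline(stages=[token_q1, token_q2, remover_q1, remover_q2,
                            q1w2model, q2w2model, charcount_stage])

Another option would be a custom Transformer subclass wrapping the UDF, but the SQLTransformer route needs no extra class.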