I have a PySpark dataframe with 13 million rows and 800 columns. I need to normalize this data, so I have been using this code, which works well on a smaller development dataset.
import sys

from pyspark.sql import Window
from pyspark.sql.functions import avg, stddev_pop

def z_score_w(col, w):
    avg_ = avg(col).over(w)
    stddev_ = stddev_pop(col).over(w)
    return (col - avg_) / stddev_

w = Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)
norm_exprs = [z_score_w(signalsDF[x], w).alias(x) for x in signalsDF.columns]
normDF = signalsDF.select(norm_exprs)
However, when using the full dataset, I hit an exception from codegen:
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 44 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:836)
at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:10251)
at org.codehaus.janino.UnitCompiler.pushConstant(UnitCompiler.java:8933)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4346)
at org.codehaus.janino.UnitCompiler.access$7100(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$10.visitBooleanLiteral(UnitCompiler.java:3267)
There are a few Spark JIRA issues around that look similar, but they are all marked as resolved. There is also this SO question …
window-functions apache-spark apache-spark-sql pyspark pyspark-sql
Can someone help me understand the behavior of appending map functions to an RDD in a python for loop?
For the following code:
rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

for i in range(3):
    rdd = rdd.map(lambda x: appender(x, i))

rdd.collect()
I get the output:
[[1, 2, 2, 2], [2, 2, 2, 2], [3, 2, 2, 2]]
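This is Python's late-binding closure behavior rather than a Spark quirk: each lambda captures the variable `i`, not its value, and because Spark transformations are lazy, all three lambdas run at `collect()` time, after the loop has finished and `i == 2`. A plain-Python sketch of the same effect (no Spark involved), including the usual default-argument fix:

```python
# Late binding: every closure sees the final value of i.
fns = []
for i in range(3):
    fns.append(lambda x: x + [i])

out = [1]
for f in fns:
    out = f(out)
# out == [1, 2, 2, 2] -- each call appended the loop's final i (2)

# Fix: bind the current value at definition time with a default argument.
fns_fixed = []
for i in range(3):
    fns_fixed.append(lambda x, i=i: x + [i])

out_fixed = [1]
for f in fns_fixed:
    out_fixed = f(out_fixed)
# out_fixed == [1, 0, 1, 2]
```

The same `lambda x, i=i: appender(x, i)` trick inside the RDD loop would make the first snippet behave like the second.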
Whereas with the following code:
rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

rdd = rdd.map(lambda x: appender(x, 1))
rdd = rdd.map(lambda x: appender(x, 2))
rdd = rdd.map(lambda x: appender(x, 3))
rdd.collect()
I get the expected output:
[[1, 1, 2, 3], …

I have a dataframe containing (record-format) JSON strings, like this:
In [9]: pd.DataFrame({'col1': ['A', 'B'],
                      'col2': ['[{"t":"05:15","v":"20.0"}, {"t":"05:20","v":"25.0"}]',
                               '[{"t":"05:15","v":"10.0"}, {"t":"05:20","v":"15.0"}]']})
Out[9]:
  col1                                               col2
0    A  [{"t":"05:15","v":"20.0"}, {"t":"05:20","v":"2...
1    B  [{"t":"05:15","v":"10.0"}, {"t":"05:20","v":"1...
I want to extract the JSON and add a new row to the dataframe for each record:
  col1         t   v
0    A  05:15:00  20
1    A  05:20:00  25
2    B  05:15:00  10
3    B  05:20:00  15
I have been experimenting with the following code:
def json_to_df(x):
    df2 = pd.read_json(x.col2)
    return df2

df.apply(json_to_df, axis=1)
But the resulting dataframe comes back as tuples rather than new rows. Any suggestions?
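One way to get one row per record (a sketch; `parsed` and `result` are hypothetical names, and `explode(..., ignore_index=True)` assumes a reasonably recent pandas) is to parse each JSON string into a list of dicts, explode the lists, and expand the dicts into columns:

```python
import json

import pandas as pd

df = pd.DataFrame({'col1': ['A', 'B'],
                   'col2': ['[{"t":"05:15","v":"20.0"}, {"t":"05:20","v":"25.0"}]',
                            '[{"t":"05:15","v":"10.0"}, {"t":"05:20","v":"15.0"}]']})

# Parse each JSON string into a list of dicts, explode to one record per row,
# then expand the dicts into their own columns alongside col1.
parsed = df.assign(rec=df['col2'].apply(json.loads)).explode('rec', ignore_index=True)
result = pd.concat([parsed[['col1']], pd.json_normalize(parsed['rec'].tolist())], axis=1)
```

This leaves `t` and `v` as strings; converting them (e.g. with `pd.to_timedelta` and `pd.to_numeric`) would be a separate step.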
I want to use boolean indexing to select columns from a pandas dataframe that has a datetime index as its column headers:
dates = pd.date_range('20130101', periods=6)
df = pd.DataFrame(np.random.randn(4, 6), index=list('ABCD'), columns=dates)
This yields:
2013-01-01 2013-01-02 2013-01-03 2013-01-04 2013-01-05 2013-01-06
A 0.173096 0.344348 1.059990 -1.246944 1.624399 -0.276052
B 0.277148 0.965226 -1.301612 -1.264500 -0.124489 1.704485
C -0.375106 0.103812 0.939749 -2.826329 -0.275420 0.664325
D 0.039756 0.631373 0.643565 -1.516543 -0.654626 -1.544038
I want to return only the first three columns.
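Since the column labels form a `DatetimeIndex`, a boolean comparison against a date string gives exactly such a mask; a minimal sketch (`first_three` is a hypothetical name, and the cutoff date is chosen to match this example):

```python
import numpy as np
import pandas as pd

dates = pd.date_range('20130101', periods=6)
df = pd.DataFrame(np.random.randn(4, 6), index=list('ABCD'), columns=dates)

# Boolean mask over the DatetimeIndex of column labels; the string is
# coerced to a timestamp for the comparison.
first_three = df.loc[:, df.columns < '2013-01-04']
```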
I want to include an id associated with the request on every line of a log file. I have a mechanism for retrieving the request id via a logging filter, which works fine.
My problem is that when a log message contains a newline, the message is of course wrapped onto "bare" continuation lines. Is there a way to tell the logging library to split the message and apply the format to each element of the split?
import logging

logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = logging.Formatter("%(requestid)s\t%(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.debug("blah\nblah")
Output:
xxx blah
blah
Desired output:
xxx blah
xxx blah
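One approach (a sketch: `SplitLinesFormatter` is a hypothetical name, and the question's request-id filter is stood in for here by an `extra` dict) is to subclass `logging.Formatter` and format each line of the message as if it were its own record:

```python
import io
import logging

class SplitLinesFormatter(logging.Formatter):
    """Apply the format string to every line of a multi-line message."""
    def format(self, record):
        # Merge %-style args first, then emit one fully formatted record per line.
        lines = record.getMessage().split('\n')
        formatted = []
        for line in lines:
            line_record = logging.makeLogRecord(record.__dict__)
            line_record.msg, line_record.args = line, None
            formatted.append(super().format(line_record))
        return '\n'.join(formatted)

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(SplitLinesFormatter("%(requestid)s\t%(message)s"))
logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

# 'extra' stands in for the request-id filter from the question.
logger.debug("blah\nblah", extra={"requestid": "xxx"})
```

Because the split happens inside the formatter, every handler using it gets the repeated prefix, and the existing filter machinery is untouched.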
I am trying to standardize the values of multiple columns in a spark dataframe by subtracting the mean and dividing by the stddev of each column. This is the code I have so far:
from pyspark.sql import Row
from pyspark.sql.functions import stddev_pop, avg

df = spark.createDataFrame([Row(A=1, B=6), Row(A=2, B=7), Row(A=3, B=8),
                            Row(A=4, B=9), Row(A=5, B=10)])

exprs = [x - (avg(x)) / stddev_pop(x) for x in df.columns]
df.select(exprs).show()
This gives me the result:
+------------------------------+------------------------------+
|(A - (avg(A) / stddev_pop(A)))|(B - (avg(B) / stddev_pop(B)))|
+------------------------------+------------------------------+
| null| null|
+------------------------------+------------------------------+
Whereas I was hoping for:
+------------------------------+------------------------------+
|(A - (avg(A) / stddev_pop(A)))|(B - (avg(B) / stddev_pop(B)))|
+------------------------------+------------------------------+
| -1.414213562| -1.414213562|
| -0.707106781| -0.707106781|
| 0| 0|
| 0.707106781| 0.707106781|
| 1.414213562| 1.414213562|
+------------------------------+------------------------------+
I believe I could use StandardScaler from mllib …
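Setting the Spark-side fix aside (note that the list comprehension passes the column-name string `x` into the arithmetic rather than `col(x)`, and the expression parses as `x - (avg(x) / stddev_pop(x))`, which is not the intended z-score), the arithmetic the expected output corresponds to is the population z-score. A quick pandas sketch confirming those expected values:

```python
import pandas as pd

df = pd.DataFrame({"A": [1, 2, 3, 4, 5], "B": [6, 7, 8, 9, 10]})

# Population z-score: subtract the column mean, divide by the population
# standard deviation (ddof=0, matching Spark's stddev_pop).
z = (df - df.mean()) / df.std(ddof=0)
```

Both columns have the same deviations from their means, so both yield -1.414…, -0.707…, 0, 0.707…, 1.414…, matching the hoped-for table.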
I am using the following code to normalize a PySpark DataFrame:
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline

cols = ["a", "b", "c"]
df = spark.createDataFrame([(1, 0, 3), (2, 3, 2), (1, 3, 1), (3, 0, 3)], cols)

Pipeline(stages=[
    VectorAssembler(inputCols=cols, outputCol='features'),
    StandardScaler(withMean=True, inputCol='features', outputCol='scaledFeatures')
]).fit(df).transform(df).select(cols + ['scaledFeatures']).head()
This gives the expected result:
Row(a=1, b=0, c=3, scaledFeatures=DenseVector([-0.7833, -0.866, 0.7833]))
However, when I run the pipeline on a (much) larger dataset loaded from parquet files, I get the following exception:
16/12/21 09:47:50 WARN TaskSetManager: Lost task 0.0 in stage 60.0 (TID 6370, 10.231.153.67): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$2: (vector) => vector)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply2_2$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown …

apache-spark pyspark spark-dataframe pyspark-sql apache-spark-mllib
If anyone can help with this I would be very grateful, as it has been driving me slightly crazy for a few hours!
I have an ndarray as follows:
array([[[0, 0],
        [0, 2],
        [0, 4]],

       [[1, 0],
        [1, 2],
        [1, 4]]])
I want to convert it to a record array:
array([[(0, 0), (0, 2), (0, 4)],
       [(1, 0), (1, 2), (1, 4)]],
      dtype=[('x', '<i4'), ('y', '<i4')])
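One way to do this (a sketch, assuming numpy new enough to ship `numpy.lib.recfunctions.unstructured_to_structured`, which maps the last axis onto the named fields) is:

```python
import numpy as np
from numpy.lib.recfunctions import unstructured_to_structured

a = np.array([[[0, 0], [0, 2], [0, 4]],
              [[1, 0], [1, 2], [1, 4]]])

# The last axis has length 2, matching the two fields, so each (x, y)
# pair collapses into one structured element; the result has shape (2, 3).
rec = unstructured_to_structured(a, dtype=np.dtype([('x', '<i4'), ('y', '<i4')]))
```

Fields are then addressable by name, e.g. `rec['x']` and `rec['y']`.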
I am using backbone.js to build my client application. Backbone.sync works for all the CRUD operations, but I am not clear how to implement one of the server-side requests I need to make. The request involves sending a model containing algorithm parameters to the server and receiving back a response containing the algorithm's results (chart data points, table data, etc.). Calling model.save() is not quite the right thing, since backbone expects the response to contain the updated model, whereas ideally I would like to create an entirely new model from the response.
Should I fall back to jQuery.ajax({data: model.toJSON(), ...}) and create a new model from the response? Or is there something clever I have missed?
Many thanks.
I am using the pandas read_html function to load an html table into a dataframe, but it fails because the source data has a header merged with colspan=2, which leads to this AssertionError: 6 columns passed, passed data had 7 columns.
I have tried various options for the header kwarg (header=None, header=['Code'...]), but nothing seems to work.
Does anyone know of a way to parse html tables with merged columns using pandas read_html?
I am having difficulty overriding a very simple TextBox style with MaterialDesignInXamlToolkit.
App.xaml
<Application x:Class="WpfApp1.App"
             xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
             xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
             StartupUri="MainWindow.xaml">
    <Application.Resources>
        <ResourceDictionary>
            <ResourceDictionary.MergedDictionaries>
                <ResourceDictionary Source="pack://application:,,,/MaterialDesignThemes.Wpf;component/Themes/MaterialDesignTheme.Light.xaml" />
                <ResourceDictionary Source="pack://application:,,,/MaterialDesignThemes.Wpf;component/Themes/MaterialDesignTheme.Defaults.xaml" />
                <ResourceDictionary Source="Themes/MaterialDesignTheme.Overrides.xaml" />
            </ResourceDictionary.MergedDictionaries>
        </ResourceDictionary>
    </Application.Resources>
</Application>
MainWindow.xaml
<Window x:Class="WpfApp1.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:materialDesign="clr-namespace:MaterialDesignThemes.Wpf;assembly=MaterialDesignThemes.Wpf"
        Title="MainWindow" Height="400" Width="300">
    <Grid>
        <TextBox VerticalAlignment="Center"
                 x:Name="NameTextBox"
                 materialDesign:HintAssist.Hint="Name">
        </TextBox>
    </Grid>
</Window>
MaterialDesignTheme.Overrides.xaml
<ResourceDictionary xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
                    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
                    xmlns:materialDesign="clr-namespace:MaterialDesignThemes.Wpf;assembly=MaterialDesignThemes.Wpf">
    <Style x:Key="MaterialDesignTextBox"
           BasedOn="{StaticResource MaterialDesignTextBox}"
           TargetType="{x:Type TextBox}">
        <Setter Property="FontSize" Value="200" />
    </Style>
</ResourceDictionary>
However, unless I remove the x:Key from the style in the overrides file, the font in the textbox stays at a very boring 12 rather than the super-exciting 200 I am after.
I am trying to filter the rows of a PySpark dataframe on whether the values in all columns are zero.
I was hoping to use something like this:
from pyspark.sql.functions import col
df.filter(all([(col(c) != 0) for c in df.columns]))
But I get a ValueError:
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
Is there a way to perform a logical AND over a list of conditions?
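The ValueError arises because `all()` truth-tests each Column, which Column objects refuse; folding the conditions together with the bitwise `&` operator via `functools.reduce` avoids that. A minimal sketch of the pattern, shown here with pandas boolean Series (which overload `&` the same way PySpark Columns do; the identical `reduce` expression should work with `pyspark.sql.functions.col`):

```python
from functools import reduce
from operator import and_

import pandas as pd

df = pd.DataFrame({"a": [1, 0, 2], "b": [3, 4, 0]})

# Build one condition per column, then fold them together with `&`.
conditions = [(df[c] != 0) for c in df.columns]
mask = reduce(and_, conditions)

filtered = df[mask]  # keeps only rows where every column is non-zero
```

In PySpark the equivalent would be `df.filter(reduce(and_, [(col(c) != 0) for c in df.columns]))`.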