如何读取序列化堆栈?
我正在 Spark 上构建分布式 NLP 应用程序。我定期遇到这些 NotSerialized 异常,并且总是摸索着解决它们。但是,我从未找到关于序列化堆栈中所有内容含义的良好文档。
如何读取 Scala 中伴随 NotSerialized 错误的序列化堆栈?如何查明导致错误的类或对象?堆栈中的“field”、“object”、“writeObject”和“writeReplace”字段有何意义?
示例如下:
Caused by: java.io.NotSerializableException: MyPackage.testing.PreprocessTest$$typecreator1$1
Serialization stack:
- object not serializable (class: MyPackage.testing.PreprocessTest$$typecreator1$1, value: MyPackage.testing.PreprocessTest$$typecreator1$1@27f6854b)
- writeObject data (class: scala.reflect.api.SerializedTypeTag)
- object (class scala.reflect.api.SerializedTypeTag, scala.reflect.api.SerializedTypeTag@4a571516)
- writeReplace data (class: scala.reflect.api.SerializedTypeTag)
- object (class scala.reflect.api.TypeTags$TypeTagImpl, TypeTag[String])
- field (class: MyPackage.package$$anonfun$deserializeMaps$1, name: evidence$1$1, type: interface scala.reflect.api.TypeTags$TypeTag)
- object (class MyPackage.package$$anonfun$deserializeMaps$1, <function1>)
- field (class: MyPackage.package$$anonfun$deserializeMaps$1$$anonfun$apply$4, name: $outer, type: class MyPackage.package$$anonfun$deserializeMaps$1)
- object (class MyPackage.package$$anonfun$deserializeMaps$1$$anonfun$apply$4, <function1>)
- field (class: MyPackage.package$$anonfun$deserializeMaps$1$$anonfun$apply$4$$anonfun$apply$5, …Run Code Online (Sandbox Code Playgroud) 我发现了一些有趣的东西,但无法解释它.我正在编写一个简单的例程来反转字符串.这很好,没有抱怨.
我的问题在于printf.当我单独打印原始字符串时,它打印正确,但是当我打印原始字符串作为第一个arg,而函数调用reverse作为第二个时,两者都显示为反转.
输出:
|| abcdthelgko ||
Orig Str:| okglehtdcba |,Rev Str | okglehtdcba |
码:
char* ReverseStr(char* str, int len)
{
char *start = &str[0], *end = &str[len-1];
char temp;
while(start < end)
{
temp = *start;
*start = *end;
*end = temp;
start++;
end--;
}
return str;
}
int main()
{
char str_unique[] = "abcdthelgko";
int str_unique_len = sizeof(str_unique)/sizeof(str_unique[0]);
printf("\n || %s || \n", str_unique);
printf("Orig Str: | %s |, Rev Str | %s |\n", str_unique, ReverseStr(str_unique, …Run Code Online (Sandbox Code Playgroud) 我有两个相同长度的列表.我想检查一个列表中的条件.如果条件为真,则在另一个列表上运行一个非常大的内存/处理密集型函数.
我的第一次尝试是这样的:
records = [(a, deadly_func(b)) for a, b in zip(listA, listB) if a == "condition"]
Run Code Online (Sandbox Code Playgroud)
这立即分配了我桌面上的所有内存,并在我杀了它之前继续了一段时间.显然,它在listB中的所有30,000个项目上运行了deadly_func(b),而意图是使用'if'语句将listB过滤到大约30个项目.
我能够制作一个有效的版本:
records = [(a, i) for a, i in zip(listA, range(len(listB)) if a == "condition"]
records = [(a, deadly_func(listB[i]) for a, i in records]
Run Code Online (Sandbox Code Playgroud)
为什么我的第一次尝试不起作用?是否有更多的pythonic方式使这项工作?
编辑:谢谢你的回复.这是两个版本的实际代码
不工作:
import shapefile, shapely.geometry as shpgeo
lat = 42.3968243
lon = -71.0313479
sf = shapefile.Reader("/opt/ziplfs/tl_2014_us_zcta510.shp")
records = [(r[0], shpgeo.shape(s.__geo_interface__)) for r, s in zip(sf.records(), sf.shapes()) if haversine(lon, lat, float(r[8]), float(r[7])) < 10]
Run Code Online (Sandbox Code Playgroud)
haversine()是一个用户自制的半正弦函数,取两对lat和long并以km为单位返回距离.
from math import sqrt, …Run Code Online (Sandbox Code Playgroud) 这是有关如何json.dumps用于写入gzip文件的绝佳答案。我想做的是使用dump方法代替直接将json序列化为GzipFile对象。
示例代码:
import gzip, json
data = # a dictionary of data here
with gzip.open(write_file, 'w') as zipfile:
json.dump(data, zipfile)
Run Code Online (Sandbox Code Playgroud)
引发的错误是
TypeError: memoryview: a bytes-like objet is required, not 'str'
Run Code Online (Sandbox Code Playgroud)
我相信是由于gzip write()方法想要将字节对象传递给它而引起的。根据文档,
json模块总是产生str对象,而不是byte对象。因此,fp.write()必须支持str输入。
有没有一种方法来包装json串输出为字节,这样GzipFile的write()将处理它?还是使用该方法json.dumps并将encode()结果字符串转换为bytes对象的唯一方法,如其他链接的答案一样?
我有一些想要在应用程序中使用的自定义 jdbc 驱动程序。当我将 Spark 提交到 Kubernetes Spark 集群时,我将这些包含为 --py-files:
spark-submit --py-files s3a://bucket/pyfiles/pyspark_jdbc.zip my_application.py
Run Code Online (Sandbox Code Playgroud)
这给了我:
java.io.FileNotFoundException: File file:/opt/spark/work-dir/pyspark_jdbc.zip does not exist
Run Code Online (Sandbox Code Playgroud)
正如其他答案告诉我的那样,我实际上需要将该 zip 文件添加到 PYTHONPATH 中。现在,我发现至少在 Spark 2.3+ 中不再如此,但让我们这样做:
spark.sparkContext.addPyFile("pyspark_jdbc.zip")
Run Code Online (Sandbox Code Playgroud)
查看集群日志,我看到:
19/10/21 22:40:56 INFO Utils: Fetching s3a://bucket/pyfiles/pyspark_jdbc.zip to
/var/data/spark-52e390f5-85f4-41c4-9957-ff79f1433f64/spark-402e0a00-6806-40a7-a17d-5adf39a5c2d4/userFiles-680c1bce-ad5f-4a0b-9160-2c3037eefc29/fetchFileTemp5609787392859819321.tmp
Run Code Online (Sandbox Code Playgroud)
因此,pyfiles 肯定被导入,但导入/var/data/...而不是导入我的工作目录。因此,当我将 .zip 文件的位置添加到 python 路径时,我不知道它在哪里。在尝试添加 python 文件之前对集群进行一些诊断:
> print(sys.path)
[...,
'/var/data/spark-52e390f5-85f4-41c4-9957-ff79f1433f64/spark-402e0a00-6806-40a7-a17d-5adf39a5c2d4/userFiles-680c1bce-ad5f-4a0b-9160-2c3037eefc29',
'/opt/spark/work-dir/s3a',
'//bucket/pyfiles/pyspark_jdbc.zip'
...]
> print(os.getcwd())
/opt/spark/work-dir
> subprocess.run(["ls", "-l"])
total 0
Run Code Online (Sandbox Code Playgroud)
所以我们看到 pyspark 确实尝试将s3a://我通过--py-filesPYTHONPATH 添加的文件添加到 PYTHONPATH 中,只是它错误地解释了:并且没有正确添加路径。该/var/data/...目录位于 PYTHONPATH …
我正在尝试在 Scala/Spark 中创建一个与语言无关的文本解析器。我正在使用正则表达式从大量文本中删除标点符号。
list_of_strings.map(_.replaceAll("""[\p{Punct}]"""))
Run Code Online (Sandbox Code Playgroud)
但是,我发现中文文本中有许多标点符号没有删除。到目前为止,我已将[?????]这些字符添加到我的正则表达式中。请注意,其中许多与拉丁标点符号相似,但并不完全相同。
Java 正则表达式中是否有一个“规范”函数可以让我去除其他语言中的标点符号?除了中文,我还担心阿拉伯语。
此外,我想确保一种语言中的标点符号在另一种语言中不是关键的语言组件。我不熟悉中文或阿拉伯语,所以我正在寻找由母语人士预先制作的东西。
我有一个 DataFrame 字段,它是Seq[Seq[String]]我构建的 UDF 以将所述列转换为 Seq[String] 列;基本上,一个flatten来自 Scala的函数的 UDF 。
def combineSentences(inCol: String, outCol: String): DataFrame => DataFrame = {
def flatfunc(seqOfSeq: Seq[Seq[String]]): Seq[String] = seqOfSeq match {
case null => Seq.empty[String]
case _ => seqOfSeq.flatten
}
df: DataFrame => df.withColumn(outCol, udf(flatfunc _).apply(col(inCol)))
}
Run Code Online (Sandbox Code Playgroud)
我的用例是字符串,但显然,这可能是通用的。您可以在 DataFrame 转换链中使用此函数,例如:
df.transform(combineSentences(inCol, outCol))
Run Code Online (Sandbox Code Playgroud)
是否有一个 Spark 内置函数可以做同样的事情?我一直没能找到一个。