小编mgo*_*ser的帖子

Pickle Spark RDD 并将其读入 Python

我正在尝试通过 pickle 来序列化 Spark RDD,并将 pickled 文件直接读入 Python。

a = sc.parallelize(['1','2','3','4','5'])
a.saveAsPickleFile('test_pkl')
Run Code Online (Sandbox Code Playgroud)

然后我将 test_pkl 文件复制到本地。如何将它们直接读入Python?当我尝试使用普通的 pickle 包时,当我尝试读取“test_pkl”的第一个 pickle 部分时,它会失败:

pickle.load(open('part-00000','rb'))

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib64/python2.6/pickle.py", line 1370, in load
    return Unpickler(file).load()
  File "/usr/lib64/python2.6/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib64/python2.6/pickle.py", line 970, in load_string
    raise ValueError, "insecure string pickle"
ValueError: insecure string pickle
Run Code Online (Sandbox Code Playgroud)

我认为 Spark 使用的 pickle 方法与 python pickle 方法不同(如果我错了,请纠正我)。有什么方法可以让我从 Spark 中腌制数据并将这个腌制对象直接从文件中读取到 python 中吗?

python pickle apache-spark pyspark

5
推荐指数
1
解决办法
1万
查看次数

在pyspark中使用缓冲区收集RDD

我想要一种方法一次(或小批量)从我的RDD返回行,以便我可以在需要时在本地收集行.我的RDD足够大,无法容纳名称节点上的内存,因此运行collect()会导致错误.

有没有办法重新创建collect()操作但使用生成器,以便RDD中的行传递到缓冲区?另一种选择是take()从缓存的RDD一次到100000行,但我不认为take()允许您指定起始位置?

apache-spark pyspark

5
推荐指数
1
解决办法
1451
查看次数

在 HDFStore 组中存储多个对象

我想在 HDFStore 中存储多个对象,但我想通过分组来组织它。类似的东西:

import pandas as pd
my_store = pd.HDFStore('my_local_store.h5')
my_store._handle.createGroup('/', 'data_source_1') # this works, but I'm not sure what it does
my_store['/data_source_1']['part-1'] = pd.DataFrame({'b':[1,2,9,2,3,5,2,5]}) # this does not work
my_store['/data_source_1']['part-2'] = pd.DataFrame({'b':[3,8,4,2,5,5,6,1]}) # this does not work either
Run Code Online (Sandbox Code Playgroud)

python hdf5 pandas hdfstore

5
推荐指数
1
解决办法
1043
查看次数

正则表达式删除首字母缩略词的句号?

我想从一串文本中删除首字母缩略词的句点,但我也希望o在常规句点(例如句子末尾)中留下.

那么下面这句话:

"The C.I.A. is a department in the U.S. Government."
Run Code Online (Sandbox Code Playgroud)

应该成为

"The CIA is a department in the US Government."
Run Code Online (Sandbox Code Playgroud)

有没有一种干净的方法来使用Python做到这一点?到目前为止,我有两个步骤:

words = "The C.I.A. is a department in the U.S. Government."
words = re.sub(r'([A-Z].[A-Z.]*)\.', r'\1', words)
print words
# The C.I.A is a department in the U.S Government.    
words = re.sub(r'\.([A-Z])', r'\1', words)
print words
# The CIA is a department in the US Government.
Run Code Online (Sandbox Code Playgroud)

python regex

5
推荐指数
1
解决办法
1744
查看次数

嵌套GridSearchCV

对于给定的模型类型,我都想1)为各种模型类型调整参数,以及2)找到最佳的调整模型类型。我想用GridSearchCV这个。

我能够执行以下操作,但是我也担心这不能按我期望的方式工作,并且我还担心也许您不需要嵌套GridSearchCV-是否可以使用一个嵌套GridSearchCV

我对嵌套GridSearchCV的一个担心是,我可能还会进行嵌套的交叉验证,因此与其对66%的火车数据进行网格搜索,还不如对43.56%的火车数据进行网格搜索。我还有一个担心是,我增加了代码复杂度。

这是我GridSearchCV使用虹膜数据集的嵌套示例:

import numpy as np 
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.decomposition import KernelPCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

iris_raw_data = load_iris()
iris_df = pd.DataFrame(np.c_[iris_raw_data.data, iris_raw_data.target], 
                       columns=iris_raw_data.feature_names + ['target'])
iris_category_labels = {0:'setosa', 1:'versicolor', 2:'virginica'}
iris_df['species_name'] = iris_df['target'].apply(lambda l: iris_category_labels[int(l)])

features = ['sepal length (cm)', 'sepal width …
Run Code Online (Sandbox Code Playgroud)

python scikit-learn grid-search

5
推荐指数
0
解决办法
570
查看次数

避免在 Linux 中将进度条写入文件

我有一个打印输出的进程,但此输出还包括加载栏。

我想将此输出写入文件并显示此输出。

通常我可以这样做:

./my_process.sh | tee -a my_log_file.txt

或者

./my_process.sh >> my_log_file.txt
tail -f my_log_file.txt
Run Code Online (Sandbox Code Playgroud)

这会将所有内容打印到我的终端,但它也会将所有内容打印到日志文件,包括进度条的每一步!

我想排除进度条迭代被打印到日志文件中。

出于我的目的,任何带有回车符的行都可以从日志文件中排除。如何排除回车行附加到日志文件,同时仍将它们打印到终端上的标准输出?

linux bash stdout

4
推荐指数
1
解决办法
1559
查看次数

内存高效的笛卡尔加入PySpark

我有一个庞大的字符串数据集,可以放入我的spark集群中的单个节点上的内存中.问题是它占用了单个节点的大部分内存.

这些ID长约30个字符.例如:

ids
O2LWk4MAbcrOCWo3IVM0GInelSXfcG
HbDckDXCye20kwu0gfeGpLGWnJ2yif
o43xSMBUJLOKDxkYEQbAEWk4aPQHkm
Run Code Online (Sandbox Code Playgroud)

我希望写入所有id对的列表.例如:

id1,id2
O2LWk4MAbcrOCWo3IVM0GInelSXfcG,HbDckDXCye20kwu0gfeGpLGWnJ2yif
O2LWk4MAbcrOCWo3IVM0GInelSXfcG,o43xSMBUJLOKDxkYEQbAEWk4aPQHkm
HbDckDXCye20kwu0gfeGpLGWnJ2yif,O2LWk4MAbcrOCWo3IVM0GInelSXfcG
# etc...
Run Code Online (Sandbox Code Playgroud)

所以我需要交叉加入数据集本身.我希望使用10节点集群在PySpark上执行此操作,但它需要内存效率.

cartesian-product cross-join apache-spark pyspark

4
推荐指数
1
解决办法
8565
查看次数

网格搜索分类的自定义评分功能

我想在 scikit-learn 中执行 a GridSearchCVfor a RandomForestClassifier,并且我有一个我想使用的自定义评分函数。

评分函数仅在提供概率时才有效(例如, rfc.predict_proba(...)必须调用而不是rfc.predict(...)

如何指示 GridSearchCV 使用predict_proba()而不是predict()

from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier

def my_custom_loss_func(ground_truth, predictions):
    # predictions must be probabilities - e.g. model.predict_proba()
    # example code here:
    diff = np.abs(ground_truth - predictions).max()
    return np.log(1 + diff)

param_grid = {'min_samples_leaf': [1, 2, 5, 10, 20, 50, 100], 'n_estimators': [100, 200, 300]}
grid = GridSearchCV(RandomForestClassifier(), param_grid=param_grid,
                    scoring=my_custom_loss_func)
Run Code Online (Sandbox Code Playgroud)

python classification scikit-learn grid-search

4
推荐指数
1
解决办法
1930
查看次数

Python,字典复制和更新不起作用?

为什么以下代码显示None为b,而不是{'a':1,'e':2}?Python 2.7.3

>>>> d = {'a' :1 }
>>>> b = d.copy().update({'e':2})
>>>> print b
None
>>>> d.update({'c':3})
>>>> print d
{'a': 1, 'c': 3}
Run Code Online (Sandbox Code Playgroud)

python dictionary copy

2
推荐指数
1
解决办法
977
查看次数

删除postgres中的重复项

对于给定的重复"external_id",我想删除除一行之外的所有行.下面的查询大约需要两分钟来运行我的5,000,000行表,我觉得必须有更快的方法来执行此任务."id"是主键,"external_id"是btree索引列:

delete from posts p1 using (select distinct on (1)
        external_id, id
        from posts
        order by 1 desc, 2 desc) p_recent 
    where p1.external_id = p_recent.external_id
    and p1.id != p_recent.id;
Run Code Online (Sandbox Code Playgroud)

如何提高此查询的性能?

编辑:下面的查询计划:

Delete on posts p1  (cost=2322413.28..2673548.11 rows=5583248 width=45) (actual time=148064.026..148064.026 rows=0 loops=1)
   ->  Hash Join  (cost=2322413.28..2673548.11 rows=5583248 width=45) (actual time=148064.025..148064.025 rows=0 loops=1)
         Hash Cond: ((p_recent.external_id)::text = (p1.external_id)::text)
         Join Filter: (p1.id <> p_recent.id)
         ->  Subquery Scan on p_recent  (cost=1565918.17..1649666.91 rows=5583249 width=54) (actual time=80975.573..98202.920 rows=5947083 loops=1)
               ->  Unique  (cost=1565918.17..1593834.42 rows=5583249 width=15) (actual …
Run Code Online (Sandbox Code Playgroud)

postgresql join sql-delete postgresql-9.3

2
推荐指数
1
解决办法
234
查看次数

postgres 中的顺序扫描花费的时间令人惊讶。如何确定硬件瓶颈?

我有一个普通的 postgres 数据库在一台小型服务器上运行,只有一个名为“posts”的表。该表大小约为 5GB,包含 900 万行。

当我运行一个简单的顺序扫描操作时,大约需要51 秒!:

EXPLAIN ANALYZE select count(*) from posts;
                                                    QUERY PLAN                                                        
--------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=988701.41..988701.42 rows=1 width=0) (actual time=51429.607..51429.607 rows=1 loops=1)
   ->  Seq Scan on posts  (cost=0.00..966425.33 rows=8910433 width=0) (actual time=0.004..49530.025 rows=9333639 loops=1)
 Total runtime: 51429.639 ms
(3 rows)
Run Code Online (Sandbox Code Playgroud)
  • 服务器规格:
    • 至强 E3-1220v2
    • 4GB内存
    • 500GB 硬盘(原厂 7200rpm,无 RAID)
    • postgres 9.1
    • 乌班图12.04
    • 无 L1 或 L2 缓存
    • Postgres 在 4 个核心之一上运行
    • Postgres 配置是标准的,没什么特别的
    • 我已经隔离了服务器,并且服务器上没有运行任何其他重要的内容

当查询运行时,磁盘读取速度约为 122M/s(根据 iotop),“IO>”约为 90%。只有 1 个核心的使用率为其容量的 12%。看起来此操作中几乎没有使用内存,可能约为 5MB。

从这些统计数据来看,瓶颈似乎是 IO,但我很困惑,因为磁盘的读取速度更快,(根据我使用的速度测试,sudo hdparm …

sql postgresql io performance

1
推荐指数
1
解决办法
1028
查看次数

Python如何评估或声明多个条件?

偶尔,我有这个问题,我想检查:

a = 'my string'
if 'string1' in a or 'string2' in a or 'string3' in a ... or 'stringN' in a:
    [[do something]]
else:
    [[something else]]
Run Code Online (Sandbox Code Playgroud)

假设我知道有90%的机会'string1' in a评估True.Python是否还会评估'string2' in a在这种情况下'string1' in aTrue哪个?或者在技术上写作更有效:

if 'string1' in a:
    [[do something]]
elif 'string2' in a:
    [[do something]]
elif 'string3' in a:
    [[do something]]
...
elif 'stringN' in a:
    [[do something]]
else:
    [[something else]]
Run Code Online (Sandbox Code Playgroud)

python boolean-logic conditional-statements

1
推荐指数
1
解决办法
1758
查看次数

修改goroutine中的结构?

我正在尝试使用goroutine,似乎我无法在goroutine中修改结构的值(下面的示例).这有什么工作吗?

编辑:如果我放置一个睡眠语句,似乎代码运行,表明如果给定更多时间,goroutines将运行,但它们在main()中的所有内容都已执行后完成运行.在继续之前,我如何"等待"我的goroutines完成?

package main

import (
    "fmt"
)

type num struct {
    val int
}

func (d *num) cube_val() {
    fmt.Println("changing value...")
    d.val = d.val*d.val*d.val 
}

func main() {
    a := []num{num{1},num{3},num{2},num{5},num{4}}
    for i := range a {
        go a[i].cube_val()
    }
    // code that waits for go routines to finish should get inserted here ...
    fmt.Println(a) // change does NOT happen

    for i := range a {
        a[i].cube_val()
    }
    fmt.Println(a) // change happens, and fmt.Println statements worked?
}
Run Code Online (Sandbox Code Playgroud)

concurrency struct go goroutine

0
推荐指数
1
解决办法
583
查看次数