我定义了三个任务T1,T2和T3,然后执行T4如下任务:
class T4(luigi.Task)
def requires(self):
return [T1(), T2(), T3()]
Run Code Online (Sandbox Code Playgroud)
有没有一种自然的方式来告诉路易,我想这些任务T1,T2以及T3在并行执行?
我有一个我通过谷歌数据交换机创建的火花簇.我希望能够使用databricks 的csv库(请参阅https://github.com/databricks/spark-csv).所以我首先测试它是这样的:
我开始与我的集群的主节点进行ssh会话,然后我输入:
pyspark --packages com.databricks:spark-csv_2.11:1.2.0
Run Code Online (Sandbox Code Playgroud)
然后它启动了一个pyspark shell,我在其中输入:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('gs:/xxxx/foo.csv')
df.show()
Run Code Online (Sandbox Code Playgroud)
它奏效了.
我的下一步是使用以下命令从我的主机启动此作业:
gcloud beta dataproc jobs submit pyspark --cluster <my-dataproc-cluster> my_job.py
Run Code Online (Sandbox Code Playgroud)
但在这里它不起作用,我得到一个错误.我想因为我没有--packages com.databricks:spark-csv_2.11:1.2.0作为一个论点,但我尝试了10种不同的方式来给它,我没有管理.
我的问题是:
pyspark --packages com.databricks:spark-csv_2.11:1.2.0job.py来导入它吗?我试图最小化一个很长的函数(它是 500000 个子函数部分的总和),以便将一些参数拟合到概率模型中。我用这个scipy.optimize.minimize功能。我尝试了算法Powell和Nelder-Mead算法,鲍威尔在我的设置中看起来真的更快。但是,我真的不明白如何强制该过程在给定时间后给我一些结果,即使它们不是“最佳”。
我填写了 options maxiter, maxfev, xtoland ftol,但我并不真正理解这些选项,因为我试图print在我的函数中放入 a并且我注意到该算法对其进行了maxfev多次评估,但是当它达到 maxiter 点时,它会发送一个错误“达到最大迭代次数”。
任何人都可以向我解释他们是如何处理我正在使用的两种算法的吗?该文档非常不清楚。
我的代码:
def log_likelihood(r, alpha, a, b, customers):
if r <= 0 or alpha <= 0 or a <= 0 or b <= 0:
return -np.inf
c = sum([log_likelihood_individual(r, alpha, a, b, x, tx, t) for x, tx, t in customers])
print -c
return c
negative_ll = lambda params: -log_likelihood(*params,customers=customers)
params0 = (1, 1, …Run Code Online (Sandbox Code Playgroud) 我知道这可能是一个新手问题,所以请原谅我,但我没有找到对我/帮助我似乎有意义的答案。我正在从.csv文件导入数据,需要切出特定部分。我已经做到了,当我打印数据框时,所有需要的值都出现了。
df = pd.read_csv('C:/Users/Owner/Desktop/df.csv')
dfr = df[44:58] #rows I need
dfrc = dfr[['1','2','3','4','5']] #columns I need
dfrc.mean(axis=1 skipna=True) #there are some NaNs present in the last index
Run Code Online (Sandbox Code Playgroud)
返回什么
44 NaN
...
57 NaN
dtype: float64
Run Code Online (Sandbox Code Playgroud)
我不确定为什么会这样,但是我需要数值来表示索引/行的平均值。我希望论坛中的某人能够提供帮助。谢谢。
我有以下数据集df:
import numpy.random
import pandas
cat = pandas.Series(numpy.random.random_integers(0,400,1000000))
ids = pandas.Series(numpy.random.random_integers(0,10000,1000000))
team = pandas.Series(numpy.random.random_integers(0,1,1000000))
df = pandas.concat([ids,cat,team],axis=1)
df.columns = ['ids','cat','team']
Run Code Online (Sandbox Code Playgroud)
请注意,该列中只有 400 个不同的类别cat。因此,我想为机器学习分类准备数据集,即为 0 到 400 之间的每个不同类别值创建一列,对于每一行,如果 id 具有相应的类别,则写入 1,否则写入 0。我的目标是创建一个 groupby ids,并对每个类别列的 1 求和,如下所示:
df2 = pandas.get_dummies(df['cat'], sparse=True)
df2['ids'] = df['ids']
df3 = df2.groupby('ids').sum()
Run Code Online (Sandbox Code Playgroud)
我的问题是它groupby.sum()非常非常长,太长了(超过 30 分钟)。所以我需要一个不同的策略来进行计算。这是第二次尝试。
from sklearn import preprocessing
import numpy
text_encoder = preprocessing.OneHotEncoder(dtype=numpy.int)
X = text_encoder.fit_transform(df.drop(['team','ids'],axis=1).values).astype(int)
Run Code Online (Sandbox Code Playgroud)
但是,X是一个稀疏的 scipy 矩阵。这里我有两个选择:要么找到一种groupby.sum()有效处理这个稀疏 scipy 矩阵的方法,要么将其转换为真正的 numpy 矩阵,.toarray()如下所示: …
我有一个包含两列的数据框:score1哪个是numeric,truth1哪个是boolean.我想预测truth1使用score1.要做到这一点,我想要一个简单的线性模型,然后要求一个好的阈值,即一个阈值,它在我的ROC曲线中给出了75%的灵敏度.因此,我这样做:
roc_curve = roc(truth1 ~ score1 , data = my_data)
coords(roc=roc_curve, x = 0.75, input='sensitivity', ret='threshold')
Run Code Online (Sandbox Code Playgroud)
我的问题是coords返回'NA',因为0.75的灵敏度没有出现在ROC曲线中.所以这是我的问题:如何获得阈值,这给我的灵敏度至少为0.75,具有最大特异性?
我有一个很大pandas.DataFrame的内存负载,我想将它分成五个部分,以便使用模块中Pool的工作人员独立处理这些部分multiprocessing。
我的问题是我pandas.DataFrame的太大了,我无法将每个部分作为参数传递给我的multiprocessing函数:我收到一个pickle错误。
我的问题是:如何DataFrame将已经加载到 RAM 中的这些部分作为参数传递给我的函数,而不显式传递数据帧。
换句话说,是否可以将某种指向每个函数的 RAM 地址的指针传递DataFrame给multiprocessing函数,而不是完整的DataFrames?
我有一个pandas.DataFrame表格
low_bound high_bound name
0 10 'a'
10 20 'b'
20 30 'c'
30 40 'd'
40 50 'e'
Run Code Online (Sandbox Code Playgroud)
我有一个很长 pandas.Series的形式:
value
5.7
30.4
21
35.1
Run Code Online (Sandbox Code Playgroud)
我想为系列的每个值赋予与low_bound/high_bound/name DataFrame相关的相应名称.这是我的预期结果:
value name
5.7 'a'
30.4 'd'
21 'c'
35.1 'd'
Run Code Online (Sandbox Code Playgroud)
实际上,5.7名称是'a',因为5.7被排除在0到10之间.
什么是最有效的代码?我知道我可以通过迭代系列来解决问题,但也许有一个更快的矢量解决方案可以逃避我.
最后请注意我的界限可以是自定义和不规则的.为了这个例子,他们在这里是常规的.
我有以下 meta.yaml 文件来构建 conda 包:
package:
name: dsutils
version: 1.0
source:
git_rev: cat_files_bq_utils
git_url: https://**********.git
requirements:
build:
- python
- setuptools
run:
- python
- pandas
- scipy
- numpy
- scikit-learn ==0.17.0
- google-api-python-client
...
Run Code Online (Sandbox Code Playgroud)
当我尝试运行它时,出现以下错误:
Error: Packages/dependencies missing in current linux-64 channels:
- dsutils 1.0 py34_0 -> google-api-python-client
- google-api-python-client
Run Code Online (Sandbox Code Playgroud)
但是当我摆脱 google-api-python-client 依赖项时,包就成功构建了。请注意,google-api-python-client 无法通过conda install命令行安装,但可以使用 pip 安装。
我的问题是:如何在 meta.yaml conda 包文件中指定 pip 依赖项?
我有一个在bigquery中加载的数据表,我想通过pyspark .py文件在我的spark集群中导入它.
我在Dataproc + BigQuery示例中看到过- 任何可用的?有一种方法可以使用scala在spark集群中加载一个bigquery表,但有没有办法在pyspark脚本中执行此操作?
python google-bigquery apache-spark pyspark google-cloud-dataproc