小编Ale*_*ani的帖子

Airflow DAG中的外部文件

我正在尝试访问Airflow任务中的外部文件来读取一些sql,我得到"找不到文件".有人遇到过这个吗?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dat',
    start_date=datetime(2017, 1, 1),
    catchup=False,
    schedule_interval=timedelta(days=1)
)

def run_query():
    # read the query
    query = open('sql/queryfile.sql')
    # run the query
    execute(query)

tas = PythonOperator(
    task_id='run_query', dag=dag, python_callable=run_query)
Run Code Online (Sandbox Code Playgroud)

日志状态如下:

IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'
Run Code Online (Sandbox Code Playgroud)

我知道我可以简单地将查询复制并粘贴到同一个文件中,但实际上并不是很简洁.有多个查询,文本真的很大,嵌入Python代码会损害可读性.

python airflow

11
推荐指数
3
解决办法
8225
查看次数

Python - 时间序列对齐和"迄今为止"的函数

我有一个包含以下前三列的数据集.包括购物篮ID(唯一标识符),销售金额(美元)和交易日期.我想为数据集的每一行计算以下列,我想在Python中使用它.

以前出售同一个篮子(如果有的话); 目前购物篮的销售数量 ; 当前购物篮的平均日期(如果有); 当前购物篮的最大日期(如果有)

Basket  Sale   Date       PrevSale SaleCount MeanToDate MaxToDate
88      $15 3/01/2012                1      
88      $30 11/02/2012      $15      2         $23        $30
88      $16 16/08/2012      $30      3         $20        $30
123     $90 18/06/2012               1      
477     $77 19/08/2012               1      
477     $57 11/12/2012      $77      2         $67        $77
566     $90 6/07/2012                1      
Run Code Online (Sandbox Code Playgroud)

我是Python的新手,我很难找到任何可以用花哨方式做的事情.我已经按照BasketID和Date对数据进行了分类(如上所述),因此我可以通过向每个单一篮子向前移动一次来批量进行先前的销售.不知道如何以有效的方式获得MeanToDate和MaxToDate除了循环...任何想法?

python date time-series alignment pandas

5
推荐指数
1
解决办法
3943
查看次数

通过Python中的JayDeBeApi JDBC连接到DB2

我一直在努力通过OSX(maveriks)上的Python客户端连接到DB2.一个有效的选项似乎是使用JayDeBeApi,但运行以下代码...

import jaydebeapi
import jpype

jar = '/opt/IBM/db2/V10.1/java/db2jcc4.jar' # location of the jdbc driver jar
args='-Djava.class.path=%s' % jar
jvm = jpype.getDefaultJVMPath()
jpype.startJVM(jvm, args)

jaydebeapi.connect('com.ibm.db2.jcc.DB2Driver',
'jdbc:db2://server:port/database','myusername','mypassword')
Run Code Online (Sandbox Code Playgroud)

我会收到以下错误

Traceback (most recent call last):
  File "<pyshell#67>", line 2, in <module>
    'jdbc:db2://server:port/database','myusername','mypassword')
  File "/Library/Python/2.7/site-packages/jaydebeapi/dbapi2.py", line 269, in connect
    jconn = _jdbc_connect(jclassname, jars, libs, *driver_args)
  File "/Library/Python/2.7/site-packages/jaydebeapi/dbapi2.py", line 117, in _jdbc_connect_jpype
    return jpype.java.sql.DriverManager.getConnection(*driver_args)
com.ibm.db2.jcc.am.SqlSyntaxErrorExceptionPyRaisable: com.ibm.db2.jcc.am.SqlSyntaxErrorException: [jcc][t4][10205][11234][3.63.123] Null userid is not supported. ERRORCODE=-4461, SQLSTATE=42815
Run Code Online (Sandbox Code Playgroud)

所以基本上我连接到服务器,但由于某种原因我没有使用提供的用户名和密码.关于如何正确传递用户名和密码的任何想法?我无法准确找到这个问题的进一步说明,欢迎提出任何建议或提示.

python db2 jdbc jaydebeapi

4
推荐指数
1
解决办法
1万
查看次数

Python多进程共享内存与使用参数

我试图了解在不同进程之间共享相同数据源的最有效和耗费更少内存的方法.

想象一下以下代码,简化了我的问题.

import pandas as pd
import numpy as np
from multiprocessing import Pool

# method #1
def foo(i): return data[i]
if __name__ == '__main__':
    data = pd.Series(np.array(range(100000)))
    pool = Pool(2)
    print pool.map(foo,[10,134,8,1])

# method #2
def foo((data,i)): return data[i]
if __name__ == '__main__':
    data = pd.Series(np.array(range(100000)))
    pool = Pool(2)
    print pool.map(foo,[(data,10),(data,134),(data,8),(data,1)])
Run Code Online (Sandbox Code Playgroud)

在第一种方法中,将使用全局变量(在Windows上不起作用,仅在Linux/OSX上),然后由函数访问.在第二种方法中,我将"数据"作为参数的一部分传递.

就过程中使用的内存而言,两种方法之间会有区别吗?

# method #3
def foo((data,i)): return data[i]
if __name__ == '__main__':
    data = pd.Series(np.array(range(100000)))
    pool = Pool(2)
    # reduce the size of the argument passed
    data1 …
Run Code Online (Sandbox Code Playgroud)

python dataset shared-memory multiprocessing pandas

4
推荐指数
1
解决办法
1257
查看次数

Pandas保存并读取DataFrame到json

我正在尝试将一些带有日期的pandas DataFrame保存到json然后再读回来.虽然当我从json读回来时,我得到的日期是不同的格式(大多数时候需要再次解析日期).

import pandas as pd
df = pd.Series(range(7),pd.date_range('2014-10-01','2014-10-07')).reset_index()
df.columns = ['LoadDate','number']
print df

    LoadDate  number
0 2014-10-01       0
1 2014-10-02       1
2 2014-10-03       2
3 2014-10-04       3
4 2014-10-05       4
5 2014-10-06       5
6 2014-10-07       6
Run Code Online (Sandbox Code Playgroud)

现在,如果我要导出并回读,我会得到以下结果(不需要的)

print pd.read_json(df.to_json())

        LoadDate  number
0  1412121600000       0
1  1412208000000       1
2  1412294400000       2
3  1412380800000       3
4  1412467200000       4
5  1412553600000       5
6  1412640000000       6
Run Code Online (Sandbox Code Playgroud)

我希望能够以原始格式回读日期.我怎么能以干净的方式做到这一点?

这是有趣的部分.

df.columns = ['date','number']
print pd.read_json(df.to_json())

        date  number
0 2014-10-01       0
1 2014-10-02       1
2 2014-10-03 …
Run Code Online (Sandbox Code Playgroud)

python json pandas

2
推荐指数
1
解决办法
4734
查看次数