小编fly*_*zai的帖子

你需要在Pyspark SQL中使用lit()?

我试图弄清楚你需要在哪里使用一个lit值,这个值literal column在文档中被定义为.

以此为例udf,它返回SQL列数组的索引:

def find_index(column, index):
    return column[index]
Run Code Online (Sandbox Code Playgroud)

如果我要将一个整数传入此中,我会收到错误.我需要将lit(n)值传递给udf以获取数组的正确索引.

有没有我可以更好地学习的时候用硬性规定的地方lit和可能col呢?

python apache-spark apache-spark-sql pyspark

21
推荐指数
2
解决办法
4万
查看次数

如何使用boto3(或其他方式)在emr上自动执行pyspark作业?

我正在创建一个解析大量服务器数据的工作,然后将其上传到Redshift数据库中.

我的工作流程如下:

  • 从S3获取日志数据
  • 使用spark dataframes或spark sql来解析数据并写回S3
  • 将数据从S3上传到Redshift.

我已经开始讨论如何自动执行此操作,以便我的进程旋转EMR集群,引导正确的安装程序,并运行我的python脚本,其中包含用于解析和编写的代码.

有没有人有任何可以与我分享的示例,教程或经验,以帮助我学习如何做到这一点?

python amazon-s3 amazon-emr apache-spark pyspark

10
推荐指数
1
解决办法
9645
查看次数

如何根据PEP8格式化长SQL查询

根据PEP 8(最大行长度),行不应超过79个字符.

但是,当我尝试拆分查询时,会遇到连续字符和无效标记等问题.

例如,根据PEP8,格式化此查询的最佳方法是什么?

cursor.execute("SELECT pivot_id FROM aud_qty WHERE hshake1 is NULL AND ((strftime('%s', DATETIME('now')) - strftime('%s', sent_to_pivot)) / (60)) > 30;")
Run Code Online (Sandbox Code Playgroud)

python pep8 python-2.7

9
推荐指数
1
解决办法
5411
查看次数

在同一个调用中从Spark Dataframes split方法中选择数组元素?

我正在拆分HTTP请求以查看元素,我想知道是否有一种方法可以指定我想在同一个调用中查看的元素而无需执行其他操作.

例如:

from pyspark.sql import functions as fn

df.select(fn.split(df.http_request, '/').alias('http'))
Run Code Online (Sandbox Code Playgroud)

给我一个新Dataframe的数组行如下:

+--------------------+
|                http|
+--------------------+
|[, courses, 26420...|
Run Code Online (Sandbox Code Playgroud)

我想要索引1(课程)中的项目,而不必再做另一个select语句指定df.select(df.http[1])或其他.这可能吗?

python apache-spark apache-spark-sql pyspark

8
推荐指数
2
解决办法
6756
查看次数

Render_to_string和response.content.decode()不匹配

我正在编写我的第一个Django应用程序,跟着这本书:

http://chimera.labs.oreilly.com/books/1234000000754/ch05.html#_passing_python_variables_to_be_rendered_in_the_template

在书中有一个测试,验证html是否按预期返回.这是测试:

def test_home_page_returns_correct_html(self):
        request = HttpRequest()
        response = home_page(request)
        expected_html = render_to_string('home.html')
        print(expected_html)
        print(response.content.decode())
        self.assertEqual(response.content.decode(), expected_html)
Run Code Online (Sandbox Code Playgroud)

我的测试在测试中失败,assertEqual因为我csrf token在我的HTML中添加了一个Django Template Language.这是我的HTML页面的样子:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>To-Do lists</title>
</head>
<body>
    <h1>Your To-Do list</h1>
    <form method="POST">
            <input name="item_text" id="id_new_item" placeholder="Enter a to-do item"/>
            {% csrf_token %}
    </form>

    <table id="id_list_table">
        <tr><td>{{ new_item_list }}</td></tr>
    </table>
</body>
</html>
Run Code Online (Sandbox Code Playgroud)

由于render_to_string方法不包括令牌,我的断言失败.以下是print我的测试中包含的两个陈述:

F<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>To-Do lists</title>
</head> …
Run Code Online (Sandbox Code Playgroud)

python django html5

7
推荐指数
1
解决办法
1712
查看次数

是否可以在不使用boto3下载的情况下获取S3文件的内容?

我正在研究从Redshift数据库转储文件的过程,并且不希望本地下载文件来处理数据.我看到Java有一个StreamingObject类可以做我想要的,但我没有看到类似的东西boto3.

python amazon-s3 boto3

7
推荐指数
1
解决办法
4739
查看次数

子进程命令没有使用ls命令查找文件?

我正在创建一个程序,它将提取一个帐号列表,然后运行一个ls -lh命令来查找每个帐号.当我在没有Python的Linux服务器上运行我的命令时,它会提取文件没问题,但是当我通过Python执行它时它说它无法找到它们.

import subprocess as sp
sp.call(['cd', input_dir])
for i, e in enumerate(piv_id_list):
    proc_out = sp.Popen(['ls', '-lh', '*CSV*APP*{0}.zip'.format(e)])
    proc_out_list.append(proc_out)
    print(proc_out)
Run Code Online (Sandbox Code Playgroud)

这是通过Python解释器运行命令时的一些示例输出:

>>> ls:无法访问*CSV1000*APP*:没有这样的文件或目录

但是通过Linux同样的命令:

ls -lh *CSV*APP*
Run Code Online (Sandbox Code Playgroud)

它会像它应该的那样返回输出.

python subprocess python-2.6

6
推荐指数
2
解决办法
1116
查看次数

记录模块未写入文件

我正在使用logging模块,并且传递了与当前正在执行的其他作业相同的参数:

import logging
from inst_config import config3

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] - %(message)s',
    filename=config3.GET_LOGFILE(config3.REQUESTS_USAGE_LOGFILE))
logging.warning('This should go in the file.')

if __name__ == '__main__':
    logging.info('Starting unload.')
Run Code Online (Sandbox Code Playgroud)

使用此方法创建文件名:

REQUESTS_USAGE_LOGFILE = r'C:\RunLogs\Requests_Usage\requests_usage_runlog_{}.txt'.format(
        CUR_MONTH)
def GET_LOGFILE(logfile):
    """Truncates file and returns."""
    with open(logfile, 'w'):
        pass
    return logfile
Run Code Online (Sandbox Code Playgroud)

但是,当我运行它时,它正在创建文件,然后仍将日志记录信息输出到控制台。我正在跑步Powershell

只是尝试将其放在主语句中,如下所示:

if __name__ == '__main__':
    logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] - %(message)s',
    filename=config3.GET_LOGFILE(config3.REQUESTS_USAGE_LOGFILE))

    logging.warning('This should go in the file.')
Run Code Online (Sandbox Code Playgroud)

仍然没有运气。

python windows powershell logging

6
推荐指数
2
解决办法
9497
查看次数

你能使用Spark SQL/Hive/Presto直接从Parquet/S3复制到Redshift吗?

我们存储了大量的服务器数据S3(很快就会采用Parquet格式化).数据需要一些转换,因此它不能是S3的直接副本.我将Spark用来访问数据,但我想知道是不是用Spark操纵它,写回S3,然后复制到Redshift如果我可以跳过一步并运行查询来拉/变换数据,然后直接复制到Redshift?

hadoop amazon-s3 apache-spark apache-spark-sql

6
推荐指数
1
解决办法
4954
查看次数

如何在键(元组)的第一个元素上对字典进行排序

我有一个字典,其中每个键都是值的元组,我想使用该sorted()方法在元组的第一个元素上对字典进行排序。我的代码如下所示:

def mapData(header_list, dict_obj):
    master_dict = {}
    client_section_list = []
    for element in header_list:
        for row in dict_obj:
            if (row['PEOPLE_ID'], row['DON_DATE']) == element:
                client_section_list.append(row)
        element = list(element)
        element_list = [client_section_list[0]['DEDUCT_AMT'],
                    client_section_list[0]['ND_AMT'],
                    client_section_list[0]['DEDUCT_YTD'],
                    client_section_list[0]['NONDEDUCT_YTD']
                    ]
        try:
            element_list.append((float(client_section_list[0]['DEDUCT_YTD']) +
                                 float(client_section_list[0]['NONDEDUCT_YTD'])
                                 ))
        except ValueError:
            pass

    element.extend(element_list)
    element = tuple(element)
    master_dict[element] = client_section_list
    client_section_list = []
return sorted(master_dict, key=lambda key: key[master_dict[(1)]]
Run Code Online (Sandbox Code Playgroud)

最后一行是我试图找到一种对其进行排序的方法。我的元组看起来像这样:

(312178078,6/22/15,25,0,25,0,25.0)
Run Code Online (Sandbox Code Playgroud)

python sorting dictionary python-2.7

5
推荐指数
1
解决办法
5782
查看次数