我正在向elasticsearch发出一个查询,我正在获得多种记录类型.如何将结果限制为一种类型?
我正在使用Cloudera包裹中的Spark 0.9.0运行CDH 4.4.
我有一堆通过Pig的AvroStorage UDF创建的Avro文件.我想在Spark中加载这些文件,使用通用记录或Avro文件上的架构.到目前为止,我试过这个:
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import java.net.URI
import java.io.BufferedInputStream
import java.io.File
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.mapred.FsInput
val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
val inURI = new URI(input)
val inPath = new Path(inURI)
val fsInput = new FsInput(inPath, sc.hadoopConfiguration)
val reader = new GenericDatumReader[GenericRecord]
val dataFileReader = DataFileReader.openReader(fsInput, reader)
val schemaString = dataFileReader.getSchema
val buf = scala.collection.mutable.ListBuffer.empty[GenericRecord]
while(dataFileReader.hasNext) {
buf += …Run Code Online (Sandbox Code Playgroud) 我有一个 Python/Flask Web 应用程序,我通过 Gunicorn 在 Amazon ECS 上的 docker 映像中部署它。一切顺利,然后突然,包括最后一个成功的请求,我在日志中看到了这一点:
[2017-03-29 21:49:42 +0000] [14] [DEBUG] GET /heatmap_column/e4c53623-2758-4863-af06-91bd002e0107/ADA
[2017-03-29 21:49:43 +0000] [1] [INFO] Handling signal: term
[2017-03-29 21:49:43 +0000] [14] [INFO] Worker exiting (pid: 14)
[2017-03-29 21:49:43 +0000] [8] [INFO] Worker exiting (pid: 8)
[2017-03-29 21:49:43 +0000] [12] [INFO] Worker exiting (pid: 12)
[2017-03-29 21:49:43 +0000] [10] [INFO] Worker exiting (pid: 10)
...
[2017-03-29 21:49:43 +0000] [1] [INFO] Shutting down: Master
进程消失,程序退出。然后ECS重启服务,再次运行docker镜像,但同时服务中断。
什么会导致我的程序收到 TERM 信号?我在网上找不到任何关于这种情况的参考。请注意,这仅发生在 ECS 上的 Docker 中,而不发生在本地。
我正在分析2015年美国国内航班的准时性能记录.我需要按尾号分组,并将每个尾号的所有航班的日期分类列表存储在数据库中,以便我的应用程序检索.我不确定实现这一目标的两个选项中哪一个是最好的选择.
# Load the parquet file
on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet')
# Filter down to the fields we need to identify and link to a flight
flights = on_time_dataframe.rdd.map(lambda x:
(x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum)
)
Run Code Online (Sandbox Code Playgroud)
我可以通过减少排序来实现这一目标......
# Group flights by tail number, sorted by date, then flight number, then
origin/dest
flights_per_airplane = flights\
.map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
.reduceByKey(lambda a, b: sorted(a + b, key=lambda x: (x[1],x[2],x[3],x[4])))
Run Code Online (Sandbox Code Playgroud)
或者我可以在随后的地图工作中实现它......
# Do same in a map step, more efficient or does pySpark know …Run Code Online (Sandbox Code Playgroud) 如何使用 Flask test_client 将多个文件上传到一个 API 端点?
我正在尝试使用 Flask test_client 将多个文件上传到一个 Web 服务,该服务接受多个文件并将它们组合成一个大文件。
我的控制器看起来像这样:
@app.route("/combine/file", methods=["POST"])
@flask_login.login_required
def combine_files():
user = flask_login.current_user
combined_file_name = request.form.get("file_name")
# Store file locally
file_infos = []
for file_data in request.files.getlist('file[]'):
# Get the content of the file
file_temp_path="/tmp/{}-request.csv".format(file_id)
file_data.save(file_temp_path)
# Create a namedtuple with information about the file
FileInfo = namedtuple("FileInfo", ["id", "name", "path"])
file_infos.append(
FileInfo(
id=file_id,
name=file_data.filename,
path=file_temp_path
)
)
...
Run Code Online (Sandbox Code Playgroud)
我的测试代码如下所示:
def test_combine_file(get_project_files):
project = get_project_files["project"]
r = web_client.post(
"/combine/file",
content_type='multipart/form-data',
buffered=True,
follow_redirects=True, …Run Code Online (Sandbox Code Playgroud) python flask python-unittest multiple-file-upload web-api-testing
我在一个Hadoop集群上运行了一个Pig工作,它将一堆数据压缩成R可以处理的事情来进行队列分析.我有以下脚本,从第二行到最后一行,我有以下格式的数据:
> names(data)
[1] "VisitWeek" "ThingAge" "MyMetric"
Run Code Online (Sandbox Code Playgroud)
VisitWeek是一个约会.ThingAge和MyMetric是整数.
数据看起来像:
2010-02-07 49 12345
Run Code Online (Sandbox Code Playgroud)
我到目前为止的脚本是:
# Load ggplot2 for charting
library(ggplot2);
# Our file has headers - column names
data = read.table('weekly_cohorts.tsv',header=TRUE,sep="\t");
# Print the names
names(data)
# Convert to dates
data$VisitWeek = as.Date(data$VisitWeek)
data$ThingCreation = as.Date(data$ThingCreation)
# Fill in the age column
data$ThingAge = as.integer(data$VisitWeek - data$ThingCreation)
# Filter data to thing ages lt 10 weeks (70 days) + a sanity check for gt 0, and drop the creation week column …Run Code Online (Sandbox Code Playgroud) 我有一个d3网络,其中点通过线连接.我想用弯曲的SVG路径替换线条.我忘记了数学来计算控制点的坐标.有谁知道如何做到这一点?
例如,请看下面的图片:

存在点A和B.我现在用线L连接它们.我想用曲线C替换L.为此,我需要找到一条垂直于线L中点的线,长度M(长度设置为L的百分比),作为样条C的控制点.然后我需要定义一个SVG路径来定义C.
如何在d3中使用SVG执行此操作?我很久以前就已经在拉斐尔/ SVG做过这个了,但数学让我感到安慰.而且我不确定它在D3中是如何完成的.
如何将逻辑回归结果的统计图添加到散点图中?我想要彩色的0/1区域,该区域描绘了分类器的决策边界。
import pandas as pd
import numpy as np
import pylab as pl
import statsmodels.api as sm
# Build X, Y from file
f = open('ex2data2.txt')
lines = f.readlines()
x1 = []
x2 = []
y = []
for line in lines:
line = line.replace("\n", "")
vals = line.split(",")
x1.append(float(vals[0]))
x2.append(float(vals[1]))
y.append(int(vals[2]))
x1 = np.array(x1)
x2 = np.array(x2)
y = np.array(y)
x = np.vstack([x1, x2]).T
# Scatter plot 0/1s
pos_mask = y == 1
neg_mask = y == 0
pos_x1 …Run Code Online (Sandbox Code Playgroud) 无论我做什么,我都无法在引号中使用引号来保持输出.我是什么做的?这是OS X中的bash.我的代码在这里:Gist
read -d '' sbt_text=$(cat <<"EOF"
#!/bin/bash
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M";
java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"
EOF
)
echo $sbt_text
#!/bin/bash > SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"; > java -jar /sbt-launch.jar ""
Run Code Online (Sandbox Code Playgroud) 我在TSV中有一系列时间数据,如下所示:
ID \t Date \t Value
-------------------------------
1234567 \t 2009-01-01T00:00:00.000Z \t 121
12131 \t 2009-06-01T00:00:00.000Z \t 151
12131 \t 2009-07-01T00:00:00.000Z \t 15153
...
Run Code Online (Sandbox Code Playgroud)
它很容易适应RAM,但对于Excel来说太大了.
每个ID每月有一个值,但并非所有ID都有12个月的条目.
数据跨越12个月,但并非所有ID都有12个月.我想查看每个ID的数据,如果上个月有一个条目,请将当前月减去上个月并将其存储在新列中以获得增量.如果前一个月没有条目,则返回0.然后,对于每个月,我希望这些增量的前100个正面和负面以及ID.
我想在R中这样做,因为它在Excel中很难并且它一直在崩溃.我安装了R,Rattle等,我已经完成了基本的例子,但是......学习曲线很陡峭.我真的很感激一些帮助:)
我在MongoDB的Python/Flask中有一个简单的Web服务:https://github.com/rjurney/Collecting-Data/blob/master/src/python/web/index.py
它一遍又一遍地重复@ app.route代码:
@app.route("/email/<message_id>")
def email(message_id):
email = emaildb.find_one({"message_id": message_id})
print email
return render_template('partials/email.html', email=email)
# Enable /emails and /emails/ to serve the last 20 emaildb in our inbox unless otherwise specified
default_offsets={'offset1': 0, 'offset2': 0 + config.EMAIL_RANGE}
@app.route('/', defaults=default_offsets)
@app.route('/emails', defaults=default_offsets)
@app.route('/emails/', defaults=default_offsets)
@app.route("/emails/<int:offset1>/<int:offset2>")
def list_emaildb(offset1, offset2):
offset1 = int(offset1)
offset2 = int(offset2)
emails = emaildb.find()[offset1:offset2] # Uses a MongoDB cursor
nav_offsets = get_offsets(offset1, offset2, config.EMAIL_RANGE)
data = {'emails': emails, 'nav_offsets': nav_offsets, 'nav_path': '/emails/'}
return render_template('partials/emails.html', …Run Code Online (Sandbox Code Playgroud) python ×5
flask ×3
apache-spark ×2
r ×2
amazon-ecs ×1
analytics ×1
api ×1
avro ×1
bash ×1
charts ×1
contour ×1
css ×1
d3.js ×1
docker ×1
dsl ×1
geometry ×1
ggplot2 ×1
gunicorn ×1
hadoop ×1
html5 ×1
mapreduce ×1
matplotlib ×1
mongodb ×1
newline ×1
pyspark ×1
python-2.7 ×1
quote ×1
scala ×1
scatter-plot ×1
schema ×1
search ×1
slug ×1
spline ×1
svg ×1
web-services ×1