我正在玩Spark.它是来自网站的默认预构建分发版(0.7.0),具有默认配置,群集模式,一个工作者(我的本地主机).我阅读了有关安装的文档,一切似乎都很好.
我有一个CSV文件(各种大小,1000到100万行).如果我使用小输入文件(例如1000行)运行我的应用程序,一切都很好,程序在几秒钟内完成并产生预期的输出.但是当我提供更大的文件(100.000行,或100万行)时,执行失败.我试图挖掘日志,但没有多大帮助(它重复整个过程大约9-10次,然后在失败后退出.此外,还有一些与从某些空源获取失败相关的错误).
第一个JavaRDD返回的结果Iterable对我来说是可疑的.如果我返回一个硬编码的单例列表(如res.add("something");返回res;),一切都很好,即使有一百万行.但是,如果我添加我想要的所有键(28个字符串,长度为6-20个字符),则该过程仅在输入较大时才会失败.问题是,我需要所有这些密钥,这是实际的业务逻辑.
我正在使用Linux amd64,四核,8GB内存.最新的Oracle Java7 JDK.Spark配置:
SPARK_WORKER_MEMORY=4g
SPARK_MEM=3g
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar
Run Code Online (Sandbox Code Playgroud)
我必须提一下,当我启动该程序时,它说:
13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1)
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Run Code Online (Sandbox Code Playgroud)
这是我的计划.它基于JavaWordCount示例,最低限度地修改.
public final class JavaWordCount
{
public static void main(final String[] args) throws Exception
{
final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
System.getenv("SPARK_HOME"), new String[] {"....jar" });
final JavaRDD<String> words = ctx.textFile(args[1], …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用raspberry pi从USB摄像头捕获图像并使用Django框架流式传输我尝试使用StreamingHttpResponse从Opencv2流式传输帧.但是,它只显示1帧而不替换图像.
如何实时替换图像?
这是我的代码.
from django.shortcuts import render
from django.http import HttpResponse,StreamingHttpResponse
import cv2
import time
class VideoCamera(object):
def __init__(self):
self.video = cv2.VideoCapture(0)
def __del__(self):
self.video.release()
def get_frame(self):
ret,image = self.video.read()
ret,jpeg = cv2.imencode('.jpg',image)
return jpeg.tobytes()
def gen(camera):
while True:
frame = camera.get_frame()
yield(frame)
time.sleep(1)
def index(request):
# response = HttpResponse(gen(VideoCamera())
return StreamingHttpResponse(gen(VideoCamera()),content_type="image/jpeg")
Run Code Online (Sandbox Code Playgroud) 我使用AngularFire2从Firebase数据库中获取数据(实时).
我做了什么:
{"class":{"student":{"Tom":"male","Mary":"female","Peter":"male","Laura":"female"},"numberOfStudent":10} }
app.component.ts
import { AngularFireDatabase } from 'angularfire2/database';
import { Observable } from 'rxjs/Observable';
...
export class AppComponent {
class: Observable<any>;
students: Observable<any[]>;
constructor(private db: AngularFireDatabase) {
this.class = db.object(‘class’).valueChanges();
this.students = db.list(‘class/student’).snapshotChanges();
}
}
Run Code Online (Sandbox Code Playgroud)app.component.html:
<h2>Class size: {{ (class | async)?.numberOfStudent }}</h2>
<ul>
<li *ngFor="let i of students | async">
{{i.key}} : {{i.value}}
</li>
</ul>Run Code Online (Sandbox Code Playgroud)
发生了什么:
班级人数:10
汤姆:
玛丽 :
彼得:
劳拉:
它不返回list的值.
任何建议表示赞赏.
我正在玩Storm,我想知道Storm在聚合时指定(如果可能)窗口大小(如果可能).例如,如果我们想在Twitter上找到前一小时的热门话题.我们如何指定螺栓应该每小时返回结果?这是以编程方式在每个螺栓内完成的吗?或者是指定"窗口"的某种方式?
消息被消费或处理后,如何从Kafka获得确认.可能听起来很愚蠢,但有没有办法知道收到确认的消息的开始和结束偏移量?
我正在通过wifi热点或wifi直接实时进行项目推送,但我无法通过一个热点连接10个以上的设备作为Android给出此限制.但我想连接40-50个设备,所以任何人都可以告诉我如何连接多个设备,这样一条消息就可以在一台设备上实时传送到所有50台设备,而无需任何路由器或其他硬件.
任何帮助,将不胜感激.
我正在经历实时聚合而不是实时更新。我有什么遗漏的吗?
2.4.2使用当前 docker 镜像的版本的可重现示例timescale/timescaledb:latest-pg12:
CREATE TABLE data
(
time TIMESTAMPTZ NOT NULL,
value DOUBLE PRECISION NOT NULL
);
SELECT create_hypertable('data', 'time', chunk_time_interval => interval '1d');
INSERT INTO data (time, value)
VALUES ('2020-01-01', 100);
CREATE MATERIALIZED VIEW data_daily WITH (timescaledb.continuous)
AS
SELECT time_bucket('1 day', time) AS time,
avg(value) AS avg,
count(*) AS count
FROM data
GROUP BY 1;
ALTER MATERIALIZED VIEW data_daily SET (timescaledb.materialized_only = false);
Run Code Online (Sandbox Code Playgroud)
现在,当我运行时,SELECT * FROM data_daily我得到了预期的结果:
time, avg, count
2020-01-01 00:00:00.000000, 100, …Run Code Online (Sandbox Code Playgroud) 我正在尝试找到一种用于绘制数据的工具(主要是线图等),可用于高性能应用程序.我的数据窗口通常包含500到几千个点,我对帧率为10左右感到满意.我在套接字上以二进制流的形式接收数据.我在Mac OS X上.
我尝试了几种解决方案,并在下面讨论我的经验.
R:非常慢,无法跟上,读取插座很痛苦,图形闪烁.
matplotlib:非常慢但有点可用.但是,它需要运行大量的Python机器,而IMO的API非常不透明.在不断更新的情况下,包含图形的窗口变为模态,并且出现Mac沙滩球 - 对用户交互不太好.
Gnuplot:更好的性能和API.(!),但通信大量的数据来的gnuplot通过生成临时ASCII文件发生-这意味着,如果我的帧率上升,我开始做顿磁盘读取,这是一个性能问题.
还有其他建议吗?
我有一个 Pandas 数据框,我每秒不断地附加一行数据,如下所示。
df.loc[time.strftime("%Y-%m-%d %H:%M:%S")] = [reading1, reading2, reading3]
>>>df
sensor1 sensor2 sensor3
2015-04-14 08:50:23 5.4 5.6 5.7
2015-04-14 08:50:24 5.5 5.6 5.8
2015-04-14 08:50:26 5.2 5.3 5.4
Run Code Online (Sandbox Code Playgroud)
如果我继续这样做,最终我将开始遇到内存问题(每次它都会调用整个 DataFrame)。
我只需要保留 X 行数据。即手术后,它将是:
>>>df
sensor1 sensor2 sensor3
(this row is gone)
2015-04-14 08:50:24 5.5 5.6 5.8
2015-04-14 08:50:26 5.2 5.3 5.4
2015-04-14 08:50:27 5.2 5.4 5.6
Run Code Online (Sandbox Code Playgroud)
有没有一种方法可以指定最大行数,以便在添加任何后续行时,同时删除最旧的行,而无需“检查数据帧的长度,如果数据帧的长度 > X,则删除第一行,追加新行”?
像这样,但对于 Pandas DataFrame:https ://stackoverflow.com/a/10155753/4783578
问题:我们正在尝试使用 AWS 产品 AppSync 制作聊天应用程序,我们希望获得最佳性能,但我们在 AppSync 和 Graphql 中面临实时订阅的问题,在某些情况下,单个用户需要处理数百个订阅我们认为这不是最佳解决方案,您有什么建议?
问题示例:
Mutation{
addMessage(conversation_id=Int!, content:String!) : Message
}
Subscription{
subscribeForNewMessages(convesration_id: Int!):Message
@aws_subscribe(mutations: ["addMessage"])
}
Run Code Online (Sandbox Code Playgroud)
这种设计的问题在于,用户需要调用此订阅并继续收听每个对话,如果对话数量巨大,我们希望这会压倒客户端。
问题 :
Q1:我们正在努力实现的是一个订阅多个(conversation_id),这怎么可能?这些人(https://github.com/apollographql/apollo-client/issues/2633)正在谈论类似的事情,我们对其进行了测试但它不起作用,这是一个有效的解决方案吗?
Q2:关于放大;在同时收听数百个订阅时,放大会表现良好吗?它会合并订阅和 websockets 还是将它们分开处理?
Q3:您对这些设计有何评论?其中将有一个服务,它将为聊天参与者广播(使用客户端 ID 调用突变)消息,并且客户端将仅订阅单个频道。如下所示: src2 :用于聊天应用程序的 AWS AppSync src2 :订阅 AWS AppSync 中的群组/私人聊天列表
real-time-data ×10
python ×2
android ×1
android-wifi ×1
angular ×1
angularfire2 ×1
apache-kafka ×1
apache-spark ×1
apache-storm ×1
aws-appsync ×1
chat ×1
dataframe ×1
django ×1
django-views ×1
facebook ×1
firebase ×1
graphql ×1
java ×1
opencv ×1
pandas ×1
performance ×1
timescaledb ×1
wifi-direct ×1