标签: real-time-data

Spark集群在更大的输入上失败,适用于小型

我正在玩Spark.它是来自网站的默认预构建分发版(0.7.0),具有默认配置,群集模式,一个工作者(我的本地主机).我阅读了有关安装的文档,一切似乎都很好.

我有一个CSV文件(各种大小,1000到100万行).如果我使用小输入文件(例如1000行)运行我的应用程序,一切都很好,程序在几秒钟内完成并产生预期的输出.但是当我提供更大的文件(100.000行,或100万行)时,执行失败.我试图挖掘日志,但没有多大帮助(它重复整个过程大约9-10次,然后在失败后退出.此外,还有一些与从某些空源获取失败相关的错误).

第一个JavaRDD返回的结果Iterable对我来说是可疑的.如果我返回一个硬编码的单例列表(如res.add("something");返回res;),一切都很好,即使有一百万行.但是,如果我添加我想要的所有键(28个字符串,长度为6-20个字符),则该过程在输入较大时才会失败.问题是,我需要所有这些密钥,这是实际的业务逻辑.

我正在使用Linux amd64,四核,8GB内存.最新的Oracle Java7 JDK.Spark配置:

SPARK_WORKER_MEMORY=4g
SPARK_MEM=3g
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar
Run Code Online (Sandbox Code Playgroud)

我必须提一下,当我启动该程序时,它说:

13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1)
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Run Code Online (Sandbox Code Playgroud)

这是我的计划.它基于JavaWordCount示例,最低限度地修改.

public final class JavaWordCount
{
    public static void main(final String[] args) throws Exception
    {
        final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
            System.getenv("SPARK_HOME"), new String[] {"....jar" });

        final JavaRDD<String> words = ctx.textFile(args[1], …
Run Code Online (Sandbox Code Playgroud)

java cluster-computing real-time-data apache-spark

11
推荐指数
1
解决办法
7158
查看次数

如何实时流式传输具有django帧的opencv框架?

我正在尝试使用raspberry pi从USB摄像头捕获图像并使用Django框架流式传输我尝试使用StreamingHttpResponse从Opencv2流式传输帧.但是,它只显示1帧而不替换图像.

如何实时替换图像?

这是我的代码.

from django.shortcuts import render
from django.http import HttpResponse,StreamingHttpResponse
import cv2
import time

class VideoCamera(object):
    def __init__(self):
        self.video = cv2.VideoCapture(0)
    def __del__(self):
        self.video.release()

    def get_frame(self):
        ret,image = self.video.read()
        ret,jpeg = cv2.imencode('.jpg',image)
        return jpeg.tobytes()

def gen(camera):
    while True:
        frame = camera.get_frame()
        yield(frame)
        time.sleep(1)

def index(request):
    # response = HttpResponse(gen(VideoCamera())
    return StreamingHttpResponse(gen(VideoCamera()),content_type="image/jpeg")
Run Code Online (Sandbox Code Playgroud)

python django opencv real-time-data django-views

8
推荐指数
1
解决办法
3364
查看次数

AngularFire2:实时数据库:如何获取密钥和值

我使用AngularFire2从Firebase数据库中获取数据(实时).

我做了什么:

  • Firebase数据库

{"class":{"student":{"Tom":"male","Mary":"female","Peter":"male","Laura":"female"},"numberOfStudent":10} }

  • app.component.ts

    import { AngularFireDatabase } from 'angularfire2/database';
    import { Observable } from 'rxjs/Observable';
    
    ...
    export class AppComponent {
    
       class: Observable<any>;
       students: Observable<any[]>;
    
    constructor(private db: AngularFireDatabase) {
       this.class = db.object(‘class’).valueChanges();
       this.students = db.list(‘class/student’).snapshotChanges();
     }
    
    } 
    
    Run Code Online (Sandbox Code Playgroud)
  • app.component.html:

<h2>Class size: {{ (class | async)?.numberOfStudent }}</h2>
<ul>
  <li *ngFor="let i of students | async">
    {{i.key}} : {{i.value}}
  </li>
</ul>
Run Code Online (Sandbox Code Playgroud)

发生了什么:

班级人数:10

汤姆:

玛丽 :

彼得:

劳拉:

它不返回list的值.

任何建议表示赞赏.

real-time-data firebase angularfire2 angular

8
推荐指数
4
解决办法
2万
查看次数

(Twitter)风暴的聚合窗口

我正在玩Storm,我想知道Storm在聚合时指定(如果可能)窗口大小(如果可能).例如,如果我们想在Twitter上找到前一小时的热门话题.我们如何指定螺栓应该每小时返回结果?这是以编程方式在每个螺栓内完成的吗?或者是指定"窗口"的某种方式?

real-time-data real-time-systems apache-storm

7
推荐指数
1
解决办法
4561
查看次数

如何从卡夫卡获得确认

消息被消费或处理后,如何从Kafka获得确认.可能听起来很愚蠢,但有没有办法知道收到确认的消息的开始和结束偏移量?

real-time-data apache-kafka

6
推荐指数
1
解决办法
6285
查看次数

我们可以在Android中连接超过10台具有Wi-Fi热点的设备吗?

我正在通过wifi热点或wifi直接实时进行项目推送,但我无法通过一个热点连接10个以上的设备作为Android给出此限制.但我想连接40-50个设备,所以任何人都可以告诉我如何连接多个设备,这样一条消息就可以在一台设备上实时传送到所有50台设备,而无需任何路由器或其他硬件.

任何帮助,将不胜感激.

android real-time-data android-wifi wifi-direct

6
推荐指数
1
解决办法
8606
查看次数

实时聚合不是最新的

我正在经历实时聚合而不是实时更新。我有什么遗漏的吗?

2.4.2使用当前 docker 镜像的版本的可重现示例timescale/timescaledb:latest-pg12

CREATE TABLE data
(
    time  TIMESTAMPTZ      NOT NULL,
    value DOUBLE PRECISION NOT NULL
);

SELECT create_hypertable('data', 'time', chunk_time_interval => interval '1d');

INSERT INTO data (time, value)
VALUES ('2020-01-01', 100);

CREATE MATERIALIZED VIEW data_daily WITH (timescaledb.continuous)
AS
SELECT time_bucket('1 day', time) AS time,
       avg(value)                 AS avg,
       count(*)                   AS count
FROM data
GROUP BY 1;
ALTER MATERIALIZED VIEW data_daily SET (timescaledb.materialized_only = false);
Run Code Online (Sandbox Code Playgroud)

现在,当我运行时,SELECT * FROM data_daily我得到了预期的结果:

time, avg, count
2020-01-01 00:00:00.000000, 100, …
Run Code Online (Sandbox Code Playgroud)

real-time-data timescaledb continuous-aggregates

6
推荐指数
1
解决办法
700
查看次数

高性能实时数据显示

我正在尝试找到一种用于绘制数据的工具(主要是线图等),可用于高性能应用程序.我的数据窗口通常包含500到几千个点,我对帧率为10左右感到满意.我在套接字上以二进制流的形式接收数据.我在Mac OS X上.

我尝试了几种解决方案,并在下面讨论我的经验.

R:非常慢,无法跟上,读取插座很痛苦,图形闪烁.

matplotlib:非常慢但有点可用.但是,它需要运行大量的Python机器,而IMO的API非常不透明.在不断更新的情况下,包含图形的窗口变为模态,并且出现Mac沙滩球 - 对用户交互不太好.

Gnuplot:更好的性能和API.(!),但通信大量的数据来的gnuplot通过生成临时ASCII文件发生-这意味着,如果我的帧率上升,我开始做顿磁盘读取,这是一个性能问题.

还有其他建议吗?

performance visualization real-time-data

5
推荐指数
1
解决办法
5609
查看次数

如何指定 Pandas 数据框的行数?

我有一个 Pandas 数据框,我每秒不断地附加一行数据,如下所示。

df.loc[time.strftime("%Y-%m-%d %H:%M:%S")] = [reading1, reading2, reading3]
>>>df
                     sensor1 sensor2 sensor3
2015-04-14 08:50:23    5.4     5.6     5.7
2015-04-14 08:50:24    5.5     5.6     5.8
2015-04-14 08:50:26    5.2     5.3     5.4
Run Code Online (Sandbox Code Playgroud)

如果我继续这样做,最终我将开始遇到内存问题(每次它都会调用整个 DataFrame)。

我只需要保留 X 行数据。即手术后,它将是:

>>>df
                     sensor1 sensor2 sensor3
(this row is gone)
2015-04-14 08:50:24    5.5     5.6     5.8
2015-04-14 08:50:26    5.2     5.3     5.4
2015-04-14 08:50:27    5.2     5.4     5.6
Run Code Online (Sandbox Code Playgroud)

有没有一种方法可以指定最大行数,以便在添加任何后续行时,同时删除最旧的行,而无需“检查数据帧的长度,如果数据帧的长度 > X,则删除第一行,追加新行”?

像这样,但对于 Pandas DataFrame:https ://stackoverflow.com/a/10155753/4783578

python data-analysis real-time-data dataframe pandas

5
推荐指数
1
解决办法
2万
查看次数

AppSync/Graphql 多个订阅还是多个 id 的一个订阅?

问题:我们正在尝试使用 AWS 产品 AppSync 制作聊天应用程序,我们希望获得最佳性能,但我们在 AppSync 和 Graphql 中面临实时订阅的问题,在某些情况下,单个用户需要处理数百个订阅我们认为这不是最佳解决方案,您有什么建议?

问题示例:

Mutation{
    addMessage(conversation_id=Int!, content:String!) : Message
}
Subscription{
   subscribeForNewMessages(convesration_id: Int!):Message
        @aws_subscribe(mutations: ["addMessage"])
}
Run Code Online (Sandbox Code Playgroud)

这种设计的问题在于,用户需要调用此订阅并继续收听每个对话,如果对话数量巨大,我们希望这会压倒客户端。

问题 :

Q1:我们正在努力实现的是一个订阅多个(conversation_id),这怎么可能?这些人(https://github.com/apollographql/apollo-client/issues/2633)正在谈论类似的事情,我们对其进行了测试但它不起作用,这是一个有效的解决方案吗?

Q2:关于放大;在同时收听数百个订阅时,放大会表现良好吗?它会合并订阅和 websockets 还是将它们分开处理?

Q3:您对这些设计有何评论?其中将有一个服务,它将为聊天参与者广播(使用客户端 ID 调用突变)消息,并且客户端将仅订阅单个频道。如下所示: src2 :用于聊天应用程序的 AWS AppSync src2 :订阅 AWS AppSync 中的群组/私人聊天列表

facebook chat real-time-data graphql aws-appsync

5
推荐指数
1
解决办法
2719
查看次数