标签: subscriber

ROS图像用户滞后

我有一个滞后的问题,一个rospy订阅者听图像消息.

概述:

我有一个rosbag将图像流式传输到5Hz的/ camera/image_raw.我还有一个image_view节点,用于显示图像以供参考.此image_view以5Hz显示它们.

在我的rospy订户(使用queue = 1初始化)中,我还显示图像(用于比较与image_view节点的延迟时间).订户随后进行了一些繁重的处理.

预期结果:

由于队列大小为1,用户应处理最新帧,同时跳过所有其他帧.一旦完成处理,它应该继续前进到下一个最新的帧.应该没有旧帧排队.这将导致一个波涛汹涌,但不是滞后的视频(低fps,但没有"延迟"wrt rosbag流,如果这是有道理的)

实际结果:

订阅者落后于已发布的流.具体来说,image_view节点以5Hz显示图像,并且订户似乎将所有图像排队并逐个处理它们,而不是仅仅抓取最新图像.延迟也随着时间的推移而增长.当我停止rosbag流时,订户继续处理队列中的图像(即使队列= 1).

请注意,如果我将订户更改为具有非常大的缓冲区大小(如下所示),则会生成预期的行为:

self.subscriber = rospy.Subscriber("/camera/image_raw", Image, self.callback,  queue_size = 1, buff_size=2**24)
Run Code Online (Sandbox Code Playgroud)

但是,这不是一个干净的解决方案.

在以下链接中也报告了此问题,我在其中找到了缓冲区大小解决方案.官方解释假设发布者可能实际上正在放慢速度,但事实并非如此,因为image_view订阅者以5Hz显示图像.

https://github.com/ros/ros_comm/issues/536,罗斯用户不是最新的,http://answers.ros.org/question/50112/unexpected-delay-in-rospy-subscriber/

任何帮助表示赞赏.谢谢!

码:

def callback(self, msg):
    print "Processing frame | Delay:%6.3f" % (rospy.Time.now() - msg.header.stamp).to_sec()
    orig_image = self.bridge.imgmsg_to_cv2(msg, "rgb8")
    if (self.is_image_show_on):
        bgr_image = cv2.cvtColor(orig_image, cv2.COLOR_RGB2BGR)
        cv2.imshow("Image window", bgr_image)
        cv2.waitKey(1)

    result = process(orig_image) #heavy processing task
    print result
Run Code Online (Sandbox Code Playgroud)

python opencv image subscriber ros

5
推荐指数
1
解决办法
1424
查看次数

Rxjava:订阅特定的帖子

我是Rxjava的新手.我有以下代码:

    System.out.println("1: " + Thread.currentThread().getId());
    Observable.create(new rx.Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subcriber) {
            System.out.println("2: " + Thread.currentThread().getId());
            // query database
            String result = ....
            subcriber.onNext(result);
        }

    }).subscribeOn(Schedulers.newThread()).subscribe(countResult -> {
        System.out.println("3: " + Thread.currentThread().getId());
    });
Run Code Online (Sandbox Code Playgroud)

例如,输出将是:

1:50
2:100
3:100

我希望订阅者在ID为50的线程上运行.我该怎么做?

multithreading subscriber rx-java

5
推荐指数
1
解决办法
2315
查看次数

Google pubsub golang 订阅者在闲置几个小时后停止接收新发布的消息

我在google pubsub中创建了一个主题,并在主题内创建了一个订阅,具有以下设置

在此输入图像描述

然后我在 go 中编写了一个拉取器,使用它的Receive来拉取并确认已发布的消息

package main

import (
    ...
)

func main() {
    ctx := context.Background()

    client, err := pubsub.NewClient(ctx, config.C.Project)
    if err != nil {
       // do things with err
    }
    sub := client.Subscription(config.C.PubsubSubscription)
    err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        msg.Ack()
    })

    if err != context.Canceled {
      logger.Error(fmt.Sprintf("Cancelled: %s", err.Error()))
    }
    if err != nil {
      logger.Error(fmt.Sprintf("Error: %s", err.Error()))
    }
  }
Run Code Online (Sandbox Code Playgroud)

没什么特别的,它工作得很好,但过了一段时间(~闲置 3 小时后),它停止接收新发布的消息,没有错误,什么也没有。我错过了什么吗?

go subscriber google-cloud-pubsub

5
推荐指数
1
解决办法
2824
查看次数

什么是Aweber API变量$ account_id和$ list_id?

你可以在这里查看:https: //labs.aweber.com/docs/code_samples/subs/create

通过api将新订阅者添加到列表的脚本需要这两个部分信息...只有我无法弄清楚这两个变量是什么!我已经击败了我的Aweber Subscriber账户和我的Aweber Labs账户的每个小方面......而且我无法在任何地方找到任何这些变量的参考.我已经向他们提交了一些门票,但还没有得到任何答复.

有人有任何想法吗?我已经尝试了我的帐户名,我的名单,但无济于事!

~~~~~~~~~~~~~~~~~~~~~~~~~~~

好的,我知道了!您可以通过在进行某些api调用后在aweber api中转储一些其他变量来获取这两个变量的值.

首先获取帐户ID:

$account = $aweber->getAccount($accessKey, $accessSecret);
Run Code Online (Sandbox Code Playgroud)

然后是vardump或print_r $帐户.

接下来我们得到列表ID:

$account = $aweber->getAccount($accessKey, $accessSecret);
$list_url = 'https://api.aweber.com/1.0/accounts/<id>/lists';
$lists = $account->loadFromUrl($list_url);
Run Code Online (Sandbox Code Playgroud)

然后是vardump或print_r $列表.

你们都准备好了!我很高兴我把它搞清楚了,它花了足够长的时间.希望这可以节省一些时间.

api subscriber aweber

4
推荐指数
1
解决办法
3193
查看次数

zeromq中的lazy pub/sub,只获取最后一条消息

我试图从示例wuclient/wuserver在zeromq上实现一个懒惰的订阅者.客户端比服务器慢,所以它必须只获取服务器发送的最后一条消息.

到目前为止,我发现这样做的唯一方法是连接/断开客户端,但每个连接当然有不必要的成本,大约3ms:

server.cxx

int main () {
    //  Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);
    publisher.bind("tcp://*:5556");
    int counter = 0;
    while (1) {
        counter++;

        //  Send message to all subscribers
        zmq::message_t message(20);
        snprintf ((char *) message.data(), 20 ,
                  "%d", counter);
        publisher.send(message);
        std::cout     << counter <<  std::endl;
        usleep(100000);
      }
      return 0;
    }
Run Code Online (Sandbox Code Playgroud)

client.cxx

int main (int argc, char *argv[])
{
  zmq::context_t context (1);
  zmq::socket_t subscriber (context, ZMQ_SUB);
  while(1){

    zmq::message_t update;
    int counter;

    subscriber.connect("tcp://localhost:5556"); // This call …
Run Code Online (Sandbox Code Playgroud)

datareader lazy-evaluation publisher subscriber zeromq

4
推荐指数
1
解决办法
1511
查看次数

Ros订阅者不是最新的

我已经为其中一个图像主题编写了一个ROS订阅者,并且我使用以下命令将缓冲区设置为1:

subscriber =rospy.Subscriber("/camera/rgb/image_mono/compressed",CompressedImage, callback,  queue_size=1)
Run Code Online (Sandbox Code Playgroud)

但是我的订户仍然落后.知道可能导致这种情况的原因吗?我是否正确设置了队列大小?

python opencv image subscriber ros

4
推荐指数
2
解决办法
4021
查看次数

如何在没有任何实体的情况下测试学说 EventListener/Subscriber

我创建了一个 AuditLoggerBundle*,它有一个使用 Doctrine 事件(prePersist、preUpdate 和 preRemove)的服务,以便在 audit_log 表(AuditLog 实体)中创建一个新条目。

该捆绑包与我的其他捆绑包配合良好,但我想对其进行单元测试并对其进行功能测试

问题是,为了对函数进行功能测试AuditLoggerListener,我需要至少有两个可以持久化、更新等的“假”实体。

在这个包中,我不知道如何做到这一点,因为我只有一个 AuditLog 实体,我需要使用两个实体(仅在测试中使用)。

  1. 第一个实体将是“可审计的”(如果我对这个实体进行持久化、更新或删除,我必须在 audit_log 中有一个新条目)。
  2. 第二个将是“不可审计的”(当我对此实体执行持久化、更新或删除时,我不能在 audit_log 表中有新条目)。*
  3. 这两个实体可以与唯一的 EntityClass 相关,但不能是 AuditLog 的实例

这就是我看到持久功能测试的方式:

<?php
$animal = new Animal(); //this is a fake Auditable entity
$animal->setName('toto');
$em = new EntityManager(); //actually I will use the container to get this manager
$em->persist($animal);
$em->flush();

//Here we test that I have a new line in audit_log table with the right informations
Run Code Online (Sandbox Code Playgroud)

所以我的问题是我的包中没有任何 Animal 实体,我只需要这个来测试包,所以它必须只在测试数据库中创建,而不是在生产环境中创建(当我执行 app/console …

php event-listener subscriber symfony doctrine-orm

4
推荐指数
1
解决办法
4773
查看次数

ROS 订阅者回调丢失消息

总结:我有一个节点在 ~300hz 发布消息,但是在另一个节点中订阅主题的回调只在 ~25hz 时被调用。订阅者节点中的 spinOnce 在 ~700hz 被调用,所以我不知道为什么它会丢失消息。

发布者节点:

#include <ros/ros.h>
#include <ros/console.h>
#include <nav_msgs/Odometry.h>

...

int main(int argc, char** argv)
{
    ros::init(argc, argv, "sim_node");
    ros::NodeHandle nh;

    ...

    // Publishers
    tf::TransformBroadcaster tfbr;
    ros::Publisher odomPub = nh.advertise<nav_msgs::Odometry>("pose",10);

   ...

    ros::Rate r(300); // loop rate
    while(ros::ok())
    {
        ...

        // Publish pose and velocity
            ...
        odomPub.publish(msg);

        ros::spinOnce();
        r.sleep();
    }
    
    ros::waitForShutdown();
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

订阅者节点:

#include <ros/ros.h>
#include <ros/console.h>
#include <nav_msgs/Odometry.h>

...

std::mutex mtx1, mtx2;

class DataHandler
{
private:
    ros::NodeHandle nh;
    ros::Publisher odomPub;
    double …
Run Code Online (Sandbox Code Playgroud)

c++ callback subscriber ros

4
推荐指数
1
解决办法
3780
查看次数

无法读取未定义 [Angular2] 的属性“isStopped”

我正在尝试在 Angular 2 中为我的服务使用 observable。

但我收到此错误:

Uncaught TypeError: Cannot read property 'isStopped' of undefined
Run Code Online (Sandbox Code Playgroud)

我的服务先睹为快:

import { Observable } from 'rxjs/Observable';
import { Injectable } from '@angular/core';

@Injectable()

export class Service{

  getList(){
    return new Observable((observer)=>{
      observer.next(result);
    })
  }

}
Run Code Online (Sandbox Code Playgroud)

和实施:

import ...

@Component({...})

export class List implements OnInit {
  list : any[];
  list$ : Observable<Array<any>>;

  constructor(...){
  }

  ngOnInit(){
    this.list$ = this.Service.getList();
    this.list$.subscribe(
      (items) => {
        this.list = items;
        console.log("triggered");
      },
      (error)=>{
        console.error(error);
      },
      ()=>{
        console.log("completed");
      }
    );
  }

}
Run Code Online (Sandbox Code Playgroud)

有没有人有这个错误?我找不到任何相关的东西。 …

javascript subscriber observable angular

3
推荐指数
2
解决办法
3856
查看次数

如何以角4返回一个Observable?

我在authProvider提供程序类中有这个方法:

getUser() {
    return this.afAuth.authState.subscribe(user => {
        return user;
    });
}
Run Code Online (Sandbox Code Playgroud)

我想在不同的课程中订阅它,例如:

this.authProvider.getUser().subscribe(user => console.log(user));
Run Code Online (Sandbox Code Playgroud)

任何想法如何返回方法Observable内部getUser()

subscriber observable typescript angular

3
推荐指数
1
解决办法
1万
查看次数