我有一个滞后的问题,一个rospy订阅者听图像消息.
概述:
我有一个rosbag将图像流式传输到5Hz的/ camera/image_raw.我还有一个image_view节点,用于显示图像以供参考.此image_view以5Hz显示它们.
在我的rospy订户(使用queue = 1初始化)中,我还显示图像(用于比较与image_view节点的延迟时间).订户随后进行了一些繁重的处理.
预期结果:
由于队列大小为1,用户应处理最新帧,同时跳过所有其他帧.一旦完成处理,它应该继续前进到下一个最新的帧.应该没有旧帧排队.这将导致一个波涛汹涌,但不是滞后的视频(低fps,但没有"延迟"wrt rosbag流,如果这是有道理的)
实际结果:
订阅者落后于已发布的流.具体来说,image_view节点以5Hz显示图像,并且订户似乎将所有图像排队并逐个处理它们,而不是仅仅抓取最新图像.延迟也随着时间的推移而增长.当我停止rosbag流时,订户继续处理队列中的图像(即使队列= 1).
请注意,如果我将订户更改为具有非常大的缓冲区大小(如下所示),则会生成预期的行为:
self.subscriber = rospy.Subscriber("/camera/image_raw", Image, self.callback, queue_size = 1, buff_size=2**24)
Run Code Online (Sandbox Code Playgroud)
但是,这不是一个干净的解决方案.
在以下链接中也报告了此问题,我在其中找到了缓冲区大小解决方案.官方解释假设发布者可能实际上正在放慢速度,但事实并非如此,因为image_view订阅者以5Hz显示图像.
https://github.com/ros/ros_comm/issues/536,罗斯用户不是最新的,http://answers.ros.org/question/50112/unexpected-delay-in-rospy-subscriber/
任何帮助表示赞赏.谢谢!
码:
def callback(self, msg):
print "Processing frame | Delay:%6.3f" % (rospy.Time.now() - msg.header.stamp).to_sec()
orig_image = self.bridge.imgmsg_to_cv2(msg, "rgb8")
if (self.is_image_show_on):
bgr_image = cv2.cvtColor(orig_image, cv2.COLOR_RGB2BGR)
cv2.imshow("Image window", bgr_image)
cv2.waitKey(1)
result = process(orig_image) #heavy processing task
print result
Run Code Online (Sandbox Code Playgroud) 我是Rxjava的新手.我有以下代码:
System.out.println("1: " + Thread.currentThread().getId());
Observable.create(new rx.Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subcriber) {
System.out.println("2: " + Thread.currentThread().getId());
// query database
String result = ....
subcriber.onNext(result);
}
}).subscribeOn(Schedulers.newThread()).subscribe(countResult -> {
System.out.println("3: " + Thread.currentThread().getId());
});
Run Code Online (Sandbox Code Playgroud)
例如,输出将是:
1:50
2:100
3:100
我希望订阅者在ID为50的线程上运行.我该怎么做?
我在google pubsub中创建了一个主题,并在主题内创建了一个订阅,具有以下设置
然后我在 go 中编写了一个拉取器,使用它的Receive来拉取并确认已发布的消息
package main
import (
...
)
func main() {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, config.C.Project)
if err != nil {
// do things with err
}
sub := client.Subscription(config.C.PubsubSubscription)
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
msg.Ack()
})
if err != context.Canceled {
logger.Error(fmt.Sprintf("Cancelled: %s", err.Error()))
}
if err != nil {
logger.Error(fmt.Sprintf("Error: %s", err.Error()))
}
}
Run Code Online (Sandbox Code Playgroud)
没什么特别的,它工作得很好,但过了一段时间(~闲置 3 小时后),它停止接收新发布的消息,没有错误,什么也没有。我错过了什么吗?
你可以在这里查看:https: //labs.aweber.com/docs/code_samples/subs/create
通过api将新订阅者添加到列表的脚本需要这两个部分信息...只有我无法弄清楚这两个变量是什么!我已经击败了我的Aweber Subscriber账户和我的Aweber Labs账户的每个小方面......而且我无法在任何地方找到任何这些变量的参考.我已经向他们提交了一些门票,但还没有得到任何答复.
有人有任何想法吗?我已经尝试了我的帐户名,我的名单,但无济于事!
~~~~~~~~~~~~~~~~~~~~~~~~~~~
好的,我知道了!您可以通过在进行某些api调用后在aweber api中转储一些其他变量来获取这两个变量的值.
首先获取帐户ID:
$account = $aweber->getAccount($accessKey, $accessSecret);
Run Code Online (Sandbox Code Playgroud)
然后是vardump或print_r $帐户.
接下来我们得到列表ID:
$account = $aweber->getAccount($accessKey, $accessSecret);
$list_url = 'https://api.aweber.com/1.0/accounts/<id>/lists';
$lists = $account->loadFromUrl($list_url);
Run Code Online (Sandbox Code Playgroud)
然后是vardump或print_r $列表.
你们都准备好了!我很高兴我把它搞清楚了,它花了足够长的时间.希望这可以节省一些时间.
我试图从示例wuclient/wuserver在zeromq上实现一个懒惰的订阅者.客户端比服务器慢,所以它必须只获取服务器发送的最后一条消息.
到目前为止,我发现这样做的唯一方法是连接/断开客户端,但每个连接当然有不必要的成本,大约3ms:
server.cxx
int main () {
// Prepare our context and publisher
zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
publisher.bind("tcp://*:5556");
int counter = 0;
while (1) {
counter++;
// Send message to all subscribers
zmq::message_t message(20);
snprintf ((char *) message.data(), 20 ,
"%d", counter);
publisher.send(message);
std::cout << counter << std::endl;
usleep(100000);
}
return 0;
}
Run Code Online (Sandbox Code Playgroud)
client.cxx
int main (int argc, char *argv[])
{
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
while(1){
zmq::message_t update;
int counter;
subscriber.connect("tcp://localhost:5556"); // This call …Run Code Online (Sandbox Code Playgroud) 我已经为其中一个图像主题编写了一个ROS订阅者,并且我使用以下命令将缓冲区设置为1:
subscriber =rospy.Subscriber("/camera/rgb/image_mono/compressed",CompressedImage, callback, queue_size=1)
Run Code Online (Sandbox Code Playgroud)
但是我的订户仍然落后.知道可能导致这种情况的原因吗?我是否正确设置了队列大小?
我创建了一个 AuditLoggerBundle*,它有一个使用 Doctrine 事件(prePersist、preUpdate 和 preRemove)的服务,以便在 audit_log 表(AuditLog 实体)中创建一个新条目。
该捆绑包与我的其他捆绑包配合良好,但我想对其进行单元测试并对其进行功能测试。
问题是,为了对函数进行功能测试AuditLoggerListener,我需要至少有两个可以持久化、更新等的“假”实体。
在这个包中,我不知道如何做到这一点,因为我只有一个 AuditLog 实体,我需要使用两个实体(仅在测试中使用)。
这就是我看到持久功能测试的方式:
<?php
$animal = new Animal(); //this is a fake Auditable entity
$animal->setName('toto');
$em = new EntityManager(); //actually I will use the container to get this manager
$em->persist($animal);
$em->flush();
//Here we test that I have a new line in audit_log table with the right informations
Run Code Online (Sandbox Code Playgroud)
所以我的问题是我的包中没有任何 Animal 实体,我只需要这个来测试包,所以它必须只在测试数据库中创建,而不是在生产环境中创建(当我执行 app/console …
总结:我有一个节点在 ~300hz 发布消息,但是在另一个节点中订阅主题的回调只在 ~25hz 时被调用。订阅者节点中的 spinOnce 在 ~700hz 被调用,所以我不知道为什么它会丢失消息。
发布者节点:
#include <ros/ros.h>
#include <ros/console.h>
#include <nav_msgs/Odometry.h>
...
int main(int argc, char** argv)
{
ros::init(argc, argv, "sim_node");
ros::NodeHandle nh;
...
// Publishers
tf::TransformBroadcaster tfbr;
ros::Publisher odomPub = nh.advertise<nav_msgs::Odometry>("pose",10);
...
ros::Rate r(300); // loop rate
while(ros::ok())
{
...
// Publish pose and velocity
...
odomPub.publish(msg);
ros::spinOnce();
r.sleep();
}
ros::waitForShutdown();
return 0;
}
Run Code Online (Sandbox Code Playgroud)
订阅者节点:
#include <ros/ros.h>
#include <ros/console.h>
#include <nav_msgs/Odometry.h>
...
std::mutex mtx1, mtx2;
class DataHandler
{
private:
ros::NodeHandle nh;
ros::Publisher odomPub;
double …Run Code Online (Sandbox Code Playgroud) 我正在尝试在 Angular 2 中为我的服务使用 observable。
但我收到此错误:
Uncaught TypeError: Cannot read property 'isStopped' of undefined
Run Code Online (Sandbox Code Playgroud)
我的服务先睹为快:
import { Observable } from 'rxjs/Observable';
import { Injectable } from '@angular/core';
@Injectable()
export class Service{
getList(){
return new Observable((observer)=>{
observer.next(result);
})
}
}
Run Code Online (Sandbox Code Playgroud)
和实施:
import ...
@Component({...})
export class List implements OnInit {
list : any[];
list$ : Observable<Array<any>>;
constructor(...){
}
ngOnInit(){
this.list$ = this.Service.getList();
this.list$.subscribe(
(items) => {
this.list = items;
console.log("triggered");
},
(error)=>{
console.error(error);
},
()=>{
console.log("completed");
}
);
}
}
Run Code Online (Sandbox Code Playgroud)
有没有人有这个错误?我找不到任何相关的东西。 …
我在authProvider提供程序类中有这个方法:
getUser() {
return this.afAuth.authState.subscribe(user => {
return user;
});
}
Run Code Online (Sandbox Code Playgroud)
我想在不同的课程中订阅它,例如:
this.authProvider.getUser().subscribe(user => console.log(user));
Run Code Online (Sandbox Code Playgroud)
任何想法如何返回方法Observable内部getUser()?
subscriber ×10
ros ×3
angular ×2
image ×2
observable ×2
opencv ×2
python ×2
api ×1
aweber ×1
c++ ×1
callback ×1
datareader ×1
doctrine-orm ×1
go ×1
javascript ×1
php ×1
publisher ×1
rx-java ×1
symfony ×1
typescript ×1
zeromq ×1