我正在开发一个应用程序,它从DAQ读取数据,通过蓝牙传输数据.数据包大小可以改变,采样率(1Hz - 512Hz)也可以改变,并且我能够使用缓冲器循环并从设备读取数据.
我的问题是,当数据包大小和采样率变化时,如何正确处理数据?如何确定缓冲区大小?
目前我只是打开一个套接字,打开一个输入流,然后使用while循环(当套接字打开== true)从流中读取并处理数据(简单解码,而不是一个无关的任务).
例如,数据包中有23个字节,此时我的采样率非常低,为1Hz.我有一个256字节的缓冲区,这意味着它不会在缓冲区的末尾容纳一个完整的数据包,并且我已经编写了代码以便它运行到下一个缓冲区.读取数据后,从缓冲区中获取一个数据包,进行解码并存储,执行下一个数据包等.
我应该如何流式传输,并正确操作数据?最终,我将在512Hz的44个数据包中抓取一些东西,正好在蓝牙传输限制,我希望能够尽可能有效地处理它,并在进程中丢弃数据包时显示错误等.
TL; DR:如何使用缓冲区和/或中断正确地传输数据.
我正在尝试在 python 中重新创建一个函数来估计数据流的二阶矩。
\n\n正如乌尔曼书《海量数据集的挖掘》中所述,第二个时刻:
\n\n\n\n\n是 m_i \xe2\x80\x99s 的平方和。它有时被称为意外数,因为它衡量流中元素分布的不均匀程度。
\n
其中 m_i 元素是流中的单义元素。
\n\n例如,有这个玩具问题\\数据流:
\n\na, b, c, b, d, a, c, d, a, b, d, c, a, a, b\nRun Code Online (Sandbox Code Playgroud)\n\n我们这样计算第二个时刻:
\n\n5^2 + 4^2 + 3^2 + 3^2 = 59\nRun Code Online (Sandbox Code Playgroud)\n\n(因为\'a\'在数据流中出现了5次,\'b\'出现了4次,以此类推)
\n\n由于我们无法将所有数据流存储在内存中,因此我们可以使用一种算法来估计二阶矩:
\n\nAlon -Matias-Szegedy 算法(AMS 算法),使用以下公式估计二阶矩:
\n\nE(n *(2 * X.value \xe2\x88\x92 1))\nRun Code Online (Sandbox Code Playgroud)\n\n其中 X 是流的单义元素,是随机选择的,X.value 是一个计数器,当我们读取流时,每当我们遇到 x 元素从我们选择它时起的另一个出现时,它就会加 1 。
\n\nn表示数据流的长度,“E”是平均值。
\n\n以前面的数据流为例,假设我们在数据流的第 13 个位置选择了“a”,在第 8 个位置选择了“d”,在第 …
我有一个带有 flink 的流处理过程,可以在单个路径中处理 csv 文件。我想知道每个处理文件的文件名。
我目前正在使用此函数将 csv 文件读入路径(dataPath)。
val recs:DataStream[CallCenterEvent] = env
.readFile[CallCenterEvent](
CsvReader.getReaderFormat[CallCenterEvent](dataPath, c._2),
dataPath,
FileProcessingMode.PROCESS_CONTINUOUSLY,
c._2.fileInterval)
.uid("source-%s-%s".format(systemConfig.name, c._1))
.name("%s records reading".format(c._1))
Run Code Online (Sandbox Code Playgroud)
并使用此函数获取 TupleCsvInputFormat。
def getReaderFormat[T <: Product : ClassTag : TypeInformation](dataPath:String, conf:URMConfiguration): TupleCsvInputFormat[T] = {
val typeInfo = implicitly[TypeInformation[T]]
val format: TupleCsvInputFormat[T] = new TupleCsvInputFormat[T](new Path(dataPath), typeInfo.asInstanceOf[CaseClassTypeInfo[T]])
if (conf.quoteCharacter != null && !conf.quoteCharacter.equals(""))
format.enableQuotedStringParsing(conf.quoteCharacter.charAt(0))
format.setFieldDelimiter(conf.fieldDelimiter)
format.setSkipFirstLineAsHeader(conf.ignoreFirstLine)
format.setLenient(true)
return format
}
Run Code Online (Sandbox Code Playgroud)
该过程运行正常,但我找不到获取处理的每个 csv 文件的文件名的方法。
提前致谢
这是我的第二篇文章,现在我已经习惯了这里的功能!这更多是计算机科学的理论问题,但是,我的问题是什么意思?
“解析文本文件或数据流”
这是一项作业,我咨询过的书籍和网络资源已经过时或模糊。我已经在SinglyLinkedList上实现了可序列化的接口,该接口可将文件保存到磁盘或从磁盘加载文件,以便以后可以传输/编辑和访问该文件。这是否足以满足相当模糊的要求?
考虑此问题时要注意的事项:
我会很感激的反馈
java serialization text-parsing data-stream singly-linked-list
好的,所以我知道可以使用广播系统制造一个流来多次收听一个流,但这不是我在这里想要做的。
我也在编辑这个,因为我收到的一个答案目前无法解决我的问题,因此我们将不胜感激。
由于某种原因,我拥有的代码实际上并没有完全删除流,如果重新使用,它会尝试重新收听已经被收听和关闭的同一个流,但没有一个有效(显然)。我没有尝试再次收听相同的流,而是尝试创建一个新的流来收听。(删除并清除原始第一个流中的所有信息)。
原帖继续如下:
我正在使用 DataStream 模板将数据传入和/或传出程序的各个部分,我不完全确定如何纠正这个问题。我确定这是一个愚蠢的 newb 错误,但我还没有使用 DataStreams 来理解为什么会发生这种情况。
现在不要误会我的意思,通过我的程序的一个周期运行得非常好,完全没有问题。但是,一旦我通过程序完成了一个循环,如果我尝试第二次执行,我会收到错误消息:
错误状态:流已被收听。
因此,由此我知道我的程序没有创建新流,而是尝试重新使用原始流,而且我不是 100% 确定如何停止此功能(或者即使我应该停止)。(老实说,我希望完成多个周期的次数很少到零,但我想在这些错误成为问题之前解决它们。)
编辑:要遵循的最小可重复示例
文件 1 (main.dart)
import 'package:flutter/cupertino.dart';
import 'dart:async';
import './page2.dart';
import './stream.dart';
void main() => runApp(MyApp());
DataStream stream = DataStream();
class MyApp extends StatelessWidget {
// This widget is the root of your application.
@override
Widget build(BuildContext context) {
return CupertinoApp(
title: 'Splash Test',
theme: CupertinoThemeData(
primaryColor: Color.fromARGB(255, 0, 0, 255),
),
home: MyHomePage(title: 'Splash Test Home Page'),
);
}
}
class MyHomePage extends …Run Code Online (Sandbox Code Playgroud) 我有一个 CSV 文件,其中包含几分钟内记录的随机传感器的数据。现在我想将该数据从 CSV 文件传输到我的 pyhton 代码,就好像它直接从传感器本身接收数据一样。(代码用于从两个不同的传感器/csv 文件中获取读数并对其求平均值)有人建议使用 Apache Spark 来流式传输数据,但我觉得这对我来说有点太复杂了。可能有更简单的解决方案吗?
当在一定时间内未从流中接收到任何数据时,我该如何使用Flink的DataStream API实现一个操作符来发送事件?
我在批处理中使用 apache flink 一段时间了,但现在我们想将此批处理作业转换为流作业。我遇到的问题是如何运行端到端测试。
它如何在批处理作业中工作
使用批处理时,我们使用 Cucumber 创建端到端测试。
流作业中的问题
我们希望对流作业执行类似的操作,但流作业并未真正完成。
所以:
我们可以在每次测试后等待 5 秒,并假设所有内容都已处理完毕,但这会大大减慢一切速度。
问题:
有哪些方法或最佳实践可以在流式 Flink 作业上运行端到端测试,而不会在 x 秒后强制终止 Flink 作业
integration-testing end-to-end data-stream apache-flink flink-streaming
你如何在数组中找到最长的不同(!)元素行?
我们有一个矩阵我们的任务是在这个矩阵中找到一行,其中是不同元素的最长行
例如0 1 2 3 4 1 2 3应该计数0 1 2 3 4 = 5个元素
我是 AWS 菜鸟,我想弄清楚 Amazon Kinesis Data Stream 和 EventBridge 产品之间的区别。有人可以为不熟悉 AWS 技术堆栈的人解释这一点吗?
amazon-web-services data-stream amazon-kinesis aws-event-bridge
如何将整数值转换为字节数组,然后通过字节流将它们发送到客户端程序,该程序将字节数组转换回整数?
我的节目是乒乓球比赛.一旦运行,它就会创建一个服务器,客户端现在使用对象流通过Internet连接到该服务器.一切都运作良好,但似乎效率不高.我的意思是球在试图通过更新循环保持同步时来回震动.我可能已经松散地编程了,但这是我能想到的最好的.我希望有更多了解这种事情如何运作的人可以帮助我澄清一些事情.
我的问题直截了当.我需要知道更好的方式来更有效地通过互联网发送球位置和球员位置.目前花费的时间太长了.虽然,我可能会以错误的方式更新它.
流的构建方式:
oostream = new ObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
oostream.flush();
oistream = new ObjectInputStream(new BufferedInputStream(socket.getInputStream()));
Run Code Online (Sandbox Code Playgroud)
这是玩家2的更新循环:
IntData id = new IntData();
while (running) {
id.ballx = ballx;
id.bally = bally;
id.player2Y = player2Y;
oostream.writeObject(id);
oostream.flush();
Thread.sleep(updaterate);
id = (IntData) oistream.readObject();
player1Y = id.player1Y;
ballx = id.ballx;
bally = id.bally;
}
Run Code Online (Sandbox Code Playgroud)
播放器1是服务器主机.这是玩家1的更新循环:
IntData id = new IntData();
while (running) {
id = (IntData) oistream.readObject();
player2Y = id.player2Y;
ballx = id.ballx;
bally = id.bally;
Thread.sleep(updaterate);
id.ballx …Run Code Online (Sandbox Code Playgroud) 我希望能够使用curl命令保存我返回的数据流.我已经尝试使用cat命令,并使用curl命令管道,但是我做错了.我目前使用的代码是:
cat > file.txt | curl http://datastream.com/data
Run Code Online (Sandbox Code Playgroud)
任何帮助,将不胜感激.
从传感器,有一个看起来像一系列元组的数据流:
sensor: (-0.560303, -0.627686, 0.467468)
sensor: (-0.561829, -0.626160, 0.466125)
sensor: (-0.556091, -0.623352, 0.471497)
sensor: (-0.558411, -0.625977, 0.468811)
sensor: (-0.557312, -0.626587, 0.468262)
sensor: (-0.557800, -0.625854, 0.465820)
sensor: (-0.563599, -0.624512, 0.464722)
sensor: (-0.555847, -0.623230, 0.467163)
sensor: (-0.557861, -0.621033, 0.468811)
sensor: (-0.555420, -0.625061, 0.470520)
sensor: (-0.559082, -0.626221, 0.475891)
sensor: (-0.559814, -0.625977, 0.466309)
sensor: (-0.561768, -0.624756, 0.467163)
sensor: (-0.551941, -0.628906, 0.469055)
sensor: (-0.556946, -0.626465, 0.471313)
sensor: (-0.558533, -0.626038, 0.469421)
sensor: (-0.557922, -0.625061, 0.467285)
sensor: (-0.562622, -0.623657, 0.469971)
sensor: (-0.554443, -0.625977, 0.465759)
sensor: (-0.559265, -0.626282, …Run Code Online (Sandbox Code Playgroud) data-stream ×13
apache-flink ×3
c++ ×2
csv ×2
java ×2
python ×2
algorithm ×1
android ×1
bash ×1
bigdata ×1
bluetooth ×1
byte ×1
c ×1
data-mining ×1
end-to-end ×1
flutter ×1
ios ×1
networking ×1
performance ×1
random ×1
save ×1
scala ×1
streaming ×1
text-parsing ×1
timer ×1