小编Swa*_*ati的帖子

Kafka Stream根据json消息中的时间戳键对消息进行排序

我正在用 JSON 消息发布 Kafka,例如:

"UserID":111,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":2,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:40.200Z,"Comments":0,"Like":6
"UserID":222,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":1,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:44.600Z,"Comments":3,"Like":12
Run Code Online (Sandbox Code Playgroud)

我想UpdateTime使用 Kafka Streams基于10 秒时间窗口对消息进行排序,并在另一个 Kafka 主题中推回已排序的消息。我创建了一个流,它从输入主题中读取数据,然后我在 UserID 是消息中的键TimeWindowedKStream之后创建groupByKey()(虽然它不需要groupByKey然后排序,但我无法WindowedBy直接获取)。但是我无法UpdateTime进一步在 10 秒窗口中对消息进行排序。我的源代码是:

public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-sorting");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("UnsortedMessages");
        TimeWindowedKStream<String, String> countss = source.groupByKey().windowedBy(TimeWindows.of(10000L)
                 .until(10000L));
        /*
        SORTING CODE
            */
        outputMessage.toStream().to("SortedMessages", …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

6
推荐指数
1
解决办法
3568
查看次数

将字符串日期时间替换为大熊猫数据框中的纪元

我的数据框看起来像:

Time,User,value 2018-03-30T14:18:49.600,U101,23 2018-03-30T14:18:49.800,U102,22 2018-03-30T14:18:50.000,U101,24

我想将字符串时间戳更改为TimeDataFrame列中以毫秒为单位的纪元。我可以通过将值保存在列表列表中并迭代每一行来做到这一点。但是我需要一种有效的方法,以便我可以在 Dataframe 本身中将 String 时间替换为纪元时间。提前致谢。

python python-datetime pandas

3
推荐指数
1
解决办法
3814
查看次数

到纪元的日期时间字符串:pandas 数据框

我的一只熊猫 df 有日期时间字符串列。格式如下:

TimeStamp                 value
11/12/2015 10:07:34 AM   24.5
11/12/2015 10:07:35 AM   55.1
so on
Run Code Online (Sandbox Code Playgroud)

我尝试TimeStamp使用以下方法将列的值转换为纪元:

dataframe['TimeStamp'] = pd.to_datetime(dataframe['TimeStamp']).values.astype(np.int64) // 10 ** 6
Run Code Online (Sandbox Code Playgroud)

将日期时间字符串转换为 unix 时间戳时出现错误。帮助将是非常appriciated 。谢谢。

datetime epoch python-3.x pandas

2
推荐指数
1
解决办法
1828
查看次数