小编Har*_*i N的帖子

Apache Flink - 使用数据流中的值动态创建流数据源

我正在尝试使用Apache Flink构建一个示例应用程序,它执行以下操作:

  1. 从Kafka队列中读取股票代码流(例如'CSCO','FB').
  2. 对于每个符号,执行当前价格的实时查找并流式传输下游处理的值.

*更新到原始帖子*

我将map函数移动到一个单独的类中,并且没有得到运行时错误消息" MapFunction的实现不再可序列化.该对象可能包含或引用非可序列化字段 ".

我现在面临的问题是,我试图写价格的卡夫卡主题"股票价格"没有收到它们.我正在尝试解决问题并发布任何更新.

public class RetrieveStockPrices { 
    @SuppressWarnings("serial") 
    public static void main(String[] args) throws Exception { 
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

        Properties properties = new Properties(); 
        properties.setProperty("bootstrap.servers", "localhost:9092"); 
        properties.setProperty("zookeeper.connect", "localhost:2181"); 
        properties.setProperty("group.id", "stocks"); 

        DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties)); 

        DataStream<String> stockPrice = 
            streamOfStockSymbols 
            //get unique keys 
            .keyBy(new KeySelector<String, String>() { 
                @Override 
                public String getKey(String trend) throws Exception {
                    return trend; 
                }
                }) 
            //collect events over a window 
            .window(TumblingEventTimeWindows.of(Time.seconds(60))) 
            //return the last event …
Run Code Online (Sandbox Code Playgroud)

apache-flink

5
推荐指数
1
解决办法
1186
查看次数

无法连接到在 Google Compute Engine 虚拟机实例上运行的 HTTP 服务

我有一个在 f1-micro 实例上运行 COS 版本“cos-dev-61-9733-0-0”的 VM 实例。我已经配置了一个外部 IP 地址 146.xxx.xxx.106 并分配给这个实例。我正在尝试从我的本地工作站测试到此实例的 HTTP 连接,但到目前为止没有成功。

我在“防火墙”下启用了“允许 HTTP 流量”和“允许 HTTPS 流量”设置,即使它们在我保存后没有显示选中的框。然而,网络标签的值为“http-server, https-server”,如下所示:

在此处输入图片说明

我还确认“网络 > 防火墙设置”有一个默认的 HTTP 规则,如下所示:

在此处输入图片说明

我发现这里报告了一个类似的问题,但并没有帮助解决我的问题:无法访问 Google Cloud Compute Instance External IP任何有关我缺少的其他设置的建议将不胜感激。我查找了操作系统级别的防火墙设置,但找不到足够的 Chromium OS 文档。

以下是我遵循的步骤:

在 GCE 实例上:

$ sudo python -m SimpleHTTPServer 80

在 0.0.0.0 端口 80 上提供 HTTP ...

$ sudo netstat -antup

活动的 Internet 连接(服务器和已建立)

Proto Recv-Q Send-Q 本地地址外地址
状态PID/程序名称

tcp 0 0 0.0.0.0:22 0.0.0.0:*
听 638/sshd

tcp 0 0 0.0.0.0:5355 …

connectivity google-compute-engine

5
推荐指数
1
解决办法
2270
查看次数