我正在尝试使用Apache Flink构建一个示例应用程序,它执行以下操作:
*更新到原始帖子*
我将map函数移动到一个单独的类中,并且没有得到运行时错误消息" MapFunction的实现不再可序列化.该对象可能包含或引用非可序列化字段 ".
我现在面临的问题是,我试图写价格的卡夫卡主题"股票价格"没有收到它们.我正在尝试解决问题并发布任何更新.
public class RetrieveStockPrices {
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "stocks");
DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties));
DataStream<String> stockPrice =
streamOfStockSymbols
//get unique keys
.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String trend) throws Exception {
return trend;
}
})
//collect events over a window
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
//return the last event …Run Code Online (Sandbox Code Playgroud) 我有一个在 f1-micro 实例上运行 COS 版本“cos-dev-61-9733-0-0”的 VM 实例。我已经配置了一个外部 IP 地址 146.xxx.xxx.106 并分配给这个实例。我正在尝试从我的本地工作站测试到此实例的 HTTP 连接,但到目前为止没有成功。
我在“防火墙”下启用了“允许 HTTP 流量”和“允许 HTTPS 流量”设置,即使它们在我保存后没有显示选中的框。然而,网络标签的值为“http-server, https-server”,如下所示:
我还确认“网络 > 防火墙设置”有一个默认的 HTTP 规则,如下所示:
我发现这里报告了一个类似的问题,但并没有帮助解决我的问题:无法访问 Google Cloud Compute Instance External IP。任何有关我缺少的其他设置的建议将不胜感激。我查找了操作系统级别的防火墙设置,但找不到足够的 Chromium OS 文档。
以下是我遵循的步骤:
在 GCE 实例上:
$ sudo python -m SimpleHTTPServer 80
在 0.0.0.0 端口 80 上提供 HTTP ...
$ sudo netstat -antup
活动的 Internet 连接(服务器和已建立)
Proto Recv-Q Send-Q 本地地址外地址
状态PID/程序名称tcp 0 0 0.0.0.0:22 0.0.0.0:*
听 638/sshdtcp 0 0 0.0.0.0:5355 …