Java Interop - Netty + Clojure

Ari*_*Ari 6 clojure netty

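I am trying to use Netty from Clojure. I can start the server, but it fails to initialize an accepted socket. The error message and code are below. Does anyone know what is, or could be, wrong? I believe the problem is with (Channels/pipeline (server-handler)). Thanks.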

Error message

#<NioServerSocketChannel [id: 0x01c888d9, /0.0.0.0:843]>
Jun 6, 2012 12:15:35 PM org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink
WARNING: Failed to initialize an accepted socket.
java.lang.IllegalArgumentException: No matching method found: pipeline

project.clj

(defproject protocol "1.0.0-SNAPSHOT"
  :description "Upload Protocol Server"
  :dependencies [
    [org.clojure/clojure "1.2.1"]
    [io.netty/netty "3.4.5.Final"]])

core.clj

(ns protocol.core
    (:import (java.net InetSocketAddress)
             (java.util.concurrent Executors)
             (org.jboss.netty.bootstrap ServerBootstrap)
             (org.jboss.netty.channel Channels ChannelPipelineFactory SimpleChannelHandler)
             (org.jboss.netty.channel.socket.nio NioServerSocketChannelFactory)
             (org.jboss.netty.buffer ChannelBuffers)))

(def policy
    "<content>Test</content>")


(defn server-handler
    "Returns netty handler."
    []
    (proxy [SimpleChannelHandler] []
        (messageReceived [ctx e]
            (let [ch (.getChannel e)]
                (.write ch policy)
                (.close ch)))

        (channelConnected [ctx e]
            (let [ch (.getChannel e)]
                (.write ch policy)
                (.close ch)))

        (exceptionCaught [ctx e]
            (let [ex (.getCause e)]
                (println "Exception" ex)
                (-> e .getChannel .close)))))

(defn setup-pipeline
    "Returns channel pipeline."
    []
    (proxy [ChannelPipelineFactory] []
        (getPipeline []
            (Channels/pipeline (server-handler)))))

(defn startup
    "Starts netty server."
    [port]
    (let [channel-factory (NioServerSocketChannelFactory. (Executors/newCachedThreadPool) (Executors/newCachedThreadPool))
          bootstrap (ServerBootstrap. channel-factory)]
        (.setPipelineFactory bootstrap (setup-pipeline))
        (.setOption bootstrap "child.tcpNoDelay" true)
        (.setOption bootstrap "child.keepAlive" true)
        (.bind bootstrap (InetSocketAddress. port))))

Jes*_*jan 6

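Your code has three problems:

  1. Java interop with the varargs Channels.pipeline() method. Clojure has to pass varargs as an array, so put the channel handlers in a vector and wrap it with (into-array ChannelHandler ..).

  2. You cannot write a String object directly to a Netty Channel. You have to write the string into a ChannelBuffer first and write that buffer, or add a string codec handler (such as StringEncoder) to the pipeline.

  3. Writes to a Netty channel are asynchronous, so you cannot close the channel immediately afterwards. You have to register a future listener and close the channel once the write has completed.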
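Point 1 is not specific to Netty: the Java compiler turns a varargs parameter into a trailing array parameter, and Clojure has to supply that array itself. A minimal sketch using only JDK methods (`String/format` and `Arrays/asList` are stand-ins chosen for illustration, and the names `formatted` and `as-list` are hypothetical, not from the original code):

```clojure
;; Java varargs compile down to a trailing array parameter, so Clojure
;; has to build and pass that array explicitly.

;; String.format(String, Object...) takes an Object[] as its second argument.
(def formatted
  (String/format "%s:%s" (into-array Object ["host" 843])))

;; Arrays.asList(T...) likewise takes a single array argument.
(def as-list
  (java.util.Arrays/asList (into-array String ["a" "b" "c"])))

(println formatted)
(println (vec as-list))
```

The same pattern applies to Channels/pipeline, whose varargs parameter is an array of ChannelHandler.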

Here is the working code.

(ns clj-netty.core
  (:import (java.net InetSocketAddress)
           (java.util.concurrent Executors)
           (org.jboss.netty.bootstrap ServerBootstrap)
           (org.jboss.netty.buffer ChannelBuffers)
           (org.jboss.netty.channel Channels ChannelFutureListener ChannelHandler
                                    ChannelPipelineFactory SimpleChannelHandler)
           (org.jboss.netty.channel.socket.nio NioServerSocketChannelFactory)))


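;; A Netty Channel cannot write a String directly, so the payload is
;; wrapped in a ChannelBuffer. The charset is named explicitly so the
;; bytes do not depend on the platform default.
(def policy
  (ChannelBuffers/copiedBuffer
    (.getBytes "<content>Test</content>" "UTF-8")))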


(defn server-handler
  "Returns netty handler."
  []
  (proxy [SimpleChannelHandler] []
    (messageReceived [ctx e]
      (let [ch (.getChannel e)]
        (.addListener
          (.write ch policy)
          (ChannelFutureListener/CLOSE))))

    (channelConnected [ctx e]
      (let [ch (.getChannel e)]
        (.addListener
          (.write ch policy)
          (ChannelFutureListener/CLOSE))))

    (exceptionCaught [ctx e]
      (let [ex (.getCause e)]
        (println "Exception" ex)
        (-> e .getChannel .close)))))

(defn setup-pipeline
  "Returns channel pipeline."
  []
  (proxy [ChannelPipelineFactory] []
    (getPipeline []
      (let [handler (server-handler)]
        (Channels/pipeline (into-array ChannelHandler [handler]))))))



(defn startup
  "Starts netty server."
  [port]
  (let [channel-factory (NioServerSocketChannelFactory. (Executors/newCachedThreadPool) (Executors/newCachedThreadPool))
        bootstrap (ServerBootstrap. channel-factory)]
    (.setPipelineFactory bootstrap (setup-pipeline))
    (.setOption bootstrap "child.tcpNoDelay" true)
    (.setOption bootstrap "child.keepAlive" true)
    (.bind bootstrap (InetSocketAddress. port))))
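The other option mentioned in point 2, a string codec in the pipeline, might look roughly like the sketch below. It assumes Netty 3.x's `StringEncoder` and `CharsetUtil` classes; the namespace `clj-netty.string-sketch` and the helper `string-pipeline-factory` are hypothetical names introduced here, and the code is an untested illustration rather than part of the answer above.

```clojure
(ns clj-netty.string-sketch
  (:import (org.jboss.netty.channel Channels ChannelHandler ChannelPipelineFactory)
           (org.jboss.netty.handler.codec.string StringEncoder)
           (org.jboss.netty.util CharsetUtil)))

(defn string-pipeline-factory
  "Hypothetical helper: returns a pipeline factory whose pipelines contain a
  StringEncoder followed by the handler produced by calling make-handler."
  [make-handler]
  (proxy [ChannelPipelineFactory] []
    (getPipeline []
      ;; The encoder converts outgoing Strings to ChannelBuffers, so the
      ;; application handler can call (.write ch "some text") directly.
      (Channels/pipeline
        (into-array ChannelHandler
                    [(StringEncoder. CharsetUtil/UTF_8) (make-handler)])))))
```

With such a factory installed via .setPipelineFactory, policy could stay a plain string; the asynchronous close through ChannelFutureListener/CLOSE is still needed.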

Have a look at Aleph (which also uses Netty). It lets you build clients and servers for many different protocols with a nice Clojure API.