Reading AMF objects with a Flex socket

Syl*_*ain 5 java sockets apache-flex amf

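I'm currently trying to communicate between Java and Flex using sockets and AMF-serialized objects.

On the Java side, I use Amf3Input and Amf3Output from BlazeDS (flex-messaging-common.jar and flex-messaging-core.jar).

The connection is established correctly, and if I send an object from Flex to Java, I can read it without trouble:

Flex side: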

protected function button2_clickHandler(event:MouseEvent):void
{
    var tmp:FlexAck = new FlexAck;
    tmp.id="123456789123456789123456789";
    tmp.name="A";
    tmp.source="Aaaaaa";
    tmp.ackGroup=false;
    s.writeObject(tmp);
    s.flush();
}

Java side:

ServerSocket servSoc = new ServerSocket(8888);
Socket s = servSoc.accept();
Amf3Output amf3Output = new Amf3Output(SerializationContext.getSerializationContext());
amf3Output.setOutputStream(s.getOutputStream());
Amf3Input amf3Input = new Amf3Input(SerializationContext.getSerializationContext());
amf3Input.setInputStream(s.getInputStream());
while (true)
{
    try
    {
        Object obj = amf3Input.readObject();
        if (obj != null) {
            if (obj instanceof AckOrder) {
                System.out.println(((AckOrder) obj).getId());
            }
        }
    }
    catch (Exception e)
    {
        e.printStackTrace();
        break;
    }
}
amf3Output.close();
amf3Input.close();
servSoc.close();

In this direction it works fine; the problem is reading objects sent from the Java side.

The code I use in Java is:

for(int i=0;i<10;i++){
    ack = new AckOrder(i,"A","B", true);
    amf3Output.writeObject(ack);
    amf3Output.writeObjectEnd();
    amf3Output.flush();
}

I have a handler on ProgressEvent.SOCKET_DATA:

trace((s.readObject()as FlexAck).id);

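But I get errors such as "Error #2030: End of file was encountered" and "Error #2006: The supplied index is out of bounds".

If I add some ByteArray manipulation, I manage to read the first object, but not the following ones.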

s.readBytes(tmp,tmp.length);
content = clone(tmp);
(content.readObject());
trace("########################## OK OBJECT RECEIVED");
var ack:FlexAck = (tmp.readObject() as FlexAck); 
trace("**********************> id = "+ack.id);

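I have spent a lot of time searching several forums and other sources, but nothing has helped.

So if anyone could help me, that would be great.

Thanks

Syl

Edit:

Here is an example that I think should work but doesn't. I hope it illustrates more clearly what I'm aiming for (a permanent socket connection with message exchange).

Java class: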

import java.net.ServerSocket;
import java.net.Socket;
import protocol.AckOrder;
import flex.messaging.io.SerializationContext;
import flex.messaging.io.amf.Amf3Input;
import flex.messaging.io.amf.Amf3Output;


public class Main {
    public static void main(String[] args) {
        while (true)
        {
            try {
                ServerSocket servSoc = new ServerSocket(8888);
                Socket s = servSoc.accept();
                System.out.println("connection accepted");
                Amf3Output amf3Output = new Amf3Output(SerializationContext.getSerializationContext());
                amf3Output.setOutputStream(s.getOutputStream());
                Amf3Input amf3Input = new Amf3Input(SerializationContext.getSerializationContext());
                amf3Input.setInputStream(s.getInputStream());
                while (true)
                {
                    try
                    {
                        System.out.println("Reading object");
                        Object obj = amf3Input.readObject();
                        if (obj != null)
                        {
                            System.out.println(obj.getClass());
                            if (obj instanceof AckOrder)
                            {
                                AckOrder order = new AckOrder();
                                order.setId(((AckOrder) obj).getId());
                                order.setName(((AckOrder) obj).getName());
                                order.setSource(((AckOrder) obj).getSource());
                                order.setAckGroup(((AckOrder) obj).isAckGroup());
                                System.out.println(((AckOrder) obj).getId());
                                amf3Output.writeObject(order);
                                amf3Output.writeObjectEnd();
                                amf3Output.flush();
                            }
                        }
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                        break;
                    }
                }
                amf3Output.close();
                amf3Input.close();
                servSoc.close();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

Java Serializable object:

package protocol;

import java.io.Serializable;

public class AckOrder implements Serializable {
    private static final long serialVersionUID = 5106528318894546695L;
    private String id;
    private String name;
    private String source;
    private boolean ackGroup = false;

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getSource() {
        return this.source;
    }

    public void setAckGroup(boolean ackGroup) {
        this.ackGroup = ackGroup;
    }

    public boolean isAckGroup() {
        return this.ackGroup;
    }

    public AckOrder()
    {
        super();
    }
}

Flex side:

Main Flex code:

<fx:Script>
    <![CDATA[
        import mx.collections.ArrayCollection;
        import mx.controls.Alert;
        import mx.events.FlexEvent;
        import mx.utils.object_proxy;


        private var _socket:Socket = new Socket();

        private function onCreationComplete():void
        {
            this._socket.connect("localhost",8888);
            this._socket.addEventListener(ProgressEvent.SOCKET_DATA, onData);
        }

        private function onData(e:ProgressEvent):void
        {

            if(this._socket.bytesAvailable)
            {
                this._socket.endian = Endian.LITTLE_ENDIAN;
                var objects:Array = [];
                try{
                    while(this._socket.bytesAvailable > 0)
                    {
                        objects.push(this._socket.readObject());
                    }
                }catch(e:Error){trace(e.message);}
                    trace("|"+(objects)+"|");
            }

        }

        protected function sendButton_clickHandler(event:MouseEvent):void
        {
            var tmp:FlexAck = new FlexAck;
            tmp.id="1";
            tmp.name="A";
            tmp.source="B";
            tmp.ackGroup=false;
            this._socket.writeObject(tmp);
            this._socket.flush();
        }


    ]]>
</fx:Script>
<s:Button x="0" y="0" name="send" label="Send" click="sendButton_clickHandler(event)"/>

Flex serializable object:

package
{

[Bindable]
[RemoteClass(alias="protocol.AckOrder")] 
public class FlexAck
{
    public function FlexAck()
    {
    }

    public var id:String;
    public var name:String;
    public var source:String;
    public var ackGroup:Boolean;

}
}

Edit 25/05/2011:

I added these listeners to my Flex code:

this._socket.addEventListener(Event.ACTIVATE, onActivate);
this._socket.addEventListener(Event.CLOSE, onClose);
this._socket.addEventListener(Event.CONNECT, onConnect);
this._socket.addEventListener(Event.DEACTIVATE, onDeactivate);
this._socket.addEventListener(IOErrorEvent.IO_ERROR, onIOerror);
this._socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, onSecurityError);

But no error is reported, and I still can't receive the objects correctly.

J_A*_*A_X 0

Looking at the code, I think what you want to do on the Java side is something like this:

for(int i=0;i<10;i++){
    ack = new AckOrder(i,"A","B", true);
    amf3Output.writeObject(ack);
}
amf3Output.flush();

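When you call flush, the buffered data is sent over the socket, so with a flush inside the loop you were sending only one object at a time. On the Flex side, you should always check the length of the object and make sure you don't read past it; reading past the available data is what causes this error.

Edit: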
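One way to make "the length of the object" known to the reader is to prefix each serialized object with its byte count. This is a sketch under assumptions rather than something from the question or from BlazeDS: `FrameCodec` and `encodeFrame` are hypothetical names, and the payload here is placeholder text standing in for the AMF bytes of one `AckOrder` (which could be captured by pointing `Amf3Output.setOutputStream` at a `ByteArrayOutputStream`).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of BlazeDS): length-prefixed framing.
final class FrameCodec {

    // Returns a 4-byte big-endian length followed by the payload bytes.
    static byte[] encodeFrame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length) // ByteBuffer is big-endian by default
                .put(payload)
                .array();
    }

    public static void main(String[] args) {
        int total = 0;
        for (int i = 0; i < 3; i++) {
            // Placeholder payload; assumed to be one serialized object in real use.
            byte[] payload = ("ack-" + i).getBytes(StandardCharsets.UTF_8);
            byte[] frame = encodeFrame(payload);
            // In the server, this frame would be written to s.getOutputStream().
            total += frame.length;
        }
        System.out.println(total + " bytes in 3 frames");
    }
}
```

With this layout the Flex client could read the 4-byte length first and call `readObject()` only once `bytesAvailable` covers the whole frame. The length is written big-endian, so the client socket would need to stay on its default big-endian setting when reading it.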

private var _socket:Socket = new Socket();

private function onCreationComplete():void
{
    // Add connection socket info here
    this._socket.addEventListener(ProgressEvent.SOCKET_DATA, onData);
}

// This gets called every time we get new info, as in after the server flushes
private function onData(e:ProgressEvent):void
{
    if(this._socket.bytesAvailable)
    {
        this._socket.endian = Endian.LITTLE_ENDIAN; // Might not be needed, but often is
        // Try to get objects
        var objects:Array = [];
        try{
            while(this._socket.bytesAvailable > 0)
            {
                objects.push(this._socket.readObject());
            }
        }catch(e:Error){}
        // Do something with objects array
    }
}

Since everything is asynchronous, the onData function gets called repeatedly (every time the server sends data).
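Because a data callback can fire with only part of an object available, the receiver usually has to keep leftover bytes between calls. The sketch below shows that buffering logic in Java for illustration; it assumes each message is preceded by a 4-byte big-endian length, which is not part of the original code, and `FrameAccumulator` is a hypothetical name. The same idea would carry over to an ActionScript `onData` handler.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical receiver-side buffer: collects chunks and emits complete frames only.
final class FrameAccumulator {
    private byte[] pending = new byte[0]; // bytes left over from earlier callbacks

    // Appends a newly arrived chunk and returns every frame that is now complete.
    List<byte[]> onData(byte[] chunk) {
        byte[] merged = new byte[pending.length + chunk.length];
        System.arraycopy(pending, 0, merged, 0, pending.length);
        System.arraycopy(chunk, 0, merged, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(merged);
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative frame length");
            }
            if (buf.remaining() < length) {
                buf.reset(); // frame incomplete: rewind to its length prefix and wait
                break;
            }
            byte[] frame = new byte[length];
            buf.get(frame);
            frames.add(frame);
        }
        pending = new byte[buf.remaining()];
        buf.get(pending); // keep the unparsed tail for the next callback
        return frames;
    }

    public static void main(String[] args) {
        byte[] a = "first".getBytes(StandardCharsets.UTF_8);
        byte[] b = "second".getBytes(StandardCharsets.UTF_8);
        byte[] stream = ByteBuffer.allocate(8 + a.length + b.length)
                .putInt(a.length).put(a).putInt(b.length).put(b).array();

        FrameAccumulator acc = new FrameAccumulator();
        // Simulate two callbacks, with the split falling in the middle of the first frame.
        List<byte[]> call1 = acc.onData(Arrays.copyOfRange(stream, 0, 6));
        List<byte[]> call2 = acc.onData(Arrays.copyOfRange(stream, 6, stream.length));
        System.out.println(call1.size() + " frame(s) after the first chunk");
        for (byte[] frame : call2) {
            System.out.println(new String(frame, StandardCharsets.UTF_8));
        }
    }
}
```

The first call returns nothing because only part of a frame has arrived; the second call returns both frames once the remaining bytes are in.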