i'm trying build stomp library can used connect rabbitmq chrome apps. initial experiments outside chrome worked well. however, i'm unable convert socket code work within chrome apps. can client api code below modified work within chrome apps?
my initial experiments have been successful. i'd love see complete socket api in client side dart libraries use such code directly chrome apps. while there effort provide socket api using js socket capability of chrome not clean 1 in dart.io package available server side code.
here's code. use it, deploy rabbitmq , enable stomp plugin.
import 'dart:io'; import 'dart:async'; void main() { list<string> versions = ["1.2","1.1","1.0"]; string host = "127.0.0.1"; // localhost int port = 61613; // rabbitmq default port socket.connect(host, 61613).then((connection) { string hostpath="/"; string login = "guest"; // rabbitmq default login string passcode = "guest"; // rabbitmq default passcode stomp(connection, versions, hostpath, login, passcode); // stomp connect connection .transform(new stomptransformer()) .listen((frame) { if(frame.headers.containskey("ack")) { ack(connection, frame.headers["ack"]); } dumpstompframe(frame); stdout.write("enter message> "); }, ondone: () { print("done"); }, onerror: (e) { print (e); } ); subscribe(connection, "/queue/a", 1, "client", true); stdin .transform(new stringdecoder()) .transform(new linetransformer()) .listen((line) { send(connection, "/queue/a", line); }); }); } /* * stomp * accept-version:1.0,1.1,2.0 * host:/ * * ^@ */ void stomp(socket connection, list<string> versions, string hostpath, string login, string passcode) { connection.writeln("stomp"); if(versions.length > 0) { connection.writeln("accept-version:${versions.join(',')}"); } connection.writeln("host:$hostpath"); connection.writeln("login:$login"); connection.writeln("passcode:$passcode"); connection.writeln(); connection.add([0x00]); } /* * subscribe * id:0 * destination:/queue/foo * ack:client * * ^@ */ void subscribe(socket connection, string destination, int id, string ack, bool persistent) { connection.writeln("subscribe"); connection.writeln("id:$id"); connection.writeln("destination:$destination"); connection.writeln("ack:$ack"); connection.writeln("persistent:$persistent"); connection.writeln(); connection.add([0x00]); } /* * unsubscribe * id:0 * * ^@ */ void unsubscribe(socket connection, int id) { connection.writeln("unsubscribe"); connection.writeln("id:$id"); connection.writeln(); connection.add([0x00]); } /* * ack * id:12345 * transaction:tx1 * * ^@ */ void ack(socket connection, string id, [string transaction]) { connection.writeln("ack"); connection.writeln("id:$id"); if(?transaction) { connection.writeln("transaction:$transaction"); } connection.writeln(); connection.add([0x00]); } /* * nack * id:12345 * transaction:tx1 * * ^@ */ void nack(socket connection, string id, [string transaction]) { connection.writeln("nack"); connection.writeln("id:$id"); if(?transaction) { connection.writeln("transaction:$transaction"); } connection.writeln(); connection.add([0x00]); } /* * begin * transaction:tx1 * * ^@ */ void begin(socket connection, string transaction) { connection.writeln("begin"); connection.writeln("transaction:$transaction"); connection.writeln(); connection.add([0x00]); } /* * commit * transaction:tx1 * * ^@ */ void commit(socket connection, string transaction) { connection.writeln("commit"); connection.writeln("transaction:$transaction"); connection.writeln(); connection.add([0x00]); } /* * abort * transaction:tx1 * * ^@ */ void abort(socket connection, string transaction) { connection.writeln("abort"); connection.writeln("transaction:$transaction"); connection.writeln(); connection.add([0x00]); } /* * send * destination:/queue/a * content-type:text/plain * * hello queue * ^@ */ void send(socket connection, string queue, string message) { connection.writeln("send"); connection.writeln("destination:$queue"); connection.writeln("content-type:text/plain"); connection.writeln(); connection.write(message); connection.add([0x00]); } class stompserverframe { string frame; map<string, string> headers = new map<string, string>(); list<int> body = new list<int>(); string tostring() { stringbuffer sb = new stringbuffer(); sb.writeln(frame); for(string key in headers.keys) { sb.writeln("$key=${headers[key]}"); } sb.writeln(new string.fromcharcodes(body)); return sb.tostring(); } } void dumpstompframe(stompserverframe frame) { print("begin stomp frame dump"); print(frame.tostring()); print("end stomp frame dump"); } class stomptransformer extends streameventtransformer<list<int>, stompserverframe> { list<string> serverframes = ['connected', 'message', 'receipt', 'error']; string state = 'command'; // 'command', 'headers', 'body' list<int> token = new list<int>(); stompserverframe stompserverframe = new stompserverframe(); stomptransformer() {} int lastvalue = -1; void handledata(list<int> intlist, eventsink<stompserverframe> sink) { for(int b in intlist) { switch(state) { case 'command': if(b == 0x0a) { // done command stompserverframe.frame = new string.fromcharcodes(token); state = 'headers'; token.clear(); } else { token.add(b); } lastvalue = b; break; case 'headers': if(b == 0x0a && lastvalue == 0x0a) { // done headers state = 'body'; token.clear(); lastvalue = -1; } else if(b == 0x0a && lastvalue != 0x0a) { // done header string tokenstring = new string.fromcharcodes(token); list<string> tokenstringparts = tokenstring.split(":"); if(tokenstringparts.length == 2) { stompserverframe.headers.putifabsent(tokenstringparts.elementat(0), () => tokenstringparts.elementat(1)); } else { // possible header format error print("was here $tokenstring"); } token.clear(); lastvalue = b; } else { token.add(b); lastvalue = b; } break; case 'body': if(b == 0x00) { // done body sink.add(stompserverframe); stompserverframe = new stompserverframe(); state = 'command'; token.clear(); lastvalue = -1; } else { stompserverframe.body.add(b); } break; default: break; } } } }
Comments
Post a Comment