From: ishmal Date: Sat, 2 Dec 2006 22:58:03 +0000 (+0000) Subject: improve streaming X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=96d8c01e94b7fc84ec7072b9218b4d76bd0f480a;p=inkscape.git improve streaming --- diff --git a/src/pedro/pedroutil.cpp b/src/pedro/pedroutil.cpp index 6de3bbfd4..e08bed0a6 100644 --- a/src/pedro/pedroutil.cpp +++ b/src/pedro/pedroutil.cpp @@ -581,6 +581,14 @@ void Md5::append(const DOMString &str) append((unsigned char *)str.c_str(), str.size()); } +/* + * Update context to reflect the concatenation of a single character + */ +void Md5::append(unsigned char ch) +{ + append(&ch, 1); +} + /* * Final wrapup - pad to 64-byte boundary with the bit pattern * 1 0* (64-bit count of bits processed, MSB-first) diff --git a/src/pedro/pedroutil.h b/src/pedro/pedroutil.h index ce372382a..8f1d14cc7 100644 --- a/src/pedro/pedroutil.h +++ b/src/pedro/pedroutil.h @@ -260,6 +260,11 @@ public: */ virtual void init(); + /** + * + */ + virtual void append(unsigned char dataIn); + /** * */ diff --git a/src/pedro/pedroxmpp.cpp b/src/pedro/pedroxmpp.cpp index 8757d0402..408288187 100644 --- a/src/pedro/pedroxmpp.cpp +++ b/src/pedro/pedroxmpp.cpp @@ -409,84 +409,145 @@ public: /** * */ - XmppStream(); + XmppStream() + { reset(); } /** * */ - virtual ~XmppStream(); + XmppStream(const XmppStream &other) + { assign(other); } /** * */ - virtual void reset(); + XmppStream &operator=(const XmppStream &other) + { assign(other); return *this; } /** * */ - virtual int getState(); + virtual ~XmppStream() + {} /** * */ - virtual void setState(int val); + virtual void reset() + { + state = XmppClient::STREAM_AVAILABLE; + seqNr = 0; + messageId = ""; + sourceId = ""; + data.clear(); + } /** * */ - virtual DOMString getStreamId(); + virtual int getState() + { return state; } /** * */ - void setStreamId(const DOMString &val); + virtual void setState(int val) + { state = val; } /** * */ - virtual DOMString getIqId(); + virtual DOMString getStreamId() + { return streamId; } /** * */ - void setIqId(const DOMString &val); + void setStreamId(const DOMString &val) + { streamId = val; } /** * */ - virtual int getSeqNr(); + virtual DOMString getMessageId() + { return messageId; } /** * */ - virtual DOMString getPeerId(); + void setMessageId(const DOMString &val) + { messageId = val; } /** * */ - virtual void setPeerId(const DOMString &val); + virtual int getSeqNr() + { + seqNr++; + if (seqNr >= 65535) + seqNr = 0; + return seqNr; + } /** * */ - int available(); + virtual DOMString getPeerId() + { return sourceId; } /** * */ - void receiveData(std::vector &newData); + virtual void setPeerId(const DOMString &val) + { sourceId = val; } /** * */ - std::vector read(); + int available() + { return data.size(); } + + /** + * + */ + void receiveData(std::vector &newData) + { + std::vector::iterator iter; + for (iter=newData.begin() ; iter!=newData.end() ; iter++) + data.push_back(*iter); + } + + /** + * + */ + std::vector read() + { + if (state != XmppClient::STREAM_OPEN) + { + std::vectordummy; + return dummy; + } + std::vector ret = data; + data.clear(); + return ret; + } private: + void assign(const XmppStream &other) + { + streamId = other.streamId; + messageId = other.messageId; + sourceId = other.sourceId; + state = other.state; + seqNr = other.seqNr; + data = other.data; + } + DOMString streamId; - DOMString iqId; + DOMString messageId; DOMString sourceId; @@ -498,140 +559,8 @@ private: }; -/** - * - */ -XmppStream::XmppStream() -{ - reset(); -} - -/** - * - */ -XmppStream::~XmppStream() -{ - reset(); -} - -/** - * - */ -void XmppStream::reset() -{ - state = XmppClient::STREAM_AVAILABLE; - seqNr = 0; - data.clear(); -} - -/** - * - */ -int XmppStream::getState() -{ - return state; -} - -/** - * - */ -void XmppStream::setState(int val) -{ - state = val; -} - -/** - * - */ -DOMString XmppStream::getStreamId() -{ - return streamId; -} - -/** - * - */ -void XmppStream::setStreamId(const DOMString &val) -{ - streamId = val; -} - -/** - * - */ -DOMString XmppStream::getIqId() -{ - return iqId; -} - - -/** - * - */ -void XmppStream::setIqId(const DOMString &val) -{ - iqId = val; -} - -/** - * Source or destination JID - */ -void XmppStream::setPeerId(const DOMString &val) -{ - sourceId = val; -} - -/** - * Source or destination JID - */ -DOMString XmppStream::getPeerId() -{ - return sourceId; -} - -/** - * Stream packet sequence number - */ -int XmppStream::getSeqNr() -{ - seqNr++; - if (seqNr >= 65535) - seqNr = 0; - return seqNr; -} - -/** - * - */ -int XmppStream::available() -{ - return data.size(); -} -/** - * - */ -void XmppStream::receiveData(std::vector &newData) -{ - std::vector::iterator iter; - for (iter=newData.begin() ; iter!=newData.end() ; iter++) - data.push_back(*iter); -} -/** - * - */ -std::vector XmppStream::read() -{ - if (state != XmppClient::STREAM_OPEN) - { - std::vectordummy; - return dummy; - } - std::vector ret = data; - data.clear(); - return ret; -} @@ -692,6 +621,7 @@ void XmppClient::assign(const XmppClient &other) connected = other.connected; doRegister = other.doRegister; groupChats = other.groupChats; + streamPacket = other.streamPacket; } @@ -701,37 +631,21 @@ void XmppClient::init() msgId = 0; connected = false; doRegister = false; + streamPacket = "message"; - for (int i=0 ; i::iterator iter; + for (iter = outputStreams.begin(); iter!=outputStreams.end() ; iter++) + delete iter->second; + for (iter = inputStreams.begin(); iter!=inputStreams.end() ; iter++) + delete iter->second; + for (iter = fileSends.begin(); iter!=fileSends.end() ; iter++) + delete iter->second; groupChatsClear(); } @@ -1001,37 +915,13 @@ bool XmppClient::processMessage(Element *root) DOMString type = root->getTagAttribute("message", "type"); //####Check for embedded namespaces here - //# IN BAND BYTESTREAMS - DOMString ibbNamespace = "http://jabber.org/protocol/ibb"; - if (root->getTagAttribute("data", "xmlns") == ibbNamespace) - { - DOMString streamId = root->getTagAttribute("data", "sid"); - if (streamId.size() > 0) - { - for (int i=0 ; igetStreamId().c_str(), streamId.c_str()); - if (ins->getStreamId() == streamId) - { - //# We have a winner - if (ins->getState() != STREAM_OPEN) - { - XmppEvent event(XmppEvent::EVENT_ERROR); - event.setFrom(from); - event.setData("received unrequested stream data"); - dispatchXmppEvent(event); - return true; - } - DOMString data = root->getTagValue("data"); - std::vectorbinData = - Base64Decoder::decode(data); - ins->receiveData(binData); - } - } - } - } + //### FILE TRANSFERS + if (processFileMessage(root)) + return true; + + //### STREAMS + if (processInBandByteStreamMessage(root)) + return true; //#### NORMAL MESSAGES @@ -1201,130 +1091,13 @@ bool XmppClient::processIq(Element *root) //printf("###IQ xmlns:%s\n", xmlns.c_str()); //### FILE TRANSFERS - DOMString siNamespace = "http://jabber.org/protocol/si"; - if (root->getTagAttribute("si", "xmlns") == siNamespace) - { - if (type == "set") - { - DOMString streamId = root->getTagAttribute("si", "id"); - DOMString fname = root->getTagAttribute("file", "name"); - DOMString sizeStr = root->getTagAttribute("file", "size"); - DOMString hash = root->getTagAttribute("file", "hash"); - XmppEvent event(XmppEvent::XmppEvent::EVENT_FILE_RECEIVE); - event.setFrom(from); - event.setIqId(id); - event.setStreamId(streamId); - event.setFileName(fname); - event.setFileHash(hash); - event.setFileSize(atol(sizeStr.c_str())); - dispatchXmppEvent(event); - } - else //result - { - //printf("Got result\n"); - for (int i=0 ; igetIqId() == id && - from == outf->getPeerId()) - { - if (type == "error") - { - outf->setState(STREAM_ERROR); - error("user '%s' rejected file", from.c_str()); - return true; - } - else if (type == "result") - { - if (outf->getState() == STREAM_OPENING) - { - XmppEvent event(XmppEvent::XmppEvent::EVENT_FILE_ACCEPTED); - event.setFrom(from); - dispatchXmppEvent(event); - outf->setState(STREAM_OPEN); - } - else if (outf->getState() == STREAM_CLOSING) - { - outf->setState(STREAM_CLOSED); - } - return true; - } - } - } - } + if (processFileMessage(root)) return true; - } - - //### IN-BAND BYTESTREAMS - //### Incoming stream requests - DOMString ibbNamespace = "http://jabber.org/protocol/ibb"; - if (root->getTagAttribute("open", "xmlns") == ibbNamespace) - { - DOMString streamId = root->getTagAttribute("open", "sid"); - XmppEvent event(XmppEvent::XmppEvent::EVENT_STREAM_RECEIVE_INIT); - dispatchXmppEvent(event); - if (streamId.size()>0) - { - for (int i=0 ; igetStreamId() == streamId) - { - ins->setState(STREAM_OPENING); - ins->setIqId(id); - return true; - } - } - } - return true; - } - else if (root->getTagAttribute("close", "xmlns") == ibbNamespace) - { - XmppEvent event(XmppEvent::XmppEvent::EVENT_STREAM_RECEIVE_CLOSE); - dispatchXmppEvent(event); - DOMString streamId = root->getTagAttribute("close", "sid"); - if (streamId.size()>0) - { - for (int i=0 ; igetStreamId() == streamId && - from == ins->getPeerId()) - { - ins->setState(STREAM_CLOSING); - ins->setIqId(id); - return true; - } - } - } + //### STREAMS + if (processInBandByteStreamMessage(root)) return true; - } - //### Responses to outgoing requests - for (int i=0 ; igetIqId() == id) - { - if (type == "error") - { - outs->setState(STREAM_ERROR); - return true; - } - else if (type == "result") - { - if (outs->getState() == STREAM_OPENING) - { - outs->setState(STREAM_OPEN); - } - else if (outs->getState() == STREAM_CLOSING) - { - outs->setState(STREAM_CLOSED); - } - return true; - } - } - } + //###Normal Roster stuff if (root->getTagAttribute("query", "xmlns") == "jabber:iq:roster") @@ -1432,6 +1205,8 @@ bool XmppClient::processIq(Element *root) + + bool XmppClient::receiveAndProcess() { if (!keepGoing) @@ -2957,29 +2732,121 @@ bool XmppClient::groupChatPresence(const DOMString &groupJid, //######################################################################## -/** - * - */ -int XmppClient::outputStreamOpen(const DOMString &destId, - const DOMString &streamIdArg) +bool XmppClient::processInBandByteStreamMessage(Element *root) { - int i; - for (i=0; igetState() == STREAM_AVAILABLE) - break; - if (i>=outputStreamCount) + DOMString from = root->getAttribute("from"); + DOMString id = root->getAttribute("id"); + DOMString type = root->getAttribute("type"); + + //### Incoming stream requests + //Input streams are id's by stream id + DOMString ibbNamespace = "http://jabber.org/protocol/ibb"; + + if (root->getTagAttribute("open", "xmlns") == ibbNamespace) { - error("No available output streams"); - return -1; + DOMString streamId = root->getTagAttribute("open", "sid"); + XmppEvent event(XmppEvent::XmppEvent::EVENT_STREAM_RECEIVE_INIT); + dispatchXmppEvent(event); + std::map::iterator iter = + inputStreams.find(streamId); + if (iter != inputStreams.end()) + { + XmppStream *ins = iter->second; + ins->setState(STREAM_OPENING); + ins->setMessageId(id); + return true; + } + return true; } - int streamNr = i; - XmppStream *outs = outputStreams[streamNr]; - outs->setState(STREAM_OPENING); + else if (root->getTagAttribute("close", "xmlns") == ibbNamespace) + { + XmppEvent event(XmppEvent::XmppEvent::EVENT_STREAM_RECEIVE_CLOSE); + dispatchXmppEvent(event); + DOMString streamId = root->getTagAttribute("close", "sid"); + std::map::iterator iter = + inputStreams.find(streamId); + if (iter != inputStreams.end()) + { + XmppStream *ins = iter->second; + if (from == ins->getPeerId()) + { + ins->setState(STREAM_CLOSING); + ins->setMessageId(id); + return true; + } + } + return true; + } + else if (root->getTagAttribute("data", "xmlns") == ibbNamespace) + { + DOMString streamId = root->getTagAttribute("data", "sid"); + std::map::iterator iter = + inputStreams.find(streamId); + if (iter != inputStreams.end()) + { + XmppStream *ins = iter->second; + if (ins->getState() != STREAM_OPEN) + { + XmppEvent event(XmppEvent::EVENT_ERROR); + event.setFrom(from); + event.setData("received unrequested stream data"); + dispatchXmppEvent(event); + return true; + } + DOMString data = root->getTagValue("data"); + std::vectorbinData = + Base64Decoder::decode(data); + ins->receiveData(binData); + } + } + + //### Responses to outgoing requests + //Output streams are id's by message id + std::map::iterator iter = + outputStreams.find(id); + if (iter != outputStreams.end()) + { + XmppStream *outs = iter->second; + if (type == "error") + { + outs->setState(STREAM_ERROR); + return true; + } + else if (type == "result") + { + if (outs->getState() == STREAM_OPENING) + { + outs->setState(STREAM_OPEN); + } + else if (outs->getState() == STREAM_CLOSING) + { + outs->setState(STREAM_CLOSED); + } + return true; + } + } + + return false; +} + + +/** + * + */ +bool XmppClient::outputStreamOpen(const DOMString &destId, + const DOMString &streamIdArg) +{ char buf[32]; snprintf(buf, 31, "inband%d", getMsgId()); - DOMString iqId = buf; + DOMString messageId = buf; + + //Output streams are id's by message id + XmppStream *outs = new XmppStream(); + outputStreams[messageId] = outs; + + outs->setState(STREAM_OPENING); DOMString streamId = streamIdArg; if (streamId.size()<1) @@ -2987,17 +2854,20 @@ int XmppClient::outputStreamOpen(const DOMString &destId, snprintf(buf, 31, "stream%d", getMsgId()); DOMString streamId = buf; } - outs->setIqId(iqId); + outs->setMessageId(messageId); outs->setStreamId(streamId); outs->setPeerId(destId); + char *fmt = - "" + "<%s type='set' to='%s' id='%s'>" "\n"; + " xmlns='http://jabber.org/protocol/ibb'/>\n"; if (!write(fmt, - destId.c_str(), iqId.c_str(), - streamId.c_str())) + streamPacket.c_str(), + destId.c_str(), messageId.c_str(), + streamId.c_str(), + streamPacket.c_str())) { outs->reset(); return -1; @@ -3012,7 +2882,7 @@ int XmppClient::outputStreamOpen(const DOMString &destId, { printf("ERROR\n"); outs->reset(); - return -1; + return false; } Thread::sleep(1000); state = outs->getState(); @@ -3024,31 +2894,35 @@ int XmppClient::outputStreamOpen(const DOMString &destId, return -1; } - return streamNr; + return true; } /** * */ -int XmppClient::outputStreamWrite(int streamNr, - const unsigned char *buf, unsigned long len) +bool XmppClient::outputStreamWrite(const DOMString &streamId, + const std::vector &buf) { - XmppStream *outs = outputStreams[streamNr]; + std::map::iterator iter = + outputStreams.find(streamId); + if (iter == outputStreams.end()) + return false; + XmppStream *outs = iter->second; - unsigned long outLen = 0; - unsigned char *p = (unsigned char *)buf; + unsigned int len = buf.size(); + unsigned int pos = 0; - while (outLen < len) + while (pos < len) { - unsigned long chunksize = 1024; - if (chunksize + outLen > len) - chunksize = len - outLen; + unsigned int pos2 = pos + 1024; + if (pos2>len) + pos2 = len; Base64Encoder encoder; - encoder.append(p, chunksize); + for (unsigned int i=pos ; ireset(); - return -1; + return false; } pause(5000); + + pos = pos2; } - return outLen; + + return true; } /** * */ -int XmppClient::outputStreamClose(int streamNr) +bool XmppClient::outputStreamClose(const DOMString &streamId) { - XmppStream *outs = outputStreams[streamNr]; + std::map::iterator iter = + outputStreams.find(streamId); + if (iter == outputStreams.end()) + return false; + XmppStream *outs = iter->second; char buf[32]; snprintf(buf, 31, "inband%d", getMsgId()); - DOMString iqId = buf; - outs->setIqId(iqId); + DOMString messageId = buf; + outs->setMessageId(messageId); outs->setState(STREAM_CLOSING); char *fmt = - "" - "\n"; + "<%s type='set' to='%s' id='%s'>" + "\n"; if (!write(fmt, - outs->getPeerId().c_str(), - iqId.c_str(), - outs->getStreamId().c_str())) + streamPacket.c_str(), + outs->getPeerId().c_str(), + messageId.c_str(), + outs->getStreamId().c_str(), + streamPacket.c_str() + )) return false; int state = outs->getState(); @@ -3106,7 +2990,7 @@ int XmppClient::outputStreamClose(int streamNr) { printf("ERROR\n"); outs->reset(); - return -1; + return false; } Thread::sleep(1000); state = outs->getState(); @@ -3115,33 +2999,26 @@ int XmppClient::outputStreamClose(int streamNr) { printf("TIMEOUT ERROR\n"); outs->reset(); - return -1; + return false; } - outs->reset(); - return 1; + delete outs; + outputStreams.erase(streamId); + + return true; } /** * */ -int XmppClient::inputStreamOpen(const DOMString &fromJid, const DOMString &streamId, +bool XmppClient::inputStreamOpen(const DOMString &fromJid, + const DOMString &streamId, const DOMString &iqId) { - int i; - for (i=0 ; igetState() == STREAM_AVAILABLE) - break; - } - if (i>=inputStreamCount) - { - error("No available input streams"); - return -1; - } - int streamNr = i; - XmppStream *ins = inputStreams[streamNr]; + XmppStream *ins = new XmppStream(); + + inputStreams[streamId] = ins; ins->reset(); ins->setPeerId(fromJid); ins->setState(STREAM_CLOSED); @@ -3156,7 +3033,7 @@ int XmppClient::inputStreamOpen(const DOMString &fromJid, const DOMString &strea { printf("ERROR\n"); ins->reset(); - return -1; + return false; } Thread::sleep(1000); state = ins->getState(); @@ -3165,17 +3042,18 @@ int XmppClient::inputStreamOpen(const DOMString &fromJid, const DOMString &strea { printf("TIMEOUT ERROR\n"); ins->reset(); - return -1; + return false; } char *fmt = - "\n"; - if (!write(fmt, fromJid.c_str(), ins->getIqId().c_str())) + "<%s type='result' to='%s' id='%s'/>\n"; + if (!write(fmt, streamPacket.c_str(), + fromJid.c_str(), ins->getMessageId().c_str())) { - return -1; + return false; } ins->setState(STREAM_OPEN); - return streamNr; + return true; } @@ -3183,52 +3061,28 @@ int XmppClient::inputStreamOpen(const DOMString &fromJid, const DOMString &strea /** * */ -int XmppClient::inputStreamAvailable(int streamNr) +bool XmppClient::inputStreamClose(const DOMString &streamId) { - XmppStream *ins = inputStreams[streamNr]; - return ins->available(); -} - -/** - * - */ -std::vector XmppClient::inputStreamRead(int streamNr) -{ - XmppStream *ins = inputStreams[streamNr]; - return ins->read(); -} - -/** - * - */ -bool XmppClient::inputStreamClosing(int streamNr) -{ - XmppStream *ins = inputStreams[streamNr]; - if (ins->getState() == STREAM_CLOSING) - return true; - return false; -} - + std::map::iterator iter = + inputStreams.find(streamId); + if (iter == inputStreams.end()) + return false; + XmppStream *ins = iter->second; -/** - * - */ -int XmppClient::inputStreamClose(int streamNr) -{ - int ret=1; - XmppStream *ins = inputStreams[streamNr]; if (ins->getState() == STREAM_CLOSING) { char *fmt = "\n"; if (!write(fmt, ins->getPeerId().c_str(), - ins->getIqId().c_str())) + ins->getMessageId().c_str())) { - ret = -1; + return false; } } - ins->reset(); - return ret; + inputStreams.erase(streamId); + delete ins; + + return true; } @@ -3241,6 +3095,80 @@ int XmppClient::inputStreamClose(int streamNr) //######################################################################## +bool XmppClient::processFileMessage(Element *root) +{ + DOMString siNamespace = "http://jabber.org/protocol/si"; + if (root->getTagAttribute("si", "xmlns") != siNamespace) + return false; + + + Element *mainElement = root->getFirstChild(); + if (!mainElement) + return false; + + DOMString from = mainElement->getAttribute("from"); + DOMString id = mainElement->getAttribute("id"); + DOMString type = mainElement->getAttribute("type"); + + status("received file message from %s", from.c_str()); + + if (type == "set") + { + DOMString streamId = root->getTagAttribute("si", "id"); + DOMString fname = root->getTagAttribute("file", "name"); + DOMString sizeStr = root->getTagAttribute("file", "size"); + DOMString hash = root->getTagAttribute("file", "hash"); + XmppEvent event(XmppEvent::XmppEvent::EVENT_FILE_RECEIVE); + event.setFrom(from); + event.setIqId(id); + event.setStreamId(streamId); + event.setFileName(fname); + event.setFileHash(hash); + event.setFileSize(atol(sizeStr.c_str())); + dispatchXmppEvent(event); + return true; + } + + //##expecting result or error + //file sends id'd by message id's + std::map::iterator iter = + fileSends.find(id); + if (iter != fileSends.end()) + { + XmppStream *outf = iter->second; + if (from != outf->getPeerId()) + return true; + if (type == "error") + { + outf->setState(STREAM_ERROR); + error("user '%s' rejected file", from.c_str()); + return true; + } + else if (type == "result") + { + if (outf->getState() == STREAM_OPENING) + { + XmppEvent event(XmppEvent::XmppEvent::EVENT_FILE_ACCEPTED); + event.setFrom(from); + dispatchXmppEvent(event); + outf->setState(STREAM_OPEN); + } + else if (outf->getState() == STREAM_CLOSING) + { + outf->setState(STREAM_CLOSED); + } + return true; + } + } + + return true; +} + + + + + + /** * */ @@ -3254,20 +3182,6 @@ bool XmppClient::fileSend(const DOMString &destJidArg, DOMString fileName = fileNameArg; DOMString description = descriptionArg; - int i; - for (i=0; igetState() == STREAM_AVAILABLE) - break; - if (i>=fileSendCount) - { - error("No available file send streams"); - return false; - } - int fileSendNr = i; - XmppStream *outf = fileSends[fileSendNr]; - - outf->setState(STREAM_OPENING); - struct stat finfo; if (stat(fileName.c_str(), &finfo)<0) { @@ -3291,18 +3205,21 @@ bool XmppClient::fileSend(const DOMString &destJidArg, error("cannot open '%s' for sending", fileName.c_str()); return false; } - unsigned char *sendBuf = (unsigned char *)malloc(fileLen+1); - if (!sendBuf) - { - error("cannot cannot allocate send buffer for %s", fileName.c_str()); - return false; - } + std::vector sendBuf; + Md5 md5hash; for (long i=0 ; isetIqId(iqId); + DOMString messageId = buf; + + XmppStream *outf = new XmppStream(); + + outf->setState(STREAM_OPENING); + outf->setMessageId(messageId); + fileSends[messageId] = outf; snprintf(buf, 31, "stream%d", getMsgId()); DOMString streamId = buf; //outf->setStreamId(streamId); - DOMString hash = Md5::hashHex(sendBuf, fileLen); - printf("Hash:%s\n", hash.c_str()); - outf->setPeerId(destJid); char dtgBuf[81]; @@ -3340,7 +3259,7 @@ bool XmppClient::fileSend(const DOMString &destJidArg, strftime(dtgBuf, 80, "%Y-%m-%dT%H:%M:%Sz", timeVal); char *fmt = - "" + "<%s type='set' id='%s' to='%s'>" "" @@ -3351,15 +3270,17 @@ bool XmppClient::fileSend(const DOMString &destJidArg, "" //"" "" - "\n"; - if (!write(fmt, iqId.c_str(), destJid.c_str(), + "\n"; + if (!write(fmt, streamPacket.c_str(), + messageId.c_str(), destJid.c_str(), streamId.c_str(), offeredName.c_str(), fileLen, - hash.c_str(), dtgBuf, description.c_str())) + hash.c_str(), dtgBuf, description.c_str(), + streamPacket.c_str())) { - free(sendBuf); return false; } + int ret = true; int state = outf->getState(); for (int tim=0 ; tim<20 ; tim++) { @@ -3373,7 +3294,7 @@ bool XmppClient::fileSend(const DOMString &destJidArg, { printf("ERROR\n"); outf->reset(); - return false; + ret = false; } Thread::sleep(1000); state = outf->getState(); @@ -3381,30 +3302,27 @@ bool XmppClient::fileSend(const DOMString &destJidArg, if (state != STREAM_OPEN) { printf("TIMEOUT ERROR\n"); - outf->reset(); - return false; + ret = false; } - //free up this reqource - outf->reset(); + //free up this resource + fileSends.erase(messageId); + delete outf; - int streamNr = outputStreamOpen(destJid, streamId); - if (streamNr<0) + if (!outputStreamOpen(destJid, streamId)) { error("cannot open output stream %s", streamId.c_str()); - outf->reset(); return false; } - int ret = outputStreamWrite(streamNr, sendBuf, fileLen); - - if (ret<0) + if (!outputStreamWrite(streamId, sendBuf)) { } - outputStreamClose(streamNr); + if (!outputStreamClose(streamId)) + { + } - free(sendBuf); return true; } @@ -3468,25 +3386,27 @@ bool XmppClient::fileReceive(const DOMString &fromJid, const DOMString &fileHash) { char *fmt = - "" + "<%s type='result' to='%s' id='%s'>" "" "" "" "" "" "http://jabber.org/protocol/ibb" - "\n"; - if (!write(fmt, fromJid.c_str(), iqId.c_str())) + "\n"; + if (!write(fmt, streamPacket.c_str(), + fromJid.c_str(), iqId.c_str(), + streamPacket.c_str())) { return false; } - int streamNr = inputStreamOpen(fromJid, streamId, iqId); - if (streamNr < 0) + if (!inputStreamOpen(fromJid, streamId, iqId)) { return false; } + XmppStream *ins = inputStreams[streamId]; Md5 md5; FILE *f = fopen(fileName.c_str(), "wb"); @@ -3497,14 +3417,14 @@ bool XmppClient::fileReceive(const DOMString &fromJid, while (true) { - if (inputStreamAvailable(streamNr)<1) + if (ins->available()<1) { - if (inputStreamClosing(streamNr)) + if (ins->getState() == STREAM_CLOSING) break; pause(100); continue; } - std::vector ret = inputStreamRead(streamNr); + std::vector ret = ins->read(); std::vector::iterator iter; for (iter=ret.begin() ; iter!=ret.end() ; iter++) { @@ -3514,7 +3434,7 @@ bool XmppClient::fileReceive(const DOMString &fromJid, } } - inputStreamClose(streamNr); + inputStreamClose(streamId); fclose(f); DOMString hash = md5.finishHex(); diff --git a/src/pedro/pedroxmpp.h b/src/pedro/pedroxmpp.h index a2fbd87a2..34b6d0390 100644 --- a/src/pedro/pedroxmpp.h +++ b/src/pedro/pedroxmpp.h @@ -6,7 +6,7 @@ * Authors: * Bob Jamison * - * Copyright (C) 2005 Bob Jamison + * Copyright (C) 2005-2006 Bob Jamison * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -25,6 +25,7 @@ #include #include +#include #include @@ -1027,46 +1028,31 @@ public: /** * */ - virtual int outputStreamOpen(const DOMString &jid, + virtual bool outputStreamOpen(const DOMString &jid, const DOMString &streamId); /** * */ - virtual int outputStreamWrite(int streamId, - const unsigned char *buf, unsigned long len); + virtual bool outputStreamWrite(const DOMString &streamId, + const std::vector &buf); /** * */ - virtual int outputStreamClose(int streamId); + virtual bool outputStreamClose(const DOMString &streamId); /** * */ - virtual int inputStreamOpen(const DOMString &jid, + virtual bool inputStreamOpen(const DOMString &jid, const DOMString &streamId, const DOMString &iqId); /** * */ - virtual int inputStreamAvailable(int streamId); - - /** - * - */ - virtual std::vector inputStreamRead(int streamId); - - /** - * - */ - virtual bool inputStreamClosing(int streamId); - - /** - * - */ - virtual int inputStreamClose(int streamId); + virtual bool inputStreamClose(const DOMString &streamId); //####################### @@ -1166,19 +1152,27 @@ private: std::vectorgroupChats; - static const int outputStreamCount = 16; + //#### Roster + std::vectorroster; - XmppStream *outputStreams[outputStreamCount]; - static const int inputStreamCount = 16; + //#### Streams + + bool processInBandByteStreamMessage(Element *root); + + DOMString streamPacket; - XmppStream *inputStreams[inputStreamCount]; + std::map outputStreams; - static const int fileSendCount = 16; + std::map inputStreams; - XmppStream *fileSends[fileSendCount]; - std::vectorroster; + //#### File send + + bool processFileMessage(Element *root); + + std::map fileSends; + };