java - Concurrent JMS messages cause NULL payloads
I've tested sending JMS messages both serially and concurrently (5 threads sending JMS messages concurrently from the producer).
When I send 100 messages concurrently, a few messages arrive at the receiving end with a null payload. When they are sent serially there is no issue.
Do I need to set up a session pool or use an MDB on the consumer side to handle the messages concurrently? The JMS setup itself is good, because I am receiving messages. What am I missing here?
Short description of the project setup:
- The publisher is a stateless session bean.
- WebLogic 8.1 JMS server; the connection factory and destination are retrieved through JNDI.
- The consumer is a Java class that subscribes to the server's JMS queue and performs tasks. (This is not an MDB or a threaded class; it just listens to the queue asynchronously.)
edited
jmsconsumer package net; import java.io.bufferedwriter; import java.io.filewriter; import java.io.ioexception; import java.io.printwriter; import java.util.date; import java.util.hashtable; import javax.jms.exceptionlistener; import javax.jms.jmsexception; import javax.jms.message; import javax.jms.messagelistener; import javax.jms.queue; import javax.jms.queueconnection; import javax.jms.queueconnectionfactory; import javax.jms.queuereceiver; import javax.jms.queuesession; import javax.jms.session; import javax.jms.textmessage; import javax.naming.context; import javax.naming.initialcontext; public class readjms implements messagelistener, exceptionlistener { public final static string jndi_factory = "weblogic.jndi.wlinitialcontextfactory"; public final static string provider_url = "t3://address:7003"; public final static string jms_factory = "mss.queueconnectionfactory"; public final static string queue = "jms.queue"; @suppresswarnings("null") public void receivemessage() throws exception { // system.out.println("receivemessage().."); hashtable env = new hashtable(); env.put(context.initial_context_factory, jndi_factory); env.put(context.provider_url, provider_url); // define queue queuereceiver qreceiver = null; queuesession qsession = null; queueconnection qcon = null; readjms async = new readjms(); try { initialcontext ctx = new initialcontext(env); queueconnectionfactory qconfactory = (queueconnectionfactory) ctx .lookup(jms_factory); qcon = qconfactory.createqueueconnection(); qcon.setexceptionlistener(async); qsession = qcon.createqueuesession(false, session.auto_acknowledge); queue queue = (queue) ctx.lookup(queue); qreceiver = qsession.createreceiver(queue); qreceiver.setmessagelistener(async); qcon.start(); system.out.println("readingmessage().."); // textmessage msg = (textmessage) qreceiver.receive(); // system.out.println("message read " + queue + " : " // + msg.gettext()); // msg.acknowledge(); } catch (exception ex) { ex.printstacktrace(); } // } { 
// if (qreceiver != null) // qreceiver.close(); // if (qsession != null) // qsession.close(); // if (qcon != null) // qcon.close(); // } } public static void main(string[] args) throws exception { readjms test = new readjms(); system.out.println("init"); test.receivemessage(); while (true) { thread.sleep(10000); } } public void onexception(jmsexception arg0) { system.err.println("exception: " + arg0.getlocalizedmessage()); } public synchronized void onmessage(message arg0) { try { if(((textmessage)arg0).gettext() == null || ((textmessage)arg0).gettext().trim().length()==0){ system.out.println(" " + queue + " : " + ((textmessage) arg0).gettext()); } system.out.print("."); printwriter out = new printwriter(new bufferedwriter( new filewriter("output.txt", true))); date = new date(); out.println("message: "+now.tostring()+ " - "+((textmessage)arg0).gettext()+""); out.close(); } catch (jmsexception e) { e.printstacktrace(); } catch (ioexception e) { e.printstacktrace(); } } }
writejms
package net; import java.io.bufferedreader; import java.io.inputstreamreader; import java.util.hashtable; import javax.jms.queue; import javax.jms.queueconnection; import javax.jms.queueconnectionfactory; import javax.jms.queuesender; import javax.jms.queuesession; import javax.jms.session; import javax.jms.textmessage; import javax.naming.context; import javax.naming.initialcontext; public class writejms { public final static string jndi_factory = "weblogic.jndi.wlinitialcontextfactory"; public final static string provider_url = "t3://url:7003"; public final static string jms_factory = "mss.queueconnectionfactory"; public final static string queue = "jms.queue"; @suppresswarnings("unchecked") public void sendmessage() throws exception { @suppresswarnings("rawtypes") hashtable env = new hashtable(); env.put(context.initial_context_factory, jndi_factory); env.put(context.provider_url, provider_url); // define queue queuesender qsender = null; queuesession qsession = null; queueconnection qcon = null; try { initialcontext ctx = new initialcontext(env); queueconnectionfactory qconfactory = (queueconnectionfactory) ctx .lookup(jms_factory); qcon = qconfactory.createqueueconnection(); qsession = qcon.createqueuesession(false, session.auto_acknowledge); queue queue = (queue) ctx.lookup(queue); textmessage msg = qsession.createtextmessage(); msg.settext("<eventmessage><eventid>123</eventid><eventname>123</eventname><documentnumber>123</documentnumber><customerid>123</customerid><actddtaskdate>123</actddtaskdate><taskstatuserrormessage>123</taskstatuserrormessage></eventmessage>"); qsender = qsession.createsender(queue); qsender.send(msg); system.out.println("message [" + msg.gettext() + "] sent queue: " + queue); } catch (exception ex) { ex.printstacktrace(); } { if (qsender != null) qsender.close(); if (qsession != null) qsession.close(); if (qcon != null) qcon.close(); } } }
Reusing a session across multiple threads is notoriously forbidden, but you don't do this. You create everything (connection, session, and producer) anew for each message. That is inefficient, but not incorrect, and should not cause these errors. The code you give looks good to me.
I am a little surprised that no exceptions occur on the sending side. Can you give more details about the JMS implementation? Perhaps there's more information in the message broker's log?
Have you counted the messages, and is the number received equal to the number sent? Is anyone else sending messages to the same queue?
Comments
Post a Comment