java - Concurrent JMS messages cause NULL -


i've tested jms messages sending serially , concurrently(5 threads send jms messages concurrently producer).

when send 100 messages concurrently, few messages payload @ receiving end null. when sent serially there no issue.

do need setup session pool or use mdb @ consumer side handle messages concurrently? setup of jms good, because receiving messages. missing here?

short description of pproject setup:

  1. publisher stateless session bean
  2. weblogic 8.1 jms server connection factory , destination retrieved through jndi
  3. consumer java class subscribes server jms queue , performs tasks. (this not mdb or threaded class, listens queue asynchronously)

edited

jmsconsumer  package net;  import java.io.bufferedwriter; import java.io.filewriter; import java.io.ioexception; import java.io.printwriter; import java.util.date; import java.util.hashtable;  import javax.jms.exceptionlistener; import javax.jms.jmsexception; import javax.jms.message; import javax.jms.messagelistener; import javax.jms.queue; import javax.jms.queueconnection; import javax.jms.queueconnectionfactory; import javax.jms.queuereceiver; import javax.jms.queuesession; import javax.jms.session; import javax.jms.textmessage; import javax.naming.context; import javax.naming.initialcontext;  public class readjms implements messagelistener, exceptionlistener {      public final static string jndi_factory = "weblogic.jndi.wlinitialcontextfactory";     public final static string provider_url = "t3://address:7003";      public final static string jms_factory = "mss.queueconnectionfactory";     public final static string queue = "jms.queue";      @suppresswarnings("null")     public void receivemessage() throws exception {         // system.out.println("receivemessage()..");         hashtable env = new hashtable();         env.put(context.initial_context_factory, jndi_factory);         env.put(context.provider_url, provider_url);          // define queue         queuereceiver qreceiver = null;         queuesession qsession = null;         queueconnection qcon = null;         readjms async = new readjms();         try {             initialcontext ctx = new initialcontext(env);              queueconnectionfactory qconfactory = (queueconnectionfactory) ctx                     .lookup(jms_factory);             qcon = qconfactory.createqueueconnection();             qcon.setexceptionlistener(async);             qsession = qcon.createqueuesession(false, session.auto_acknowledge);             queue queue = (queue) ctx.lookup(queue);             qreceiver = qsession.createreceiver(queue);             qreceiver.setmessagelistener(async);              qcon.start();             system.out.println("readingmessage()..");             // textmessage msg = (textmessage) qreceiver.receive();             // system.out.println("message read " + queue + " : "             // + msg.gettext());             // msg.acknowledge();         } catch (exception ex) {             ex.printstacktrace();         }         // } {         // if (qreceiver != null)         // qreceiver.close();         // if (qsession != null)         // qsession.close();         // if (qcon != null)         // qcon.close();         // }     }      public static void main(string[] args) throws exception {         readjms test = new readjms();         system.out.println("init");         test.receivemessage();         while (true) {             thread.sleep(10000);         }     }      public void onexception(jmsexception arg0) {         system.err.println("exception: " + arg0.getlocalizedmessage());     }      public synchronized void onmessage(message arg0) {         try {             if(((textmessage)arg0).gettext() == null || ((textmessage)arg0).gettext().trim().length()==0){                 system.out.println(" " + queue + " : "                         + ((textmessage) arg0).gettext());             }             system.out.print(".");             printwriter out = new printwriter(new bufferedwriter(                     new filewriter("output.txt", true)));             date = new date();             out.println("message: "+now.tostring()+ " - "+((textmessage)arg0).gettext()+"");             out.close();          } catch (jmsexception e) {             e.printstacktrace();         } catch (ioexception e) {             e.printstacktrace();         }     } } 

writejms

package net;  import java.io.bufferedreader; import java.io.inputstreamreader; import java.util.hashtable;  import javax.jms.queue; import javax.jms.queueconnection; import javax.jms.queueconnectionfactory; import javax.jms.queuesender; import javax.jms.queuesession; import javax.jms.session; import javax.jms.textmessage; import javax.naming.context; import javax.naming.initialcontext;  public class writejms {     public final static string jndi_factory = "weblogic.jndi.wlinitialcontextfactory";     public final static string provider_url = "t3://url:7003";     public final static string jms_factory = "mss.queueconnectionfactory";     public final static string queue = "jms.queue";      @suppresswarnings("unchecked")     public void sendmessage() throws exception {          @suppresswarnings("rawtypes")         hashtable env = new hashtable();         env.put(context.initial_context_factory, jndi_factory);         env.put(context.provider_url, provider_url);          // define queue         queuesender qsender = null;         queuesession qsession = null;         queueconnection qcon = null;         try {             initialcontext ctx = new initialcontext(env);              queueconnectionfactory qconfactory = (queueconnectionfactory) ctx                     .lookup(jms_factory);             qcon = qconfactory.createqueueconnection();              qsession = qcon.createqueuesession(false, session.auto_acknowledge);             queue queue = (queue) ctx.lookup(queue);              textmessage msg = qsession.createtextmessage();             msg.settext("<eventmessage><eventid>123</eventid><eventname>123</eventname><documentnumber>123</documentnumber><customerid>123</customerid><actddtaskdate>123</actddtaskdate><taskstatuserrormessage>123</taskstatuserrormessage></eventmessage>");             qsender = qsession.createsender(queue);             qsender.send(msg);             system.out.println("message [" + msg.gettext()                     + "] sent queue: " + queue);         } catch (exception ex) {             ex.printstacktrace();         } {             if (qsender != null)                 qsender.close();             if (qsession != null)                 qsession.close();             if (qcon != null)                 qcon.close();         }     }   } 

reusing session across multiple threads notoriously forbidden. don't this. create (connection session , producer) anew each message. inefficient, not incorrect , should not cause these errors. code give looks me.

i little surprised no exceptions occur @ sending side. can give more details jms implementation? perhaps there's more information in message broker's log?

have counted messages , number received equal amount sent? else sending messages same queue?


Comments

Popular posts from this blog

javascript - RequestAnimationFrame not working when exiting fullscreen switching space on Safari -

Python ctypes access violation with const pointer arguments -