Tuesday, November 10, 2009

A simple class that saves and retrieves messages from MSMQ. It lets you store objects in messages: send the object along with a key, then retrieve it by that key and the object's type, e.g. Get("myKey", typeof(string)).




/// <summary>
/// Static helper that stores objects in, and retrieves them from, a private MSMQ queue.
/// Each message is addressed by a caller-supplied key, which is turned into the
/// message's correlation id by <see cref="getCoorId"/>.
/// </summary>
public class MyMessageQueue
{
    // Shared queue handle, created lazily by the MQ property and kept for the
    // lifetime of the process (it is deliberately not disposed per call).
    private static MessageQueue mq;
    private static readonly log4net.ILog log = log4net.LogManager.GetLogger("MyMessageQueue");
    private static readonly string QUEUE_NAME = @".\Private$\mymessagequeue";
    private static readonly object lockObject = new object();

    /// <summary>
    /// Returns the shared queue, connecting to (or creating) it on first use.
    /// May return null when <see cref="Init"/> failed; callers must check.
    /// </summary>
    private static MessageQueue MQ
    {
        get
        {
            // Monitor locks are re-entrant, so this is safe when the caller
            // (Get/Send) already holds lockObject.
            lock (lockObject)
            {
                if (mq == null)
                {
                    Init();
                }
                return mq;
            }
        }
    }

    /// <summary>
    /// Connects to the queue at QUEUE_NAME, creating it first when it does not exist.
    /// Failures are logged and swallowed, leaving the cached queue null.
    /// </summary>
    private static void Init()
    {
        try
        {
            if (!MessageQueue.Exists(QUEUE_NAME))
            {
                mq = MessageQueue.Create(QUEUE_NAME);
                log.Info("Queue created :" + QUEUE_NAME + " path :" + mq.Path);
            }
            else
            {
                mq = new MessageQueue(QUEUE_NAME);
                log.Info("Queue connected :" + QUEUE_NAME + " path :" + mq.Path);
            }
        }
        catch (Exception ex)
        {
            // Placeholder index must be {0}: only one argument is supplied.
            log.ErrorFormat("myMessageQueue.Init(): {0}", ex.Message);
        }
    }

    /// <summary>
    /// Receives (and removes) the message stored under <paramref name="key"/> and
    /// returns its body deserialized as <paramref name="t"/>.
    /// </summary>
    /// <param name="key">Key the message was sent with.</param>
    /// <param name="t">Type the XML message body is deserialized to.</param>
    /// <returns>
    /// The message body, or null when the queue is unavailable, no matching message
    /// could be received, or any other error occurred (errors are logged, not thrown).
    /// </returns>
    public static object Get(string key, Type t)
    {
        lock (lockObject)
        {
            try
            {
                MessageQueue queue = MQ;
                if (queue == null)
                {
                    log.Error("myMessageQueue.Get(): queue is not available");
                    return null;
                }

                queue.MessageReadPropertyFilter.CorrelationId = true;
                string correlationId = getCoorId(key);

                if (!queue.Transactional)
                {
                    return ReadBody(queue.ReceiveByCorrelationId(correlationId), t);
                }

                using (MessageQueueTransaction trans = new MessageQueueTransaction())
                {
                    trans.Begin();
                    object reply = ReadBody(queue.ReceiveByCorrelationId(correlationId, trans), t);
                    // Commit only after the body was read successfully; if anything
                    // above throws, the transaction is disposed uncommitted and the
                    // receive is not applied.
                    trans.Commit();
                    return reply;
                }
            }
            catch (Exception ex)
            {
                log.ErrorFormat("myMessageQueue.Get(): {0}", ex.Message);
                return null;
            }
        }
    }

    /// <summary>
    /// Sends <paramref name="o"/> to the queue under <paramref name="key"/>.
    /// The message is marked recoverable and expires if not received within 5 days.
    /// </summary>
    /// <param name="key">Key used as the message label and to build its correlation id.</param>
    /// <param name="o">Object to store as the message body.</param>
    /// <returns>True when the message was sent; false on any error (errors are logged).</returns>
    public static bool Send(string key, Object o)
    {
        try
        {
            Message msg = new Message();
            msg.Body = o;
            msg.Label = key;
            msg.Recoverable = true;
            msg.TimeToBeReceived = new TimeSpan(5, 0, 0, 0);
            msg.AcknowledgeType = AcknowledgeTypes.PositiveReceive | AcknowledgeTypes.PositiveArrival;
            msg.CorrelationId = getCoorId(key);

            // Same lock as Get: both operate on the one shared MessageQueue instance.
            lock (lockObject)
            {
                MessageQueue queue = MQ;
                if (queue == null)
                {
                    log.Error("myMessageQueue.Send(): queue is not available");
                    return false;
                }

                if (!queue.Transactional)
                {
                    queue.Send(msg);
                    return true;
                }

                using (MessageQueueTransaction trans = new MessageQueueTransaction())
                {
                    trans.Begin();
                    queue.Send(msg, trans);
                    // Without Commit the transactional send is never applied.
                    trans.Commit();
                }
                return true;
            }
        }
        catch (Exception ex)
        {
            log.ErrorFormat("myMessageQueue.Send(): {0}", ex.Message);
            return false;
        }
    }

    /// <summary>
    /// Deserializes the body of <paramref name="msg"/> as <paramref name="t"/>
    /// using an XML formatter restricted to that single type.
    /// </summary>
    private static object ReadBody(Message msg, Type t)
    {
        msg.Formatter = new XmlMessageFormatter(new Type[1] { t });
        return msg.Body;
    }

    /// <summary>
    /// Builds the correlation id for <paramref name="key"/>: the all-zero GUID,
    /// a backslash, then the key.
    /// </summary>
    /// <remarks>
    /// NOTE(review): MSMQ identifiers are normally of the form "GUID\number", so a
    /// non-numeric key is presumably rejected when assigned to Message.CorrelationId
    /// - confirm against callers.
    /// </remarks>
    private static string getCoorId(string key)
    {
        // Guid.Empty is the same value the original "new Guid()" produced.
        return Guid.Empty + @"\" + key;
    }
}


No comments: