Пример работы Cache JMS Gateway в случае Sonic MQ

Интро

Зачем связываться с JMS (Java Messaging Service)?  Причины:

 

  • Способ асинхронного взаимодействия с джава приложениями
  • Интеграция с готовыми системами, которые в качестве стандартных для себя способа  взаимодействия имеют   обмен сообщениями по JMS
  • Сбор и анализ событий от  джава приложений для накопления их в Ensemble BAM, а в дальнейшем и в InterSystems DeepSee
  • Расширение интеграционных возможностей Ensemble к примеру с помощью open source реализации JMS, ActiveMQ, пример работы с которой приведу немного позже

Сценарий

Работаем на двух машинах – oleo и vmx. Брокер Sonic MQ  работает на  oleo. Ensemble 2008.1  и файлы нашего проекта – на vmx.

Из Cache посылается сообщение в  JMS topic (тема). Подписчики потребляют (consume) сообщения от издателей (publishers). Издателями являются java приложения  и объекты Cache.

Привел английские и русские варианты, так как в JMS в частности и в messaging в общем употребляются устойчивые английские термины, удачных русских переводов нет.

Описание

1. Стартуем Sonic MQ broker (если необходимо,  создаем пользователей в  Sonic MQ Management Console (startmc) – дефолтовый аккаунт в сонике user: Administrator, password:Administrator, в тестах используется еще пользователь  SALES:password)

 Пример старта:

C:\Sonic\75\MQ7.5\bin>startcontainer.bat
[08/09/17 12:20:50] (info) Open container boot file "C:\Sonic\75\MQ7.5\container.xml"
[08/09/17 12:20:51] (info) Open Directory Service boot file "C:\Sonic\75\MQ7.5\ds.xml"
[08/09/17 12:21:43] (info) Management connection (re)established (Socket[addr=oleo/10.10.10.11,port=2506,localport=2612])

....
[08/09/17 12:21:05] (info) The container working directory is "C:\Sonic\75\MQ7.5"
[08/09/17 12:21:07] (config)

        Sonic Management
        Release 7.5.0 Build Number 327
        Copyright (c) 1999-2006, Sonic Software Corporation.
        All rights reserved.
....
[08/09/17 12:21:43] ID=MgmtBroker (info) TCP_ACCEPTOR: accepting connections on tcp://oleo:2506
[08/09/17 12:21:43] ID=MgmtBroker (info) SonicMQ Broker started
[08/09/17 12:21:43] (info) ...startup complete

2. Стартуем пример на джаве, который идет вместе с Sonic MQ

C:\Sonic\75\MQ7.5\samples\TopicPubSub\Chat>..\..\SonicMQ Chat -u Administrator -p Administrator (or -u SALES)

3. Стартуем пример на джаве и на  vmx (тут и далее используем jdk 1.6)

run_sonic_client.bat: 
java -cp ../lib/jms.jar;../src/java;../lib/broker.jar;../lib/mfcontext.jar;../lib/sonic_Client.jar  Chat -b oleo:2506 -u SALES

4. Стартуем JMS Gateway

run_jms_gateway.bat:

java -classpath ../lib/cachedb_jdk16.jar;../lib/jms.jar;../lib/sonic_Client.jar;../lib/broker.jar;../lib/mfcontext.jar com.intersys.gateway.JMSGateway 9700 ../log/jms_gateway.log 

5. Импортируем необходимые класссы Java в Сache

d ##class(samples.jms.Installer).importSonicJMS()
samples.jms.Installer:
Class samples.jms.Installer Extends %Net.Remote.ImportHelper
{
ClassMethod importSonicJMS(classPathString As %String = "C:/projects/isc/jms/lib/broker.jar;C:/projects/isc/jms/lib/sonic_Client.jar;C:/projects/isc/jms/lib/mfcontext.jar", port As %Integer = 9700, host As %String = "127.0.0.1") [ Final ]
{
            s exclusions=##class(%ListOfDataTypes).%New()  
            d exclusions.Insert("java.awt")
        s classPath=##class(%ListOfDataTypes).%New()
            d classPath.Insert(classPathString)

            d ..Import("progress.message.jclient.ConnectionFactory",classPath,port,host,exclusions)
            d ..Import("progress.message.jclient.TopicConnectionFactory",classPath,port,host,exclusions)      
            d ..Import("progress.message.jclient.QueueConnectionFactory",classPath,port,host,exclusions)
}}

6. Запускаем подписчика:

d ##class(samples.jms.Subscriber).run()

Class samples.jms.Subscriber [ Abstract, ClassType = "" ]
{
ClassMethod run() [ Final ]
{
            s sonicBrokerJar = "C:/projects/isc/jms/lib/broker.jar;C:/projects/isc/jms/lib/sonic_Client.jar;C:/projects/isc/jms/lib/mfcontext.jar"
            s port = 9700
            s host  = "127.0.0.1"
            Set %objlasterror="",$ZT="Error",namespace=$zu(5),timeout=2
            // get an instance of JMSGateway and connect to JMS Listener Super Server
            Set jmsgateway=##class(EnsLib.JavaGateway.JMSGateway).%New()
            Set classPath=##class(%ListOfDataTypes).%New()
            Do classPath.Insert(sonicBrokerJar)
            Set status=jmsgateway.%Connect(host,port,namespace,timeout,classPath) If 'status Goto Error
           
            Set factory=##class(progress.message.jclient.TopicConnectionFactory).%New(jmsgateway,"oleo:2506")
            Set connection=factory.createTopicConnection("SALES","password")
            Set subSession=connection.createTopicSession(0,##class(javax.jms.Session).%GetParameter("AUTOuACKNOWLEDGE"))
            Set topic=subSession.createTopic("jms.samples.chat")
            Set subscriber=subSession.createSubscriber(topic)

    Do subscriber.setMessageListener(jmsgateway)
            Do connection.start()

            // wait for a single message, then print it out
            Write "Waiting for a single message",!,!
           
            Set message=jmsgateway.%GetMessage()
            Write "Received Message: "_message.getJMSMessageID(),!
            Write "Message Text: "_message.getText()
           
            Do connection.stop()
            Do connection.close()
            // disconnect
            Do jmsgateway.%Disconnect()
            Write !,!,"Test Successfully Completed"
            Quit
Error ; an error occurred
            Use 0
            If %objlasterror'="" { Write $system.OBJ.DisplayError(%objlasterror) } Else {       Write $ze }
}
}

7.Публикуем сообщение

d ##class(samples.jms.Publisher).run()

Class samples.jms.Publisher 
{

ClassMethod run() [ Final ]
{
            s sonicBrokerJar = "C:/projects/isc/jms/lib/broker.jar;C:/projects/isc/jms/lib/sonic_Client.jar;C:/projects/isc/jms/lib/mfcontext.jar"
            s port = 9700
            s host = "127.0.0.1"
            Set %objlasterror="",$ZT="Error",namespace=$zu(5),timeout=2
            // get an instance of JMSGateway and connect to JMSGateway Super Server
            Set jmsgateway=##class(EnsLib.JavaGateway.JMSGateway).%New()
            Set classPath=##class(%ListOfDataTypes).%New()
            Do classPath.Insert(sonicBrokerJar)
            Set status=jmsgateway.%Connect(host,port,namespace,timeout,classPath) If 'status Goto Error   
           
            Set factory=##class(progress.message.jclient.TopicConnectionFactory).%New(jmsgateway,"oleo:2506")
            Set connection=factory.createTopicConnection("SALES","password")
            Set pubSession=connection.createTopicSession(0,##class(javax.jms.Session).%GetParameter("AUTOuACKNOWLEDGE"))
           
    // create topic, publisher
    Set topic=pubSession.createTopic("jms.samples.chat")
            Set publisher=pubSession.createPublisher(topic)

            // print topic name
            Write "Topic Name = ",publisher.getTopic().getTopicName(),!

    // start the connection
    Do connection.start()

    // create and publish a message
            Set message=pubSession.createTextMessage()
            Do message.setText("JMSPublisher: Hello JMS!")
            Do publisher.publish(message)

            Do connection.stop()
            Do connection.close()
            // disconnect
            Do jmsgateway.%Disconnect()
            Write !,"Test Successfully Completed"
            Quit
Error ; an error occurred
            Use 0
            If %objlasterror'="" { Write $system.OBJ.DisplayError(%objlasterror) } Else {       Write $ze }
}
}
Примечание: JMS - стандарт, описывающий, как обмениваться   сообщениями между собой джава приложениям. Входит в J2EE и позволяет озволяющий приложениям, выполненным на платформе J2EE, создавать, посылать, получать и читать сообщения. Коммуникация между компонентами, использующими JMS, асинхронна (процедура не дожидается ответа на своё сообщение) и независима от исполнения компонентов.
JMS поддерживает две модели обмена сообщения: «от пункта к пункту» и «издатель-подписчик». Модель «от пункта к пункту» характеризуется следующим:
	?	Каждое сообщение имеет только одного адресата
	?	Сообщение попадает в «почтовый ящик», или «очередь» адресата и может быть прочитано когда угодно. Если адресат не работал в момент отсылки сообщения, сообщение не пропадёт.
	?	После получения сообщения адресат посылает извещение.
Модель «издатель-подписчик» характеризуется следующим:
	?	Подписчик подписывается на определённую «тему»
	?	Издатель публикует своё сообщение. Его получают все подписчики этой темы
	?	Получатель должен работать и быть подписан в момент отправки сообщения