------ (1) Enable Service Broker activity for the database
-- Creates the SSB_Test database when it is missing, then turns on
-- Service Broker message delivery and marks the database TRUSTWORTHY.
USE master;
GO
IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = 'SSB_Test')
BEGIN
    CREATE DATABASE SSB_Test;
END
GO
-- Allow Service Broker to deliver messages in this database.
ALTER DATABASE SSB_Test SET ENABLE_BROKER;
GO
-- NOTE(review): TRUSTWORTHY ON widens what code in this database may do at
-- the instance level; confirm it is actually required for this setup.
ALTER DATABASE SSB_Test SET TRUSTWORTHY ON;
GO
----- (2) Create the database master key
-- Creates the database master key used for encryption in SSB_Test.
-- The guard makes this step re-runnable: CREATE MASTER KEY raises an error
-- when the database already has a master key.
USE SSB_Test
GO
IF NOT EXISTS (SELECT 1
               FROM sys.symmetric_keys
               WHERE name = N'##MS_DatabaseMasterKey##')
BEGIN
    -- NOTE(review): the password is hard-coded in the script; supply it from
    -- a secure source before using this outside a test environment.
    CREATE MASTER KEY
        ENCRYPTION BY PASSWORD = 'I5Q7w1d3';
END
GO
------- (3) Manage message types
-- Defines the two message types used by the SSB_Test contract. Both use
-- VALIDATION = NONE, so Service Broker does not validate the message body.
-- Each CREATE is guarded so the script can be re-run without
-- "already exists" errors.
USE SSB_Test
GO
-- Message type for messages sent by the initiator.
IF NOT EXISTS (SELECT 1
               FROM sys.service_message_types
               WHERE name = N'//SSB_Test/SendMessages')
BEGIN
    CREATE MESSAGE TYPE [//SSB_Test/SendMessages]
        VALIDATION = NONE;
END
GO
-- Message type for messages sent back by the target.
IF NOT EXISTS (SELECT 1
               FROM sys.service_message_types
               WHERE name = N'//SSB_Test/ReceivedMessages')
BEGIN
    CREATE MESSAGE TYPE [//SSB_Test/ReceivedMessages]
        VALIDATION = NONE;
END
GO
------ (4) Create the contract
-- The contract states which side may send which message type:
--   //SSB_Test/SendMessages      is sent by the initiator,
--   //SSB_Test/ReceivedMessages  is sent by the target.
-- The guard makes the step re-runnable.
USE SSB_Test
GO
IF NOT EXISTS (SELECT 1
               FROM sys.service_contracts
               WHERE name = N'//SSB_Test/SSBContract')
BEGIN
    CREATE CONTRACT [//SSB_Test/SSBContract]
    (
        [//SSB_Test/SendMessages]     SENT BY INITIATOR,
        [//SSB_Test/ReceivedMessages] SENT BY TARGET
    );
END
GO
----------- (5) Create the queues
-- Both queues are created explicitly in the dbo schema because the service
-- definitions and the Receive_Messages procedure reference them as dbo.*.
-- Each CREATE is guarded so the script can be re-run.
USE SSB_Test
GO
-- Queue that backs the initiating service (//SSB_Test/FromService).
IF OBJECT_ID(N'dbo.SSBSendQueue', N'SQ') IS NULL
BEGIN
    CREATE QUEUE dbo.SSBSendQueue
        WITH STATUS = ON;
END
GO
-- Queue that backs the target service (//SSB_Test/ToService); the
-- Receive_Messages procedure reads from it.
IF OBJECT_ID(N'dbo.SSBReceivedQueue', N'SQ') IS NULL
BEGIN
    CREATE QUEUE dbo.SSBReceivedQueue
        WITH STATUS = ON;
END
GO
------- (6) Create the services
-- Each service binds one queue to the //SSB_Test/SSBContract contract.
-- Each CREATE is guarded so the script can be re-run.
USE SSB_Test
GO
-- Initiating service, backed by dbo.SSBSendQueue.
IF NOT EXISTS (SELECT 1
               FROM sys.services
               WHERE name = N'//SSB_Test/FromService')
BEGIN
    CREATE SERVICE [//SSB_Test/FromService]
        ON QUEUE dbo.SSBSendQueue
        ([//SSB_Test/SSBContract]);
END
GO
-- Target service, backed by dbo.SSBReceivedQueue.
IF NOT EXISTS (SELECT 1
               FROM sys.services
               WHERE name = N'//SSB_Test/ToService')
BEGIN
    CREATE SERVICE [//SSB_Test/ToService]
        ON QUEUE dbo.SSBReceivedQueue
        ([//SSB_Test/SSBContract]);
END
GO
USE SSB_Test
GO
-- Table that stores the messages taken off the target queue.
-- One row per received message: the conversation identifiers plus the raw
-- message body. The guard makes the step re-runnable, and the identity
-- column is declared as a named primary key so rows are uniquely addressable.
IF OBJECT_ID(N'dbo.SSBReceived', N'U') IS NULL
BEGIN
    CREATE TABLE dbo.SSBReceived
    (
        ReceivedID            int IDENTITY (1,1) NOT NULL,
        conversation_handle   uniqueidentifier   NOT NULL,
        conversation_group_id uniqueidentifier   NOT NULL,
        message_body          varbinary(max)     NOT NULL,
        CONSTRAINT PK_SSBReceived PRIMARY KEY CLUSTERED (ReceivedID)
    );
END
GO
---- 1: stored procedure that sends one message
USE SSB_Test
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- dbo.Send_Messages
-- Opens a dialog from //SSB_Test/FromService to //SSB_Test/ToService on the
-- //SSB_Test/SSBContract contract, sends @sendMsgContext as one
-- //SSB_Test/SendMessages message, and ends the initiator side of the dialog.
--
-- Parameters:
--   @sendMsgContext  binary message body to send.
--   @result          OUTPUT; set to 1 once the message has been sent.
-- Returns: @result (1 on success).
-- Errors: XACT_ABORT is ON, so a run-time error rolls back the whole
--         transaction and nothing is sent.
CREATE PROCEDURE [dbo].[Send_Messages]
    @sendMsgContext VarBinary(max),
    @result int OUTPUT
AS
BEGIN
    SET XACT_ABORT ON;

    DECLARE @Conv_Handler uniqueidentifier;

    BEGIN TRANSACTION;

    -- Step 1: open the dialog. No LIFETIME is specified.
    BEGIN DIALOG CONVERSATION @Conv_Handler
        FROM SERVICE [//SSB_Test/FromService]
        TO SERVICE '//SSB_Test/ToService'
        ON CONTRACT [//SSB_Test/SSBContract];

    -- Step 2: send the message on that dialog. The statement before SEND
    -- must be terminated with a semicolon.
    SEND ON CONVERSATION @Conv_Handler
        MESSAGE TYPE [//SSB_Test/SendMessages]
        (@sendMsgContext);

    -- NOTE(review): WITH CLEANUP drops this endpoint without notifying the
    -- target, so the target never receives an EndDialog message for this
    -- conversation, and any message still waiting in the transmission queue
    -- for it is discarded. Confirm this fire-and-forget behaviour is intended.
    END CONVERSATION @Conv_Handler WITH CLEANUP;

    SET @result = 1;

    COMMIT TRANSACTION;

    RETURN @result;
END
GO
USE SSB_Test
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- dbo.Receive_Messages
-- Takes at most one message off dbo.SSBReceivedQueue and processes it inside
-- a single transaction:
--   * //SSB_Test/SendMessages  -> the message is stored in dbo.SSBReceived;
--   * EndDialog / Error system messages -> this side of the conversation is
--     ended.
-- RECEIVE is issued without WAITFOR, so the procedure returns immediately
-- when the queue is empty (the variables stay NULL and no branch runs).
-- Errors: XACT_ABORT is ON, so a run-time error rolls the transaction back
--         and the message stays on the queue.
CREATE PROCEDURE [dbo].[Receive_Messages]
AS
BEGIN
    SET XACT_ABORT ON;

    DECLARE @Conv_Handler      uniqueidentifier;
    DECLARE @Conv_Group        uniqueidentifier;
    DECLARE @Receive_Msg       varbinary(max);
    DECLARE @Message_Type_Name nvarchar(256);

    -- The statement before RECEIVE must be terminated with a semicolon.
    BEGIN TRAN;

    RECEIVE TOP(1)
        @Receive_Msg       = message_body,
        @Conv_Handler      = conversation_handle,
        @Conv_Group        = conversation_group_id,
        @Message_Type_Name = message_type_name
    FROM dbo.SSBReceivedQueue;

    IF @Message_Type_Name = N'//SSB_Test/SendMessages'
    BEGIN
        INSERT INTO dbo.SSBReceived
            (conversation_handle, conversation_group_id, message_body)
        VALUES
            (@Conv_Handler, @Conv_Group, @Receive_Msg);
    END
    -- System message type names are spelled with their exact casing so the
    -- comparison also matches in a case-sensitive database collation.
    ELSE IF @Message_Type_Name IN
        (N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
         N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
    BEGIN
        END CONVERSATION @Conv_Handler;
    END

    COMMIT TRAN;
END
GO
二:程序调用发送存储过程
package com.dingli.json.util;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
/**
 * Entry point that repeatedly runs the {@link Servicebroker} send task on a
 * scheduled thread pool.
 */
public class TestMutilServicebroker {
    /**
     * Schedules one {@link Servicebroker} task at a fixed rate of one run per
     * 1000 ms, starting immediately. The pool threads are non-daemon, so the
     * JVM keeps running after this method returns.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(50);
        // scheduleAtFixedRate takes (task, initialDelay, period, unit).
        exec.scheduleAtFixedRate(new Servicebroker(), 0, 1000,
                TimeUnit.MILLISECONDS);
    }
}
/**
 * Task that opens a JDBC connection to the SSB_Test database, calls the
 * dbo.Send_Messages stored procedure once with a fixed XML order payload, and
 * logs how long the call took.
 */
class Servicebroker implements Runnable {
    protected final Logger logger = Logger.getLogger(Servicebroker.class);
    final static String driver = "net.sourceforge.jtds.jdbc.Driver";

    // NOTE(review): server address and credentials are hard-coded; move them
    // to configuration before using this outside a test environment.
    private static final String URL = "jdbc:jtds:sqlserver://172.16.0.10:1433;DatabaseName=SSB_Test";
    private static final String USERNAME = "sa";
    private static final String PASSWORD = "Fleet2011@DB.";

    /** Fixed sample order sent as the message body. */
    private static final String ORDER_XML = "<order id=\"3439\" customer=\"22\" orderdate=\"2/15/2011\"><LineItem ItemNumber=\"1\" ISBN=\"1-59059-592-0\" Quantity=\"1\" /></order>";

    /**
     * Opens a connection, sends one message and logs the elapsed time of the
     * send. When no connection can be opened the run is skipped.
     */
    @Override
    public void run() {
        final Connection con = getConnection();
        if (con == null) {
            logger.error("Could not open a database connection; skipping this send");
            return;
        }
        final long starttime = System.currentTimeMillis();
        Send_Message(con);
        final long endtime = System.currentTimeMillis();
        logger.info("每次发送消息响应时间毫秒=" + (endtime - starttime));
    }

    /**
     * Loads the jTDS driver and opens a new connection.
     *
     * @return a fresh connection, or {@code null} when the driver class is
     *         missing or the connection attempt fails (the failure is printed)
     */
    private Connection getConnection() {
        try {
            Class.forName(driver);
            return DriverManager.getConnection(URL, USERNAME, PASSWORD);
        } catch (final ClassNotFoundException e) {
            e.printStackTrace();
        } catch (final SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Calls dbo.Send_Messages with the sample order as the message body, then
     * closes the statement and the connection.
     *
     * @param con open connection; it is closed before this method returns
     */
    private void Send_Message(Connection con) {
        CallableStatement cStmt = null;
        try {
            cStmt = con.prepareCall("{call [dbo].[Send_Messages](?,?)}");
            // Send exactly the encoded payload bytes; a fixed-size buffer
            // would pad the message body with trailing zero bytes.
            cStmt.setBytes("@sendMsgContext",
                    ORDER_XML.getBytes(Charset.forName("UTF-8")));
            cStmt.setInt("@result", 0);
            cStmt.execute();
        } catch (final SQLException e) {
            e.printStackTrace();
        } finally {
            release(cStmt, con);
        }
    }

    /**
     * Alternative sender that calls dbo.sp_PublishOrSubscriptionMessages with
     * a publish request. It is not invoked by {@link #run()}.
     *
     * NOTE(review): the stored procedure is not defined in this file. The
     * call uses one placeholder per parameter bound below and is then
     * executed; confirm the parameter list against the procedure definition.
     *
     * @param con open connection; it is closed before this method returns
     */
    private void Mutil_Send_Message(Connection con) {
        final String payload = "<?xml version=\"1.0\"?><Publish xmlns=\"http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe\"><Subject>Subject1</Subject></Publish>";
        CallableStatement cStmt = null;
        try {
            cStmt = con
                    .prepareCall("{call [dbo].[sp_PublishOrSubscriptionMessages](?,?,?,?,?,?)}");
            cStmt.setString("@FromServerName", "[AuthorService]");
            cStmt.setString("@ToServerName", "PublisherService");
            cStmt.setString("@ContractName",
                    "[http://ssb.csharp.at/SSB_Book/c10/PublishContract]");
            cStmt.setString("@MessageType",
                    "[http://ssb.csharp.at/SSB_Book/c10/PublishMessage]");
            cStmt.setString("@MessageBody", payload);
            cStmt.setInt("@result", payload.length());
            cStmt.execute();
        } catch (final SQLException e) {
            e.printStackTrace();
        } finally {
            release(cStmt, con);
        }
    }

    /**
     * Closes the statement and then the connection. Either argument may be
     * {@code null}, and a failure to close the statement does not prevent the
     * connection from being closed.
     */
    private static void release(CallableStatement stmt, Connection con) {
        if (stmt != null) {
            try {
                stmt.close();
            } catch (final SQLException e) {
                e.printStackTrace();
            }
        }
        if (con != null) {
            try {
                con.close();
            } catch (final SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
三:程序接收消息调用存储过程
package com.dingli.json.util;

import java.io.Serializable;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.apache.log4j.Logger;

/**
 * Polling receiver: in an endless loop it opens a connection to the SSB_Test
 * database, calls the dbo.Receive_Messages stored procedure once, closes the
 * connection and logs how long the call took.
 */
public class TestServicebrokerTwo implements Serializable {
    protected static Logger logger = Logger
            .getLogger(TestServicebrokerTwo.class);

    private static final long serialVersionUID = -8531232416149283642L;

    final String driver = "net.sourceforge.jtds.jdbc.Driver";

    // JDBC handles are not serializable, so they are excluded from the
    // serialized form of this Serializable class.
    transient Connection conn = null;
    transient CallableStatement cStmt = null;

    /**
     * Runs the receive loop. The loop ends when a connection cannot be
     * opened.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TestServicebrokerTwo test = new TestServicebrokerTwo();
        while (true) {
            Connection con = test.getConnection();
            if (con == null) {
                logger.error("Could not open a database connection; stopping the receive loop");
                break;
            }
            long starttime = System.currentTimeMillis();
            test.Receive_Messages(con);
            long endtime = System.currentTimeMillis();
            logger.info("每次接收消息响应时间毫秒=" + (endtime - starttime));
        }
    }

    /**
     * Loads the jTDS driver and opens a new connection.
     *
     * NOTE(review): server address and credentials are hard-coded; move them
     * to configuration before using this outside a test environment.
     *
     * @return a fresh connection, or {@code null} when the driver class is
     *         missing or the connection attempt fails (the failure is printed)
     */
    public Connection getConnection() {
        // Reset first so a failure never hands back a connection that an
        // earlier iteration has already closed.
        conn = null;
        try {
            Class.forName(driver);
            String url = "jdbc:jtds:sqlserver://172.16.0.10:1433;DatabaseName=SSB_Test";
            String username = "sa";
            String password = "Fleet2011@DB.";
            conn = DriverManager.getConnection(url, username, password);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Calls dbo.Receive_Messages once, then closes the statement and the
     * connection.
     *
     * @param con open connection; it is closed before this method returns
     */
    public void Receive_Messages(Connection con) {
        cStmt = null;
        try {
            cStmt = con
                    .prepareCall("{call [dbo].[Receive_Messages]}");
            cStmt.executeUpdate();
        } catch (SQLException ex) {
            ex.printStackTrace();
        } finally {
            ClearConnection(cStmt, con);
        }
    }

    /**
     * Closes the statement and then the connection. Either argument may be
     * {@code null}, and a failure to close the statement does not prevent the
     * connection from being closed.
     *
     * @param cStmt statement to close, may be {@code null}
     * @param con   connection to close, may be {@code null}
     */
    public void ClearConnection(CallableStatement cStmt, Connection con) {
        if (cStmt != null) {
            try {
                cStmt.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (con != null) {
            try {
                con.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
四:检验数据
-- Ad-hoc verification: a SELECT on a queue lists the messages it currently
-- holds without removing them; the last query lists the stored messages.
SELECT * FROM dbo.SSBReceivedQueue;
SELECT * FROM dbo.SSBSendQueue;
SELECT * FROM dbo.SSBReceived;
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。