微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

SQL Server 2008 Service Broker 点对点模式实例


------ (1) Enable Service Broker activity for the database
-- Create the demo database only when it is missing, so the script can be re-run.
USE master
GO
IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = 'SSB_Test')
    CREATE DATABASE SSB_Test
GO

-- ALTER DATABASE ... SET ENABLE_BROKER is documented as needing an exclusive
-- lock on the database, so it is only issued when the broker is really
-- disabled; a re-run against an already enabled database then never waits on
-- other sessions.
IF EXISTS (SELECT name
           FROM sys.databases
           WHERE name = 'SSB_Test'
             AND is_broker_enabled = 0)
    ALTER DATABASE SSB_Test SET ENABLE_BROKER
GO

-- NOTE(review): TRUSTWORTHY ON is kept from the original sample; confirm it is
-- really required before copying this into a production script.
ALTER DATABASE SSB_Test SET TRUSTWORTHY ON
GO

 

----- (2) Create the database master key used for encryption
USE SSB_Test
GO
-- A database can hold only one master key, so create it only when it is
-- missing; this keeps the script re-runnable.
-- NOTE(review): the password below is the sample value from the article;
-- replace it before using this script outside a test environment.
IF NOT EXISTS (SELECT name
               FROM sys.symmetric_keys
               WHERE name = '##MS_DatabaseMasterKey##')
    CREATE MASTER KEY
    ENCRYPTION BY PASSWORD = 'I5Q7w1d3'
GO

 

------- (3) Message types
-- Declares the two message types used by the contract in this script.
USE SSB_Test;
GO

-- Message type for the request sent by the initiator; the body is not validated.
CREATE MESSAGE TYPE [//SSB_Test/SendMessages] VALIDATION = NONE;
GO

-- Message type for messages sent by the target; the body is not validated.
CREATE MESSAGE TYPE [//SSB_Test/ReceivedMessages] VALIDATION = NONE;
GO

 


------ (4) Contract
-- The contract states which side of a dialog may send which message type.
USE SSB_Test;
GO
CREATE CONTRACT [//SSB_Test/SSBContract]
(
    [//SSB_Test/SendMessages]     SENT BY INITIATOR,
    [//SSB_Test/ReceivedMessages] SENT BY TARGET
);
GO


----------- (5) Queues
USE SSB_Test;
GO

-- Queue on which the initiating service ([//SSB_Test/FromService]) is created.
CREATE QUEUE SSBSendQueue WITH STATUS = ON;
GO

-- Queue on which the target service ([//SSB_Test/ToService]) is created.
CREATE QUEUE SSBReceivedQueue WITH STATUS = ON;
GO

 

------- (6) Services
USE SSB_Test;
GO

-- Initiating service: created on dbo.SSBSendQueue and listing the contract.
CREATE SERVICE [//SSB_Test/FromService]
    ON QUEUE dbo.SSBSendQueue
    ([//SSB_Test/SSBContract]);
GO

-- Target service: created on dbo.SSBReceivedQueue and listing the same contract.
CREATE SERVICE [//SSB_Test/ToService]
    ON QUEUE dbo.SSBReceivedQueue
    ([//SSB_Test/SSBContract]);
GO

 

 


USE SSB_Test
GO
-- Table that stores the messages taken off the queue: one row per message,
-- with the conversation identifiers it arrived on and its raw body.
-- ReceivedID is declared as the primary key so every stored message is
-- uniquely addressable and the table is not left as an unkeyed heap.
CREATE TABLE dbo.SSBReceived
(
    ReceivedID            int IDENTITY (1,1) NOT NULL,
    conversation_handle   uniqueidentifier   NOT NULL,
    conversation_group_id uniqueidentifier   NOT NULL,
    message_body          varbinary(max)     NOT NULL,
    CONSTRAINT PK_SSBReceived PRIMARY KEY CLUSTERED (ReceivedID)
)
GO

 

 

 


---- 1: stored procedure that sends one message
USE SSB_Test
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

-- dbo.Send_Messages
-- Opens a dialog from [//SSB_Test/FromService] to '//SSB_Test/ToService' on
-- [//SSB_Test/SSBContract], sends @sendMsgContext as a single
-- [//SSB_Test/SendMessages] message and then ends the initiator side of the
-- dialog, all inside one transaction.
--   @sendMsgContext  body of the message to send
--   @result          OUTPUT; set to 1 after the message has been sent
-- Returns @result (1) as the procedure's return code.
CREATE PROCEDURE [dbo].[Send_Messages]
    @sendMsgContext varbinary(max),
    @result int OUTPUT
AS
BEGIN
    DECLARE @Conv_Handler uniqueidentifier;

    -- With XACT_ABORT ON a run-time error rolls the whole transaction back,
    -- so a failed SEND never leaves a half-opened dialog behind.
    SET XACT_ABORT ON;
    BEGIN TRANSACTION;

    -- Step 1: open the dialog.
    BEGIN DIALOG CONVERSATION @Conv_Handler
        FROM SERVICE [//SSB_Test/FromService]
        TO SERVICE '//SSB_Test/ToService'
        ON CONTRACT [//SSB_Test/SSBContract];

    -- Step 2: send the message on that dialog.
    SEND ON CONVERSATION @Conv_Handler
        MESSAGE TYPE [//SSB_Test/SendMessages]
        (@sendMsgContext);

    -- Step 3: end the dialog normally. WITH CLEANUP is deliberately not used:
    -- it drops this endpoint without notifying the other side, so the target
    -- would never receive the EndDialog message that dbo.Receive_Messages
    -- reacts to and its conversation endpoint would never be closed.
    END CONVERSATION @Conv_Handler;

    SET @result = 1;

    COMMIT TRANSACTION;

    RETURN @result;
END
GO

 


USE SSB_Test
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

-- dbo.Receive_Messages
-- Takes at most one message off dbo.SSBReceivedQueue inside a transaction.
--   * [//SSB_Test/SendMessages] messages are copied into dbo.SSBReceived.
--   * Service Broker EndDialog and Error messages end the conversation on
--     this side.
-- The RECEIVE is not wrapped in WAITFOR, so the procedure returns immediately
-- when the queue is empty; callers poll by calling it again.
CREATE PROCEDURE [dbo].[Receive_Messages]
AS
BEGIN
    DECLARE @Conv_Handler uniqueidentifier;
    DECLARE @Conv_Group uniqueidentifier;
    DECLARE @Receive_Msg varbinary(max);
    DECLARE @Message_Type_Name nvarchar(256);

    -- With XACT_ABORT ON a run-time error rolls the transaction back, which
    -- puts the received message back on the queue.
    SET XACT_ABORT ON;
    BEGIN TRANSACTION;

    -- When the queue is empty no variable is assigned, @Message_Type_Name
    -- stays NULL and neither branch below runs.
    RECEIVE TOP (1)
        @Receive_Msg = message_body,
        @Conv_Handler = conversation_handle,
        @Conv_Group = conversation_group_id,
        @Message_Type_Name = message_type_name
    FROM dbo.SSBReceivedQueue;

    IF @Message_Type_Name = N'//SSB_Test/SendMessages'
    BEGIN
        INSERT INTO dbo.SSBReceived
            (conversation_handle, conversation_group_id, message_body)
        VALUES
            (@Conv_Handler, @Conv_Group, @Receive_Msg);
    END
    -- The system message type names are spelled with their documented casing
    -- so the comparison also matches under a case-sensitive or binary
    -- collation. Error messages end the conversation as well; otherwise a
    -- failed dialog would keep its endpoint open on this side.
    ELSE IF @Message_Type_Name IN (
            N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
            N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
    BEGIN
        END CONVERSATION @Conv_Handler;
    END

    COMMIT TRANSACTION;
END
GO

 

二:Java 程序调用发送消息的存储过程

package com.dingli.json.util;

import java.nio.ByteBuffer;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

public class TestMutilServicebroker {
 /**
  * @param args
  */
 public static void main(String[] args) {
  ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(50);
  exec.scheduleAtFixedrate(new Servicebroker(),1000,
    TimeUnit.MILLISECONDS);
 }
}

class Servicebroker implements Runnable {
 protected final Logger logger = Logger.getLogger(Servicebroker.class);
 final static String driver = "net.sourceforge.jtds.jdbc.Driver";
 private static Connection conn = null;
 private CallableStatement cStmt = null;

 @Override
 public void run() {
  final Servicebroker test = new Servicebroker();
  final Connection con = test.getConnection();
  final long starttime = System.currentTimeMillis();
  Send_Message(con);
  // Mutil_Send_Message(con);
  long endtime = System.currentTimeMillis();
  logger.info("每次发送消息响应时间毫秒=" + (endtime - starttime));
 }

 private Connection getConnection() {
  try {
   Class.forName(driver);
   final String url = "jdbc:jtds:sqlserver://172.16.0.10:1433;DatabaseName=SSB_Test";
   // final String url =
   // "jdbc:jtds:sqlserver://172.16.0.10:1433;DatabaseName=BookStore";
   final String username = "sa";
   final String password = "Fleet2011@DB.";
   conn = DriverManager.getConnection(url,username,password);
  } catch (final ClassNotFoundException e) {
   e.printstacktrace();
  } catch (final sqlException e) {
   e.printstacktrace();
  }
  return conn;
 }

 private void Send_Message(Connection con) {

  try {
   cStmt = con.prepareCall("{call [dbo].[Send_Messages](?,?)}");
   final StringBuffer sb = new StringBuffer();
   ByteBuffer buffer = ByteBuffer.allocate(5000);
   sb.append("<order id=\"3439\" customer=\"22\" orderdate=\"2/15/2011\"><LineItem ItemNumber=\"1\" ISBN=\"1-59059-592-0\" Quantity=\"1\" /></order>");
   buffer.put(sb.toString().getBytes());
   cStmt.setBytes("@sendMsgContext",buffer.array());
   // cStmt.setString("@sendMsgContext",buffer);
   cStmt.setInt("@result",0);
   cStmt.execute();
   sb.delete(0,sb.length());

  } catch (final sqlException e) {
   e.printstacktrace();
  } finally {
   try {
    cStmt.close();
    con.close();
   } catch (final Exception e) {
    e.printstacktrace();
   }
  }
 }

 private void Mutil_Send_Message(Connection con) {
  try {
   cStmt = con
     .prepareCall("{call [dbo].[sp_PublishOrSubscriptionMessages](?,?,?)}");
   final StringBuffer sb = new StringBuffer();
   // sb.append("<order id=\"3439\" customer=\"22\" orderdate=\"2/15/2011\"><LineItem ItemNumber=\"1\" ISBN=\"1-59059-592-0\" Quantity=\"1\" /></order>");
   // cStmt.setString("@sendMsgContext",sb.toString());
   // cStmt.setInt("@result",0);

   sb.append("<?xml version=\"1.0\"?><Publish xmlns=\"http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe\"><Subject>Subject1</Subject></Publish>");
   cStmt.setString("@FromServerName","[AuthorService]");
   cStmt.setString("@ToServerName","PublisherService");
   cStmt.setString("@ContractName",
     "[http://ssb.csharp.at/SSB_Book/c10/PublishContract]");
   cStmt.setString("@MessageType",
     "[http://ssb.csharp.at/SSB_Book/c10/PublishMessage]");
   cStmt.setString("@MessageBody",sb.toString());
   cStmt.setInt("@result",sb.length());

  } catch (final sqlException e) {
   e.printstacktrace();
  } finally {
   try {
    cStmt.close();
    con.close();
   } catch (final Exception e) {
    e.printstacktrace();
   }
  }
 }
}

 

三:Java 程序调用接收消息的存储过程

package com.dingli.json.util;

import java.io.Serializable;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.sqlException;

import org.apache.log4j.Logger;

public class TestServicebrokerTwo implements Serializable {
 protected static Logger logger = Logger
   .getLogger(TestServicebrokerTwo.class);
 /**
  *
  */
 private static final long serialVersionUID = -8531232416149283642L;

 final String driver = "net.sourceforge.jtds.jdbc.Driver";
 Connection conn = null;
 CallableStatement cStmt = null;

 /**
  * @param args
  */
 public static void main(String[] args) {
  TestServicebrokerTwo test = new TestServicebrokerTwo();
  Connection con = null;
  while (true) {
   con = test.getConnection();
   long starttime = System.currentTimeMillis();
   test.Receive_Messages(con);
   long endtime = System.currentTimeMillis();
   logger.info("每次接收消息响应时间毫秒=" + (endtime - starttime));
  }
 }

 public Connection getConnection() {
  try {
   Class.forName(driver);
   String url = "jdbc:jtds:sqlserver://172.16.0.10:1433;DatabaseName=SSB_Test";
   String username = "sa";
   String password = "Fleet2011@DB.";
   conn = DriverManager.getConnection(url,password);
  } catch (ClassNotFoundException e) {
   e.printstacktrace();
  } catch (sqlException e) {
   e.printstacktrace();
  }
  return conn;

 }

 public void Receive_Messages(Connection con) {
  try {
   cStmt = con
     .prepareCall("{call [dbo].[Receive_Messages]}");
   cStmt.executeUpdate();
  } catch (sqlException ex) {
   ex.printstacktrace();
  } finally {
   ClearConnection(cStmt,con);
  }
 }

 public void ClearConnection(CallableStatement cStmt,Connection con) {
  try {
   cStmt.close();
   con.close();

  } catch (Exception e) {
   e.printstacktrace();
  }
 }
}

 

四:检验数据

-- Messages currently sitting on the queue that dbo.Receive_Messages reads from.
SELECT * FROM dbo.SSBReceivedQueue;

-- Messages currently sitting on the queue of the sending service.
SELECT * FROM dbo.SSBSendQueue;

-- Messages that have been stored in the dbo.SSBReceived table.
SELECT * FROM dbo.SSBReceived;

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐