Madeira Team

Service Broker Sample: Multi-Threading

Introduction

Service Broker has been part of SQL Server since the 2005 release, so with SQL 2012 just out it’s hardly “new” anymore. But in reality, many DBAs and DB users still aren’t aware of this feature, don’t know how to utilize it, or are afraid to use it because they’re not familiar with it.

Using this short series of posts, I’m hoping to ease the minds of such people.

In short, Service Broker is an asynchronous queuing engine working inside SQL Server. Using this engine we can send and receive binary or XML messages between queues. When a message is inserted in a queue, Service Broker executes an “activation procedure” that you specified, which is then supposed to handle messages from that queue.

Since we have a lot to cover in our sample, I won’t go into too much detail introducing Service Broker. But Microsoft has an excellent and detailed chapter about Service Broker available right here: http://msdn.microsoft.com/en-us/library/bb522893.aspx

Sample 1: Multi-Threading

Several times I’ve encountered a scenario in which an application needed to execute several queries independent of each other, and then combine their results somehow. However, each of these queries takes a while to execute.

One might wonder: if only we could use some kind of multi-threading mechanism to execute them in parallel, the total running time would be that of the slowest query only, rather than the sum of all of them.

Well, it just so happens that our first sample will show how you can implement “multi-threading” in SQL Server using Service Broker queues!

The Design

First, I’ll explain the general idea of the design:

Since Service Broker only works with binary or XML messages, we’ll use XML to send a “query request”, and wait for a “query response”.

Specifically, we’ll send a message containing some kind of a dynamic query. But that dynamic query must return its result in XML form using an output parameter. Then, Service Broker will execute the query, retrieve the result from the output parameter, and send the result back as the response.
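To make the “output parameter” idea a bit more concrete, here is a minimal sketch of running a dynamic query through sp_executesql and catching its XML result. The parameter name @SB_PQ_Result matches the one used in the sample script later on; the query itself is only a placeholder I made up for illustration.

```sql
-- Sketch only: a dynamic command that hands its result back as XML
-- through an output parameter. The SELECT is a placeholder query.
DECLARE @SQL nvarchar(max);
DECLARE @Result xml;

SET @SQL = N'
SET @SB_PQ_Result =
(
    SELECT name, object_id
    FROM sys.tables AS Tab
    FOR XML AUTO, ELEMENTS
);';

EXEC sp_executesql
    @SQL,
    N'@SB_PQ_Result xml OUTPUT',      -- parameter definition
    @SB_PQ_Result = @Result OUTPUT;   -- bind it to a local variable

SELECT @Result AS QueryResult;        -- NULL if the database has no tables
```

In the real design this call happens inside the activation procedure rather than in your own session, but the mechanics of the output parameter are the same.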

After that, it’s up to us to gather all the responses together and do whatever it is we wanted to do with them.

Also, for the sake of our sample, we’ll need to simulate long execution times. We’ll do this simply by adding a WAITFOR DELAY command into our dynamic queries which will cause the query to wait for a random number of seconds (between 10 and 30).

The Scripts

Available below is a ZIP file with the two scripts we’ll use:

One is the installation script, responsible for creating all the objects we require – the database, the service broker procedures, endpoints and queues. Everything you need to implement multi-threading in SQL Server.

The other is a sample of how to use this mechanism, using simple queries and a WAITFOR DELAY command to simulate long execution times.


The scripts are commented to help you understand what’s going on. But we’ll go over them bit by bit just in case.

The Installation Script

In order to explain the installation script, I’ll list all the objects that we’re creating and explain the purpose of each object. More detailed information about the implementation can be found in the script itself.



SB_PQ_Test:

That’s the database we’ll be using to create all the necessary objects.
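If you’d like to confirm that Service Broker is actually enabled in this database, a quick check along these lines should do. This is a sketch of mine and not part of the installation script. A freshly created database normally has the broker enabled, but a restored or attached copy may not.

```sql
-- Sketch only: check whether Service Broker is enabled for the sample database.
SELECT name, is_broker_enabled
FROM sys.databases
WHERE name = N'SB_PQ_Test';

-- If is_broker_enabled = 0, it can be switched on like this
-- (needs exclusive access, hence ROLLBACK IMMEDIATE; use with care):
-- ALTER DATABASE SB_PQ_Test SET ENABLE_BROKER WITH ROLLBACK IMMEDIATE;
```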



SB_PQ_ServiceBrokerLogs:

This is a logging table which will be used to log the start and end of each query execution, and any errors that may occur.



SB_PQ_ExecuteDynamicQuery:

This is the procedure which will actually execute the dynamic queries. It builds the dynamic command, uses sp_executesql to execute it, and returns the result as an output parameter. The name of the output parameter itself can be dynamic.
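Here is a rough sketch of how a dynamic output parameter name can work. It is not the actual body of SB_PQ_ExecuteDynamicQuery (see the installation script for that), and the variable names and the parameter name MyResult are my own assumptions. The trick is that sp_executesql also accepts parameter values by position, so the caller doesn’t need to know the parameter’s name at compile time.

```sql
-- Sketch only: the output parameter's name is supplied at run time.
DECLARE @SQL nvarchar(max);
DECLARE @OutputParam sysname;     -- parameter name, without the @
DECLARE @ParamDef nvarchar(400);
DECLARE @Result xml;

SET @OutputParam = N'MyResult';   -- hypothetical name chosen by the caller
SET @SQL = N'SET @MyResult = (SELECT 1 AS Val FOR XML PATH(''Row''), TYPE);';

-- Build the parameter definition around the supplied name.
SET @ParamDef = N'@' + @OutputParam + N' xml OUTPUT';

-- Passing the value by position means the name never has to be hard-coded here.
EXEC sp_executesql @SQL, @ParamDef, @Result OUTPUT;

SELECT @Result AS QueryResult;
```

Keep in mind that the name is concatenated into the definition as-is, so it should only ever come from trusted code.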



SB_PQ_HandleQueue:

This is the activation procedure that Service Broker executes when it creates a “reader instance”, or in other words, when there’s something in the request queue.

This procedure is the most important part because it’s responsible for retrieving messages from the request queue, executing the query (using SB_PQ_ExecuteDynamicQuery), and handling any errors that may occur in-between.
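For readers who have never seen an activation procedure, below is a stripped-down sketch of the usual receive loop. It is not the real SB_PQ_HandleQueue: the procedure name Demo_HandleQueue is hypothetical, there is no logging or TRY/CATCH, and it simply echoes the request back instead of executing it. It also gives up after 5 seconds of silence, whereas the real procedure keeps waiting. It assumes the installation script has already created the queue and message type.

```sql
-- Sketch only: skeleton of an activation procedure's receive loop.
CREATE PROCEDURE dbo.Demo_HandleQueue
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @Handle uniqueidentifier, @MsgType sysname, @Body xml;

    WHILE 1 = 1
    BEGIN
        BEGIN TRANSACTION;

        -- Wait up to 5 seconds for the next message in the request queue.
        WAITFOR (
            RECEIVE TOP (1)
                @Handle  = conversation_handle,
                @MsgType = message_type_name,
                @Body    = CAST(message_body AS xml)
            FROM dbo.SB_PQ_Request_Queue
        ), TIMEOUT 5000;

        IF @@ROWCOUNT = 0
        BEGIN
            COMMIT TRANSACTION;
            BREAK;  -- nothing arrived, let this reader instance exit
        END

        IF @MsgType = N'//SB_PQ/Message'
            -- Real code would execute the request here; the sketch echoes it back.
            SEND ON CONVERSATION @Handle
                MESSAGE TYPE [//SB_PQ/Message] (@Body);
        ELSE IF @MsgType IN (
            N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
            N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
            END CONVERSATION @Handle;  -- clean up finished or failed dialogs

        COMMIT TRANSACTION;
    END
END
```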


SB_PQ_Request_Queue:

This is the queue where all query requests will be waiting to be processed. Its activation procedure is SB_PQ_HandleQueue.



SB_PQ_Response_Queue:

This is the queue into which query responses will be sent. It doesn’t have an activation procedure because query response extraction is done manually by the client process.
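The difference between the two queues comes down to the ACTIVATION clause. The sketch below uses hypothetical Demo_ names and a do-nothing stub procedure so that it can run on its own; the MAX_QUEUE_READERS value of 3 is just an example, and the actual settings are in the installation script.

```sql
-- Sketch only: one queue with activation, one without.
-- Stub procedure so the example is self-contained (a real one would RECEIVE messages).
CREATE PROCEDURE dbo.Demo_Activation
AS
BEGIN
    SET NOCOUNT ON;
END
GO

-- Request-style queue: Service Broker launches the procedure when messages arrive,
-- running up to MAX_QUEUE_READERS instances of it in parallel.
CREATE QUEUE dbo.Demo_Request_Queue
    WITH STATUS = ON,
    ACTIVATION (
        STATUS = ON,
        PROCEDURE_NAME = dbo.Demo_Activation,
        MAX_QUEUE_READERS = 3,
        EXECUTE AS OWNER
    );

-- Response-style queue: no activation, the client reads from it manually.
CREATE QUEUE dbo.Demo_Response_Queue
    WITH STATUS = ON;
```

MAX_QUEUE_READERS is effectively the upper limit on how many “threads” can run at the same time.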



[//SB_PQ/Message]:

This is a simple message type of “Well Formed XML”. We’ll be using it to verify our messages.



[//SB_PQ/Contract]:

This is a contract defining that both target and initiator must send messages of type [//SB_PQ/Message].



[//SB_PQ/ProcessReceivingService]:

This is the service endpoint working as the “address” of request queries. We will be sending messages into this service. The queue it’ll be using to receive messages is SB_PQ_Request_Queue.



[//SB_PQ/ProcessStartingService]:

This is the service endpoint working as the “address” of response queries.

We will be sending messages from this service. The queue it’ll be using to receive messages is SB_PQ_Response_Queue.
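To show how the message type, contract, queues and services hang together, here is a minimal sketch of the DDL involved. All the //Demo/ names are hypothetical stand-ins so the example can run next to the real objects; the installation script is the authority on the actual definitions.

```sql
-- Sketch only: the chain of objects behind a pair of services.
CREATE MESSAGE TYPE [//Demo/Message]
    VALIDATION = WELL_FORMED_XML;              -- any well-formed XML is accepted

CREATE CONTRACT [//Demo/Contract]
    ([//Demo/Message] SENT BY ANY);            -- both initiator and target may send it

CREATE QUEUE dbo.Demo_Svc_Request_Queue;       -- receives requests
CREATE QUEUE dbo.Demo_Svc_Response_Queue;      -- receives responses

-- The "address" that requests are sent to.
CREATE SERVICE [//Demo/ReceivingService]
    ON QUEUE dbo.Demo_Svc_Request_Queue ([//Demo/Contract]);

-- The "address" that requests are sent from (and responses come back to).
CREATE SERVICE [//Demo/StartingService]
    ON QUEUE dbo.Demo_Svc_Response_Queue ([//Demo/Contract]);
```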



SB_PQ_Start_Query:

This is the public procedure that will be used for starting an execution thread, by sending a query request from [//SB_PQ/ProcessStartingService] to [//SB_PQ/ProcessReceivingService].

You can think of it as an “API” simplifying our use of all the above objects.

It receives as parameters the SQL command that should be executed, the name of the output parameter (without the @), and the conversation group ID.
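Under the hood, starting a “thread” boils down to opening a dialog and sending one message. The sketch below shows the general shape, assuming the installation script has already been run. The layout of the request XML is purely my own invention for illustration; the real format is whatever SB_PQ_Start_Query and SB_PQ_HandleQueue agree on in the script.

```sql
-- Sketch only: opening a dialog inside a conversation group and sending a request.
DECLARE @Handle uniqueidentifier;
DECLARE @ConvGroup uniqueidentifier;
DECLARE @Request xml;

SET @ConvGroup = NEWID();
-- Hypothetical message layout, not the one used by the real procedures.
SET @Request = N'<Request><SQL>SET @SB_PQ_Result = (SELECT 1 AS Val FOR XML PATH(''Row''));</SQL><OutputParam>SB_PQ_Result</OutputParam></Request>';

BEGIN DIALOG CONVERSATION @Handle
    FROM SERVICE [//SB_PQ/ProcessStartingService]
    TO SERVICE N'//SB_PQ/ProcessReceivingService'
    ON CONTRACT [//SB_PQ/Contract]
    WITH RELATED_CONVERSATION_GROUP = @ConvGroup,  -- ties the dialog to our group
         ENCRYPTION = OFF;

SEND ON CONVERSATION @Handle
    MESSAGE TYPE [//SB_PQ/Message] (@Request);
```

Because every dialog is opened with the same conversation group ID, all the responses can later be picked out of the response queue by that ID.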



SB_PQ_Get_Response_One:

This is the public procedure that will be used to retrieve the results of our queries. You may also think of it as part of the “API” simplifying our use of the above objects. It receives as a parameter the conversation group ID to be used, and returns the first result it finds as an output parameter. This procedure is also responsible for ending the dialog conversations which are no longer needed.
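The retrieval side can be sketched in a similar way. Again, this is not the real SB_PQ_Get_Response_One, just the general pattern it is built around, and the 60 second timeout is an arbitrary value I picked.

```sql
-- Sketch only: waiting for one response that belongs to our conversation group.
DECLARE @ConvGroup uniqueidentifier;   -- set this to the ID used when sending
DECLARE @Handle uniqueidentifier;
DECLARE @Body xml;

WAITFOR (
    RECEIVE TOP (1)
        @Handle = conversation_handle,
        @Body   = CAST(message_body AS xml)
    FROM dbo.SB_PQ_Response_Queue
    WHERE conversation_group_id = @ConvGroup
), TIMEOUT 60000;                      -- give up after 60 seconds

-- The dialog has served its purpose once the response is in hand.
IF @Handle IS NOT NULL
    END CONVERSATION @Handle;

SELECT @Body AS Response;              -- NULL if nothing arrived in time
```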

Whew, that was a long list! Please note that since Service Broker works asynchronously in the background, error handling is extremely important. Notice all the error handling that I implemented, especially in the SB_PQ_HandleQueue procedure.

The Sample Script

Now we can really get down to business. I’ll go over the important parts in the script and explain each of them.

REMEMBER! This is only the sample script. It is not an integral part of our “API”, but an example of how to use it:

Transact-SQL

-- random workload simulation settings
SET @MinDelay = 10
SET @MaxDelay = 30
SET @ConvGroup = NEWID(); -- the messages will be grouped in a specific conversation group
SET @NumOfSegments = 3; -- number of "threads" to use

-- create several segments
SET @Segment = 1;
WHILE @Segment <= @NumOfSegments
BEGIN

In our sample, we create several “segments” (or threads), each of which will wait for a random delay between 10 and 30 seconds and then execute a simple query returning its details. This simulates a situation of “multiple queries”, where each “parallel query” is represented by a “segment”. In the above script we define the minimum and maximum delay in seconds, set the conversation group ID by which our threads will be grouped, set the total number of threads (3 in our case), and start looping over the threads.

Transact-SQL

    -- random delay between @MinDelay and @MaxDelay seconds to simulate long execution time
    SET @Delay = '00:00:' + CONVERT(varchar(8), ROUND(RAND() * (@MaxDelay - @MinDelay),0) + @MinDelay)

The above script simply prepares the delay representation by generating a random number of seconds as we defined earlier (between 10 and 30).
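The string concatenation above works fine for our 10 to 30 second range, but it would produce an invalid time such as '00:00:75' if you ever raised the maximum past 59 seconds. If you want something more forgiving, one possible alternative (my own variation, not what the sample script does) is to let SQL Server format the value itself:

```sql
-- Sketch only: building the delay string through a datetime conversion.
DECLARE @MinDelay int, @MaxDelay int, @Seconds int, @Delay varchar(8);
SET @MinDelay = 10;
SET @MaxDelay = 90;   -- deliberately more than a minute

-- Random whole number of seconds between @MinDelay and @MaxDelay (inclusive).
SET @Seconds = @MinDelay + FLOOR(RAND() * (@MaxDelay - @MinDelay + 1));

-- Style 108 formats a datetime as hh:mi:ss, so 75 seconds becomes '00:01:15'.
SET @Delay = CONVERT(varchar(8), DATEADD(SECOND, @Seconds, 0), 108);

SELECT @Seconds AS Seconds, @Delay AS DelayString;
```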

Transact-SQL

    -- build our dynamic SQL command. note the use of XML as the result.
    SET @SQL = N'
    WAITFOR DELAY ''' + @Delay + ''';
    SET @SB_PQ_Result =
    (
        SELECT
            Segment = ' + CONVERT(nvarchar(max), @Segment) + N',
            Delay = ''' + @Delay + N''',
            StartDate = GETDATE(),
            Name = QUOTENAME(name),
            object_id, type, modify_date
        FROM
            SB_PQ_Test.sys.tables AS Tab
        FOR XML AUTO, ELEMENTS
    );
    ';

In the script above we create the dynamic SQL command which will be executed by the current thread. Note the use of FOR XML AUTO, ELEMENTS to format the results as XML and put them in the output parameter.

Also note the use of WAITFOR DELAY at the start to simulate long execution time.

Transact-SQL

    -- Send request to queue
    EXEC SB_PQ_Start_Query @SQL, @OutputParam, @ConvGroup;

The above script executes the API procedure SB_PQ_Start_Query with the SQL command we prepared, the name of the output parameter, and the conversation group ID. This will put the message in the request queue and continue executing without waiting for a response (the magic of asynchronous processing!).

Transact-SQL

    -- increment segment index
    SET @Segment = @Segment + 1;
END

The above script increases the segment (thread) index so we can continue to the next one.

Transact-SQL

-- init final result
DECLARE @TotalResult XML;
SET @TotalResult = '<Tables> </Tables>'

The above script initializes an XML variable called @TotalResult which will contain all the results we’re about to receive.

Transact-SQL

-- count based on number of segments that we created earlier
SET @Segment = 1;
WHILE @Segment <= @NumOfSegments
BEGIN

In this part of the script we’re resetting the @Segment index and looping again based on the number of threads in @NumOfSegments.

Transact-SQL

    -- Get segment from response queue
    EXEC SB_PQ_Get_Response_One @ConvGroup, @CurrentResult OUTPUT

In the above script, we use the API procedure SB_PQ_Get_Response_One to retrieve a single result from any of the threads (whichever finished executing first). The result will be returned in the output variable @CurrentResult.

Transact-SQL

    -- insert into TotalResults using XML DML (syntax for SQL2008 and newer)
    SET @TotalResult.modify('
    insert sql:variable("@CurrentResult")
    into (/Tables)[1] ');

The above script uses XML DML language with reference to the @CurrentResult variable to insert its contents into the @TotalResult variable.

Please note that this method of “inserting” a SQL variable into an XML document is SQL 2008 syntax and won’t work in older versions.

If you have SQL Server 2005, you will need to work a little harder to collect the results (for example, declaring @TotalResult as nvarchar(4000) without a closing tag, concatenating the results one-by-one, and only at the end concatenate the closing tag as well).
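A rough sketch of that SQL Server 2005 workaround might look like the following. I’ve used nvarchar(max) instead of nvarchar(4000) here so that larger results won’t be truncated, and the hard-coded @CurrentResult value stands in for whatever SB_PQ_Get_Response_One returns.

```sql
-- Sketch only: collecting results by string concatenation (SQL Server 2005 friendly).
DECLARE @Buffer nvarchar(max);
DECLARE @CurrentResult xml;
DECLARE @TotalResult xml;

SET @Buffer = N'<Tables>';   -- opening tag only, no closing tag yet

-- Inside the loop: append each response as text.
-- (Placeholder value; in the sample this comes from SB_PQ_Get_Response_One.)
SET @CurrentResult = N'<Tab><Segment>1</Segment><Delay>00:00:12</Delay></Tab>';
SET @Buffer = @Buffer + ISNULL(CONVERT(nvarchar(max), @CurrentResult), N'');

-- After the loop: close the root element and convert back to XML.
SET @Buffer = @Buffer + N'</Tables>';
SET @TotalResult = CONVERT(xml, @Buffer);

SELECT @TotalResult AS FinalResult;
```

The ISNULL matters because a failed thread returns NULL, and concatenating NULL would otherwise wipe out the whole buffer.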

Transact-SQL

    -- increment segment index
    SET @Segment = @Segment + 1;
END

The above script increases the thread index so we can continue to the next one.

Transact-SQL

-- return final result (as XML)
SELECT @TotalResult.query('.') AS FinalResult

-- return final result (as relational table)
SELECT
    Segment = T.XRecord.query('.').value('(/Tab/Segment)[1]','varchar(100)'),
    Delay = T.XRecord.query('.').value('(/Tab/Delay)[1]','varchar(8)'),
    StartDate = T.XRecord.query('.').value('(/Tab/StartDate)[1]','datetime')
FROM
    @TotalResult.nodes('/Tables/Tab') AS T(XRecord)

The above script shows two methods of how you can return your final data.

The first method returns the simple XML representation of @TotalResult.

The second method shows how you can break down each element in @TotalResult (using the nodes() method) and display the values of each element (using the value() method). Remember that this structure is something that we defined at the start of our sample, and has nothing to do with the API specifically.
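If you want to experiment with nodes() and value() without running the whole sample, here is a small self-contained sketch. The XML is hand-written test data shaped like our results, and it uses a slightly shorter form of the same idea: calling value() directly on each node with a relative path.

```sql
-- Sketch only: shredding a hand-made XML document into rows.
DECLARE @TotalResult xml;
SET @TotalResult = N'
<Tables>
  <Tab><Segment>1</Segment><Delay>00:00:12</Delay></Tab>
  <Tab><Segment>2</Segment><Delay>00:00:25</Delay></Tab>
</Tables>';

SELECT
    Segment = T.XRecord.value('(Segment)[1]', 'int'),
    Delay   = T.XRecord.value('(Delay)[1]', 'varchar(8)')
FROM
    @TotalResult.nodes('/Tables/Tab') AS T(XRecord);   -- one row per <Tab> element
```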

Transact-SQL

-- check the SB logs to see how many unique sessions executed our requests
DECLARE @NumOfSessions INT;

SELECT @NumOfSessions = COUNT(DISTINCT SPID)
FROM SB_PQ_ServiceBrokerLogs
WHERE LogDate >= @StartTime

PRINT CONVERT(nvarchar(100),@NumOfSessions) + ' unique sessions participated in execution'

SELECT *
FROM SB_PQ_ServiceBrokerLogs
WHERE LogDate >= @StartTime

The above script is a sort of “debug” script that reads our Service Broker logging table. It prints how many unique sessions participated in execution since the start of the script, and then displays the contents of the logging table for that period.

Execution Delay (AKA “First Execution Problem”)

It takes Service Broker several seconds before activating more than one reader instance. However, because our activation procedure continues waiting for new requests even after finishing an execution, once a reader instance is activated it’ll stay active until it’s manually shut down.

As a result of this behavior, your first execution in multi-threading may not be as fast as you’d expect. But from the second try onwards it shouldn’t take longer than the longest running query.
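If you’re curious how many reader instances are alive at any given moment, the system views can tell you. The query below is a sketch that assumes you run it inside the SB_PQ_Test database and that you have permission to read the DMV (VIEW SERVER STATE).

```sql
-- Sketch only: list the request queue's settings and its currently running readers.
SELECT
    q.name                  AS QueueName,
    q.max_readers           AS MaxReaders,
    q.is_activation_enabled AS ActivationEnabled,
    t.spid                  AS ReaderSessionId,
    t.procedure_name        AS ActivationProcedure
FROM sys.service_queues AS q
LEFT JOIN sys.dm_broker_activated_tasks AS t
    ON  t.queue_id = q.object_id
    AND t.database_id = DB_ID()
WHERE q.name = N'SB_PQ_Request_Queue';
```

One row with a NULL session ID means no reader is active yet; after a few executions you should see one row per active reader.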

Customizing for Your Version

The above sample script is a very crude sample of how to use the API we created using the installation script. But this architecture can be used for just about anything you want to execute multi-threaded.

Your own implementation will probably be much different.

You may want to perform a query on a large range of records, and prefer to break down that range to several sub-ranges, where each sub-range will be a separate thread / segment.

Maybe you have several different queries and you need to do different things with them, so you’ll have several different executions of the API procedures (you’d need to think how to identify each query though, perhaps by adding that information in the XML somewhere).

Maybe you’re not even performing queries. Maybe it’s some maintenance tasks you wish to perform simultaneously (you still should have a response of some kind, for example a message notifying of success or failure).

In any case, you’ll need to follow these rules:


Your SQL command must make use of an output parameter of type XML. You don’t need to declare it inside your command; simply assign to it the XML result you want returned.


Take note of the name of the output parameter you’re using and provide it during the execution of the SB_PQ_Start_Query procedure.


If there’s a chance other sessions will be using this API at the same time, it’s extremely important to make use of the Conversation Group ID parameter so you have a way of keeping the data of different sessions apart.


In any case, the thread will try to return a response even if there was an error in your query (NULL will be returned). So in order to prevent clogging on the response queue, don’t forget to use the SB_PQ_Get_Response_One procedure to take them out of the queue and better yet – try to keep track of the “threads” that you initiate so you’d know how many times you need to use SB_PQ_Get_Response_One.
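To tie these rules together, here is a sketch of the “sub-ranges” idea mentioned earlier: one large ID range is split into chunks, each chunk goes to its own thread, and exactly as many responses are collected as requests were sent. The table dbo.MyBigTable and its Id column are hypothetical, and the procedure calls simply follow the same positional style as the sample script, so treat this as a starting point and not as tested code.

```sql
-- Sketch only: splitting a range of IDs across several "threads".
DECLARE @ConvGroup uniqueidentifier, @CurrentResult xml, @SQL nvarchar(max);
DECLARE @Segment int, @NumOfSegments int, @ChunkSize int;
DECLARE @RangeStart int, @RangeEnd int, @From int, @To int;

SET @ConvGroup = NEWID();          -- rule 3: our own conversation group
SET @NumOfSegments = 4;
SET @RangeStart = 1;
SET @RangeEnd = 1000;
SET @ChunkSize = CEILING((@RangeEnd - @RangeStart + 1) / (1.0 * @NumOfSegments));

-- Send one request per sub-range.
SET @Segment = 1;
WHILE @Segment <= @NumOfSegments
BEGIN
    SET @From = @RangeStart + (@Segment - 1) * @ChunkSize;
    SET @To = CASE WHEN @From + @ChunkSize - 1 > @RangeEnd
                   THEN @RangeEnd ELSE @From + @ChunkSize - 1 END;

    -- rule 1: the command assigns its XML result to the output parameter
    SET @SQL = N'SET @SB_PQ_Result = (SELECT COUNT(*) AS Cnt FROM dbo.MyBigTable WHERE Id BETWEEN '
             + CONVERT(nvarchar(20), @From) + N' AND ' + CONVERT(nvarchar(20), @To)
             + N' FOR XML PATH(''Chunk''), TYPE);';

    -- rule 2: pass the output parameter name (without the @)
    EXEC SB_PQ_Start_Query @SQL, N'SB_PQ_Result', @ConvGroup;

    SET @Segment = @Segment + 1;
END

-- rule 4: collect exactly one response per request, even if some come back NULL.
SET @Segment = 1;
WHILE @Segment <= @NumOfSegments
BEGIN
    EXEC SB_PQ_Get_Response_One @ConvGroup, @CurrentResult OUTPUT;
    SELECT @CurrentResult AS ChunkResult;
    SET @Segment = @Segment + 1;
END
```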

Conclusion

This was but one example of how you can use Service Broker to improve your work with SQL Server. I’ll be working on new examples so check back soon!

I hope my post wasn’t too complicated or confusing. But if you have any questions, please post them in the comment section below, and I’d be happy to help!

Photo courtesy of Brett L.
