NetMQ and creating a Dynamic Worker per task

In this post we will use NetMQ to build a solution where multiple clients can run commands on the server and the server dynamically creates a worker for each command.

Background

ZeroMQ and its .NET port NetMQ are interesting technologies. They seem to have a fairly small but very enthusiastic group of users, and the “scene” gives off a vibe similar to that of the Redis community.

ZeroMQ is a technology for adding distributed messaging to your system. Publish/subscribe, request/reply and other communication patterns are available. ZeroMQ doesn’t require a server installation: it is a library rather than a full-blown server solution. For communication, you can use TCP, inproc and other transports.

Dynamic workers

We needed to use NetMQ in a situation where multiple clients run commands on the server. The idea was that the server would spin up a worker for each client request. Even though ZeroMQ’s documentation is good, finding an example with dynamic workers turned out to be cumbersome.

We ended up using the following topology:

Client: RequestSocket

Server: RouterSocket (TCP 5555) – DealerSocket (TCP 5556) & Poller

Worker: DealerSocket

Creating the server

The server has a frontend for the requests coming from the clients and a backend for communicating with the workers. A NetMQPoller is used to handle the messages arriving on both sockets:

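using System;
using NetMQ;
using NetMQ.Sockets;

Console.WriteLine("Starting server");
using (var front = new RouterSocket())
using (var back = new DealerSocket())
{
    // Clients connect to the frontend, workers connect to the backend.
    front.Bind("tcp://localhost:5555");
    back.Bind("tcp://localhost:5556");

    // Attach the handlers (their bodies are shown in the following sections)
    // before the poller starts.
    front.ReceiveReady += (sender, eventArgs) => {...};
    back.ReceiveReady += (sender, eventArgs) => {...};

    using (var poller = new NetMQPoller())
    {
        poller.Add(front);
        poller.Add(back);

        // Run() blocks the calling thread and raises ReceiveReady
        // whenever one of the sockets has a message waiting.
        poller.Run();
    }
}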

Creating the workers

When the server’s frontend receives a message, we want to spin up a new worker. In this example we use ThreadPool to create the worker:

front.ReceiveReady += (sender, eventArgs) =>
{
    // A request arrives as three frames: client identity, empty delimiter, payload.
    var mqMessage = eventArgs.Socket.ReceiveMultipartMessage(3);

    var id = mqMessage.First;
    var content = mqMessage[2].ConvertToString();

    Console.WriteLine("Front received " + content);

    // ThreadPool and Thread live in System.Threading.
    ThreadPool.QueueUserWorkItem(state =>
    {
        // The worker. Its parameters arrive through the state object.
        var context = (Tuple<NetMQFrame, string>) state;

        var clientId = context.Item1;
        var message = context.Item2;

        // Run the command (simulated here with a delay).
        Thread.Sleep(TimeSpan.FromSeconds(3));

        // Send the reply to the server's backend, which then returns it to the client.
        using (var workerConnection = new DealerSocket())
        {
            workerConnection.Connect("tcp://localhost:5556");

            // The client's identity stays in the first frame so the frontend can route the reply.
            var messageToClient = new NetMQMessage();
            messageToClient.Append(clientId);
            messageToClient.AppendEmptyFrame();
            messageToClient.Append("hello from worker");

            workerConnection.SendMultipartMessage(messageToClient);
        }

    }, Tuple.Create(id, content));
};

Returning the message to client

As we can see from the code above, the worker sends its reply to the server’s backend. The only thing left is to route the reply back to the client:

back.ReceiveReady += (sender, eventArgs) =>
{
    Console.WriteLine("Back received message, route to client");
    var mqMessage = eventArgs.Socket.ReceiveMultipartMessage();
    front.SendMultipartMessage(mqMessage);
};
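The routing works because of the message envelope. A RequestSocket puts an empty delimiter frame in front of the payload, and the RouterSocket adds the sender's identity as the first frame when it receives the message. When the frontend later sends a message, it uses the first frame to pick the target client, and the RequestSocket strips the delimiter again. The following is only a small sketch of that frame layout, built in memory without any sockets. The identity bytes are made up for illustration; real identities are assigned by the RouterSocket.

```csharp
using System;
using NetMQ;

// What the frontend RouterSocket hands to its handler for one client request.
var request = new NetMQMessage();
request.Append(new NetMQFrame(new byte[] { 0, 1, 2, 3, 4 })); // hypothetical client identity
request.AppendEmptyFrame();                                   // delimiter written by the RequestSocket
request.Append("hello from client");                          // payload

// What the worker sends back: same identity, same delimiter, new payload.
var reply = new NetMQMessage();
reply.Append(request.First);
reply.AppendEmptyFrame();
reply.Append("hello from worker");

Console.WriteLine(request.FrameCount);          // 3
Console.WriteLine(reply[1].IsEmpty);            // True
Console.WriteLine(reply[2].ConvertToString());  // hello from worker
```

This is why the frontend handler reads the payload from index 2 and why the worker must copy the identity frame into its reply unchanged.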

The client

Our client uses RequestSocket to call the server’s frontend. We use a blocking call, so we wait for the server (the worker) to reply:

using (var client = new RequestSocket())
{
    client.Connect("tcp://localhost:5555");
    client.SendFrame("hello from client");
    var returned = client.ReceiveFrameString();
    Console.WriteLine("back at client: " + returned);
}
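To see the per-request workers in action, it helps to fire several requests at once. The sketch below is one possible way to do that, not part of the original solution: it starts a handful of clients as tasks, each with its own RequestSocket, and assumes the server from the earlier sections is already running on port 5555. The client count of 5 and the numbered message text are arbitrary choices for illustration.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

// Start five independent clients; each socket is created and used on a single thread.
var clients = Enumerable.Range(1, 5).Select(i => Task.Run(() =>
{
    using (var client = new RequestSocket())
    {
        client.Connect("tcp://localhost:5555");
        client.SendFrame("hello from client " + i);

        // Blocks until the worker's reply has been routed back through the server.
        var returned = client.ReceiveFrameString();
        Console.WriteLine(i + ": back at client " + returned);
    }
})).ToArray();

// Wait for every client to receive its reply.
Task.WaitAll(clients);
```

Because the server hands each request to its own thread pool worker, the replies should arrive at roughly the same time instead of one after another, although the exact timing depends on how quickly the thread pool schedules the work items.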

Conclusion

This post shows one solution for spinning up worker tasks (threads) dynamically using NetMQ. Even though NetMQ includes only a few basic concepts, they are so flexible that there are quite likely many other ways to handle this situation.