[The Methanum project] Creating tools for building distributed systems with Star topology

  • Tutorial


The star is by far the most common topology for computer networks. This structure has several advantages: ease of scaling, reliability (failure of one machine does not affect others) and ease of administration. Of course, this solution from the physical level has long been implemented at the software level. Nevertheless, I present to my readers my version of .Net tools for building distributed systems with star topology.

Systems built on the basis of such a topology can be structurally organized, for example, as in the image below.



In this article I will describe the creation of minimal tools without many useful features that I once used in my development based on the presented architecture. However, this is quite enough to build really useful systems.

Methanum





The project received the code name Methanum solely because of the structural similarity of the topology with the methane molecule :). The central node acting as a communicator is called "Core". The remaining network applications are connected to the kernel and subscribe to events. Each network application can also emit events. Thus, through events, data is exchanged on the network. Events are a serializable Event class that can contain arbitrary data. Event minimally contains 2 fields - the Destination string field, which classifies the event, and the Data field, containing the key value dictionary. Key is a string, argument name, Value is of type object and can contain primitives (int, double, bool ...). For structures, you have to somewhat help the system serialize them.

First, create the “methanum” project of the C # class library and add files to it in the course of the text.

Event





As already mentioned, data is transmitted through events. An event is a class that includes a Data field and a field to identify the Destination event. I also left two more fields: Id - unique identifier and DataTime containing the time the event was created. These additional fields are needed solely for convenience, for example, to parse logs. The event class also contains a number of methods designed to simplify the life of the programmer, I think their purpose will be clear from the code and does not need additional explanations.

Event.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Web.Script.Serialization;
namespace methanum
{
    [DataContract]
    [KnownType(typeof(List))]
    public class Event {
        /// 
        /// A unique id of the event
        /// 
        [DataMember]
        public Guid Id { set; get; }
        /// 
        /// DateTime of event creation
        /// 
        [DataMember]
        public DateTime DataTime { get; set; }
        /// 
        /// Target
        /// 
        [DataMember]
        public string Destination { get; set; }
        /// 
        /// Data container
        /// 
        [DataMember]
        public Dictionary Data { get; set; }
        public Event() {
            Init();
        }
        public Event(string destination) {
            Init();
            Destination = destination;
        }
        private void Init() {
            Data = new Dictionary();
            Id = Guid.NewGuid();
            DataTime = DateTime.Now;
        }
        public override string ToString() {
            var properties = GetType().GetProperties();
            var sb = new StringBuilder();
            sb.AppendFormat("[{0}]", GetType().Name);
            foreach (var property in properties) {
                if (property.Name == "Data") {
                    sb.Append("\nData = ");
                    string s = string.Format(" {0}", '{');
                    s = Data.Keys.Aggregate(s,
                        (current, key) => current + String.Format("\n  {0}\t:{1}", key, Data[key]));
                    sb.AppendFormat("{0}\n{1}", s, '}');
                }
                else sb.AppendFormat("\n{0} = {1};", property.Name, property.GetValue(this, null));
            }
            return sb.ToString();
        }
        public void SetData(string key, object obj) {
            Data[key] = obj;
        }
        public object GetObj(string key) {
            return !Data.ContainsKey(key) ? null : Data[key];
        }
        public double GetDbl(string key) {
            return !Data.ContainsKey(key) ? Double.NaN : Convert.ToDouble(Data[key]);
        }
        public int GetInt(string key) {
            return !Data.ContainsKey(key) ? Int32.MinValue : Convert.ToInt32(Data[key]);
        }
        public bool GetBool(string key) {
            return Data.ContainsKey(key) && Convert.ToBoolean(Data[key]);
        }
        public string GetStr(string key) {
            return !Data.ContainsKey(key) ? null : Convert.ToString(Data[key]);
        }
        public void SetCustomData(string key, object value) {
            var serializer = new JavaScriptSerializer();
            var str = serializer.Serialize(value);
            SetData(key, str);
        }
        public object GetCustom(string key, Type valueType) {
            if (!Data.ContainsKey(key))
                return null;
            if (Data[key].GetType() != typeof(string))
                return null;
            var serializer = new JavaScriptSerializer();
            var str = (string) Data[key];
            var obj = serializer.Deserialize(str, valueType);
            return obj;
        }
    }
}



Gate





The core of the kernel is to implement the interface, let's call it the “gate interface”. The main goal of the gate is to provide functionality for registering clients and sending events asynchronously in both directions (from the application to the kernel and vice versa).

IGate.cs
using System.ServiceModel;
namespace methanum {
    [ServiceContract(CallbackContract = typeof(IListener))]
    public interface IGate {
        [OperationContract]
        void Subscribe();
        [OperationContract]
        void KillConnection();
        [OperationContract]
        void Fire(Event evt);
    }
}



Our data contract is duplex, in the forward direction - from the application to the kernel - we shoot events through IGate by calling the void Fire (Event evt) method. The callback - from the kernel to the application - occurs through the IListener interface, which will be discussed later.
The gate works according to the following principle. When the kernel starts, an object of the Gate class is inherited from the IGate interface. Gate has a static _subscribers field, which stores all active connections to the kernel. When calling the Subscribe () method, we add the current connection, if it has not been added yet. The KillConnection () method is used to delete the current connection. The most interesting is the Fire (Event evt) method, but there is nothing complicated in it either. We get half of the method to the IP address and port, only to display information in the console. I left this part of the code solely to demonstrate how to access the connection address, for example, to filter or log events to allowed addresses. The main work of this method is to bypass all existing connections and asynchronously call the Receive method on their IListener listeners. If we find a closed connection, then we immediately remove it from the list of active connections.

Gate.cs
using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.ServiceModel.Channels;
namespace methanum {
    public class Gate : IGate {
        private static List _subscribers;
        public Gate() {
            if (_subscribers == null)
                _subscribers = new List();
        }
        public void Subscribe() {
            var oc = OperationContext.Current;
            if (!_subscribers.Exists(c => c.SessionId == oc.SessionId)) {
                _subscribers.Add(oc);
                Console.WriteLine("(subscribe \"{0}\")", oc.SessionId);
            }
        }
        public void KillConnection() {
            var oc = OperationContext.Current;
            _subscribers.RemoveAll(c => c.SessionId == oc.SessionId);
            Console.WriteLine("(kill \"{0}\")", oc.SessionId);
        }
        public void Fire(Event evt) {
            var currentOperationContext = OperationContext.Current;
            var remoteEndpointMessageProperty =
                currentOperationContext.IncomingMessageProperties[RemoteEndpointMessageProperty.Name] as
                    RemoteEndpointMessageProperty;
            var ip = "";
            var port = 0;
            if (remoteEndpointMessageProperty != null) {
                ip = remoteEndpointMessageProperty.Address;
                port = remoteEndpointMessageProperty.Port;
            }
            Console.WriteLine("(Fire (event . \"{0}\") (from . \"{1}:{2}\") (subscribers . {3}))", evt.Id, ip, port, _subscribers.Count);
            for (var i = _subscribers.Count - 1; i >= 0; i--) {
                var oc = _subscribers[i];
                if (oc.Channel.State == CommunicationState.Opened) {
                    var channel = oc.GetCallbackChannel();
                    try {
                        ((DelegateReceive) (channel.Receive)).BeginInvoke(evt, null, null);
                    }
                    catch (Exception e) {
                        Console.WriteLine(e.Message);
                    }
                }
                else {
                    _subscribers.RemoveAt(i);
                    Console.WriteLine("(dead . \"{0}\")", oc.SessionId);
                }
            }
        }
    }
}



Listener





To send a message from the kernel to the client, one Receive method is enough, which is defined in the IListener interface.

IListener.cs
using System.ServiceModel;
namespace methanum {
    public delegate void DelegateReceive(Event evt);
    interface IListener {
        [OperationContract(IsOneWay = true)]
        void Receive(Event evt);
    }
}



The Connector class is inherited from the IListener interface, which implements all the logic of interaction between the client application and the kernel. When an instance of the class is created, a connection to the kernel is created through which messages are transmitted and received. Messages are sent and received in separate threads to prevent blocking of applications and the kernel. To distinguish between events, they have a Destination field. Filtering events using if-then-else or switch-case constructions is inconvenient, therefore, a mechanism was implemented that allowed each event of interest to be matched with a handler. This mapping is stored in the Dictionary dictionary_handlers ;. When the event is accepted, a search is performed in the dictionary and, if the key is found, the corresponding handler is called.

Connector.cs
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.ServiceModel;
using System.Threading;
namespace methanum {
    public delegate void CbHandler(Event evt);
    public class Connector : IListener {
        private Dictionary _handlers;
        private NetTcpBinding _binding;
        private EndpointAddress _endpointToAddress;
        private InstanceContext _instance;
        private DuplexChannelFactory _channelFactory;
        private IGate _channel;
        private Thread _fireThread;
        private List _eventQueue;
        public event CbHandler ReceiveEvent;
        private bool _isSubscribed;
        private object _channelSync = new object();
        protected virtual void OnReceive(Event evt) {
            CbHandler handler = ReceiveEvent;
            if (handler != null) handler.BeginInvoke(evt, null, null);
        }
        //localhost:2255
        public Connector(string ipAddress) {
            init(ipAddress);
        }
        private void init(string ipAddress) {
            _handlers = new Dictionary();
            _binding = new NetTcpBinding();
            _endpointToAddress = new EndpointAddress(string.Format("net.tcp://{0}", ipAddress));
            _instance = new InstanceContext(this);
            Conect();
            _eventQueue = new List();
            _fireThread = new Thread(FireProc);
            _fireThread.IsBackground = true;
            _fireThread.Start();
        }
        private void Conect() {
            _isSubscribed = false;
            while (!_isSubscribed) {
                try {
                    _channelFactory = new DuplexChannelFactory(_instance, _binding, _endpointToAddress);
                    _channel = _channelFactory.CreateChannel();
                    _channel.Subscribe();
                    _isSubscribed = true;
                }
                catch (Exception e) {
                    if (!(e is EndpointNotFoundException)) throw e;
                    Thread.Sleep(1000);
                }
            }
        }
        private void ReConect() {
            lock (_channelSync) {
                try {
                    _channel.KillConnection();
                }
                catch (Exception e) {
                    Console.WriteLine("(ReConect-exception  \"{0}\"", e.Message);
                }
                Conect();
            }
        }
        public void Fire(Event evt) {
            lock (_eventQueue) {
                _eventQueue.Add(evt);
            }
        }
        private void FireProc() {
            while (true) {
                var isHasEventsToFire = false;
                lock (_eventQueue) {
                    isHasEventsToFire = _eventQueue.Any();
                }
                if (_isSubscribed && isHasEventsToFire) {
                    Event evt;
                    lock (_eventQueue) {
                        evt = _eventQueue.First();
                    }
                    try {
                        lock (_eventQueue) {
                            _eventQueue.Remove(evt);
                        }
                        _channel.Fire(evt); 
                    }
                    catch (Exception) {
                        if (_isSubscribed)
                            _isSubscribed = false;
                        ReConect();
                    }
                } else Thread.Sleep(10);
            }
        }
        public void SetHandler(string destination, CbHandler handler) {
            _handlers[destination] = handler;
        }
        public void DeleteHandler(string destination) {
            if(_handlers.ContainsKey(destination)) _handlers.Remove(destination);
        }
        public void Receive(Event evt) {
            if (_handlers.ContainsKey(evt.Destination)) {
                _handlers[evt.Destination].BeginInvoke(evt, null, null);
            }
            OnReceive(evt);
        }
        static public void HoldProcess() {
            var processName = Process.GetCurrentProcess().ProcessName;
            var defColor = Console.ForegroundColor;
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine("The {0} is ready", processName);
            Console.WriteLine("Press  to terminate {0}", processName);
            Console.ForegroundColor = defColor;
            Console.ReadLine();
        }
    }
}



For convenience, create another small class that starts the service.

Srvrunner.cs
using System;
using System.ServiceModel;
namespace methanum {
    public class SrvRunner {
        private ServiceHost _sHost;
        public void Start(int port) {
            var uris = new[] { new Uri(string.Format("net.tcp://0.0.0.0:{0}", port)) };
            _sHost = new ServiceHost(typeof (Gate), uris);
            _sHost.Open();
            foreach (var uri2 in _sHost.BaseAddresses) {
                Console.WriteLine("Start on: {0}", uri2.ToString());
            }
        }
        public void Stop() {
            _sHost.Close();
        }
    }
}



Core





We have implemented all the classes necessary for communicating our applications. It remains to create a kernel to which our applications will connect. To do this, in the solution we create the “Core” project of the console application, we connect the methanum assembly to it. In general, we already wrote everything, it remains only to run.

Coremain.cs
using System;
using System.Linq;
using methanum;
namespace Core {
    internal class CoreMain {
        private static void Main(string[] args) {
            int port = 0;
            if ((!args.Any()) || (!int.TryParse(args[0], out port))) {
                Console.WriteLine("Usage:");
                Console.WriteLine("Core.exe port");
                Environment.Exit(1);
            }
            try {
                var coreSrv = new SrvRunner();
                coreSrv.Start(port);
                Console.WriteLine("The Core is ready.");
                Console.WriteLine("Press  to terminate Core.");
                Console.ReadLine();
                coreSrv.Stop();
            }
            catch (Exception e) {
                Console.WriteLine(e.Message);
            }
        }
    }
}



Usage example



For demonstration, we will create a primitive messenger: create another console application, add a link to the methanum assembly and insert the contents of the Program.cs file.

Program.cs
using System;
using System.Linq;
using methanum;
namespace ClentExamle {
    class Program {
        static void Main(string[] args) {
            if ((!args.Any())) {
                Console.WriteLine("Usage:");
                Console.WriteLine("ClentExample.exe coreAddress:port");
                Environment.Exit(1);
            }
            var userName = "";
            while (String.IsNullOrWhiteSpace(userName)) {
                Console.WriteLine("Please write user name:");
                userName = Console.ReadLine();   
            }
            try {
                var maingate = new Connector(args[0]);
                maingate.SetHandler("message", MsgHandler);
                Console.WriteLine("Hello {0}, now you can send messages", userName);
                while (true) {
                    var msg = Console.ReadLine();
                    var evt = new Event("message");
                    evt.SetData("name", userName);
                    evt.SetData("text", msg);
                    maingate.Fire(evt);
                }
            }
            catch (Exception e) {
                Console.WriteLine(e.Message);
            }
        }
        static private void MsgHandler(Event evt) {
            Console.WriteLine("[{0}] >> {1}", evt.GetStr("name"), evt.GetStr("text"));
        }
    }
}



Now run the Core.exe application by specifying a port on the command line, for example, “Core 2255”. Then we start several instances of ClentExample.exe with the command “ClentExample localhost: 2255”. Applications offer to enter a username, and then connect to the kernel. As a result, we get a broadcast primitive chat: each new message is sent by calling maingate.Fire (evt), received in the MsgHandler (Event evt) handler.



The full source is available on methanum gihaba .

Also popular now: