Saturday, 17 December 2016

Part 3: Build your own monitoring system using Riemann, Graphite and Collectd.

In the previous article, part 2, we discussed Graphite integration with Riemann. In this article I will give an overview of Collectd and show some advanced stream processing options in Riemann.

Collectd Overview

Collectd is a daemon that gathers metrics from various sources, e.g. the operating system, applications, log files and external devices, and stores this information or makes it available over the network.

Collectd is a big topic in itself and there is a lot you can achieve with it, but here I will cover only the part that interests us: we will tell collectd to send the metrics it collects to the Graphite server. Pretty amazing, right?

Collectd installation and Plugin concept

Download collectd from this link according to the flavour of your Linux distribution.

In my case the steps are:

1) sudo apt-get install collectd
2) sudo service collectd start

Done. Collectd is installed and running.
Now let's take a look at the most important config file for collectd.
In my case it is located at /etc/collectd/collectd.conf
If you open this file you will see a list of plugins and the configuration related to each plugin.

Collectd is built around the concept of plugins. Different plugins fetch different types of metrics and perform different monitoring activities.
  
The plugin above (the cpu plugin) fetches CPU-related information about the system on which collectd is running. We will see the outcome of this plugin on Graphite very soon.

By now you must have figured out that if we want collectd to forward these metrics to Graphite, then there must be a plugin for that. Your guess is right: we do have a plugin for it.





















We are doing two things here. First we load the write_graphite plugin, and second we provide the config for that plugin. The host name of the Graphite server is localhost because it is installed on the same VM. All collectd-related graphs will be rendered in Graphite under the prefix name we have set here. After adding the Graphite-related plugin, save the file and restart the collectd service.
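
To make the role of the prefix more concrete, here is a minimal Java sketch of the kind of line that ends up at Graphite. It assumes the plugin talks to Carbon's plaintext listener, where every data point is one line of the form "metric path, value, Unix timestamp". The class name GraphiteLine, the prefix "collectd." and the metric name are made up for illustration and are not taken from the config above.

```java
// Hypothetical helper for illustration; not part of collectd or Graphite.
final class GraphiteLine {
    private GraphiteLine() {}

    // Builds one plaintext-protocol line: "<metric path> <value> <unix timestamp>\n".
    static String format(String prefix, String metric, double value, long epochSeconds) {
        return prefix + metric + " " + value + " " + epochSeconds + "\n";
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis() / 1000L;
        // "collectd." and the metric name are assumed example values.
        System.out.print(format("collectd.", "myhost.cpu-0.cpu-idle", 97.5, now));
    }
}
```

Whatever prefix you configure becomes the first folder of the metric path, which is why all collectd graphs show up under it in the Graphite tree.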


Below is the outcome on Graphite dashboard for collectd:

























Here I will stop the discussion of collectd and move on to the last and most important section, Riemann stream processing.

Riemann stream processing examples

I will show you a few stream processing examples in Riemann.

1) Send email based on service status

Below is the configuration for sending mail through Gmail. You can do something similar for your own SMTP server.
Add this configuration to your riemann.config file and restart Riemann.
 (def email (mailer {:host "smtp.gmail.com"  
             :port 465  
             :ssl true  
             :tls true  
             :user "myaccout@gmail.com"  
             :pass "mypassword"  
             :from "myaccout@gmail.com"}))  
 (streams  
   (where (state "critical")  
    (email "xyz@gmail.com")))  

We are doing two things here.
1) Declaring the email-related configuration. This may vary depending on the SMTP provider.
2) Defining a stream rule such that if the state of any service is critical, then a mail is sent to the given email id.

Let's send a "critical" state from Java code for our "fridge" service created in part 1.
 RiemannClient c = RiemannClient.tcp("localhost", 5555);  
     c.connect();  
     c.event().  
         service("fridge").  
         state("critical").  
         metric(10).  
         tags("appliance", "cold").  
         send().  
         deref(5000, java.util.concurrent.TimeUnit.MILLISECONDS);  

Let's see the mail received in xyz@gmail.com













This is the default mail template used by Riemann. You can change the format and details of the email. I am leaving that part as an assignment for you.


2) Email the exception
Add the stream processing rule below to your riemann.config file.
 (streams  
   (where (service "exception-alert")  
    (email "xyz@gmail.com")))  

Let's send an exception from Java code:
 RiemannClient c = RiemannClient.tcp("localhost", 5555);  
     c.connect();  
     try {  
       // some business logic  
       throw new NullPointerException("NullPointer exception in your system..Somebody will be in trouble!!! ");  
     } catch (Exception e) {  
       c.event().  
           service("exception-alert").  
           state(e.getLocalizedMessage()). // you can send full stacktrace also  
           tags("error", "exception", "failure").  
           send().  
           deref(5000, java.util.concurrent.TimeUnit.MILLISECONDS);  
     }  
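
The comment in the code above mentions that the full stack trace can be sent as well. A minimal sketch of turning an exception into a string with the JDK alone could look like this. The class name StackTraces is made up for illustration; where you attach the resulting text on the event (for instance a description field, if your client version exposes one) is an assumption and not shown here.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper for illustration.
final class StackTraces {
    private StackTraces() {}

    // Renders the exception and its stack frames into a single string.
    static String asString(Throwable t) {
        StringWriter sw = new StringWriter();
        try (PrintWriter pw = new PrintWriter(sw)) {
            t.printStackTrace(pw);
        }
        return sw.toString();
    }

    public static void main(String[] args) {
        try {
            throw new NullPointerException("demo");
        } catch (Exception e) {
            System.out.println(asString(e));
        }
    }
}
```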

Let's see the mail received in xyz@gmail.com:












What else can you do?
1) Send an email alert if some VM or service is down.
2) Filter and process the stream depending on hostname, service name, metric value, service state, tag values etc., and perform some actions based on that.
3) Set threshold values for the metrics received and perform some actions if a threshold is crossed, e.g. VM CPU is very high (above 95%), or some business-specific constraint value is violated.
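
The filtering ideas in this list can be pictured in plain Java. The sketch below is only an analogy for what a Riemann filter rule decides, written with java.util.function.Predicate. It is not Riemann's implementation, and the Event class, the service name "cpu" and the 95.0 threshold are assumptions made up for the example. In Riemann itself such rules are written in Clojure in riemann.config.

```java
import java.util.function.Predicate;

// Hypothetical stand-in types for illustration; not part of any Riemann client.
final class EventFilters {
    private EventFilters() {}

    // Minimal event with only the fields used by the filters below.
    static final class Event {
        final String host;
        final String service;
        final String state;
        final double metric;

        Event(String host, String service, String state, double metric) {
            this.host = host;
            this.service = service;
            this.state = state;
            this.metric = metric;
        }
    }

    // Matches events whose state equals the given value.
    static Predicate<Event> hasState(String state) {
        return e -> state.equals(e.state);
    }

    // Matches events of one service whose metric is above a threshold.
    static Predicate<Event> metricAbove(String service, double threshold) {
        return e -> service.equals(e.service) && e.metric > threshold;
    }

    public static void main(String[] args) {
        // Alert when something is critical OR the cpu metric exceeds 95.
        Predicate<Event> alert = hasState("critical").or(metricAbove("cpu", 95.0));
        System.out.println(alert.test(new Event("vm1", "cpu", "ok", 97.2)));
        System.out.println(alert.test(new Event("vm1", "fridge", "running", 4.0)));
    }
}
```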

These are just a few examples. Check out the Riemann links I have posted at the end of the article.

Below is the updated architecture diagram:



























The collectd daemon sends all generic system-level metrics to Graphite.


In this three-article series I have just scratched the surface of this area. There are a thousand different things you can think of and achieve with this monitoring framework.

Below are useful links for the different types of config, plugins and integrations with other systems that are possible with Riemann, Graphite and collectd. I have explained only about 3% of the whole. You can add the rest as per your needs and the use case of your system.

Riemann:
http://riemann.io/clients.html
http://riemann.io/howto.html

Graphite:
https://graphiteapp.org/#integrations
http://graphite.readthedocs.io/en/latest/tools.html
http://grafana.org/

Collectd:
https://collectd.org/
https://collectd.org/wiki/index.php/Plugin

This is the last article of this series.
Hope you have enjoyed!!!

Please post your comments and doubts!!!

Part 2: Build your own monitoring system using Riemann, Graphite and Collectd.

In the previous article, part 1, we discussed Riemann installation and basic event sending from a Java application. In this article we will see Riemann integration with Graphite. First let's discuss why we need Graphite.

1) In Riemann the events are stored only until their TTL (time to live) expires. We need something that stores the events for the longer term, so that in future we can look at the statistics and get an idea of the system's behaviour at the time of failure or error scenarios.
2) Riemann is a stateless system and riemann-dash is also stateless. There are ways to store the definitions of created dashboards, but they will still show live data only.

Graphite, on the other hand, has 2 great capabilities:
1) Storage
2) An easy and powerful dashboard UI.

Let's start with Graphite installation.

Graphite Installation

In this link you can see 4 different ways of installing Graphite and the other components needed for it.
I am using the 4th way: Installing From Synthesize

Synthesize provides a script which automates the installation of all the dependencies and components needed for Graphite. But the Synthesize installation method is available for Ubuntu 14.04 only. If you are using some other version or flavour of Linux, then you should go with one of the other ways.

Installation steps in my case:
$ cd synthesize
$ sudo ./install
That's it! Done!

Open the Graphite dashboard in the browser:





Let's integrate Graphite with Riemann.

Riemann Graphite Integration

Open the riemann.config file. In my case it is located at /etc/riemann/riemann.config



































The part I have highlighted in red is the newly added config for Graphite.
First I have provided the location of the Graphite VM. In my case it is the same machine, so I am using localhost.
The next thing is the stream processing rules. You can specify which services you want to render on Graphite. Here I am declaring both the "fridge" and "jvm.nonheap.memory" services to be rendered on the Graphite dashboard. We created these services in part 1.



















As you can see, Graphite stores the metrics, so you can configure a time/date range. One more thing you should observe is that Graphite creates a new level in the folder structure for each "." present in the service name. Here you can see the folder structure for jvm.nonheap.memory. You can organise and name your metrics accordingly.
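
The way a dotted service name maps to folders can be sketched in a few lines of Java. This is only an illustration of the naming idea, not Graphite code, and the class name MetricPath is made up.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; Graphite does this splitting itself.
final class MetricPath {
    private MetricPath() {}

    // Each "." in the service name starts a new folder level in the Graphite tree.
    static List<String> segments(String serviceName) {
        return Arrays.asList(serviceName.split("\\."));
    }

    public static void main(String[] args) {
        System.out.println(segments("jvm.nonheap.memory")); // [jvm, nonheap, memory]
        System.out.println(segments("fridge"));             // [fridge]
    }
}
```

So a naming scheme such as "application.component.metric" gives you a browsable tree without any extra work.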


What you can do next

Grafana is the next thing you can add to your framework. In simple words, Grafana is a dashboard which operates on the data stored in Graphite. So Graphite will still be there, but Grafana can use Graphite's data and provide much better and more advanced dashboard options.
Explore more on this here: http://docs.grafana.org/

Below is the updated architecture diagram:



























So Riemann processes the events and pushes the metric data associated with the events to Graphite for storage. Graphite stores it and displays it on the dashboard. Grafana can leverage the data held by Graphite for further rendering.

That's it for now...

Part 3 is my next article in this series.

Please post your comments and doubts!!!

Part 1: Build your own monitoring system using Riemann, Graphite and Collectd.

In this 3-article series on building your own monitoring system for your application, I will give a basic idea of the different tools and technologies you can use and demonstrate how they communicate with each other. I will also explain "what the next step will be" or "what else you can add to this monitoring framework".

The main components we will discuss are:
1) Riemann
2) Graphite
3) Collectd

I will also give an overview of some surrounding tools and plugins that we can attach to the above main components.

For this whole exercise I am using Ubuntu 14.04.5 LTS. You can use any other Linux distribution of your choice. I will show the installation steps for Ubuntu, but for other Linux distributions the steps are not much different or difficult. Many user guides are available on the internet. Windows users, I feel sorry for you, as Riemann and Graphite are not supported on Windows as of now.

Let's start with Riemann.

Riemann

As per http://riemann.io/, Riemann is: A network event stream processing system, in Clojure

This is the theory. Let me break down the words and explain them from a developer's point of view.

Network: A server accessible on the network. When you start Riemann, it opens a server port and listens on that port for events. But what is an event?

Event: Something that happens in your application and has some data or metrics associated with it, which can be stored and analysed.
E.g. user response time for some DB-related operation, time to complete some ETL process, number of times some operation was performed, some memory or CPU related metrics...

Stream processing: With both Network and Event you now have a flow of events coming into Riemann, and you are definitely going to do something with that stream of events. We will write some rules for processing the events.

Clojure: The event processing rules in Riemann have to be written as Clojure code in the riemann.config file.
Let's start with installation.

Riemann Installation

I will explain it very briefly as it is very simple!

1) Download the Riemann package of your flavour (.deb, .rpm, .tar) from http://riemann.io/
2) Install it. In my case I just had to install the .deb file and it was done, Riemann installed.
3) Let's download some utilities and the dashboard for Riemann. Ruby is needed for that.
    Run the commands below to download the dashboard and utilities.
     - sudo apt install ruby
     - sudo gem install riemann-client riemann-tools riemann-dash

The installation part is done for Riemann.

Starting riemann and riemann dashboard

Let's start riemann and riemann-dash

1)  service riemann start  OR riemann OR riemann /etc/riemann/riemann.config
      You can start Riemann using any one of the above commands.



























So you can see that the Riemann server has started listening on port 5555. This is the default port configuration. You can override that setting in the riemann.config file.

2) Run the command: riemann-dash





So the Riemann dashboard has started on port 4567. Let's open it in the browser.





















Now press "Ctrl" and click on the area I have marked with a red circle in the above image.
Once you have clicked, press "e" and a popup window will appear. Populate the values of that popup in the same way I did in the image below.





















"true" in query section means display all the streams coming into riemann.
Once you click on "Apply", you can see some system events being sent to riemann.






















Now the next thing we are going to do is very important and interesting.

Riemann-Clients

We have the Riemann server up and running. Now we want to send our user-defined events from our application to Riemann for processing. On the http://riemann.io/clients.html page you can see that Riemann already has client libraries for C, C++, C#, Clojure, Elixir, Erlang, Go, Java, Lua, Node.js, OCaml, Perl, Python, Ruby and Scala, and also supports many other tools, programs and plugins which can be integrated with Riemann.

I am going to use riemann-java-client for this purpose and will send my own user-defined events from my Java program using the Riemann Java client library.

Pom file structure:
 <?xml version="1.0" encoding="UTF-8"?>  
 <project xmlns="http://maven.apache.org/POM/4.0.0"  
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
   <modelVersion>4.0.0</modelVersion>  
   <groupId>com.ali</groupId>  
   <artifactId>riemann</artifactId>  
   <version>1.0-SNAPSHOT</version>  
 <repositories>  
   <repository>  
     <id>clojars.org</id>  
     <url>http://clojars.org/repo</url>  
   </repository>  
 </repositories>  
   <dependencies>  
     <dependency>  
       <groupId>io.riemann</groupId>  
       <artifactId>riemann-java-client</artifactId>  
       <version>0.4.2</version>  
     </dependency>  
   </dependencies>  
 </project>  


I have copied the example Java class below from the riemann-java-client git homepage, with some modifications.
 import io.riemann.riemann.client.RiemannClient;  
   
 import java.io.IOException;  
   
   
 public class BasicEventTest {  
   public static void main(String... args) throws IOException, InterruptedException {  
     RiemannClient c = RiemannClient.tcp("localhost", 5555); // creating connection object 
     c.connect();  // connecting to riemann server
     int temperature = 1;  
     while (true) {  
       if (temperature > 10) temperature = 1;  
       Thread.sleep(1000);  
       c.event().                // creating event
           service("fridge").  
           state("running").  
           metric(temperature++).  
           tags("appliance", "cold").  
           send().              // sending event
           deref(5000, java.util.concurrent.TimeUnit.MILLISECONDS);  
     }  
   }  
 }  
   

Let me explain what we are doing here:

1) Creating a Riemann client which will connect to the specified Riemann server.
2) Creating an event, giving it a service name, state, metric value and tags for extra metadata, and sending it to the Riemann server.

Once you run this program, you can see the event on the Riemann dashboard.

You can see that the temperature we are sending as the metric value is associated with the service name "fridge". Once you click on the fridge service, more details associated with it are shown down below.
In our case the value of temperature changes every second because we send a new metric value every 1 second in our Java program.
The ttl value is 60 here, so each event will be present in Riemann for 60 seconds only. You can configure it in the riemann.config file.
Remember this point, as it is the main reason we need Graphite. In part 2 I will explain the need for and benefits of Graphite.
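
The idea behind the ttl can be shown with a tiny Java sketch. It only illustrates the rule "an event is considered stale once its ttl has passed". The class name EventTtl and the exact comparison are assumptions for illustration, not Riemann's actual code.

```java
// Hypothetical helper for illustration; Riemann handles expiry internally.
final class EventTtl {
    private EventTtl() {}

    // An event is stale once more than ttlSeconds have passed since its timestamp.
    static boolean isExpired(long eventTimeSeconds, long ttlSeconds, long nowSeconds) {
        return nowSeconds - eventTimeSeconds > ttlSeconds;
    }

    public static void main(String[] args) {
        long sentAt = 1000L;
        System.out.println(isExpired(sentAt, 60L, 1030L)); // 30 seconds old, still present
        System.out.println(isExpired(sentAt, 60L, 1061L)); // 61 seconds old, gone
    }
}
```

Because our program sends a fresh event every second, the "fridge" service never goes stale while the program is running.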

This service and metric were dummies. But you can think of any useful events and metrics inside your application and send them to Riemann for processing.

Let's see one more example of sending events from a Java application:
 import io.riemann.riemann.client.RiemannClient;  
   
 import java.io.IOException;  
 import java.lang.management.ManagementFactory;  
 import java.lang.management.MemoryMXBean;  
   
 public class BasicEventTest {  
   public static void main(String... args) throws IOException, InterruptedException {  
     RiemannClient c = RiemannClient.tcp("localhost", 5555);  
     c.connect();  
     while (true) {  
       Thread.sleep(1500);  
       MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();  
       c.event().  
           service("jvm.nonheap.memory").  
           // state("running").   state is not needed here  
           metric(memoryMXBean.getNonHeapMemoryUsage().getUsed() / 1024).  
           tags("jvm nonheap used memory").  
           send().  
           deref(5000, java.util.concurrent.TimeUnit.MILLISECONDS);  
     }  
   }  
 }  
   

In this example we read the JVM's used non-heap memory from the Memory MXBean and send its value to Riemann.


























Similarly we can send all JVM-related metrics such as JVM CPU, thread count, heap memory usage etc. using MXBeans.
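
As a rough sketch of how such metrics could be collected, the class below reads a few values from the standard platform MXBeans. Only the reading part is shown. The class name JvmMetrics and every service name other than "jvm.nonheap.memory" are made up for illustration, and each value would still have to be sent as its own event in the same way as above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.ThreadMXBean;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration.
final class JvmMetrics {
    private JvmMetrics() {}

    // Returns one reading per metric, keyed by an assumed service name.
    static Map<String, Double> snapshot() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();

        Map<String, Double> metrics = new LinkedHashMap<>();
        metrics.put("jvm.heap.memory", memory.getHeapMemoryUsage().getUsed() / 1024.0);       // KB
        metrics.put("jvm.nonheap.memory", memory.getNonHeapMemoryUsage().getUsed() / 1024.0); // KB
        metrics.put("jvm.thread.count", (double) threads.getThreadCount());
        // The load average is negative on platforms where it is not available.
        metrics.put("system.load.average", os.getSystemLoadAverage());
        return metrics;
    }

    public static void main(String[] args) {
        snapshot().forEach((service, value) -> System.out.println(service + " = " + value));
    }
}
```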

In this article we have discussed basic event sending from a Riemann client to the Riemann server and its basic display on the Riemann dashboard.

In part 2 we will see Riemann integration with Graphite and also discuss why it is needed.
In part 3 we will see some advanced stream processing examples in Riemann and a basic overview of Collectd.

The diagram below depicts the architecture covered so far in this series.
After each article of this series I will update this diagram with the newly learned things.























That's it for now...

Part 2 is my next article in this series.

Please post your comments and doubts!!!

Friday, 21 October 2016

Create and read PKCS #8 format private key in java program.

In this short article I will show you how to store a private key in PKCS #8 format in Java and read the stored key back in Java.

PKCS #8 defines a standard syntax for storing private key information. There are 2 ways we can store a private key in PKCS #8 format:

1) unencrypted key
2) encrypted key

I will create both types of keys in Java and store them in files. After that I will read them from the files and create PrivateKey Java objects from them. We are using the Bouncy Castle API for this program.

1) Create pkcs8 key

Code to create pkcs8 :
 import org.bouncycastle.openssl.PKCS8Generator;  
 import org.bouncycastle.openssl.jcajce.JcaPEMWriter;  
 import org.bouncycastle.openssl.jcajce.JcaPKCS8Generator;  
 import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8EncryptorBuilder;  
 import org.bouncycastle.operator.OperatorCreationException;  
 import org.bouncycastle.operator.OutputEncryptor;  
 import org.bouncycastle.util.io.pem.PemObject;  
   
 import java.io.FileOutputStream;  
 import java.io.IOException;  
 import java.io.StringWriter;  
 import java.security.KeyPair;  
 import java.security.KeyPairGenerator;  
 import java.security.NoSuchAlgorithmException;  
 import java.security.SecureRandom;  
 import java.security.spec.InvalidKeySpecException;  
   
 public class App3 {  
   public static void main(String[] args) throws NoSuchAlgorithmException, IOException, OperatorCreationException, InvalidKeySpecException {  
   
     KeyPairGenerator kpGen = KeyPairGenerator.getInstance("RSA");  
     kpGen.initialize(2048, new SecureRandom());  
     KeyPair keyPair = kpGen.generateKeyPair();  
   
   
     //unencrypted form of PKCS#8 file  
     JcaPKCS8Generator gen1 = new JcaPKCS8Generator(keyPair.getPrivate(), null);  
     PemObject obj1 = gen1.generate();  
     StringWriter sw1 = new StringWriter();  
     try (JcaPEMWriter pw = new JcaPEMWriter(sw1)) {  
       pw.writeObject(obj1);  
     }  
     String pkcs8Key1 = sw1.toString();  
     FileOutputStream fos1 = new FileOutputStream("D:\\privatekey-unencrypted.pkcs8");  
     fos1.write(pkcs8Key1.getBytes());  
     fos1.flush();  
     fos1.close();  
   
     //encrypted form of PKCS#8 file  
     JceOpenSSLPKCS8EncryptorBuilder encryptorBuilder = new JceOpenSSLPKCS8EncryptorBuilder(PKCS8Generator.PBE_SHA1_RC2_128);  
     encryptorBuilder.setRandom(new SecureRandom());  
     encryptorBuilder.setPasssword("abcde".toCharArray()); // password  
     OutputEncryptor encryptor = encryptorBuilder.build();  
   
     JcaPKCS8Generator gen2 = new JcaPKCS8Generator(keyPair.getPrivate(), encryptor);  
     PemObject obj2 = gen2.generate();  
     StringWriter sw2 = new StringWriter();  
     try (JcaPEMWriter pw = new JcaPEMWriter(sw2)) {  
       pw.writeObject(obj2);  
     }  
     String pkcs8Key2 = sw2.toString();  
     FileOutputStream fos2 = new FileOutputStream("D:\\privatekey-encrypted.pkcs8");  
     fos2.write(pkcs8Key2.getBytes());  
     fos2.flush();  
     fos2.close();  
   }  
 }  
   

 
So you can see that for the unencrypted key we do not provide any encryptor object, which would contain information about the algorithm, password etc. When creating the encrypted key we do provide those details.

As an outcome of this program we will have the 2 files below in our file system:






Let's open them in Notepad and check the difference.

encrypted key file:























unencrypted key file:























You can see the difference in the start and end tags of the two files.
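
Since the start tag is what tells the two files apart, a program can use it to decide how a key file has to be read. Below is a minimal sketch using the two header strings of the PKCS #8 PEM format. The class name PemKind is made up for illustration.

```java
// Hypothetical helper for illustration.
final class PemKind {
    private PemKind() {}

    static final String PLAIN_HEADER = "-----BEGIN PRIVATE KEY-----";
    static final String ENCRYPTED_HEADER = "-----BEGIN ENCRYPTED PRIVATE KEY-----";

    // Decides from the start tag whether a password is needed to read the key.
    static boolean isEncrypted(String pem) {
        String text = pem.trim();
        if (text.startsWith(ENCRYPTED_HEADER)) {
            return true;
        }
        if (text.startsWith(PLAIN_HEADER)) {
            return false;
        }
        throw new IllegalArgumentException("Not a PKCS #8 PEM private key");
    }

    public static void main(String[] args) {
        System.out.println(isEncrypted(PLAIN_HEADER + "\n...\n-----END PRIVATE KEY-----"));
        System.out.println(isEncrypted(ENCRYPTED_HEADER + "\n...\n-----END ENCRYPTED PRIVATE KEY-----"));
    }
}
```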

2) Read pkcs8 key

Code to read pkcs8:
 import org.bouncycastle.util.encoders.Base64;  
 import javax.crypto.EncryptedPrivateKeyInfo;  
 import javax.crypto.SecretKeyFactory;  
 import javax.crypto.spec.PBEKeySpec;  
 import java.io.IOException;  
 import java.nio.file.Files;  
 import java.nio.file.Paths;  
 import java.security.InvalidKeyException;  
 import java.security.KeyFactory;  
 import java.security.NoSuchAlgorithmException;  
 import java.security.PrivateKey;  
 import java.security.spec.InvalidKeySpecException;  
 import java.security.spec.PKCS8EncodedKeySpec;  
   
 public class App4 {  
   
   public static void main(String[] args) throws IOException, NoSuchAlgorithmException, InvalidKeySpecException, InvalidKeyException {  
   
     String encrypted = new String(Files.readAllBytes(Paths.get("D:\\privatekey-encrypted.pkcs8")));  
     String unencrypted = new String(Files.readAllBytes(Paths.get("D:\\privatekey-unencrypted.pkcs8")));  
   
     //Create object from unencrypted private key  
     unencrypted = unencrypted.replace("-----BEGIN PRIVATE KEY-----", "");  
     unencrypted = unencrypted.replace("-----END PRIVATE KEY-----", "");  
     byte[] encoded = Base64.decode(unencrypted);  
     PKCS8EncodedKeySpec kspec = new PKCS8EncodedKeySpec(encoded);  
     KeyFactory kf = KeyFactory.getInstance("RSA");  
     PrivateKey unencryptedPrivateKey = kf.generatePrivate(kspec);  
   
     //Create object from encrypted private key  
     encrypted = encrypted.replace("-----BEGIN ENCRYPTED PRIVATE KEY-----", "");  
     encrypted = encrypted.replace("-----END ENCRYPTED PRIVATE KEY-----", "");  
     EncryptedPrivateKeyInfo pkInfo = new EncryptedPrivateKeyInfo(Base64.decode(encrypted));  
     PBEKeySpec keySpec = new PBEKeySpec("abcde".toCharArray()); // password  
     SecretKeyFactory pbeKeyFactory = SecretKeyFactory.getInstance(pkInfo.getAlgName());  
     PKCS8EncodedKeySpec encodedKeySpec = pkInfo.getKeySpec(pbeKeyFactory.generateSecret(keySpec));  
     KeyFactory keyFactory = KeyFactory.getInstance("RSA");  
     PrivateKey encryptedPrivateKey = keyFactory.generatePrivate(encodedKeySpec);  
   
     //comparing both private key for equality  
     System.out.println(unencryptedPrivateKey.equals(encryptedPrivateKey));  
   }  
 }  
   


output:








So here, after regenerating the private key objects from the files, we compared them for equality, and the comparison returned true because both were created from the same private key before being stored in the files.
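
For the unencrypted case the same round trip can also be done with the JDK alone. The sketch below is an alternative to the Bouncy Castle based code above, not a replacement for it. It generates a key in memory instead of reading the files, and the class name Pkcs8RoundTrip is made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical helper for illustration; uses only JDK classes.
final class Pkcs8RoundTrip {
    private Pkcs8RoundTrip() {}

    private static final String BEGIN = "-----BEGIN PRIVATE KEY-----";
    private static final String END = "-----END PRIVATE KEY-----";

    // Wraps the PKCS #8 bytes of the key in an unencrypted PEM envelope.
    static String toPem(PrivateKey key) {
        byte[] newline = "\n".getBytes(StandardCharsets.US_ASCII);
        String body = Base64.getMimeEncoder(64, newline).encodeToString(key.getEncoded());
        return BEGIN + "\n" + body + "\n" + END + "\n";
    }

    // Strips the start and end tags, decodes the Base64 body and rebuilds the key.
    static PrivateKey fromPem(String pem) throws GeneralSecurityException {
        String body = pem.replace(BEGIN, "").replace(END, "");
        byte[] der = Base64.getMimeDecoder().decode(body); // the MIME decoder skips line breaks
        return KeyFactory.getInstance("RSA").generatePrivate(new PKCS8EncodedKeySpec(der));
    }

    // Generates a key, writes it to PEM text, reads it back and compares the encodings.
    static boolean roundTrips() {
        try {
            KeyPairGenerator gen = KeyPairGenerator.getInstance("RSA");
            gen.initialize(2048);
            PrivateKey original = gen.generateKeyPair().getPrivate();
            PrivateKey restored = fromPem(toPem(original));
            return Arrays.equals(original.getEncoded(), restored.getEncoded());
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrips());
    }
}
```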

That's it for now...

Please post your comments and doubts!!!









Sunday, 16 October 2016

Create PKCS#7/P7B Format in java program.

In this short article I will show you how to create a PKCS #7 format file to store a certificate or a chain of certificates.

A PKCS #7 file has the extension .p7b or .p7c. It can contain only certificates and certificate chains, not the private key.

I will use the 3 certificates I created in the previous article and store them in PKCS #7 format.

Code:
 import sun.security.pkcs.ContentInfo;  
 import sun.security.pkcs.PKCS7;  
 import sun.security.pkcs.SignerInfo;  
 import sun.security.x509.AlgorithmId;  
 import sun.security.x509.X509CertImpl;  
 import java.io.FileInputStream;  
 import java.io.FileOutputStream;  
 import java.io.IOException;  
 import java.security.cert.CertificateException;  
 import java.security.cert.X509Certificate;  
   
 public class App2 {  
   
   public static void main(String[] args) throws IOException, CertificateException {  
     //loading certificates stored as file  
     FileInputStream rootCAFile = new FileInputStream("D:\\rootCA.cer");  
     FileInputStream intermedCAFile = new FileInputStream("D:\\intermedCA.cer");  
     FileInputStream endUserCertFile = new FileInputStream("D:\\endUserCert.cer");  
   
     //create certificate objects from fileinputstream  
     X509Certificate rootCA = new X509CertImpl(rootCAFile);  
     X509Certificate intermedCA = new X509CertImpl(intermedCAFile);  
     X509Certificate endUserCert = new X509CertImpl(endUserCertFile);  
   
     //create the certificate hierarchy array  
     X509Certificate[] chain = new X509Certificate[3];  
     chain[0] = endUserCert;  
     chain[1] = intermedCA;  
     chain[2] = rootCA;  
   
     //create pkcs7 object with the cert chain created  
     PKCS7 pkcs7 = new PKCS7(new AlgorithmId[0], new ContentInfo(ContentInfo.DATA_OID, null),  
         chain, new SignerInfo[0]);  
   
     // store it as .p7b or .p7c format file  
     FileOutputStream fos = new FileOutputStream("D:\\bundle.p7b");  
     pkcs7.encodeSignedData(fos);  
     fos.close();  
   }  
 }  
   


As an outcome you will have the file below in your file system.



Let's open the file and check out the content.



So you can see that it has packaged all the certificates we provided in the cert chain.
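
The code above relies on internal sun.security classes, which may not be accessible on newer JDKs. As a hedged alternative, the public java.security.cert API can produce a PKCS #7 encoding of a certificate path. The sketch below assumes the X.509 CertificateFactory of your JDK supports the "PKCS7" CertPath encoding. The class name Pkcs7Bundle is made up, and the certificate files are passed as program arguments, end user certificate first.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.cert.CertPath;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; uses only public JDK classes.
final class Pkcs7Bundle {
    private Pkcs7Bundle() {}

    // Encodes the given chain (end user certificate first) as PKCS #7 bytes.
    static byte[] encode(List<? extends Certificate> chain) {
        try {
            CertificateFactory cf = CertificateFactory.getInstance("X.509");
            CertPath path = cf.generateCertPath(chain);
            return path.getEncoded("PKCS7");
        } catch (CertificateException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        CertificateFactory cf = CertificateFactory.getInstance("X.509");
        List<Certificate> chain = new ArrayList<>();
        for (String file : args) { // e.g. endUserCert.cer intermedCA.cer rootCA.cer
            try (InputStream in = Files.newInputStream(Paths.get(file))) {
                chain.add(cf.generateCertificate(in));
            }
        }
        System.out.println(encode(chain).length + " bytes");
    }
}
```

The returned bytes can be written to a .p7b file in the same way as in the program above.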

That's it for now...
Stay tuned for my next article related to pkcs8 format....

Please post your comments and doubts!!!











Create Version 3, X.509 certificate hierarchy in java using bouncy castle API.

In this article I am going to talk about creating SSL certificates in a Java program using the Bouncy Castle API.

Bouncy Castle is a lightweight cryptography API. It provides an implementation of the Java Cryptography Extension (JCE) and the Java Cryptography Architecture (JCA).

I will create very basic certificates which contain only the necessary properties. You can explore the API for more operations and properties you can apply to a certificate.

Please add the dependencies below to your project.
   
     <!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on -->  
     <dependency>  
       <groupId>org.bouncycastle</groupId>  
       <artifactId>bcpkix-jdk15on</artifactId>  
       <version>1.55</version>  
     </dependency>  
     <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->  
     <dependency>  
       <groupId>joda-time</groupId>  
       <artifactId>joda-time</artifactId>  
       <version>2.9.4</version>  
     </dependency>  


I am performing the steps below in the code:
1) Creating a self-signed root certificate.
2) Creating an intermediate certificate signed by the root certificate created in step 1.
3) Creating an end user certificate signed by the intermediate certificate created in step 2.

Code:
 import org.bouncycastle.asn1.x500.X500Name;  
 import org.bouncycastle.asn1.x509.BasicConstraints;  
 import org.bouncycastle.asn1.x509.Extension;  
 import org.bouncycastle.asn1.x509.KeyUsage;  
 import org.bouncycastle.cert.X509v3CertificateBuilder;  
 import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;  
 import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;  
 import org.bouncycastle.jce.provider.BouncyCastleProvider;  
 import org.bouncycastle.operator.OperatorCreationException;  
 import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;  
 import org.joda.time.DateTime;  
   
 import java.io.FileOutputStream;  
 import java.io.IOException;  
 import java.math.BigInteger;  
 import java.security.*;  
 import java.security.cert.CertificateEncodingException;  
 import java.security.cert.CertificateException;  
 import java.security.cert.X509Certificate;  
 import java.util.Random;  
   
   
 public class App {  
   
   public static void main(String[] args) throws KeyStoreException, NoSuchAlgorithmException, CertificateException, IOException, OperatorCreationException, InvalidKeyException, NoSuchProviderException, SignatureException, UnrecoverableKeyException {  
     Security.addProvider(new BouncyCastleProvider());  
   
     // Create self signed Root CA certificate  
     KeyPair rootCAKeyPair = generateKeyPair();  
     X509v3CertificateBuilder builder = new JcaX509v3CertificateBuilder(  
         new X500Name("CN=rootCA"), // issuer authority  
         new BigInteger(64, new SecureRandom()), // serial number of certificate (random, never negative)
         DateTime.now().toDate(), // start of validity  
         new DateTime(2025, 12, 31, 0, 0, 0, 0).toDate(), //end of certificate validity  
         new X500Name("CN=rootCA"), // subject name of certificate  
         rootCAKeyPair.getPublic()); // public key of certificate  
     // key usage restrictions  
     builder.addExtension(Extension.keyUsage, true, new KeyUsage(KeyUsage.keyCertSign));  
     builder.addExtension(Extension.basicConstraints, false, new BasicConstraints(true));  
     X509Certificate rootCA = new JcaX509CertificateConverter().getCertificate(builder  
         .build(new JcaContentSignerBuilder("SHA256withRSA").setProvider("BC").  
             build(rootCAKeyPair.getPrivate()))); // private key of signing authority , here it is self signed  
     saveToFile(rootCA, "D:\\rootCA.cer");  
   
   
     //create Intermediate CA cert signed by Root CA  
     KeyPair intermedCAKeyPair = generateKeyPair();  
     builder = new JcaX509v3CertificateBuilder(  
         rootCA, // here rootCA is issuer authority  
         new BigInteger(64, new SecureRandom()), DateTime.now().toDate(),
         new DateTime(2025, 12, 31, 0, 0, 0, 0).toDate(),  
         new X500Name("CN=IntermedCA"), intermedCAKeyPair.getPublic());  
     builder.addExtension(Extension.keyUsage, true, new KeyUsage(KeyUsage.keyCertSign));  
     builder.addExtension(Extension.basicConstraints, false, new BasicConstraints(true));  
     X509Certificate intermedCA = new JcaX509CertificateConverter().getCertificate(builder  
         .build(new JcaContentSignerBuilder("SHA256withRSA").setProvider("BC").  
             build(rootCAKeyPair.getPrivate())));// private key of signing authority , here it is signed by rootCA  
     saveToFile(intermedCA, "D:\\intermedCA.cer");  
   
     //create end user cert signed by Intermediate CA  
     KeyPair endUserCertKeyPair = generateKeyPair();  
     builder = new JcaX509v3CertificateBuilder(  
         intermedCA, //here intermedCA is issuer authority  
         new BigInteger(64, new SecureRandom()), DateTime.now().toDate(),
         new DateTime(2025, 12, 31, 0, 0, 0, 0).toDate(),  
         new X500Name("CN=endUserCert"), endUserCertKeyPair.getPublic());  
     builder.addExtension(Extension.keyUsage, true, new KeyUsage(KeyUsage.digitalSignature));  
     builder.addExtension(Extension.basicConstraints, false, new BasicConstraints(false));  
     X509Certificate endUserCert = new JcaX509CertificateConverter().getCertificate(builder  
         .build(new JcaContentSignerBuilder("SHA256withRSA").setProvider("BC").  
             build(intermedCAKeyPair.getPrivate())));// private key of signing authority , here it is signed by intermedCA  
     saveToFile(endUserCert, "D:\\endUserCert.cer");  
   }  
   
   private static KeyPair generateKeyPair() throws NoSuchAlgorithmException, NoSuchProviderException {  
     KeyPairGenerator kpGen = KeyPairGenerator.getInstance("RSA", "BC");  
     kpGen.initialize(2048, new SecureRandom());  
     return kpGen.generateKeyPair();  
   }  
   
   private static void saveToFile(X509Certificate certificate, String filePath) throws IOException, CertificateEncodingException {  
     // try-with-resources closes the stream even if the write fails  
     try (FileOutputStream fileOutputStream = new FileOutputStream(filePath)) {  
       fileOutputStream.write(certificate.getEncoded());  
     }  
   }  
   
 }  
   


Here you can see that the rootCA certificate is signed with its own private key (it is self-signed), the intermedCA certificate is signed with the rootCA private key, and the end user certificate is signed with the intermedCA private key.
In real life you will also see certificate chains like this one. Open the certificate of any HTTPS connection in your browser and look at the certificate chain and the properties of each certificate.
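
The signing relationship described above can be tried out with the JDK alone. The following is only a minimal sketch, not the Bouncy Castle code from the program: the class name IssuerSignatureSketch and the byte array standing in for the certificate body are my own assumptions for illustration. It signs some bytes with the issuer's private key using "SHA256withRSA" and checks them with the issuer's public key, which is the same check a verifier performs on every link of a certificate chain.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;

class IssuerSignatureSketch {

    // Generates a 2048-bit RSA key pair with the JDK's default provider.
    static KeyPair newRsaKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Signs the data with the issuer's private key, as a CA does with a certificate body.
    static byte[] sign(byte[] data, PrivateKey issuerPrivateKey) {
        try {
            Signature signer = Signature.getInstance("SHA256withRSA");
            signer.initSign(issuerPrivateKey);
            signer.update(data);
            return signer.sign();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Checks the signature with the issuer's public key.
    static boolean verify(byte[] data, byte[] signature, PublicKey issuerPublicKey) {
        try {
            Signature verifier = Signature.getInstance("SHA256withRSA");
            verifier.initVerify(issuerPublicKey);
            verifier.update(data);
            return verifier.verify(signature);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        KeyPair rootKeys = newRsaKeyPair();
        KeyPair otherKeys = newRsaKeyPair();
        // Hypothetical stand-in for the encoded certificate body of the intermediate CA.
        byte[] body = "CN=IntermedCA".getBytes(StandardCharsets.UTF_8);
        byte[] signature = sign(body, rootKeys.getPrivate());
        System.out.println("checked with root key:  " + verify(body, signature, rootKeys.getPublic()));
        System.out.println("checked with other key: " + verify(body, signature, otherKeys.getPublic()));
    }
}
```

Only the key pair that produced the signature should pass the check, which is why a chain can be validated link by link up to a trusted root.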

"SHA256withRSA" is the signature algorithm I am using to sign the certificates.
In keyUsage, "keyCertSign" restricts the certificate to signing other certificates, while the "digitalSignature" usage is needed when an SSL peer, our web browser for example, uses that certificate for entity authentication and data origin authentication with integrity.

The "true" flag in BasicConstraints marks the certificate as a CA certificate, which can sign other certificates. The "false" flag marks the certificate as the end entity of the certificate chain.
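
Once a certificate is loaded as a java.security.cert.X509Certificate, these two extensions can be read back with getKeyUsage() and getBasicConstraints(). Below is a small sketch of how the returned values might be interpreted. The class name CertificateRoleSketch and the output wording are assumptions of mine; the bit positions (0 for digitalSignature, 5 for keyCertSign) and the -1 return value for a non-CA certificate follow the JDK documentation of those two methods.

```java
import java.security.cert.X509Certificate;

class CertificateRoleSketch {

    // Bit positions in the array returned by X509Certificate.getKeyUsage().
    static final int DIGITAL_SIGNATURE = 0;
    static final int KEY_CERT_SIGN = 5;

    // keyUsage is null when the extension is absent;
    // basicConstraints is -1 when the certificate is not a CA.
    static String describe(boolean[] keyUsage, int basicConstraints) {
        boolean isCa = basicConstraints != -1;
        boolean canSignCerts = isSet(keyUsage, KEY_CERT_SIGN);
        boolean canSignData = isSet(keyUsage, DIGITAL_SIGNATURE);
        return (isCa ? "CA" : "end entity")
                + ", keyCertSign=" + canSignCerts
                + ", digitalSignature=" + canSignData;
    }

    // Convenience overload for a real certificate, e.g. one of the files written by the program.
    static String describe(X509Certificate certificate) {
        return describe(certificate.getKeyUsage(), certificate.getBasicConstraints());
    }

    private static boolean isSet(boolean[] keyUsage, int bit) {
        return keyUsage != null && keyUsage.length > bit && keyUsage[bit];
    }

    public static void main(String[] args) {
        // Hand-built values that mimic the flags set in the program, for illustration only.
        boolean[] caUsage = new boolean[9];
        caUsage[KEY_CERT_SIGN] = true;
        boolean[] endUserUsage = new boolean[9];
        endUserUsage[DIGITAL_SIGNATURE] = true;
        System.out.println(describe(caUsage, Integer.MAX_VALUE)); // CA without path length limit
        System.out.println(describe(endUserUsage, -1));
    }
}
```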

As the outcome of this program you will have 3 certificates at the specified file path.





Now open the rootCA first:

You can see a "certificate not trusted" message. So let's add this certificate to the trusted root certificate store:

1) Click the "Install Certificate" button.
2) Select "Local Machine" and click "Next".
3) Select the second option, "Place all certificates in the following store".
4) Browse and select "Trusted Root Certification Authorities".
5) Click "Next", then click "Finish".
6) The import is successful.


Do the same for the "Intermed CA" certificate, with one change in step 4: browse and select "Intermediate Certification Authorities".

Now close the certificate and reopen it.
Let's examine each certificate now:

1) RootCA

2) Intermed CA

3) End user certificate

There are other ways to create certificates and certificate chains, for example the Java "keytool" command or a tool like "KeyStore Explorer".
But many times you will be in a situation where certificate operations have to be done from a program.
This approach will be very helpful then.

In the next article I will post more on the topic of certificates and the operations you can perform on them using Bouncy Castle from a Java program.

Please post your comments and doubts!!!


Tuesday, 11 October 2016

Performance comparison of a multi-threaded client-server application using blocking and non-blocking IO in Java.

In this article I am going to talk about a basic multi-threaded client-server application. We will first create it using blocking IO (OIO) and observe its behavior, and then create a similar application using NIO and observe the behavior and the performance impact.

1) Blocking IO multi-threaded client and server program

Server Code:
 import io.netty.util.concurrent.DefaultThreadFactory;   
  import java.io.BufferedReader;   
  import java.io.IOException;   
  import java.io.InputStreamReader;   
  import java.io.PrintWriter;   
  import java.net.ServerSocket;   
  import java.net.Socket;   
  import java.util.concurrent.ExecutorService;   
  import java.util.concurrent.Executors;   
     
  public class OioServer {   
   public static void main(String... args) throws IOException {   
    ExecutorService threadPool = Executors.newFixedThreadPool(5, new DefaultThreadFactory("serverpool"));   
    try (ServerSocket listener = new ServerSocket(6689)) {   
     while (true) {   
      // this statement remains blocked till the time any new connection request is received from client   
      Socket socket = listener.accept();   
      threadPool.execute(new MyServerThread(socket));   
     }   
    }   
   }   
   private static class MyServerThread implements Runnable {   
    private final Socket socket;   
     
    private MyServerThread(Socket socket) {   
     this.socket = socket;   
    }   
    public void run() {   
     try {   
      socket.setKeepAlive(true);   
      BufferedReader clientDataStream = new BufferedReader(new InputStreamReader(socket.getInputStream()));   
      PrintWriter out = new PrintWriter(socket.getOutputStream(), true);   
      // first message from client is client name   
      String clientName = clientDataStream.readLine();   
      //sending back acknowledgement to client   
      out.println(clientName + " connected");   
      System.out.println(clientName + " connected");   
      // second message from client is dummy data to process by server   
      System.out.println(clientDataStream.readLine() + " Processed by server");   
     } catch (IOException e) {   
      e.printStackTrace();   
     } finally {   
      try {   
       socket.close();   
      } catch (IOException e) {   
       e.printStackTrace();   
      }   
     }   
    }   
   }   
  }   




Let me explain what is happening in this program:

1) We create a thread pool of fixed size 5. As this is a server, you have to manage resources, so you cannot create unlimited threads or use a cached thread pool, which keeps on creating threads; otherwise the server will run out of processing and memory resources. For this example I have kept the count very low, but in real life it could be much higher.

2) In the main thread we run an infinite while loop which accepts connections from clients. Once a client connection is received, it submits the handling of that connection to the thread pool.

3) As we have only 5 threads for processing actual requests, only 5 client connections are handled at a time. If a client request comes in and the thread pool has no free thread, the pool queues that task. Whenever a thread in the pool becomes free, a queued task is assigned to it.

4) In the client handling thread, we first expect the client name from the client. Second, we send an acknowledgement back to the client. Third, we expect some data from the client to be processed by the server. Then we just print that data on the server console.

5) clientDataStream.readLine() is a blocking operation: the method call remains blocked until some data is received from the client. The processing thread remains occupied until that method call returns.
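
The cap described in points 3 and 5 can be observed without any sockets. The sketch below is an illustration under my own assumptions (the class name FixedPoolSketch is hypothetical, and a CountDownLatch stands in for the blocking readLine() call): it submits more tasks than the fixed pool has threads and records how many of them are inside their "blocking call" at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedPoolSketch {

    // Returns the highest number of tasks that were blocked at the same time.
    // Assumes tasks >= poolSize, otherwise the latch below would never open.
    static int maxConcurrentHandlers(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        // Opens only when every pool thread is occupied by a task.
        CountDownLatch allWorkersBusy = new CountDownLatch(poolSize);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                max.accumulateAndGet(now, Math::max);
                allWorkersBusy.countDown();
                try {
                    allWorkersBusy.await(); // stand-in for the blocking readLine()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown(); // already queued tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }

    public static void main(String[] args) {
        // Same numbers as in the article: 5 server threads, 10 clients.
        System.out.println(maxConcurrentHandlers(5, 10));
    }
}
```

With 5 threads and 10 tasks this prints 5: the other tasks sit in the pool's queue until a thread is released, exactly like the queued client connections in the server above.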

Client Code:
 import io.netty.util.concurrent.DefaultThreadFactory;   
  import java.io.BufferedReader;   
  import java.io.IOException;   
  import java.io.InputStreamReader;   
  import java.io.PrintWriter;   
  import java.net.Socket;   
  import java.util.concurrent.ExecutorService;   
  import java.util.concurrent.Executors;   
     
  public class OioClient {   
   public static void main(String... args) throws IOException, InterruptedException {   
    int count = 1;   
    ExecutorService threadPool = Executors.newCachedThreadPool(new DefaultThreadFactory("clientpool"));   
    while (true) {   
     threadPool.execute(new MyClientThread(count));   
     if (++count > 10) break;   
     Thread.sleep(500); // just to make sure ordering  
   
    }   
   }   
     
   private static class MyClientThread implements Runnable {   
    private final int number;   
     
    private MyClientThread(int number) {   
     this.number = number;   
    }   
    public void run() {   
     Socket s = null;   
     try {   
      //trying to connect to server   
      s = new Socket("localhost", 6689);   
      s.setKeepAlive(true);   
      BufferedReader serverDataStream = new BufferedReader(new InputStreamReader(s.getInputStream()));   
      PrintWriter out = new PrintWriter(s.getOutputStream(), true);   
      //sending client name to server   
      out.println("client" + number);   
      //receive response from server   
      System.out.println(serverDataStream.readLine());   
      // sleep for 15 seconds   
      Thread.sleep(15000L);   
      //send some data to server for processing   
      out.println("client" + number + " Data");   
     } catch (Exception e) {   
      System.out.println("In the Exception for Client " + number);   
      e.printStackTrace();   
     } finally {   
      try {   
       if (s != null) {   
        s.close();   
       }   
      } catch (IOException e) {   
       e.printStackTrace();   
      }   
     }   
    }   
     
   }   
  }   
     

What we are doing here is:
1) In the main thread we create 10 client tasks and submit them for execution, 500 ms apart.

2) Each client thread first connects to the server, then sends its name to the server. After that it waits for the response from the server. Then it sleeps for 15 seconds and after that sends some data to the server for processing. Then the thread is done: it closes the socket and finishes execution.

3) As we have seen, the server can handle only 5 clients at a time, and each client thread sleeps for 15 seconds in the middle of the client-server communication. So each server thread that handles a client will take at least 15 seconds to become free and available for a new client request.


Client outcome:

As you can see, after the first 5 threads get their acknowledgement there is a long pause. The remaining 5 threads have been submitted for execution, but the server has no thread available for those next 5 connections, as it is busy handling the first 5 connection requests.


Server outcome:

As you can see, when the server receives the first 5 connections it prints each client name and sends the acknowledgement to the client. After that each server thread waits for its client to send some data for processing, but the client takes 15 seconds to send that data, so all server threads remain blocked and no new client can be served: the main thread still accepts the remaining connections, but their handling tasks are queued in the thread pool. Once a processing thread receives the data from its client, it processes it and becomes available for a queued connection. Once the queued requests have filled up the thread pool again, all threads remain blocked again until they receive the processing data from their clients.


  // second message from client is dummy data to process by server   
     System.out.println(clientDataStream.readLine() + " Processed by server");   

The above lines from the server code make the threads block, as the client sends the data only after a 15 second delay.

Now let's see a similar client-server interaction with NIO using the Netty framework.
I will keep all the parameters, like thread pool size and sleep time, exactly the same as in the OIO program.


2) NIO multi-threaded client and server program using Netty

If you are new to Netty, please read my previous article, in which I gave a basic introduction to Netty.
http://techxperiment.blogspot.in/2016/09/demonstration-of-basic-difference.html

Server Code:
 import io.netty.bootstrap.ServerBootstrap;   
  import io.netty.channel.*;   
  import io.netty.channel.nio.NioEventLoopGroup;   
  import io.netty.channel.socket.SocketChannel;   
  import io.netty.channel.socket.nio.NioServerSocketChannel;   
  import io.netty.handler.codec.LineBasedFrameDecoder;   
  import io.netty.handler.codec.string.StringDecoder;   
  import io.netty.handler.codec.string.StringEncoder;   
  import io.netty.util.concurrent.DefaultThreadFactory;   
     
     
  public class NioServer {   
   public static void main(String... args) throws InterruptedException {   
    EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 1 thread to accept connections   
    //5 threads to process the connections   
    EventLoopGroup workerGroup = new NioEventLoopGroup(5, new DefaultThreadFactory("serverpool"));   
    try {   
     ServerBootstrap b = new ServerBootstrap();   
     b.group(bossGroup, workerGroup)   
       .channel(NioServerSocketChannel.class)   
       .childHandler(new ChannelInitializer<SocketChannel>() {   
        @Override   
        public void initChannel(SocketChannel ch) throws Exception {   
         ch.pipeline().addLast(new LineBasedFrameDecoder(50));   
         ch.pipeline().addLast(new StringDecoder());   
         ch.pipeline().addLast(new StringEncoder());   
     
         ch.pipeline().addLast(new MyServerChannelHandler());   
        }   
       })   
       .childOption(ChannelOption.SO_KEEPALIVE, true);   
     // Bind and start to accept incoming connections on port 8881.   
     ChannelFuture f = b.bind(8881).sync();   
     f.addListener(channelFuture -> {   
      if (channelFuture.isSuccess()) {   
       System.out.println("Server started...");   
      } else {   
       System.out.println(channelFuture.cause());   
      }   
     });   
     f.channel().closeFuture().sync();   
    } finally {   
     workerGroup.shutdownGracefully();   
     bossGroup.shutdownGracefully();   
    }   
   }   
     
   private static class MyServerChannelHandler extends SimpleChannelInboundHandler<String> {   
    @Override   
    public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {   
     if (!msg.contains("Data")) { // if client sends its name   
      System.out.println(msg + " connected"); // print on console   
      ctx.writeAndFlush(msg + " connected" + "\n"); //sending back acknowledgement to client   
     } else { // if client sends some data for processing   
      System.out.println(msg + " Processed by server"); // print on console   
     }   
    }   
   }   
  }   


What we are doing here:

1) We create 1 thread which accepts the client connections and 5 threads which actually process the requests, which is exactly the same as in the OIO server code.

2) MyServerChannelHandler handles the client requests in the same way as the OIO server program. I wrote comments to make the response flow of the server clear.


Client Code:
 import io.netty.bootstrap.Bootstrap;   
  import io.netty.channel.*;   
  import io.netty.channel.nio.NioEventLoopGroup;   
  import io.netty.channel.socket.SocketChannel;   
  import io.netty.channel.socket.nio.NioSocketChannel;   
  import io.netty.handler.codec.LineBasedFrameDecoder;   
  import io.netty.handler.codec.string.StringDecoder;   
  import io.netty.handler.codec.string.StringEncoder;   
  import io.netty.util.concurrent.DefaultThreadFactory;   
  import io.netty.util.concurrent.GenericFutureListener;   
  import java.util.concurrent.ExecutorService;   
  import java.util.concurrent.Executors;   
     
  public class NioClient {   
   public static void main(String... args) throws InterruptedException {   
    int count = 1;   
    ExecutorService threadPool = Executors.newCachedThreadPool(new DefaultThreadFactory("clientpool"));   
    while (true) {   
     threadPool.execute(new MyClientThread(count));   
     if (++count > 10) break;   
     Thread.sleep(500); // just to make sure ordering   
    }   
   }   
     
   private static class MyClientThread implements Runnable {   
    private final int number;   
     
    private MyClientThread(int number) {   
     this.number = number;   
    }   
    @Override   
    public void run() {   
     EventLoopGroup workerGroup = new NioEventLoopGroup(1);   
     try {   
      Bootstrap b = new Bootstrap();   
      b.group(workerGroup);   
      b.channel(NioSocketChannel.class);   
      b.option(ChannelOption.SO_KEEPALIVE, true);   
      b.handler(new ChannelInitializer<SocketChannel>() {   
       @Override   
       public void initChannel(SocketChannel ch) throws Exception {   
        ch.pipeline().addLast(new LineBasedFrameDecoder(50));   
        ch.pipeline().addLast(new StringDecoder());   
        ch.pipeline().addLast(new StringEncoder());   
        ch.pipeline().addLast(new MyClientChannelHandler());   
       }   
      });   
      //Trying to connect to server; an interrupt is handled by the outer catch block,   
      //so f can never be null when it is used below   
      ChannelFuture f = b.connect("localhost", 8881).sync();   
      f.addListener(new GenericFutureListener<ChannelFuture>() {   
       public void operationComplete(ChannelFuture channelFuture) throws Exception {   
        if (channelFuture.isSuccess()) {   
         // any logger statements when client connected to server   
        } else {   
         System.out.println(channelFuture.cause());   
        }   
       }   
      });   
      //sending client name to server   
      f.channel().writeAndFlush("client" + number + "\n");   
      // sleep for 15 seconds   
      Thread.sleep(15000);   
      //send some data to server for processing   
      f.channel().writeAndFlush("client" + number + " Data" + "\n");   
     
      // Wait until the connection is closed.   
      try {   
       f.channel().closeFuture().sync();   
      } catch (InterruptedException e) {   
       e.printStackTrace();   
      }   
     } catch (InterruptedException e) {   
      e.printStackTrace();   
     } finally {   
      workerGroup.shutdownGracefully();   
     }   
    }   
   }   
     
   private static class MyClientChannelHandler extends SimpleChannelInboundHandler<String> {   
    @Override   
    public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {   
     System.out.println(msg); // simply print on console whatever data is received from the server   
    }   
   }   
  }   

Here we are doing the following:
1) We create 10 threads which will try to connect to the server and interact with it.

2) Each thread, once connected, first sends its name to the server. Second, it sleeps for 15 seconds, and after that it sends the data for processing to the server.

3) MyClientChannelHandler simply prints whatever response is received from the server.

Let's see the outcome of this program:

Client outcome:

Here we can see that all 10 clients got connected and received the acknowledgement from the server without any delay, even though the server has only 5 threads to process connections.
I will explain the reason very soon.

Server outcome:

As you can see, even though the server has only 5 threads to handle client connections, it still serves 10 client connections at a time. After that there is a long pause, as every client takes 15 seconds to send its processing data. The key thing is that the server threads are not blocked and do not wait for data from a client. They process the data once it is available, and meanwhile they can handle other client connections. This is the key aspect of the NIO performance benefit over OIO.
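
To put rough numbers on the difference, here is a back-of-the-envelope model of both runs. It is only a sketch under simplifying assumptions of my own: network and processing time are taken as zero, clients start 500 ms apart, and the class and method names (TimelineSketch, lastFinishBlocking, lastFinishNonBlocking) are hypothetical. In the blocking model a client occupies a pool thread from the moment it is served until its data arrives 15 seconds later; in the non-blocking model a waiting client occupies no thread at all.

```java
import java.util.PriorityQueue;

class TimelineSketch {

    // Blocking model: each client holds one pool thread for holdSec seconds.
    // Returns the time (in seconds) at which the last client is finished.
    static double lastFinishBlocking(int clients, int threads, double staggerSec, double holdSec) {
        // Times at which each pool thread becomes free again; all free at t = 0.
        PriorityQueue<Double> threadFreeAt = new PriorityQueue<>();
        for (int t = 0; t < threads; t++) {
            threadFreeAt.add(0.0);
        }
        double last = 0.0;
        for (int i = 0; i < clients; i++) {
            double arrival = i * staggerSec;
            // The client is served when it has arrived AND a thread is free.
            double start = Math.max(arrival, threadFreeAt.poll());
            double finish = start + holdSec;
            threadFreeAt.add(finish);
            last = Math.max(last, finish);
        }
        return last;
    }

    // Non-blocking model: a waiting client does not occupy a thread,
    // so every client finishes holdSec after its own arrival (clients >= 1).
    static double lastFinishNonBlocking(int clients, double staggerSec, double holdSec) {
        return (clients - 1) * staggerSec + holdSec;
    }

    public static void main(String[] args) {
        System.out.println("blocking:     " + lastFinishBlocking(10, 5, 0.5, 15.0) + " s");
        System.out.println("non-blocking: " + lastFinishNonBlocking(10, 0.5, 15.0) + " s");
    }
}
```

Under these assumptions the last of the 10 clients is done after about 32 seconds with the blocking pool of 5 threads and after about 19.5 seconds in the non-blocking case. The gap grows with every additional batch of clients that has to wait for a free thread.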



 @Override   
    public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {   
     if (!msg.contains("Data")) { // if client sends its name   
      System.out.println(msg + " connected"); // print on console   
      ctx.writeAndFlush(msg + " connected" + "\n"); //sending back acknowledgement to client   
     } else { // if client sends some data for processing   
      System.out.println(msg + " Processed by server"); // print on console   
     }   
    }   


The above method does not block the server threads and does not wait for data from the client. It is called asynchronously whenever data from a client is available. Until then, the same server threads can be used for completely different client connections.
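
Netty's NIO transport is built on the JDK's java.nio selectors, and the idea of "call me when data is available" can be sketched with the plain JDK as well. The example below is not Netty code and not the program from this article; it is a simplified illustration with assumed names (SelectorSketch, namesReadByOneThread) in which a single thread accepts several loopback connections and reads one short message from each of them, without ever blocking on an individual client. It assumes that each small message arrives in one read and that the number of clients stays below the default accept backlog.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SelectorSketch {

    // Connects the given number of clients and returns how many names one thread has read.
    static int namesReadByOneThread(int clients) {
        List<SocketChannel> clientSide = new ArrayList<>();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 lets the OS pick a free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Client side: connect and send the client name, like the clients in the article.
            for (int i = 1; i <= clients; i++) {
                SocketChannel client = SocketChannel.open(server.getLocalAddress());
                client.write(ByteBuffer.wrap(("client" + i + "\n").getBytes(StandardCharsets.UTF_8)));
                clientSide.add(client);
            }

            // Server side: one thread reacts to whichever channel is ready.
            int namesRead = 0;
            ByteBuffer buffer = ByteBuffer.allocate(64);
            long deadline = System.nanoTime() + 5_000_000_000L; // safety net: give up after 5 s
            while (namesRead < clients && System.nanoTime() < deadline) {
                selector.select(500);
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isAcceptable()) {
                        SocketChannel accepted = server.accept();
                        if (accepted != null) {
                            accepted.configureBlocking(false);
                            accepted.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        buffer.clear();
                        if (channel.read(buffer) > 0) { // simplification: one read per name
                            namesRead++;
                            channel.close(); // also cancels the key
                        }
                    }
                }
            }
            return namesRead;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (SocketChannel client : clientSide) {
                try {
                    client.close();
                } catch (IOException ignored) {
                    // nothing useful to do while cleaning up
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(namesReadByOneThread(10) + " clients served by one thread");
    }
}
```

Netty wraps this select loop in its event loop threads and adds the pipeline of decoders and handlers on top, so in the Netty server above we only have to write channelRead0.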

In this article I have talked about the NIO benefit on the server side, but you can think of a similar benefit on the client side as well.

Now imagine a client-server application where thousands of clients access the server concurrently and both sides have a chain of operations such as reading data from the user, writing data to a DB and calculating some business logic. If the server and client threads remain blocked all the time, the client sees a significant delay in the response from the server and cannot resume other work until it receives that response, while on the server side threads that are idle and waiting for data from one client cannot be used for other clients.
NIO is a must and an extremely powerful tool for high performance network applications.

I hope I was able to make my point clearly in this article.

Please post your comments and doubts!!!