Writing Your Own Sink Connector for Your Kafka Stack

Written by anilselim | Published 2017/08/25
Tech Story Tags: docker | kafka | kafka-connect | elasticsearch | elastic


There may be no Kafka connector for your system, or the available ones may not meet your requirements. In both cases, you have to write your own Kafka connector, and there are not many online resources about it. I’ll describe my own adventure to help others suffering from the same pain.

Here is my case: I want to read data from a Kafka topic and write it to an Elastic index if the data has an “insert” flag in its status field, or delete it from the index if the status is “delete”. I knew I couldn’t use the official or any other open source Elastic sink connector, as each of them has one generic behavior that is determined by the connector configuration, not by the data.

For local development and testing, I used Landoop’s fast-data-dev project, as it includes Zookeeper, Kafka, Connect and sufficient UI tools in just one Docker container.

Getting Ready

If you want to write your own source or sink connector, you have to use Java, because the main idea is to build jars from the project that will act as a plug-in for your local Kafka Connect cluster or standalone server. So make sure you have a JDK installed locally. I used IntelliJ IDEA, as I am a JetBrains fan.

Create Your Project Using Custom Connect Maven Archetype

There is a Maven archetype for your project that includes a bunch of skeleton classes. To use it from IntelliJ, you can add it when creating a new project. Here is its GitHub repository.

Don’t forget to use the latest version; you can check it in the repo.

What are we going to build?

As I said, we are going to create a custom Elastic sink connector that behaves according to the topic data. I am going to explain the most important parts of the project; please clone my repository to follow the snippets below.

ElasticSinkConnectorConfig.java

You will use this class to read the configuration properties; it is a pretty straightforward class. First you define your config elements, such as:

Link to source

ELASTIC_PORT is the config property that will be used by our Elastic transport client (or, if you like, the Elastic driver). ELASTIC_PORT_DOC is the description of that property, used by the static method that builds the ConfigDef.

ConfigDef is a class in the Kafka common package (org.apache.kafka.common.config), and we bind all of our properties to it here:

Link to source

The rest of the file contains the getters and setters of the properties; nothing complex here.
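To give a rough idea of what such a config class boils down to, here is a minimal plain-Java sketch. It deliberately avoids the Kafka API so that it runs on its own; in the real connector the key, its documentation string and its default would be registered through ConfigDef instead. The key name `elastic.port`, the default of 9300 and the class name `PortConfigSketch` are assumptions made for illustration, not values taken from the repository.

```java
import java.util.Map;

// Plain-Java stand-in for a connector config class (no Kafka dependency).
// The key name, doc string and default value are assumptions for illustration.
final class PortConfigSketch {
    static final String ELASTIC_PORT = "elastic.port";   // hypothetical key
    static final String ELASTIC_PORT_DOC = "Port of the Elastic node to connect to.";
    static final int ELASTIC_PORT_DEFAULT = 9300;         // assumed default

    private final int port;

    PortConfigSketch(Map<String, String> props) {
        this.port = parsePort(props.get(ELASTIC_PORT));
    }

    // Falls back to the default when the key is missing or blank,
    // and rejects values that are not valid TCP ports.
    private static int parsePort(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return ELASTIC_PORT_DEFAULT;
        }
        int value;
        try {
            value = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    ELASTIC_PORT + " must be a number, got: " + raw, e);
        }
        if (value < 1 || value > 65535) {
            throw new IllegalArgumentException(
                    ELASTIC_PORT + " is out of range: " + value);
        }
        return value;
    }

    int getPort() {
        return port;
    }
}
```

Failing fast on a bad value here means a misconfigured connector is rejected at startup rather than when the first batch is sent.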

ElasticSinkConnector.java

If you have something to do when the connector starts consuming from a topic, or when it stops, this is the class you should implement.

Just going over the methods:

  • version() returns the version of the connector,
  • start() takes the configuration from connector.properties and passes it to the ElasticSinkConnectorConfig class we discussed above,
  • taskClass() returns the class that does the actual work: it gets data from the topic and processes it (we are going to talk about this in a minute),
  • stop() is a kind of teardown function for your connector,
  • config() returns the config definition, which comes from ElasticSinkConnectorConfig for us,

and taskConfigs() is for your tasks. Here we distribute the configs to all the tasks (by the way, you should set the tasks.max property in the connector configuration, and here is what value it should be):

By the way, I still have not figured out how many tasks should be set for an optimal configuration. Link to source
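The distribution step can be sketched in plain Java as follows. The real method is an instance method on the connector that takes only `maxTasks` and returns a `List<Map<String, String>>`; the static helper and the class name `TaskConfigSketch` below are stand-ins so the example runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for the body of a connector's taskConfigs(int maxTasks).
final class TaskConfigSketch {

    // Every task gets its own copy of the connector properties, so a task
    // that mutates its map cannot affect its siblings or the connector.
    static List<Map<String, String>> taskConfigs(Map<String, String> connectorProps,
                                                 int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(connectorProps));
        }
        return configs;
    }
}
```

As for the value itself: sink tasks share the topic's partitions as members of one consumer group, so tasks beyond the partition count would sit idle. That gives at least an upper bound for tasks.max.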

ElasticSinkTask.java

This is the class where you get the data from the configured topic. In the start() method, I create an instance of ElasticServiceImpl. I didn’t use any dependency injection strategy, because testing a plug-in through its overridden methods is really difficult and some kind of anti-corruption layer is needed. For that reason, when I get the data in the put() method, I send it directly to ElasticServiceImpl after a little modification, and I test my service instead of the ElasticSinkTask class. In the constructor of the Elastic service, I pass null as the ElasticClient, because I am only going to pass a client in from the test class. So my main processing class became ElasticServiceImpl.

The irony here is that I have not written any test classes yet; I’ll implement them.
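The "null in production, fake in tests" constructor can be sketched like this. All types below (`SketchClient`, `SketchService`, `LoggingClient`, `RecordingClient`) are simplified, hypothetical stand-ins for the article's ElasticClient and ElasticServiceImpl, not the code from the repository.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified version of the client interface.
interface SketchClient {
    void bulkSend(List<String> documents);
}

final class SketchService {
    private final SketchClient client;

    // Passing null falls back to a default client; a test passes a fake instead.
    SketchService(SketchClient client) {
        this.client = (client != null) ? client : new LoggingClient();
    }

    // Drops null or blank records, trims the rest and forwards them in one call.
    void process(List<String> rawRecords) {
        List<String> cleaned = new ArrayList<>();
        for (String raw : rawRecords) {
            if (raw != null && !raw.trim().isEmpty()) {
                cleaned.add(raw.trim());
            }
        }
        if (!cleaned.isEmpty()) {
            client.bulkSend(cleaned);
        }
    }
}

// Stand-in for the real client; it only prints what it would send.
final class LoggingClient implements SketchClient {
    @Override
    public void bulkSend(List<String> documents) {
        System.out.println("would send " + documents.size() + " documents");
    }
}

// Fake for tests: remembers what the service handed over.
final class RecordingClient implements SketchClient {
    final List<String> received = new ArrayList<>();

    @Override
    public void bulkSend(List<String> documents) {
        received.addAll(documents);
    }
}
```

A test can then construct `new SketchService(new RecordingClient())`, call `process(...)` and assert on `received`, without a running Connect worker or Elastic cluster.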

The most important thing in this class is to be careful with the put() method: once put() returns without throwing, Kafka Connect treats those records as handed over, and their offsets become eligible for commit. So proper error handling should be done here.
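One way to keep put() honest about failures is sketched below in plain Java. In Kafka Connect, a task can throw RetriableException to ask the framework to deliver the same batch again, while other exceptions fail the task. The local `RetryLaterException`, the `Sender` interface and the class name `PutSketch` are hypothetical stand-ins so the example runs without the Connect library.

```java
import java.io.IOException;
import java.util.List;

final class PutSketch {

    // Local stand-in for Kafka Connect's RetriableException.
    static final class RetryLaterException extends RuntimeException {
        RetryLaterException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical sender; the real one would talk to Elastic.
    interface Sender {
        void send(List<String> batch) throws IOException;
    }

    // Shape of a put() body: a failure is never swallowed, because returning
    // normally tells the framework the batch was handled.
    static void put(List<String> batch, Sender sender) {
        if (batch.isEmpty()) {
            return;
        }
        try {
            sender.send(batch);
        } catch (IOException e) {
            // Treated as transient here: ask for the same batch again.
            throw new RetryLaterException(
                    "sending a batch of " + batch.size() + " records failed", e);
        }
        // Any RuntimeException from the sender propagates unchanged.
    }
}
```

Which failures count as transient is a design decision; treating every IOException as retriable is only an assumption of this sketch.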

ElasticServiceImpl.java

In the constructor, we set up the JsonConverter and the Gson serializer just before creating an instance of ElasticClient.

In the process() method, I convert the record strings to the Record data type, since it is more readable and easier to process.

Topic message representation. Link to source

After playing with some Gson methods to convert the strings to Records, the data is ready to be sent to Elastic.
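As a rough sketch of what such a typed record buys you, the example below maps an already-parsed message to a small value class. It assumes the JSON has been turned into a `Map` beforehand (the article uses Gson for that step, which is left out here to keep the example dependency-free). The field names `id` and `payload` and the type names `TopicRecord` and `RecordBehavior` are assumptions; only the `status` field with its insert and delete values comes from the use case described earlier.

```java
import java.util.Locale;
import java.util.Map;

enum RecordBehavior { INSERT, DELETE }

// Hypothetical typed view of one topic message.
final class TopicRecord {
    final String id;
    final RecordBehavior behavior;
    final Object payload;

    TopicRecord(String id, RecordBehavior behavior, Object payload) {
        this.id = id;
        this.behavior = behavior;
        this.payload = payload;
    }

    // Maps a parsed message to a TopicRecord, rejecting anything whose
    // status is neither "insert" nor "delete" (case-insensitive).
    static TopicRecord fromMap(Map<String, Object> message) {
        Object id = message.get("id");
        Object status = message.get("status");
        if (id == null || status == null) {
            throw new IllegalArgumentException("message needs id and status: " + message);
        }
        String normalized = status.toString().toLowerCase(Locale.ROOT);
        RecordBehavior behavior;
        if (normalized.equals("insert")) {
            behavior = RecordBehavior.INSERT;
        } else if (normalized.equals("delete")) {
            behavior = RecordBehavior.DELETE;
        } else {
            throw new IllegalArgumentException("unknown status: " + status);
        }
        return new TopicRecord(id.toString(), behavior, message.get("payload"));
    }
}
```

Validating the status at this point means the client code further down only ever sees the two behaviors it knows how to handle.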

ElasticClientImpl.java

For the Elastic client, I first tried the Transport Client, as it connects to your Elastic cluster like another node and bulk indexing is a little bit faster than over the HTTP protocol. However, I ran into an error and could not find a solution, even after asking about it on StackOverflow. I think it may be related to the fast-data-dev container, the Docker networking configuration, or something similar. I then switched to the Jest client, as HTTP performance is also great according to the same article I mentioned above.

In the bulkSend() method, we simply add each item in dataList to the Bulk: if the behavior of the Record is insert, an Index request is added; otherwise a Delete request is added.

We send it to Elastic and log any error that occurs:

Link to source
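The routing decision in bulkSend() can be sketched in plain Java as follows. The real method adds Jest Index and Delete actions to a Bulk; here each action is represented by a descriptive string so the example runs without the Jest library. `BulkRoutingSketch`, `Item` and the string format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class BulkRoutingSketch {

    enum Behavior { INSERT, DELETE }

    // Hypothetical stand-in for the article's Record type.
    static final class Item {
        final String id;
        final Behavior behavior;
        final String json;

        Item(String id, Behavior behavior, String json) {
            this.id = id;
            this.behavior = behavior;
            this.json = json;
        }
    }

    // One action per item, in input order: "index" for inserts,
    // "delete" for everything else.
    static List<String> toBulkActions(List<Item> dataList) {
        List<String> actions = new ArrayList<>();
        for (Item item : dataList) {
            if (item.behavior == Behavior.INSERT) {
                actions.add("index " + item.id + " " + item.json);
            } else {
                actions.add("delete " + item.id);
            }
        }
        return actions;
    }
}
```

Keeping the actions in input order matters when the same id appears twice in one batch, for example an insert followed by a delete.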

Conclusion

To conclude, we have to write our own sink or source connector if there is no community implementation for our system. There may also be ready-to-use connectors whose behaviour you need to change.

The most challenging part of this process is testing your connector. What you implement is a kind of plugin for your Connect cluster, not a standalone application. Writing unit tests is tricky, as you override methods of the Kafka Connect package and a kind of anti-corruption layer is needed. Debugging is also quite difficult.

You can test the connector by using the docker-compose file I have written and following the steps in my repository.

Thanks,

Sometimes I tweet useful pieces of information: @_skynyrd


Published by HackerNoon on 2017/08/25