Running a Kafka Connector Inside a Container (Docker)
Everywhere you look these days in the software development world, you'll find containers in use in a variety of situations and by a vast number of developers and companies. This article from Docker does a good job of explaining containers. The container ecosystem goes far beyond Docker, however, and includes Kubernetes, Podman, Apache Mesos, and Red Hat OpenShift Container Platform, among others.
I'm planning on writing a series of articles that go through various stages of deploying Kafka Connect in a containerized environment. For this article, I plan on getting to the point of deploying a multi-node distributed connector using Docker. I will use a source connector in standalone mode to populate a Kafka topic with data, and I will deploy a sink connector in distributed mode to pull the data back out.
Later articles will explore deploying other sink connectors in distributed mode, including the Kafka-Kinesis Connector, via containers. For this article, I will be using Docker via Docker Compose. I am hoping to look more into Podman and attempt deployment via Kubernetes in future articles. I am by no means an expert in any container technology, but I can mostly get around using containers in Docker. So, this is a learning experience on multiple fronts for me.
Picking a Kafka Container
For starters, we need a Kafka container! There are several to choose from, including wurstmeister/kafka-docker, Bitnami, and Confluent. I considered giving the Confluent version a try, but I'm not very familiar with the Confluent Platform and the Control Center, so I wasn't sure if there were any 'gotchas' when using it versus something like wurstmeister. Maybe I'll find the time to come back and give Confluent a better look in future articles. I decided to go with wurstmeister for this article.
While researching this, I found an excellent dev.to article that goes over how to deploy a connector in standalone mode. I used this as my starting point with the hopes that I would eventually end up with a container setup that would be usable to connect to virtually any Kafka broker and send data into Kinesis Firehose (for the Kafka-Kinesis Connector).
Step 1: Getting data into Kafka
I started out by cloning the repo from the previously referenced dev.to article:
git clone git@github.com:stockgeeks/docker-compose.git
I more or less ran the Docker Compose setup as discussed in that article by running docker-compose up. I then placed a file in the connect-input-file directory (in my case, a CodeNarc Groovy config file). Running a console consumer showed the file's contents being output back out.
$ docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic simple-connect --from-beginning
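For reference, the standalone source side of that setup boils down to a FileStreamSource connector configuration along these lines (the property names are standard Kafka Connect; the connector name and file path here are illustrative, the actual values live in the cloned repo):

name=file-source-connector
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/my-input-file.txt
topic=simple-connect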
As I worked through the examples on this page, I decided to go back and create a separate project that used the stockgeeks repo as its starting point. I will add to this repo as I try out different things and container technologies. You can check out this repo by running git clone git@github.com:joelforjava/kafka-connect-container-examples.git. From this point forward, this is the project I will be using.
Step 2: Getting data back out of Kafka
Next, I wanted to run Kafka Connect in distributed mode, pulling that same data back out using the FileStreamSinkConnector sink connector. Rather than build on the existing Docker setup, I decided to create a new Dockerfile, docker-compose.yml, and new configuration files for the sink connector inside a new directory (distributed-connector) in an attempt to keep everything somewhat organized.
This setup is shown in my kafka-connect-container-examples repo under the branch step/2.
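If you're following along in the repo, switch to that branch before continuing:

git checkout step/2

Here's the docker-compose.yml from the distributed-connector directory: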
version: '3.3'
services:
  connect-distributed:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: connect-distributed
    ports:
      - 18083:8083
    volumes:
      - ./connect-output-file:/tmp
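I won't dig into the Dockerfile here, but distributed mode does depend on a worker configuration file that the container picks up. A minimal sketch of what a connect-distributed.properties for this setup could look like (values are illustrative, not copied from the repo):

bootstrap.servers=kafka:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1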
In order for this container to interact with the already running Kafka broker, I needed to add it to the existing network the Kafka broker container is running on. To find that network, run docker network ls to get a list of the networks used by your various containers.
Your output will differ based on what containers you run and the networks you've previously created. Here's what my list looked like at the time:
NETWORK ID     NAME                                  DRIVER   SCOPE
938a3db19507   bridge                                bridge   local
cff74b7d60e4   build-system_sonarnet                 bridge   local
b93a229f4eb2   host                                  host     local
da76ab07af40   kafka-connect-crash-course_default    bridge   local
In this case, kafka-connect-crash-course_default is the network created by the original (project root) docker-compose.yml file.
Next, I had to create the connect-distributed service container without actually starting it. (Alternatively, I could have declared the network directly in the Docker Compose file; see the sketch below.)
docker-compose up --no-start
Once the container is created, I can then run the following:
docker network connect kafka-connect-crash-course_default connect-distributed
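As mentioned above, the alternative to connecting the network manually is to declare it in the distributed connector's Compose file as an external network. A minimal sketch, assuming the network name from my docker network ls output:

services:
  connect-distributed:
    # ... build, ports, and volumes as shown earlier ...
    networks:
      - kafka-connect-crash-course_default

networks:
  kafka-connect-crash-course_default:
    external: true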
Once you've connected the container running the sink connector (connect-distributed) to the network, you can start the service by running docker-compose up. You should then be able to query the REST API by running curl http://localhost:18083/connectors to get a list of currently running connectors, which at this point should be an empty list.
Next, I created a JSON file that mirrors the properties in the connect-file-sink.properties file and used it to configure the connector instance.
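A sketch of what distributed-connector/connect-file-sink.json would contain, matching the configuration echoed back below (the exact file lives in the repo):

{
  "name": "file-sink-connector",
  "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
  "tasks.max": "1",
  "topics": "simple-connect",
  "file": "/tmp/my-output-file.txt",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter"
}

With that file in place, the configuration can be pushed to the Connect REST API: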
curl -XPUT -H "Content-Type: application/json" --data "@distributed-connector/connect-file-sink.json" http://localhost:18083/connectors/file-sink-connector/config | jq
If all goes well with the configuration, you should see an output similar to the following:
{
  "name": "file-sink-connector",
  "config": {
    "name": "file-sink-connector",
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "simple-connect",
    "file": "/tmp/my-output-file.txt",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  },
  "tasks": [
    {
      "connector": "file-sink-connector",
      "task": 0
    }
  ],
  "type": "sink"
}
At this point, as long as data was already in the simple-connect topic, you should see output in distributed-connector/connect-output-file/my-output-file.txt.
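You can also confirm that the connector and its task are running through the standard Connect status endpoint:

curl http://localhost:18083/connectors/file-sink-connector/status | jq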
Step 3: Scaling up connect-distributed instances
For this step, I did a little cleanup of the Docker Compose files and the various plugin config files. This can be seen in the step/3 branch. Before going any further, I had to run docker-compose down to ensure the containers from the previous step were removed. Failing to do this will cause conflicts when you start up the instances listed in this step. Alternatively, you could rename the containers in this step, but I chose to keep the existing names.
I moved the 'standalone' config files into a new directory and renamed the directory used for the 'distributed' configuration files. I also increased the tasks.max value to 3 in an effort to see the tasks distributed across the scaled-up instances.
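Note that tasks can only spread out if the topic has at least as many partitions as tasks. One way to end up with a 3-partition simple-connect topic is to recreate it (this drops any existing data, and assumes a Kafka version whose kafka-topics.sh accepts --bootstrap-server):

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic simple-connect
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --topic simple-connect --partitions 3 --replication-factor 1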
Now, we can finally take a look at the changes required for the connect-distributed service.
connect-distributed:
  build:
    context: ./distributed
    dockerfile: Dockerfile
  # container_name: connect-distributed
  ports:
    - 8083
  depends_on:
    - kafka
  deploy:
    replicas: 4
  volumes:
    - ./distributed/connect-output-file:/tmp
In order to scale out a Docker Compose service, you can't provide a hard-coded container_name value, so that part is commented out and should ultimately be removed. You also can't use an explicit one-to-one port mapping, e.g. 18083:8083, but you can use a port range, such as "18083-18093:8083". In the example above, I let Docker assign the ports.
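If you do want predictable host ports, the ports section can use the range form instead:

  ports:
    - "18083-18093:8083"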
The example also lists 4 replicas, but this setting is only honored in Swarm mode and is otherwise ignored. Version 2 of the Compose file format had a scale parameter that could be used, but it has no true equivalent in version 3 unless you count the Swarm setting.
For this step, I want to try running 3 instances of the connect-distributed service, so I enter the following command:
docker-compose up --scale connect-distributed=3
Soon, you should see log output from the various services, including the 3 instances of the connect-distributed service.
Whether or not you mapped a port range for the connect-distributed service, you should then check the containers to see what host ports were assigned to the instances.
docker ps
You should see output similar to below:
CONTAINER ID   IMAGE                                COMMAND            CREATED          STATUS          PORTS                     NAMES
cd7c061d9ef2   docker-compose_connect-distributed   "start-kafka.sh"   35 seconds ago   Up 34 seconds   0.0.0.0:32776->8083/tcp   docker-compose_connect-distributed_3
c4eb751169be   docker-compose_connect-distributed   "start-kafka.sh"   35 seconds ago   Up 34 seconds   0.0.0.0:32775->8083/tcp   docker-compose_connect-distributed_1
aa62908512ff   docker-compose_connect-standalone    "start-kafka.sh"   35 seconds ago   Up 34 seconds   0.0.0.0:8083->8083/tcp    connect-standalone
7722da0e7e48   docker-compose_connect-distributed   "start-kafka.sh"   35 seconds ago   Up 34 seconds   0.0.0.0:32774->8083/tcp   docker-compose_connect-distributed_2
I've truncated the output to show only the connect containers. In this case, Docker assigned ports 32774 through 32776 to the scaled-out connect-distributed services.
Now you should be able to perform the same steps as in Step 2 for querying the Connect REST API and pushing a configuration, making use of one of the mapped ports.
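For example, using one of the host ports Docker assigned above (your ports, and possibly the path to the JSON config under the renamed distributed directory, will differ):

curl -XPUT -H "Content-Type: application/json" --data "@distributed/connect-file-sink.json" http://localhost:32775/connectors/file-sink-connector/config | jq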
Once the configuration is pushed, the file-sink-connector connector does its job, pulling the data from Kafka and saving it to the distributed/connect-output-file directory. In addition, you can verify by querying Kafka with the kafka-consumer-groups.sh script.
docker exec -it kafka /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --all-groups
For the new version of the simple-connect topic, we created 3 partitions and then set tasks.max in the sink connector to 3, which resulted in one task per container and the summary below:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-file-sink-connector simple-connect 1 4667 4667 0 connector-consumer-file-sink-connector-1-f009d64b-b2ad-42e9-be14-a77b9acfc6c0 /172.23.0.6 connector-consumer-file-sink-connector-1
connect-file-sink-connector simple-connect 0 4666 4666 0 connector-consumer-file-sink-connector-0-e413eb56-30ec-4a3b-89fc-bf4b2aea01a9 /172.23.0.5 connector-consumer-file-sink-connector-0
connect-file-sink-connector simple-connect 2 4667 4667 0 connector-consumer-file-sink-connector-2-c34e154b-8efb-4b41-aea0-a133a5f8556c /172.23.0.7 connector-consumer-file-sink-connector-2
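You can also see the assignment from the Connect REST API; each task should report a different worker_id (again, substitute one of your mapped ports):

curl -s http://localhost:32775/connectors/file-sink-connector/status | jq '.tasks'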
This concludes, for now, my experiment with running a sink connector in distributed mode, all via Docker. It should come in handy as we migrate some of our Kafka connectors from virtual machines to containers. My next steps will most likely be trying this with Kubernetes or getting another plugin working, such as the Kafka-Kinesis Connector or the Elasticsearch connector. I'm sure it'll be all of the above at some point. Thank you if you've read this far. I hope this has been useful to someone.