AtomicKafka is a lightweight npm package that simplifies establishing bidirectional, real-time data streaming with Apache Kafka in your web app.
Website | Library | Demo Apps | Featured on Medium
Websocket connections between the client and the server that accept user-defined event strings and callbacks
Broker initialization and connection to Apache Kafka
Consumer and Producer classes are predefined to be as modular as possible
Consumer functions accept user-defined callback functions to support lightweight stream processing
React Hook that throttles the websocket event listener with a time interval to maintain client performance
Supports multiple Kafka streams
Benefits of AtomicKafka
Getting Started
1. Initialize Kafka cluster
AtomicKafka currently supports running Apache Kafka clusters either using a Docker image or by connecting to Confluent Cloud.
Docker:
Download this .yml and run the following command in your terminal:
docker-compose up -d
Confluent Cloud:
Follow the steps on Confluent Cloud to create a free account. Obtain the API_ACCESS_KEY, API_ACCESS_SECRET, and BOOTSTRAP_SERVER values.
2. Configure .env file
Include the following lines in your .env depending on your Kafka environment. Set the PORT variable to the port where AtomicKafkaServer will be initialized in the next step.
Docker .env config: (API_KEY and API_SECRET are intentionally left blank)
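As a complement to the configuration described in this step, here is a minimal sketch of a startup check for the variables it names (PORT, API_KEY, API_SECRET). The helper name `missingEnvVars` and the `allowBlankCredentials` option are hypothetical and are not part of AtomicKafka; the check only illustrates that the credentials may be blank in the Docker setup.

```javascript
// Hypothetical helper (not part of AtomicKafka): reports which of the
// variables named in this step are absent from an env-like object.
function missingEnvVars(env, { allowBlankCredentials = false } = {}) {
  const missing = [];
  // PORT must always be set to a non-empty value.
  if (!env.PORT) missing.push('PORT');
  for (const key of ['API_KEY', 'API_SECRET']) {
    // In the Docker setup the credentials are intentionally blank, so an
    // empty or absent value is accepted when the flag is set.
    if (!allowBlankCredentials && !env[key]) missing.push(key);
  }
  return missing;
}

// Example: check the real environment before starting the server.
const missingNow = missingEnvVars(process.env, { allowBlankCredentials: true });
if (missingNow.length > 0) {
  console.warn(`Missing .env entries: ${missingNow.join(', ')}`);
}
```

For a Confluent Cloud setup, the same helper could be called without `allowBlankCredentials` so that empty credentials are reported.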
Initialize a server instance of your choice (HTTP, Node.js, etc.). The example below assumes a Node.js Express server.
ATTENTION: a Server instance must be created for every remote AtomicKafkaClient.
Initialize and configure expressApp according to desired specifications.
Require in AtomicKafkaServer.
Define a server that listens on the user-defined PORT environment variable.
Initialize an AtomicKafkaServer instance aks by passing in the server.
    /* initialize and configure Node.js expressApp according to user specifications
    then add the following: */
    const AtomicKafkaServer = require('atomic-kafka/server');

    const server = expressApp.listen(process.env.PORT, () => {
      console.log(`Listening on port ${process.env.PORT}`);
    });

    const aks = new AtomicKafkaServer(server);
5A. Create the Consumer and enable the built-in websocket on the server
Initialize a newConsumer on the aks instance and pass in the group_ID_string.
Enable the built-in websocket by invoking socketConsume and passing in the group_ID_string, an event_string, and the topic_string.
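The two calls described above can be sketched as follows. The method names newConsumer and socketConsume and their argument order come from the steps above; the wrapper function `enableConsumer`, its option names, and the stand-in `fakeAks` object are hypothetical and exist only to keep the example runnable without a Kafka cluster.

```javascript
// Hypothetical wrapper: performs the two calls from step 5A on an
// AtomicKafkaServer-like object passed in as `aks`.
function enableConsumer(aks, { groupId, event, topic }) {
  // Create the consumer for the given group ID.
  aks.newConsumer(groupId);
  // Enable the built-in websocket for this group, event, and topic.
  aks.socketConsume(groupId, event, topic);
}

// Stand-in object that records calls; in an application, pass the `aks`
// instance created from AtomicKafkaServer instead.
const recordedCalls = [];
const fakeAks = {
  newConsumer: (...args) => recordedCalls.push(['newConsumer', ...args]),
  socketConsume: (...args) => recordedCalls.push(['socketConsume', ...args]),
};

enableConsumer(fakeAks, {
  groupId: 'group_ID_string',
  event: 'event_string',
  topic: 'topic_string',
});
console.log(recordedCalls);
```

Presumably the event_string chosen here should match the one the client passes to its consumer function, though that pairing is an assumption based on the client examples.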
    /* in your TypeScript React Component */
    declare function require(name: string);
    const AtomicKafkaClient = require('atomic-kafka/client').default;
7A. Create and implement Consumer client component (JS & TS)
Initialize akc as an AtomicKafkaClient. Pass in the URI_STRING of the host running the AtomicKafkaServer instance.
Define a callback to process the message payload through the React state management tool of your choice.
Implement useInterval to consume from the Kafka cluster on an interval.
Inside useInterval, return the invocation of the consumer function on the akc instance, passing in a user-defined websocket event_string and the previously defined callback. Pass the interval_delay in milliseconds as the second argument to useInterval.
    function ConsumerComponent() {
      const akc = new AtomicKafkaClient('ATOMIC_KAFKA_SERVER_URI_STRING');

      const callback = (payload) => {
        /* user-provided data stream processing function definition that effects state change */
      };

      /* Throttles message consumption. Interval in milliseconds, can be any number */
      akc.useInterval(() => akc.consumer('consumeMessageEvent', callback), 4000);
    }
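As one possible shape for the callback body, the sketch below keeps a bounded list of the most recent messages. The function name `appendMessage`, the `limit` parameter, and the assumption that a payload may arrive either as a JSON string or as an already-parsed value are illustrative only and are not defined by AtomicKafka.

```javascript
// Hypothetical pure helper: returns a new array with the payload appended,
// keeping only the most recent `limit` entries (limit should be >= 1).
function appendMessage(messages, payload, limit = 50) {
  let parsed = payload;
  if (typeof payload === 'string') {
    try {
      // Assumption: string payloads are JSON-encoded.
      parsed = JSON.parse(payload);
    } catch (err) {
      // Not valid JSON: keep the raw string.
      parsed = payload;
    }
  }
  return [...messages, parsed].slice(-limit);
}

// Example: three messages arrive while only the latest two are kept.
let state = [];
for (const incoming of ['{"id":1}', '{"id":2}', '{"id":3}']) {
  state = appendMessage(state, incoming, 2);
}
console.log(state);
```

Inside a React component this could be wired up with a state setter, for example `const callback = (payload) => setMessages((prev) => appendMessage(prev, payload));`, where `setMessages` is assumed to come from `useState`. Keeping the helper pure makes it usable with other state management tools as well.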
7B. Create and implement Producer client component (JS & TS)
Initialize akc as an AtomicKafkaClient. Pass in the URI_STRING of the host running the AtomicKafkaServer instance.
Generate a payload formatted as an arbitrarily-nested JSON object. The example below defines a payload, but it can be generated at any point in the client according to the user's specification.
Invoke the producer function. Pass in the websocket event_string and the payload.
    function ProducerComponent() {
      const akc = new AtomicKafkaClient('ATOMIC_KAFKA_SERVER_URI_STRING');

      const payload = {
        /* Data to be sent to the cluster. Arbitrarily-nested JSON format. Can be defined anywhere in the app. */
      };

      akc.producer('produceMessageEvent', payload);
    }
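To illustrate the payload step, here is a minimal sketch that builds a nested payload and hands it to producer. The `buildPayload` and `sendReading` helpers, the payload field names, and the stand-in `fakeAkc` object are hypothetical; only the producer(event_string, payload) call shape is taken from the example above.

```javascript
// Hypothetical helper: builds an arbitrarily-nested, JSON-serializable payload.
function buildPayload(sensorId, value, now = new Date()) {
  return {
    sensor: { id: sensorId },
    reading: { value, recordedAt: now.toISOString() },
  };
}

// Hypothetical helper: sends one reading through an AtomicKafkaClient-like
// object and returns the payload that was sent.
function sendReading(akc, event, sensorId, value) {
  const payload = buildPayload(sensorId, value);
  akc.producer(event, payload);
  return payload;
}

// Stand-in client that records what would be produced; in an application,
// pass the `akc` instance created from AtomicKafkaClient instead.
const producedMessages = [];
const fakeAkc = {
  producer: (event, payload) => producedMessages.push({ event, payload }),
};

sendReading(fakeAkc, 'produceMessageEvent', 'sensor-1', 21.5);
console.log(JSON.stringify(producedMessages, null, 2));
```

Separating payload construction from the producer call keeps the payload easy to generate anywhere in the client, as the step above suggests.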
Contribute
We want this open-sourced project to continue to improve. If you would like to make a contribution to AtomicKafka, please fork this repo, add your awesome changes to a well-named feature branch of this repository, and make a pull request. We look forward to your input! And if you want to support AtomicKafka, please click on the ⭐ button for us!