Sample Java Consumer

You need to first add the following dependency to the POM file:
<dependency>
  <groupId>commons-logging</groupId>
  <artifactId>commons-logging</artifactId>
  <version>1.1.1</version>
</dependency>        
/* This code has been tested successfully with commons-logging versions 1.1.1 and 1.2. */

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Minimal consumer sample: subscribes to a single topic and prints every
 * record it receives to standard output.
 */
public class SampleConsumer {
    // Stream and topic to read from. Replace the placeholder before running.
    public static String topic = "/<path to and name of the stream>:<name of topic>";

    // Consumer used by main(); created by configureConsumer().
    // Record keys are Integers and record values are Strings.
    public static KafkaConsumer<Integer, String> consumer;

    /**
     * Creates the consumer, subscribes it to {@link #topic}, and polls in an
     * endless loop, printing the topic, partition, offset, key, and value of
     * each record.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        configureConsumer();

        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(topic));

        // Maximum time a single poll() call waits for unread messages.
        Duration pollTimeout = Duration.ofMillis(1000);

        try {
            // The loop has no exit condition; it ends only when poll() throws
            // or the process is stopped.
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(pollTimeout);
                records.forEach(record -> {
                    System.out.printf("%s %d %d %s %s \n", record.topic(),
                            record.partition(), record.offset(), record.key(), record.value());

                });
            }
        } finally {
            // Release the consumer's resources if the loop is left via an exception.
            consumer.close();
        }
    }

    /**
     * Builds the consumer configuration and assigns a new consumer to
     * {@link #consumer}.
     *
     * <p>The configuration sets the consumer group id, the auto-offset-reset
     * policy ("earliest"), and the classes used to deserialize record keys
     * (Integer) and record values (String).
     */
    public static void configureConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Diamond operator instead of a raw type, so the assignment to
        // KafkaConsumer<Integer, String> is checked by the compiler.
        consumer = new KafkaConsumer<>(props);
    }
}