Configuration Parameters for Consumers

You can control several aspects of how consumers read messages from topics by setting configuration parameters each time you start a consumer.

Configuration parameters supported from Apache Kafka

auto.commit.interval.ms
The frequency in milliseconds that the offsets are committed.
Default: 1000ms
auto.offset.reset
Specifies what MapR Streams should do when there is no initial offset, such as when a consumer starts reading from a partition for the first time.
earliest
Reset the offset to the offset of the earliest message in the partition.
latest
Reset the offset to the offset of the latest message in the partition.
This is the default value.
none
Throws a NoOffsetForPartitionException when the consumer next polls for messages in its subscription and no offset exists. The consumer must unsubscribe from the partition before polling can function correctly.
Any other value causes the consumer to throw an error.
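The rules above for auto.offset.reset can be mirrored in a small client-side check. This is a minimal sketch: the resolve helper is hypothetical and not part of any Kafka or MapR API, and the real consumer performs its own validation, possibly with a different exception type.

```java
import java.util.Set;

public class OffsetResetCheck {
    // The three values documented for auto.offset.reset.
    private static final Set<String> ALLOWED = Set.of("earliest", "latest", "none");

    // Hypothetical helper: falls back to the documented default ("latest")
    // when the setting is absent and rejects any value outside the documented set.
    static String resolve(String configured) {
        if (configured == null) {
            return "latest";
        }
        if (!ALLOWED.contains(configured)) {
            throw new IllegalArgumentException("Invalid auto.offset.reset value: " + configured);
        }
        return configured;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));       // latest
        System.out.println(resolve("earliest")); // earliest
        try {
            resolve("smallest");
        } catch (IllegalArgumentException e) {
            // Prints: Invalid auto.offset.reset value: smallest
            System.out.println(e.getMessage());
        }
    }
}
```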
enable.auto.commit
If true, periodically commits the highest offsets of the messages fetched by the consumer in all of the partitions for the topics that the consumer is subscribed to.
Default: true
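As an illustration, the two auto-commit parameters could be set in a java.util.Properties object, which would typically be passed to the KafkaConsumer(Properties) constructor along with the other required settings such as the deserializers. This sketch only builds the properties. The autoCommitProps helper name is hypothetical.

```java
import java.util.Properties;

public class AutoCommitConfig {
    // Builds consumer properties that enable periodic offset commits.
    // Values are given as strings, as they would appear in a .properties file.
    static Properties autoCommitProps(long intervalMs) {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", Long.toString(intervalMs));
        return props;
    }

    public static void main(String[] args) {
        // 1000 ms matches the documented default interval.
        Properties props = autoCommitProps(1000);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```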
fetch.min.bytes
The minimum amount of data the server should return for a fetch request. If insufficient data is available, the server will wait for this minimum amount of data to accumulate before answering the request.

This minimum applies to the total amount of data across all of the topics and partitions that the consumer is subscribed to.

This parameter works in conjunction with the timeout interval that is specified in the poll function. If the minimum number of bytes is not reached by the time the interval expires, the poll returns no messages.

For example, suppose the value is set to 6 bytes and the timeout on a poll is set to 100ms. If 5 bytes are available and no further bytes arrive before the 100ms expire, the poll returns no messages.

Default: 1 byte
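The worked example above can be expressed as a simplified model of what a poll returns when its timeout expires. This is not how the client is implemented. The bytesReturnedAtTimeout function is hypothetical and ignores details such as the server answering early once the minimum is reached.

```java
public class FetchMinBytesModel {
    // Simplified model: when the poll timeout expires, the accumulated data is
    // returned only if at least fetchMinBytes bytes are available; otherwise
    // the poll returns nothing. Returns the number of bytes handed back.
    static int bytesReturnedAtTimeout(int availableBytes, int fetchMinBytes) {
        return availableBytes >= fetchMinBytes ? availableBytes : 0;
    }

    public static void main(String[] args) {
        // The example from the text: fetch.min.bytes = 6, only 5 bytes arrive
        // before the 100ms poll timeout, so the poll returns nothing.
        System.out.println(bytesReturnedAtTimeout(5, 6)); // 0
        // With the default of 1 byte, the 5 available bytes are returned.
        System.out.println(bytesReturnedAtTimeout(5, 1)); // 5
    }
}
```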

fetch.max.wait.ms
The maximum amount of time the MapR Streams server will block before answering the fetch request if there isn't sufficient data to satisfy the requirement given by fetch.min.bytes.
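The interaction between fetch.min.bytes and fetch.max.wait.ms can be sketched as a single decision: the server answers once enough data is available or once the maximum wait has elapsed, whichever comes first. The shouldAnswer function is a hypothetical model, not the server's implementation. The 500 ms wait used below is an arbitrary illustrative value because this section does not state a default.

```java
public class FetchWaitModel {
    // Simplified server-side rule: answer the fetch as soon as fetchMinBytes
    // of data is available, or once fetchMaxWaitMs has elapsed.
    static boolean shouldAnswer(int availableBytes, long elapsedMs,
                                int fetchMinBytes, long fetchMaxWaitMs) {
        return availableBytes >= fetchMinBytes || elapsedMs >= fetchMaxWaitMs;
    }

    public static void main(String[] args) {
        System.out.println(shouldAnswer(5, 50, 6, 500));  // false: not enough data, still waiting
        System.out.println(shouldAnswer(5, 500, 6, 500)); // true: wait limit reached
        System.out.println(shouldAnswer(6, 10, 6, 500));  // true: minimum reached
    }
}
```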
group.id
A string up to 2457 bytes long that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group ID, multiple consumer processes indicate that they are all part of the same consumer group. Putting consumers into groups provides benefits that are described in Consumer Groups.

A group can consist of a single consumer.
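A client could check the 2457-byte limit before setting group.id. This sketch assumes that the limit is measured on the UTF-8 encoding of the string, which this section does not state. The isValidGroupId and withGroup helpers and the group name are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class GroupIdConfig {
    // Maximum group.id length stated in this section, in bytes.
    static final int MAX_GROUP_ID_BYTES = 2457;

    // Hypothetical pre-check; assumes the length is measured in UTF-8 bytes.
    static boolean isValidGroupId(String groupId) {
        return groupId.getBytes(StandardCharsets.UTF_8).length <= MAX_GROUP_ID_BYTES;
    }

    static Properties withGroup(String groupId) {
        if (!isValidGroupId(groupId)) {
            throw new IllegalArgumentException("group.id exceeds " + MAX_GROUP_ID_BYTES + " bytes");
        }
        Properties props = new Properties();
        // Every consumer process configured with this same value joins the same group.
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withGroup("order-processors").getProperty("group.id")); // order-processors
        System.out.println(isValidGroupId("x".repeat(2458)));                      // false
    }
}
```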

key.deserializer
The class that implements the Deserializer interface for deserializing keys.
max.partition.fetch.bytes
The number of bytes of message data to attempt to fetch for each partition in each poll request. These bytes will be read into memory for each partition, so this parameter helps control the memory that the consumer uses.
This value must be at least as large as the maximum message size that the server allows. Otherwise, producers can send messages that are larger than the consumer can fetch. If the size of a message is larger than this value, MapR Streams returns a RecordTooLargeException.
Default: 64KB
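Because up to max.partition.fetch.bytes of data can be read into memory for each partition, a rough upper bound on fetched message data is the value multiplied by the number of assigned partitions. The following sketch computes that bound and checks the value against a server limit. It assumes that 64KB means 65536 bytes, and the 1MB server message limit is a hypothetical figure chosen for illustration.

```java
public class PartitionFetchSizing {
    // Rough upper bound on message data held in memory for one poll:
    // up to maxPartitionFetchBytes for each assigned partition.
    static long worstCaseFetchBytes(int assignedPartitions, long maxPartitionFetchBytes) {
        return (long) assignedPartitions * maxPartitionFetchBytes;
    }

    // The value must be at least the largest message the server accepts;
    // otherwise some messages can be too large for the consumer to fetch.
    static boolean canFetchLargestMessage(long maxPartitionFetchBytes, long serverMaxMessageBytes) {
        return maxPartitionFetchBytes >= serverMaxMessageBytes;
    }

    public static void main(String[] args) {
        long defaultBytes = 64L * 1024; // 64KB default, assuming 1KB = 1024 bytes
        System.out.println(worstCaseFetchBytes(10, defaultBytes)); // 655360
        // Hypothetical server limit of 1MB.
        System.out.println(canFetchLargestMessage(defaultBytes, 1024L * 1024)); // false
    }
}
```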
value.deserializer
The name of the appropriate deserialization class in the org.apache.kafka.common.serialization package or a class that implements the Deserializer interface for deserializing values.
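Both deserializer parameters take a class name. The sketch below uses two classes from the org.apache.kafka.common.serialization package, StringDeserializer for keys and ByteArrayDeserializer for values. Any class that implements the Deserializer interface could be used instead. The deserializerProps helper name is hypothetical. Because only the class names are set here, the Kafka client library is not needed to run this snippet.

```java
import java.util.Properties;

public class DeserializerConfig {
    // Fully qualified class names from org.apache.kafka.common.serialization.
    static final String STRING_DESERIALIZER =
        "org.apache.kafka.common.serialization.StringDeserializer";
    static final String BYTE_ARRAY_DESERIALIZER =
        "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    // String keys and raw byte-array values.
    static Properties deserializerProps() {
        Properties props = new Properties();
        props.setProperty("key.deserializer", STRING_DESERIALIZER);
        props.setProperty("value.deserializer", BYTE_ARRAY_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        Properties props = deserializerProps();
        System.out.println(props.getProperty("key.deserializer"));
        System.out.println(props.getProperty("value.deserializer"));
    }
}
```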

Configuration parameters that are specific to MapR Streams

streams.consumer.buffer.memory
Specifies how much memory to use for caching pre-fetched messages. Messages that are in subscribed topics and partitions are pre-fetched and cached to improve performance.
Default: 64MB
streams.consumer.default.stream
Specifies the path and name of the stream to use when the consumer subscribes to a topic without specifying a stream.

This default value is also used for the KafkaConsumer.listTopics() method.