Redis Sink connector connectivity issue(Kafka -> Redis)

Hi,
I am trying to sink the data from kafka topic using ksqldb to Redis. Inorder to achieve this, I am using redis-source-sink connector(6.6.8) Redis Connector (Source and Sink) by Redis | Confluent Hub

Below are the properties used.

“name”: “kafka-connect-redis-10”,
“config”: {
“connector.class”: “com.redis.kafka.connect.RedisSinkConnector”,
“tasks.max”: “1”,
“redis.type”:“HASH”,
“redis.uri”:““uri_of_sentinel_server””,
“redis.username”: “username”,
“redis.password”: “”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “io.confluent.connect.avro.AvroConverter”,
“value.converter.enhanced.avro.schema.support”: “true”,
“value.converter.schema.registry.url”: “http://:”,
“redis.operation.timeout.ms”: 10000,
“redis.database”:3,
“topics”: “santoshtopic”

    }

The above property was used to create the connector instance. When the data is being pushed on to kafka topic,
I should be able to see the same records being sinked into Redis.But, I end up getting the below error.
Help here would be really appreciated since we are blocked.
Note : We need to connect via Sentinel URI
SAMPLE DATA:

DE:{“regioncode”:“DE”, “region”:“Delhi”}
Redis version : 6.0.9
Kafka version : apache-kafka-2.7
Logs are as below:

io.lettuce.core.RedisCommandExecutionException: ERR unknown command HSET, with args beginning with: santoshtopic:DE, REGIONNAME, Delhi,
at io.lettuce.core.internal.ExceptionFactory.createExecutionException(ExceptionFactory.java:147)
at io.lettuce.core.internal.Exceptions.bubble(Exceptions.java:72)
at io.lettuce.core.internal.Exceptions.fromSynchronization(Exceptions.java:102)
at io.lettuce.core.internal.Futures.awaitAll(Futures.java:226)
at io.lettuce.core.LettuceFutures.awaitAll(LettuceFutures.java:59)
at com.redis.spring.batch.RedisItemWriter.write(RedisItemWriter.java:71)
at com.redis.kafka.connect.sink.RedisSinkTask.put(RedisSinkTask.java:304)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR unknown command HSET, with args beginning with: santoshtopic:DE, REGIONNAME, Delhi,
at io.lettuce.core.internal.ExceptionFactory.createExecutionException(ExceptionFactory.java:147)
at io.lettuce.core.internal.ExceptionFactory.createExecutionException(ExceptionFactory.java:116)
at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:120)
at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:111)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63)
at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:746)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:681)
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:598)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
io.lettuce.core.RedisCommandExecutionException: ERR unknown command HSET, with args beginning with: santoshtopic:DE, REGIONNAME, Delhi,
at io.lettuce.core.internal.ExceptionFactory.createExecutionException(ExceptionFactory.java:147)
at io.lettuce.core.internal.Exceptions.bubble(Exceptions.java:72)
at io.lettuce.core.internal.Exceptions.fromSynchronization(Exceptions.java:102)
at io.lettuce.core.internal.Futures.awaitAll(Futures.java:226)
at io.lettuce.core.LettuceFutures.awaitAll(LettuceFutures.java:59)
at com.redis.spring.batch.RedisItemWriter.write(RedisItemWriter.java:71)
at com.redis.kafka.connect.sink.RedisSinkTask.put(RedisSinkTask.java:304)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR unknown command HSET, with args beginning with: santoshtopic:DE, REGIONNAME, Delhi,
at io.lettuce.core.internal.ExceptionFactory.createExecutionException(ExceptionFactory.java:147)
at io.lettuce.core.internal.ExceptionFactory.createExecutionException(ExceptionFactory.java:116)
at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:120)
at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:111)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63)
at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:746)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:681)
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:598)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

Hi,

What version of Redis are you using? Also what Redis URI scheme are you specifying? For sentinel it should be redis-sentinel://. See Redis URI and connection details · lettuce-io/lettuce-core Wiki · GitHub for details.

Hi Santosh,
I am also exploring going down a similar path with Confluent Kafka and the Redis Sink using Redis Sentinel, were you able to resolve your issue?