sauravkumarrr
Contributor

Problem

RedisStreamSourceTask threw ConcurrentModificationException during offset commits because the offset collection was modified while commit() was processing it in a stream pipeline. As the stack trace shows, the collection is an ArrayList traversed by Stream.toArray; the exception is raised when another thread modifies the list while the commit thread is still iterating over it.

Stack trace:

java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.commitSourceTask(AbstractWorkerSourceTask.java:636)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:326)
	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:111)
	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.lambda$schedule$0(SourceTaskOffsetCommitter.java:79)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.util.ConcurrentModificationException
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1714)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575)
	at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
	at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616)
	at com.redis.kafka.connect.source.RedisStreamSourceTask.commit(RedisStreamSourceTask.java:121)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$commitSourceTask$11(AbstractWorkerSourceTask.java:630)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	... 3 more
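
The failure mode in the trace can be reproduced in isolation. The following is a minimal single-threaded sketch, not the connector's code: the class name CmeRepro and the sample offset maps are hypothetical, and the list is modified from inside the stream only to stand in for a second thread adding offsets while commit() runs.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;

final class CmeRepro {
    // Hypothetical stand-in for the task's offset list; not the connector's actual field.
    static boolean triggersCme() {
        List<Map<String, String>> offsets = new ArrayList<>();
        offsets.add(Map.of("offset", "1-0"));
        offsets.add(Map.of("offset", "2-0"));
        try {
            // Adding to the list while the stream traverses it mimics a
            // writer racing with commit(); ArrayList's spliterator detects
            // the structural change and fails the traversal.
            offsets.stream()
                   .peek(o -> offsets.add(Map.of("offset", "late")))
                   .toArray();
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME reproduced: " + triggersCme());
    }
}
```

In the real task the modification comes from a different thread, so the exception is intermittent rather than deterministic.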

Solution

Implemented thread-safe handling of offsets using concurrent collections:

  1. Replaced the offset collection with a thread-safe ConcurrentLinkedQueue wrapped in an AtomicReference
  2. Implemented atomic queue swapping during commits to prevent concurrent modification
  3. Added error handling that restores the swapped-out offsets if a commit fails
  4. Separated offset collection from commit processing, so the commit works on its own queue while new offsets go to a fresh one

Key changes:

  • Added AtomicReference<ConcurrentLinkedQueue<Map<String, ?>>> for thread-safe offset storage
  • Implemented atomic queue swapping in the commit method
  • Enhanced error recovery to restore failed offsets reliably
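
The swap-and-restore approach described above might look roughly like the sketch below. It is an illustration under stated assumptions, not the code in this PR: the class name OffsetBuffer, its method names, and the ack callback are all hypothetical; only the field type AtomicReference<ConcurrentLinkedQueue<Map<String, ?>>> comes from the description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical helper illustrating the pattern; the actual task may differ.
final class OffsetBuffer {
    private final AtomicReference<ConcurrentLinkedQueue<Map<String, ?>>> pending =
            new AtomicReference<>(new ConcurrentLinkedQueue<>());

    // Called by the polling thread for each record it emits.
    void add(Map<String, ?> offset) {
        pending.get().add(offset);
    }

    // Called by the commit thread. The ack callback is an assumed stand-in
    // for whatever acknowledges the offsets (for example, a Redis XACK call).
    List<Map<String, ?>> commit(Consumer<List<Map<String, ?>>> ack) {
        // Atomically replace the live queue with an empty one, so later
        // add() calls go to the new queue rather than the one being committed.
        ConcurrentLinkedQueue<Map<String, ?>> drained =
                pending.getAndSet(new ConcurrentLinkedQueue<>());
        List<Map<String, ?>> batch = new ArrayList<>(drained);
        try {
            ack.accept(batch);
            return batch;
        } catch (RuntimeException e) {
            // Restore the failed offsets so the next commit retries them.
            // They are appended after any offsets added in the meantime.
            pending.get().addAll(batch);
            throw e;
        }
    }

    int size() {
        return pending.get().size();
    }
}
```

One caveat of this simplified version: a producer that read the old queue reference just before the swap could still add to it after the snapshot is copied, so a production implementation would need to account for that window.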

This change is fully backwards compatible and requires no configuration changes or external dependency updates.
