I’m trying to debug this code in a service framework class:

consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(props);
consumer.subscribe(java.util.Collections.singletonList(topicName));
try {
    while (true) {
        org.apache.kafka.clients.consumer.ConsumerRecords records = consumer.poll(100L);
        for (org.apache.kafka.clients.consumer.ConsumerRecord record : records) {
            // Some more code here
        }
    }
} finally {
    consumer.close();
}

Compilation of this code gives this ‘helpful’ error:

[source error] ; is missing

The (almost exact) same code in Eclipse works fine:

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100L);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}

The error is triggered by this line:

for (org.apache.kafka.clients.consumer.ConsumerRecord record : records)

Does anyone have any idea what the problem might be, or any tips on how to pinpoint the issue?

Hey Marc,

Your code is perfectly fine; it’s the service definer that’s not cooperating. The issue is that you’re using a foreach construct that the service definer can’t handle at the moment. I can reproduce it with a simple string array:

String[] cars = {"Volvo", "BMW", "Ford", "Mazda"};
for (String car : cars) {
    System.out.println(car);
}
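
For a plain array like the one above, a classic indexed loop avoids the foreach syntax altogether. This is only a minimal sketch: it assumes the service definer accepts ordinary indexed for loops, which has not been verified here, and the class name IndexedLoopSketch and the method joinCars are made up for illustration.

```java
public class IndexedLoopSketch {
    // Joins the array elements with a plain indexed loop instead of foreach.
    // joinCars is a hypothetical helper, not part of any framework.
    static String joinCars(String[] cars) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < cars.length; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(cars[i]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String[] cars = {"Volvo", "BMW", "Ford", "Mazda"};
        System.out.println(joinCars(cars));
    }
}
```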

I looked at the Kafka documentation and saw that ConsumerRecords has an iterator. Can you try using that for now?

org.apache.kafka.clients.consumer.ConsumerRecords<String, String> records = consumer.poll(100L);
java.util.Iterator recordIterator = records.iterator();
while (recordIterator.hasNext()) {
    org.apache.kafka.clients.consumer.ConsumerRecord<String, String> r = recordIterator.next();
    // Do stuff with r
}

Kind regards,

Gert


Thanks for the pointer, @Gert.K! I did have to explicitly cast the result of the iterator’s next() call to org.apache.kafka.clients.consumer.ConsumerRecord, but apart from that your sample code worked perfectly:

java.util.Iterator recordIterator = consumer.poll(100L).iterator();
while (recordIterator.hasNext()) {
    org.apache.kafka.clients.consumer.ConsumerRecord record = (org.apache.kafka.clients.consumer.ConsumerRecord) recordIterator.next();
    log.debug("Event received: " + record.offset() + " " + record.key() + " " + record.value());
}
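
For anyone wondering why the cast is needed: a raw java.util.Iterator returns Object from next(), so the compiler wants an explicit cast before the value can be assigned to a more specific type. Below is a minimal, Kafka-free sketch of the same pattern, using a plain list of strings as a stand-in for the polled records. The class and method names are made up for illustration, and whether the service definer accepts the parameterized Iterator<String> variant is an assumption that has not been tested.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorCastSketch {
    // Raw Iterator: next() returns Object, so each element needs an explicit cast.
    @SuppressWarnings("rawtypes")
    static List<String> withRawIterator(List<String> values) {
        List<String> out = new ArrayList<String>();
        Iterator it = values.iterator();
        while (it.hasNext()) {
            String value = (String) it.next();
            out.add(value.toUpperCase());
        }
        return out;
    }

    // Parameterized Iterator: next() already returns String, so no cast is needed.
    static List<String> withTypedIterator(List<String> values) {
        List<String> out = new ArrayList<String>();
        Iterator<String> it = values.iterator();
        while (it.hasNext()) {
            String value = it.next();
            out.add(value.toUpperCase());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("offset", "key", "value");
        System.out.println(withRawIterator(values));
        System.out.println(withTypedIterator(values));
    }
}
```

Both methods do the same work; the only difference is where the type information lives, in the cast or in the iterator's declaration.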

 

