Bug
Resolution: Fixed
Minor
None
GSoC - Coding Phase 1
It's a follow-up to the investigation in JENKINS-51842
Current receiver code:
@Override
public final Command read() throws IOException, ClassNotFoundException, InterruptedException {
    Command cmd = null;
    consumer.subscribe(consumerTopics);
    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(pollTimeout);
        for (ConsumerRecord<String, byte[]> record : records) {
            if (record.key().equals(consumerKey)) {
                cmd = Command.readFrom(channel, record.value());
            }
        }
        if (cmd != null) {
            consumer.commitSync();
            LOGGER.info("Received a command: " + cmd.toString());
            return cmd;
        }
    }
}
1) The code retrieves multiple records using consumer.poll(pollTimeout)
2) The records are iterated in a loop
3) If multiple records match the "record.key().equals(consumerKey)" condition, each match overwrites cmd, so only the last matching record is returned. Because consumer.commitSync() then commits the offsets of the whole polled batch, the earlier matches are not delivered again
This behavior leads to command loss in the initialization chain, because there are 2 parallel command sources: Channel initialization and PingThread
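
One possible way to avoid dropping commands is to buffer every matching record from a polled batch and hand them out one per read() call. The sketch below only illustrates that idea and is not necessarily the fix applied for this issue. It is self-contained and does not use the Kafka client: BufferingReceiver, poller and the String payloads are hypothetical stand-ins for the receiver class, consumer.poll(pollTimeout) and the serialized Command, and offset commits are not modeled.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Supplier;

/** Hypothetical stand-in for the receiver; not the plugin's actual class. */
final class BufferingReceiver {
    private final String consumerKey;
    // Stand-in for consumer.poll(pollTimeout): each call returns one batch of key/value records.
    private final Supplier<List<Map.Entry<String, String>>> poller;
    // Matching records that were polled but not yet returned to the caller.
    private final Queue<String> pending = new ArrayDeque<>();

    BufferingReceiver(String consumerKey, Supplier<List<Map.Entry<String, String>>> poller) {
        this.consumerKey = consumerKey;
        this.poller = poller;
    }

    /** Returns the next matching payload in arrival order, polling only when the buffer is empty. */
    String read() {
        while (pending.isEmpty()) {
            for (Map.Entry<String, String> record : poller.get()) {
                if (record.getKey().equals(consumerKey)) {
                    // Queue every match instead of overwriting a single variable.
                    pending.add(record.getValue());
                }
            }
        }
        return pending.remove();
    }
}
```

With a batch containing two records for the same key, two consecutive read() calls return both payloads in order, whereas the current code would return only the second one.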