当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka筛选器未筛选使用者记录

姚乐家
2023-03-14

应用筛选器之前的示例使用者记录是(在值中查找GP_ID):

 ConsumerRecord(topic = jdbc-project, partition = 0, offset = 0, CreateTime = 1551118248440, serialized key size = -1, serialized value size = 69, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"GP_ID": {"bytes": "@"}, "PROJECT_ID": {"bytes": "\u001E\u008C"}, "START_DATE": 1009843200000, "END_DATE": 1041292800000, "TITLE": "Project- FPH", "STATUS_CODE": "INACTIVE"})

KafkaRecordVO(projectId=7820, gpId=64)

当我在kafkaListenerContainerFactory()中按如下方式设置recordFilterStrategy时:

@Bean
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactoryProject() {
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(new RecordFilterStrategy<String, GenericRecord>() {
        @Override
        public boolean filter(ConsumerRecord<String, GenericRecord> consumerRecord) {
            long gpId= KafkaRecordVO.convertByteBufferToLong(consumerRecord.value().get("GP_ID"));
            if(gpId == 10766L || gpId == 10823L || gpId == 10459L || gpId == 10649L)
                return false;
            else
                return true;
        }
    });
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    return factory;
}

KafKareCordvo.ConvertByteBufferToLong正在将bytebuffer值转换为long值。

但是,当它被Kafka听众按以下方式消费时:

@KafkaListener(id = "project", topics = "jdbc-project", containerFactory = "kafkaListenerContainerFactoryProject")
public void consumeProject(ConsumerRecord<String, GenericRecord> record,Acknowledgment acknowledgment) {
    log.debug(record.toString());
    KafkaRecordVO recordVo = new KafkaRecordVO().projectId(record.value().get("PROJECT_ID"))
                                                .budgetYear(record.value().get("GP_ID"));
    log.debug(recordVo.toString());
}

这将返回删除我筛选的字段值的记录:“gp_id”

以下是应用筛选器(在值中查找GP_ID)后生成的示例日志:

ConsumerRecord(topic = jdbc-project, partition = 0, offset = 171275, CreateTime = 1551118279371, serialized key size = -1, serialized value size = 181, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = { "GP_ID": {"bytes": ""}, "PROJECT_ID": {"bytes": "\u0005â^"}, "START_DATE": 1470009600000, "END_DATE": 1532995200000, "TITLE": "Project 2016 - 2016", "STATUS_CODE": "INACTIVE"})

KafkaRecordVO(projectId=385630, gpId=0)
public static Long convertByteBufferToLong(Object byteBuff) {
    //After adding below line, the issue got resolved
    ByteBuffer buf = ((ByteBuffer) byteBuff).duplicate();
    byte[] arr = new byte[buf.remaining()];
    buf.get(arr);
    BigInteger bi =new BigInteger(1,arr);
    return bi.longValue();
}

共有1个答案

殷耀
2023-03-14

你的建议毫无意义;筛选器适配器具有以下代码...

@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
    if (!filter(consumerRecord)) {
        switch (this.delegateType) {
            case ACKNOWLEDGING_CONSUMER_AWARE:
                this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
                break;
            case ACKNOWLEDGING:
                this.delegate.onMessage(consumerRecord, acknowledgment);
                break;
            case CONSUMER_AWARE:
                this.delegate.onMessage(consumerRecord, consumer);
                break;
            case SIMPLE:
                this.delegate.onMessage(consumerRecord);
        }
    }
    else {
        ackFilteredIfNecessary(acknowledgment);
    }
}

它不对记录执行任何操作。

尝试在调试器中运行,看看是否可以看到正在发生的事情。

 类似资料:
  • 请使用以下的方法筛选网格中的数据: 右击一个单元格并在弹出式菜单选择“筛选”->“字段 xxx 值”来用当前单元格的值筛选记录。 “自定义筛选” 对话框能快速创建一个简单的筛选。只需简单地右击网格并在弹出式菜单选择“筛选”->“自定义筛选”。你可以使用字符“_”来代表在条件中任何单一符号,和使用字符“%”来代表在条件中任何一组符号。 你还可以使用更复杂的方式自定义你的筛选,右击字段并在弹出式菜单选

  • 请使用以下的方法筛选网格中的数据: 点击单元格进入编辑模式。按住 Control 键并点按单元格,然后在弹出式菜单选择“筛选”->“字段 xxx 值”来用当前单元格的值筛选记录。 你还可以使用更复杂的方式自定义你的筛选,在工具栏点击 。筛选向导会出现在网格的上方,你可以看到现有的筛选条件,简易地点击左侧的复选框来启用或禁用它。

  • 请使用以下的方法筛选网格中的数据: 右击一个单元格并在弹出式菜单选择“筛选”->“字段 xxx 值”来用当前单元格的值筛选记录。 “自定义筛选” 对话框能快速创建一个简单的筛选。只需简单地右击网格并在弹出式菜单选择“筛选”->“自定义筛选”。你可以使用字符“_”来代表在条件中任何单一符号,和使用字符“%”来代表在条件中任何一组符号。 你还可以使用更复杂的方式自定义你的筛选,右击字段并在弹出式菜单选

  • 首先,我试图使选择所有复选框,如果我单击表头中的选择所有复选框,整个表行将选择并显示一个复选框反向消息,即我选择了多少复选框。这里的问题是,如果我单击select all复选框,反向消息不会显示楼上的表,即我选择了多少行。 其次,如果我从任何列中筛选任何数字,相同的数字将显示同一列中有多少行具有相同的数字。如果我选中了所有复选框,那么反向消息将显示我选中了多少行复选框。这里,问题是显示整个表行计数

  • 有什么建议吗?

  • 筛选器。 Usage 全部引入 import { Picker } from 'beeshell'; 按需引入 import Picker from 'beeshell/dist/components/Picker'; Examples Code import { Picker } from 'beeshell'; <Picker ref={(c) => { this._pick