当前位置: 首页 > 知识库问答 >
问题:

KafkaTemplate.send(键,值,主题)与自定义分区?

徐星阑
2023-03-14

我看到并实现了KafkaTemplate方法。使用默认分区器类发送(主题、消息)。

但在这里,我不是在传递钥匙。我有一个简单的自定义分区器类,我还想发送到kafka服务器,比如KafkaTemplate(主题、键、消息),在producerConfig中,我为分区设置了customPartitioner类。

如果我提供自定义分区器,我看到KafkaTemboard的这个Will send(Topic, Key, Message)方法调用分区方法?但我没有完全得到它。

  1. 我的简单customPartitioner类:
public class CustomPartitionar implements Partitioner {
   private PartitionMapper newMapper;
   public CustomPartitionar(){
       newMapper = new PartitionMapper();
   }
   @Override
   public void configure(Map<String, ?> configs) {

   }
   @Override
   public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,Cluster cluster) {
       int partition = 0;
       String userName = (String) key;
       // Find the id of current user based on the username from another mapper class
       Integer userId = newMapper.findUserId(userName);
       // If the userId not found, default partition is 0
       if (userId != null) {
           partition = userId;
       }
       return partition;
   }
   @Override
   public void close() {
   }
}
config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitionar.class);
KafkaTemplate.send(message.getTopic(),message.getReceiver(),message)

这能用一种简单的方式实现吗?还是我遗漏了什么?

共有1个答案

赵嘉悦
2023-03-14

KafkaTemplate有几种发送方法:

/**
 * Send the data to the default topic with no key or partition.
 * @param data The data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> sendDefault(V data);

/**
 * Send the data to the default topic with the provided key and no partition.
 * @param key the key.
 * @param data The data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

/**
 * Send the data to the default topic with the provided key and partition.
 * @param partition the partition.
 * @param key the key.
 * @param data the data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

/**
 * Send the data to the default topic with the provided key and partition.
 * @param partition the partition.
 * @param timestamp the timestamp of the record.
 * @param key the key.
 * @param data the data.
 * @return a Future for the {@link SendResult}.
 * @since 1.3
 */
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

/**
 * Send the data to the provided topic with no key or partition.
 * @param topic the topic.
 * @param data The data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> send(String topic, V data);

/**
 * Send the data to the provided topic with the provided key and no partition.
 * @param topic the topic.
 * @param key the key.
 * @param data The data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

/**
 * Send the data to the provided topic with the provided key and partition.
 * @param topic the topic.
 * @param partition the partition.
 * @param key the key.
 * @param data the data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

/**
 * Send the data to the provided topic with the provided key and partition.
 * @param topic the topic.
 * @param partition the partition.
 * @param timestamp the timestamp of the record.
 * @param key the key.
 * @param data the data.
 * @return a Future for the {@link SendResult}.
 * @since 1.3
 */
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

/**
 * Send the provided {@link ProducerRecord}.
 * @param record the record.
 * @return a Future for the {@link SendResult}.
 * @since 1.3
 */
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

/**
 * Send a message with routing information in message headers. The message payload
 * may be converted before sending.
 * @param message the message to send.
 * @return a Future for the {@link SendResult}.
 * @see org.springframework.kafka.support.KafkaHeaders#TOPIC
 * @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID
 * @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY
 */
ListenableFuture<SendResult<K, V>> send(Message<?> message);
 类似资料:
  • 打开主题配置 选一个自己喜欢的主题吧

  • 自定义主题 Element 默认提供一套主题,CSS 命名采用 BEM 的风格,方便使用者覆盖样式。我们提供了四种方法,可以进行不同程度的样式自定义。 主题编辑器 使用在线主题编辑器,可以修改定制 Element 所有全局和组件的 Design Tokens,并可以方便地实时预览样式改变后的视觉。同时它还可以基于新的定制样式生成完整的样式文件包,供直接下载使用(关于如何使用下载的主题包,请参考本节

  • uView目前可以自定主题色,字体颜色,边框颜色等,所有组件内部的样式,都基于同一套主题,比如您修改了primary主题色,所有用到了primary颜色 的组件都会受影响。 教程 可以在打开的颜色拾取器中输入或者选择颜色,再点"确定"按钮即可。 颜色配置完后,在页面底部下载文件,会得到一个名为uview.theme.scss的文件。 将文件复制到项目的公共目录(视情况而定)中,再在项目根目录的un

  • Element Plus 默认提供一套主题,CSS 命名采用 BEM 的风格,方便使用者覆盖样式。我们提供了四种方法,可以进行不同程度的样式自定义。 仅替换主题色 如果仅希望更换 Element Plus 的主题色,推荐使用在线主题生成工具。Element Plus 默认的主题色是鲜艳、友好的蓝色。通过替换主题色,能够让 Element Plus 的视觉更加符合具体项目的定位。 使用上述工具,可以

  • 问题内容: 有没有简单的方法可以自定义现有主题?对于默认主题,有很多主题属性,但是在sphinxdoc中,我什至无法设置徽标或更改某些颜色。 还是可以向我推荐一个可以学习如何修改主题的网站? 问题答案: 我只想在我的狮身人面像文档中添加ReST删除线。这是我的做法: 在: (这使其看起来像默认主题(图2)) 在: 然后,在您的conf.py中: 此处更多信息:https : //sphinx.re

  • 我目前正在尝试自定义首选项部分的颜色。但我不知道该怎么做。 在我的应用程序中,我有两个主题:黑暗主题和光明主题。这是用户选择的一个选项。在我的主要活动中,我设置了用户选择的主题。 在我使用 attr 定义要显示的颜色之后。 当前列表首选项: 代码:style.xml 就像我说的,我已经试过解决办法了。我该怎么办? 非常感谢托马斯 编辑我将preferenceActivity更改为preferenc