public class MyProducer {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka.kafka-cluster-shared.non-prod-5-az-scus.prod.us.walmart.net:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "xxxxxxxxx.KafkaJsonSerializer");
properties.put("acks", "1");
properties.put("retries", "2");
properties.put("batch.size", "16384");
properties.put("linger.ms", "1");
properties.put("buffer.memory", "33554432");
KafkaProducer<String, pharmacyData> kafkaProducer = new KafkaProducer<String, pharmacyData>(
properties);
String topic = "insights";
//try {
Gson gson = new Gson();
Reader reader = Files.newBufferedReader(Paths.get("......./part.json"));
List<pharmacyData> pdata = new Gson().fromJson(reader, new TypeToken<List<pharmacyData>>() {}.getType());
//pdata.forEach(System.out::println);
reader.close();
//} catch (Exception e) {
//e.printStackTrace();
//}
for (pharmacyData data : pdata) {
kafkaProducer.send(new ProducerRecord<String, pharmacyData>(topic, data), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println(recordMetadata.partition() + "--" + recordMetadata.serializedValueSize());
} else {
e.printStackTrace();
}
}
});
}
kafkaProducer.close();
}
}
public class pharmacyData {
private String load_date;
private String store_nbr;
private String state;
private String pmp_flag;
private String zero_flag;
private String submit_ts;
public pharmacyData(String load_date, String store_nbr, String state, String pmp_flag, String zero_flag, String submit_ts) {
this.load_date = load_date;
this.store_nbr = store_nbr;
this.state = state;
this.pmp_flag = pmp_flag;
this.zero_flag = zero_flag;
this.submit_ts = submit_ts;
}
public String getLoad_date() {
return load_date;
}
public void setLoad_date(String load_date) {
this.load_date = load_date;
}
public String getStore_nbr() {
return store_nbr;
}
public void setStore_nbr(String store_nbr) {
this.store_nbr = store_nbr;
}
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
public String getPmp_flag() {
return pmp_flag;
}
public void setPmp_flag(String pmp_flag) {
this.pmp_flag = pmp_flag;
}
public String getZero_flag() {
return zero_flag;
}
public void setZero_flag(String zero_flag) {
this.zero_flag = zero_flag;
}
public String getSubmit_ts() {
return submit_ts;
}
public void setSubmit_ts(String submit_ts) {
this.submit_ts = submit_ts;
}
@Override
public String toString() {
return "pharmacyData{" +
"load_date='" + load_date + '\'' +
", store_nbr='" + store_nbr + '\'' +
", state='" + state + '\'' +
", pmp_flag='" + pmp_flag + '\'' +
", zero_flag='" + zero_flag + '\'' +
", submit_ts='" + submit_ts + '\'' +
'}';
}
}
public class KafkaJsonSerializer implements Serializer {
private Logger logger = LogManager.getLogger(this.getClass());
@Override
public void configure(Map map, boolean b) {
}
@Override
public byte[] serialize(String s, Object o) {
byte[] retVal = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
retVal = objectMapper.writeValueAsBytes(o);
} catch (Exception e) {
logger.error(e.getMessage());
}
return retVal;
}
@Override
public void close() {
}
}
public class KafkaJsonDeserializer implements Deserializer {
@Override
public void configure(Map map, boolean b) {
}
@Override
public Object deserialize(String s, byte[] bytes) {
ObjectMapper mapper = new ObjectMapper();
pharmacyData pdata = null;
try {
pdata = mapper.readValue(bytes, pharmacyData.class);
} catch (Exception e) {
e.printStackTrace();
}
return pdata;
}
@Override
public void close() {
}
}
这是消费者:
public class MyConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka.kafka-cluster-shared.non-prod-5-az-scus.prod.us.walmart.net:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "xxxxxxxx.KafkaJsonDeserializer");
properties.put("group.id", "consumer-group-1");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "30000");
KafkaConsumer<String, pharmacyData> consumer = new KafkaConsumer<>(properties);
String topic = "insights";
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, pharmacyData> consumerRecords = consumer.poll(100);
for (ConsumerRecord<String, pharmacyData> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + "--" + consumerRecord.toString());
//System.out.println(consumerRecord.offset() + "--" + consumerRecord.partition());
}
}
}
}
有人能帮我解决这些问题吗?非常感谢!
问题内容: 我正在尝试使用UDP将序列化的对象从服务器进程发送到Java中的客户端进程。问题是客户端在接收方法上被阻止。有人可以帮忙吗? 这是用于发送对象的服务器代码: 这是用于接收对象的客户端代码: 我只想接收能够重构的对象,但无法接收数据包本身。 问题答案: 我最终不知道要完成什么,但是使用UDP并不是那么容易…主要原因是在DatagramPacket对象的说明中: 数据报包用于实现无连接包传
问题内容: 我正在尝试将JSON对象从Javascript / Jquery发送到PHP,并且在控制台中收到错误消息msg。我究竟做错了什么。我是JS和PHP的新手。 jQuery文件: PHP文件 问题答案: 菲尔(Phil)的出色回答,但自从OP标题说 从javascript( 不是jQuery )发送json对象到php 这是使用(原始)javascript的方法,以防万一有人寻找此方法:
本文向大家介绍Springmvc发送json数据转Java对象接收,包括了Springmvc发送json数据转Java对象接收的使用技巧和注意事项,需要的朋友参考一下 1、导包 基于maven 2、jsp代码 3、控制器代码 4、配置json转换器 如果不使用注解驱动<mvc:annotation-driven />,就需要给处理器适配器配置json转换器 在springmvc.xml配置文件中,
我正在尝试构建一个JSON以与POST一起发送,但似乎我做错了什么: 这样,我得到了400错误请求错误,它根本不会触发REST。 相反,如果我把硬编码字符串如下,它工作得很好: 有什么想法吗? 谢谢,
我正在尝试使用适配器运行我的应用程序,但在执行此操作时,我遇到了以下错误: ReviewActivity StandupPlayerMain.java
我正在构建这个学校项目,我们必须在NodeJs和自由选择前端中创建自己的API。我写了以下代码:[在公共地图]app.js [这里是对本地服务器的请求]server.js [最后是带有输入字段的HTML,其中的值应与POST req]index.HTML一起发送 在函数addPoe()中,让newPoe用于测试目的。标题、作者和文本应来自表格。谁能看出我做错了什么? 编辑:在makeRequest