我的生产者不会抛出任何错误,但数据没有被发送到目标主题。你能推荐任何技术来调试这种情况吗。
我在同步循环中调用合流Python Avro生产者,向主题发送数据,如下所示:
self.producer.produce(topic=test2, value=msg_dict)
在这个调用之后,我有一段代码来刷新队列:
num_messages_in_queue = self.producer.flush(timeout = 2.0)
print(f"flushed {num_messages_in_queue} messages from producer queue in iteration {num_iterations} ")
执行时没有任何错误。但在执行此代码后也不会触发回调。我的生产者发起如下:
def __init__(self,broker_url=None,topic=None,schema_registry_url=None,schema_path=None):
try:
with open(schema_path, 'r') as content_file:
schema = avro.loads(content_file.read())
except Exception as e:
print(f"Error when trying to read avro schema file : {schema_path}")
self.conf = {
'bootstrap.servers': broker_url,
'on_delivery': self.delivery_report,
'schema.registry.url': schema_registry_url,
'acks': -1, #This guarantees that the record will not be lost as long as at least one in-sync replica remains alive.
'enable.idempotence': False, #
"error_cb":self.error_cb
}
self.topic = topic
self.schema_path = schema_path
self.producer = AvroProducer(self.conf,default_key_schema=schema, default_value_schema=schema)
def delivery_report(self, err, msg):
print(f"began delivery_report")
if err is None:
print(f"delivery_report --> Delivered msg.value = {msg.value()} to topic= {msg.topic()} offset = {msg.offset} without err.")
else:
print(f"conf_worker AvroProducer failed to deliver message {msg.value()} to topic {self.topic}. got error= {err}")
执行完这段代码后,我将查看模式注册表容器上的主题,如下所示:
docker exec schema_registry_container kafka-avro-console-consumer --bootstrap-server kafka:29092 --topic test2 --from-beginning
我看到以下输出:
[2020-04-03 15:48:38,064]信息注册Kafka:type=Kafka.log4jController MBean(kafka.utils.log4jControllerRegistration$)[2020-04-03 15:48:38,742]信息ConsumerConfig值:auto.commit.interval.ms=5000 auto.offset.reset=最早引导服务器=[Kafka:29092]check.crcs=true client.dns.lookup=default client.id=connections.max.idle.ms=540000 default.api.timeout.ms=540000 default.api.timeout.ms=false exclude.internal.topics=true fetch.max.bytes=52428800 fetch.max.wait.ms=500 fetch.min.bytes=提交的key.deserializer=class>>org.apache.kafka.common.serialization.bytearraydeserializer max.partition.fetch.bytes=1048576 max.poll.interval.ms=30000 max.poll.interval.ms=500 metadata.max.age.ms=30000 metadata.max.age.ms=[]metrics.num.samples=2 metrics.recorders=[]metrics.num.samples=INFO 50热曲Est.timeout.ms=30000 retry.backoff.ms=100 sasl.client.callback.handler.class=null sasl.jaas.config=null sasl.kerberos.kinit.cmd=/usr/bin/kinit sasl.kerberos.min.time.before.relogin=60000 sasl.kerberos.service.name=null sasl.kerberos.ticket.rence.jitter=0.05 sasl.kerberos.ticket.renew.window.factor=0.8.window.Factor=0.8 sasl.login.refresh.window.jitter=0.05 sasl.mechanicol=GSSAPI security.protocol=PLAINTEXT send.buffer.bytes=131072 session.timeout.ms=10000 ssl.cipher.suites=null ssl.enabled.protocols=[TLSv1.2,TLSv1.1,TLSv1]ssl.endpoint.identification.algorithm=https ssl.key.password=null.实现=空ssl.TrustManager.Algorithmory=PKIX SSL.TrustStore.Location=null SSL.TrustStore.Password=null SSL.TrustStore.Type=JKS Value.Deserializer=class>>org.apache.Kafka.common.Serialization.ByteArrayDeserializer(org.apache.Kafka.clients.Consumer.consumerConfig)[2020-04-03 15:48:38,887]信息Kafka版本A3DB(org.apache.Kafka.common.utils.appInfoParser)[2020-04-03 15:48:39,221]信息集群ID:khkzipbvrkiozobbvp1fw(org.apache.Kafka.clients.metadata)[2020-04-03 15:48:39,224]信息[Consumer clientid=consumer-1,groupid=console-consumer-49056]发现的组协调器Kafka:29092(ID:2147483646 48:39,231]信息[Consumer clientid=consumer-1,groupid=console-consumer-49056]撤消以前分配的分区[](org.apache.Kafka.clients.Consumer.internals.consumercoordinator)[2020-04-03 15:48:39,231]信息[Consumer Clientid=Consumer-1,Groupid=Console-Consumer-49056](重新)加入组>(org.apache.kafka.clients.Consumer.Internals.AbstractConcordinator)[2020-04-03 15:48:42,264]信息[Consumer Clientid=Consumer-1,Groupid=Console-49056]成功加入了与第1代(org.apache.kafka.Clients.Console-Consumer-49056]的组[2020-04-03 15:48:42-consumer-49056]设置新分配的分区[test2-0]>(org.apache.kafka.clients.Consumer.internals.consumerCoordinator)[2020-04-03 15:48:42,293]信息[Consumer clientid=consumer-1,groupid=console-consumer-49056]将分区test2-0的偏移量重置为偏移量0.>(org.apache.kafka.clients.Consumer.internals.fetcher)
所以答案是如此微不足道,以至于它的禁运!但它确实指出了这样一个事实,即在多层基础结构中,错误设置的单个值可能会导致静默失败,这可能是非常繁琐的跟踪。
因此,问题来自于在my docker-compose.yml文件中错误的param设置my,其中没有设置broker_url的env变量。应用程序代码需要这个变量来引用kafka代理。但是,没有为这个丢失的param抛出异常,它正在默默地失败。
我希望当遇到错误时,用户会收到对其http调用的异常响应。该错误确实被捕获并显示在spring日志中,但不会返回给用户。为什么? 我的方法是: 日志中的错误: 错误处理程序: xception.java 例外回复。JAVA 用户收到403。。我不知道为什么,但我应该接受我的错误,没有任何身体。。 InternalServerExc0019在try/catch之外工作。.
我已经创建了一个Spring Boot和RestAPI应用程序项目。所有东西都运行并执行,但它没有在后端(MySQL)中创建任何表。它在控制台中也没有显示任何错误。我对spring boot和API是新手。 下面是我的代码: user.java: 我尝试添加spring.jpa.hibernate.ddl-auto=update和spring.jpa.hibernate.ddl-auto=crea
我有一个应用程序正在尝试调用一个服务,而另一个服务似乎正在超时。问题是我的应用程序没有收到任何超时异常,尽管我确实看到控制台上打印出一个错误: 客户端是使用以下设置创建的: 的值为20000,我已经验证了它的设置是否正确。 捕捉服务调用的代码如下所示: 即使我更改了我的块来捕获任何内容,我仍然没有捕获任何异常。WebSphere检测到事务超时,但我不知道为什么应用程序在web服务调用中没有检测到超
我在PHPmyadmin中的查询有结果,但在C#中,a.read()不返回数据。
我目前正在尝试使用chart.js在ReactJS中呈现条形图 信息: 官方网页-http://www.chartjs.org/docs/ GitHub-https://github.com/reactjs/react-chartjs 我的问题是,即使我的终端没有给我任何错误,我的本地服务器没有呈现任何东西。 这是我的代码: 如果我犯了一些愚蠢的错误,请容忍我,这是我编写的第一个React应用程序
问题内容: 将其插入运行Android 2.3的模拟器中,而不是在4中运行。在4中,它可以完美运行。我正在尝试通过https连接到实时服务器。它使用有效的Thawte证书,在所有浏览器以及Android 3和4中均可正常运行。 如果有人有代码帮助,请谢谢。另外,如果有人对安全解决方法有任何建议,我将不胜感激。我仍在学习,并且已经在这个问题上解决了一个星期。它必须结束,所以我可以继续工作和学习。嗯