在运行 ZeroMQ 基本 PUB / SUB C# 示例时,如果我先启动发布服务器,它们对我有用,但如果我先启动订阅服务器,则不会。当我这样做时,订阅者启动,但从未收到任何数据。从我所读到的内容来看,我认为我应该能够以任一顺序启动这些过程。
我正在使用nuget in的ZeroMQ 4.1.0.26软件包。NET 4.6,x64应用程序。这些都是在Windows上运行的。我在同一台机器上运行这两个应用程序。
下面是我正在运行的代码(这是ZeroMQ教程中示例的简化版本)。
订户:
static void Main(string[] args)
{
var endpoint = "tcp://127.0.0.1:5556";
// Socket to talk to server
using (var context = new ZContext())
using (var subscriber = new ZSocket(context, ZSocketType.SUB))
{
Console.WriteLine("I: Connecting to {0}…", endpoint);
subscriber.Connect(endpoint);
// Subscribe to zipcode
string zipCode = "90210 ";
Console.WriteLine("I: Subscribing to zip code {0}…", zipCode);
subscriber.Subscribe(zipCode);
while(true)
{
using (var replyFrame = subscriber.ReceiveFrame())
{
string reply = replyFrame.ReadString();
Console.WriteLine(reply);
}
}
}
}
发布者:
static void Main(string[] args)
{
using (var context = new ZContext())
using (var publisher = new ZSocket(context, ZSocketType.PUB))
{
var address = "tcp://*:5556";
Console.WriteLine("I: Publisher.Bind'ing on {0}", address);
publisher.Bind(address);
// Initialize random number generator
var rnd = new Random();
while (true)
{
// Get values that will fool the boss
int zipcode = 90210;
int temperature = rnd.Next(-55, +45);
// Send message to all subscribers
var update = string.Format("{0:D5} {1}", zipcode, temperature);
using (var updateFrame = new ZFrame(update))
{
publisher.Send(updateFrame);
}
Thread.Sleep(1000);
}
}
}
我尝试了以下建议中的建议:
使用 Python 等效发布者和订阅者:
因此,看起来 C# 订阅者代码有问题。
>
我的示例代码(下面的最新版本)有问题吗?
还是ZeroMQ的问题。网库?
这是正常工作的Python订阅者:
import sys
import zmq
# Socket to talk to server
context = zmq.Context()
socket = context.socket( zmq.SUB )
print( "Python: Collecting updates from weather server" )
socket.connect( "tcp://localhost:5556" )
socket.setsockopt_string( zmq.SUBSCRIBE, "" )
while True:
string = socket.recv_string()
zipcode, temperature = string.split()
print( zipcode + " " + temperature )
以下是C#订阅者的最新等效(非工作)版本:
static void Main(string[] args)
{
// Socket to talk to server
using (var context = new ZContext())
using (var socket = new ZSocket(context, ZSocketType.SUB))
{
Console.WriteLine("C#: Collecting updates from weather server");
socket.Connect("tcp://localhost:5556");
socket.Subscribe("");
while (true)
{
using (var replyFrame = socket.ReceiveFrame())
{
string reply = replyFrame.ReadString();
Console.WriteLine(reply);
}
}
}
}
如果您遵循此处重新发布的流程图,那么< code>PUB/SUB的问题就会凸显出来。
最近(EoY-2017)API v4.x的重新设计工作已经改变了一些设计方法(无论是在SUB
-端处理主题过滤器,还是在较新的API版本中,在PUB
-端,逆转开销分布/集中,排除流量缓冲/处理开销),PUB/SUB
操作顺序似乎保持不变(参见上述流程图)。
事后根因隔离测试:
感谢积极的代码重新测试,这有助于在根本原因歧视方面向前推进几步。
现在让我们关注为什么:
ZeroMQ通常有两个抽象层,ZeroMQ内部层,实现框架的核心设施,如ZeroMQ协议RFC规范中所定义的。这是负责所有RFC定义的交互变得兼容和交叉兼容的部分,以便以ZeroMQ发布的方式服务。
另一层由一个“外来”语言包装器/ API绑定构成,它帮助非ZeroMQ内部代码的其他语言使用ZeroMQ API为任何特定的第三方语言中介公开的服务,以便“外来”世界有机会在内部ZeroMQ层的世界中调用由库实现的服务。
因此,使用python<code>SUB</code>-probe(根据定义,它工作得很好)到C#语言绑定来隔离差异,其中<code>SUB</code>-probet仍然不工作。
正如Dijkstra在测试中指出的那样,一些测试按预期工作的事实并不意味着系统中没有其他错误/错误,绑定可能会有更多的问题,但是pythonSUB
-客户端的积极确认表明,ZeroMQ核心服务不是那些导致错误的人。
你的代码看起来不错。
如上所述,这是主要的可疑ATM
向C#绑定维护者报告修复ZeroMQ不符合行为的请求,如上面观察和重新测试的全部细节所记录的,是帮助不符合C#绑定顺利工作的最佳下一步。
我正在尝试让Spring WebSocket(Spring 4)在我一直在做的项目中工作。到目前为止,我已经打开了WebSocket,添加了订阅者并发送了消息。 从昨天开始,我一直在试图弄清楚为什么我的endpoint不处理HTML中包含的stomp客户端发送的消息。控制台中实际上也没有错误(好吧,我不完全确定这里的“连接到服务器未定义”是什么意思)。 以下是Chrome控制台的输出: 这是我的代
目前,我已经开始使用ActiveMQ处理JMS主题。我已经通过JAVA代码(如下所述)创建了发布者和持久订阅者,并且在订阅者端也收到了消息。 Publisher.Java 订阅者.java 我对以下主题有一些疑问, 如何检查有多少订阅者使用 Java JMS 在主题中主动查找消息? 如何从主题中获取活动和持久订阅者列表? 我们是否可以删除主题中发布的消息? 在这些情况下帮助我。 提前致谢。
简介 Redis 的列表类型键可以用来实现队列,并且支持阻塞式读取,所以 Redis 能够非常容易的实现一个高性能的优先队列。同时在更高层面上,Redis 还支持“发布/订阅”的消息模式,可以基于此构建一个聊天系统。 发布示例 发布(Publish)即将消息发布到频道中。示例代码: // 发送消息 Redis::publish('chan-1', 'Hello, World!'); // 发送消息
我需要在发布/订阅模式下调用Kafka消费者1000次。据我所知,为了让kafka在发布/订阅模式下工作,我需要给每个消费者一个新的groupId(props . put(" group . id ",String.valueOf(Instant.now())。toEpochMilli()));).但是当我这样做的时候,如果两个消费线程同时访问消费线程,就会出现问题。这个问题应该怎么解决?
是否有任何示例代码可以让一个主机发布事件,而另一个主机通过Esper框架接收事件(侦听器或订阅者)。我注意到Esper提供了不同的适配器(套接字、JMS和HTTP),但找不到相应的示例代码。谢谢
我想用Java实现各种各样的发布者/订阅者模式,但目前已经没有主意了。 有1个发布者和N个订阅者,发布者发布对象,然后每个订阅者需要按照正确的顺序对每个对象进行一次且仅处理一次。发布者和每个订阅者在自己的线程中运行。 在我最初的实现中,每个订阅者都有自己的阻塞队列,发布者将对象放入每个订阅者的队列中。这可以正常工作,但如果任何订阅者的队列已满,发布者将被阻塞。这会导致性能下降,因为每个订阅者处理对