当前位置: 首页 > 知识库问答 >
问题:

在spring boot中,我不能使用google pubsub模拟器发送消息

李飞翼
2023-03-14

我试图使用pubsub的模拟器发送推送消息,我也使用spring boot,这是我的配置:

依赖性:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>
@Configuration
@AutoConfigureBefore(value= GcpPubSubAutoConfiguration.class)
@EnableConfigurationProperties(value= GcpPubSubProperties.class)
public class EmulatorPubSubConfiguration {
    @Value("${spring.gcp.pubsub.projectid}")
    private String projectId;

    @Value("${spring.gcp.pubsub.subscriptorid}")
    private String subscriptorId;

    @Value("${spring.gcp.pubsub.topicid}")
    private String topicId;

    @Bean
    public Publisher pubsubEmulator() throws IOException {
        String hostport = System.getenv("PUBSUB_EMULATOR_HOST");
        ManagedChannel channel = ManagedChannelBuilder.forTarget(hostport).usePlaintext().build();
        try {
            TransportChannelProvider channelProvider =
                    FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
            CredentialsProvider credentialsProvider = NoCredentialsProvider.create();

            // Set the channel and credentials provider when creating a `TopicAdminClient`.
            // Similarly for SubscriptionAdminClient
            TopicAdminClient topicClient =
                    TopicAdminClient.create(
                            TopicAdminSettings.newBuilder()
                                    .setTransportChannelProvider(channelProvider)
                                    .setCredentialsProvider(credentialsProvider)
                                    .build());

            ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
            // Set the channel and credentials provider when creating a `Publisher`.
            // Similarly for Subscriber
            return Publisher.newBuilder(topicName)
                    .setChannelProvider(channelProvider)
                    .setCredentialsProvider(credentialsProvider)
                    .build();
        } finally {
            channel.shutdown();
        }
    }
}

当然,我已经将PUBSUB_EMULATOR_HOST系统变量设置为localhost:8085,仿真程序在其中运行

我创建了一个用于测试的rest控制器:

  • 用于发送推送消息
@Autowired
private Publisher pubsubPublisher;

@PostMapping("/send1")
    public String publishMessage(@RequestParam("message") String message) throws InterruptedException, IOException {
        Publisher pubsubPublisher = this.getPublisher();
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
        ApiFuture<String> future =  pubsubPublisher.publish(pubsubMessage);
        //pubsubPublisher.publishAllOutstanding();
        try {
        // Add an asynchronous callback to handle success / failure
        ApiFutures.addCallback(future,
                new ApiFutureCallback<String>() {
                    @Override
                    public void onFailure(Throwable throwable) {
                        if (throwable instanceof ApiException) {
                            ApiException apiException = ((ApiException) throwable);
                            // details on the API exception
                            System.out.println(apiException.getStatusCode().getCode());
                            System.out.println(apiException.isRetryable());
                        }
                        System.out.println("Error publishing message : " + message);
                        System.out.println("Error publishing error : " + throwable.getMessage());
                        System.out.println("Error publishing cause : " + throwable.getCause());
                    }

                    @Override
                    public void onSuccess(String messageId) {
                        // Once published, returns server-assigned message ids (unique within the topic)
                        System.out.println(messageId);
                    }
                },
                MoreExecutors.directExecutor());
        }
        finally {
            if (pubsubPublisher != null) {
                // When finished with the publisher, shutdown to free up resources.
                pubsubPublisher.shutdown();
                pubsubPublisher.awaitTermination(1, TimeUnit.MINUTES);
            }
        }
    return "ok";
  • 获取消息:
@PostMapping("/pushtest")
    public String pushTest(@RequestBody CloudPubSubPushMessage request) {
        System.out.println( "------> message received: " + decode(request.getMessage().getData()) );
        return request.toString();
    }

我已经在模拟器中创建了我的主题和订阅,我遵循了以下教程:

我在模拟器中为get push消息设置endpoint“pushtest”,使用以下命令:

python subscriber.py PUBSUB_PROJECT_ID create-push TOPIC_ID SUBSCRIPTION_ID PUSH_ENDPOINT
python publisher.py PUBSUB_PROJECT_ID publish TOPIC_ID

我在“pushtest”endpoint中正确地获取消息。

我不知道为什么对我的欺负感到抱歉。

谢谢你的帮助。

共有1个答案

袁青青
2023-03-14

我发现问题了。

只在bean中注释这一行

channel.shutdown();

哈哈很简单。

 类似资料:
  • 我想使用SpringBoot向ActiveMQ队列发送消息。应用程序应在发送后终止,但仍保持活动状态。 这是我的申请代码: 在没有任何父节点的情况下使用以下依赖项(Maven): 和一行

  • 我试图使用Feign.HeaderMap注释在rest请求中传递HTTP头的映射,但它们出现在主体中。

  • 目标:当某个宏完成时,向Skype联系人发送消息。 资料来源:我四处搜索,发现了几个问题,所以试图做同样的事情。这是我使用Excel VBA向群聊发送Skype消息的代码的基础,也是这个代码的基础https://www.mrexcel.com/forum/excel-questions/424432-sending-skype-message-through-excel-vba.html这两个问题

  • 我在我的应用程序中点击发送按钮,模拟器出现以下错误消息: HAX正在工作,模拟器以快速virt模式运行 DYLD:惰性符号绑定失败:找不到符号:_UTF8_WRITE引用自:/users/nabil/documents/development/android/sdk/tools/emulator64-x86预期在:平面名称空间中DYLD:找不到符号:_UTF8_WRITE引用自:/users/na

  • 问题内容: 我在使用JMockit(1.21)模拟时遇到问题。请参阅以下内容,以简化我的实际课程。基本上我在我的代码中使用了某个地方,我希望对其进行模拟。 至于我的测试代码,此测试有效。 该测试失败。 我收到的错误消息: 我可以嘲笑像其他系统类和这样的,但只是似乎没有工作(也没有为此事)。我知道如何规避这是我的考验,所以我没有受到任何阻碍,但我不明白为什么不能嘲笑。删除Expectations块将