当前位置: 首页 > 工具软件 > Hubs1 > 使用案例 >

java事件的接收_使用 Java 向/从 Azure 事件中心发送/接收事件(最新版) - Azure Event Hubs | Microsoft Docs...

田柏
2023-12-01

您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn.

使用 Java 向/从 Azure 事件中心 (azure-messaging-eventhubs) 发送/接收事件Use Java to send events to or receive events from Azure Event Hubs (azure-messaging-eventhubs)

06/23/2020

本文内容

本快速入门介绍如何使用 azure-messaging-eventhubs Java 包向事件中心发送事件以及从事件中心接收事件。This quickstart shows how to send events to and receive events from an event hub using the azure-messaging-eventhubs Java package.

重要

本快速入门使用新的 azure-messaging-eventhubs 库。This quickstart uses the new azure-messaging-eventhubs package. 有关使用旧的 azure-eventhubs 和 azure-eventhubs-eph 包的快速入门,请参阅使用 azure-eventhubs 和 azure-eventhubs-eph 发送和接收事件。For a quickstart that uses the old azure-eventhubs and azure-eventhubs-eph packages, see Send and receive events using azure-eventhubs and azure-eventhubs-eph.

先决条件Prerequisites

如果不熟悉 Azure 事件中心,请在阅读本快速入门之前参阅事件中心概述。If you're new to Azure Event Hubs, see Event Hubs overview before you do this quickstart.

若要完成本快速入门,需要具备以下先决条件:To complete this quickstart, you need the following prerequisites:

Microsoft Azure 订阅。Microsoft Azure subscription. 若要使用 Azure 服务(包括 Azure 事件中心),需要一个订阅。To use Azure services, including Azure Event Hubs, you need a subscription. 如果没有现有的 Azure 帐户,可以注册免费试用帐户,或者在创建帐户时使用 MSDN 订阅者权益。If you don't have an existing Azure account, you can sign up for a free trial or use your MSDN subscriber benefits when you create an account.

Java 开发环境。A Java development environment. 本快速入门使用 Eclipse。This quickstart uses Eclipse. 需要 Java 开发工具包 (JDK) 版本 8 或更高版本。Java Development Kit (JDK) with version 8 or above is required.

创建事件中心命名空间和事件中心。Create an Event Hubs namespace and an event hub. 第一步是使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。The first step is to use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. 要创建命名空间和事件中心,请按照此文中的步骤操作。To create a namespace and an event hub, follow the procedure in this article. 然后,按照以下文章中的说明获取事件中心命名空间的连接字符串:获取连接字符串。Then, get the connection string for the Event Hubs namespace by following instructions from the article: Get connection string. 稍后将在本快速入门中使用连接字符串。You use the connection string later in this quickstart.

发送事件Send events

本部分介绍如何创建一个向事件中心发送事件的 Java 应用程序。This section shows you how to create a Java application to send events an event hub.

将引用添加到 Azure 事件中心库Add reference to Azure Event Hubs library

Maven 中心存储库中提供了事件中心的 Java 客户端库。The Java client library for Event Hubs is available in the Maven Central Repository. 可使用 Maven 项目文件中的以下依赖项声明引用此库:You can reference this library using the following dependency declaration inside your Maven project file:

com.azure

azure-messaging-eventhubs

5.0.1

编写代码以将消息发送到事件中心Write code to send messages to the event hub

对于以下示例,请首先在你最喜欢的 Java 开发环境中为控制台/shell 应用程序创建一个新的 Maven 项目。For the following sample, first create a new Maven project for a console/shell application in your favorite Java development environment. 添加一个名为 Sender 的类,并向该类中添加以下代码:Add a class named Sender, and add the following code to the class:

import com.azure.messaging.eventhubs.*;

import static java.nio.charset.StandardCharsets.UTF_8;

public class Sender {

public static void main(String[] args) {

}

}

连接字符串和事件中心Connection string and event hub

此代码使用事件中心命名空间的连接字符串以及事件中心的名称来生成事件中心客户端。This code uses the connection string to the Event Hubs namespace and the name of the event hub to build an Event Hubs client.

String connectionString = "";

String eventHubName = "";

创建事件中心生成者客户端Create an Event Hubs Producer client

此代码创建一个生成者客户端对象,用于生成事件/向事件中心发送事件。This code creates a producer client object that's used to produce/send events to the event hub.

EventHubProducerClient producer = new EventHubClientBuilder()

.connectionString(connectionString, eventHubName)

.buildProducerClient();

准备一批事件Prepare a batch of events

此代码准备一批事件。This code prepares a batch of events.

EventDataBatch batch = producer.createBatch();

batch.tryAdd(new EventData("First event"));

batch.tryAdd(new EventData("Second event"));

batch.tryAdd(new EventData("Third event"));

batch.tryAdd(new EventData("Fourth event"));

batch.tryAdd(new EventData("Fifth event"));

向事件中心发送事件批Send the batch of events to the event hub

此代码将上一步骤中准备的事件批发送到事件中心。This code sends the batch of events you prepared in the previous step to the event hub. 以下代码将在执行发送操作时阻塞。The following code blocks on the send operation.

producer.send(batch);

关闭和清理Close and cleanup

此代码关闭生成者。This code closes the producer.

producer.close();

发送事件的完整代码Complete code to send events

下面是将事件发送到事件中心的完整代码。Here is the complete code to send events to the event hub.

import com.azure.messaging.eventhubs.*;

public class Sender {

public static void main(String[] args) {

final String connectionString = "EVENT HUBS NAMESPACE CONNECTION STRING";

final String eventHubName = "EVENT HUB NAME";

// create a producer using the namespace connection string and event hub name

EventHubProducerClient producer = new EventHubClientBuilder()

.connectionString(connectionString, eventHubName)

.buildProducerClient();

// prepare a batch of events to send to the event hub

EventDataBatch batch = producer.createBatch();

batch.tryAdd(new EventData("First event"));

batch.tryAdd(new EventData("Second event"));

batch.tryAdd(new EventData("Third event"));

batch.tryAdd(new EventData("Fourth event"));

batch.tryAdd(new EventData("Fifth event"));

// send the batch of events to the event hub

producer.send(batch);

// close the producer

producer.close();

}

}

生成程序,并确保没有引发任何错误。Build the program, and ensure that there are no errors. 将在运行接收器程序后运行此程序。You'll run this program after you run the receiver program.

接收事件Receive events

The code in this tutorial is based on the EventProcessorClient sample on GitHub, which you can examine to see the full working application.

警告

如果在 Azure Stack Hub 上运行此代码,除非将特定的存储 API 版本作为目标,否则会遇到运行时错误。If you run this code on Azure Stack Hub, you will experience runtime errors unless you target a specific Storage API version. 这是因为事件中心 SDK 使用 Azure 中提供的最新 Azure 存储 API,而此 API 可能在 Azure Stack Hub 平台上不可用。That's because the Event Hubs SDK uses the latest available Azure Storage API available in Azure that may not be available on your Azure Stack Hub platform. Azure Stack Hub 支持的存储 Blob SDK 版本可能与 Azure 上通常提供的版本不同。Azure Stack Hub may support a different version of Storage Blob SDK than those typically available on Azure. 如果正在将 Azure Blob 存储用作检查点存储,请检查支持用于你的 Azure Stack Hub 版本的 Azure 存储 API 版本,并在你的代码中面向该版本。If you are using Azure Blog Storage as a checkpoint store, check the supported Azure Storage API version for your Azure Stack Hub build and target that version in your code.

例如,如果在 Azure Stack Hub 版本 2005 上运行,则存储服务的最高可用版本为版本 2019-02-02。For example, If you are running on Azure Stack Hub version 2005, the highest available version for the Storage service is version 2019-02-02. 默认情况下,事件中心 SDK 客户端库使用 Azure 上的最高可用版本(在 SDK 发布时为 2019-07-07)。By default, the Event Hubs SDK client library uses the highest available version on Azure (2019-07-07 at the time of the release of the SDK). 在这种情况下,除了执行本部分中的步骤以外,还需要添加相关代码,将存储服务 API 版本 2019-02-02 作为目标。In this case, besides following steps in this section, you will also need to add code to target the Storage service API version 2019-02-02. 如需通过示例来了解如何以特定的存储 API 版本为目标,请参阅 GitHub 上的此示例。For an example on how to target a specific Storage API version, see this sample on GitHub.

创建 Azure 存储和 Blob 容器Create an Azure Storage and a blob container

本快速入门将使用 Azure 存储(特别是 Blob 存储)作为检查点存储。In this quickstart, you use Azure Storage (specifically, Blob Storage) as the checkpoint store. 标记检查点是一个进程,被事件处理器用来标记或提交分区中最后一个成功处理的事件的位置。Checkpointing is a process by which an event processor marks or commits the position of the last successfully processed event within a partition. 标记检查点通常在处理事件的函数中进行。Marking a checkpoint is typically done within the function that processes the events. 了解有关检查点的更多信息,请参阅事件处理器。To learn more about checkpointing, see Event processor.

按照以下步骤创建 Azure 存储帐户。Follow these steps to create an Azure Storage account.

将事件中心库添加到 Java 项目Add Event Hubs libraries to your Java project

在 pom.xml 文件中添加以下依赖项。Add the following dependencies in the pom.xml file.

com.azure

azure-messaging-eventhubs

5.1.1

com.azure

azure-messaging-eventhubs-checkpointstore-blob

1.1.1

将以下“导入”语句添加到 Java 文件顶部。Add the following import statements at the top of the Java file.

import com.azure.messaging.eventhubs.EventHubClientBuilder;

import com.azure.messaging.eventhubs.EventProcessorClient;

import com.azure.messaging.eventhubs.EventProcessorClientBuilder;

import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;

import com.azure.messaging.eventhubs.models.ErrorContext;

import com.azure.messaging.eventhubs.models.EventContext;

import com.azure.storage.blob.BlobContainerAsyncClient;

import com.azure.storage.blob.BlobContainerClientBuilder;

import java.util.function.Consumer;

import java.util.concurrent.TimeUnit;

创建一个名为 Receiver 的类,并向该类中添加以下字符串变量。Create a class named Receiver, and add the following string variables to the class. 将占位符替换为正确的值。Replace the placeholders with the correct values.

private static final String EH_NAMESPACE_CONNECTION_STRING = "";

private static final String eventHubName = "";

private static final String STORAGE_CONNECTION_STRING = "";

private static final String STORAGE_CONTAINER_NAME = "";

将下面的 main 方法添加到该类。Add the following main method to the class.

public static void main(String[] args) throws Exception {

// Create a blob container client that you use later to build an event processor client to receive and process events

BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()

.connectionString(STORAGE_CONNECTION_STRING)

.containerName(STORAGE_CONTAINER_NAME)

.buildAsyncClient();

// Create a builder object that you will use later to build an event processor client to receive and process events and errors.

EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()

.connectionString(EH_NAMESPACE_CONNECTION_STRING, eventHubName)

.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)

.processEvent(PARTITION_PROCESSOR)

.processError(ERROR_HANDLER)

.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));

// Use the builder object to create an event processor client

EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();

System.out.println("Starting event processor");

eventProcessorClient.start();

System.out.println("Press enter to stop.");

System.in.read();

System.out.println("Stopping event processor");

eventProcessorClient.stop();

System.out.println("Event processor stopped.");

System.out.println("Exiting process");

}

将处理事件和错误的两个帮助程序方法(PARTITION_PROCESSOR 和 ERROR_HANDLER)添加到 Receiver 类中。Add the two helper methods (PARTITION_PROCESSOR and ERROR_HANDLER) that process events and errors to the Receiver class.

public static final Consumer PARTITION_PROCESSOR = eventContext -> {

System.out.printf("Processing event from partition %s with sequence number %d with body: %s %n",

eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), eventContext.getEventData().getBodyAsString());

if (eventContext.getEventData().getSequenceNumber() % 10 == 0) {

eventContext.updateCheckpoint();

}

};

public static final Consumer ERROR_HANDLER = errorContext -> {

System.out.printf("Error occurred in partition processor for partition %s, %s.%n",

errorContext.getPartitionContext().getPartitionId(),

errorContext.getThrowable());

};

完整代码应如下所示:The complete code should look like:

import com.azure.messaging.eventhubs.EventHubClientBuilder;

import com.azure.messaging.eventhubs.EventProcessorClient;

import com.azure.messaging.eventhubs.EventProcessorClientBuilder;

import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;

import com.azure.messaging.eventhubs.models.ErrorContext;

import com.azure.messaging.eventhubs.models.EventContext;

import com.azure.storage.blob.BlobContainerAsyncClient;

import com.azure.storage.blob.BlobContainerClientBuilder;

import java.util.function.Consumer;

import java.util.concurrent.TimeUnit;

public class Receiver {

private static final String EH_NAMESPACE_CONNECTION_STRING = "";

private static final String eventHubName = "";

private static final String STORAGE_CONNECTION_STRING = "";

private static final String STORAGE_CONTAINER_NAME = "";

public static final Consumer PARTITION_PROCESSOR = eventContext -> {

System.out.printf("Processing event from partition %s with sequence number %d with body: %s %n",

eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), eventContext.getEventData().getBodyAsString());

if (eventContext.getEventData().getSequenceNumber() % 10 == 0) {

eventContext.updateCheckpoint();

}

};

public static final Consumer ERROR_HANDLER = errorContext -> {

System.out.printf("Error occurred in partition processor for partition %s, %s.%n",

errorContext.getPartitionContext().getPartitionId(),

errorContext.getThrowable());

};

public static void main(String[] args) throws Exception {

BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()

.connectionString(STORAGE_CONNECTION_STRING)

.containerName(STORAGE_CONTAINER_NAME)

.buildAsyncClient();

EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()

.connectionString(EH_NAMESPACE_CONNECTION_STRING, eventHubName)

.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)

.processEvent(PARTITION_PROCESSOR)

.processError(ERROR_HANDLER)

.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));

EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();

System.out.println("Starting event processor");

eventProcessorClient.start();

System.out.println("Press enter to stop.");

System.in.read();

System.out.println("Stopping event processor");

eventProcessorClient.stop();

System.out.println("Event processor stopped.");

System.out.println("Exiting process");

}

}

生成程序,并确保没有引发任何错误。Build the program, and ensure that there are no errors.

运行应用程序Run the applications

先运行接收器应用程序。Run the receiver application first.

然后运行发送器应用程序。Then, run the sender application.

在接收器应用程序窗口中,确认已看到发送器应用程序发布的事件。In the receiver application window, confirm that you see the events that were published by the sender application.

在接收器应用程序窗口中按 ENTER 停止该应用程序。Press ENTER in the receiver application window to stop the application.

后续步骤Next steps

在 GitHub 上参阅以下示例:See the following samples on GitHub:

 类似资料: