使用 Java 將事件傳送至 Azure 事件中樞或從中接收事件 (azure-messaging-eventhubs)Use Java to send events to or receive events from Azure Event Hubs (azure-messaging-eventhubs)
06/23/2020
本文內容
本快速入門說明如何使用 azure-messaging-eventhubs Java 套件,來傳送事件至事件中樞和從事件中樞接收事件。This quickstart shows how to send events to and receive events from an event hub using the azure-messaging-eventhubs Java package.
重要
本快速入門使用新的 azure-messaging-eventhubs 套件。This quickstart uses the new azure-messaging-eventhubs package. 如需使用舊有 azure-eventhubs 和 azure-eventhubs-eph 套件的快速入門,請參閱使用 azure-eventhubs 和 azure-eventhubs-eph 傳送和接收事件。For a quickstart that uses the old azure-eventhubs and azure-eventhubs-eph packages, see Send and receive events using azure-eventhubs and azure-eventhubs-eph.
必要條件Prerequisites
如果您對 Azure 事件中樞並不熟悉,在進行此快速入門之前,請先參閱事件中樞概述。If you're new to Azure Event Hubs, see Event Hubs overview before you do this quickstart.
若要完成本快速入門,您必須符合下列必要條件:To complete this quickstart, you need the following prerequisites:
Microsoft Azure 訂用帳戶。Microsoft Azure subscription. 若要使用 Azure 服務 (包括 Azure 事件中樞),您需要訂用帳戶。To use Azure services, including Azure Event Hubs, you need a subscription. 如果您沒有現有的 Azure 帳戶,您可以申請免費試用,或是在建立帳戶時使用 MSDN 訂閱者權益。If you don't have an existing Azure account, you can sign up for a free trial or use your MSDN subscriber benefits when you create an account.
Java 開發環境。A Java development environment. 本快速入門使用 Eclipse。This quickstart uses Eclipse. 需要含第 8 版或更新版本的 Java 開發套件 (JDK)。Java Development Kit (JDK) with version 8 or above is required.
建立事件中樞命名空間和事件中樞。Create an Event Hubs namespace and an event hub. 第一個步驟是使用 Azure 入口網站來建立「事件中樞」類型的命名空間,然後取得您應用程式與「事件中樞」進行通訊所需的管理認證。The first step is to use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. 若要建立命名空間和事件中樞,請依照這篇文章中的程序操作。To create a namespace and an event hub, follow the procedure in this article. 然後,依照下列文章中的指示,取得事件中樞命名空間的連接字串:取得連接字串。Then, get the connection string for the Event Hubs namespace by following instructions from the article: Get connection string. 您稍後會在本快速入門中使用連接字串。You use the connection string later in this quickstart.
傳送事件Send events
本節說明如何建立可將事件傳送至事件中樞的 Java 應用程式。This section shows you how to create a Java application to send events an event hub.
加入 Azure 事件中樞程式庫的參考Add reference to Azure Event Hubs library
Maven 中央存放庫中具有適用於事件中樞的 Java 用戶端程式庫。The Java client library for Event Hubs is available in the Maven Central Repository. 您可以在 Maven 專案檔中用下列相依性宣告來參照此程式庫:You can reference this library using the following dependency declaration inside your Maven project file:
com.azure
azure-messaging-eventhubs
5.0.1
撰寫程式碼以將訊息傳送到事件中樞Write code to send messages to the event hub
針對下列範例,在您最喜愛的 Java 開發環境中,先為主控台/殼層應用程式建立新的 Maven 專案。For the following sample, first create a new Maven project for a console/shell application in your favorite Java development environment. 新增名為 Sender 的類別,並在此類別中新增下列程式碼:Add a class named Sender, and add the following code to the class:
import com.azure.messaging.eventhubs.*;
import static java.nio.charset.StandardCharsets.UTF_8;
public class Sender {
public static void main(String[] args) {
}
}
連接字串和事件中樞Connection string and event hub
此程式碼會使用事件中樞命名空間的連接字串,以及事件中樞名稱來建置事件中樞用戶端。This code uses the connection string to the Event Hubs namespace and the name of the event hub to build an Event Hubs client.
String connectionString = "";
String eventHubName = "";
建立事件中樞生產者用戶端Create an Event Hubs Producer client
此程式碼會建立生產者用戶端物件,其用來產生事件/將事件傳送到事件中樞。This code creates a producer client object that's used to produce/send events to the event hub.
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
準備事件批次Prepare a batch of events
此程式碼會準備一批事件。This code prepares a batch of events.
EventDataBatch batch = producer.createBatch();
batch.tryAdd(new EventData("First event"));
batch.tryAdd(new EventData("Second event"));
batch.tryAdd(new EventData("Third event"));
batch.tryAdd(new EventData("Fourth event"));
batch.tryAdd(new EventData("Fifth event"));
將這批事件傳送到事件中樞Send the batch of events to the event hub
此程式碼會將您在上一個步驟中準備的事件批次傳送到事件中樞。This code sends the batch of events you prepared in the previous step to the event hub. 下列程式碼會封鎖傳送作業。The following code blocks on the send operation.
producer.send(batch);
關閉並清除Close and cleanup
此程式碼會關閉生產者。This code closes the producer.
producer.close();
用以傳送事件的完整程式碼Complete code to send events
以下是可將事件傳送至事件中樞的完整程式碼。Here is the complete code to send events to the event hub.
import com.azure.messaging.eventhubs.*;
public class Sender {
public static void main(String[] args) {
final String connectionString = "EVENT HUBS NAMESPACE CONNECTION STRING";
final String eventHubName = "EVENT HUB NAME";
// create a producer using the namespace connection string and event hub name
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
// prepare a batch of events to send to the event hub
EventDataBatch batch = producer.createBatch();
batch.tryAdd(new EventData("First event"));
batch.tryAdd(new EventData("Second event"));
batch.tryAdd(new EventData("Third event"));
batch.tryAdd(new EventData("Fourth event"));
batch.tryAdd(new EventData("Fifth event"));
// send the batch of events to the event hub
producer.send(batch);
// close the producer
producer.close();
}
}
建置程式,並確定沒有任何錯誤。Build the program, and ensure that there are no errors. 執行接收者程式之後,您將會執行此程式。You'll run this program after you run the receiver program.
接收事件Receive events
本教學課程中的程式碼是根據 GitHub 上的 EventProcessorClient 程式碼,您可以檢查該程式碼以查看完整的運作中應用程式。The code in this tutorial is based on the EventProcessorClient sample on GitHub, which you can examine to see the full working application.
警告
如果您在 Azure Stack Hub 上執行此程式碼,除非您以特定的儲存體 API 版本為目標,否則會遇到執行階段錯誤。If you run this code on Azure Stack Hub, you will experience runtime errors unless you target a specific Storage API version. 這是因為事件中樞 SDK 會使用 Azure 中可用的最新可用 Azure 儲存體 API,而這可能無法在您的 Azure Stack Hub 平台上使用。That's because the Event Hubs SDK uses the latest available Azure Storage API available in Azure that may not be available on your Azure Stack Hub platform. Azure Stack Hub 可能支援不同版本的儲存體 Blob SDK,而不是 Azure 上一般可用的版本。Azure Stack Hub may support a different version of Storage Blob SDK than those typically available on Azure. 如果您使用 Azure Blog 儲存體作為檢查點存放區,請檢查 Azure Stack Hub 組建所支援的 Azure 儲存體 API 版本,並在您的程式碼中以該版本作為目標。If you are using Azure Blog Storage as a checkpoint store, check the supported Azure Storage API version for your Azure Stack Hub build and target that version in your code.
例如,如果您在 Azure Stack Hub 2005 版上執行,儲存體服務的最高可用版本為 2019-02-02。For example, If you are running on Azure Stack Hub version 2005, the highest available version for the Storage service is version 2019-02-02. 根據預設,事件中樞 SDK 用戶端程式庫會使用 Azure 上的最高可用版本 (在 SDK 發行時為 2019-07-07)。By default, the Event Hubs SDK client library uses the highest available version on Azure (2019-07-07 at the time of the release of the SDK). 在此情況下,除了本節中的以下步驟外,您還需要新增程式碼,以將儲存體服務 API 版本設為 2019-02-02 為目標。In this case, besides following steps in this section, you will also need to add code to target the Storage service API version 2019-02-02. 如需如何設定特定儲存體 API 版本目標的範例,請參閱 GitHub 上的此範例。For an example on how to target a specific Storage API version, see this sample on GitHub.
建立 Azure 儲存體 Blob 容器Create an Azure Storage and a blob container
在本快速入門中,您會使用 Azure 儲存體 (明確來說是 Blob 儲存體) 作為檢查點存放區。In this quickstart, you use Azure Storage (specifically, Blob Storage) as the checkpoint store. 檢查點是事件處理器用來標記或認可資料分割內上次成功處理事件位置的程序。Checkpointing is a process by which an event processor marks or commits the position of the last successfully processed event within a partition. 標記檢查點通常是在處理事件的函式中完成。Marking a checkpoint is typically done within the function that processes the events. 若要深入了解檢查點,請參閱事件處理器。To learn more about checkpointing, see Event processor.
請遵循這些步驟來建立 Azure 儲存體帳戶。Follow these steps to create an Azure Storage account.
將事件中樞程式庫新增至您的 JAVA 專案Add Event Hubs libraries to your Java project
在 pom.xml 中新增下列相依性。Add the following dependencies in the pom.xml file.
com.azure
azure-messaging-eventhubs
5.1.1
com.azure
azure-messaging-eventhubs-checkpointstore-blob
1.1.1
在 JAVA 檔案頂端新增下列 import 陳述式。Add the following import statements at the top of the Java file.
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.util.function.Consumer;
import java.util.concurrent.TimeUnit;
建立名為 Receiver 的類別,並且將下列字串變數新增至此類別。Create a class named Receiver, and add the following string variables to the class. 將預留位置取代為正確的值。Replace the placeholders with the correct values.
private static final String EH_NAMESPACE_CONNECTION_STRING = "";
private static final String eventHubName = "";
private static final String STORAGE_CONNECTION_STRING = "";
private static final String STORAGE_CONTAINER_NAME = "";
將下列 main 方法新增至類別。Add the following main method to the class.
public static void main(String[] args) throws Exception {
// Create a blob container client that you use later to build an event processor client to receive and process events
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString(STORAGE_CONNECTION_STRING)
.containerName(STORAGE_CONTAINER_NAME)
.buildAsyncClient();
// Create a builder object that you will use later to build an event processor client to receive and process events and errors.
EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
.connectionString(EH_NAMESPACE_CONNECTION_STRING, eventHubName)
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.processEvent(PARTITION_PROCESSOR)
.processError(ERROR_HANDLER)
.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
// Use the builder object to create an event processor client
EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
System.out.println("Starting event processor");
eventProcessorClient.start();
System.out.println("Press enter to stop.");
System.in.read();
System.out.println("Stopping event processor");
eventProcessorClient.stop();
System.out.println("Event processor stopped.");
System.out.println("Exiting process");
}
將處理事件和錯誤的兩個協助程式方法 (PARTITION_PROCESSOR 和 ERROR_HANDLER) 新增至 Receiver 類別。Add the two helper methods (PARTITION_PROCESSOR and ERROR_HANDLER) that process events and errors to the Receiver class.
public static final Consumer PARTITION_PROCESSOR = eventContext -> {
System.out.printf("Processing event from partition %s with sequence number %d with body: %s %n",
eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), eventContext.getEventData().getBodyAsString());
if (eventContext.getEventData().getSequenceNumber() % 10 == 0) {
eventContext.updateCheckpoint();
}
};
public static final Consumer ERROR_HANDLER = errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
};
完整程式碼看起來應該類似:The complete code should look like:
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.util.function.Consumer;
import java.util.concurrent.TimeUnit;
public class Receiver {
private static final String EH_NAMESPACE_CONNECTION_STRING = "";
private static final String eventHubName = "";
private static final String STORAGE_CONNECTION_STRING = "";
private static final String STORAGE_CONTAINER_NAME = "";
public static final Consumer PARTITION_PROCESSOR = eventContext -> {
System.out.printf("Processing event from partition %s with sequence number %d with body: %s %n",
eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), eventContext.getEventData().getBodyAsString());
if (eventContext.getEventData().getSequenceNumber() % 10 == 0) {
eventContext.updateCheckpoint();
}
};
public static final Consumer ERROR_HANDLER = errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
};
public static void main(String[] args) throws Exception {
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString(STORAGE_CONNECTION_STRING)
.containerName(STORAGE_CONTAINER_NAME)
.buildAsyncClient();
EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
.connectionString(EH_NAMESPACE_CONNECTION_STRING, eventHubName)
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.processEvent(PARTITION_PROCESSOR)
.processError(ERROR_HANDLER)
.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
System.out.println("Starting event processor");
eventProcessorClient.start();
System.out.println("Press enter to stop.");
System.in.read();
System.out.println("Stopping event processor");
eventProcessorClient.stop();
System.out.println("Event processor stopped.");
System.out.println("Exiting process");
}
}
建置程式,並確定沒有任何錯誤。Build the program, and ensure that there are no errors.
執行應用程式Run the applications
先執行接收者應用程式。Run the receiver application first.
然後,執行傳送者應用程式。Then, run the sender application.
在接收者應用程式視窗中,確認您看到傳送者應用程式所發佈的事件。In the receiver application window, confirm that you see the events that were published by the sender application.
按下接收者應用程式視窗中 ENTER 以停止應用程式。Press ENTER in the receiver application window to stop the application.
後續步驟Next steps
請參閱 GitHub 上的下列範例:See the following samples on GitHub: