Agent skill
azure-eventhub-java
Build real-time streaming applications with Azure Event Hubs SDK for Java. Use when implementing event streaming, high-throughput data ingestion, or building event-driven architectures.
Install this agent skill to your Project
npx add-skill https://github.com/microsoft/skills/tree/main/.github/plugins/azure-sdk-java/skills/azure-eventhub-java
SKILL.md
Azure Event Hubs SDK for Java
Build real-time streaming applications using the Azure Event Hubs SDK for Java.
Installation
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.19.0</version>
</dependency>
<!-- For checkpoint store (production) -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.20.0</version>
</dependency>
Client Creation
EventHubProducerClient
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
// With connection string
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString("<connection-string>", "<event-hub-name>")
.buildProducerClient();
// Full connection string with EntityPath
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString("<connection-string-with-entity-path>")
.buildProducerClient();
With DefaultAzureCredential
import com.azure.identity.DefaultAzureCredentialBuilder;
EventHubProducerClient producer = new EventHubClientBuilder()
.fullyQualifiedNamespace("<namespace>.servicebus.windows.net")
.eventHubName("<event-hub-name>")
.credential(new DefaultAzureCredentialBuilder().build())
.buildProducerClient();
EventHubConsumerClient
import com.azure.messaging.eventhubs.EventHubConsumerClient;
EventHubConsumerClient consumer = new EventHubClientBuilder()
.connectionString("<connection-string>", "<event-hub-name>")
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildConsumerClient();
Async Clients
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
EventHubProducerAsyncClient asyncProducer = new EventHubClientBuilder()
.connectionString("<connection-string>", "<event-hub-name>")
.buildAsyncProducerClient();
EventHubConsumerAsyncClient asyncConsumer = new EventHubClientBuilder()
.connectionString("<connection-string>", "<event-hub-name>")
.consumerGroup("$Default")
.buildAsyncConsumerClient();
Core Patterns
Send Single Event
import com.azure.messaging.eventhubs.EventData;
EventData eventData = new EventData("Hello, Event Hubs!");
producer.send(Collections.singletonList(eventData));
Send Event Batch
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
// Create batch
EventDataBatch batch = producer.createBatch();
// Add events (returns false if batch is full)
for (int i = 0; i < 100; i++) {
EventData event = new EventData("Event " + i);
if (!batch.tryAdd(event)) {
// Batch is full, send and create new batch
producer.send(batch);
batch = producer.createBatch();
batch.tryAdd(event);
}
}
// Send remaining events
if (batch.getCount() > 0) {
producer.send(batch);
}
Send to Specific Partition
CreateBatchOptions options = new CreateBatchOptions()
.setPartitionId("0");
EventDataBatch batch = producer.createBatch(options);
batch.tryAdd(new EventData("Partition 0 event"));
producer.send(batch);
Send with Partition Key
CreateBatchOptions options = new CreateBatchOptions()
.setPartitionKey("customer-123");
EventDataBatch batch = producer.createBatch(options);
batch.tryAdd(new EventData("Customer event"));
producer.send(batch);
Event with Properties
EventData event = new EventData("Order created");
event.getProperties().put("orderId", "ORD-123");
event.getProperties().put("customerId", "CUST-456");
event.getProperties().put("priority", 1);
producer.send(Collections.singletonList(event));
Receive Events (Simple)
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
// Receive from specific partition
Iterable<PartitionEvent> events = consumer.receiveFromPartition(
"0", // partitionId
10, // maxEvents
EventPosition.earliest(), // startingPosition
Duration.ofSeconds(30) // timeout
);
for (PartitionEvent partitionEvent : events) {
EventData event = partitionEvent.getData();
System.out.println("Body: " + event.getBodyAsString());
System.out.println("Sequence: " + event.getSequenceNumber());
System.out.println("Offset: " + event.getOffset());
}
EventProcessorClient (Production)
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
// Create checkpoint store
BlobContainerAsyncClient blobClient = new BlobContainerClientBuilder()
.connectionString("<storage-connection-string>")
.containerName("checkpoints")
.buildAsyncClient();
// Create processor
EventProcessorClient processor = new EventProcessorClientBuilder()
.connectionString("<eventhub-connection-string>", "<event-hub-name>")
.consumerGroup("$Default")
.checkpointStore(new BlobCheckpointStore(blobClient))
.processEvent(eventContext -> {
EventData event = eventContext.getEventData();
System.out.println("Processing: " + event.getBodyAsString());
// Checkpoint after processing
eventContext.updateCheckpoint();
})
.processError(errorContext -> {
System.err.println("Error: " + errorContext.getThrowable().getMessage());
System.err.println("Partition: " + errorContext.getPartitionContext().getPartitionId());
})
.buildEventProcessorClient();
// Start processing
processor.start();
// Keep running...
Thread.sleep(Duration.ofMinutes(5).toMillis());
// Stop gracefully
processor.stop();
Batch Processing
EventProcessorClient processor = new EventProcessorClientBuilder()
.connectionString("<connection-string>", "<event-hub-name>")
.consumerGroup("$Default")
.checkpointStore(new BlobCheckpointStore(blobClient))
.processEventBatch(eventBatchContext -> {
List<EventData> events = eventBatchContext.getEvents();
System.out.printf("Received %d events%n", events.size());
for (EventData event : events) {
// Process each event
System.out.println(event.getBodyAsString());
}
// Checkpoint after batch
eventBatchContext.updateCheckpoint();
}, 50) // maxBatchSize
.processError(errorContext -> {
System.err.println("Error: " + errorContext.getThrowable());
})
.buildEventProcessorClient();
Async Receiving
asyncConsumer.receiveFromPartition("0", EventPosition.latest())
.subscribe(
partitionEvent -> {
EventData event = partitionEvent.getData();
System.out.println("Received: " + event.getBodyAsString());
},
error -> System.err.println("Error: " + error),
() -> System.out.println("Complete")
);
Get Event Hub Properties
// Get hub info
EventHubProperties hubProps = producer.getEventHubProperties();
System.out.println("Hub: " + hubProps.getName());
System.out.println("Partitions: " + hubProps.getPartitionIds());
// Get partition info
PartitionProperties partitionProps = producer.getPartitionProperties("0");
System.out.println("Begin sequence: " + partitionProps.getBeginningSequenceNumber());
System.out.println("Last sequence: " + partitionProps.getLastEnqueuedSequenceNumber());
System.out.println("Last offset: " + partitionProps.getLastEnqueuedOffset());
Event Positions
// Start from beginning
EventPosition.earliest()
// Start from end (new events only)
EventPosition.latest()
// From specific offset
EventPosition.fromOffset(12345L)
// From specific sequence number
EventPosition.fromSequenceNumber(100L)
// From specific time
EventPosition.fromEnqueuedTime(Instant.now().minus(Duration.ofHours(1)))
Error Handling
import com.azure.messaging.eventhubs.models.ErrorContext;
.processError(errorContext -> {
Throwable error = errorContext.getThrowable();
String partitionId = errorContext.getPartitionContext().getPartitionId();
if (error instanceof AmqpException) {
AmqpException amqpError = (AmqpException) error;
if (amqpError.isTransient()) {
System.out.println("Transient error, will retry");
}
}
System.err.printf("Error on partition %s: %s%n", partitionId, error.getMessage());
})
Resource Cleanup
// Always close clients
try {
producer.send(batch);
} finally {
producer.close();
}
// Or use try-with-resources
try (EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient()) {
producer.send(events);
}
Environment Variables
EVENT_HUBS_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...
EVENT_HUBS_NAME=<event-hub-name>
STORAGE_CONNECTION_STRING=<for-checkpointing>
Best Practices
- Use EventProcessorClient: For production, provides load balancing and checkpointing
- Batch Events: Use
EventDataBatchfor efficient sending - Partition Keys: Use for ordering guarantees within a partition
- Checkpointing: Checkpoint after processing to avoid reprocessing
- Error Handling: Handle transient errors with retries
- Close Clients: Always close producer/consumer when done
Trigger Phrases
- "Event Hubs Java"
- "event streaming Azure"
- "real-time data ingestion"
- "EventProcessorClient"
- "event hub producer consumer"
- "partition processing"
Recommended Agent Skills
Expand your agent's capabilities with these related and highly-rated skills.
podcast-generation
Generate AI-powered podcast-style audio narratives using Azure OpenAI's GPT Realtime Mini model via WebSocket. Use when building text-to-speech features, audio narrative generation, podcast creation from content, or integrating with Azure OpenAI Realtime API for real audio output. Covers full-stack implementation from React frontend to Python FastAPI backend with WebSocket streaming.
mcp-builder
Guide for creating high-quality MCP (Model Context Protocol) servers that enable LLMs to interact with external services through well-designed tools. Use when building MCP servers to integrate external APIs or services, whether in Python (FastMCP), Node/TypeScript (MCP SDK), or C#/.NET (Microsoft MCP SDK).
frontend-design-review
Review and create distinctive, production-grade frontend interfaces with high design quality and design system compliance. Evaluates using three pillars: frictionless insight-to-action, quality craft, and trustworthy building. USE FOR: PR reviews, design reviews, accessibility audits, design system compliance checks, creative frontend design, UI code review, component reviews, responsive design checks, theme testing, and creating memorable UI. DO NOT USE FOR: Backend API reviews, database schema reviews, infrastructure or DevOps work, pure business logic without UI, or non-frontend code.
entra-agent-id
Microsoft Entra Agent ID (preview) for creating OAuth2-capable AI agent identities via Microsoft Graph beta API. Covers Agent Identity Blueprints, BlueprintPrincipals, Agent Identities, required permissions, sponsors, and Workload Identity Federation. Includes Microsoft Entra SDK for AgentID (containerized sidecar) for polyglot agent authentication (Docker/Kubernetes), 3P agent integration, autonomous and interactive agent patterns. Triggers: "agent identity", "agent id", "Agent Identity Blueprint", "BlueprintPrincipal", "entra agent", "agent identity provisioning", "Graph agent identity", "entra sidecar", "agent id sidecar", "auth sidecar", "3P agent", "third-party agent identity", "polyglot agent auth".
github-issue-creator
Convert raw notes, error logs, voice dictation, or screenshots into crisp GitHub-flavored markdown issue reports. Use when the user pastes bug info, error messages, or informal descriptions and wants a structured GitHub issue. Supports images/GIFs for visual evidence.
copilot-sdk
Build applications powered by GitHub Copilot using the Copilot SDK. Use when creating programmatic integrations with Copilot across Node.js/TypeScript, Python, Go, or .NET. Covers session management, custom tools, streaming, hooks, MCP servers, BYOK providers, session persistence, custom agents, skills, and deployment patterns. Requires GitHub Copilot CLI installed and a GitHub Copilot subscription (unless using BYOK).
Didn't find tool you were looking for?