Protobuf-ActiveMQ-RPC
![Build Status] (http://img.shields.io/travis/MediaMiser/protobuf-activemq-rpc/master.svg?style=flat-square) ![Coverage Status] (http://img.shields.io/coveralls/MediaMiser/protobuf-activemq-rpc/master.svg?style=flat-square) ![Maven Central] (https://maven-badges.herokuapp.com/maven-central/com.mediamiser/protobuf-activemq-rpc/badge.svg?style=flat-square)
This library provides the classes necessary to create RPC clients and servers defined as [Google Protocol Buffer Services] (https://developers.google.com/protocol-buffers/docs/proto#services) using Apache ActiveMQ to communicate and act as a fault-tolerant message broker so that clients and servers do not need to communicate directly. View the [API documentation] (https://mediamiser.github.io/protobuf-activemq-rpc/apidocs/index.html) for more details.
This project was inspired/based in small part upon the protobuf-socket-rpc (MIT License) and protobuf-rpc-pro (Apache License 2.0) projects.
Requirements
To build this project, your development environment must have installed:
- [Oracle JDK 7+] (http://www.oracle.com/technetwork/java/javase/downloads/index.html)
- Apache Maven 3.x
- [Google Protocol Buffers 2.5.0] (https://developers.google.com/protocol-buffers/) (use
install-protobuff.sh
)
This project is designed to be used with:
Usage
To use this in your project, you should have an [Apache ActiveMQ] (http://activemq.apache.org/) cluster to connect to and add the following dependency to your project's pom.xml
file:
<dependency>
<groupId>com.mediamiser</groupId>
<artifactId>protobuf-activemq-rpc</artifactId>
<version>1.0.6</version>
</dependency>
RPCs and their input/output messages are defined in a language-agnostic manner using Google Protocol Buffers . An example can be found in tests as [src/test/proto/pings.proto
] (src/test/proto/pings.proto).
package com.mediamiser.service;
option java_generic_services = true;
option java_package = "com.mediamiser.service";
option java_outer_classname = "PingsProtocol";
message Host {
required string ip = 1;
optional string hostname = 2;
}
message Pings {
repeated uint32 time_ms = 1;
}
service PingsServiceV1 {
rpc ping(Host) returns (Pings);
}
...
This proto file can be compiled using Maven and [maven-protoc-plugin
] (http://sergei-ivanov.github.io/maven-protoc-plugin/index.html) (see pom.xml
for example usage). These plugins require that the protoc
compiler is executable from your shell and on your $PATH
(on Ubuntu you can install it using [sudo install-protobuff.sh
] (sbin/install-prodobuff.sh)). Running mvn compile
will generate a PingsProtocol.java
file providing messages and RPC stubs for PingsServiceV1
.
Useful links
When writing your RPCs and messages, take into account the protobuf:
- Style guide
- [Updating a message type while remaining backwards-compatible] (https://developers.google.com/protocol-buffers/docs/proto#updating)
Client
A client that includes this library and has built the proto files for PingsServiceV1
can try to reach a server hosting this service using the code below. A full client is demonstrated in [ExampleClient.java
] (src/test/java/com/mediamiser/service/ExampleClient.java).
Note that [ActiveMqChannel
] (https://mediamiser.github.io/protobuf-activemq-rpc/apidocs/com/mediamiser/service/ActiveMqChannel.html) is a heavyweight object and it should be recycled if possible e.g. by using an [object pool] (https://commons.apache.org/proper/commons-pool/).
// Connect to an ActiveMQ broker (in this case a local one on your development
// machine)
final Connection connection =
new ActiveMQConnectionFactory("tcp://localhost:61616")
.createConnection();
// Start the connection to allow it to be used to send RPC calls
connection.start();
// Specify which services are available through this ActiveMQ broker
final Set<ServiceDescriptor> availableServices =
Sets.newHashSet(PingsProtocol.PingsServiceV1.getDescriptor());
// Specify the time to live for RPC calls in milliseconds. ActiveMQ and the
// server will drop messages that expire, and clients that are blocking will
// block for this amount of time before reporting failure. This can be set to
// ActiveMqChannel.DISABLE_EXPIRY to disable message expiry for a specific
// client (call forwarders on remote machines all respect each channel's
// individual expiry time).
final long timeToLiveMs = 1000;
// Create a channel to the available services with a specific call expiry time
final ActiveMqChannel channel =
new ActiveMqChannel(connection, availableServices, timeToLiveMs);
// Create a controller to get status and error information
final RpcController controller = new ActiveMqController();
// Define a request
final Host request = Host.newBuilder()
.setIp("127.0.0.1")
.setHostname("42")
.build();
// Synchronously make 1 call and wait for its completion
final Pings responseTimes =
PingsProtocol.PingsServiceV1.newBlockingStub(channel)
.ping(controller, request);
// Clean up resources
channel.close();
connection.close();
Server
A server can create a provider for a service and extends the automatically generated PingsServiceV1
class. Providers are instances which implement remote procedures and they are executed by [ActiveMqCallForwarder
] (https://mediamiser.github.io/protobuf-activemq-rpc/apidocs/com/mediamiser/service/ActiveMqCallForwarder.html) instances (each of which can be thought of as a thread). Providers should be thread safe if you are going to share one provider instance with many forwarders.
// Copyright (c) 2014 MediaMiser Ltd. All rights reserved.
package com.mediamiser.service.providers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.mediamiser.service.PingsProtocol;
import com.mediamiser.service.PingsProtocol.Host;
import com.mediamiser.service.PingsProtocol.Pings;
/**
* This provider functions properly, but returns the requested hostname as a
* response time to help indicate that the correct response was delivered to
* the correct caller.
*
* @author Chris Fournier <[email protected]>
*/
public class PingsServiceProvider extends PingsProtocol.PingsServiceV1 {
private static final Logger LOG =
LoggerFactory.getLogger(PingsServiceProvider.class);
@Override
public void ping(final RpcController controller,
final Host request,
final RpcCallback<Pings> done) {
LOG.trace("Got request to ping {} ({})",
request.getIp(),
request.getHostname());
// Add fake times (in this case, the hostname, so that we can make sure
// that the correct message got sent back to the correct caller during
// tests)
final Pings.Builder responseTimes = Pings.newBuilder();
responseTimes.addTimeMs(Integer.parseInt(request.getHostname()));
// Return the response times via the callback
done.run(responseTimes.build());
}
}
This provider alone will not serve requests; it must be connect to at least one [ActiveMqCallForwarder
] (https://mediamiser.github.io/protobuf-activemq-rpc/apidocs/com/mediamiser/service/ActiveMqCallForwarder.html) which has a live connection to an ActiveMQ broker (as demonstrated superficially below and fully in [ExampleServer.java
] (src/test/java/com/mediamiser/service/ExampleServer.java)).
// Connect to ActiveMQ
final Connection connection =
new ActiveMQConnectionFactory("tcp://localhost:61616")
.createConnection();
// Construct services to host
final Set<Service> availableServices =
Sets.newHashSet(new PingsServiceProvider());
// Create a forwarder (akin to a thread) to host this service and store it
final ActiveMqCallForwarder forwarder =
new ActiveMqCallForwarder(connection, availableServices);
// Begin processing calls
connection.start();
...
// Cleanup resources
connection.stop();
forwarder.close();
connection.close();
Development
Feel free to report an issue or submit a pull-request. Changes can be tested locally using:
mvn clean test
Licensing
Protobuf-ActiveMQ-RPC is licensed under the BSD 3-Clause license.