Skip to content

Usage Standalone

ran edited this page Jun 11, 2020 · 4 revisions
  1. Clone this project from GitHub to your local machine.

    git clone https://github.com/streamnative/aop.git
    cd aop
  2. Build the project.

    mvn clean install -DskipTests
  3. Copy the NAR package to the Pulsar protocols directory.

    cp ./amqp-impl/target/pulsar-protocol-handler-amqp-${version}.nar $PULSAR_HOME/protocols/pulsar-protocol-handler-amqp-${version}.nar
  4. Modify the Pulsar standalone configuration.

    # conf file: $PULSAR_HOME/conf/standalone.conf
    
    # add amqp configs
    messagingProtocols=amqp
    protocolHandlerDirectory=./protocols
    
    amqpListeners=amqp://127.0.0.1:5672
    advertisedAddress=127.0.0.1
    
  5. Start Pulsar in standalone mode.

    $PULSAR_HOME/bin/pulsar standalone
    
  6. Add a namespace for the vhost.

    # for example, the vhost name is `vhost1`
    bin/pulsar-admin namespaces create -b 1 public/vhost1
    # set retention for the namespace
    bin/pulsar-admin namespaces set-retention -s 100M -t 2d public/vhost1
    
  7. Test with the RabbitMQ client.

    # add RabbitMQ client dependency in your project
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
    </dependency>
    
    // Java Code
    
    // Create a connection to the AMQP listener configured in standalone.conf
    // (amqpListeners=amqp://127.0.0.1:5672). The virtual host name matches the
    // namespace created for it in the previous step (public/vhost1).
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setVirtualHost("vhost1");
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setPort(5672);
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    
    String exchange = "ex";
    String queue = "qu";
    
    // Declare the exchange: fanout type, durable = true, autoDelete = false,
    // internal = false, no extra arguments.
    channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT, true, false, false, null);
    
    // Declare the queue (durable = true, exclusive = false, autoDelete = false,
    // no extra arguments) and bind it to the exchange with an empty routing key.
    channel.queueDeclare(queue, true, false, false, null);
    channel.queueBind(queue, exchange, "");
    
    // Publish 100 messages. The payload is encoded explicitly as UTF-8 so the
    // bytes on the wire do not depend on the JVM's platform default charset.
    for (int i = 0; i < 100; i++) {
        channel.basicPublish(exchange, "", null,
                ("hello - " + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
    
    // Consume the messages with autoAck = true. The latch is counted down once
    // per delivery so the calling thread can block until all 100 have arrived.
    CountDownLatch countDownLatch = new CountDownLatch(100);
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            // Decode with the same charset that was used when publishing.
            System.out.println("receive msg: " + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            countDownLatch.countDown();
        }
    });
    // Block until every published message has been received.
    countDownLatch.await();
    
    // Release resources: close the channel first, then the connection.
    channel.close();
    connection.close();
    
Clone this wiki locally