PulsarConsumer例子


import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.JSONSchema;
public class ReceiveMsgTest {
    public static void main(String[] args) {
    String url = "http://192.168.1.48:8080";        
        try{
            PulsarClient client =PulsarClient.builder()
                    .serviceUrl(url)                    
                    .build();
             Consumer consumer=client.newConsumer(JSONSchema.of(UserModel.class))
             .topic("my-tenant/my-namespace/testschema-topic")           
             .subscriptionType(SubscriptionType.Exclusive)//订阅模式  Exclusive(独占,默认模式) Failover(灾备)Shared(共享)
             .subscriptionName("wbq_1")//订阅者名称
             .subscribe();
             while (true) {
                Message userModelmsg = consumer.receive();
                UserModel userModel=userModelmsg.getValue();
                System.out.println("receive message: " +userModel.getName()+"="+userModel.getAge());
                consumer.acknowledge(userModelmsg.getMessageId());//应答后此订阅者不会在收到此消息
             }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

public class UserModel {

    private String name;

    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

网站栏目:PulsarConsumer例子
本文地址:http://bzwzjz.com/article/pgodcc.html

其他资讯

Copyright © 2007-2020 广东宝晨空调科技有限公司 All Rights Reserved 粤ICP备2022107769号
友情链接: 网站建设 手机网站建设套餐 专业网站设计 成都网站制作 手机网站制作 成都网站制作 成都网站设计 成都网站建设 成都网站制作 成都定制网站建设 手机网站制作 企业网站设计 成都企业网站建设公司 重庆网站制作 重庆企业网站建设 成都网站建设 重庆网站建设 成都网站建设公司 网站设计 成都网站建设公司 成都商城网站制作 企业网站设计