DataHub Java接入实时数据

DataHub Java接入实时数据DataHubJava接入实时数据序言问题代码总结序言Datahub的相关介绍和优势,我在这里就不一一赘述,留个官方文档的连接([DataHub官方文档](https://help.aliyun.com/document_detail/47439.html?spm=a2c0j.8235941.654670.ddoc.26d91a22JWAbt9)),大家可以自己去看看。我想在这里记录的是…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

DataHub Java接入实时数据

序言

  Datahub的相关介绍和优势,我在这里就不一一赘述。大家可以自己去看官方文档。我想在这里记录的是我做这个
  需求中遇到的一点问题和它们的解决办法,如果大家有更好的思路和办法,欢迎大家指正。

Jetbrains全家桶1年46,售后保障稳定

问题

1、在项目开始是只运行一次任务代码,即DataHub的接流任务。由于写博客的时间距离我当初写代码的时间比较久了,
我已经记不得这个问题当初具体的详细情景,项目用的spring的框架。在网上也找了很久的办法,最后,总之,我用
了一个取巧的方法,利用spring的定时任务,每十年执行一次,相信应该没有一个项目会连续在服务器上跑十年吧。。。
/** * 一次性定时任务,每十年执行一次 */
@Scheduled(fixedRate = 1000*60*60*24*365*10)
2、第二个问题是关于DataHub的游标问题。在早期的DataHub的产品中并没有提供游标的存储,用户需要自己存储游
标,以便在项目重启后、或接流异常中断以后继续读取数据。当然,目前的DataHub已经支持游标的存储,只需要我们
进行简单的配置。在这里,需要注意一点,一个project下多个topic只要一个游标就可以了。这块可以去看官方的示
例代码,对着改就行了。
3、利用多线程对多个topic进行接流。个人认为,这是我多线程目前为止用的最好的一次了。
4、几个线程池概念,这块有机会还是要重新理一下。
Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,
则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照
指定顺序(FIFO, LIFO, 优先级)执行。

代码

package com.bywin.apsarabrain.trafficgis.web.api.road.task;

import com.aliyun.datahub.DatahubClient;
import com.aliyun.datahub.DatahubConfiguration;
import com.aliyun.datahub.auth.AliyunAccount;
import com.aliyun.datahub.common.data.Field;
import com.aliyun.datahub.common.data.FieldType;
import com.aliyun.datahub.common.data.RecordSchema;
import com.aliyun.datahub.exception.OffsetResetedException;
import com.aliyun.datahub.exception.OffsetSessionChangedException;
import com.aliyun.datahub.exception.SubscriptionOfflineException;
import com.aliyun.datahub.model.*;
import com.bywin.apsarabrain.trafficgis.biz.service.IDwdTfcEvtSftAmapreportRtServ;
import com.bywin.apsarabrain.trafficgis.dal.entity.DwdTfcEvtSftAmapreportRtEntity;
import com.bywin.apsarabrain.trafficgisb.common.util.StringUtil;
import org.codehaus.jackson.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/** * @Author: zyye * @Date: 2018/10/29 22:00 * @Description: 交通事件(一次性定时任务,每十年执行一次) */
@Component("TrafficEventTask")
public class TrafficEventTask { 
   

    private static final Logger LOGGER = LoggerFactory.getLogger("TrafficEventTask");

    @Autowired
    private IDwdTfcEvtSftAmapreportRtServ serv;

    @Value("${dataHub.accessId}")
    private String accessId;

    @Value("${dataHub.accessKey}")
    private String accessKey;

    @Value("${dataHub.endpoint}")
    private String endpoint;

    @Value("${dataHub.projectName}")
    private String projectName;

    @Value("${dataHub.topicName}")
    private String topicName;

    @Value("${dataHub.subId}")
    private String subId;

    DatahubConfiguration conf;

    private DatahubClient client;

    /** * 一个基于数组结构的有界阻塞队列,按FIFO原则进行排序 */
    static BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);

    /** * 线程池 */
    static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 10,
            1, TimeUnit.MINUTES, blockingQueue);

    /** * 一次性定时任务,每十年执行一次 */
    @Scheduled(fixedRate = 1000*60*60*24*365*10)
    public void run() { 
   
        LOGGER.info("--------------run trafficEventTask----------------");
        //多台服务器,只需要一台服务器执行定时任务,通过路径判断
        if (StringUtil.isRunTask()){ 
   
            LOGGER.info("++++++++++++++++++++runing trafficEventTask++++++++++++++++++++++++");
            AliyunAccount account = new AliyunAccount(accessId, accessKey);
            conf = new DatahubConfiguration(account, endpoint);
            client = new DatahubClient(conf);
            //查询有几个shard
            ListShardResult listShardResult = client.listShard(projectName, topicName);

            //根据shard的数量创建线程消费
            for(int i=0;i<listShardResult.getShards().size();i++)
            { 
   
                String shardId = listShardResult.getShards().get(i).getShardId();
                Runnable runnable=new TaskWithoutResult(shardId);
                threadPoolExecutor.submit(runnable);
            }
            //threadPoolExecutor.execute(()-> System.out.println(Thread.currentThread().getName()));
            //threadPoolExecutor.shutdown();//不会触发中断
            //threadPoolExecutor.shutdownNow();//会触发中断
        }
    }

    /** * 无返回值的多线程任务 * @author zyye * */
    class TaskWithoutResult implements Runnable
    { 
   
        /** * dataHub的shardId */
        private String shardId;

        public TaskWithoutResult(String shardId)
        { 
   
            this.shardId=shardId;
        }

        @Override
        public void run(){ 
   
            LOGGER.info("线程_shardId="+shardId+"::"+Thread.currentThread()+"开始运行");
            try { 
   
                task(shardId);
            } catch (Exception e) { 
   
                //捕捉中断异常
                LOGGER.info("线程_shardId="+shardId+"::"+Thread.currentThread()+"被中断");
            }
            LOGGER.info("线程_shardId="+shardId+"::"+Thread.currentThread()+"结束运行");
        }
    }

    /** * 根据dataHub的shardId从多线程里读取数据 * @param shardId */
    private void task(String shardId) { 
   
        LOGGER.info("开始执行dataHub任务!shardId="+shardId);

        //获得dataHub上对应的字段
        RecordSchema schema = new RecordSchema();
        schema.addField(new Field("src_evt_id", FieldType.STRING));
        schema.addField(new Field("gmt_create", FieldType.STRING));
        schema.addField(new Field("lng", FieldType.DOUBLE));
        schema.addField(new Field("lat", FieldType.DOUBLE));
        schema.addField(new Field("geohash", FieldType.STRING));
        schema.addField(new Field("evt_desc", FieldType.STRING));
        schema.addField(new Field("evt_start_time", FieldType.STRING));
        schema.addField(new Field("evt_end_time", FieldType.STRING));
        schema.addField(new Field("evt_update_time", FieldType.STRING));
        schema.addField(new Field("evt_detail", FieldType.STRING));
        schema.addField(new Field("evt_type_no", FieldType.BIGINT));
        schema.addField(new Field("sub_evt_type_no", FieldType.BIGINT));
        schema.addField(new Field("nick_name", FieldType.STRING));
        schema.addField(new Field("picture", FieldType.STRING));
        schema.addField(new Field("road_name", FieldType.STRING));
        schema.addField(new Field("region_shape", FieldType.STRING));
        schema.addField(new Field("dt", FieldType.STRING));
        schema.addField(new Field("adcode", FieldType.STRING));

        try { 
   
            boolean bExit = false;
            GetTopicResult topicResult = client.getTopic(projectName, topicName);
            // 首先初始化offset上下文
            OffsetContext offsetCtx = client.initOffsetContext(projectName, topicName, subId, shardId);
            // 开始消费的cursor
            String cursor = null;
            if (!offsetCtx.hasOffset()) { 
   
                // 之前没有存储过点位,先获取初始点位,比如这里获取当前该shard最早的数据
                GetCursorResult cursorResult = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
                cursor = cursorResult.getCursor();
            } else { 
   
                // 否则,获取当前已消费点位的下一个cursor
                cursor = client.getNextOffsetCursor(offsetCtx).getCursor();
            }
// System.out.println("Start consume records, begin offset context:" + offsetCtx.toObjectNode().toString()
// + ", cursor:" + cursor);
            long recordNum = 0L;
            while (!bExit) { 
   
                try { 
   
                    GetRecordsResult recordResult = client.getRecords(projectName, topicName, shardId, cursor, 10,
                            topicResult.getRecordSchema());
                    List<RecordEntry> records = recordResult.getRecords();
                    if (records.size() == 0) { 
   
                        // 将最后一次消费点位上报
                        client.commitOffset(offsetCtx);
// System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString());
                        // 可以先休眠(30秒)一会,再继续消费新记录
                        Thread.sleep(1000*30);
// System.out.println("sleep 1s and continue consume records! shard id:" + shardId);
                    } else { 
   
                        //将dataHub的数据序列化以后存到数据库
                        List<DwdTfcEvtSftAmapreportRtEntity> eventList = new ArrayList<>();
                        for (RecordEntry record : records) { 
   
                            // 处理记录逻辑
                            DwdTfcEvtSftAmapreportRtEntity dwdTfcEvtSftAmapreportRtEntity = new DwdTfcEvtSftAmapreportRtEntity();
                            JsonNode jsonNode = record.toJsonNode().get("Data");
// System.out.println(jsonNode.toString());
                            eventList.add(dwdTfcEvtSftAmapreportRtEntity.jsonNodeToEntity(jsonNode));
                            // 上报点位,该示例是每处理100条记录上报一次点位
                            offsetCtx.setOffset(record.getOffset());
                            recordNum++;
                            if (recordNum % 100 == 0) { 
   
                                client.commitOffset(offsetCtx);
// System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString());
                            }
                        }
                        serv.batchInsert(eventList);
                        cursor = recordResult.getNextCursor();
                    }
                } catch (SubscriptionOfflineException e) { 
   
                    // 订阅下线,退出
                    bExit = true;
                    e.printStackTrace();
                } catch (OffsetResetedException e) { 
   
                    // 点位被重置,更新offset上下文
                    client.updateOffsetContext(offsetCtx);
                    cursor = client.getNextOffsetCursor(offsetCtx).getCursor();
                    LOGGER.info("Restart consume shard:" + shardId + ", reset offset:"
                            + offsetCtx.toObjectNode().toString() + ", cursor:" + cursor);
                } catch (OffsetSessionChangedException e) { 
   
                    // 其他consumer同时消费了该订阅下的相同shard,退出
                    bExit = true;
                    e.printStackTrace();
                } catch (Exception e) { 
   
                    //bExit = true;
                    //当线程意外中断时,等待一段时间再继续运行,而不是退出
                    LOGGER.info("thread shardId="+shardId+"is Exception........................");
                    Thread.sleep(1000*30);
                    e.printStackTrace();
                }
            }
        } catch (Exception e) { 
   
            e.printStackTrace();
        }
    }
}

总结

一个人的未来永远不会取决于你目前所在的位置,而是取决你的心想要到达的地方。
我希望这是一个开始,我期待生活中的更多可能性。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/234670.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • mysql数据库备份和还原的命令_Mysql数据库备份和还原常用的命令

    mysql数据库备份和还原的命令_Mysql数据库备份和还原常用的命令Mysql数据库备份和还原常用的命令是进行Mysql数据库备份和还原的关键,没有命令,什么都无从做起,更谈不上什么备份还原,只有给系统这个命令,让它去执行,才能完成Mysql数据库备份和还原的操作,下面就是操作的常用命令。一、备份命令1、备份MySQL数据库的命令mysqldump-hhostname-uusername-ppassworddatabasename>backupf…

    2022年5月4日
    54
  • 第一章《初识数据库》

    第一章《初识数据库》

    2021年5月28日
    94
  • journalctl清空日志

    journalctl清空日志Howtoclearjournalctl步骤journalctl–flushjournalctl–rotatejournalctl–vacuum-time=1sjournalctl–vacuum-time=2djournalctl–vacuum-size=50M

    2022年5月22日
    121
  • tomcat8.5支持jdk1.8吗_tomcat jdk版本

    tomcat8.5支持jdk1.8吗_tomcat jdk版本最近接收到任务要让公司的框架支持Http2协议,主要是RPC之间走Http2。通过查找官网以及上网找资料和咨询大神的帮助,终于找到以下两种方式,蠢人不多话,直接上代码。网上大多数都是抄来抄去的,所以希望本文能帮助到大家,共同学习吖springboot2.1.4+tomcat9+java8 这个方法是我从外网查找到的,但是缺点就是需要额外的再开一个端口来接收h2c的请求有兴趣想看原…

    2022年10月26日
    0
  • Red5搭建直播平台

    Red5搭建直播平台Red5搭建直播平台

    2022年4月23日
    44
  • virsh console 进不去虚拟机_virsh 命令

    virsh console 进不去虚拟机_virsh 命令参考自链接http://www.2cto.com/os/201411/354288.html下的文章,感谢作者,自己整理备份,以备查用。问题描述:       先执行命令virshstartmycentos,启动虚拟机。       当执行命令virshconsolemycentos后出现如下显示:       virshconsolemycentos

    2022年8月12日
    3

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号