當前位置:網站首頁>Flink(50):Flink之綜合練習(二)

Flink(50):Flink之綜合練習(二)

2022-01-28 10:44:55 電光閃爍

目錄

0. 相關文章鏈接

1. 需求

2. 數據

3. 編程步驟

4. 代碼實現

5. 效果展示


0. 相關文章鏈接

Flink文章匯總

1. 需求

在電商領域會有這麼一個場景,如果用戶買了商品,在訂單完成之後,一定時間之內沒有做出評價,系統自動給與五星好評,我們今天主要使用Flink的定時器來簡單實現這一功能。

 

2. 數據

        自定義source模擬生成一些訂單數據,在這裏,我們生了一個最簡單的二元組Tuple3,包含用戶id,訂單id和訂單完成時間個字段。

/**
 * 自定義source實時產生訂單數據Tuple3<用戶id,訂單id, 訂單生成時間>
 */
public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
    private boolean flag = true;
    @Override
    public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
        Random random = new Random();
        while (flag) {
            String userId = random.nextInt(5) + "";
            String orderId = UUID.randomUUID().toString();
            long currentTimeMillis = System.currentTimeMillis();
            ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
            Thread.sleep(500);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}

3. 編程步驟

  • 創建執行環境env
  • 創建自定義數據源source
  • 進行數據轉換計算transformation
    • 設置經過interval毫秒用戶未對訂單做出評價,自動給與好評.為了演示方便,設置5s的時間
    • 分組後使用自定義KeyedProcessFunction完成定時判斷超時訂單並自動好評
    • 定義MapState類型的狀態,key是訂單號,value是訂單完成時間
    • 創建MapState
    • 注册定時器
    • 定時器被觸發時執行並輸出結果
  • 創建數據輸出sink
  • 啟動執行execute

4. 代碼實現

package cn.itcast.action;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

/**
 * Author itcast
 * Desc
 * 在電商領域會有這麼一個場景,如果用戶買了商品,在訂單完成之後,一定時間之內沒有做出評價,系統自動給與五星好評,
 * 我們今天主要使用Flink的定時器來簡單實現這一功能。
 */
public class OrderAutomaticFavorableComments {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.source
        DataStreamSource<Tuple3<String, String, Long>> sourceDS = env.addSource(new MySource());
        //這裏可以使用訂單生成時間作為事件時間,代碼和之前的一樣
        //這裏不作為重點,所以簡化處理!

        //3.transformation
        //設置經過interval用戶未對訂單做出評價,自動給與好評.為了演示方便,設置5000ms的時間
        long interval = 5000L;
        //分組後使用自定義KeyedProcessFunction完成定時判斷超時訂單並自動好評
        sourceDS.keyBy(0) //實際中可以對用戶id進行分組
                //KeyedProcessFunction:進到窗口的數據是分好組的
                //ProcessFunction:進到窗口的數據是不區分分組的
                .process(new TimerProcessFuntion(interval));
        //4.execute
        env.execute();
    }

    /**
     * 自定義source實時產生訂單數據Tuple2<訂單id, 訂單生成時間>
     */
    public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
        private boolean flag = true;
        @Override
        public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                String userId = random.nextInt(5) + "";
                String orderId = UUID.randomUUID().toString();
                long currentTimeMillis = System.currentTimeMillis();
                ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }

    /**
     * 自定義處理函數用來給超時訂單做自動好評!
     * 如一個訂單進來:<訂單id, 2020-10-10 12:00:00>
     * 那麼該訂單應該在12:00:00 + 5s 的時候超時!
     * 所以我們可以在訂單進來的時候設置一個定時器,在訂單時間 + interval的時候觸發!
     * KeyedProcessFunction<K, I, O>
     * KeyedProcessFunction<Tuple就是String, Tuple3<用戶id, 訂單id, 訂單生成時間>, Object>
     */
    public static class TimerProcessFuntion extends KeyedProcessFunction<Tuple, Tuple3<String, String, Long>, Object> {
        private long interval;

        public TimerProcessFuntion(long interval) {
            this.interval = interval;//傳過來的是5000ms/5s
        }

        //3.1定義MapState類型的狀態,key是訂單號,value是訂單完成時間
        //定義一個狀態用來記錄訂單信息
        //MapState<訂單id, 訂單完成時間>
        private MapState<String, Long> mapState;

        //3.2初始化MapState
        @Override
        public void open(Configuration parameters) throws Exception {
            //創建狀態描述器
            MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<>("mapState", String.class, Long.class);
            //根據狀態描述器初始化狀態
            mapState = getRuntimeContext().getMapState(mapStateDesc);
        }


        //3.3注册定時器
        //處理每一個訂單並設置定時器
        @Override
        public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Object> out) throws Exception {
            mapState.put(value.f1, value.f2);
            //如一個訂單進來:<訂單id, 2020-10-10 12:00:00>
            //那麼該訂單應該在12:00:00 + 5s 的時候超時!
            //在訂單進來的時候設置一個定時器,在訂單時間 + interval的時候觸發!!!
            ctx.timerService().registerProcessingTimeTimer(value.f2 + interval);
        }

        //3.4定時器被觸發時執行並輸出結果並sink
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
            //能够執行到這裏說明訂單超時了!超時了得去看看訂單是否評價了(實際中應該要調用外部接口/方法查訂單系統!,我們這裏沒有,所以模擬一下)
            //沒有評價才給默認好評!並直接輸出提示!
            //已經評價了,直接輸出提示!
            Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Long> entry = iterator.next();
                String orderId = entry.getKey();
                //調用訂單系統查詢是否已經評價
                boolean result = isEvaluation(orderId);
                if (result) {//已評價
                    System.out.println("訂單(orderid: " + orderId + ")在" + interval + "毫秒時間內已經評價,不做處理");
                } else {//未評價
                    System.out.println("訂單(orderid: " + orderId + ")在" + interval + "毫秒時間內未評價,系統自動給了默認好評!");
                    //實際中還需要調用訂單系統將該訂單orderId設置為5星好評!
                }
                //從狀態中移除已經處理過的訂單,避免重複處理
                iterator.remove();
            }
        }

        //在生產環境下,可以去查詢相關的訂單系統.
        private boolean isEvaluation(String key) {
            return key.hashCode() % 2 == 0;//隨機返回訂單是否已評價
        }
    }
}

5. 效果展示


注:此博客根據某馬2020年賀歲視頻改編而來 -> B站網址

注:其他相關文章鏈接由此進 -> Flink文章匯總


版權聲明
本文為[電光閃爍]所創,轉載請帶上原文鏈接,感謝
https://cht.chowdera.com/2022/01/202201281044552928.html

隨機推薦