-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Open
Description
public class UserInterestTask {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = Property.getKafkaProperties("interest");
DataStreamSource<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("con", new SimpleStringSchema(), properties));
dataStream.map(new GetLogFunction())
//同一个用户对同一个产品的兴趣度,我觉得这里的key分区应该要加上"productId",-->.keyBy("userId", "productId")
// 不然原文.keyBy("userId")的意思我理解是,用一个用户只要对不同产品的操作间隔时间(如购物 - 浏览 < 100s)则判定为一次兴趣事件,似乎不太对
.keyBy("userId", "productId")
.map(new UserHistoryWithInterestMapFunction());
env.execute("User Product History");
}
}
Roy-se7en
Metadata
Metadata
Assignees
Labels
No labels