生活随笔
收集整理的這篇文章主要介紹了
在MaxCompute中利用bitmap进行数据处理
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
很多數(shù)據(jù)開發(fā)者使用bitmap技術(shù)對用戶數(shù)據(jù)進(jìn)行編碼和壓縮,然后利用bitmap的與/或/非的極速處理速度,實(shí)現(xiàn)類似用戶畫像標(biāo)簽的人群篩選、運(yùn)營分析的7日活躍等分析。
本文給出了一個(gè)使用MaxCompute MapReduce開發(fā)一個(gè)對不同日期活躍用戶ID進(jìn)行bitmap編碼和計(jì)算的樣例。供感興趣的用戶進(jìn)一步了解、分析,并應(yīng)用在自己的場景下。
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Iterator;public class bitmapDemo2
{public static class BitMapper extends MapperBase {Record key;Record value;@Overridepublic void setup(TaskContext context) throws IOException {key = context.createMapOutputKeyRecord();value = context.createMapOutputValueRecord();}@Overridepublic void map(long recordNum, Record record, TaskContext context)throws IOException{RoaringBitmap mrb=new RoaringBitmap();long AID=0;{{{{AID=record.getBigint("id");mrb.add((int) AID);//獲取keykey.set(new Object[] {record.getString("active_date")});}}}}ByteBuffer outbb = ByteBuffer.allocate(mrb.serializedSizeInBytes());mrb.serialize(new DataOutputStream(new OutputStream(){ByteBuffer mBB;OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}public void close() {}public void flush() {}public void write(int b) {mBB.put((byte) b);}public void write(byte[] b) {mBB.put(b);}public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}}.init(outbb)));String serializedstring = Base64.getEncoder().encodeToString(outbb.array());value.set(new Object[] {serializedstring});context.write(key, value);}}public static class BitReducer extends ReducerBase {private Record result = null;public void setup(TaskContext context) throws IOException {result = context.createOutputRecord();}public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {long fcount = 0;RoaringBitmap rbm=new RoaringBitmap();while (values.hasNext()){Record val = values.next();ByteBuffer newbb = ByteBuffer.wrap(Base64.getDecoder().decode((String)val.get(0)));ImmutableRoaringBitmap irb = new ImmutableRoaringBitmap(newbb);RoaringBitmap p= new RoaringBitmap(irb);rbm.or(p);}ByteBuffer outbb = ByteBuffer.allocate(rbm.serializedSizeInBytes());rbm.serialize(new DataOutputStream(new OutputStream(){ByteBuffer mBB;OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}public void close() {}public void flush() {}public void write(int b) {mBB.put((byte) b);}public void write(byte[] b) {mBB.put(b);}public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}}.init(outbb)));String serializedstring = Base64.getEncoder().encodeToString(outbb.array());result.set(0, key.get(0));result.set(1, serializedstring);context.write(result);}}public static void main( String[] args ) throws OdpsException{System.out.println("begin.........");JobConf job = new JobConf();job.setMapperClass(BitMapper.class);job.setReducerClass(BitReducer.class);job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));job.setMapOutputValueSchema(SchemaUtils.fromString("id:string"));InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);
// +------------+-------------+
// | id | active_date |
// +------------+-------------+
// | 1 | 20190729 |
// | 2 | 20190729 |
// | 3 | 20190730 |
// | 4 | 20190801 |
// | 5 | 20190801 |
// +------------+-------------+OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);
// +-------------+------------+
// | active_date | bit_map |
// +-------------+------------+
// 20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=3D
// 20190730,OjAAAAEAAAAAAAAAEAAAAAMA
// 20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=3DJobClient.runJob(job);}
}
對Java應(yīng)用打包后,上傳到MaxCompute項(xiàng)目中,即可在MaxCompute中調(diào)用該MR作業(yè),對輸入表的數(shù)據(jù)按日期作為key進(jìn)行用戶id的編碼,同時(shí)按照相同日期對bitmap后的用戶id取OR操作(根據(jù)需要可以取AND,例如存留場景),并將處理后的數(shù)據(jù)寫入目標(biāo)結(jié)構(gòu)表當(dāng)中供后續(xù)處理使用。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的在MaxCompute中利用bitmap进行数据处理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。