Hadoop/MapReduce: Smart Email Marketing with a Markov Model
Published: 2019-05-11


Goal: a user's purchase behavior may look random, but viewed along the time axis it often has structure. For example, a user may buy more right after payday each month, or around particular dates each year (Singles' Day, a birthday). A Markov model can mine this temporal regularity: if we can predict the time of a user's next purchase from their purchase history, we can send a marketing email around that predicted time. What to promote in the email can come from the output of other recommendation algorithms.
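A note on the model (my addition, not spelled out in the original post): the pipeline below builds a first-order Markov chain over purchase "states". The next state is assumed to depend only on the current one, and each transition probability is estimated from the training data as a normalized count:

    P(next state = j | current state = i) ≈ count(i -> j) / Σ_k count(i -> k)

Steps 1–3 compute these counts with MapReduce, step 4 normalizes them row by row (with a simple Laplace correction for zero counts), and step 5 would look up the row of a customer's most recent state to predict the next one.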
Input (one transaction per line):

<customerID>,<transactionID>,<purchaseDate>,<amount>

for example:

ZSY40NYPS6,1381872876,2013-01-01,110
ZSY40NYPS6,1381872920,2013-01-11,32
ZSY40NYPS6,1381873821,2013-03-04,111
ZSY40NYPS6,1381874034,2013-04-09,65
...

Step 1: for each customer, sort the transactions by purchase date and project them onto a single line:

<customerID>,<purchaseDate1>,<amount1>,<purchaseDate2>,<amount2>,...,<purchaseDateN>,<amountN>

where purchaseDate1 <= purchaseDate2 <= ... <= purchaseDateN. The per-customer date ordering is obtained with MapReduce "secondary sort", as implemented below. For the sample above this yields:

ZSY40NYPS6,2013-01-01,110,2013-01-11,32,2013-03-04,111,2013-04-09,65
...
// CompositeKey.java
package markov.step1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * CompositeKey: represents a pair of (String customerID, long timestamp).
 *
 * We do a primary grouping pass on the customerID field to get all of one
 * customer's data together, and then the "secondary sort" during the shuffle
 * phase uses the timestamp member (the purchase date) so that the
 * PairOfLongInt values arrive at the reducer partitioned and in sorted order.
 *
 * @author Mahmoud Parsian
 */
public class CompositeKey implements WritableComparable<CompositeKey> {
  // natural key is (customerID)
  // composite key is a pair (customerID, timestamp)
  private String customerID;
  private long timestamp;

  public CompositeKey(String customerID, long timestamp) { set(customerID, timestamp); }
  public CompositeKey() { }

  public void set(String customerID, long timestamp) {
    this.customerID = customerID;
    this.timestamp = timestamp;
  }

  public String getCustomerID() { return this.customerID; }
  public long getTimestamp() { return this.timestamp; }

  @Override
  public void readFields(DataInput in) throws IOException {
    this.customerID = in.readUTF();
    this.timestamp = in.readLong();
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(this.customerID);
    out.writeLong(this.timestamp);
  }

  @Override
  public int compareTo(CompositeKey other) {
    if (this.customerID.compareTo(other.customerID) != 0) {
      return this.customerID.compareTo(other.customerID);
    } else if (this.timestamp != other.timestamp) {
      return timestamp < other.timestamp ? -1 : 1;
    } else {
      return 0;
    }
  }

  public static class CompositeKeyComparator extends WritableComparator {
    public CompositeKeyComparator() { super(CompositeKey.class); }
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return compareBytes(b1, s1, l1, b2, s2, l2);
    }
  }

  static { // register this comparator
    WritableComparator.define(CompositeKey.class, new CompositeKeyComparator());
  }

  @Override
  public String toString() { return customerID; }
}

// CompositeKeyComparator.java
package markov.step1;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * CompositeKeyComparator: enables comparison of two CompositeKey objects,
 * sorting by customerID first, then by timestamp.
 *
 * @author Mahmoud Parsian
 */
public class CompositeKeyComparator extends WritableComparator {

  protected CompositeKeyComparator() { super(CompositeKey.class, true); }

  @Override
  public int compare(WritableComparable w1, WritableComparable w2) {
    CompositeKey key1 = (CompositeKey) w1;
    CompositeKey key2 = (CompositeKey) w2;
    int comparison = key1.getCustomerID().compareTo(key2.getCustomerID());
    if (comparison == 0) {
      // customerIDs are equal here
      if (key1.getTimestamp() == key2.getTimestamp()) {
        return 0;
      } else if (key1.getTimestamp() < key2.getTimestamp()) {
        return -1;
      } else {
        return 1;
      }
    } else {
      return comparison;
    }
  }
}

// NaturalKeyGroupingComparator.java
package markov.step1;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * NaturalKeyGroupingComparator: used during Hadoop's shuffle phase to group
 * composite keys by the first (natural) part of their key, the "customerID".
 *
 * @author Mahmoud Parsian
 */
public class NaturalKeyGroupingComparator extends WritableComparator {

  protected NaturalKeyGroupingComparator() { super(CompositeKey.class, true); }

  @Override
  public int compare(WritableComparable w1, WritableComparable w2) {
    CompositeKey key1 = (CompositeKey) w1;
    CompositeKey key2 = (CompositeKey) w2;
    return key1.getCustomerID().compareTo(key2.getCustomerID());
  }
}

// NaturalKeyPartitioner.java
package markov.step1;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import edu.umd.cloud9.io.pair.PairOfLongInt;

/**
 * NaturalKeyPartitioner: controls how map outputs are sent to the reducers.
 * It partitions the output of SecondarySortProjectionMapper before the
 * shuffle phase. Since we want a single reducer to receive all projected
 * data for one "customerID", we partition by the natural key component
 * ("customerID") only. Note that (CompositeKey, PairOfLongInt) is the
 * (key, value) pair generated by the mappers.
 *
 * @author Mahmoud Parsian
 */
public class NaturalKeyPartitioner implements Partitioner<CompositeKey, PairOfLongInt> {

  @Override
  public int getPartition(CompositeKey key, PairOfLongInt value, int numberOfPartitions) {
    return (int) (hash(key.getCustomerID()) % numberOfPartitions);
  }

  @Override
  public void configure(JobConf jobconf) { }

  /** adapted from String.hashCode() */
  static long hash(String str) {
    return Math.abs(str.hashCode());
  }

  /** adapted from String.hashCode() */
  static long hash2(String str) {
    long h = 1125899906842597L; // prime
    int length = str.length();
    for (int i = 0; i < length; i++) {
      h = 31 * h + str.charAt(i);
    }
    return h;
  }
}

// SecondarySortProjectionMapper.java
package markov.step1;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.commons.lang.StringUtils;
import yidongpingjun.DateUtil;
import edu.umd.cloud9.io.pair.PairOfLongInt;

/**
 * MapReduce job for projecting customer transaction data using MapReduce's
 * "secondary sort" (sorting done by the shuffle). Reducer values arrive
 * already sorted; no data is sorted in memory.
 *
 * This class implements the map() function of the "secondary sort" pattern.
 *
 * @author Mahmoud Parsian
 */
public class SecondarySortProjectionMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, CompositeKey, PairOfLongInt> {

  // reuse Hadoop's Writable objects
  private final CompositeKey reducerKey = new CompositeKey();
  private final PairOfLongInt reducerValue = new PairOfLongInt();

  @Override
  public void map(LongWritable inkey, Text value,
                  OutputCollector<CompositeKey, PairOfLongInt> output,
                  Reporter reporter) throws IOException {
    String[] tokens = StringUtils.split(value.toString(), ",");
    if (tokens.length != 4) {
      // not a proper format
      return;
    }
    // tokens[0] = customer-id
    // tokens[1] = transaction-id
    // tokens[2] = purchase-date
    // tokens[3] = amount
    long date;
    try {
      date = DateUtil.getDateAsMilliSeconds(tokens[2]);
    } catch (Exception e) {
      // date is in error, ignore the record
      return;
    }
    int amount = Integer.parseInt(tokens[3]);
    reducerValue.set(date, amount);
    reducerKey.set(tokens[0], date);
    // emit key-value pair
    output.collect(reducerKey, reducerValue);
  }
}

// SecondarySortProjectionReducer.java
package markov.step1;

import java.util.Iterator;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import yidongpingjun.DateUtil;
import edu.umd.cloud9.io.pair.PairOfLongInt;

/**
 * SecondarySortProjectionReducer: data arrive already sorted at the reducer
 * thanks to the "secondary sort" pattern (no data is sorted in memory).
 *
 * This class implements the reduce() function of the "secondary sort" pattern.
 *
 * @author Mahmoud Parsian
 */
public class SecondarySortProjectionReducer extends MapReduceBase
    implements Reducer<CompositeKey, PairOfLongInt, Text, Text> {

  public void reduce(CompositeKey key, Iterator<PairOfLongInt> values,
                     OutputCollector<Text, Text> output,
                     Reporter reporter) throws IOException {
    // note that values are sorted (by using MR's secondary sort);
    // builder will generate:
    //   customerID,Date1,Amount1,Date2,Amount2,...,DateN,AmountN
    // where Date1 <= Date2 <= ... <= DateN
    StringBuilder builder = new StringBuilder();
    builder.append(key.toString());
    while (values.hasNext()) {
      builder.append(",");
      PairOfLongInt pair = values.next();
      long timestamp = pair.getLeftElement(); // date as milliseconds
      String date = DateUtil.getDateAsString(timestamp);
      builder.append(date); // date as String
      builder.append(",");
      builder.append(pair.getRightElement()); // amount
    }
    output.collect(null, new Text(builder.toString()));
  } // reduce
}

// SecondarySortProjectionDriver.java
package markov.step1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import edu.umd.cloud9.io.pair.PairOfLongInt;
import org.apache.log4j.Logger;

/**
 * Driver for the projection job ("secondary sort" design pattern).
 *
 * @author Mahmoud Parsian
 */
public class SecondarySortProjectionDriver {

  private static final Logger theLogger = Logger.getLogger(SecondarySortProjectionDriver.class);

  public static void main(String[] args) throws Exception {
    args = new String[2];
    args[0] = "input/smart_email_training.txt";
    args[1] = "output/smart_email_training";

    long startTime = System.currentTimeMillis();
    Configuration conf = new Configuration();
    JobConf jobconf = new JobConf(conf, SecondarySortProjectionDriver.class);
    jobconf.setJobName("SecondarySortProjectionDriver");

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: SecondarySortProjectionDriver <input-dir> <output-dir>");
      System.exit(1);
    }

    // add jars to distributed cache

    // set mapper/reducer
    jobconf.setMapperClass(SecondarySortProjectionMapper.class);
    jobconf.setReducerClass(SecondarySortProjectionReducer.class);

    // define mapper's output key-value
    jobconf.setMapOutputKeyClass(CompositeKey.class);
    jobconf.setMapOutputValueClass(PairOfLongInt.class);

    // define reducer's output key-value
    jobconf.setOutputKeyClass(Text.class);
    jobconf.setOutputValueClass(Text.class);

    // define I/O
    FileInputFormat.setInputPaths(jobconf, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(jobconf, new Path(otherArgs[1]));
    jobconf.setInputFormat(TextInputFormat.class);
    jobconf.setOutputFormat(TextOutputFormat.class);
    jobconf.setCompressMapOutput(true);

    // The following 3 settings are needed for "secondary sorting".
    // The Partitioner decides which map output goes to which reducer, based
    // on the map output key. In general, different keys end up in different
    // groups (the Iterator on the reducer side). When we want different keys
    // in the same group, we use the Output Value Grouping Comparator (similar
    // to GROUP BY in SQL). The Output Key Comparator is used during the sort
    // stage for the map output key.
    jobconf.setPartitionerClass(NaturalKeyPartitioner.class);
    jobconf.setOutputKeyComparatorClass(CompositeKeyComparator.class);
    jobconf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);

    JobClient.runJob(jobconf).waitForCompletion();
    long elapsedTime = System.currentTimeMillis() - startTime;
    theLogger.info("elapsedTime (in milliseconds): " + elapsedTime);
    System.exit(0);
  }
}
Step 2: convert each transaction sequence into a state sequence. For each customer, walk the projected line from step 1 and look at consecutive pairs of transactions, e.g. (2013-01-01,110) -> (2013-01-11,32), then (2013-03-04,111) -> (2013-04-09,65). Each pair is labeled with a two-letter state derived from the time gap and the amount change between the two transactions (reading the thresholds off the mapper below): the first letter encodes the gap in days (S: under 30, M: 30-59, L: 60 or more), and the second letter compares the earlier amount with the later one using a 10% tolerance (L: earlier amount significantly lower, E: roughly equal, G: earlier amount significantly greater). This gives nine states, SL, SE, SG, ML, ME, MG, LL, LE, LG. The output is one line per customer of the form customerID,state1,state2,..., e.g. ZSY40NYPS6,ME,SL.
 
// StateSequenceMapper.java
package markov.step2;

import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import yidongpingjun.DateUtil;

public class StateSequenceMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {

  @Override
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, Text> output,
                  Reporter reporter) throws IOException {
    // input line, e.g.: 000UDM50M4,2013-04-27,183,2013-06-19,62,2013-06-20,29
    String[] tokens = value.toString().split(",");
    if (tokens.length < 5) {
      // need at least two (date, amount) pairs to form one state
      return;
    }
    String customerID = tokens[0];
    int i = 4;
    while (i < tokens.length) {
      int amount = Integer.valueOf(tokens[i]);
      int priorAmount = Integer.valueOf(tokens[i - 2]);
      Date date = DateUtil.getDate(tokens[i - 1]);
      Date priorDate = DateUtil.getDate(tokens[i - 3]);
      long daysDiff = (date.getTime() - priorDate.getTime()) / 1000 / 60 / 60 / 24;

      // first state letter: elapsed time between the two purchases
      char dd;
      if (daysDiff < 30) { dd = 'S'; }
      else if (daysDiff < 60) { dd = 'M'; }
      else { dd = 'L'; }

      // second state letter: how the prior amount compares with the current one (10% tolerance)
      char ad;
      if (priorAmount < 0.9 * amount) { ad = 'L'; }
      else if (priorAmount < 1.1 * amount) { ad = 'E'; }
      else { ad = 'G'; }

      String sequence = "" + dd + ad;
      Text outputKey = new Text();
      outputKey.set(customerID);
      Text outputValue = new Text();
      outputValue.set(sequence);
      output.collect(outputKey, outputValue);
      i += 2;
    }
  }
}

// StateSequenceReducer.java
package markov.step2;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class StateSequenceReducer extends MapReduceBase
    implements Reducer<Text, Text, Text, Text> {

  public void reduce(Text key, Iterator<Text> values,
                     OutputCollector<Text, Text> output,
                     Reporter reporter) throws IOException {
    // concatenate all states of one customer into a single line:
    //   customerID,state1,state2,...,stateN
    StringBuilder builder = new StringBuilder();
    builder.append(key.toString());
    while (values.hasNext()) {
      builder.append(",");
      builder.append(values.next().toString());
    }
    output.collect(null, new Text(builder.toString()));
  }
}

// StateSequenceDriver.java
package markov.step2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.log4j.Logger;

public class StateSequenceDriver {

  private static final Logger theLogger = Logger.getLogger(StateSequenceDriver.class);

  public static void main(String[] args) throws Exception {
    args = new String[2];
    args[0] = "output/smart_email_training";   // output of step 1
    args[1] = "output/smart_email_training2";

    long startTime = System.currentTimeMillis();
    Configuration conf = new Configuration();
    JobConf jobconf = new JobConf(conf, StateSequenceDriver.class);
    jobconf.setJobName("StateSequenceDriver");

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: StateSequenceDriver <input-dir> <output-dir>");
      System.exit(1);
    }

    // set mapper/reducer
    jobconf.setMapperClass(StateSequenceMapper.class);
    jobconf.setReducerClass(StateSequenceReducer.class);

    // define mapper's output key-value
    jobconf.setMapOutputKeyClass(Text.class);
    jobconf.setMapOutputValueClass(Text.class);

    // define reducer's output key-value
    jobconf.setOutputKeyClass(Text.class);
    jobconf.setOutputValueClass(Text.class);

    // define I/O
    FileInputFormat.setInputPaths(jobconf, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(jobconf, new Path(otherArgs[1]));
    jobconf.setInputFormat(TextInputFormat.class);
    jobconf.setOutputFormat(TextOutputFormat.class);
    jobconf.setCompressMapOutput(true);

    JobClient.runJob(jobconf).waitForCompletion();
    long elapsedTime = System.currentTimeMillis() - startTime;
    theLogger.info("elapsedTime (in milliseconds): " + elapsedTime);
    System.exit(0);
  }
}
 
Step 3: build the Markov state-transition matrix. For every pair of consecutive states in the output of step 2, emit a ((fromState, toState), 1) pair, e.g. ([ME, SL], 1), and sum the counts per (fromState, toState) pair.
// InputOutputUtil.java
package markov.step3;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.BufferedReader;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.util.LineReader;

/**
 * Convenience methods for quietly closing Input/Output streams and readers.
 *
 * @author Mahmoud Parsian (mahmoud.parsian@yahoo.com)
 */
public class InputOutputUtil {

  public static void close(LineReader reader) {
    if (reader == null) { return; }
    try { reader.close(); } catch (Exception ignore) { }
  }

  public static void close(OutputStream stream) {
    if (stream == null) { return; }
    try { stream.close(); } catch (Exception ignore) { }
  }

  public static void close(InputStream stream) {
    if (stream == null) { return; }
    try { stream.close(); } catch (Exception ignore) { }
  }

  public static void close(FSDataInputStream stream) {
    if (stream == null) { return; }
    try { stream.close(); } catch (Exception ignore) { }
  }

  public static void close(BufferedReader reader) {
    if (reader == null) { return; }
    try { reader.close(); } catch (Exception ignore) { }
  }
}

// MarkovStateTransitionModelMapper.java
package markov.step3;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.commons.lang.StringUtils;
import edu.umd.cloud9.io.pair.PairOfStrings;

/**
 * The MarkovStateTransitionModelMapper class implements MapReduce's map() method.
 *
 * @author Mahmoud Parsian
 */
public class MarkovStateTransitionModelMapper
    extends Mapper<LongWritable, Text, PairOfStrings, IntWritable> {

  private PairOfStrings reducerKey = new PairOfStrings();
  private static final IntWritable ONE = new IntWritable(1);

  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // value = customerID,state1,state2,...,stateN
    String[] items = StringUtils.split(value.toString(), ",");
    if (items.length > 2) {
      // emit one ((fromState, toState), 1) pair per consecutive pair of states
      for (int i = 1; i < (items.length - 1); i++) {
        reducerKey.set(items[i], items[i + 1]);
        context.write(reducerKey, ONE);
      }
    }
  }
}

// MarkovStateTransitionModelCombiner.java
package markov.step3;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import edu.umd.cloud9.io.pair.PairOfStrings;

/**
 * The MarkovStateTransitionModelCombiner class implements the combiner
 * (in Hadoop, a combiner reuses the reduce() signature) for Markov's
 * state transition model: it sums partial transition counts.
 *
 * @author Mahmoud Parsian
 */
public class MarkovStateTransitionModelCombiner
    extends Reducer<PairOfStrings, IntWritable, PairOfStrings, IntWritable> {

  protected void reduce(PairOfStrings key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int partialSum = 0;
    for (IntWritable value : values) {
      partialSum += value.get();
    }
    context.write(key, new IntWritable(partialSum));
  }
}

// MarkovStateTransitionModelReducer.java
package markov.step3;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import edu.umd.cloud9.io.pair.PairOfStrings;

/**
 * The MarkovStateTransitionModelReducer class implements MapReduce's reduce()
 * method: it writes "fromState,toState" with the final transition count.
 *
 * @author Mahmoud Parsian
 */
public class MarkovStateTransitionModelReducer
    extends Reducer<PairOfStrings, IntWritable, Text, IntWritable> {

  protected void reduce(PairOfStrings key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int finalCount = 0;
    for (IntWritable value : values) {
      finalCount += value.get();
    }
    String fromState = key.getLeftElement();
    String toState = key.getRightElement();
    String outputKey = fromState + "," + toState;
    context.write(new Text(outputKey), new IntWritable(finalCount));
  }
}

// MarkovStateTransitionModelDriver.java
package markov.step3;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import edu.umd.cloud9.io.pair.PairOfStrings;

/**
 * Markov state transition probability matrix driver.
 *
 * @author Mahmoud Parsian
 */
public class MarkovStateTransitionModelDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    args = new String[2];
    args[0] = "output/smart_email_training2";  // output of step 2
    args[1] = "output/smart_email_training3";

    @SuppressWarnings("deprecation")
    Job job = new Job(getConf());
    job.setJobName("MarkovStateTransitionModelDriver");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MarkovStateTransitionModelMapper.class);
    job.setReducerClass(MarkovStateTransitionModelReducer.class);
    job.setCombinerClass(MarkovStateTransitionModelCombiner.class);

    // PairOfStrings = (fromState, toState)
    job.setMapOutputKeyClass(PairOfStrings.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    int status = job.waitForCompletion(true) ? 0 : 1;
    return status;
  }

  public static void main(String[] args) throws Exception {
    int statusCode = ToolRunner.run(new MarkovStateTransitionModelDriver(), args);
    System.exit(statusCode);
  }
}
 
Step 4: turn the state-transition counts into a transition probability table (each row normalized so it sums to 1).
// TableItem.java
package markov.step3;

/**
 * TableItem represents one entry of the Markov state transition model
 * as a triple (fromState, toState, count).
 */
public class TableItem {
  String fromState;
  String toState;
  int count;

  public TableItem(String fromState, String toState, int count) {
    this.fromState = fromState;
    this.toState = toState;
    this.count = count;
  }

  /** for debugging ONLY */
  public String toString() {
    return "{" + fromState + "," + toState + "," + count + "}";
  }
}

// ReadDataFromHDFS.java
package markov.step3;

import java.util.List;
import java.util.ArrayList;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

/**
 * Utility methods for reading the state-transition counts (the text output of
 * the MarkovStateTransitionModel job) back from HDFS.
 *
 * @author Mahmoud Parsian
 */
public class ReadDataFromHDFS {

  private static final Logger THE_LOGGER = Logger.getLogger(ReadDataFromHDFS.class);

  private ReadDataFromHDFS() { }

  public static List<TableItem> readDirectory(String path) {
    return ReadDataFromHDFS.readDirectory(new Path(path));
  }

  public static List<TableItem> readDirectory(Path path) {
    FileSystem fs;
    try {
      fs = FileSystem.get(new Configuration());
    } catch (IOException e) {
      THE_LOGGER.error("Unable to access the hadoop file system!", e);
      throw new RuntimeException("Unable to access the hadoop file system!");
    }
    List<TableItem> list = new ArrayList<TableItem>();
    try {
      FileStatus[] stat = fs.listStatus(path);
      for (int i = 0; i < stat.length; ++i) {
        if (stat[i].getPath().getName().startsWith("part")) {
          List<TableItem> pairs = readFile(stat[i].getPath(), fs);
          list.addAll(pairs);
        }
      }
    } catch (IOException e) {
      THE_LOGGER.error("Unable to access the hadoop file system!", e);
      throw new RuntimeException("Error reading the hadoop file system!");
    }
    return list;
  }

  public static List<TableItem> readFile(Path path, FileSystem fs) {
    THE_LOGGER.info("path=" + path);
    List<TableItem> list = new ArrayList<TableItem>();
    FSDataInputStream stream = null;
    BufferedReader reader = null;
    try {
      stream = fs.open(path);
      reader = new BufferedReader(new InputStreamReader(stream));
      String line;
      while ((line = reader.readLine()) != null) {
        // line = fromState,toState<TAB>count
        THE_LOGGER.info("line=" + line);
        String[] tokens = line.split("\t"); // TAB separator
        if (tokens.length == 2) {
          String states = tokens[0];
          int count = Integer.parseInt(tokens[1]);
          String[] twoStates = states.split(",");
          TableItem item = new TableItem(twoStates[0], twoStates[1], count);
          list.add(item);
        }
      }
    } catch (IOException e) {
      THE_LOGGER.error("readFile() failed!", e);
      throw new RuntimeException("readFile() failed!");
    } finally {
      InputOutputUtil.close(reader);
      InputOutputUtil.close(stream);
    }
    return list;
  }

  public static void main(String[] args) throws Exception {
    String path = args[0];
    List<TableItem> list = readDirectory(path);
    THE_LOGGER.info("list=" + list.toString());
  }
}

// StateTransitionTableBuilder.java
package markov.step3;

import java.util.Map;
import java.util.List;
import java.util.HashMap;

/**
 * Markov state transition probability matrix builder.
 *
 * model.states = SL,SE,SG,ML,ME,MG,LL,LE,LG
 */
public class StateTransitionTableBuilder {

  // states: key is the state name, value is its row/column index in the table
  private Map<String, Integer> states = null;
  private double[][] table = null;
  private int numberOfStates;
  private int scale = 100;

  private void initStates() {
    states = new HashMap<String, Integer>();
    states.put("SL", 0);
    states.put("SE", 1);
    states.put("SG", 2);
    states.put("ML", 3);
    states.put("ME", 4);
    states.put("MG", 5);
    states.put("LL", 6);
    states.put("LE", 7);
    states.put("LG", 8);
  }

  public StateTransitionTableBuilder(int numberOfStates) {
    this.numberOfStates = numberOfStates;
    table = new double[numberOfStates][numberOfStates];
    initStates();
  }

  public StateTransitionTableBuilder(int numberOfStates, int scale) {
    this(numberOfStates);
    this.scale = scale;
  }

  public void add(String fromState, String toState, int count) {
    int row = states.get(fromState);
    int column = states.get(toState);
    table[row][column] = count;
  }

  public void normalizeRows() {
    // Laplace correction: the usual solution is to do a Laplacian correction
    // by upping all the counts in the row by 1 when a zero count is present;
    // see: http://cs.nyu.edu/faculty/davise/ai/bayesText.html
    for (int r = 0; r < numberOfStates; r++) {
      boolean gotZeroCount = false;
      for (int c = 0; c < numberOfStates; c++) {
        if (table[r][c] == 0) {
          gotZeroCount = true;
          break;
        }
      }
      if (gotZeroCount) {
        for (int c = 0; c < numberOfStates; c++) {
          table[r][c] += 1;
        }
      }
    }
    // normalize each row so it sums to 1
    for (int r = 0; r < numberOfStates; r++) {
      double rowSum = getRowSum(r);
      for (int c = 0; c < numberOfStates; c++) {
        table[r][c] = table[r][c] / rowSum;
      }
    }
  }

  public double getRowSum(int rowNumber) {
    double sum = 0.0;
    for (int column = 0; column < numberOfStates; column++) {
      sum += table[rowNumber][column];
    }
    return sum;
  }

  public String serializeRow(int rowNumber) {
    StringBuilder builder = new StringBuilder();
    for (int column = 0; column < numberOfStates; column++) {
      double element = table[rowNumber][column];
      builder.append(String.format("%.4g", element));
      if (column < (numberOfStates - 1)) {
        builder.append(",");
      }
    }
    return builder.toString();
  }

  public void persistTable() {
    for (int row = 0; row < numberOfStates; row++) {
      String serializedRow = serializeRow(row);
      System.out.println(serializedRow);
    }
  }

  public static void generateStateTransitionTable(String hdfsDirectory) {
    List<TableItem> list = ReadDataFromHDFS.readDirectory(hdfsDirectory);
    StateTransitionTableBuilder tableBuilder = new StateTransitionTableBuilder(9);
    for (TableItem item : list) {
      tableBuilder.add(item.fromState, item.toState, item.count);
    }
    tableBuilder.normalizeRows();
    tableBuilder.persistTable();
  }

  public static void main(String[] args) {
    String hdfsDirectory = "output/smart_email_training3";
    generateStateTransitionTable(hdfsDirectory);
  }
}

Run result (rows are fromState, columns are toState, both in the order SL, SE, SG, ML, ME, MG, LL, LE, LG):
0.05033,0.008262,0.7487,0.1432,0.0003689,0.01423,0.03306,8.889e-05,0.001791
0.4791,0.01265,0.4071,0.07468,0.0002040,0.009386,0.01612,0.0002040,0.0006121
0.6671,0.008839,0.1261,0.1463,0.0009289,0.01387,0.03505,0.0002426,0.001555
0.04773,0.0004718,0.7681,0.01487,0.0001862,0.1385,0.02863,1.242e-05,0.001490
0.6215,0.002151,0.2925,0.01075,0.006452,0.05161,0.008602,0.002151,0.004301
0.1072,0.002772,0.7044,0.1364,0.0003616,0.01374,0.03247,8.036e-05,0.002612
0.06196,0.0004748,0.7678,0.02008,0.0001424,0.1262,0.003988,4.748e-05,0.01937
0.5036,0.007299,0.3431,0.04380,0.007299,0.05839,0.007299,0.007299,0.02190
0.1834,0.001920,0.6313,0.02544,0.0004801,0.1167,0.03889,0.0009602,0.0009602
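After the Laplace correction and row normalization each row sums to 1. Reading the table, for example, the third row (fromState = SG) says that the most likely state to follow an SG transition is SL, with probability of roughly 0.67.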
Step 5: use the Markov model to predict each customer's next purchase window, and therefore the next smart email marketing date.
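The original post stops here and includes no code for step 5. As a rough sketch of how the model could be used (my addition, not part of the original code): keep each customer's most recent state and purchase date from step 2, look up that state's row in the transition table from step 4, take the most probable next state, and map its time letter back onto a date window for sending the email (S: within 30 days of the last purchase, M: 30-60 days, L: 60+ days). A minimal sketch, assuming the 9x9 row-normalized matrix and the state ordering used by StateTransitionTableBuilder above; the class, package, and the concrete day offsets are hypothetical:

// NextMarketingDatePredictor.java (hypothetical; step 5 is not in the original post)
package markov.step5;

public class NextMarketingDatePredictor {

  // same state ordering as StateTransitionTableBuilder in step 4
  private static final String[] STATES =
      {"SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"};

  private final double[][] table; // 9x9 row-normalized transition matrix

  public NextMarketingDatePredictor(double[][] table) {
    this.table = table;
  }

  /** index of a state name in STATES, or -1 if unknown */
  private static int indexOf(String state) {
    for (int i = 0; i < STATES.length; i++) {
      if (STATES[i].equals(state)) { return i; }
    }
    return -1;
  }

  /** most probable next state given the customer's last observed state */
  public String predictNextState(String lastState) {
    int row = indexOf(lastState);
    if (row < 0) { return null; }
    int best = 0;
    for (int col = 1; col < STATES.length; col++) {
      if (table[row][col] > table[row][best]) { best = col; }
    }
    return STATES[best];
  }

  /**
   * Suggested number of days after the last purchase to send the email,
   * derived from the time letter of the predicted state (using the same
   * thresholds as StateSequenceMapper: S < 30, M < 60, L >= 60 days).
   */
  public int suggestedDaysUntilEmail(String lastState) {
    String next = predictNextState(lastState);
    if (next == null) { return 30; } // arbitrary fallback for unknown states
    switch (next.charAt(0)) {
      case 'S': return 15;  // somewhere inside the 0-30 day window
      case 'M': return 45;  // inside the 30-60 day window
      default:  return 75;  // 60+ days
    }
  }
}

The customer's last purchase date plus suggestedDaysUntilEmail(lastState) gives a candidate send date; the offsets 15/45/75 are arbitrary midpoints of each window and would need tuning, and one could equally sample the next state from the row's distribution instead of taking the argmax.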
 
 
 
 

Reprinted from: http://mkqrb.baihongyu.com/
