本文共 30956 字,大约阅读时间需要 103 分钟。
目的:用户的购买行为看起来没有规律可循,但从时间有序的角度看,也许是有规律的。例如,用户可能在每个月发工资时购买得多,或在每年某个时间(双十一、生日等)购买得比较多。马尔科夫模型能够挖掘出这种时间上的规律:假设我们能够根据用户上一次的购买记录推测其下一次购买时间,就可以在推测的时间向其发送邮件进行营销。至于营销的商品内容,可以参考其他推荐算法的结果。
输入:<customerID>,<transactionID>,<purchaseDate>,<amount>
...
ZSY40NYPS6,1381872876,2013-01-01,110
...
ZSY40NYPS6,1381872920,2013-01-11,32
...
ZSY40NYPS6,1381873821,2013-03-04,111
...
ZSY40NYPS6,1381874034,2013-04-09,65
...
第一步:生成 <customerID>,<purchaseDate1>,<amount1>,<purchaseDate2>,<amount2>,<purchaseDate3>,<amount3>,...
其中,purchaseDate1<=purchaseDate2<=purchaseDate3....,因此需要对每个用户的交易按日期排序。
...
ZSY40NYPS6,2013-01-01,110,2013-01-11,32,2013-03-04,111,2013-04-09,65
...
package markov.step1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * CompositeKey: represents a pair of (String customerID, long timestamp).
 *
 * <p>The natural key is the customerID; the timestamp (the purchase date) is
 * carried in the key only so that the shuffle can order records on it
 * ("secondary sort"). Keys are ordered by customerID first and by timestamp
 * second.
 *
 * @author Mahmoud Parsian
 */
public class CompositeKey implements WritableComparable<CompositeKey> {

    // natural key is (customerID)
    // composite key is a pair (customerID, timestamp)
    private String customerID;
    private long timestamp;

    /**
     * Creates a key for the given customer and purchase timestamp.
     *
     * @param customerID the natural key
     * @param timestamp  the purchase date used as the secondary sort field
     */
    public CompositeKey(String customerID, long timestamp) {
        set(customerID, timestamp);
    }

    /** No-arg constructor; fields are populated later by set() or readFields(). */
    public CompositeKey() {
    }

    /** Replaces both fields, allowing one instance to be reused across records. */
    public void set(String customerID, long timestamp) {
        this.customerID = customerID;
        this.timestamp = timestamp;
    }

    public String getCustomerID() {
        return this.customerID;
    }

    public long getTimestamp() {
        return this.timestamp;
    }

    /** Reads the fields in the same order write() emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.customerID = in.readUTF();
        this.timestamp = in.readLong();
    }

    /** Serializes customerID (writeUTF) followed by timestamp (writeLong). */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.customerID);
        out.writeLong(this.timestamp);
    }

    /**
     * Orders by customerID, then by timestamp ascending.
     *
     * @return -1, 0 or 1 for the timestamp tie-break, otherwise the result of
     *         comparing the two customerIDs
     */
    @Override
    public int compareTo(CompositeKey other) {
        int byCustomer = this.customerID.compareTo(other.customerID);
        if (byCustomer != 0) {
            return byCustomer;
        }
        if (this.timestamp == other.timestamp) {
            return 0;
        }
        return this.timestamp < other.timestamp ? -1 : 1;
    }

    /** Two keys are equal when both customerID and timestamp match (consistent with compareTo). */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof CompositeKey)) {
            return false;
        }
        CompositeKey that = (CompositeKey) obj;
        boolean sameCustomer = (this.customerID == null)
                ? that.customerID == null
                : this.customerID.equals(that.customerID);
        return sameCustomer && this.timestamp == that.timestamp;
    }

    @Override
    public int hashCode() {
        int result = (customerID == null) ? 0 : customerID.hashCode();
        return 31 * result + (int) (timestamp ^ (timestamp >>> 32));
    }

    /**
     * Default comparator registered for CompositeKey.
     *
     * <p>It deliberately does NOT compare the serialized bytes directly:
     * writeUTF() prefixes the string with a 2-byte length (so a raw byte
     * comparison orders shorter IDs first regardless of content) and the bytes
     * of a negative long compare as larger than those of a positive one. Both
     * would disagree with compareTo(). Passing {@code true} to the super
     * constructor makes the inherited byte-level compare() deserialize both
     * keys and delegate to compareTo().
     */
    public static class CompositeKeyComparator extends WritableComparator {
        public CompositeKeyComparator() {
            super(CompositeKey.class, true);
        }
    }

    static {
        // register this comparator
        WritableComparator.define(CompositeKey.class, new CompositeKeyComparator());
    }

    /** Returns only the customerID; the timestamp is intentionally omitted. */
    @Override
    public String toString() {
        return customerID;
    }
}
package markov.step1;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;/** * CompositeKeyComparator * * The purpose of this class is to enable comparison of two CompositeKey(s). * * * @author Mahmoud Parsian * */public class CompositeKeyComparator extends WritableComparator { protected CompositeKeyComparator() { super(CompositeKey.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { CompositeKey key1 = (CompositeKey) w1; CompositeKey key2 = (CompositeKey) w2; int comparison = key1.getCustomerID().compareTo(key2.getCustomerID()); if (comparison == 0) { // customerID's are equal here if (key1.getTimestamp() == key2.getTimestamp()) { return 0; } else if (key1.getTimestamp() < key2.getTimestamp()) { return -1; } else { return 1; } } else { return comparison; } }}
package markov.step1;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * NaturalKeyGroupingComparator
 *
 * This class is used during Hadoop's shuffle phase to group
 * composite keys by the first (natural) part of their key.
 * The natural key is the "customerID".
 *
 * @author Mahmoud Parsian
 */
public class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
        super(CompositeKey.class, true);
    }

    /** Compares on customerID only, so keys that differ just in timestamp compare as equal. */
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKey key1 = (CompositeKey) w1;
        CompositeKey key2 = (CompositeKey) w2;
        return key1.getCustomerID().compareTo(key2.getCustomerID());
    }
}

package markov.step1;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

import edu.umd.cloud9.io.pair.PairOfLongInt;

/**
 * NaturalKeyPartitioner
 *
 * This custom partitioner controls how outputs from the map stage
 * (SecondarySortProjectionMapper) are distributed to the reducers.
 * Since we want a single reducer to receive all projected data for a
 * single "customerID", we partition the map output by only the natural
 * key component ("customerID"). Note that (CompositeKey, PairOfLongInt)
 * is the (key, value) generated by mappers.
 *
 * @author Mahmoud Parsian
 */
public class NaturalKeyPartitioner implements Partitioner<CompositeKey, PairOfLongInt> {

    /**
     * @return a partition index in [0, numberOfPartitions) derived from the
     *         customerID alone
     */
    @Override
    public int getPartition(CompositeKey key,
                            PairOfLongInt value,
                            int numberOfPartitions) {
        return (int) (hash(key.getCustomerID()) % numberOfPartitions);
    }

    @Override
    public void configure(JobConf jobconf) {
    }

    /**
     * Non-negative hash adapted from String.hashCode().
     *
     * <p>The int hash is widened to long BEFORE taking the absolute value:
     * Math.abs(int) returns Integer.MIN_VALUE unchanged (still negative), which
     * would produce a negative partition number for strings whose hashCode is
     * Integer.MIN_VALUE. For every other hash value the result is the same as
     * Math.abs(str.hashCode()).
     */
    static long hash(String str) {
        return Math.abs((long) str.hashCode());
    }

    /**
     * Alternative 64-bit hash adapted from String.hashCode().
     *
     * <p>Not used by getPartition(). The multiplication can overflow, so the
     * result may be negative.
     */
    static long hash2(String str) {
        long h = 1125899906842597L; // prime
        int length = str.length();
        for (int i = 0; i < length; i++) {
            h = 31 * h + str.charAt(i);
        }
        return h;
    }
}
package markov.step1;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reporter;import org.apache.commons.lang.StringUtils;import yidongpingjun.DateUtil;import edu.umd.cloud9.io.pair.PairOfLongInt;/** * MapReduce job for projecting customer transaction data * by using MapReduce's "secondary sort" (sort by shuffle function). * Note that reducer values arrive sorted by implementing the "secondary sort" * design pattern (no data is sorted in memory). * * This class implements the map() function for "secondary sort" design pattern. * * @author Mahmoud Parsian * */public class SecondarySortProjectionMapper extends MapReduceBase implements Mapper{ // reuse Hadoop's Writable objects private final CompositeKey reducerKey = new CompositeKey(); private final PairOfLongInt reducerValue = new PairOfLongInt(); @Override public void map(LongWritable inkey, Text value, OutputCollector output, Reporter reporter) throws IOException { String[] tokens = StringUtils.split(value.toString(), ","); if (tokens.length != 4) { // not a proper format return; } // tokens[0] = customer-id // tokens[1] = transaction-id // tokens[2] = purchase-date // tokens[3] = amount long date; try { date = DateUtil.getDateAsMilliSeconds(tokens[2]); } catch(Exception e) { // date is in error, ignore the record return; } int amount = Integer.parseInt(tokens[3]); reducerValue.set(date, amount); reducerKey.set(tokens[0], date); // emit key-value pair output.collect(reducerKey, reducerValue); }} package markov.step1;import java.util.Iterator;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reducer;import org.apache.hadoop.mapred.Reporter;import 
org.apache.hadoop.mapred.JobConf;import yidongpingjun.DateUtil;import edu.umd.cloud9.io.pair.PairOfLongInt;/** * * SecondarySortProjectionReducer * * Data arrive sorted to reducer. * * MapReduce job for projecting customer transaction data * by using MapReduce's "secondary sort" (sort by shuffle * function). * Note that reducer values arrive sorted by implementing * the "secondary sort" design pattern (no data is sorted * in memory). * * This class implements the reduce() function for "secondary sort" * design pattern. * * @author Mahmoud Parsian * */public class SecondarySortProjectionReducer extends MapReduceBase implements Reducer { public void reduce(CompositeKey key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { // note that values are sorted (by using MR's secondary sort) // below, builder will generate: // CustoerID,Date1,Amount1,Date2,Amount2,...,DateN,AmountN // where Date1 <= Date2 <= ... <= DateN StringBuilder builder = new StringBuilder(); builder.append(key.toString()); while (values.hasNext()) { builder.append(","); PairOfLongInt pair = values.next(); long timestamp = pair.getLeftElement(); // date as milliseconds String date = DateUtil.getDateAsString(timestamp); builder.append(date); // date as String builder.append(","); builder.append(pair.getRightElement()); // amount } output.collect(null, new Text(builder.toString())); } // reduce} package markov.step1;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.TextInputFormat;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.JobClient;import edu.umd.cloud9.io.pair.PairOfLongInt;import org.apache.log4j.Logger;/** * MapReduce job for projecting customer transaction 
data * by using MapReduce's "secondary sort" (sort by shuffle function). * Note that reducer values arrive sorted by implementing the "secondary sort" * design pattern (no data is sorted in memory). * * @author Mahmoud Parsian * */public class SecondarySortProjectionDriver { private static final Logger theLogger = Logger.getLogger(SecondarySortProjectionDriver.class); public static void main(String[] args) throws Exception { args = new String[2]; args[0] = "input/smart_email_training.txt"; args[1] = "output/smart_email_training"; long startTime = System.currentTimeMillis(); Configuration conf = new Configuration(); JobConf jobconf = new JobConf(conf, SecondarySortProjectionDriver.class); jobconf.setJobName("SecondarySortProjectionDriver"); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: SecondarySortProjectionDriver
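第二步:将交易序列转换为状态序列。
对于每个用户的交易序列,每次取两个交易,例如:
2013-01-01,110,2013-01-11,32
2013-03-04,111,2013-04-09,65
根据两个交易的时间差和金额差来标定不同的状态。每个状态由两个字母组成:第一个字母表示时间差(S:小于30天;M:小于60天;L:60天及以上),第二个字母表示金额比较(L:前一次金额小于本次金额的0.9倍;E:前一次金额小于本次金额的1.1倍;G:其余情况)。输出形如:
ZSY40NYPS6,ME,SL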
package markov.step2;

import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import yidongpingjun.DateUtil;

/**
 * Converts one customer's date-ordered transaction line
 * (customerID,Date1,Amount1,Date2,Amount2,...) into a sequence of two-letter
 * states, one state per pair of consecutive transactions.
 *
 * <p>First letter (days between the two purchases): 'S' for fewer than 30,
 * 'M' for fewer than 60, 'L' otherwise. Second letter (prior amount versus
 * current amount): 'L' if prior &lt; 0.9 * current, 'E' if prior &lt; 1.1 * current,
 * 'G' otherwise.
 */
public class StateSequenceMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    // reuse Hadoop's Writable objects
    private final Text outputKey = new Text();
    private final Text outputValue = new Text();

    /**
     * Emits a single (customerID, "state1,state2,...") pair per input line.
     *
     * <p>The whole sequence is built here and emitted as ONE value. Emitting
     * each state as a separate value would leave the order in which the
     * reducer sees them up to the shuffle, which does not guarantee any order
     * among values of the same key, and the order of states is exactly what
     * the Markov model needs.
     *
     * <p>Lines with fewer than two transactions (fewer than 5 fields) are skipped.
     */
    @Override
    public void map(LongWritable arg0, Text value,
                    OutputCollector<Text, Text> output, Reporter arg3) throws IOException {
        // e.g. 000UDM50M4,2013-04-27,183,2013-06-19,62,2013-06-20,29
        String[] tokens = value.toString().split(",");
        if (tokens.length < 5) {
            return;
        }

        StringBuilder sequence = new StringBuilder();
        // tokens[i] is the amount of the current transaction, tokens[i-1] its date;
        // tokens[i-2] / tokens[i-3] are the amount / date of the previous one
        for (int i = 4; i < tokens.length; i += 2) {
            int amount = Integer.parseInt(tokens[i]);
            int priorAmount = Integer.parseInt(tokens[i - 2]);
            Date date = DateUtil.getDate(tokens[i - 1]);
            Date priorDate = DateUtil.getDate(tokens[i - 3]);
            long daysDiff = (date.getTime() - priorDate.getTime()) / MILLIS_PER_DAY;

            if (sequence.length() > 0) {
                sequence.append(',');
            }
            sequence.append(elapsedTimeCode(daysDiff));
            sequence.append(amountCode(priorAmount, amount));
        }

        outputKey.set(tokens[0]);
        outputValue.set(sequence.toString());
        output.collect(outputKey, outputValue);
    }

    /** Maps the number of days between two purchases to 'S', 'M' or 'L'. */
    private static char elapsedTimeCode(long daysDiff) {
        if (daysDiff < 30) {
            return 'S';
        }
        if (daysDiff < 60) {
            return 'M';
        }
        return 'L';
    }

    /** Maps the prior amount relative to the current amount to 'L', 'E' or 'G'. */
    private static char amountCode(int priorAmount, int amount) {
        if (priorAmount < 0.9 * amount) {
            return 'L';
        }
        if (priorAmount < 1.1 * amount) {
            return 'E';
        }
        return 'G';
    }
}
package markov.step2;import java.io.IOException;import java.util.Iterator;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reducer;import org.apache.hadoop.mapred.Reporter;public class StateSequenceReducer extends MapReduceBase implements Reducer{ public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { StringBuilder builder = new StringBuilder(); builder.append(key.toString()); while (values.hasNext()) { builder.append(","); builder.append(values.next().toString()); } output.collect(null, new Text(builder.toString())); }} package markov.step2;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.TextInputFormat;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.JobClient;import org.apache.log4j.Logger;public class StateSequenceDiver { private static final Logger theLogger = Logger.getLogger(StateSequenceDiver.class); public static void main(String[] args) throws Exception { args = new String[2]; args[0] = "output/smart_email_training"; args[1] = "output/smart_email_training2"; long startTime = System.currentTimeMillis(); Configuration conf = new Configuration(); JobConf jobconf = new JobConf(conf, StateSequenceDiver.class); jobconf.setJobName("StateSequenceDiver"); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: SecondarySortProjectionDriver
第三步:生成马尔科夫状态转移矩阵。根据上步统计状态转移([ME,SL],1)累加
package markov.step3;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.BufferedReader;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.util.LineReader;

/**
 * This class provides convenient methods for accessing
 * some Input/Output methods.
 *
 * <p>Every close() overload is best-effort: a null argument is ignored and any
 * exception thrown while closing is deliberately swallowed, so the methods are
 * safe to call from a finally block.
 *
 * @author Mahmoud Parsian (mahmoud.parsian@yahoo.com)
 */
public class InputOutputUtil {

    public static void close(LineReader reader) {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (Exception ignored) {
            // best-effort close
        }
    }

    public static void close(OutputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (Exception ignored) {
            // best-effort close
        }
    }

    public static void close(InputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (Exception ignored) {
            // best-effort close
        }
    }

    public static void close(FSDataInputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (Exception ignored) {
            // best-effort close
        }
    }

    public static void close(BufferedReader reader) {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (Exception ignored) {
            // best-effort close
        }
    }
}

package markov.step3;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import edu.umd.cloud9.io.pair.PairOfStrings;
import org.apache.commons.lang.StringUtils;

/**
 * The MarkovStateTransitionModelMapper class implements MapReduce's
 * map() method.
 *
 * @author Mahmoud Parsian
 */
public class MarkovStateTransitionModelMapper
        extends Mapper<LongWritable, Text, PairOfStrings, IntWritable> {

    private PairOfStrings reducerKey = new PairOfStrings();
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Emits ((fromState, toState), 1) for every pair of adjacent states on the line.
     * Lines with fewer than two states produce no output.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // value = <customerID>,<State1>,<State2>,...,<StateN>
        String[] items = StringUtils.split(value.toString(), ",");
        if (items.length > 2) {
            // index 0 is the customerID, so the states start at index 1
            for (int i = 1; i < (items.length - 1); i++) {
                reducerKey.set(items[i], items[i + 1]);
                context.write(reducerKey, ONE);
            }
        }
    }
}

package markov.step3;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import edu.umd.cloud9.io.pair.PairOfStrings;

/**
 * The MarkovStateTransitionModelCombiner class implements MapReduce's
 * combine() method (in Hadoop, we call it reduce() method).
 *
 * This class implements the combine() function for Markov's
 * state transition model: it pre-sums the counts of one
 * (fromState, toState) pair and re-emits them under the same key.
 *
 * @author Mahmoud Parsian
 */
public class MarkovStateTransitionModelCombiner
        extends Reducer<PairOfStrings, IntWritable, PairOfStrings, IntWritable> {

    @Override
    protected void reduce(PairOfStrings key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int partialSum = 0;
        for (IntWritable value : values) {
            partialSum += value.get();
        }
        context.write(key, new IntWritable(partialSum));
    }
}

package markov.step3;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import edu.umd.cloud9.io.pair.PairOfStrings;

/**
 * The MarkovStateTransitionModelReducer class implements MapReduce's
 * reduce() method: it sums all counts of one (fromState, toState) pair and
 * writes the pair as the text key "fromState,toState" with the total as value.
 *
 * @author Mahmoud Parsian
 */
public class MarkovStateTransitionModelReducer
        extends Reducer<PairOfStrings, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(PairOfStrings key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int finalCount = 0;
        for (IntWritable value : values) {
            finalCount += value.get();
        }

        String fromState = key.getLeftElement();
        String toState = key.getRightElement();
        String outputkey = fromState + "," + toState;
        context.write(new Text(outputkey), new IntWritable(finalCount));
    }
}

package markov.step3;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import edu.umd.cloud9.io.pair.PairOfStrings;

/**
 * Markov state transition probability matrix Driver
 *
 * @author Mahmoud Parsian
 */
public class MarkovStateTransitionModelDriver extends Configured implements Tool {

    // paths used when the caller does not supply exactly <input> <output>
    private static final String DEFAULT_INPUT = "output/smart_email_training2";
    private static final String DEFAULT_OUTPUT = "output/smart_email_training3";

    /**
     * Configures and runs the state-transition counting job.
     *
     * @param args optional {@code <input> <output>} paths; when not exactly two
     *             arguments are given the built-in default paths are used
     * @return 0 if the job completed successfully, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        String inputPath = DEFAULT_INPUT;
        String outputPath = DEFAULT_OUTPUT;
        if (args != null && args.length == 2) {
            inputPath = args[0];
            outputPath = args[1];
        }

        Job job = Job.getInstance(getConf());
        // lets Hadoop locate the jar containing the mapper/reducer classes
        job.setJarByClass(MarkovStateTransitionModelDriver.class);
        job.setJobName("MarkovStateTransitionModelDriver");

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.setMapperClass(MarkovStateTransitionModelMapper.class);
        job.setReducerClass(MarkovStateTransitionModelReducer.class);
        job.setCombinerClass(MarkovStateTransitionModelCombiner.class);

        // PairOfStrings = (fromState, toState)
        job.setMapOutputKeyClass(PairOfStrings.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        int status = job.waitForCompletion(true) ? 0 : 1;
        return status;
    }

    public static void main(String[] args) throws Exception {
        int statusCode = ToolRunner.run(new MarkovStateTransitionModelDriver(), args);
        System.exit(statusCode);
    }
}
第四步:根据马尔科夫状态转移矩阵得到转移概率表
package markov.step3;/** * TableItem represents an item of a Markov State Transition Model * as a Tuple3* */public class TableItem { String fromState; String toState; int count; public TableItem(String fromState, String toState, int count) { this.fromState = fromState; this.toState = toState; this.count = count; } /** * for debugging ONLY */ public String toString() { return "{"+fromState+"," +toState+","+count+"}"; }}
package markov.step3;

import java.util.List;
import java.util.ArrayList;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

/**
 * Utility methods for reading the text output ("part*" files) of the
 * state-transition counting job from a Hadoop file system into TableItem objects.
 *
 * @author Mahmoud Parsian
 */
public class ReadDataFromHDFS {

    private static final Logger THE_LOGGER = Logger.getLogger(ReadDataFromHDFS.class);

    private ReadDataFromHDFS() {
    }

    /** Convenience overload of {@link #readDirectory(Path)} taking the path as a String. */
    public static List<TableItem> readDirectory(String path) {
        return ReadDataFromHDFS.readDirectory(new Path(path));
    }

    /**
     * Reads every file whose name starts with "part" in the given directory.
     *
     * @return the items of all such files, in listing order
     * @throws RuntimeException if the file system cannot be accessed or listed
     */
    public static List<TableItem> readDirectory(Path path) {
        FileSystem fs;
        try {
            fs = FileSystem.get(new Configuration());
        } catch (IOException e) {
            THE_LOGGER.error("Unable to access the hadoop file system!", e);
            throw new RuntimeException("Unable to access the hadoop file system!", e);
        }

        List<TableItem> list = new ArrayList<TableItem>();
        try {
            FileStatus[] stat = fs.listStatus(path);
            for (int i = 0; i < stat.length; ++i) {
                if (stat[i].getPath().getName().startsWith("part")) {
                    List<TableItem> pairs = readFile(stat[i].getPath(), fs);
                    list.addAll(pairs);
                }
            }
        } catch (IOException e) {
            THE_LOGGER.error("Unable to access the hadoop file system!", e);
            throw new RuntimeException("Error reading the hadoop file system!", e);
        }

        return list;
    }

    /**
     * Parses one text file whose lines look like
     * {@code fromState,toState<TAB>count}. Lines that do not have exactly two
     * TAB-separated fields, or whose first field is not exactly two
     * comma-separated states, are skipped.
     *
     * @throws RuntimeException if the file cannot be read
     */
    public static List<TableItem> readFile(Path path, FileSystem fs) {
        THE_LOGGER.info("path=" + path);
        List<TableItem> list = new ArrayList<TableItem>();
        FSDataInputStream stream = null;
        BufferedReader reader = null;
        try {
            stream = fs.open(path);
            // explicit charset so parsing does not depend on the platform default
            reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                // line = <fromState>,<toState><TAB><count>
                THE_LOGGER.info("line=" + line);
                String[] tokens = line.split("\t"); // TAB separator
                if (tokens.length != 2) {
                    continue;
                }
                String[] twoStates = tokens[0].split(",");
                if (twoStates.length != 2) {
                    THE_LOGGER.warn("skipping malformed line=" + line);
                    continue;
                }
                int count = Integer.parseInt(tokens[1]);
                list.add(new TableItem(twoStates[0], twoStates[1], count));
            }
        } catch (IOException e) {
            THE_LOGGER.error("readFile() failed! path=" + path, e);
            throw new RuntimeException("readFile() failed! path=" + path, e);
        } finally {
            InputOutputUtil.close(reader);
            InputOutputUtil.close(stream);
        }

        return list;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: ReadDataFromHDFS <hdfs-directory>");
            return;
        }
        String path = args[0];
        List<TableItem> list = readDirectory(path);
        THE_LOGGER.info("list=" + list.toString());
    }
}

package markov.step3;

import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.Locale;

/**
 * Markov state transition probability matrix builder
 */
public class StateTransitionTableBuilder {

    //
    // model.states=SL,SE,SG,ML,ME,MG,LL,LE,LG
    //
    // states: key is the state and value is row/column in table
    //
    private Map<String, Integer> states = null;
    private double[][] table = null;
    private int numberOfStates;
    private int scale = 100; // stored by the two-argument constructor; not read by any method of this class

    private void initStates() {
        states = new HashMap<String, Integer>();
        states.put("SL", 0);
        states.put("SE", 1);
        states.put("SG", 2);
        states.put("ML", 3);
        states.put("ME", 4);
        states.put("MG", 5);
        states.put("LL", 6);
        states.put("LE", 7);
        states.put("LG", 8);
    }

    /** Creates an all-zero numberOfStates x numberOfStates table. */
    public StateTransitionTableBuilder(int numberOfStates) {
        this.numberOfStates = numberOfStates;
        table = new double[numberOfStates][numberOfStates];
        initStates();
    }

    public StateTransitionTableBuilder(int numberOfStates, int scale) {
        this(numberOfStates);
        this.scale = scale;
    }

    /**
     * Stores the transition count for (fromState, toState), replacing any
     * previous value of that cell.
     *
     * @throws IllegalArgumentException if either state is unknown or does not
     *         fit in the table
     */
    public void add(String fromState, String toState, int count) {
        int row = indexOf(fromState);
        int column = indexOf(toState);
        table[row][column] = count;
    }

    /** Looks up the table index of a state, failing with a descriptive message instead of a bare NPE. */
    private int indexOf(String state) {
        Integer index = states.get(state);
        if (index == null) {
            throw new IllegalArgumentException("Unknown state: " + state);
        }
        if (index >= numberOfStates) {
            throw new IllegalArgumentException(
                "State " + state + " (index " + index + ") does not fit in a table of "
                + numberOfStates + " states");
        }
        return index;
    }

    /**
     * Turns the counts of every row into probabilities that sum to 1.
     * A row that contains at least one zero count first gets 1 added to each
     * of its cells.
     */
    public void normalizeRows() {
        // Laplace correction: the usual solution is to do a
        // Laplacian correction by upping all the counts by 1
        // see: http://cs.nyu.edu/faculty/davise/ai/bayesText.html
        for (int r = 0; r < numberOfStates; r++) {
            boolean gotZeroCount = false;
            for (int c = 0; c < numberOfStates; c++) {
                if (table[r][c] == 0) {
                    gotZeroCount = true;
                    break;
                }
            }

            if (gotZeroCount) {
                for (int c = 0; c < numberOfStates; c++) {
                    table[r][c] += 1;
                }
            }
        }

        // normalize
        for (int r = 0; r < numberOfStates; r++) {
            double rowSum = getRowSum(r);
            for (int c = 0; c < numberOfStates; c++) {
                table[r][c] = table[r][c] / rowSum;
            }
        }
    }

    public double getRowSum(int rowNumber) {
        double sum = 0.0;
        for (int column = 0; column < numberOfStates; column++) {
            sum += table[rowNumber][column];
        }
        return sum;
    }

    /**
     * Formats one row as comma-separated values with 4 significant digits.
     * Locale.ROOT keeps '.' as the decimal separator; a locale that prints a
     * decimal comma would make the values indistinguishable from the separators.
     */
    public String serializeRow(int rowNumber) {
        StringBuilder builder = new StringBuilder();
        for (int column = 0; column < numberOfStates; column++) {
            double element = table[rowNumber][column];
            builder.append(String.format(Locale.ROOT, "%.4g", element));
            if (column < (numberOfStates - 1)) {
                builder.append(",");
            }
        }
        return builder.toString();
    }

    /** Prints every serialized row to standard output, one row per line. */
    public void persistTable() {
        for (int row = 0; row < numberOfStates; row++) {
            String serializedRow = serializeRow(row);
            System.out.println(serializedRow);
        }
    }

    /** Reads the transition counts from hdfsDirectory, normalizes them and prints the 9x9 table. */
    public static void generateStateTransitionTable(String hdfsDirectory) {
        List<TableItem> list = ReadDataFromHDFS.readDirectory(hdfsDirectory);
        StateTransitionTableBuilder tableBuilder = new StateTransitionTableBuilder(9);
        for (TableItem item : list) {
            tableBuilder.add(item.fromState, item.toState, item.count);
        }

        tableBuilder.normalizeRows();
        tableBuilder.persistTable();
    }

    public static void main(String[] args) {
        String hdfsDirectory = "output/smart_email_training3";
        generateStateTransitionTable(hdfsDirectory);
    }
}

运行结果:
0.05033,0.008262,0.7487,0.1432,0.0003689,0.01423,0.03306,8.889e-05,0.001791
0.4791,0.01265,0.4071,0.07468,0.0002040,0.009386,0.01612,0.0002040,0.0006121
0.6671,0.008839,0.1261,0.1463,0.0009289,0.01387,0.03505,0.0002426,0.001555
0.04773,0.0004718,0.7681,0.01487,0.0001862,0.1385,0.02863,1.242e-05,0.001490
0.6215,0.002151,0.2925,0.01075,0.006452,0.05161,0.008602,0.002151,0.004301
0.1072,0.002772,0.7044,0.1364,0.0003616,0.01374,0.03247,8.036e-05,0.002612
0.06196,0.0004748,0.7678,0.02008,0.0001424,0.1262,0.003988,4.748e-05,0.01937
0.5036,0.007299,0.3431,0.04380,0.007299,0.05839,0.007299,0.007299,0.02190
0.1834,0.001920,0.6313,0.02544,0.0004801,0.1167,0.03889,0.0009602,0.0009602
第五步:根据马尔科夫模型预测下一个智能邮件营销日期
转载地址:http://mkqrb.baihongyu.com/