User:Lindenb/Notebook/UMR915/20110615



hadoop

(started on User:Lindenb/Notebook/UMR915/20110610)

Tried changing /etc/hosts, as suggested here: https://twitter.com/#!/fredebibs/status/79170165312454656

then

sudo /etc/rc.d/init.d/network restart

Success!
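For reference, the kind of line involved (hypothetical; inferred from the hostname/loopback pair the namenode log reports below):

127.0.0.1	localhost	srv-clc-04.u915.irt.univ-nantes.prive3	srv-clc-04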

[lindenb@srv-clc-04 hadoop-0.20.203.0]$  bin/hadoop namenode -format
11/06/15 08:32:13 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = srv-clc-04.u915.irt.univ-nantes.prive3/127.0.0.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 0.20.203.0
STARTUP_MSG:   build = http://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-203 -r 1099333; compiled by 'oom' on Wed May  4 07:57:50 PDT 2011
************************************************************/
11/06/15 08:32:13 INFO util.GSet: VM type       = 64-bit
11/06/15 08:32:13 INFO util.GSet: 2% max memory = 19.1675 MB
11/06/15 08:32:13 INFO util.GSet: capacity      = 2^21 = 2097152 entries
11/06/15 08:32:13 INFO util.GSet: recommended=2097152, actual=2097152
11/06/15 08:32:13 INFO namenode.FSNamesystem: fsOwner=lindenb
11/06/15 08:32:13 INFO namenode.FSNamesystem: supergroup=supergroup
11/06/15 08:32:13 INFO namenode.FSNamesystem: isPermissionEnabled=true
11/06/15 08:32:13 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
11/06/15 08:32:13 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
11/06/15 08:32:13 INFO namenode.NameNode: Caching file names occuring more than 10 times 
11/06/15 08:32:13 INFO common.Storage: Image file of size 113 saved in 0 seconds.
11/06/15 08:32:14 INFO common.Storage: Storage directory /home/lindenb/tmp/HADOOP/dfs/name has been successfully formatted.
11/06/15 08:32:14 INFO namenode.NameNode: SHUTDOWN_MSG: 
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at srv-clc-04.u915.irt.univ-nantes.prive3/127.0.0.1
************************************************************/

Start the daemons (some were still running from a previous attempt, hence the "Stop it first" messages):

[lindenb@srv-clc-04 hadoop-0.20.203.0]$ ./bin/start-all.sh 
namenode running as process 1151. Stop it first.
localhost: starting datanode, logging to /home/lindenb/package/hadoop-0.20.203.0/bin/../logs/hadoop-lindenb-datanode-srv-clc-04.u915.irt.univ-nantes.prive3.out
localhost: secondarynamenode running as process 1490. Stop it first.
jobtracker running as process 1588. Stop it first.
localhost: starting tasktracker, logging to /home/lindenb/package/hadoop-0.20.203.0/bin/../logs/hadoop-lindenb-tasktracker-srv-clc-04.u915.irt.univ-nantes.prive3.out

copy cdina's data

bin/hadoop fs  -mkdir myfolder
bin/hadoop fs -copyFromLocal ~/Axiom_GW_Hu_SNP.r2.na31.annot.csv myfolder/


bin/hadoop fs -tail prints only the last kilobyte of the file, so the dump below starts in the middle of a record:

bin/hadoop fs -tail /user/lindenb/myfolder/Axiom_GW_Hu_SNP.r2.na31.annot.csv
s.135270 // CRMP1 // 1400 // collapsin response mediator protein 1 /// NM_001099433 // downstream // 100570 // Hs.479066 // JAKMIP1 // 152789 // janus kinase and microtubule interacting protein 1","10.7510808767857 // D4S2925 // D4S2366 // --- // --- /// 11.4380209151363 // D4S2285 // D4S431 // UT5936 // AFM262VG9 /// 10.964419629995 // --- // D4S2366 // 58988 // ---","D4S1059 // upstream // 121041 /// D4S2509E // downstream // 104845","0.108333333 // 0.891666667 // Caucasian /// 0.055555556 // 0.944444444 // Han Chinese /// 0.011111111 // 0.988888889 // Japanese /// 0.041666667 // 0.958333333 // Yoruban","0.183333333 // Caucasian /// 0.066666667 // Han Chinese /// 0.022222222 // Japanese /// 0.083333333 // Yoruban","60.0 // Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","T // Caucasian /// T // Han Chinese /// T // Japanese /// T // Yoruban","0.108333333 // Caucasian /// 0.055555556 // Han Chinese /// 0.011111111 // Japanese /// 0.041666667 // Yoruban","---"


#serveur1: scp  /CGH/data/users/cdina/DATAS/CEDRIC/Axiom2/resultat/AxiomGT1.calls.txt lindenb@serveur4:
bin/hadoop fs -copyFromLocal ~/AxiomGT1.calls.txt myfolder/


$ ls -lah ~/Axiom*
-rw-r--r-- 1 lindenb lindenb 111M jun 15 09:03 /home/lindenb/AxiomGT1.calls.txt
-rw-r--r-- 1 lindenb lindenb 766M jun 10 12:35 /home/lindenb/Axiom_GW_Hu_SNP.r2.na31.annot.csv

du -h ~/tmp/HADOOP | tail -1
884M	/home/lindenb/tmp/HADOOP

Files

Axiom_GW_Hu_SNP.r2.na31.annot.csv

"Probe Set ID","dbSNP RS ID","Chromosome","Physical Position","Strand","ChrX pseudo-autosomal region 1","Cytoband","Flank","Allele A","Allele B","Associated Gene","Genetic Map","Microsatellite","Allele Frequencies","Heterozygous Allele Frequencies","Number of individuals/Number of chromosomes","In Hapmap","Strand Versus dbSNP","Probe Count","ChrX pseudo-autosomal region 2","Minor Allele","Minor Allele Frequency","OMIM"
"AX-11086612","rs10001348","4","29912308","+","0","p15.1","gtattcagttgaacacaaatcagtgcatgt[A/G]","A","G","ENST00000467087 // downstream // 2885305 // Hs.724550 // STIM2 // 57620 // stromal interaction molecule 2 /// ENST00000361762 // upstream // 809728 // Hs.479439 // PCDH7 // 5099 // protocadherin 7 /// NM_001169117 // downstream // 2885305 // Hs.724550 // STIM2 // 57620 // stromal interaction molecule 2 /// NM_032456 // upstream // 809728 // Hs.724529 // PCDH7 // 5099 // protocadherin 7","50.0229786923511 // D4S418 // D4S2408 // --- // --- /// 44.2128449948474 // D4S2397 // D4S2430 // ATA27C07 // GCT6F03 /// 42.4637111703432 // --- // --- // 226002 // 46437","D4S333 // upstream // 47955 /// D4S605 // downstream // 97312","0.872881356 // 0.127118644 // Caucasian /// 0.833333333 // 0.166666667 // Han Chinese /// 0.777777778 // 0.222222222 // Japanese /// 0.775 // 0.225 // Yoruban","0.254237288 // Caucasian /// 0.288888889 // Han Chinese /// 0.355555556 // Japanese /// 0.35 // Yoruban","60.0 // Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","G // Caucasian /// G // Han Chinese /// G // Japanese /// G // Yoruban","0.127118644 // Caucasian /// 0.166666667 // Han Chinese /// 0.222222222 // Japanese /// 0.225 // Yoruban","---"
"AX-11086611","rs10001340","4","130341127","+","0","q28.2","[A/C]agggcattcatctcagcttactatttgggaaaaat","A","C","ENST00000281146 // downstream // 306645 // Hs.567679 // C4orf33 // 132321 // chromosome 4 open reading frame 33 /// ENST00000394248 // upstream // 3729342 // Hs.192859 // PCDH10 // 57575 // protocadherin 10 /// NM_173487 // downstream // 307285 // Hs.567679 // C4orf33 // 132321 // chromosome 4 open reading frame 33 /// NM_020815 // upstream // 3729342 // Hs.192859 // PCDH10 // 57575 // protocadherin 10","127.864057946266 // D4S1615 // D4S2365 // --- // --- /// 129.756132396152 // D4S2938 // D4S2365 // AFMA284WG5 // GATA10A12 /// 124.03426335901 // D4S2394 // --- // --- // 55218","D4S3198 // upstream // 331274 /// D4S2394 // downstream // 43310","0.815789474 // 0.184210526 // Caucasian /// 1.0 // 0.0 // Han Chinese /// 1.0 // 0.0 // Japanese /// 0.816666667 // 0.183333333 // Yoruban","0.368421053 // Caucasian /// 0.0 // Han Chinese /// 0.0 // Japanese /// 0.266666667 // Yoruban","60.0 // Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","C // Caucasian /// C // Han Chinese /// C // Japanese /// C // Yoruban","0.184210526 // Caucasian /// 0.0 // Han Chinese /// 0.0 // Japanese /// 0.183333333 // Yoruban","---"
"AX-11086610","rs10001337","4","54351529","+","0","q12","atgaggagtagccacatgatctaagcacct[C/T]","T","C","ENST00000306888 // --- // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1 /// ENST00000263925 // --- // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1 /// NM_001126328 // intron // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1 /// NM_032622 // intron // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1","67.4182086016315 // D4S2971 // D4S1594 // --- // --- /// 62.2091955728879 // D4S2971 // UNKNOWN // AFMB312YG1 // GATA61B02 /// 61.6059658777947 // --- // GATA61B02 // 149925 // ---","D4S461 // upstream // 151923 /// D4S2583E // downstream // 24481","0.118644068 // 0.881355932 // Caucasian /// 0.111111111 // 0.888888889 // Han Chinese /// 0.122222222 // 0.877777778 // Japanese /// 0.025 // 0.975 // Yoruban","0.203389831 // Caucasian /// 0.133333333 // Han Chinese /// 0.244444444 // Japanese /// 0.05 // Yoruban","60.0 // Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","T // Caucasian /// T // Han Chinese /// T // Japanese /// T // Yoruban","0.118644068 // Caucasian /// 0.111111111 // Han Chinese /// 0.122222222 // Japanese /// 0.025 // Yoruban","---"


Calls

probeset_id	A05.CEL	A06.CEL	A03.CEL	A04.CEL	A01.CEL	A10.CEL	A02.CEL	A11.CEL	A12.CEL	A09.CEL	A07.CEL	A08.CEL	B01.CEL	B02.CEL	B06.CEL	B03.CEL	B04.CEL	B05.CEL	B07.CEL	B08.CEL	B09.CEL	B10.CEL	C02.CEL	B11.CEL	B12.CEL	C01.CEL	C03.CEL	C04.CEL	C05.CEL	C06.CEL	C07.CEL	C08.CEL	C09.CEL	C10.CEL	C11.CEL	C12.CEL	D01.CEL	D02.CEL	D03.CEL	D04.CEL	D05.CEL	D06.CEL	D07.CEL	D08.CEL	D09.CEL	D10.CEL	D11.CEL	D12.CEL	E01.CEL	E02.CEL	E03.CEL	E04.CEL	E05.CEL	E06.CEL	E07.CEL	E08.CEL	E09.CEL	E10.CEL	E11.CEL	E12.CEL	F01.CEL	F02.CEL	F03.CEL	F04.CEL	F05.CEL	F06.CEL	F07.CEL	F08.CEL	H08.CEL	H07.CEL	G07.CEL	G08.CEL	H03.CEL	H04.CEL	H06.CEL	H05.CEL	G09.CEL	G10.CEL	H01.CEL	H02.CEL	G11.CEL	G12.CEL	G03.CEL	G04.CEL	H10.CEL	H09.CEL	F10.CEL	F09.CEL	G01.CEL	G02.CEL	F11.CEL	F12.CEL	G05.CEL	G06.CEL	H11.CEL	H12.CEL
AX-11086525	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	1	1	0	0	1	0	0	0	0	0	0	0	0	0	0	0	0	0	1	0	0	1	0	1	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	1	0	1	0	1	0	0	1	1	2	1	1	0	0	0	0	1	0	0	1	0	0	1	1	0	1	0	1	1	2	0	1	0	0	1	2	0	2
AX-11086526	2	1	2	2	2	2	2	2	2	2	2	2	2	2	2	2	2	2	2	2	1	2	2	2	2	2	2	2	2	2	2	2	1	2	2	0	2	2	2	1	1	1	2	2	2	2	2	2	2	2	2	1	1	1	2	1	2	1	2	2	0	2	1	2	2	1	2	2	2	2	1	2	2	2	2	2	1	2	1	1	2	2	1	2	2	2	2	2	2	2	2	2	1	2	2	2
AX-11086527	2	0	2	2	2	1	2	2	2	2	2	2	2	2	2	2	2	2	2	2	1	2	2	2	2	2	2	2	2	1	2	2	1	1	0	2	1	2	1	2	2	1	1	1	1	2	2	1	0	2	1	2	1	2	2	2	1	2	1	1	1	1	2	2	2	2	2	1	1	0	2	0	1	0	1	0	1	2	1	1	0	1	1	0	2	1	1	1	0	1	1	1	2	2	1	1
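Each sample's call is coded 0 (AA), 1 (AB), 2 (BB) or -1 (no call); the JoinReducer in the code below decodes these back to allele pairs ("0 0" for no call).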

Join

AX-11086525	4	165621955	rs10000041	T T	T T	T T	T T	T T	T T	T T	T G	T T	T T	T T	T T	T T	T T	T T	T T	T T	T T	T T	T G	T G	T T	T T	T G	T T	T T	T T	T T	T T	T T	T T	T T	T T	T T	T T	T T	T T	T G	T T	T T	T G	T T	T G	T T	T T	T T	T T	T T	T T	T T	T T	T T	T T	T G	T T	T T	T T	T T	T G	T T	T G	T T	T G	T T	T T	T G	T G	G G	T G	T G	T T	T T	T T	T T	T G	T T	T T	T G	T T	T T	T G	T G	T T	T G	T T	T G	T G	G G	T T	T G	T T	T T	T G	G G	T T	G G
AX-11086526	4	5237152	rs10000042	C C	T C	C C	C C	C C	C C	C C	C C	C C	C C	C C	C C	C C	C C	C C	C C	C C	C C	C C	C C	T C	C C	C C	C C	C C	C C	C C	C C	C C	C C	C C	C C	T C	C C	C C	T T	C C	C C	C C	T C	T C	T C	C C	C C	C C	C C	C C	C C	C C	C C	C C	T C	T C	T C	C C	T C	C C	T C	C C	C C	T T	C C	T C	C C	C C	T C	C C	C C	C C	C C	T C	C C	C C	C C	C C	C C	T C	C C	T C	T C	C C	C C	T C	C C	C C	C C	C C	C C	C C	C C	C C	C C	T C	C C	C C	C C
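Each joined line carries the probe set id, the chromosome, the physical position, the rs id, and then one decoded allele pair per .CEL sample.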

GeneTitan pictures


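Code

The Java source below chains three jobs: two new-API map-reduce jobs re-key the marker annotations (CSV) and the genotype calls (tab-delimited) by probe set id into SequenceFiles, then an old-API job joins them with CompositeInputFormat ("inner") and writes one line per marker with the calls decoded as allele pairs.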
package test.hadoop;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;
import org.apache.hadoop.mapred.join.TupleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

@SuppressWarnings("deprecation")
public class Hadoop01 extends Configured implements Tool
 {
 static final String SORTED_MARKERS_PATH="markers.data";
 static final String SORTED_CALLS_PATH="calls.data";
 static final String JOIN_PATH="joined.data";
 static final String CHROM_COL="chrom.col";
 static final String POS_COL="pos.col";
 static final String ALLELE_A_COL="alleleA.col";
 static final String ALLELE_B_COL="alleleB.col";
 static final String PROBE_SET_ID_COL="probeSetId.col";
 static final String RS_ID_COL="rsId.col";

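 /** Writable wrapping one byte per sample: the genotype call for a marker (0=AA, 1=AB, 2=BB, -1=no call). */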
 static public class Genotypes implements Writable
	{
	private byte array[];
	public Genotypes()
	 	{
		this(0);
	 	}
	public Genotypes(int n)
 		{
		this.array=new byte[n];
 		}

	
	@Override
	public void readFields(DataInput in) throws IOException
		{
		int n=in.readInt();
		this.array=new byte[n];
		in.readFully(this.array);
		}
	@Override
	public void write(DataOutput out) throws IOException 
		{
		out.writeInt(this.array.length);
		out.write(this.array);
		}
	@Override
	public String toString()
		{
		return Arrays.toString(this.array);
		}
	}
 
 
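/** WritableComparable key: a (chromosome, physical position) pair, ordered by chromosome then position. */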
static public class Position implements WritableComparable<Position>
	{
	private String chrom="N/A";
	private int position=-1;
	
	public Position()
		{
		}
	public Position(String chrom,int position)
		{
		this.chrom=chrom;
		this.position=position;
		}
	public String getChrom()
		{
		return chrom;
		}
	
	public int getPosition()
		{
		return position;
		}
	
	@Override
	public void readFields(DataInput in) throws IOException {
		this.chrom=in.readUTF();
		this.position=in.readInt();
		}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(chrom);
		out.writeInt(position);
		}
	@Override
	public int compareTo(Position o) {
		int i=chrom.compareTo(o.chrom);
		if(i!=0) return i;
		return position - o.position;
		}
	@Override
	public String toString() {
		return chrom+":"+position;
		}
	}	
	
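/** One Axiom marker: (chromosome, position), alleles A and B, probe set id and dbSNP rs id. */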
static public class Marker implements WritableComparable<Marker>
	{
	private Position position=new Position();
	private String alleleA;
	private String alleleB;
	private String probeSetId;
	private String rsid;
	public Marker()
		{
		}
	
	public Position getPosition() {
		return position;
	}
	
	public String getAlleleA() {
		return alleleA;
	}
	
	public String getAlleleB() {
		return alleleB;
	}
	
	public String getProbeSetId() {
		return probeSetId;
	}
	
	public String getRsid() {
		return rsid;
	}
	
	@Override
	public void readFields(DataInput in) throws IOException {
		this.position.readFields(in);
		this.alleleA=in.readUTF();
		this.alleleB=in.readUTF();
		this.probeSetId=in.readUTF();
		this.rsid=in.readUTF();
		}
	@Override
	public void write(DataOutput out) throws IOException {
		this.position.write(out);
		out.writeUTF(this.alleleA);
		out.writeUTF(this.alleleB);
		out.writeUTF(this.probeSetId);
		out.writeUTF(this.rsid);
		}
	@Override
	public int compareTo(Marker o) {
		return position.compareTo(o.position);
		}
	@Override
	public String toString() {
		return this.probeSetId+" "+position;
		}
	}


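/** Mapper over the CSV annotation file: skips the header and '#' comments, skips markers with an undefined chromosome, and emits (probe set id, Marker). Column indexes come from the job configuration. */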
public static class SortMarkerByProbIdMapper extends Mapper<LongWritable, Text, Text, Marker>
	{
	int chromCol=-1;
	int posCol=-1;
	int alleleACol=-1;
	int alleleBCol=-1;
	int probeSetIdCol=-1;
	int rsIdCol=-1;
	@Override
	protected void setup(Context context) throws IOException ,InterruptedException
		{
		this.chromCol= context.getConfiguration().getInt(CHROM_COL,1);
		this.posCol= context.getConfiguration().getInt(POS_COL,1);
		this.alleleACol= context.getConfiguration().getInt(ALLELE_A_COL,1);
		this.alleleBCol= context.getConfiguration().getInt(ALLELE_B_COL,1);
		this.probeSetIdCol= context.getConfiguration().getInt(PROBE_SET_ID_COL,1);
		this.rsIdCol= context.getConfiguration().getInt(RS_ID_COL,1);
		super.setup(context);
		}
	@Override
	protected void map(
		LongWritable key,
		Text value,
		Context context)
		throws java.io.IOException ,InterruptedException
		{
		//ignore header and comment
		if( value.find("\"Probe Set ID\"")==0 ||
			value.find("#")==0 )
			{
			return;
			}
		
		List<String> array=splitCSV(new String(value.getBytes(),0,value.getLength()));
		if(array.get(this.chromCol).equals("---")) return;//undefined chromosome
		
		Marker m=new Marker();
		m.position=new Position(
			array.get(this.chromCol),
			Integer.parseInt(array.get(this.posCol))
			);
		m.alleleA=array.get(this.alleleACol);
		m.alleleB=array.get(this.alleleBCol);
		m.probeSetId=array.get(this.probeSetIdCol);
		m.rsid=array.get(this.rsIdCol);
		//System.err.println("Map.KEY"+key+" \nMap.VALUE:"+value);
		context.write(new Text(m.probeSetId),m);
		}
	}

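/** Reducer: checks that each probe set id maps to exactly one Marker and writes it out. */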
public static class SortMarkerByProbIdReducer extends Reducer<Text, Marker, Text, Marker>
	{
	@Override
	protected void reduce(
			Text key,
			Iterable<Marker> values,
			Context context
			) throws java.io.IOException ,InterruptedException
		{
		Marker marker=null;
		for(Marker i:values)
			{
			if(marker!=null) throw new IOException("Duplicate marker id "+key);
			marker=i;
			}
		if(marker==null) return;
		context.write(key,marker);
		}
	}


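/** Old-API reducer for the composite join: receives (Marker, Genotypes) tuples and writes chromosome, position, rs id and one decoded allele pair per sample. */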
public static class JoinReducer extends MapReduceBase implements org.apache.hadoop.mapred.Reducer<Writable, TupleWritable, Text, Text>
	{
	@Override
	public void reduce(Writable key, Iterator<TupleWritable> values,
			OutputCollector<Text, Text> output, Reporter reporter)
			throws IOException {
		System.err.println(""+key+" "+key.getClass());
		while(values.hasNext())
			{
			TupleWritable o=values.next();
		
			Marker m=(Marker)o.get(0);
			Genotypes g=(Genotypes)o.get(1);
			StringBuilder b=new StringBuilder();
			b.append(m.getPosition().getChrom());
			b.append("\t");
			b.append(m.getPosition().getPosition());
			b.append("\t");
			b.append(m.getRsid());
			for(int i=0;i< g.array.length;++i)
				{
				b.append("\t");
				switch(g.array[i])
					{
					case -1: b.append("0 0"); break;
					case 0: b.append(m.alleleA+" "+m.alleleA); break;
					case 1: b.append(m.alleleA+" "+m.alleleB); break;
					case 2: b.append(m.alleleB+" "+m.alleleB); break;
					default:throw new IllegalArgumentException(String.valueOf(g.array[i]));
					}
				}
			output.collect(new Text(m.probeSetId), new Text(b.toString()));
			}
		}
	}

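/** Mapper over AxiomGT1.calls.txt: skips the header, parses one tab-delimited line and emits (probe set id, Genotypes). */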
public static class SortCallByProbIdMapper extends Mapper<LongWritable, Text, Text, Genotypes>
	{
	final private Pattern tab=Pattern.compile("[\t]");
	@Override
	protected void map(
		LongWritable key,
		Text value,
		Context context)
		throws java.io.IOException ,InterruptedException
		{
		//ignore header and comment
		if( value.find("probeset_id")==0 ||
			value.find("%")==0 )
			{
			return;
			}
		
		String tokens[]=tab.split(new String(value.getBytes(),0,value.getLength()));
		Genotypes genotypes=new Genotypes(tokens.length-1);
		for(int i=1;i< tokens.length;++i)
			{
			genotypes.array[i-1]=Byte.parseByte(tokens[i]);
			}
		context.write(new Text(tokens[0]),genotypes);
		}
	}

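/** Reducer: checks that each probe set id maps to exactly one Genotypes record and writes it out. */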
public static class SortCallByProbIdReducer extends Reducer<Text, Genotypes, Text, Genotypes>
	{
	@Override
	protected void reduce(
			Text key,
			Iterable<Genotypes> values,
			Context context
			) throws java.io.IOException ,InterruptedException
		{
		Genotypes array=null;
		for(Genotypes i:values)
			{
			if(array!=null) throw new IOException("Duplicate marker id "+key);
			array=i;
			}
		if(array==null) return;
		context.write(key,array);
		}
}


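	/** Splits one line of the annotation file: comma-separated, every field double-quoted, backslash escapes allowed inside a field. */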
	private static List<String> splitCSV(String line)
		{
		List<String> L=new ArrayList<String>();
		int i=0;
		while(i< line.length())
			{
			if(line.charAt(i)!='\"') throw new IllegalArgumentException("expected quote");
			StringBuilder token=new StringBuilder();
			++i;
			while(i < line.length() && line.charAt(i)!='\"')
				{
				if(line.charAt(i)=='\"')
					{
					break;
					}
				else if(line.charAt(i)=='\\' && i+1< line.length())
					{
					++i;
					switch(line.charAt(i))
						{
						case 'n': token.append("\n");break;
						case 't': token.append("\t");break;
						case '\'': token.append("\'");break;
						case '\"': token.append("\"");break;
						case '\\': token.append("\\");break;
						default:throw new IllegalArgumentException("bad escape character in "+line);
						}
					}
				else
					{
					token.append(line.charAt(i));
					}
				++i;
				}
			if(i==line.length() || line.charAt(i)!='\"') throw new IllegalArgumentException("expected quote after "+token);
			L.add(token.toString());
			
			++i;
			if(i==line.length()) break;
			if(line.charAt(i)!=',') throw new IllegalArgumentException("comma missing in "+line);
			++i;
			}
		return L;
		}

	
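	/** Tool entry point: runs the two sort-by-probe-set-id jobs, then the join. */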
	@Override
	public int run(String[] args) throws Exception
		{
		Configuration conf = getConf();
		conf.addResource(new Path("/home/lindenb/package/hadoop-0.20.203.0/conf/core-site.xml"));
		sortMarkersByProbeid(conf);
		sortCallsByProbeid(conf);
		join(conf);
		return 0;
		}
	
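	/** Job 1: reads the CSV header to locate the needed columns, then sorts the markers by probe set id into a SequenceFile. */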
	private void sortMarkersByProbeid(Configuration conf) throws Exception
		{
		//JobConf jobConf=new JobConf(conf);
		
		FileSystem fs=FileSystem.get(conf);
		final Path outPath=new Path(SORTED_MARKERS_PATH);
		final Path inPath=new Path("myfolder/Axiom_GW_Hu_SNP.r2.na31.annot.csv");
		
		List<String> header=null;
        BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(inPath)));
        String line;
        while((line=br.readLine())!=null)
        	{
        	if(line.startsWith("\"Probe Set ID\""))
        		{
        		header=splitCSV(line);
        		break;
        		}
        	}
        br.close();
        if(header==null || header.isEmpty()) throw new IOException("no header in "+inPath);
       
        
        for(int i=0;i<header.size();++i)
        	{
        	if(header.get(i).equals("Chromosome"))
        		{
        		conf.setInt(CHROM_COL, i);
        		}
        	else if(header.get(i).equals("Physical Position"))
	    		{
        		conf.setInt(POS_COL, i);
	    		}
        	else if(header.get(i).equals("Allele A"))
	    		{
        		conf.setInt(ALLELE_A_COL, i);
	    		}
        	else if(header.get(i).equals("Allele B"))
	    		{
        		conf.setInt(ALLELE_B_COL, i);
	    		}
        	else if(header.get(i).equals("Probe Set ID"))
	    		{
        		conf.setInt(PROBE_SET_ID_COL, i);
	    		}
        	else if(header.get(i).equals("dbSNP RS ID"))
	    		{
        		conf.setInt(RS_ID_COL, i);
	    		}
        	}

		Job job = new Job(conf, Hadoop01.class.getName());
		job.setJarByClass(Hadoop01.class);
		job.setMapperClass(SortMarkerByProbIdMapper.class);
		job.setReducerClass(SortMarkerByProbIdReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Marker.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Marker.class);
		job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
		
		
		if(fs.exists(outPath))
			{
			fs.delete(outPath, true);
			}
		
		FileInputFormat.addInputPath(job, inPath);
		FileOutputFormat.setOutputPath(job, outPath);
		
     
		if(!job.waitForCompletion(true) )
			{
			throw new IOException("Cannot complete job");
			}
		}
	
	
	
	
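	/** Job 2: sorts the genotype calls by probe set id into a SequenceFile. */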
	public void sortCallsByProbeid(Configuration conf) throws Exception
		{
		//JobConf jobConf=new JobConf(conf);
		
		FileSystem fs=FileSystem.get(conf);
		final Path outPath=new Path(SORTED_CALLS_PATH);
		final Path inPath=new Path("myfolder/AxiomGT1.calls.txt");

	
		
		
		Job job = new Job(conf, Hadoop01.class.getName());
		job.setJarByClass(Hadoop01.class);
		job.setMapperClass(SortCallByProbIdMapper.class);
		job.setReducerClass(SortCallByProbIdReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Genotypes.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Genotypes.class);
		
		job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
		
		
		
		if(fs.exists(outPath))
			{
			fs.delete(outPath, true);
			}
		
		FileInputFormat.addInputPath(job, inPath);
		FileOutputFormat.setOutputPath(job, outPath);
		
	 
		if(!job.waitForCompletion(true) )
			{
			throw new IOException("Cannot complete job");
			}
		}	
	

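	/** Job 3: inner map-side join of the two sorted SequenceFiles (old mapred API, CompositeInputFormat). */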
	private void join(Configuration conf) throws Exception
		{	
		FileSystem fs=FileSystem.get(conf);
		Path outPath=new Path(JOIN_PATH);
		if(fs.exists(outPath))
			{
			fs.delete(outPath, true);
			}
		
		final String compose=CompositeInputFormat.compose(
	            "inner",
	            SequenceFileInputFormat.class,
	            new Path(SORTED_MARKERS_PATH),
	            new Path(SORTED_CALLS_PATH)
	            );
		System.err.println(compose);
		JobConf jobConf = new JobConf(conf, getClass());
	    jobConf.setJobName("join");
	    jobConf.setInputFormat(CompositeInputFormat.class);
	    jobConf.set("mapred.join.expr",
	    		compose);
	    
	    jobConf.setMapOutputKeyClass(Text.class);
	    jobConf.setMapOutputValueClass(TupleWritable.class);
	    jobConf.setOutputValueClass(Text.class);//TupleWritable ?
	    jobConf.setOutputKeyClass(Text.class);
	    jobConf.setOutputFormat(TextOutputFormat.class);
	    jobConf.setReducerClass(JoinReducer.class);
	    
	    //jobConf.setMapOutputValueClass(Text.class);
	    org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobConf, outPath);
	    JobClient.runJob(jobConf);
		}
	
	/*
public void todo(Configuration conf) throws IOException
	{
	String header[]=null;
    BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(inPath)));
    String line;
    while((line=br.readLine())!=null)
    	{
    	if(line.startsWith("probeset_id\t"))
    		{
    		header=line.split("[\t]");
    		break;
    		}
    	}
    br.close();
    if(header==null) throw new IOException("no header in "+inPath);
	
    conf.setStrings("calls", header);
	} */

	
public static void main(String[] args) throws Exception
	{ 
	//http://distributed-agility.blogspot.com/2010/01/hadoop-0201-example-inverted-line-index.html
	int res = ToolRunner.run(new Configuration(), new Hadoop01(), args);
	System.exit(res);
	}
}
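To run the whole pipeline, a hypothetical invocation (assuming the classes are packed into a jar named hadoop01.jar and a single reducer, so the join output lands in joined.data/part-00000):

bin/hadoop jar hadoop01.jar test.hadoop.Hadoop01
bin/hadoop fs -cat joined.data/part-00000 | head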